""" 软件授权管理模块 提供硬件指纹生成、授权文件验证、数字签名校验等功能 """ import json import hashlib import platform import subprocess import os import logging from datetime import datetime, timezone from typing import Dict, Any, Optional, Tuple from dataclasses import dataclass from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import rsa, padding from cryptography.exceptions import InvalidSignature import base64 import shutil logger = logging.getLogger(__name__) # 授权签名与验证参数(跨语言一致性): # - 算法:RSA-PSS + SHA-256 # - MGF:MGF1(SHA-256) # - 盐长度:固定 32 字节 # - 数据序列化(签名输入): # * 去除 signature 字段 # * 按键名排序(包括嵌套对象) # * 紧凑 JSON:等价于 json.dumps(obj, sort_keys=True, separators=(",", ":")) PSS_SALT_LENGTH_BYTES = 32 @dataclass class LicenseStatus: """授权状态数据类""" valid: bool = False message: str = "" license_type: str = "" expires_at: Optional[datetime] = None features: Dict[str, Any] = None license_id: str = "" def __post_init__(self): if self.features is None: self.features = {} class LicenseManager: """授权管理器""" def __init__(self, config_manager=None): self.config_manager = config_manager self._machine_id = None self._machine_id_candidates = None self._license_cache = None self._cache_timestamp = None def _get_backend_base_dir(self) -> str: return os.path.dirname(os.path.dirname(os.path.dirname(__file__))) def _get_persistent_license_dir(self) -> str: system = platform.system() if system == "Windows": base = os.environ.get("PROGRAMDATA") or os.environ.get("ALLUSERSPROFILE") or "" if not base: base = os.path.expanduser("~\\AppData\\Local") else: base = os.path.expanduser("~/.config") return os.path.join(base, "BodyCheck", "license") def _get_configured_license_paths(self) -> Tuple[str, str, int]: license_path = "data/license.json" public_key_path = "backend/license_pub.pem" grace_days = 3 if self.config_manager: license_path = self.config_manager.get_config_value("LICENSE", "path", license_path) public_key_path = self.config_manager.get_config_value("LICENSE", "public_key", public_key_path) grace_days = int(self.config_manager.get_config_value("LICENSE", "grace_days", str(grace_days))) if not os.path.isabs(license_path): license_path = os.path.join(self._get_backend_base_dir(), license_path) if not os.path.isabs(public_key_path): public_key_path = os.path.join(self._get_backend_base_dir(), public_key_path) return license_path, public_key_path, grace_days def _get_persistent_license_paths(self) -> Tuple[str, str]: pdir = self._get_persistent_license_dir() return os.path.join(pdir, "license.json"), os.path.join(pdir, "license_public_key.pem") def _resolve_license_paths(self) -> Tuple[str, str, int]: cfg_license_path, cfg_public_key_path, grace_days = self._get_configured_license_paths() p_license_path, p_public_key_path = self._get_persistent_license_paths() effective_license_path = cfg_license_path if not os.path.exists(effective_license_path) and os.path.exists(p_license_path): effective_license_path = p_license_path effective_public_key_path = cfg_public_key_path if not os.path.exists(effective_public_key_path) and os.path.exists(p_public_key_path): effective_public_key_path = p_public_key_path return effective_license_path, effective_public_key_path, grace_days def _mirror_license_assets(self, license_path: Optional[str] = None, public_key_path: Optional[str] = None) -> None: p_license_path, p_public_key_path = self._get_persistent_license_paths() pdir = os.path.dirname(p_license_path) try: os.makedirs(pdir, exist_ok=True) except Exception: return if license_path and os.path.exists(license_path): try: shutil.copyfile(license_path, p_license_path) except Exception: pass if public_key_path and os.path.exists(public_key_path): try: shutil.copyfile(public_key_path, p_public_key_path) except Exception: pass def install_license_file(self, source_license_path: str) -> Tuple[bool, str]: try: if not os.path.exists(source_license_path): return False, "源授权文件不存在" cfg_license_path, cfg_public_key_path, _ = self._get_configured_license_paths() os.makedirs(os.path.dirname(cfg_license_path), exist_ok=True) shutil.copyfile(source_license_path, cfg_license_path) self._mirror_license_assets(license_path=cfg_license_path, public_key_path=cfg_public_key_path) return True, cfg_license_path except Exception as e: return False, str(e) def mirror_license_dir(self, source_dir: str) -> None: if not source_dir or not os.path.isdir(source_dir): return license_candidate = os.path.join(source_dir, "license.json") pub_candidates = [ os.path.join(source_dir, "license_public_key.pem"), os.path.join(source_dir, "license_pub.pem"), os.path.join(source_dir, "public_key.pem"), ] pub_path = "" for c in pub_candidates: if os.path.exists(c): pub_path = c break self._mirror_license_assets(license_path=license_candidate if os.path.exists(license_candidate) else None, public_key_path=pub_path or None) def _run_powershell(self, command: str, timeout: int = 10) -> str: try: result = subprocess.run( [ "powershell", "-NoProfile", "-NonInteractive", "-ExecutionPolicy", "Bypass", "-Command", command, ], capture_output=True, text=True, timeout=timeout, ) return (result.stdout or "").strip() except Exception: return "" def _get_windows_cpu_id(self) -> str: out = self._run_powershell( "(Get-CimInstance -ClassName Win32_Processor | Select-Object -First 1 -ExpandProperty ProcessorId)" ) return out.strip() def _get_windows_baseboard_serial(self) -> str: out = self._run_powershell( "(Get-CimInstance -ClassName Win32_BaseBoard | Select-Object -First 1 -ExpandProperty SerialNumber)" ) serial = out.strip() if serial and serial != "To be filled by O.E.M.": return serial return "" def _get_windows_disk_serials(self) -> list: raw = self._run_powershell( "$d=Get-CimInstance -ClassName Win32_DiskDrive | Select-Object SerialNumber,InterfaceType,PNPDeviceID,MediaType; $d | ConvertTo-Json -Compress" ) if not raw: return [] try: data = json.loads(raw) except Exception: return [] disks = data if isinstance(data, list) else ([data] if isinstance(data, dict) else []) serials = [] for d in disks: serial = str((d.get("SerialNumber") or "")).strip() iface = str((d.get("InterfaceType") or "")).strip().upper() pnp = str((d.get("PNPDeviceID") or "")).strip().upper() media = str((d.get("MediaType") or "")).strip().upper() if serial and iface != "USB" and not pnp.startswith("USBSTOR") and "REMOVABLE" not in media: serials.append(serial) serials = sorted(set(serials)) return serials def _get_windows_identifiers(self) -> Dict[str, Any]: cpu_id = self._get_windows_cpu_id() board_serial = self._get_windows_baseboard_serial() disk_serials = self._get_windows_disk_serials() if not cpu_id: try: result = subprocess.run( ["wmic", "cpu", "get", "ProcessorId", "/value"], capture_output=True, text=True, timeout=10, ) for line in (result.stdout or "").split("\n"): if "ProcessorId=" in line: cpu_id = line.split("=", 1)[1].strip() if cpu_id: break except Exception: pass if not board_serial: try: result = subprocess.run( ["wmic", "baseboard", "get", "SerialNumber", "/value"], capture_output=True, text=True, timeout=10, ) for line in (result.stdout or "").split("\n"): if "SerialNumber=" in line: board_serial = line.split("=", 1)[1].strip() if board_serial and board_serial != "To be filled by O.E.M.": break board_serial = "" except Exception: pass if not disk_serials: try: result = subprocess.run( [ "wmic", "path", "Win32_DiskDrive", "get", "SerialNumber,InterfaceType,PNPDeviceID,MediaType", "/value", ], capture_output=True, text=True, timeout=10, ) block = {} serials = [] for line in (result.stdout or "").split("\n"): line = line.strip() if not line: serial = (block.get("SerialNumber") or "").strip() iface = (block.get("InterfaceType") or "").strip().upper() pnp = (block.get("PNPDeviceID") or "").strip().upper() media = (block.get("MediaType") or "").strip().upper() if serial and iface != "USB" and not pnp.startswith("USBSTOR") and "REMOVABLE" not in media: serials.append(serial) block = {} continue if "=" in line: k, v = line.split("=", 1) block[k] = v if block: serial = (block.get("SerialNumber") or "").strip() iface = (block.get("InterfaceType") or "").strip().upper() pnp = (block.get("PNPDeviceID") or "").strip().upper() media = (block.get("MediaType") or "").strip().upper() if serial and iface != "USB" and not pnp.startswith("USBSTOR") and "REMOVABLE" not in media: serials.append(serial) disk_serials = sorted(set(serials)) except Exception: pass mac = "" try: import uuid mac = ":".join( ["{:02x}".format((uuid.getnode() >> elements) & 0xFF) for elements in range(0, 2 * 6, 2)][::-1] ) except Exception: mac = "" return { "cpu_id": cpu_id.strip() if cpu_id else "", "board_serial": board_serial.strip() if board_serial else "", "disk_serials": disk_serials or [], "mac": mac, "node": platform.node(), "processor": platform.processor(), } def _hash_core_info(self, core_info: list) -> str: combined_info = "|".join(sorted(core_info)) return hashlib.sha256(combined_info.encode("utf-8")).hexdigest()[:16].upper() def _build_machine_id_candidates(self) -> list: system = platform.system() info: Dict[str, Any] = {} if system == "Windows": info = self._get_windows_identifiers() else: info = { "cpu_id": "", "board_serial": "", "disk_serials": [], "mac": "", "node": platform.node(), "processor": platform.processor(), } cpu_id = info.get("cpu_id") or "" board_serial = info.get("board_serial") or "" disk_serials = info.get("disk_serials") or [] mac = info.get("mac") or "" variants = [] full_core = [] if cpu_id: full_core.append(f"CPU:{cpu_id}") if board_serial: full_core.append(f"BOARD:{board_serial}") for s in disk_serials: full_core.append(f"DISK:{s}") if full_core: variants.append(full_core) no_disk = [] if cpu_id: no_disk.append(f"CPU:{cpu_id}") if board_serial: no_disk.append(f"BOARD:{board_serial}") if no_disk: variants.append(no_disk) if cpu_id: variants.append([f"CPU:{cpu_id}"]) if board_serial: variants.append([f"BOARD:{board_serial}"]) if mac: variants.append([f"MAC:{mac}"]) if cpu_id and mac: variants.append([f"CPU:{cpu_id}", f"MAC:{mac}"]) fallback_core = [] node = (info.get("node") or "").strip() proc = (info.get("processor") or "").strip() if node: fallback_core.append(f"NODE:{node}") if proc: fallback_core.append(f"PROCESSOR:{proc}") if fallback_core: variants.append(fallback_core) prefix = "W10-" if system == "Windows" else "FB-" candidates = [] for core in variants: try: mid = f"{prefix}{self._hash_core_info(core)}" except Exception: continue if mid not in candidates: candidates.append(mid) return candidates def get_machine_id(self) -> str: """生成机器硬件指纹""" if self._machine_id: return self._machine_id try: candidates = self._build_machine_id_candidates() if not candidates: raise RuntimeError("无法生成机器指纹") self._machine_id_candidates = candidates self._machine_id = candidates[0] logger.info(f"生成机器指纹: {self._machine_id}") return self._machine_id except Exception as e: logger.error(f"生成机器指纹失败: {e}") fallback_info = f"{platform.system()}-{platform.node()}-{platform.machine()}" fallback_id = hashlib.md5(fallback_info.encode('utf-8')).hexdigest()[:12].upper() self._machine_id = f"FB-{fallback_id}" self._machine_id_candidates = [self._machine_id] return self._machine_id def get_machine_id_candidates(self) -> list: if self._machine_id_candidates is None: self.get_machine_id() return list(self._machine_id_candidates or []) def load_license(self, license_path: str) -> Optional[Dict[str, Any]]: """加载授权文件""" try: if not os.path.exists(license_path): logger.warning(f"授权文件不存在: {license_path}") return None with open(license_path, 'r', encoding='utf-8') as f: license_data = json.load(f) # 基本字段验证 required_fields = ['product', 'license_id', 'license_type', 'machine_id', 'issued_at', 'expires_at', 'signature'] for field in required_fields: if field not in license_data: logger.error(f"授权文件缺少必要字段: {field}") return None return license_data except json.JSONDecodeError as e: logger.error(f"授权文件格式错误: {e}") return None except Exception as e: logger.error(f"加载授权文件失败: {e}") return None def verify_signature(self, license_data: Dict[str, Any], public_key_path: str) -> bool: """验证授权文件数字签名 规则:RSA-PSS + SHA256,MGF1(SHA256),盐长度固定为 32 字节; 待签名数据为去除 signature 后的紧凑、排序键 JSON。 """ try: if not os.path.exists(public_key_path): logger.error(f"公钥文件不存在: {public_key_path}") return False # 加载公钥 with open(public_key_path, 'rb') as f: public_key = serialization.load_pem_public_key(f.read()) # 提取签名 signature_b64 = license_data.get('signature', '') if not signature_b64: logger.error("授权文件缺少签名") return False try: signature = base64.b64decode(signature_b64) except Exception as e: logger.error(f"签名格式错误: {e}") return False # 构建待验证的数据(排除signature字段) license_copy = license_data.copy() license_copy.pop('signature', None) # 按键排序确保一致性 sorted_data = json.dumps(license_copy, sort_keys=True, separators=(',', ':')) message = sorted_data.encode('utf-8') # 验证签名(固定 PSS 盐长度为 32 字节,确保跨语言一致) try: public_key.verify( signature, message, padding.PSS( mgf=padding.MGF1(hashes.SHA256()), salt_length=PSS_SALT_LENGTH_BYTES ), hashes.SHA256() ) return True except InvalidSignature: logger.error("授权文件签名验证失败") return False except Exception as e: logger.error(f"签名验证过程出错: {e}") return False def check_validity(self, license_data: Dict[str, Any], machine_id: str, grace_days: int = 0) -> Tuple[bool, str]: """检查授权有效性""" try: # 检查产品匹配 if license_data.get('product') != 'BodyBalanceEvaluation': return False, "授权文件产品不匹配" # 检查机器绑定 license_machine_id = license_data.get('machine_id', '') candidates = [] if machine_id: candidates.append(machine_id) for mid in self.get_machine_id_candidates(): if mid not in candidates: candidates.append(mid) if license_machine_id not in candidates: return False, f"授权文件与当前机器不匹配 (当前: {machine_id}, 授权: {license_machine_id})" # 检查有效期 now = datetime.now(timezone.utc) try: expires_at_str = license_data.get('expires_at', '') expires_at = datetime.fromisoformat(expires_at_str.replace('Z', '+00:00')) except Exception as e: return False, f"授权过期时间格式错误: {e}" # 应用宽限期 if grace_days > 0: from datetime import timedelta expires_at += timedelta(days=grace_days) if now > expires_at: return False, f"授权已过期 (过期时间: {expires_at.strftime('%Y-%m-%d %H:%M:%S')})" return True, "授权有效" except Exception as e: return False, f"授权验证过程出错: {e}" def get_license_status(self, force_reload: bool = False) -> LicenseStatus: """获取当前授权状态""" try: # 检查缓存(避免频繁文件访问) if not force_reload and self._license_cache and self._cache_timestamp: cache_age = datetime.now().timestamp() - self._cache_timestamp if cache_age < 300: # 5分钟缓存 return self._license_cache # 获取配置 if not self.config_manager: return LicenseStatus(valid=False, message="配置管理器未初始化") license_path, public_key_path, grace_days = self._resolve_license_paths() # 获取机器指纹 machine_id = self.get_machine_id() # 加载授权文件 license_data = self.load_license(license_path) if not license_data: status = LicenseStatus( valid=False, message="授权文件不存在或格式错误,当前为试用模式", license_type="trial", features={"recording": True, "export": False, "trial_limit_minutes": 30} ) self._license_cache = status self._cache_timestamp = datetime.now().timestamp() return status # 验证签名 if not self.verify_signature(license_data, public_key_path): status = LicenseStatus( valid=False, message="授权文件签名验证失败", license_type="invalid" ) self._license_cache = status self._cache_timestamp = datetime.now().timestamp() return status self._mirror_license_assets(license_path=license_path, public_key_path=public_key_path) # 检查有效性 is_valid, message = self.check_validity(license_data, machine_id, grace_days) if is_valid: # 解析过期时间 expires_at = None try: expires_at_str = license_data.get('expires_at', '') expires_at = datetime.fromisoformat(expires_at_str.replace('Z', '+00:00')) except: pass status = LicenseStatus( valid=True, message=message, license_type=license_data.get('license_type', 'unknown'), expires_at=expires_at, features=license_data.get('features', {}), license_id=license_data.get('license_id', '') ) else: status = LicenseStatus( valid=False, message=message, license_type=license_data.get('license_type', 'unknown'), license_id=license_data.get('license_id', '') ) # 缓存结果 self._license_cache = status self._cache_timestamp = datetime.now().timestamp() return status except Exception as e: logger.error(f"获取授权状态失败: {e}") status = LicenseStatus( valid=False, message=f"授权检查出错: {str(e)}", license_type="error" ) return status def verify_license_file(self, license_path: str) -> Tuple[bool, str]: """验证指定授权文件的签名与有效性 Returns: (bool, str): 验证结果与消息 """ try: if not self.config_manager: return False, "配置管理器未初始化" _, public_key_path, grace_days = self._resolve_license_paths() if not os.path.exists(license_path): return False, f"授权文件不存在: {license_path}" # 加载授权文件 data = self.load_license(license_path) if not data: return False, "授权文件格式错误或缺少必要字段" # 验证签名 if not self.verify_signature(data, public_key_path): return False, "授权文件签名验证失败" # 检查有效性(机器绑定 + 过期判断) machine_id = self.get_machine_id() is_valid, message = self.check_validity(data, machine_id, grace_days) return is_valid, message except Exception as e: logger.error(f"验证授权文件失败: {e}") return False, f"验证过程出错: {str(e)}" def generate_activation_request(self, output_path: str = None, company_name: str = None, contact_info: str = None) -> str: """生成激活请求文件(离线激活用)""" try: machine_id = self.get_machine_id() request_data = { "product": "BodyBalanceEvaluation", "version": "1.5.0", "machine_id": machine_id, "platform": platform.system(), "request_time": datetime.now(timezone.utc).isoformat(), "hardware_info": { "system": platform.system(), "machine": platform.machine(), "processor": platform.processor(), "node": platform.node() } } if company_name: request_data["company_name"] = company_name if contact_info: request_data["contact_info"] = contact_info if not output_path: output_path = f"activation_request_{machine_id}.json" with open(output_path, 'w', encoding='utf-8') as f: json.dump(request_data, f, indent=2, ensure_ascii=False) logger.info(f"激活请求文件已生成: {output_path}") return output_path except Exception as e: logger.error(f"生成激活请求失败: {e}") raise