BodyBalanceEvaluation/backend/devices/utils/license_manager.py

687 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
软件授权管理模块
提供硬件指纹生成、授权文件验证、数字签名校验等功能
"""
import json
import hashlib
import platform
import subprocess
import os
import logging
from datetime import datetime, timezone
from typing import Dict, Any, Optional, Tuple
from dataclasses import dataclass
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.exceptions import InvalidSignature
import base64
import shutil
logger = logging.getLogger(__name__)
# 授权签名与验证参数(跨语言一致性):
# - 算法RSA-PSS + SHA-256
# - MGFMGF1(SHA-256)
# - 盐长度:固定 32 字节
# - 数据序列化(签名输入):
# * 去除 signature 字段
# * 按键名排序(包括嵌套对象)
# * 紧凑 JSON等价于 json.dumps(obj, sort_keys=True, separators=(",", ":"))
PSS_SALT_LENGTH_BYTES = 32
@dataclass
class LicenseStatus:
"""授权状态数据类"""
valid: bool = False
message: str = ""
license_type: str = ""
expires_at: Optional[datetime] = None
features: Dict[str, Any] = None
license_id: str = ""
def __post_init__(self):
if self.features is None:
self.features = {}
class LicenseManager:
"""授权管理器"""
def __init__(self, config_manager=None):
self.config_manager = config_manager
self._machine_id = None
self._machine_id_candidates = None
self._license_cache = None
self._cache_timestamp = None
def _get_backend_base_dir(self) -> str:
return os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
def _get_persistent_license_dir(self) -> str:
system = platform.system()
if system == "Windows":
base = os.environ.get("PROGRAMDATA") or os.environ.get("ALLUSERSPROFILE") or ""
if not base:
base = os.path.expanduser("~\\AppData\\Local")
else:
base = os.path.expanduser("~/.config")
return os.path.join(base, "BodyCheck", "license")
def _get_configured_license_paths(self) -> Tuple[str, str, int]:
license_path = "data/license.json"
public_key_path = "backend/license_pub.pem"
grace_days = 3
if self.config_manager:
license_path = self.config_manager.get_config_value("LICENSE", "path", license_path)
public_key_path = self.config_manager.get_config_value("LICENSE", "public_key", public_key_path)
grace_days = int(self.config_manager.get_config_value("LICENSE", "grace_days", str(grace_days)))
if not os.path.isabs(license_path):
license_path = os.path.join(self._get_backend_base_dir(), license_path)
if not os.path.isabs(public_key_path):
public_key_path = os.path.join(self._get_backend_base_dir(), public_key_path)
return license_path, public_key_path, grace_days
def _get_persistent_license_paths(self) -> Tuple[str, str]:
pdir = self._get_persistent_license_dir()
return os.path.join(pdir, "license.json"), os.path.join(pdir, "license_public_key.pem")
def _resolve_license_paths(self) -> Tuple[str, str, int]:
cfg_license_path, cfg_public_key_path, grace_days = self._get_configured_license_paths()
p_license_path, p_public_key_path = self._get_persistent_license_paths()
effective_license_path = cfg_license_path
if not os.path.exists(effective_license_path) and os.path.exists(p_license_path):
effective_license_path = p_license_path
effective_public_key_path = cfg_public_key_path
if not os.path.exists(effective_public_key_path) and os.path.exists(p_public_key_path):
effective_public_key_path = p_public_key_path
return effective_license_path, effective_public_key_path, grace_days
def _mirror_license_assets(self, license_path: Optional[str] = None, public_key_path: Optional[str] = None) -> None:
p_license_path, p_public_key_path = self._get_persistent_license_paths()
pdir = os.path.dirname(p_license_path)
try:
os.makedirs(pdir, exist_ok=True)
except Exception:
return
if license_path and os.path.exists(license_path):
try:
shutil.copyfile(license_path, p_license_path)
except Exception:
pass
if public_key_path and os.path.exists(public_key_path):
try:
shutil.copyfile(public_key_path, p_public_key_path)
except Exception:
pass
def install_license_file(self, source_license_path: str) -> Tuple[bool, str]:
try:
if not os.path.exists(source_license_path):
return False, "源授权文件不存在"
cfg_license_path, cfg_public_key_path, _ = self._get_configured_license_paths()
os.makedirs(os.path.dirname(cfg_license_path), exist_ok=True)
shutil.copyfile(source_license_path, cfg_license_path)
self._mirror_license_assets(license_path=cfg_license_path, public_key_path=cfg_public_key_path)
return True, cfg_license_path
except Exception as e:
return False, str(e)
def mirror_license_dir(self, source_dir: str) -> None:
if not source_dir or not os.path.isdir(source_dir):
return
license_candidate = os.path.join(source_dir, "license.json")
pub_candidates = [
os.path.join(source_dir, "license_public_key.pem"),
os.path.join(source_dir, "license_pub.pem"),
os.path.join(source_dir, "public_key.pem"),
]
pub_path = ""
for c in pub_candidates:
if os.path.exists(c):
pub_path = c
break
self._mirror_license_assets(license_path=license_candidate if os.path.exists(license_candidate) else None, public_key_path=pub_path or None)
def _run_powershell(self, command: str, timeout: int = 10) -> str:
try:
result = subprocess.run(
[
"powershell",
"-NoProfile",
"-NonInteractive",
"-ExecutionPolicy",
"Bypass",
"-Command",
command,
],
capture_output=True,
text=True,
timeout=timeout,
)
return (result.stdout or "").strip()
except Exception:
return ""
def _get_windows_cpu_id(self) -> str:
out = self._run_powershell(
"(Get-CimInstance -ClassName Win32_Processor | Select-Object -First 1 -ExpandProperty ProcessorId)"
)
return out.strip()
def _get_windows_baseboard_serial(self) -> str:
out = self._run_powershell(
"(Get-CimInstance -ClassName Win32_BaseBoard | Select-Object -First 1 -ExpandProperty SerialNumber)"
)
serial = out.strip()
if serial and serial != "To be filled by O.E.M.":
return serial
return ""
def _get_windows_disk_serials(self) -> list:
raw = self._run_powershell(
"$d=Get-CimInstance -ClassName Win32_DiskDrive | Select-Object SerialNumber,InterfaceType,PNPDeviceID,MediaType; $d | ConvertTo-Json -Compress"
)
if not raw:
return []
try:
data = json.loads(raw)
except Exception:
return []
disks = data if isinstance(data, list) else ([data] if isinstance(data, dict) else [])
serials = []
for d in disks:
serial = str((d.get("SerialNumber") or "")).strip()
iface = str((d.get("InterfaceType") or "")).strip().upper()
pnp = str((d.get("PNPDeviceID") or "")).strip().upper()
media = str((d.get("MediaType") or "")).strip().upper()
if serial and iface != "USB" and not pnp.startswith("USBSTOR") and "REMOVABLE" not in media:
serials.append(serial)
serials = sorted(set(serials))
return serials
def _get_windows_identifiers(self) -> Dict[str, Any]:
cpu_id = self._get_windows_cpu_id()
board_serial = self._get_windows_baseboard_serial()
disk_serials = self._get_windows_disk_serials()
if not cpu_id:
try:
result = subprocess.run(
["wmic", "cpu", "get", "ProcessorId", "/value"],
capture_output=True,
text=True,
timeout=10,
)
for line in (result.stdout or "").split("\n"):
if "ProcessorId=" in line:
cpu_id = line.split("=", 1)[1].strip()
if cpu_id:
break
except Exception:
pass
if not board_serial:
try:
result = subprocess.run(
["wmic", "baseboard", "get", "SerialNumber", "/value"],
capture_output=True,
text=True,
timeout=10,
)
for line in (result.stdout or "").split("\n"):
if "SerialNumber=" in line:
board_serial = line.split("=", 1)[1].strip()
if board_serial and board_serial != "To be filled by O.E.M.":
break
board_serial = ""
except Exception:
pass
if not disk_serials:
try:
result = subprocess.run(
[
"wmic",
"path",
"Win32_DiskDrive",
"get",
"SerialNumber,InterfaceType,PNPDeviceID,MediaType",
"/value",
],
capture_output=True,
text=True,
timeout=10,
)
block = {}
serials = []
for line in (result.stdout or "").split("\n"):
line = line.strip()
if not line:
serial = (block.get("SerialNumber") or "").strip()
iface = (block.get("InterfaceType") or "").strip().upper()
pnp = (block.get("PNPDeviceID") or "").strip().upper()
media = (block.get("MediaType") or "").strip().upper()
if serial and iface != "USB" and not pnp.startswith("USBSTOR") and "REMOVABLE" not in media:
serials.append(serial)
block = {}
continue
if "=" in line:
k, v = line.split("=", 1)
block[k] = v
if block:
serial = (block.get("SerialNumber") or "").strip()
iface = (block.get("InterfaceType") or "").strip().upper()
pnp = (block.get("PNPDeviceID") or "").strip().upper()
media = (block.get("MediaType") or "").strip().upper()
if serial and iface != "USB" and not pnp.startswith("USBSTOR") and "REMOVABLE" not in media:
serials.append(serial)
disk_serials = sorted(set(serials))
except Exception:
pass
mac = ""
try:
import uuid
mac = ":".join(
["{:02x}".format((uuid.getnode() >> elements) & 0xFF) for elements in range(0, 2 * 6, 2)][::-1]
)
except Exception:
mac = ""
return {
"cpu_id": cpu_id.strip() if cpu_id else "",
"board_serial": board_serial.strip() if board_serial else "",
"disk_serials": disk_serials or [],
"mac": mac,
"node": platform.node(),
"processor": platform.processor(),
}
def _hash_core_info(self, core_info: list) -> str:
combined_info = "|".join(sorted(core_info))
return hashlib.sha256(combined_info.encode("utf-8")).hexdigest()[:16].upper()
def _build_machine_id_candidates(self) -> list:
system = platform.system()
info: Dict[str, Any] = {}
if system == "Windows":
info = self._get_windows_identifiers()
else:
info = {
"cpu_id": "",
"board_serial": "",
"disk_serials": [],
"mac": "",
"node": platform.node(),
"processor": platform.processor(),
}
cpu_id = info.get("cpu_id") or ""
board_serial = info.get("board_serial") or ""
disk_serials = info.get("disk_serials") or []
mac = info.get("mac") or ""
variants = []
full_core = []
if cpu_id:
full_core.append(f"CPU:{cpu_id}")
if board_serial:
full_core.append(f"BOARD:{board_serial}")
for s in disk_serials:
full_core.append(f"DISK:{s}")
if full_core:
variants.append(full_core)
no_disk = []
if cpu_id:
no_disk.append(f"CPU:{cpu_id}")
if board_serial:
no_disk.append(f"BOARD:{board_serial}")
if no_disk:
variants.append(no_disk)
if cpu_id:
variants.append([f"CPU:{cpu_id}"])
if board_serial:
variants.append([f"BOARD:{board_serial}"])
if mac:
variants.append([f"MAC:{mac}"])
if cpu_id and mac:
variants.append([f"CPU:{cpu_id}", f"MAC:{mac}"])
fallback_core = []
node = (info.get("node") or "").strip()
proc = (info.get("processor") or "").strip()
if node:
fallback_core.append(f"NODE:{node}")
if proc:
fallback_core.append(f"PROCESSOR:{proc}")
if fallback_core:
variants.append(fallback_core)
prefix = "W10-" if system == "Windows" else "FB-"
candidates = []
for core in variants:
try:
mid = f"{prefix}{self._hash_core_info(core)}"
except Exception:
continue
if mid not in candidates:
candidates.append(mid)
return candidates
def get_machine_id(self) -> str:
"""生成机器硬件指纹"""
if self._machine_id:
return self._machine_id
try:
candidates = self._build_machine_id_candidates()
if not candidates:
raise RuntimeError("无法生成机器指纹")
self._machine_id_candidates = candidates
self._machine_id = candidates[0]
logger.info(f"生成机器指纹: {self._machine_id}")
return self._machine_id
except Exception as e:
logger.error(f"生成机器指纹失败: {e}")
fallback_info = f"{platform.system()}-{platform.node()}-{platform.machine()}"
fallback_id = hashlib.md5(fallback_info.encode('utf-8')).hexdigest()[:12].upper()
self._machine_id = f"FB-{fallback_id}"
self._machine_id_candidates = [self._machine_id]
return self._machine_id
def get_machine_id_candidates(self) -> list:
if self._machine_id_candidates is None:
self.get_machine_id()
return list(self._machine_id_candidates or [])
def load_license(self, license_path: str) -> Optional[Dict[str, Any]]:
"""加载授权文件"""
try:
if not os.path.exists(license_path):
logger.warning(f"授权文件不存在: {license_path}")
return None
with open(license_path, 'r', encoding='utf-8') as f:
license_data = json.load(f)
# 基本字段验证
required_fields = ['product', 'license_id', 'license_type', 'machine_id',
'issued_at', 'expires_at', 'signature']
for field in required_fields:
if field not in license_data:
logger.error(f"授权文件缺少必要字段: {field}")
return None
return license_data
except json.JSONDecodeError as e:
logger.error(f"授权文件格式错误: {e}")
return None
except Exception as e:
logger.error(f"加载授权文件失败: {e}")
return None
def verify_signature(self, license_data: Dict[str, Any], public_key_path: str) -> bool:
"""验证授权文件数字签名
规则RSA-PSS + SHA256MGF1(SHA256),盐长度固定为 32 字节;
待签名数据为去除 signature 后的紧凑、排序键 JSON。
"""
try:
if not os.path.exists(public_key_path):
logger.error(f"公钥文件不存在: {public_key_path}")
return False
# 加载公钥
with open(public_key_path, 'rb') as f:
public_key = serialization.load_pem_public_key(f.read())
# 提取签名
signature_b64 = license_data.get('signature', '')
if not signature_b64:
logger.error("授权文件缺少签名")
return False
try:
signature = base64.b64decode(signature_b64)
except Exception as e:
logger.error(f"签名格式错误: {e}")
return False
# 构建待验证的数据排除signature字段
license_copy = license_data.copy()
license_copy.pop('signature', None)
# 按键排序确保一致性
sorted_data = json.dumps(license_copy, sort_keys=True, separators=(',', ':'))
message = sorted_data.encode('utf-8')
# 验证签名(固定 PSS 盐长度为 32 字节,确保跨语言一致)
try:
public_key.verify(
signature,
message,
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=PSS_SALT_LENGTH_BYTES
),
hashes.SHA256()
)
return True
except InvalidSignature:
logger.error("授权文件签名验证失败")
return False
except Exception as e:
logger.error(f"签名验证过程出错: {e}")
return False
def check_validity(self, license_data: Dict[str, Any], machine_id: str,
grace_days: int = 0) -> Tuple[bool, str]:
"""检查授权有效性"""
try:
# 检查产品匹配
if license_data.get('product') != 'BodyBalanceEvaluation':
return False, "授权文件产品不匹配"
# 检查机器绑定
license_machine_id = license_data.get('machine_id', '')
candidates = []
if machine_id:
candidates.append(machine_id)
for mid in self.get_machine_id_candidates():
if mid not in candidates:
candidates.append(mid)
if license_machine_id not in candidates:
return False, f"授权文件与当前机器不匹配 (当前: {machine_id}, 授权: {license_machine_id})"
# 检查有效期
now = datetime.now(timezone.utc)
try:
expires_at_str = license_data.get('expires_at', '')
expires_at = datetime.fromisoformat(expires_at_str.replace('Z', '+00:00'))
except Exception as e:
return False, f"授权过期时间格式错误: {e}"
# 应用宽限期
if grace_days > 0:
from datetime import timedelta
expires_at += timedelta(days=grace_days)
if now > expires_at:
return False, f"授权已过期 (过期时间: {expires_at.strftime('%Y-%m-%d %H:%M:%S')})"
return True, "授权有效"
except Exception as e:
return False, f"授权验证过程出错: {e}"
def get_license_status(self, force_reload: bool = False) -> LicenseStatus:
"""获取当前授权状态"""
try:
# 检查缓存(避免频繁文件访问)
if not force_reload and self._license_cache and self._cache_timestamp:
cache_age = datetime.now().timestamp() - self._cache_timestamp
if cache_age < 300: # 5分钟缓存
return self._license_cache
# 获取配置
if not self.config_manager:
return LicenseStatus(valid=False, message="配置管理器未初始化")
license_path, public_key_path, grace_days = self._resolve_license_paths()
# 获取机器指纹
machine_id = self.get_machine_id()
# 加载授权文件
license_data = self.load_license(license_path)
if not license_data:
status = LicenseStatus(
valid=False,
message="授权文件不存在或格式错误,当前为试用模式",
license_type="trial",
features={"recording": True, "export": False, "trial_limit_minutes": 30}
)
self._license_cache = status
self._cache_timestamp = datetime.now().timestamp()
return status
# 验证签名
if not self.verify_signature(license_data, public_key_path):
status = LicenseStatus(
valid=False,
message="授权文件签名验证失败",
license_type="invalid"
)
self._license_cache = status
self._cache_timestamp = datetime.now().timestamp()
return status
self._mirror_license_assets(license_path=license_path, public_key_path=public_key_path)
# 检查有效性
is_valid, message = self.check_validity(license_data, machine_id, grace_days)
if is_valid:
# 解析过期时间
expires_at = None
try:
expires_at_str = license_data.get('expires_at', '')
expires_at = datetime.fromisoformat(expires_at_str.replace('Z', '+00:00'))
except:
pass
status = LicenseStatus(
valid=True,
message=message,
license_type=license_data.get('license_type', 'unknown'),
expires_at=expires_at,
features=license_data.get('features', {}),
license_id=license_data.get('license_id', '')
)
else:
status = LicenseStatus(
valid=False,
message=message,
license_type=license_data.get('license_type', 'unknown'),
license_id=license_data.get('license_id', '')
)
# 缓存结果
self._license_cache = status
self._cache_timestamp = datetime.now().timestamp()
return status
except Exception as e:
logger.error(f"获取授权状态失败: {e}")
status = LicenseStatus(
valid=False,
message=f"授权检查出错: {str(e)}",
license_type="error"
)
return status
def verify_license_file(self, license_path: str) -> Tuple[bool, str]:
"""验证指定授权文件的签名与有效性
Returns:
(bool, str): 验证结果与消息
"""
try:
if not self.config_manager:
return False, "配置管理器未初始化"
_, public_key_path, grace_days = self._resolve_license_paths()
if not os.path.exists(license_path):
return False, f"授权文件不存在: {license_path}"
# 加载授权文件
data = self.load_license(license_path)
if not data:
return False, "授权文件格式错误或缺少必要字段"
# 验证签名
if not self.verify_signature(data, public_key_path):
return False, "授权文件签名验证失败"
# 检查有效性(机器绑定 + 过期判断)
machine_id = self.get_machine_id()
is_valid, message = self.check_validity(data, machine_id, grace_days)
return is_valid, message
except Exception as e:
logger.error(f"验证授权文件失败: {e}")
return False, f"验证过程出错: {str(e)}"
def generate_activation_request(self, output_path: str = None, company_name: str = None, contact_info: str = None) -> str:
"""生成激活请求文件(离线激活用)"""
try:
machine_id = self.get_machine_id()
request_data = {
"product": "BodyBalanceEvaluation",
"version": "1.5.0",
"machine_id": machine_id,
"platform": platform.system(),
"request_time": datetime.now(timezone.utc).isoformat(),
"hardware_info": {
"system": platform.system(),
"machine": platform.machine(),
"processor": platform.processor(),
"node": platform.node()
}
}
if company_name:
request_data["company_name"] = company_name
if contact_info:
request_data["contact_info"] = contact_info
if not output_path:
output_path = f"activation_request_{machine_id}.json"
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(request_data, f, indent=2, ensure_ascii=False)
logger.info(f"激活请求文件已生成: {output_path}")
return output_path
except Exception as e:
logger.error(f"生成激活请求失败: {e}")
raise