BodyBalanceEvaluation/backend/devices/utils/license_manager.py

432 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
软件授权管理模块
提供硬件指纹生成、授权文件验证、数字签名校验等功能
"""
import json
import hashlib
import platform
import subprocess
import os
import logging
from datetime import datetime, timezone
from typing import Dict, Any, Optional, Tuple
from dataclasses import dataclass
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.exceptions import InvalidSignature
import base64
logger = logging.getLogger(__name__)
# 授权签名与验证参数(跨语言一致性):
# - 算法RSA-PSS + SHA-256
# - MGFMGF1(SHA-256)
# - 盐长度:固定 32 字节
# - 数据序列化(签名输入):
# * 去除 signature 字段
# * 按键名排序(包括嵌套对象)
# * 紧凑 JSON等价于 json.dumps(obj, sort_keys=True, separators=(",", ":"))
PSS_SALT_LENGTH_BYTES = 32
@dataclass
class LicenseStatus:
"""授权状态数据类"""
valid: bool = False
message: str = ""
license_type: str = ""
expires_at: Optional[datetime] = None
features: Dict[str, Any] = None
license_id: str = ""
def __post_init__(self):
if self.features is None:
self.features = {}
class LicenseManager:
"""授权管理器"""
def __init__(self, config_manager=None):
self.config_manager = config_manager
self._machine_id = None
self._license_cache = None
self._cache_timestamp = None
def get_machine_id(self) -> str:
"""生成机器硬件指纹"""
if self._machine_id:
return self._machine_id
try:
# 收集硬件信息
hardware_info = []
# CPU信息
try:
if platform.system() == "Windows":
result = subprocess.run(['wmic', 'cpu', 'get', 'ProcessorId', '/value'],
capture_output=True, text=True, timeout=10)
for line in result.stdout.split('\n'):
if 'ProcessorId=' in line:
cpu_id = line.split('=')[1].strip()
if cpu_id:
hardware_info.append(f"CPU:{cpu_id}")
break
else:
# Linux/Mac 可以使用其他方法获取CPU信息
pass
except Exception as e:
logger.warning(f"获取CPU信息失败: {e}")
# 主板信息
try:
if platform.system() == "Windows":
result = subprocess.run(['wmic', 'baseboard', 'get', 'SerialNumber', '/value'],
capture_output=True, text=True, timeout=10)
for line in result.stdout.split('\n'):
if 'SerialNumber=' in line:
board_serial = line.split('=')[1].strip()
if board_serial and board_serial != "To be filled by O.E.M.":
hardware_info.append(f"BOARD:{board_serial}")
break
except Exception as e:
logger.warning(f"获取主板信息失败: {e}")
# 磁盘信息
try:
if platform.system() == "Windows":
result = subprocess.run(['wmic', 'diskdrive', 'get', 'SerialNumber', '/value'],
capture_output=True, text=True, timeout=10)
for line in result.stdout.split('\n'):
if 'SerialNumber=' in line:
disk_serial = line.split('=')[1].strip()
if disk_serial:
hardware_info.append(f"DISK:{disk_serial}")
break
except Exception as e:
logger.warning(f"获取磁盘信息失败: {e}")
# MAC地址
try:
import uuid
mac = ':'.join(['{:02x}'.format((uuid.getnode() >> elements) & 0xff)
for elements in range(0,2*6,2)][::-1])
hardware_info.append(f"MAC:{mac}")
except Exception as e:
logger.warning(f"获取MAC地址失败: {e}")
# 系统信息作为补充
hardware_info.append(f"OS:{platform.system()}")
hardware_info.append(f"MACHINE:{platform.machine()}")
# 如果没有获取到足够的硬件信息使用系统信息作为fallback
if len(hardware_info) < 2:
hardware_info.append(f"NODE:{platform.node()}")
hardware_info.append(f"PROCESSOR:{platform.processor()}")
# 生成指纹哈希
combined_info = "|".join(sorted(hardware_info))
machine_id = hashlib.sha256(combined_info.encode('utf-8')).hexdigest()[:16].upper()
self._machine_id = f"W10-{machine_id}"
logger.info(f"生成机器指纹: {self._machine_id}")
return self._machine_id
except Exception as e:
logger.error(f"生成机器指纹失败: {e}")
# 使用fallback方案
fallback_info = f"{platform.system()}-{platform.node()}-{platform.machine()}"
fallback_id = hashlib.md5(fallback_info.encode('utf-8')).hexdigest()[:12].upper()
self._machine_id = f"FB-{fallback_id}"
return self._machine_id
def load_license(self, license_path: str) -> Optional[Dict[str, Any]]:
"""加载授权文件"""
try:
if not os.path.exists(license_path):
logger.warning(f"授权文件不存在: {license_path}")
return None
with open(license_path, 'r', encoding='utf-8') as f:
license_data = json.load(f)
# 基本字段验证
required_fields = ['product', 'license_id', 'license_type', 'machine_id',
'issued_at', 'expires_at', 'signature']
for field in required_fields:
if field not in license_data:
logger.error(f"授权文件缺少必要字段: {field}")
return None
return license_data
except json.JSONDecodeError as e:
logger.error(f"授权文件格式错误: {e}")
return None
except Exception as e:
logger.error(f"加载授权文件失败: {e}")
return None
def verify_signature(self, license_data: Dict[str, Any], public_key_path: str) -> bool:
"""验证授权文件数字签名
规则RSA-PSS + SHA256MGF1(SHA256),盐长度固定为 32 字节;
待签名数据为去除 signature 后的紧凑、排序键 JSON。
"""
try:
if not os.path.exists(public_key_path):
logger.error(f"公钥文件不存在: {public_key_path}")
return False
# 加载公钥
with open(public_key_path, 'rb') as f:
public_key = serialization.load_pem_public_key(f.read())
# 提取签名
signature_b64 = license_data.get('signature', '')
if not signature_b64:
logger.error("授权文件缺少签名")
return False
try:
signature = base64.b64decode(signature_b64)
except Exception as e:
logger.error(f"签名格式错误: {e}")
return False
# 构建待验证的数据排除signature字段
license_copy = license_data.copy()
license_copy.pop('signature', None)
# 按键排序确保一致性
sorted_data = json.dumps(license_copy, sort_keys=True, separators=(',', ':'))
message = sorted_data.encode('utf-8')
# 验证签名(固定 PSS 盐长度为 32 字节,确保跨语言一致)
try:
public_key.verify(
signature,
message,
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=PSS_SALT_LENGTH_BYTES
),
hashes.SHA256()
)
return True
except InvalidSignature:
logger.error("授权文件签名验证失败")
return False
except Exception as e:
logger.error(f"签名验证过程出错: {e}")
return False
def check_validity(self, license_data: Dict[str, Any], machine_id: str,
grace_days: int = 0) -> Tuple[bool, str]:
"""检查授权有效性"""
try:
# 检查产品匹配
if license_data.get('product') != 'BodyBalanceEvaluation':
return False, "授权文件产品不匹配"
# 检查机器绑定
license_machine_id = license_data.get('machine_id', '')
if license_machine_id != machine_id:
return False, f"授权文件与当前机器不匹配 (当前: {machine_id}, 授权: {license_machine_id})"
# 检查有效期
now = datetime.now(timezone.utc)
try:
expires_at_str = license_data.get('expires_at', '')
expires_at = datetime.fromisoformat(expires_at_str.replace('Z', '+00:00'))
except Exception as e:
return False, f"授权过期时间格式错误: {e}"
# 应用宽限期
if grace_days > 0:
from datetime import timedelta
expires_at += timedelta(days=grace_days)
if now > expires_at:
return False, f"授权已过期 (过期时间: {expires_at.strftime('%Y-%m-%d %H:%M:%S')})"
return True, "授权有效"
except Exception as e:
return False, f"授权验证过程出错: {e}"
def get_license_status(self, force_reload: bool = False) -> LicenseStatus:
"""获取当前授权状态"""
try:
# 检查缓存(避免频繁文件访问)
if not force_reload and self._license_cache and self._cache_timestamp:
cache_age = datetime.now().timestamp() - self._cache_timestamp
if cache_age < 300: # 5分钟缓存
return self._license_cache
# 获取配置
if not self.config_manager:
return LicenseStatus(valid=False, message="配置管理器未初始化")
license_path = self.config_manager.get_config_value('LICENSE', 'path', 'data/license.json')
public_key_path = self.config_manager.get_config_value('LICENSE', 'public_key', 'backend/license_pub.pem')
grace_days = int(self.config_manager.get_config_value('LICENSE', 'grace_days', '3'))
# 转换为绝对路径
if not os.path.isabs(license_path):
base_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) # backend目录
license_path = os.path.join(base_dir, license_path)
if not os.path.isabs(public_key_path):
base_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) # backend目录
public_key_path = os.path.join(base_dir, public_key_path)
# 获取机器指纹
machine_id = self.get_machine_id()
# 加载授权文件
license_data = self.load_license(license_path)
if not license_data:
status = LicenseStatus(
valid=False,
message="授权文件不存在或格式错误,当前为试用模式",
license_type="trial",
features={"recording": True, "export": False, "trial_limit_minutes": 30}
)
self._license_cache = status
self._cache_timestamp = datetime.now().timestamp()
return status
# 验证签名
if not self.verify_signature(license_data, public_key_path):
status = LicenseStatus(
valid=False,
message="授权文件签名验证失败",
license_type="invalid"
)
self._license_cache = status
self._cache_timestamp = datetime.now().timestamp()
return status
# 检查有效性
is_valid, message = self.check_validity(license_data, machine_id, grace_days)
if is_valid:
# 解析过期时间
expires_at = None
try:
expires_at_str = license_data.get('expires_at', '')
expires_at = datetime.fromisoformat(expires_at_str.replace('Z', '+00:00'))
except:
pass
status = LicenseStatus(
valid=True,
message=message,
license_type=license_data.get('license_type', 'unknown'),
expires_at=expires_at,
features=license_data.get('features', {}),
license_id=license_data.get('license_id', '')
)
else:
status = LicenseStatus(
valid=False,
message=message,
license_type=license_data.get('license_type', 'unknown'),
license_id=license_data.get('license_id', '')
)
# 缓存结果
self._license_cache = status
self._cache_timestamp = datetime.now().timestamp()
return status
except Exception as e:
logger.error(f"获取授权状态失败: {e}")
status = LicenseStatus(
valid=False,
message=f"授权检查出错: {str(e)}",
license_type="error"
)
return status
def verify_license_file(self, license_path: str) -> Tuple[bool, str]:
"""验证指定授权文件的签名与有效性
Returns:
(bool, str): 验证结果与消息
"""
try:
if not self.config_manager:
return False, "配置管理器未初始化"
# 解析公钥路径与宽限期
public_key_path = self.config_manager.get_config_value('LICENSE', 'public_key', 'backend/license_pub.pem')
grace_days = int(self.config_manager.get_config_value('LICENSE', 'grace_days', '3'))
# 转换为绝对路径相对backend目录
if not os.path.isabs(public_key_path):
base_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
public_key_path = os.path.join(base_dir, public_key_path)
if not os.path.exists(license_path):
return False, f"授权文件不存在: {license_path}"
# 加载授权文件
data = self.load_license(license_path)
if not data:
return False, "授权文件格式错误或缺少必要字段"
# 验证签名
if not self.verify_signature(data, public_key_path):
return False, "授权文件签名验证失败"
# 检查有效性(机器绑定 + 过期判断)
machine_id = self.get_machine_id()
is_valid, message = self.check_validity(data, machine_id, grace_days)
return is_valid, message
except Exception as e:
logger.error(f"验证授权文件失败: {e}")
return False, f"验证过程出错: {str(e)}"
def generate_activation_request(self, output_path: str = None, company_name: str = None, contact_info: str = None) -> str:
"""生成激活请求文件(离线激活用)"""
try:
machine_id = self.get_machine_id()
request_data = {
"product": "BodyBalanceEvaluation",
"version": "1.0.0",
"machine_id": machine_id,
"platform": platform.system(),
"request_time": datetime.now(timezone.utc).isoformat(),
"hardware_info": {
"system": platform.system(),
"machine": platform.machine(),
"processor": platform.processor(),
"node": platform.node()
}
}
if company_name:
request_data["company_name"] = company_name
if contact_info:
request_data["contact_info"] = contact_info
if not output_path:
output_path = f"activation_request_{machine_id}.json"
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(request_data, f, indent=2, ensure_ascii=False)
logger.info(f"激活请求文件已生成: {output_path}")
return output_path
except Exception as e:
logger.error(f"生成激活请求失败: {e}")
raise