418 lines
17 KiB
Python
418 lines
17 KiB
Python
"""
|
||
软件授权管理模块
|
||
提供硬件指纹生成、授权文件验证、数字签名校验等功能
|
||
"""
|
||
|
||
import json
|
||
import hashlib
|
||
import platform
|
||
import subprocess
|
||
import os
|
||
import logging
|
||
from datetime import datetime, timezone
|
||
from typing import Dict, Any, Optional, Tuple
|
||
from dataclasses import dataclass
|
||
from cryptography.hazmat.primitives import hashes, serialization
|
||
from cryptography.hazmat.primitives.asymmetric import rsa, padding
|
||
from cryptography.exceptions import InvalidSignature
|
||
import base64
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
@dataclass
|
||
class LicenseStatus:
|
||
"""授权状态数据类"""
|
||
valid: bool = False
|
||
message: str = ""
|
||
license_type: str = ""
|
||
expires_at: Optional[datetime] = None
|
||
features: Dict[str, Any] = None
|
||
license_id: str = ""
|
||
|
||
def __post_init__(self):
|
||
if self.features is None:
|
||
self.features = {}
|
||
|
||
|
||
class LicenseManager:
|
||
"""授权管理器"""
|
||
|
||
def __init__(self, config_manager=None):
|
||
self.config_manager = config_manager
|
||
self._machine_id = None
|
||
self._license_cache = None
|
||
self._cache_timestamp = None
|
||
|
||
def get_machine_id(self) -> str:
|
||
"""生成机器硬件指纹"""
|
||
if self._machine_id:
|
||
return self._machine_id
|
||
|
||
try:
|
||
# 收集硬件信息
|
||
hardware_info = []
|
||
|
||
# CPU信息
|
||
try:
|
||
if platform.system() == "Windows":
|
||
result = subprocess.run(['wmic', 'cpu', 'get', 'ProcessorId', '/value'],
|
||
capture_output=True, text=True, timeout=10)
|
||
for line in result.stdout.split('\n'):
|
||
if 'ProcessorId=' in line:
|
||
cpu_id = line.split('=')[1].strip()
|
||
if cpu_id:
|
||
hardware_info.append(f"CPU:{cpu_id}")
|
||
break
|
||
else:
|
||
# Linux/Mac 可以使用其他方法获取CPU信息
|
||
pass
|
||
except Exception as e:
|
||
logger.warning(f"获取CPU信息失败: {e}")
|
||
|
||
# 主板信息
|
||
try:
|
||
if platform.system() == "Windows":
|
||
result = subprocess.run(['wmic', 'baseboard', 'get', 'SerialNumber', '/value'],
|
||
capture_output=True, text=True, timeout=10)
|
||
for line in result.stdout.split('\n'):
|
||
if 'SerialNumber=' in line:
|
||
board_serial = line.split('=')[1].strip()
|
||
if board_serial and board_serial != "To be filled by O.E.M.":
|
||
hardware_info.append(f"BOARD:{board_serial}")
|
||
break
|
||
except Exception as e:
|
||
logger.warning(f"获取主板信息失败: {e}")
|
||
|
||
# 磁盘信息
|
||
try:
|
||
if platform.system() == "Windows":
|
||
result = subprocess.run(['wmic', 'diskdrive', 'get', 'SerialNumber', '/value'],
|
||
capture_output=True, text=True, timeout=10)
|
||
for line in result.stdout.split('\n'):
|
||
if 'SerialNumber=' in line:
|
||
disk_serial = line.split('=')[1].strip()
|
||
if disk_serial:
|
||
hardware_info.append(f"DISK:{disk_serial}")
|
||
break
|
||
except Exception as e:
|
||
logger.warning(f"获取磁盘信息失败: {e}")
|
||
|
||
# MAC地址
|
||
try:
|
||
import uuid
|
||
mac = ':'.join(['{:02x}'.format((uuid.getnode() >> elements) & 0xff)
|
||
for elements in range(0,2*6,2)][::-1])
|
||
hardware_info.append(f"MAC:{mac}")
|
||
except Exception as e:
|
||
logger.warning(f"获取MAC地址失败: {e}")
|
||
|
||
# 系统信息作为补充
|
||
hardware_info.append(f"OS:{platform.system()}")
|
||
hardware_info.append(f"MACHINE:{platform.machine()}")
|
||
|
||
# 如果没有获取到足够的硬件信息,使用系统信息作为fallback
|
||
if len(hardware_info) < 2:
|
||
hardware_info.append(f"NODE:{platform.node()}")
|
||
hardware_info.append(f"PROCESSOR:{platform.processor()}")
|
||
|
||
# 生成指纹哈希
|
||
combined_info = "|".join(sorted(hardware_info))
|
||
machine_id = hashlib.sha256(combined_info.encode('utf-8')).hexdigest()[:16].upper()
|
||
|
||
self._machine_id = f"W10-{machine_id}"
|
||
logger.info(f"生成机器指纹: {self._machine_id}")
|
||
return self._machine_id
|
||
|
||
except Exception as e:
|
||
logger.error(f"生成机器指纹失败: {e}")
|
||
# 使用fallback方案
|
||
fallback_info = f"{platform.system()}-{platform.node()}-{platform.machine()}"
|
||
fallback_id = hashlib.md5(fallback_info.encode('utf-8')).hexdigest()[:12].upper()
|
||
self._machine_id = f"FB-{fallback_id}"
|
||
return self._machine_id
|
||
|
||
def load_license(self, license_path: str) -> Optional[Dict[str, Any]]:
|
||
"""加载授权文件"""
|
||
try:
|
||
if not os.path.exists(license_path):
|
||
logger.warning(f"授权文件不存在: {license_path}")
|
||
return None
|
||
|
||
with open(license_path, 'r', encoding='utf-8') as f:
|
||
license_data = json.load(f)
|
||
|
||
# 基本字段验证
|
||
required_fields = ['product', 'license_id', 'license_type', 'machine_id',
|
||
'issued_at', 'expires_at', 'signature']
|
||
for field in required_fields:
|
||
if field not in license_data:
|
||
logger.error(f"授权文件缺少必要字段: {field}")
|
||
return None
|
||
|
||
return license_data
|
||
|
||
except json.JSONDecodeError as e:
|
||
logger.error(f"授权文件格式错误: {e}")
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"加载授权文件失败: {e}")
|
||
return None
|
||
|
||
def verify_signature(self, license_data: Dict[str, Any], public_key_path: str) -> bool:
|
||
"""验证授权文件数字签名"""
|
||
try:
|
||
if not os.path.exists(public_key_path):
|
||
logger.error(f"公钥文件不存在: {public_key_path}")
|
||
return False
|
||
|
||
# 加载公钥
|
||
with open(public_key_path, 'rb') as f:
|
||
public_key = serialization.load_pem_public_key(f.read())
|
||
|
||
# 提取签名
|
||
signature_b64 = license_data.get('signature', '')
|
||
if not signature_b64:
|
||
logger.error("授权文件缺少签名")
|
||
return False
|
||
|
||
try:
|
||
signature = base64.b64decode(signature_b64)
|
||
except Exception as e:
|
||
logger.error(f"签名格式错误: {e}")
|
||
return False
|
||
|
||
# 构建待验证的数据(排除signature字段)
|
||
license_copy = license_data.copy()
|
||
license_copy.pop('signature', None)
|
||
|
||
# 按键排序确保一致性
|
||
sorted_data = json.dumps(license_copy, sort_keys=True, separators=(',', ':'))
|
||
message = sorted_data.encode('utf-8')
|
||
|
||
# 验证签名
|
||
try:
|
||
public_key.verify(
|
||
signature,
|
||
message,
|
||
padding.PSS(
|
||
mgf=padding.MGF1(hashes.SHA256()),
|
||
salt_length=padding.PSS.MAX_LENGTH
|
||
),
|
||
hashes.SHA256()
|
||
)
|
||
return True
|
||
except InvalidSignature:
|
||
logger.error("授权文件签名验证失败")
|
||
return False
|
||
|
||
except Exception as e:
|
||
logger.error(f"签名验证过程出错: {e}")
|
||
return False
|
||
|
||
def check_validity(self, license_data: Dict[str, Any], machine_id: str,
|
||
grace_days: int = 0) -> Tuple[bool, str]:
|
||
"""检查授权有效性"""
|
||
try:
|
||
# 检查产品匹配
|
||
if license_data.get('product') != 'BodyBalanceEvaluation':
|
||
return False, "授权文件产品不匹配"
|
||
|
||
# 检查机器绑定
|
||
license_machine_id = license_data.get('machine_id', '')
|
||
if license_machine_id != machine_id:
|
||
return False, f"授权文件与当前机器不匹配 (当前: {machine_id}, 授权: {license_machine_id})"
|
||
|
||
# 检查有效期
|
||
now = datetime.now(timezone.utc)
|
||
|
||
try:
|
||
expires_at_str = license_data.get('expires_at', '')
|
||
expires_at = datetime.fromisoformat(expires_at_str.replace('Z', '+00:00'))
|
||
except Exception as e:
|
||
return False, f"授权过期时间格式错误: {e}"
|
||
|
||
# 应用宽限期
|
||
if grace_days > 0:
|
||
from datetime import timedelta
|
||
expires_at += timedelta(days=grace_days)
|
||
|
||
if now > expires_at:
|
||
return False, f"授权已过期 (过期时间: {expires_at.strftime('%Y-%m-%d %H:%M:%S')})"
|
||
|
||
return True, "授权有效"
|
||
|
||
except Exception as e:
|
||
return False, f"授权验证过程出错: {e}"
|
||
|
||
def get_license_status(self, force_reload: bool = False) -> LicenseStatus:
|
||
"""获取当前授权状态"""
|
||
try:
|
||
# 检查缓存(避免频繁文件访问)
|
||
if not force_reload and self._license_cache and self._cache_timestamp:
|
||
cache_age = datetime.now().timestamp() - self._cache_timestamp
|
||
if cache_age < 300: # 5分钟缓存
|
||
return self._license_cache
|
||
|
||
# 获取配置
|
||
if not self.config_manager:
|
||
return LicenseStatus(valid=False, message="配置管理器未初始化")
|
||
|
||
license_path = self.config_manager.get_config_value('LICENSE', 'path', 'data/license.json')
|
||
public_key_path = self.config_manager.get_config_value('LICENSE', 'public_key', 'backend/license_pub.pem')
|
||
grace_days = int(self.config_manager.get_config_value('LICENSE', 'grace_days', '3'))
|
||
|
||
# 转换为绝对路径
|
||
if not os.path.isabs(license_path):
|
||
base_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) # backend目录
|
||
license_path = os.path.join(base_dir, license_path)
|
||
|
||
if not os.path.isabs(public_key_path):
|
||
base_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) # backend目录
|
||
public_key_path = os.path.join(base_dir, public_key_path)
|
||
|
||
# 获取机器指纹
|
||
machine_id = self.get_machine_id()
|
||
|
||
# 加载授权文件
|
||
license_data = self.load_license(license_path)
|
||
if not license_data:
|
||
status = LicenseStatus(
|
||
valid=False,
|
||
message="授权文件不存在或格式错误,当前为试用模式",
|
||
license_type="trial",
|
||
features={"recording": True, "export": False, "trial_limit_minutes": 30}
|
||
)
|
||
self._license_cache = status
|
||
self._cache_timestamp = datetime.now().timestamp()
|
||
return status
|
||
|
||
# 验证签名
|
||
if not self.verify_signature(license_data, public_key_path):
|
||
status = LicenseStatus(
|
||
valid=False,
|
||
message="授权文件签名验证失败",
|
||
license_type="invalid"
|
||
)
|
||
self._license_cache = status
|
||
self._cache_timestamp = datetime.now().timestamp()
|
||
return status
|
||
|
||
# 检查有效性
|
||
is_valid, message = self.check_validity(license_data, machine_id, grace_days)
|
||
|
||
if is_valid:
|
||
# 解析过期时间
|
||
expires_at = None
|
||
try:
|
||
expires_at_str = license_data.get('expires_at', '')
|
||
expires_at = datetime.fromisoformat(expires_at_str.replace('Z', '+00:00'))
|
||
except:
|
||
pass
|
||
|
||
status = LicenseStatus(
|
||
valid=True,
|
||
message=message,
|
||
license_type=license_data.get('license_type', 'unknown'),
|
||
expires_at=expires_at,
|
||
features=license_data.get('features', {}),
|
||
license_id=license_data.get('license_id', '')
|
||
)
|
||
else:
|
||
status = LicenseStatus(
|
||
valid=False,
|
||
message=message,
|
||
license_type=license_data.get('license_type', 'unknown'),
|
||
license_id=license_data.get('license_id', '')
|
||
)
|
||
|
||
# 缓存结果
|
||
self._license_cache = status
|
||
self._cache_timestamp = datetime.now().timestamp()
|
||
|
||
return status
|
||
|
||
except Exception as e:
|
||
logger.error(f"获取授权状态失败: {e}")
|
||
status = LicenseStatus(
|
||
valid=False,
|
||
message=f"授权检查出错: {str(e)}",
|
||
license_type="error"
|
||
)
|
||
return status
|
||
|
||
def verify_license_file(self, license_path: str) -> Tuple[bool, str]:
|
||
"""验证指定授权文件的签名与有效性
|
||
|
||
Returns:
|
||
(bool, str): 验证结果与消息
|
||
"""
|
||
try:
|
||
if not self.config_manager:
|
||
return False, "配置管理器未初始化"
|
||
|
||
# 解析公钥路径与宽限期
|
||
public_key_path = self.config_manager.get_config_value('LICENSE', 'public_key', 'backend/license_pub.pem')
|
||
grace_days = int(self.config_manager.get_config_value('LICENSE', 'grace_days', '3'))
|
||
|
||
# 转换为绝对路径(相对backend目录)
|
||
if not os.path.isabs(public_key_path):
|
||
base_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
|
||
public_key_path = os.path.join(base_dir, public_key_path)
|
||
|
||
if not os.path.exists(license_path):
|
||
return False, f"授权文件不存在: {license_path}"
|
||
|
||
# 加载授权文件
|
||
data = self.load_license(license_path)
|
||
if not data:
|
||
return False, "授权文件格式错误或缺少必要字段"
|
||
|
||
# 验证签名
|
||
if not self.verify_signature(data, public_key_path):
|
||
return False, "授权文件签名验证失败"
|
||
|
||
# 检查有效性(机器绑定 + 过期判断)
|
||
machine_id = self.get_machine_id()
|
||
is_valid, message = self.check_validity(data, machine_id, grace_days)
|
||
return is_valid, message
|
||
except Exception as e:
|
||
logger.error(f"验证授权文件失败: {e}")
|
||
return False, f"验证过程出错: {str(e)}"
|
||
|
||
def generate_activation_request(self, output_path: str = None, company_name: str = None, contact_info: str = None) -> str:
|
||
"""生成激活请求文件(离线激活用)"""
|
||
try:
|
||
machine_id = self.get_machine_id()
|
||
|
||
request_data = {
|
||
"product": "BodyBalanceEvaluation",
|
||
"version": "1.0.0",
|
||
"machine_id": machine_id,
|
||
"platform": platform.system(),
|
||
"request_time": datetime.now(timezone.utc).isoformat(),
|
||
"hardware_info": {
|
||
"system": platform.system(),
|
||
"machine": platform.machine(),
|
||
"processor": platform.processor(),
|
||
"node": platform.node()
|
||
}
|
||
}
|
||
|
||
if company_name:
|
||
request_data["company_name"] = company_name
|
||
if contact_info:
|
||
request_data["contact_info"] = contact_info
|
||
|
||
if not output_path:
|
||
output_path = f"activation_request_{machine_id}.json"
|
||
|
||
with open(output_path, 'w', encoding='utf-8') as f:
|
||
json.dump(request_data, f, indent=2, ensure_ascii=False)
|
||
|
||
logger.info(f"激活请求文件已生成: {output_path}")
|
||
return output_path
|
||
|
||
except Exception as e:
|
||
logger.error(f"生成激活请求失败: {e}")
|
||
raise |