增加了软件系统授权检查的功能

This commit is contained in:
root 2025-10-29 22:42:25 +08:00
parent a948501aaf
commit bfa91c5ef4
12 changed files with 1259 additions and 0 deletions

198
backend/LICENSE_README.md Normal file
View File

@ -0,0 +1,198 @@
# 软件授权控制系统使用说明
## 概述
本系统实现了基于数字签名的软件授权控制功能,支持离线授权验证和多种授权类型。
## 文件结构
```
backend/
├── devices/utils/license_manager.py # 授权管理核心模块
├── tools/license_generator.py # 授权文件生成工具
├── license_public_key.pem # 授权验证公钥
├── sample_license.json # 示例授权文件
├── dev_license.json # 开发测试授权文件
├── config.ini # 配置文件(包含[LICENSE]节)
└── LICENSE_README.md # 本说明文档
```
## 配置说明
### 1. 配置文件设置
`config.ini` 中的 `[LICENSE]` 节配置授权相关参数:
```ini
[LICENSE]
# 授权文件路径
path = dev_license.json
# 公钥文件路径
public_key = license_public_key.pem
# 授权过期宽限期(天)
grace_days = 7
# 开发模式(跳过授权检查)
dev_mode = True
```
### 2. 开发模式
- 设置 `dev_mode = True` 可跳过授权检查,便于开发调试
- 生产环境必须设置 `dev_mode = False`
## 授权类型
### 1. 试用版 (trial)
- 功能限制录制时长30分钟最多10个会话
- 适用于产品试用
### 2. 标准版 (standard)
- 功能:完整录制和导出功能
- 限制最多1000个会话10个用户
- 适用于中小型机构
### 3. 专业版 (professional)
- 功能:所有功能无限制使用
- 包含高级分析功能
- 适用于大型机构
## 使用流程
### 1. 生成密钥对(首次部署)
```bash
cd backend
python tools/license_generator.py --generate-keys --private-key private_key.pem --public-key license_public_key.pem
```
### 2. 生成授权文件
```bash
# 生成标准版授权
python tools/license_generator.py \
--private-key private_key.pem \
--license-id "STANDARD-2025-001" \
--license-type standard \
--company-name "天宏博科技" \
--contact-info "contact@example.com" \
--expires-days 365 \
--output customer_license.json
# 生成试用版授权
python tools/license_generator.py \
--private-key private_key.pem \
--license-id "TRIAL-2024-001" \
--license-type trial \
--company-name "试用客户" \
--expires-days 30 \
--output trial_license.json
```
### 3. 部署授权文件
1. 将生成的授权文件复制到客户端
2. 更新 `config.ini` 中的 `path` 参数指向授权文件
3. 确保 `dev_mode = False`
4. 重启应用
## API接口
### 1. 获取授权信息
```http
GET /api/license/info
```
响应示例:
```json
{
"success": true,
"data": {
"valid": true,
"message": "授权有效",
"license_type": "standard",
"license_id": "STANDARD-2024-001",
"expires_at": "2025-12-31T23:59:59Z",
"features": {
"recording": true,
"export": true,
"max_sessions": 1000
},
"machine_id": "ABC123..."
}
}
```
### 2. 生成激活请求
```http
POST /api/license/activation-request
Content-Type: application/json
{
"company_name": "客户公司",
"contact_info": "contact@customer.com"
}
```
### 3. 验证授权文件
```http
POST /api/license/verify
Content-Type: multipart/form-data
license_file: [授权文件]
```
## 受保护的API
以下API需要有效授权才能访问
- `POST /api/detection/start` - 需要 `recording` 功能
- `POST /api/detection/{session_id}/stop` - 需要 `recording` 功能
- `GET /api/detection/data/*` - 需要基础授权
- `DELETE /api/detection/data/*` - 需要基础授权
## 安全注意事项
### 1. 私钥保护
- 私钥文件必须妥善保管,不得泄露
- 建议使用硬件安全模块(HSM)存储私钥
- 定期轮换密钥对
### 2. 授权文件保护
- 授权文件包含数字签名,不可篡改
- 建议通过安全渠道分发授权文件
- 客户端应定期验证授权有效性
### 3. 机器绑定
- 可通过 `machine_id` 参数绑定特定机器
- 使用 `*` 表示不限制机器
- 机器指纹基于硬件信息生成
## 故障排除
### 1. 授权验证失败
- 检查授权文件格式是否正确
- 确认公钥文件路径配置正确
- 验证授权文件签名是否有效
### 2. 功能受限
- 检查授权类型和功能权限
- 确认授权是否过期
- 验证机器ID是否匹配
### 3. 开发调试
- 临时启用 `dev_mode = True`
- 查看日志文件获取详细错误信息
- 使用 `dev_license.json` 进行测试
## 技术支持
如遇到授权相关问题,请提供以下信息:
1. 错误日志
2. 授权文件内容(去除签名)
3. 机器硬件指纹
4. 配置文件相关部分
联系方式:[技术支持邮箱]

View File

@ -0,0 +1,15 @@
{
"product": "BodyBalanceEvaluation",
"version": "1.0.0",
"machine_id": "W10-D13710C7BD317C29",
"platform": "Windows",
"request_time": "2025-10-28T09:36:27.737157+00:00",
"hardware_info": {
"system": "Windows",
"machine": "AMD64",
"processor": "Intel64 Family 6 Model 165 Stepping 2, GenuineIntel",
"node": "MSI"
},
"company_name": "测试公司",
"contact_info": "test@example.com"
}

18
backend/data/license.json Normal file
View File

@ -0,0 +1,18 @@
{
"product": "BodyBalanceEvaluation",
"version": "1.0",
"license_id": "TRIAL-2025-002",
"license_type": "trial",
"company_name": "试用客户",
"contact_info": "",
"issued_at": "2025-10-28T09:31:17.852321+00:00",
"expires_at": "2025-11-27T09:31:17.852321+00:00",
"machine_id": "W10-D13710C7BD317C29",
"features": {
"recording": true,
"export": true,
"trial_limit_minutes": 30,
"max_sessions": 10
},
"signature": "GOaHyLzqgmIDY0/ejaV/BF/VKF3kBh//G6WsS6dIDKn759UizrA0MQFDPmVWRLYCpCuJ+ChV2XB+Jnx+tjVsPh6eHThTbCs53o6AjH01p1uIBQvqphGWDVw5X8tsXdDNqfnrF08E2BXM4cQM5yt3R5RHhLUQbdAB3+ahxdL4u8RYh7upmYv1EyyGoV1ZvcM3x+BFvw9QPdbyt01O9uBmBK+YvNlRY74Q5fZ0jRrC/cbu7VWdX7HGHAIBaGZ3mYxUR7A5neBUaXVnM3d3wjJqWrd6cCsUm8ha/GfUxTop9NbxSAEwc5eHQFVxYOgv++iw8zwIRS+vx4PZPnmC7T952Q=="
}

View File

@ -0,0 +1,418 @@
"""
软件授权管理模块
提供硬件指纹生成授权文件验证数字签名校验等功能
"""
import json
import hashlib
import platform
import subprocess
import os
import logging
from datetime import datetime, timezone
from typing import Dict, Any, Optional, Tuple
from dataclasses import dataclass
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.exceptions import InvalidSignature
import base64
logger = logging.getLogger(__name__)
@dataclass
class LicenseStatus:
"""授权状态数据类"""
valid: bool = False
message: str = ""
license_type: str = ""
expires_at: Optional[datetime] = None
features: Dict[str, Any] = None
license_id: str = ""
def __post_init__(self):
if self.features is None:
self.features = {}
class LicenseManager:
"""授权管理器"""
def __init__(self, config_manager=None):
self.config_manager = config_manager
self._machine_id = None
self._license_cache = None
self._cache_timestamp = None
def get_machine_id(self) -> str:
"""生成机器硬件指纹"""
if self._machine_id:
return self._machine_id
try:
# 收集硬件信息
hardware_info = []
# CPU信息
try:
if platform.system() == "Windows":
result = subprocess.run(['wmic', 'cpu', 'get', 'ProcessorId', '/value'],
capture_output=True, text=True, timeout=10)
for line in result.stdout.split('\n'):
if 'ProcessorId=' in line:
cpu_id = line.split('=')[1].strip()
if cpu_id:
hardware_info.append(f"CPU:{cpu_id}")
break
else:
# Linux/Mac 可以使用其他方法获取CPU信息
pass
except Exception as e:
logger.warning(f"获取CPU信息失败: {e}")
# 主板信息
try:
if platform.system() == "Windows":
result = subprocess.run(['wmic', 'baseboard', 'get', 'SerialNumber', '/value'],
capture_output=True, text=True, timeout=10)
for line in result.stdout.split('\n'):
if 'SerialNumber=' in line:
board_serial = line.split('=')[1].strip()
if board_serial and board_serial != "To be filled by O.E.M.":
hardware_info.append(f"BOARD:{board_serial}")
break
except Exception as e:
logger.warning(f"获取主板信息失败: {e}")
# 磁盘信息
try:
if platform.system() == "Windows":
result = subprocess.run(['wmic', 'diskdrive', 'get', 'SerialNumber', '/value'],
capture_output=True, text=True, timeout=10)
for line in result.stdout.split('\n'):
if 'SerialNumber=' in line:
disk_serial = line.split('=')[1].strip()
if disk_serial:
hardware_info.append(f"DISK:{disk_serial}")
break
except Exception as e:
logger.warning(f"获取磁盘信息失败: {e}")
# MAC地址
try:
import uuid
mac = ':'.join(['{:02x}'.format((uuid.getnode() >> elements) & 0xff)
for elements in range(0,2*6,2)][::-1])
hardware_info.append(f"MAC:{mac}")
except Exception as e:
logger.warning(f"获取MAC地址失败: {e}")
# 系统信息作为补充
hardware_info.append(f"OS:{platform.system()}")
hardware_info.append(f"MACHINE:{platform.machine()}")
# 如果没有获取到足够的硬件信息使用系统信息作为fallback
if len(hardware_info) < 2:
hardware_info.append(f"NODE:{platform.node()}")
hardware_info.append(f"PROCESSOR:{platform.processor()}")
# 生成指纹哈希
combined_info = "|".join(sorted(hardware_info))
machine_id = hashlib.sha256(combined_info.encode('utf-8')).hexdigest()[:16].upper()
self._machine_id = f"W10-{machine_id}"
logger.info(f"生成机器指纹: {self._machine_id}")
return self._machine_id
except Exception as e:
logger.error(f"生成机器指纹失败: {e}")
# 使用fallback方案
fallback_info = f"{platform.system()}-{platform.node()}-{platform.machine()}"
fallback_id = hashlib.md5(fallback_info.encode('utf-8')).hexdigest()[:12].upper()
self._machine_id = f"FB-{fallback_id}"
return self._machine_id
def load_license(self, license_path: str) -> Optional[Dict[str, Any]]:
"""加载授权文件"""
try:
if not os.path.exists(license_path):
logger.warning(f"授权文件不存在: {license_path}")
return None
with open(license_path, 'r', encoding='utf-8') as f:
license_data = json.load(f)
# 基本字段验证
required_fields = ['product', 'license_id', 'license_type', 'machine_id',
'issued_at', 'expires_at', 'signature']
for field in required_fields:
if field not in license_data:
logger.error(f"授权文件缺少必要字段: {field}")
return None
return license_data
except json.JSONDecodeError as e:
logger.error(f"授权文件格式错误: {e}")
return None
except Exception as e:
logger.error(f"加载授权文件失败: {e}")
return None
def verify_signature(self, license_data: Dict[str, Any], public_key_path: str) -> bool:
"""验证授权文件数字签名"""
try:
if not os.path.exists(public_key_path):
logger.error(f"公钥文件不存在: {public_key_path}")
return False
# 加载公钥
with open(public_key_path, 'rb') as f:
public_key = serialization.load_pem_public_key(f.read())
# 提取签名
signature_b64 = license_data.get('signature', '')
if not signature_b64:
logger.error("授权文件缺少签名")
return False
try:
signature = base64.b64decode(signature_b64)
except Exception as e:
logger.error(f"签名格式错误: {e}")
return False
# 构建待验证的数据排除signature字段
license_copy = license_data.copy()
license_copy.pop('signature', None)
# 按键排序确保一致性
sorted_data = json.dumps(license_copy, sort_keys=True, separators=(',', ':'))
message = sorted_data.encode('utf-8')
# 验证签名
try:
public_key.verify(
signature,
message,
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH
),
hashes.SHA256()
)
return True
except InvalidSignature:
logger.error("授权文件签名验证失败")
return False
except Exception as e:
logger.error(f"签名验证过程出错: {e}")
return False
def check_validity(self, license_data: Dict[str, Any], machine_id: str,
grace_days: int = 0) -> Tuple[bool, str]:
"""检查授权有效性"""
try:
# 检查产品匹配
if license_data.get('product') != 'BodyBalanceEvaluation':
return False, "授权文件产品不匹配"
# 检查机器绑定
license_machine_id = license_data.get('machine_id', '')
if license_machine_id != machine_id:
return False, f"授权文件与当前机器不匹配 (当前: {machine_id}, 授权: {license_machine_id})"
# 检查有效期
now = datetime.now(timezone.utc)
try:
expires_at_str = license_data.get('expires_at', '')
expires_at = datetime.fromisoformat(expires_at_str.replace('Z', '+00:00'))
except Exception as e:
return False, f"授权过期时间格式错误: {e}"
# 应用宽限期
if grace_days > 0:
from datetime import timedelta
expires_at += timedelta(days=grace_days)
if now > expires_at:
return False, f"授权已过期 (过期时间: {expires_at.strftime('%Y-%m-%d %H:%M:%S')})"
return True, "授权有效"
except Exception as e:
return False, f"授权验证过程出错: {e}"
def get_license_status(self, force_reload: bool = False) -> LicenseStatus:
"""获取当前授权状态"""
try:
# 检查缓存(避免频繁文件访问)
if not force_reload and self._license_cache and self._cache_timestamp:
cache_age = datetime.now().timestamp() - self._cache_timestamp
if cache_age < 300: # 5分钟缓存
return self._license_cache
# 获取配置
if not self.config_manager:
return LicenseStatus(valid=False, message="配置管理器未初始化")
license_path = self.config_manager.get_config_value('LICENSE', 'path', 'data/license.json')
public_key_path = self.config_manager.get_config_value('LICENSE', 'public_key', 'backend/license_pub.pem')
grace_days = int(self.config_manager.get_config_value('LICENSE', 'grace_days', '3'))
# 转换为绝对路径
if not os.path.isabs(license_path):
base_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) # backend目录
license_path = os.path.join(base_dir, license_path)
if not os.path.isabs(public_key_path):
base_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) # backend目录
public_key_path = os.path.join(base_dir, public_key_path)
# 获取机器指纹
machine_id = self.get_machine_id()
# 加载授权文件
license_data = self.load_license(license_path)
if not license_data:
status = LicenseStatus(
valid=False,
message="授权文件不存在或格式错误,当前为试用模式",
license_type="trial",
features={"recording": True, "export": False, "trial_limit_minutes": 30}
)
self._license_cache = status
self._cache_timestamp = datetime.now().timestamp()
return status
# 验证签名
if not self.verify_signature(license_data, public_key_path):
status = LicenseStatus(
valid=False,
message="授权文件签名验证失败",
license_type="invalid"
)
self._license_cache = status
self._cache_timestamp = datetime.now().timestamp()
return status
# 检查有效性
is_valid, message = self.check_validity(license_data, machine_id, grace_days)
if is_valid:
# 解析过期时间
expires_at = None
try:
expires_at_str = license_data.get('expires_at', '')
expires_at = datetime.fromisoformat(expires_at_str.replace('Z', '+00:00'))
except:
pass
status = LicenseStatus(
valid=True,
message=message,
license_type=license_data.get('license_type', 'unknown'),
expires_at=expires_at,
features=license_data.get('features', {}),
license_id=license_data.get('license_id', '')
)
else:
status = LicenseStatus(
valid=False,
message=message,
license_type=license_data.get('license_type', 'unknown'),
license_id=license_data.get('license_id', '')
)
# 缓存结果
self._license_cache = status
self._cache_timestamp = datetime.now().timestamp()
return status
except Exception as e:
logger.error(f"获取授权状态失败: {e}")
status = LicenseStatus(
valid=False,
message=f"授权检查出错: {str(e)}",
license_type="error"
)
return status
def verify_license_file(self, license_path: str) -> Tuple[bool, str]:
"""验证指定授权文件的签名与有效性
Returns:
(bool, str): 验证结果与消息
"""
try:
if not self.config_manager:
return False, "配置管理器未初始化"
# 解析公钥路径与宽限期
public_key_path = self.config_manager.get_config_value('LICENSE', 'public_key', 'backend/license_pub.pem')
grace_days = int(self.config_manager.get_config_value('LICENSE', 'grace_days', '3'))
# 转换为绝对路径相对backend目录
if not os.path.isabs(public_key_path):
base_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
public_key_path = os.path.join(base_dir, public_key_path)
if not os.path.exists(license_path):
return False, f"授权文件不存在: {license_path}"
# 加载授权文件
data = self.load_license(license_path)
if not data:
return False, "授权文件格式错误或缺少必要字段"
# 验证签名
if not self.verify_signature(data, public_key_path):
return False, "授权文件签名验证失败"
# 检查有效性(机器绑定 + 过期判断)
machine_id = self.get_machine_id()
is_valid, message = self.check_validity(data, machine_id, grace_days)
return is_valid, message
except Exception as e:
logger.error(f"验证授权文件失败: {e}")
return False, f"验证过程出错: {str(e)}"
def generate_activation_request(self, output_path: str = None, company_name: str = None, contact_info: str = None) -> str:
"""生成激活请求文件(离线激活用)"""
try:
machine_id = self.get_machine_id()
request_data = {
"product": "BodyBalanceEvaluation",
"version": "1.0.0",
"machine_id": machine_id,
"platform": platform.system(),
"request_time": datetime.now(timezone.utc).isoformat(),
"hardware_info": {
"system": platform.system(),
"machine": platform.machine(),
"processor": platform.processor(),
"node": platform.node()
}
}
if company_name:
request_data["company_name"] = company_name
if contact_info:
request_data["contact_info"] = contact_info
if not output_path:
output_path = f"activation_request_{machine_id}.json"
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(request_data, f, indent=2, ensure_ascii=False)
logger.info(f"激活请求文件已生成: {output_path}")
return output_path
except Exception as e:
logger.error(f"生成激活请求失败: {e}")
raise

View File

@ -0,0 +1,9 @@
-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAs+14cr3rRW3fMjQBF6/6
KlvQZYJuC7CF6SltxO4d2aUaVqIU1qjksSpwI8p0oLJWIf8jf5+PW6KPx1sIhkBw
NVc8kUC2RKESOr/wwffnH42UZskts4pDD25w3md2X4j5lLMg4R0Bny7zco/D0YjD
c0wpJ6rRkczMIuObugUjsEr+iCvZE6Na0kctHqiLdzwTMEiLuS2iU8r4I8AfGvbQ
K5uJd1zFuEchgIjJgzizCah4E7QeKOyxPchJL5/tNPlAqI5nWSjBtTigQWPJuoXc
7jaIj8osmAuIAVxKvKqaqhfgEDLfEYdY3S6JPOFRPJ8zHgP05RDyaWGao15AwODk
DwIDAQAB
-----END PUBLIC KEY-----

28
backend/private_key.pem Normal file
View File

@ -0,0 +1,28 @@
-----BEGIN PRIVATE KEY-----
MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQCz7XhyvetFbd8y
NAEXr/oqW9Blgm4LsIXpKW3E7h3ZpRpWohTWqOSxKnAjynSgslYh/yN/n49boo/H
WwiGQHA1VzyRQLZEoRI6v/DB9+cfjZRmyS2zikMPbnDeZ3ZfiPmUsyDhHQGfLvNy
j8PRiMNzTCknqtGRzMwi45u6BSOwSv6IK9kTo1rSRy0eqIt3PBMwSIu5LaJTyvgj
wB8a9tArm4l3XMW4RyGAiMmDOLMJqHgTtB4o7LE9yEkvn+00+UCojmdZKMG1OKBB
Y8m6hdzuNoiPyiyYC4gBXEq8qpqqF+AQMt8Rh1jdLok84VE8nzMeA/TlEPJpYZqj
XkDA4OQPAgMBAAECggEATJMGnXKlc+1wPYnzYxTiV2+uz9zEH9Z3D0Wx8UtTyeJh
xLgDPV4wwhOpGRpbK17qmFpgzbpnBR04qqPcC2LWPmVLohfT2n5gZz9z4+Ew7HVR
ULNS72Oq6aDbiVOoBb2iVn4rwpKZM5mEQ1/a+0yEvgeORlMFENODl1+d0XvJdgel
3y5M1lcHCxzbDAnJcG/qY+QpTRA04GxzdUSUS0+k7aOUBj4aTpnQF3qMRL7y9Vwj
FCJvnitUQHy9AgqnP5sVLYwKKXEIyTJpFPRpfyF6N6/qOhJKA4FA+bjEpSyIzwBf
CzB7KC+a9HE6t8RcECdjHTJ5UzX/GZGz1FPy0PNPqQKBgQDe6wuOirzUqHsZyXE+
2Jkvb0Q/uawfBgggTDgiwDNgghDvig+GO0HDdgre9xIoRaavoGx/oihgUjTZTEvh
refuyZ3Ps65VaTs0bYSQmo2QIKuLn4DWHXt3mJoIs4b1WRGUNBsJ8LJYopkX9O2f
6FKTxr/u6Xx1EtxHNJvQqITMlwKBgQDOoSiR5nr3wRAnrrKvX3QO74X8eWGsatSs
XcZD8CW9TyNytGFuT2q34/bSQQqg62ZesthE1XtEB+MJd85hrt8WmZlzU9pKIfNx
EwdcX80vjemLgdek1jwA8ZyJLZ4DPlqhwWW9ivIrK27eY70n83xaNCFBWzPWg9Ke
S9Lqmbp7SQKBgQDNhfmOvz0f+AIfIUnGvp5lTHmpIz+dDsuZM4yiBYCY3vJMV4a+
pI2ab4/QSA02khj/XbIK7u+49rIBEkX32YW3860LHUeDOdU7HioVxFj2ZBilTzbS
sjXuawTBNvwb4rXBZVT0kjVsYOUzYD9hqinQU3MMC7sSmYP8JnXuKCDgGwKBgQCC
i2TzULct4ibPu1qe2+KaMQ/oo9NmuBPnVlOVxppBUUdnB7lqlPgqd/cPfRI2+qIx
gDKMwodfQtBYwf18z5uYTrCZIUgPgAWq0cfbv5cFzVXY0s6oEMXWHs+0B0MGb5WZ
DnO13ZwEVCt2i3MNU5Kj9r1v6iwFAkHFysfVegxMKQKBgAa8oGcD5YIa3IdnxJZ6
VfrT0czUV+KAebVApvCc65P60uCuOMpPyKRSALfI9waaBxzU0wQar97fnvnHxGMi
mj0/ZAYj3oDsr7Vgl6XcC+fp8MoADFERMaOf2yt22RltNRWMphTjjPQg574P2IPS
7uosOvPdzcTmZlFLS3L/ijsW
-----END PRIVATE KEY-----

View File

@ -0,0 +1,19 @@
{
"license_info": {
"license_id": "DEMO-2024-001",
"license_type": "standard",
"company_name": "示例公司",
"contact_info": "demo@example.com",
"issued_at": "2024-01-01T00:00:00Z",
"expires_at": "2025-12-31T23:59:59Z",
"machine_id": "*",
"features": {
"recording": true,
"export": true,
"max_sessions": 1000,
"max_users": 10
},
"version": "1.0"
},
"signature": "demo_signature_placeholder_for_development_testing_only_not_for_production_use_please_generate_proper_signature_using_private_key_when_deploying_to_production_environment"
}

View File

@ -0,0 +1,216 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
授权文件生成工具
用于生成和签名授权文件
"""
import json
import os
import sys
import argparse
from datetime import datetime, timedelta
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
import base64
class LicenseGenerator:
"""授权文件生成器"""
def __init__(self, private_key_path=None):
"""
初始化授权生成器
Args:
private_key_path: 私钥文件路径
"""
self.private_key = None
if private_key_path and os.path.exists(private_key_path):
self.load_private_key(private_key_path)
def generate_key_pair(self, private_key_path, public_key_path):
"""
生成RSA密钥对
Args:
private_key_path: 私钥保存路径
public_key_path: 公钥保存路径
"""
# 生成私钥
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048
)
# 保存私钥
with open(private_key_path, 'wb') as f:
f.write(private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
))
# 保存公钥
public_key = private_key.public_key()
with open(public_key_path, 'wb') as f:
f.write(public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
))
self.private_key = private_key
print(f"密钥对已生成:")
print(f" 私钥: {private_key_path}")
print(f" 公钥: {public_key_path}")
def load_private_key(self, private_key_path):
"""
加载私钥
Args:
private_key_path: 私钥文件路径
"""
with open(private_key_path, 'rb') as f:
self.private_key = serialization.load_pem_private_key(
f.read(),
password=None
)
def generate_license(self, license_info, output_path):
"""
生成授权文件
Args:
license_info: 授权信息字典
output_path: 输出文件路径
"""
if not self.private_key:
raise ValueError("未加载私钥,无法生成签名")
# 创建授权数据(与 LicenseManager 要求的顶层结构一致)
license_data = {
"product": "BodyBalanceEvaluation",
"version": license_info.get("version", "1.0"),
"license_id": license_info.get("license_id"),
"license_type": license_info.get("license_type"),
"company_name": license_info.get("company_name", ""),
"contact_info": license_info.get("contact_info", ""),
"issued_at": license_info.get("issued_at"),
"expires_at": license_info.get("expires_at"),
"machine_id": license_info.get("machine_id", "*"),
"features": license_info.get("features", {}),
}
# 生成签名(对除 signature 外的全部字段进行排序后签名)
sorted_json = json.dumps(license_data, sort_keys=True, separators=(',', ':'))
signature = self.private_key.sign(
sorted_json.encode('utf-8'),
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH
),
hashes.SHA256()
)
# 将签名编码为base64并写入
license_data["signature"] = base64.b64encode(signature).decode('utf-8')
# 保存授权文件
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(license_data, f, indent=2, ensure_ascii=False)
print(f"授权文件已生成: {output_path}")
return license_data
def main():
"""主函数"""
parser = argparse.ArgumentParser(description='授权文件生成工具')
parser.add_argument('--generate-keys', action='store_true', help='生成密钥对')
parser.add_argument('--private-key', help='私钥文件路径')
parser.add_argument('--public-key', help='公钥文件路径')
parser.add_argument('--license-id', help='授权ID')
parser.add_argument('--license-type', choices=['trial', 'standard', 'professional'],
default='standard', help='授权类型')
parser.add_argument('--company-name', help='公司名称')
parser.add_argument('--contact-info', help='联系信息')
parser.add_argument('--expires-days', type=int, default=365, help='有效期天数')
parser.add_argument('--machine-id', help='机器ID留空表示不限制')
parser.add_argument('--output', help='输出授权文件路径')
args = parser.parse_args()
generator = LicenseGenerator()
# 生成密钥对
if args.generate_keys:
if not args.private_key or not args.public_key:
print("错误: 生成密钥对需要指定 --private-key 和 --public-key 参数")
return
generator.generate_key_pair(args.private_key, args.public_key)
return
# 生成授权文件
if not args.license_id or not args.company_name or not args.output:
print("错误: 生成授权文件需要指定 --license-id, --company-name 和 --output 参数")
return
if not args.private_key or not os.path.exists(args.private_key):
print("错误: 需要指定有效的私钥文件路径")
return
# 加载私钥
generator.load_private_key(args.private_key)
# 创建授权信息
from datetime import timezone
now = datetime.now(timezone.utc)
expires_at = now + timedelta(days=args.expires_days)
# 根据授权类型设置功能
features = {
"recording": True,
"export": True
}
if args.license_type == 'trial':
features.update({
"trial_limit_minutes": 30,
"max_sessions": 10
})
elif args.license_type == 'standard':
features.update({
"max_sessions": 1000,
"max_users": 10
})
elif args.license_type == 'professional':
features.update({
"max_sessions": -1, # 无限制
"max_users": -1, # 无限制
"advanced_analytics": True
})
license_info = {
"license_id": args.license_id,
"license_type": args.license_type,
"company_name": args.company_name,
"contact_info": args.contact_info or "",
"issued_at": now.isoformat(),
"expires_at": expires_at.isoformat(),
"machine_id": args.machine_id or "*",
"features": features,
"version": "1.0"
}
# 生成授权文件
generator.generate_license(license_info, args.output)
print("\n授权信息:")
print(f" 授权ID: {license_info['license_id']}")
print(f" 授权类型: {license_info['license_type']}")
print(f" 公司名称: {license_info['company_name']}")
print(f" 有效期: {license_info['expires_at']}")
print(f" 功能: {', '.join(license_info['features'].keys())}")
if __name__ == '__main__':
main()

View File

@ -0,0 +1,133 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
授权系统快速验证脚本
- 直接验证 LicenseManager 的功能机器指纹授权状态签名与有效性
- 尝试条件允许时通过 Flask 测试客户端验证授权相关API
"""
import os
import sys
import json
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) # backend 目录
def main():
# 允许从backend导入
if BASE_DIR not in sys.path:
sys.path.append(BASE_DIR)
# 导入配置与授权管理器
from devices.utils.config_manager import ConfigManager
from devices.utils.license_manager import LicenseManager
cfg = ConfigManager()
# 确保LICENSE配置项存在并指向开发文件
cfg.set_config_value('LICENSE', 'path', 'trial_license_bind.json')
cfg.set_config_value('LICENSE', 'public_key', 'license_public_key.pem')
cfg.set_config_value('LICENSE', 'grace_days', '7')
lm = LicenseManager(cfg)
# 1) 机器指纹
machine_id = lm.get_machine_id()
print(f"Machine ID: {machine_id}")
# 2) 授权状态
status = lm.get_license_status(force_reload=True)
print("License Status:")
print(json.dumps({
'valid': status.valid,
'message': status.message,
'license_type': status.license_type,
'license_id': status.license_id,
'expires_at': status.expires_at.isoformat() if status.expires_at else None,
'features': status.features,
}, ensure_ascii=False, indent=2))
# 3) 验证授权文件(示例/开发文件通常为占位签名,会失败属正常)
dev_license_path = os.path.join(BASE_DIR, 'dev_license.json')
ok, msg = lm.verify_license_file(dev_license_path)
print(f"Verify dev_license.json -> ok={ok}, msg={msg}")
sample_license_path = os.path.join(BASE_DIR, 'sample_license.json')
if os.path.exists(sample_license_path):
ok2, msg2 = lm.verify_license_file(sample_license_path)
print(f"Verify sample_license.json -> ok={ok2}, msg={msg2}")
else:
print("sample_license.json not found; skip signature test for sample.")
# 3.1) 如果存在通过工具生成的试用授权文件,验证其签名(应为成功)
trial_license_path = os.path.join(BASE_DIR, 'trial_license.json')
if os.path.exists(trial_license_path):
ok3, msg3 = lm.verify_license_file(trial_license_path)
print(f"Verify trial_license.json -> ok={ok3}, msg={msg3}")
else:
print("trial_license.json not found; skip trial license verification.")
# 3.2) 验证绑定机器ID的试用授权文件如存在应成功
trial_bind_path = os.path.join(BASE_DIR, 'trial_license_bind.json')
if os.path.exists(trial_bind_path):
ok4, msg4 = lm.verify_license_file(trial_bind_path)
print(f"Verify trial_license_bind.json -> ok={ok4}, msg={msg4}")
else:
print("trial_license_bind.json not found; skip bind license verification.")
# 4) 尝试通过 Flask 测试客户端访问授权API如果 main.py 可正常导入)
try:
from main import AppServer
server = AppServer(debug=True)
# 手动注入授权管理器与状态,避免调用 init_app 引发硬件初始化
server.license_manager = lm
server.config_manager = cfg
server.license_status = status
client = server.app.test_client()
# /api/license/info
resp = client.get('/api/license/info')
print("GET /api/license/info ->", resp.status_code)
try:
print(json.dumps(resp.get_json(), ensure_ascii=False, indent=2))
except Exception:
print(resp.data.decode('utf-8', errors='ignore'))
# /api/license/activation-request
resp2 = client.post('/api/license/activation-request', json={
'company_name': '测试公司',
'contact_info': 'test@example.com'
})
print("POST /api/license/activation-request ->", resp2.status_code)
try:
print(json.dumps(resp2.get_json(), ensure_ascii=False, indent=2))
except Exception:
print(resp2.data.decode('utf-8', errors='ignore'))
# /api/license/verify 使用占位签名文件,会返回失败用于验证路径与流程
# 优先使用通过工具生成的 trial_license.json如存在否则使用 dev_license.json
# 优先顺序trial_license_bind.json -> trial_license.json -> dev_license.json
if os.path.exists(trial_bind_path):
post_file_path = trial_bind_path
elif os.path.exists(trial_license_path):
post_file_path = trial_license_path
else:
post_file_path = dev_license_path
with open(post_file_path, 'rb') as f:
data = {
'license_file': (f, os.path.basename(post_file_path))
}
resp3 = client.post('/api/license/verify', content_type='multipart/form-data', data=data)
print("POST /api/license/verify ->", resp3.status_code)
try:
print(json.dumps(resp3.get_json(), ensure_ascii=False, indent=2))
except Exception:
print(resp3.data.decode('utf-8', errors='ignore'))
except Exception as e:
print(f"Skip API tests due to import error or environment issue: {e}")
if __name__ == '__main__':
main()

View File

@ -0,0 +1,19 @@
{
"license_info": {
"license_id": "TRIAL-2025-001",
"license_type": "trial",
"company_name": "试用客户",
"contact_info": "",
"issued_at": "2025-10-28T09:29:58.838671Z",
"expires_at": "2025-11-27T09:29:58.838671Z",
"machine_id": "*",
"features": {
"recording": true,
"export": true,
"trial_limit_minutes": 30,
"max_sessions": 10
},
"version": "1.0"
},
"signature": "eiIev9M1OvY+n69WTzpRioAt4uSexHKu0awiIofAW/MeIBVxK39LwmhNVqkmWkGGn/Akv0JubNhtWOXAg6vf+B52m1BBdplM8D+5rEzCozZVf/Smylz8GZXPFtXsX0PVzz8T7tF9fq4Eu9reJs+L3iBwj6j5HItIj7zphmwnYhc1RVbjFdLM47IDCBcY1QBIYOZlIglgPjJqVI8mYsirEMwB5xVT32rk5dN8TotKDrUs+tv9nd8BoOzhIg8YaLytlUnMSOC2SXyi3Qn8DMawjVzZHijl+c82/VT5a2DO+m6c0nB3zvoqB40lboGJF//URwmiT/XeUMgAtui8JFThwQ=="
}

View File

@ -0,0 +1,18 @@
{
"product": "BodyBalanceEvaluation",
"version": "1.0",
"license_id": "TRIAL-2025-002",
"license_type": "trial",
"company_name": "试用客户",
"contact_info": "",
"issued_at": "2025-10-28T09:31:17.852321+00:00",
"expires_at": "2025-11-27T09:31:17.852321+00:00",
"machine_id": "W10-D13710C7BD317C29",
"features": {
"recording": true,
"export": true,
"trial_limit_minutes": 30,
"max_sessions": 10
},
"signature": "GOaHyLzqgmIDY0/ejaV/BF/VKF3kBh//G6WsS6dIDKn759UizrA0MQFDPmVWRLYCpCuJ+ChV2XB+Jnx+tjVsPh6eHThTbCs53o6AjH01p1uIBQvqphGWDVw5X8tsXdDNqfnrF08E2BXM4cQM5yt3R5RHhLUQbdAB3+ahxdL4u8RYh7upmYv1EyyGoV1ZvcM3x+BFvw9QPdbyt01O9uBmBK+YvNlRY74Q5fZ0jRrC/cbu7VWdX7HGHAIBaGZ3mYxUR7A5neBUaXVnM3d3wjJqWrd6cCsUm8ha/GfUxTop9NbxSAEwc5eHQFVxYOgv++iw8zwIRS+vx4PZPnmC7T952Q=="
}

View File

@ -0,0 +1,168 @@
# 软件使用授权控制方案BodyBalanceEvaluation
本文档针对 BodyBalanceEvaluation 的当前技术架构Electron + 前端、Python Flask 后端、Windows 打包)设计一套合理、可落地的软件使用授权控制方案。方案兼顾离线可用、安全可靠、易于运维与用户体验。
## 目标与原则
- 合法控制软件使用范围与授权期限,满足试用、商用等场景。
- 支持离线运行(无网络环境),可选在线激活与续期。
- 与当前项目架构低耦合,易于集成与维护。
- 安全合规:私钥不落地、授权文件不可伪造、关键流程可审计。
## 授权类型
- 试用授权Trial
- 有效期短(如 7/14/30 天),功能有限制(如录制时长、导出功能、带水印)。
- 标准授权Per-Device
- 绑定单机硬件指纹,在有效期内完整功能可用。
## 授权架构设计
- 离线优先 + 在线可选:
- 核心授权依赖本地授权文件License File
- 在线激活与续期通过授权服务器获取签名授权文件。
- 授权文件License格式JSON + 数字签名RSA/ECC
- 字段示例:
```json
{
"product": "BodyBalanceEvaluation",
"version": "1.0.0",
"license_id": "LIC-2025-0001",
"license_type": "per_device",
"machine_id": "W10-ABCDEF123456",
"issued_at": "2025-10-01T00:00:00Z",
"expires_at": "2026-10-01T00:00:00Z",
"features": { "recording": true, "export": true, "trial_limit_minutes": null },
"signature": "<RSA-SHA256-BASE64>"
}
```
- 私钥仅存放在授权服务器,后端内嵌公钥用于验证签名。
- 授权关联:
- `machine_id` 使用硬件指纹CPU/主板/磁盘序列/MAC 等)生成并哈希化。
- 授权文件通过签名绑定 `machine_id`,离线不可移植至不同机器。
## 集成点与流程
### 后端集成Python Flask
- 配置路径:统一通过设备侧 `ConfigManager` 读取授权文件路径与公钥路径。
- `backend/devices/utils/config_manager.py`
- `config.ini` 示例:
```ini
[LICENSE]
path = data/license.json
public_key = backend/license_pub.pem
grace_days = 3
```
- 初始化检查:在 `backend/main.py``AppServer.init_app()` 中执行:
- 生成或读取 `machine_id`(硬件指纹)。
- 加载授权文件,校验签名与有效期、匹配 `machine_id`
- 记录授权状态(有效、过期、试用、无授权)到日志与数据库(可选)。
- API 访问控制:
- 对关键路由施加授权检查(装饰器/中间件),如:
- `POST /api/detection/start`、`/stop`、数据导出、录制等。
- 授权失败返回明确的错误码与提示(含续期/激活建议)。
- 试用/降级策略:
- 提供最小可用功能(如实时预览)但限制录制时长或导出。
- 局部特性禁用或添加水印(在屏幕录制或图像导出路径中处理)。
- 容错与灰度:
- 支持 `grace_days` 宽限期(网络异常或时间漂移时可用)。
- 缓存最近一次有效授权状态,定时复核(避免频繁磁盘访问)。
### 前端集成Electron + 前端页面)
- 激活流程:
- 提供激活页:显示 `machine_id` 与授权状态。
- 在线激活:提交序列号/订单号至授权服务器,服务端返回签名授权文件;前端保存至 `[LICENSE].path`
- 离线激活:导出激活请求文件(含 `machine_id` 与产品信息),由管理员获取授权文件后导入。
- 交互与提示:
- 主界面顶部/设置页显示授权状态与到期时间。
- 到期前 7/15 天提醒,试用版显示限制说明。
- 授权失效时提供指引按钮(联系支持/打开激活页)。
## 安全策略与对抗
- 授权文件签名验证:后端使用公钥验证 `signature`,拒绝未签名或签名不匹配的授权。
- 硬件指纹防篡改:
- 组合多项硬件指标fallback 与去重策略避免单项变化导致不可用。
- 指纹哈希,不原样存储,保护隐私。
- 反调试与完整性检测(可选):
- 检测调试器或关键文件篡改(哈希校验),异常时降级或停止。
- 日志与审计:
- 授权校验、激活、失效事件写入日志与数据库,便于问题定位。
## 关键实现要点(建议代码位置)
- 授权模块:`backend/devices/utils/license_manager.py`(新增)
- 方法示例:
- `get_machine_id()`: 采集硬件信息并生成指纹。
- `load_license(path)`: 读取并解析授权文件。
- `verify_signature(license, public_key_path)`: 验证数字签名。
- `check_validity(license, machine_id, now, grace_days)`: 有效期与绑定校验。
- 后端入口:`backend/main.py`
- 在 `AppServer.init_app()` 中:
- 加载 `ConfigManager` → 初始化授权模块 → 设置 `self.license_status`
- 根据 `self.license_status` 控制设备初始化与 API 开放范围。
- 装饰器示例:
```python
def require_license(feature=None):
def decorator(f):
@wraps(f)
def wrapper(*args, **kwargs):
status = current_app.server.license_status # 假设注入到 app 上下文
if not status.valid:
return jsonify({'success': False, 'error': status.message}), 403
if feature and not status.features.get(feature, True):
return jsonify({'success': False, 'error': f'未授权功能: {feature}'}), 403
return f(*args, **kwargs)
return wrapper
return decorator
```
## 运行与配置
- `config.ini`
```ini
[LICENSE]
path = D:/BodyCheck/License/license.json
public_key = D:/BodyCheck/License/license_pub.pem
grace_days = 7
```
- 授权文件放置:默认 `data/license.json`(开发环境),打包环境放置在 `exe` 同级 `data`
- 前端激活保存位置与读取与后端一致(通过 API 或直接写入文件)。
## 运维与发行
- 发行流程:
1. 客户安装并启动软件,生成 `machine_id`
2. 客服/授权服务器生成签名授权文件(绑定 `machine_id`)。
3. 客户端导入授权文件或在线激活自动拉取授权。
- 续期与升级:
- 续期生成新授权文件(同一 `license_id` 或新 ID替换旧文件。
- 新版本可通过 `version``features` 差异化控制功能包。
- 撤销与黑名单(可选,在线):
- 维护撤销列表并在在线校验时拒绝。
## 兼容与开发模式
- 开发/测试模式:
- 允许通过环境变量或配置启用 `dev_mode`,免授权检查,仅限开发构建。
- 生产打包时移除或加固该入口,避免被滥用。
## 风险与缓解
- 时间篡改:使用授权签发时间与过期时间 + 宽限期校验;可选在线时间校准。
- 硬件变更:提供有限容差与人工重新授权流程。
- 文件丢失:提示恢复授权或联系支持,保留试用/降级模式保障基本可用。
## 里程碑与落地计划(建议)
1. 新增 `license_manager.py` 与后端集成(校验、装饰器、状态注入)
2. 在 `main.py` 中接入授权校验,限制关键 API
3. 前端新增激活页与状态展示(在线/离线两个流程)
4. 授权服务器与私钥管理上线(可选轻量版)
5. 运维流程与支持文档完善(续期、撤销、迁移)
---
如需,我可以直接在代码库中新增 `license_manager.py``main.py` 的集成改造示例,及前端激活页的基本骨架。