BodyBalanceEvaluation/backend/main.py
2025-11-28 08:05:50 +08:00

2001 lines
89 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
AppServer类 - 主应用服务器
实现Flask和SocketIO服务替代app.py功能
"""
import os
import sys
import json
import time
import threading
from datetime import datetime
from flask import Flask, jsonify
from flask import request as flask_request
from flask_cors import CORS
import logging
from flask_socketio import SocketIO, emit
import configparser
import argparse
# 添加当前目录到路径
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
# 导入模块
from database import DatabaseManager
from utils import config as app_config
from utils import DataValidator # 添加数据验证器导入
from devices.camera_manager import CameraManager
from devices.imu_manager import IMUManager
from devices.pressure_manager import PressureManager
from devices.femtobolt_manager import FemtoBoltManager
from devices.device_coordinator import DeviceCoordinator
from devices.screen_recorder import RecordingManager
from devices.utils.config_manager import ConfigManager
from devices.utils.license_manager import LicenseManager, LicenseStatus
class AppServer:
"""主应用服务器类"""
def __init__(self, host='0.0.0.0', port=5000, debug=False):
"""
初始化应用服务器
Args:
host: 服务器主机
port: 服务器端口
debug: 调试模式
"""
self.host = host
self.port = port
self.debug = debug
# 初始化日志
self._init_logging()
# Flask应用
self.app = Flask(__name__)
self.app.config['SECRET_KEY'] = 'body-balance-detection-system-2024'
# SocketIO
self._init_socketio()
# CORS配置
CORS(self.app, origins='*', supports_credentials=True,
allow_headers=['Content-Type', 'Authorization'],
methods=['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS'])
# 配置管理
self._init_config()
# 全局变量
self.db_manager = None
self.current_detection = None
self.detection_thread = None
# 授权管理
self.license_manager = None
self.license_status = None
# 数据推送状态
self.is_pushing_data = False
# 设备管理器
self.config_manager = None
self.device_coordinator = None
self.device_managers = {
'camera': None,
'femtobolt': None,
'imu': None,
'pressure': None
}
# 注册路由和事件
self._register_routes()
self._register_socketio_events()
def _init_logging(self):
"""初始化日志配置"""
# 日志目录
if getattr(sys, 'frozen', False):
# 打包后的可执行文件
log_dir = os.path.join(os.path.dirname(sys.executable), 'logs')
else:
# 开发环境
log_dir = 'logs'
# 创建日志目录
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, 'backend.log')
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_file, encoding='utf-8'),
logging.StreamHandler()
]
)
# 设置werkzeug日志级别为WARNING过滤掉INFO级别的访问日志
logging.getLogger('werkzeug').setLevel(logging.WARNING)
self.logger = logging.getLogger(__name__)
def _init_socketio(self):
"""初始化SocketIO"""
try:
self.socketio = SocketIO(
self.app,
cors_allowed_origins='*',
async_mode='threading',
logger=False,
engineio_logger=False,
ping_timeout=60,
ping_interval=25,
manage_session=False,
always_connect=False,
transports=['polling', 'websocket'], # 优先使用polling
allow_upgrades=True, # 允许升级到websocket
cookie=None # 禁用cookie
)
self.logger.info('SocketIO初始化成功')
except Exception as e:
self.logger.error(f'SocketIO初始化失败: {e}')
self.socketio = None
# 设置SocketIO日志级别
logging.getLogger('socketio').setLevel(logging.WARNING)
logging.getLogger('engineio').setLevel(logging.WARNING)
def _init_config(self):
"""初始化配置"""
self.config = configparser.ConfigParser()
# 配置文件路径
if getattr(sys, 'frozen', False):
# 打包后的可执行文件
config_path = os.path.join(os.path.dirname(sys.executable), 'config.ini')
else:
# 开发环境
config_path = os.path.join(os.path.dirname(__file__), 'config.ini')
self.config.read(config_path, encoding='utf-8')
camera1_index = self.config.get('CAMERA1', 'device_index', fallback=None)
camera2_index = self.config.get('CAMERA2', 'device_index', fallback=None)
print(f"相机1设备号: {camera1_index}, 相机2设备号: {camera2_index}")
def init_app(self):
"""初始化应用组件"""
try:
# 初始化配置管理器
self.logger.info('正在初始化配置管理器...')
self.config_manager = ConfigManager()
self.logger.info('配置管理器初始化完成')
# 初始化授权管理器
self.logger.info('正在初始化授权管理器...')
self.license_manager = LicenseManager(self.config_manager)
self.license_status = self.license_manager.get_license_status()
# 检查开发模式
dev_mode = self.config_manager.get_config_value('LICENSE', 'dev_mode', 'False').lower() == 'true'
if dev_mode:
self.logger.warning('开发模式已启用,跳过授权检查')
self.license_status = LicenseStatus(
valid=True,
message="开发模式",
license_type="dev",
features={"recording": True, "export": True}
)
else:
# 记录授权状态
if self.license_status.valid:
self.logger.info(f'授权验证成功 - 类型: {self.license_status.license_type}, '
f'到期时间: {self.license_status.expires_at}, '
f'授权ID: {self.license_status.license_id}')
else:
self.logger.warning(f'授权验证失败: {self.license_status.message}')
# 将授权状态注入到Flask应用上下文
self.app.license_status = self.license_status
self.logger.info('授权管理器初始化完成')
# 初始化数据库管理器
self.logger.info('正在初始化数据库管理器...')
db_path = self.config_manager.get_config_value('DATABASE', 'path', fallback=None)
self.logger.info(f'数据库文件路径: {db_path}')
# 确保数据库目录存在
try:
os.makedirs(os.path.dirname(db_path), exist_ok=True)
except Exception as e:
self.logger.error(f'创建数据库目录失败: {e}')
raise
self.db_manager = DatabaseManager(db_path)
self.db_manager.init_database()
self.logger.info('数据库管理器初始化完成')
# 初始化设备协调器(统一管理所有设备)
self.logger.info('正在初始化设备协调器...')
self.device_coordinator = DeviceCoordinator(self.socketio)
# 设置状态变化回调
self.device_coordinator.set_status_change_callback(self._on_device_status_change)
# 调用初始化方法来初始化设备
if self.device_coordinator.initialize():
self.logger.info('设备协调器初始化完成')
# 获取设备管理器实例
self.device_managers = self.device_coordinator.get_device_managers()
# 为每个设备添加状态变化回调(双重保险)
for device_name, manager in self.device_managers.items():
if manager and hasattr(manager, 'add_status_change_callback'):
manager.add_status_change_callback(self._on_device_status_change)
self.logger.info(f'已获取设备管理器: {list(self.device_managers.keys())}')
else:
self.logger.warning('设备协调器初始化失败,但系统将继续运行')
self.device_managers = {} # 初始化为空字典以避免后续错误
# 初始化录制管理器
self.logger.info('正在初始化录制管理器...')
camera1_manager = self.device_managers.get('camera1')
camera2_manager = self.device_managers.get('camera2')
femtobolt_manager = self.device_managers.get('femtobolt')
pressure_manager = self.device_managers.get('pressure')
# 录制管理器当前采用屏幕区域截取方式进行相机录制,不依赖 CameraManager
# 但保留其他设备管理器以便后续扩展如FemtoBolt、压力传感器
self.recording_manager = RecordingManager(
camera_manager=None,
db_manager=self.db_manager,
femtobolt_manager=femtobolt_manager,
pressure_manager=pressure_manager,
config_manager=self.config_manager
)
self.logger.info('录制管理器初始化完成')
# 启动Flask应用
host = self.host
port = self.port
debug = self.debug
self.logger.info(f'启动Flask应用 - Host: {host}, Port: {port}, Debug: {debug}')
if self.socketio:
self.socketio.run(self.app, host=host, port=port, debug=debug, allow_unsafe_werkzeug=True)
else:
self.app.run(host=host, port=port, debug=debug)
except Exception as e:
self.logger.error(f'应用初始化失败: {e}')
raise
def require_license(self, feature=None):
"""
授权检查装饰器
Args:
feature: 需要检查的特定功能,如 'recording', 'export'
"""
from functools import wraps
def decorator(f):
@wraps(f)
def wrapper(*args, **kwargs):
# 获取当前授权状态
if hasattr(self.app, 'license_status'):
status = self.app.license_status
else:
status = LicenseStatus(valid=False, message="授权状态未初始化")
# 检查基本授权
if not status.valid:
return jsonify({
'success': False,
'error': f'授权验证失败: {status.message}',
'license_required': True,
'license_type': status.license_type
}), 403
# 检查特定功能授权
if feature and not status.features.get(feature, True):
return jsonify({
'success': False,
'error': f'未授权功能: {feature}',
'license_required': True,
'feature_required': feature
}), 403
return f(*args, **kwargs)
return wrapper
return decorator
def get_license_info(self):
"""获取授权信息"""
if not self.license_manager:
return {
'valid': False,
'message': '授权管理器未初始化',
'machine_id': None
}
# 刷新授权状态
status = self.license_manager.get_license_status(force_reload=True)
machine_id = self.license_manager.get_machine_id()
return {
'valid': status.valid,
'message': status.message,
'license_type': status.license_type,
'license_id': status.license_id,
'expires_at': status.expires_at.isoformat() if status.expires_at else None,
'features': status.features,
'machine_id': machine_id
}
def _register_routes(self):
"""注册Flask路由"""
# ==================== 基础API ====================
@self.app.route('/health', methods=['GET'])
def health_check():
"""健康检查"""
return jsonify({
'status': 'healthy',
'timestamp': datetime.now().isoformat(),
'version': '1.0.0'
})
# ==================== 授权API ====================
@self.app.route('/api/license/info', methods=['GET'])
def get_license_info():
"""获取授权信息"""
try:
if not self.license_status:
return jsonify({
'success': False,
'error': '授权管理器未初始化'
}), 500
return jsonify({
'success': True,
'data': {
'valid': self.license_status.valid,
'message': self.license_status.message,
'license_type': self.license_status.license_type,
'license_id': self.license_status.license_id,
'expires_at': self.license_status.expires_at.isoformat() if self.license_status.expires_at else None,
'features': self.license_status.features,
'machine_id': self.license_manager.get_machine_id() if self.license_manager else None
}
})
except Exception as e:
self.logger.error(f'获取授权信息失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
@self.app.route('/api/license/activation-request', methods=['POST'])
def generate_activation_request():
"""生成离线激活请求文件"""
try:
if not self.license_manager:
return jsonify({
'success': False,
'error': '授权管理器未初始化'
}), 500
data = flask_request.get_json()
company_name = data.get('company_name', '未知公司')
contact_info = data.get('contact_info', '')
request_file = self.license_manager.generate_activation_request(
company_name=company_name,
contact_info=contact_info
)
# 读取文件内容返回给前端便于直接下载
content_text = None
try:
with open(request_file, 'r', encoding='utf-8') as f:
content_text = f.read()
except Exception as e:
self.logger.warning(f'读取激活请求文件失败: {e}')
return jsonify({
'success': True,
'data': {
'request_file': request_file,
'content': content_text,
'machine_id': self.license_manager.get_machine_id() if self.license_manager else None,
'message': f'激活请求文件已生成: {request_file}'
}
})
except Exception as e:
self.logger.error(f'生成激活请求失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
@self.app.route('/api/license/verify', methods=['POST'])
def verify_license_file():
"""验证授权文件"""
try:
if not self.license_manager:
return jsonify({
'success': False,
'error': '授权管理器未初始化'
}), 500
# 检查是否有文件上传
if 'license_file' not in flask_request.files:
return jsonify({
'success': False,
'error': '未找到授权文件'
}), 400
file = flask_request.files['license_file']
if file.filename == '':
return jsonify({
'success': False,
'error': '未选择文件'
}), 400
# 保存临时文件
import tempfile
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as temp_file:
file.save(temp_file.name)
temp_path = temp_file.name
try:
# 验证授权文件
is_valid, message = self.license_manager.verify_license_file(temp_path)
if is_valid:
# 覆盖系统授权文件为上传的文件
try:
license_path_cfg = self.config_manager.get_config_value('LICENSE', 'path', 'data/license.json') if self.config_manager else 'data/license.json'
# 解析目标路径为绝对路径
if not os.path.isabs(license_path_cfg):
base_dir = os.path.dirname(os.path.abspath(__file__))
license_path_cfg = os.path.join(base_dir, license_path_cfg)
os.makedirs(os.path.dirname(license_path_cfg), exist_ok=True)
# 移动/覆盖授权文件
import shutil
shutil.copyfile(temp_path, license_path_cfg)
except Exception as e:
self.logger.error(f'保存授权文件失败: {e}')
return jsonify({'success': False, 'error': f'保存授权文件失败: {str(e)}'}), 500
# 更新授权状态(强制刷新)
self.license_status = self.license_manager.get_license_status(force_reload=True)
return jsonify({
'success': True,
'data': {
'valid': True,
'message': message,
'license_info': {
'license_type': self.license_status.license_type,
'license_id': self.license_status.license_id,
'expires_at': self.license_status.expires_at.isoformat() if self.license_status.expires_at else None,
'features': self.license_status.features
}
}
})
else:
return jsonify({
'success': False,
'error': message
}), 400
finally:
# 清理临时文件
try:
os.unlink(temp_path)
except:
pass
except Exception as e:
self.logger.error(f'验证授权文件失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
@self.app.route('/api/license/activate-package', methods=['POST'])
def activate_license_package():
"""上传授权包ZIP解压到配置指定目录并刷新授权"""
try:
if not self.license_manager:
return jsonify({'success': False, 'error': '授权管理器未初始化'}), 500
if 'package_zip' not in flask_request.files:
return jsonify({'success': False, 'error': '未找到授权包压缩文件'}), 400
file = flask_request.files['package_zip']
if file.filename == '':
return jsonify({'success': False, 'error': '未选择文件'}), 400
import tempfile, zipfile, shutil
with tempfile.NamedTemporaryFile(suffix='.zip', delete=False) as temp_file:
file.save(temp_file.name)
temp_path = temp_file.name
try:
# 目标目录LICENSE.path 的目录
license_path_cfg = self.config_manager.get_config_value('LICENSE', 'path', 'data/license.json') if self.config_manager else 'data/license.json'
if not os.path.isabs(license_path_cfg):
base_dir = os.path.dirname(os.path.abspath(__file__))
license_path_cfg = os.path.join(base_dir, license_path_cfg)
target_dir = os.path.dirname(license_path_cfg)
os.makedirs(target_dir, exist_ok=True)
# 安全解压,防止路径遍历
with zipfile.ZipFile(temp_path, 'r') as zip_ref:
for member in zip_ref.namelist():
safe_member = os.path.normpath(member)
if ('..' in safe_member) or safe_member.startswith('/') or safe_member.startswith('\\') or os.path.isabs(safe_member):
return jsonify({'success': False, 'error': '压缩包包含非法路径'}), 400
zip_ref.extractall(target_dir)
# 刷新授权
self.license_status = self.license_manager.get_license_status(force_reload=True)
return jsonify({
'success': True,
'data': {
'message': '激活包已安装,请重启系统生效',
'valid': self.license_status.valid,
'license_type': self.license_status.license_type,
'expires_at': self.license_status.expires_at.isoformat() if self.license_status.expires_at else None
}
})
finally:
try:
os.unlink(temp_path)
except Exception:
pass
except Exception as e:
self.logger.error(f'激活包安装失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
# ==================== 静态文件服务 ====================
@self.app.route('/<path:filename>', methods=['GET'])
def serve_static_files(filename):
try:
# 读取配置中的文件存储根目录(优先使用 FILEPATH.path
cfg_dir = self.config_manager.get_config_value('FILEPATH', 'path', fallback=None)
if not cfg_dir:
return jsonify({'error': '未配置文件存储路径'}), 500
# 规范化允许目录路径,消除末尾斜杠、大小写和符号链接影响
cfg_dir = os.path.abspath(os.path.realpath(os.path.normpath(cfg_dir)))
# 安全检查:防止路径遍历攻击
safe_path = os.path.normpath(filename)
if (
'..' in safe_path or
safe_path.startswith('/') or # POSIX绝对路径
safe_path.startswith('\\\\') or # Windows UNC路径
os.path.isabs(safe_path) # Windows盘符绝对路径
):
return jsonify({'error': '非法路径'}), 400
file_path = os.path.join(cfg_dir, safe_path)
# 检查文件是否存在
if not os.path.exists(file_path):
return jsonify({'error': '文件不存在'}), 404
# 检查是否在允许的目录内
if os.path.commonpath([cfg_dir, file_path]) != cfg_dir:
return jsonify({'error': '访问被拒绝'}), 403
# 返回文件
from flask import send_file
# 为视频文件设置正确的MIME类型和响应头
if file_path.lower().endswith(('.mp4', '.webm', '.avi', '.mov')):
response = send_file(file_path, mimetype='video/mp4')
# 添加支持视频流播放的响应头
response.headers['Accept-Ranges'] = 'bytes'
response.headers['Content-Type'] = 'video/mp4'
return response
else:
return send_file(file_path)
except Exception as e:
self.logger.error(f'静态文件服务错误: {e}')
return jsonify({'error': '服务器内部错误'}), 500
@self.app.route('/test-socketio')
def test_socketio():
"""测试SocketIO连接"""
return '<h1>SocketIO Test Page</h1><script src="/socket.io/socket.io.js"></script>'
@self.app.route('/api/health', methods=['GET'])
def api_health_check():
"""API健康检查"""
return jsonify({
'success': True,
'message': '后端服务运行正常',
'timestamp': datetime.now().isoformat(),
'database': self.db_manager is not None,
'config_manager': self.config_manager is not None,
'device_coordinator': self.device_coordinator is not None,
'device_managers': {name: manager is not None for name, manager in self.device_managers.items()}
})
# ==================== 认证API ====================
@self.app.route('/api/auth/login', methods=['POST'])
def login():
"""用户登录"""
try:
# 检查Content-Type
if not flask_request.is_json:
return jsonify({'success': False, 'message': '请求Content-Type必须为application/json'}), 415
data = flask_request.get_json(force=True)
if not data:
return jsonify({'success': False, 'message': '请求数据为空'}), 400
username = data.get('username')
password = data.get('password')
if not username or not password:
return jsonify({'success': False, 'message': '用户名和密码不能为空'}), 400
# 验证用户
user = self.db_manager.authenticate_user(username, password)
if user:
# 检查用户是否已激活
if not user['is_active']:
return jsonify({
'success': False,
'message': '账户未激活,请联系管理员审核'
}), 403
# 构建用户数据
user_data = {
'id': user['id'],
'username': user['username'],
'name': user['name'],
'role': 'admin' if user['user_type'] == 'admin' else 'user',
'user_type': user['user_type'],
'avatar': '',
'phone': user['phone']
}
# 生成token实际项目中应使用JWT等安全token
token = f"token_{user['username']}_{int(time.time())}"
self.logger.info(f'用户 {username} 登录成功')
return jsonify({
'success': True,
'data': {
'token': token,
'user': user_data
},
'message': '登录成功'
})
else:
self.logger.warning(f'用户 {username} 登录失败:用户名或密码错误')
return jsonify({
'success': False,
'message': '用户名或密码错误'
}), 401
except Exception as e:
self.logger.error(f'登录失败: {e}')
return jsonify({'success': False, 'message': '登录失败'}), 500
@self.app.route('/api/auth/register', methods=['POST'])
def register():
"""用户注册"""
try:
# 检查Content-Type
if not flask_request.is_json:
return jsonify({'success': False, 'message': '请求Content-Type必须为application/json'}), 415
data = flask_request.get_json(force=True)
username = data.get('username')
password = data.get('password')
name = data.get('name') or data.get('email', '')
phone = data.get('phone')
if not username or not password:
return jsonify({
'success': False,
'message': '用户名和密码不能为空'
}), 400
if len(password) < 6:
return jsonify({
'success': False,
'message': '密码长度不能少于6位'
}), 400
# 构建用户数据字典
user_data = {
'username': username,
'password': password,
'name': name,
'phone': phone
}
# 使用数据库注册用户
result = self.db_manager.register_user(user_data)
if result['success']:
self.logger.info(f'用户 {username} 注册成功,等待管理员审核')
return jsonify({
'success': True,
'message': '注册成功,请等待管理员审核后登录'
})
else:
return jsonify({
'success': False,
'message': result['message']
}), 400
except Exception as e:
self.logger.error(f'注册失败: {e}')
return jsonify({'success': False, 'message': '注册失败'}), 500
@self.app.route('/api/auth/logout', methods=['POST'])
def logout():
"""用户登出"""
try:
# 这里可以添加token失效逻辑
return jsonify({'success': True, 'message': '登出成功'})
except Exception as e:
self.logger.error(f'登出失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
@self.app.route('/api/auth/verify', methods=['GET'])
def verify_token():
"""验证token"""
try:
# 从请求头获取token
auth_header = flask_request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
return jsonify({
'success': False,
'message': '未提供有效的认证信息',
'data': {'valid': False}
}), 401
token = auth_header.split(' ')[1]
# 这里应该验证JWT token简化处理
if token.startswith('token_'):
return jsonify({
'success': True,
'message': 'Token有效',
'data': {'valid': True}
})
else:
return jsonify({
'success': False,
'message': 'Token无效',
'data': {'valid': False}
}), 401
except Exception as e:
self.logger.error(f'Token验证失败: {e}')
return jsonify({
'success': False,
'message': '验证失败',
'data': {'valid': False}
}), 500
@self.app.route('/api/auth/forgot-password', methods=['POST'])
def forgot_password():
"""忘记密码 - 根据用户名和手机号找回密码"""
try:
data = flask_request.get_json()
username = data.get('username')
phone = data.get('phone')
if not username:
return jsonify({
'success': False,
'error': '请输入用户名'
}), 400
if not phone:
return jsonify({
'success': False,
'error': '请输入手机号码'
}), 400
# 验证手机号格式
import re
phone_pattern = r'^1[3-9]\d{9}$'
if not re.match(phone_pattern, phone):
return jsonify({
'success': False,
'error': '手机号格式不正确'
}), 400
# 查询用户信息
conn = self.db_manager.get_connection()
cursor = conn.cursor()
cursor.execute('''
SELECT username, password, phone FROM users
WHERE username = ? AND phone = ?
''', (username, phone))
user = cursor.fetchone()
if user:
# 用户存在且手机号匹配,返回数据库中存储的实际密码
actual_password = user['password']
self.logger.info(f'用户 {username} 密码查询成功')
return jsonify({
'success': True,
'password': actual_password, # 返回数据库中存储的实际密码
'message': '密码找回成功'
})
else:
# 检查用户是否存在
cursor.execute('SELECT username FROM users WHERE username = ?', (username,))
user_exists = cursor.fetchone()
if not user_exists:
return jsonify({
'success': False,
'error': '用户不存在'
}), 400
else:
return jsonify({
'success': False,
'error': '手机号不匹配'
}), 400
except Exception as e:
self.logger.error(f'忘记密码处理失败: {e}')
return jsonify({'success': False, 'error': '处理失败'}), 500
# ==================== 用户管理API ====================
@self.app.route('/api/users', methods=['GET'])
def get_users():
"""获取用户列表"""
try:
page = int(flask_request.args.get('page', 1))
size = int(flask_request.args.get('size', 10))
users = self.db_manager.get_users(page, size)
total = self.db_manager.get_users_count()
return jsonify({
'success': True,
'data': {
'users': users,
'total': total,
'page': page,
'size': size
}
})
except Exception as e:
self.logger.error(f'获取用户列表失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
@self.app.route('/api/users/<int:user_id>/approve', methods=['POST'])
def approve_user(user_id):
"""审核用户"""
try:
data = flask_request.get_json()
status = data.get('status') # 'approved' 或 'rejected'
if status not in ['approved', 'rejected']:
return jsonify({'success': False, 'error': '无效的审核状态'}), 400
# 使用数据库层已有的审核方法
try:
self.db_manager.approve_user(user_id, approved=(status == 'approved'))
return jsonify({
'success': True,
'message': f'用户已{"通过" if status == "approved" else "拒绝"}审核'
})
except Exception:
return jsonify({'success': False, 'error': '用户不存在或审核失败'}), 404
except Exception as e:
self.logger.error(f'审核用户失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
@self.app.route('/api/users/<int:user_id>', methods=['DELETE'])
def delete_user(user_id):
"""删除用户"""
try:
result = self.db_manager.delete_user(user_id)
if result:
return jsonify({'success': True, 'message': '用户已删除'})
else:
return jsonify({'success': False, 'error': '用户不存在'}), 404
except Exception as e:
self.logger.error(f'删除用户失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
# ==================== 患者管理API ====================
@self.app.route('/api/patients', methods=['GET', 'POST'])
def handle_patients():
"""患者管理"""
if flask_request.method == 'GET':
# 获取患者列表
try:
page = int(flask_request.args.get('page', 1))
size = int(flask_request.args.get('size', 10))
search = flask_request.args.get('search', '')
patients = self.db_manager.get_patients(page, size, search)
total = self.db_manager.get_patients_count(search)
return jsonify({
'success': True,
'data': {
'patients': patients,
'total': total,
'page': page,
'size': size
}
})
except Exception as e:
self.logger.error(f'获取患者列表失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
elif flask_request.method == 'POST':
# 创建新患者
try:
data = flask_request.get_json()
# 验证患者数据
validation_result = DataValidator.validate_patient_data(data)
if not validation_result['valid']:
return jsonify({
'success': False,
'error': '; '.join(validation_result['errors'])
}), 400
# 准备患者数据
patient_data = {
'name': validation_result['data'].get('name'),
'gender': validation_result['data'].get('gender'),
'birth_date': validation_result['data'].get('birth_date'),
'nationality': validation_result['data'].get('nationality'),
'residence': validation_result['data'].get('residence'),
'height': validation_result['data'].get('height'),
'weight': validation_result['data'].get('weight'),
'shoe_size': validation_result['data'].get('shoe_size'),
'phone': validation_result['data'].get('phone'),
'email': validation_result['data'].get('email'),
'occupation': validation_result['data'].get('occupation'),
'workplace': validation_result['data'].get('workplace'),
'idcode': validation_result['data'].get('idcode'),
'medical_history': validation_result['data'].get('medical_history'),
'notes': validation_result['data'].get('notes')
}
# 创建患者
patient_id = self.db_manager.create_patient(patient_data)
# 获取创建的患者信息
patient = self.db_manager.get_patient(patient_id)
return jsonify({
'success': True,
'message': '患者创建成功',
'data': {
'patient_id': patient_id,
'patient': patient
}
})
except Exception as e:
self.logger.error(f'创建患者失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
@self.app.route('/api/patients/<patient_id>', methods=['GET', 'PUT', 'DELETE'])
def handle_patient(patient_id):
"""单个患者操作"""
if flask_request.method == 'GET':
# 获取患者详情
try:
patient = self.db_manager.get_patient(patient_id)
return jsonify({'success': True, 'data': patient})
except Exception as e:
self.logger.error(f'获取患者详情失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
elif flask_request.method == 'PUT':
# 更新患者信息
try:
data = flask_request.get_json()
patient_data = {
'name': data.get('name'),
'gender': data.get('gender'),
'age': data.get('age'),
'birth_date': data.get('birth_date'),
'nationality': data.get('nationality'),
'residence': data.get('residence'),
'height': data.get('height'),
'weight': data.get('weight'),
'shoe_size': data.get('shoe_size'),
'phone': data.get('phone'),
'email': data.get('email'),
'occupation': data.get('occupation'),
'workplace': data.get('workplace'),
'idcode': data.get('idcode'),
'medical_history': data.get('medical_history'),
'notes': data.get('notes')
}
self.db_manager.update_patient(patient_id, patient_data)
return jsonify({'success': True, 'message': '患者信息更新成功'})
except Exception as e:
self.logger.error(f'更新患者信息失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
elif flask_request.method == 'DELETE':
# 删除患者
try:
result = self.db_manager.delete_patient(patient_id)
return jsonify({'success': True, 'message': '患者已删除'})
except Exception as e:
self.logger.error(f'删除患者失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
# ==================== 设备管理API ====================
@self.app.route('/api/devices/status', methods=['GET'])
def get_device_status():
"""获取设备状态"""
try:
if self.device_coordinator:
status = self.device_coordinator.get_device_status()
return jsonify({'success': True, 'data': status})
else:
return jsonify({'success': False, 'error': '设备协调器未初始化'}), 500
except Exception as e:
self.logger.error(f'获取设备状态失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
# ==================== 设备配置API ====================
@self.app.route('/api/config/devices', methods=['GET'])
def get_all_device_configs():
"""获取所有设备配置"""
try:
if self.config_manager:
configs = self.config_manager.get_all_device_configs()
return jsonify({
'success': True,
'data': configs
})
else:
return jsonify({'success': False, 'error': '配置管理器未初始化'}), 500
except Exception as e:
self.logger.error(f"获取设备配置失败: {e}")
return jsonify({
'success': False,
'message': f'获取设备配置失败: {str(e)}'
}), 500
@self.app.route('/api/config/devices/all', methods=['POST'])
def set_all_device_configs():
"""批量设置所有设备配置"""
try:
if not self.config_manager:
return jsonify({'success': False, 'error': '配置管理器未初始化'}), 500
data = flask_request.get_json()
if not data:
return jsonify({
'success': False,
'message': '请求数据不能为空'
}), 400
# 验证数据格式
supported_devices = ['camera', 'femtobolt','imu','pressure']
for device_name in data.keys():
if device_name not in supported_devices:
return jsonify({
'success': False,
'message': f'不支持的设备类型: {device_name},支持的设备类型: {", ".join(supported_devices)}'
}), 400
result = self.config_manager.set_all_device_configs(data)
status_code = 200 if result['success'] else 400
return jsonify(result), status_code
except Exception as e:
self.logger.error(f"批量设置设备配置失败: {e}")
return jsonify({
'success': False,
'message': f'批量设置设备配置失败: {str(e)}'
}), 500
@self.app.route('/api/devices/calibrate', methods=['POST'])
def calibrate_device():
"""校准设备"""
try:
data = flask_request.get_json()
device_type = data.get('device_type')
if not device_type:
return jsonify({'success': False, 'error': '设备类型不能为空'}), 400
if device_type in self.device_managers and self.device_managers[device_type]:
device_manager = self.device_managers[device_type]
if hasattr(device_manager, 'calibrate'):
result = device_manager.calibrate()
return jsonify({'success': True, 'data': result})
else:
return jsonify({'success': False, 'error': f'{device_type}设备不支持校准'}), 400
else:
return jsonify({'success': False, 'error': f'{device_type}设备管理器未初始化'}), 500
except Exception as e:
self.logger.error(f'校准设备失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
@self.app.route('/api/devices/calibrate/imu', methods=['POST'])
def calibrate_imu():
"""校准IMU"""
try:
if self.device_managers['imu']:
result = self.device_managers['imu'].calibrate()
if result:
return jsonify({'success': True, 'message': 'IMU校准成功'})
else:
return jsonify({'success': False, 'error': 'IMU校准失败'}), 500
else:
return jsonify({'success': False, 'error': 'IMU设备管理器未初始化'}), 500
except Exception as e:
self.logger.error(f'IMU校准失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
# ==================== 检测API ====================
@self.app.route('/api/detection/start', methods=['POST'])
def start_detection():
"""开始检测"""
try:
if not self.db_manager or not self.device_coordinator:
return jsonify({'success': False, 'error': '数据库管理器或设备管理器未初始化'}), 500
data = flask_request.get_json()
patient_id = data.get('patient_id')
creator_id = data.get('creator_id')
if not patient_id or not creator_id:
return jsonify({'success': False, 'error': '缺少患者ID或创建人ID'}), 400
# 调用create_detection_session方法settings传空字典
session_id = self.db_manager.create_detection_session(patient_id, settings={}, creator_id=creator_id)
self.logger.info('检测开始,设备连接监控已启动')
# 启动设备连接监控
if self.device_coordinator:
try:
self.device_coordinator.start_all_connection_monitor()
self.logger.info('检测开始,设备连接监控已启动')
except Exception as monitor_error:
self.logger.error(f'启动设备连接监控失败: {monitor_error}')
return jsonify({'success': True, 'session_id': session_id})
except Exception as e:
self.logger.error(f'开始检测失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
@self.app.route('/api/detection/<session_id>/has_data', methods=['GET'])
def has_session_detection_data(session_id: str):
"""检查指定会话是否存在检测数据,用于判断单次检测是否有效"""
try:
if not self.db_manager:
return jsonify({'success': False, 'error': '数据库管理器未初始化'}), 500
exists = self.db_manager.has_session_detection_data(session_id)
return jsonify({'success': True, 'session_id': session_id, 'has_data': bool(exists)})
except Exception as e:
self.logger.error(f'检查会话检测数据存在失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
@self.app.route('/api/detection/<session_id>/stop', methods=['POST'])
def stop_detection(session_id):
"""停止检测"""
try:
if not self.db_manager or not self.device_coordinator:
self.logger.error('数据库管理器或设备管理器未初始化')
return jsonify({'success': False, 'error': '数据库管理器或设备管理器未初始化'}), 500
if not session_id:
self.logger.error('缺少会话ID')
return jsonify({
'success': False,
'error': '缺少会话ID'
}), 400
if self.db_manager.is_empty_session(session_id):
# 删除空白会话
self.db_manager.delete_detection_session(session_id)
self.logger.info(f'已删除空白会话: {session_id}')
return jsonify({
'success': True,
'message': '空白会话已删除'
})
else:
# 正常会话的停止流程:调用数据库层结束检测,自动计算时长并写入结束信息
data = flask_request.get_json() or {}
diagnosis_info = data.get('diagnosis_info')
treatment_info = data.get('treatment_info')
suggestion_info = data.get('suggestion_info')
success = self.db_manager.update_session_endcheck(
session_id,
diagnosis_info=diagnosis_info,
treatment_info=treatment_info,
suggestion_info=suggestion_info
)
if success:
self.logger.info(f'检测会话已结束检查 - 会话ID: {session_id}')
return jsonify({
'success': True,
'message': '检测已结束并已写入总结信息'
})
else:
self.logger.error('停止检测失败,更新会话状态失败')
return jsonify({
'success': False,
'error': '停止检测失败'
}), 500
except Exception as e:
self.logger.error(f'停止检测失败: {e}', exc_info=True)
return jsonify({'success': False, 'error': str(e)}), 500
@self.app.route('/api/detection/<session_id>/start_record', methods=['POST'])
def start_record(session_id):
"""开始视频录制"""
try:
if not self.db_manager or not self.device_coordinator:
return jsonify({'success': False, 'error': '数据库管理器或设备管理器未初始化'}), 500
data = flask_request.get_json()
patient_id = data.get('patient_id')
screen_location = data.get('screen_location') # [0,0,1920,1080]
femtobolt_location = data.get('femtobolt_location') # [0,0,640,480]
camera1_location = data.get('camera1_location') # [0,0,640,480]
camera2_location = data.get('camera2_location') # [0,0,640,480]
if not patient_id:
return jsonify({'success': False, 'error': '缺少患者ID'}), 400
# 开始视频录制
recording_response = None
try:
recording_response = self.recording_manager.start_recording(session_id, patient_id,screen_location,camera1_location,camera2_location,femtobolt_location)
# 处理录制管理器返回的数据库更新信息
if recording_response and recording_response.get('success') and 'database_updates' in recording_response:
db_updates = recording_response['database_updates']
try:
# 保存检测视频记录(映射到 detection_video 表字段)
video_paths = db_updates.get('video_paths', {})
video_record = {
'screen_video_path': video_paths.get('screen_video_path'),
'femtobolt_video_path': video_paths.get('femtobolt_video_path'),
'camera1_video_path': video_paths.get('camera1_video_path'),
'camera2_video_path': video_paths.get('camera2_video_path'),
}
try:
self.db_manager.save_detection_video(db_updates['session_id'], video_record)
except Exception as video_err:
self.logger.error(f'保存检测视频记录失败: {video_err}')
self.logger.info(f'数据库更新成功 - 会话ID: {db_updates["session_id"]}')
except Exception as db_error:
self.logger.error(f'处理数据库更新失败: {db_error}')
except Exception as rec_e:
self.logger.error(f'开始同步录制失败: {rec_e}')
start_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
return jsonify({'success': True, 'session_id': session_id, 'detectionStartTime': start_time, 'recording': recording_response})
except Exception as e:
self.logger.error(f'开始录制失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
@self.app.route('/api/detection/<session_id>/stop_record', methods=['POST'])
def stop_record(session_id):
"""停止视频录制"""
try:
if not session_id:
self.logger.error('缺少会话ID')
return jsonify({
'success': False,
'error': '缺少会话ID'
}), 400
# 停止同步录制,传递视频数据
try:
restrt = self.recording_manager.stop_recording(session_id)
self.logger.info(f'停止录制结果: {restrt}')
except Exception as rec_e:
self.logger.error(f'停止同步录制失败: {rec_e}', exc_info=True)
raise
return jsonify({'success': True,'msg': '停止录制成功'})
except Exception as e:
self.logger.error(f'停止检测失败: {e}', exc_info=True)
return jsonify({'success': False, 'error': str(e)}), 500
@self.app.route('/api/detection/<session_id>/status', methods=['GET'])
def get_detection_status(session_id):
"""获取检测状态"""
try:
if not self.db_manager:
return jsonify({'success': False, 'error': '数据库管理器未初始化'}), 500
if not session_id:
return jsonify({
'success': False,
'error': '缺少会话ID'
}), 400
# 获取会话数据
session_data = self.db_manager.get_session_data(session_id)
if session_data:
return jsonify({
'success': True,
'data': session_data
})
else:
return jsonify({
'success': False,
'error': '会话不存在'
}), 404
except Exception as e:
self.logger.error(f'获取检测状态失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
@self.app.route('/api/detection/<session_id>/save-info', methods=['POST'])
def save_session_info(session_id):
"""保存会话信息(诊断、处理、建议、状态)"""
try:
if not self.db_manager:
return jsonify({'success': False, 'error': '数据库管理器未初始化'}), 500
if not session_id:
return jsonify({
'success': False,
'error': '缺少会话ID'
}), 400
# 获取请求数据
data = flask_request.get_json() or {}
diagnosis_info = data.get('diagnosis_info')
treatment_info = data.get('treatment_info')
suggestion_info = data.get('suggestion_info')
status = data.get('status')
# 验证至少提供一个要更新的字段
if not any([diagnosis_info, treatment_info, suggestion_info, status]):
return jsonify({
'success': False,
'error': '至少需要提供一个要更新的字段diagnosis_info, treatment_info, suggestion_info, status'
}), 400
# 调用数据库管理器的批量更新方法
self.db_manager.update_session_all_info(
session_id=session_id,
diagnosis_info=diagnosis_info,
treatment_info=treatment_info,
suggestion_info=suggestion_info,
status=status
)
# 构建更新信息反馈
updated_fields = []
if diagnosis_info is not None:
updated_fields.append('诊断信息')
if treatment_info is not None:
updated_fields.append('处理信息')
if suggestion_info is not None:
updated_fields.append('建议信息')
if status is not None:
updated_fields.append(f'状态({status})')
self.logger.info(f'会话信息保存成功: {session_id}, 更新字段: {", ".join(updated_fields)}')
return jsonify({
'success': True,
'message': f'会话信息保存成功,更新字段: {", ".join(updated_fields)}',
'data': {
'session_id': session_id,
'updated_fields': updated_fields
}
})
except Exception as e:
self.logger.error(f'保存会话信息失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
@self.app.route('/api/detection/<session_id>/save-data', methods=['POST'])
def save_detection_data(session_id):
"""采集检测数据"""
try:
if not self.db_manager:
return jsonify({'success': False, 'error': '数据库管理器未初始化'}), 500
if not self.device_coordinator:
return jsonify({'success': False, 'error': '设备管理器未初始化'}), 500
# 获取请求数据
data = flask_request.get_json() or {}
# print(f"接收到的data数据: {data}")
patient_id = data.get('patient_id')
# 如果没有提供patient_id从会话信息中获取
if not patient_id:
session_data = self.db_manager.get_session_data(session_id)
if not session_data:
return jsonify({
'success': False,
'error': '检测会话不存在'
}), 404
patient_id = session_data.get('patient_id')
if not patient_id:
return jsonify({
'success': False,
'error': '无法获取患者ID'
}), 400
# 调用录制管理器保存检测截图到文件
collected_data = self.recording_manager.save_detection_images(
session_id=session_id,
patient_id=patient_id,
detection_data=data
)
# 将采集的数据保存到数据库
if collected_data:
self.db_manager.save_detection_data(session_id, collected_data)
self.logger.info(f'检测数据采集并保存成功: {session_id}')
return jsonify({
'success': True,
'data': {
'session_id': session_id,
'timestamp': collected_data.get('timestamp'),
'data_collected': bool(collected_data)
},
'message': '数据采集成功'
})
except Exception as e:
self.logger.error(f'采集检测数据失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
@self.app.route('/api/history/sessions', methods=['GET'])
def get_detection_sessions():
"""获取检测会话历史"""
try:
page = int(flask_request.args.get('page', 1))
size = int(flask_request.args.get('size', 10))
patient_id = flask_request.args.get('patient_id')
sessions = self.db_manager.get_detection_sessions(page, size, patient_id)
total = self.db_manager.get_sessions_count(patient_id)
return jsonify({
'success': True,
'data': {
'sessions': sessions,
'total': total,
'page': page,
'size': size
}
})
except Exception as e:
self.logger.error(f'获取检测历史失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
@self.app.route('/api/history/sessions/<session_id>', methods=['GET'])
def get_session_data(session_id):
"""获取会话详细数据"""
try:
session_data = self.db_manager.get_session_data(session_id)
if session_data is None:
return jsonify({'success': False, 'error': '会话不存在'}), 404
return jsonify({
'success': True,
'data': session_data
})
except Exception as e:
self.logger.error(f'获取会话数据失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
@self.app.route('/api/detection/data/details', methods=['GET'])
def get_detection_data_by_ids():
"""根据多个主键ID查询检测数据详情ids为逗号分隔"""
try:
ids_param = flask_request.args.get('ids')
if not ids_param:
return jsonify({'success': False, 'error': '缺少ids参数'}), 400
ids = [i.strip() for i in ids_param.split(',') if i.strip()]
data_list = self.db_manager.get_detection_data_by_ids(ids)
return jsonify({'success': True, 'data': data_list})
except Exception as e:
self.logger.error(f'批量获取检测数据失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
@self.app.route('/api/detection/data/<data_id>', methods=['DELETE'])
def delete_detection_data(data_id):
"""删除检测数据记录支持单个或多个ID多个用逗号分隔"""
try:
if not data_id:
return jsonify({'success': False, 'error': '未提供检测数据ID'}), 400
# 支持批量:逗号分隔
ids = [i.strip() for i in str(data_id).split(',') if i.strip()]
payload = ids if len(ids) > 1 else (ids[0] if ids else data_id)
success = self.db_manager.delete_detection_data(payload)
if success:
return jsonify({
'success': True,
'message': '检测数据删除成功',
'deleted_ids': ids
})
else:
return jsonify({'success': False, 'error': '检测数据删除失败'}), 500
except ValueError as e:
return jsonify({'success': False, 'error': str(e)}), 404
except Exception as e:
self.logger.error(f'删除检测数据失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
@self.app.route('/api/detection/video/<video_id>', methods=['DELETE'])
def delete_detection_video(video_id):
"""删除检测视频记录支持单个或多个ID多个用逗号分隔"""
try:
if not video_id:
return jsonify({'success': False, 'error': '未提供检测视频ID'}), 400
# 支持批量:逗号分隔
ids = [i.strip() for i in str(video_id).split(',') if i.strip()]
payload = ids if len(ids) > 1 else (ids[0] if ids else video_id)
success = self.db_manager.delete_detection_video(payload)
if success:
return jsonify({
'success': True,
'message': '检测视频删除成功',
'deleted_ids': ids
})
else:
return jsonify({'success': False, 'error': '检测视频删除失败'}), 500
except ValueError as e:
return jsonify({'success': False, 'error': str(e)}), 404
except Exception as e:
self.logger.error(f'删除检测视频失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
@self.app.route('/api/detection/sessions/<session_id>', methods=['DELETE'])
def delete_detection_session(session_id):
"""删除检测会话及其相关的检测数据"""
try:
self.db_manager.delete_detection_session(session_id)
return jsonify({
'success': True,
'message': '检测会话删除成功'
})
except ValueError as e:
return jsonify({'success': False, 'error': str(e)}), 404
except Exception as e:
self.logger.error(f'删除检测会话失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
# ==================== 错误处理 ====================
@self.app.errorhandler(404)
def not_found(error):
return jsonify({'success': False, 'error': 'API接口不存在'}), 404
@self.app.errorhandler(500)
def internal_error(error):
return jsonify({'success': False, 'error': '服务器内部错误'}), 500
# ==================== SOCKET事件 ====================
def _register_socketio_events(self):
"""注册SocketIO事件"""
if self.socketio is None:
return
# 注册统一设备命名空间的连接事件
@self.socketio.on('connect', namespace='/devices')
def handle_devices_connect():
self.logger.info('设备命名空间客户端连接')
emit('status', {'message': '设备命名空间连接成功'}, namespace='/devices')
# 连接时发送当前所有设备的状态
self.broadcast_all_device_status()
@self.socketio.on('disconnect', namespace='/devices')
def handle_devices_disconnect():
self.logger.info('设备命名空间客户端断开连接')
# 注册设备订阅事件
@self.socketio.on('subscribe_device', namespace='/devices')
def handle_subscribe_device(data):
"""订阅特定设备数据"""
device_type = data.get('device_type')
if device_type in ['camera1', 'camera2', 'femtobolt', 'imu', 'pressure']:
self.logger.info(f'客户端订阅{device_type}设备数据')
emit('subscription_status', {
'device_type': device_type,
'status': 'subscribed',
'message': f'{device_type}设备数据订阅成功'
}, namespace='/devices')
else:
emit('subscription_status', {
'device_type': device_type,
'status': 'error',
'message': '不支持的设备类型'
}, namespace='/devices')
@self.socketio.on('unsubscribe_device', namespace='/devices')
def handle_unsubscribe_device(data):
"""取消订阅特定设备数据"""
device_type = data.get('device_type')
self.logger.info(f'客户端取消订阅{device_type}设备数据')
emit('subscription_status', {
'device_type': device_type,
'status': 'unsubscribed',
'message': f'{device_type}设备数据取消订阅成功'
}, namespace='/devices')
@self.socketio.on('start_push_data', namespace='/devices')
def handle_start_push_data():
"""启动数据推送"""
try:
self.start_device_push_data()
emit('test_status', {'status': 'started', 'message': '数据推送已开始'}, namespace='/devices')
except Exception as e:
emit('test_status', {'status': 'error', 'message': str(e)}, namespace='/devices')
@self.socketio.on('stop_push_data', namespace='/devices')
def handle_stop_push_data():
"""停止数据推送"""
try:
self.stop_device_push_data()
emit('test_status', {'status': 'stopped', 'message': '数据推送已停止'}, namespace='/devices')
except Exception as e:
emit('test_status', {'status': 'error', 'message': str(e)}, namespace='/devices')
@self.socketio.on('restart_device', namespace='/devices')
def handle_restart_device(data):
"""重启特定设备"""
device_type = data.get('device_type')
self.logger.info(f'客户端请求重启{device_type}设备')
self.restart_device(device_type)
def restart_device(self, device_type: str):
"""重启指定类型的设备
Args:
device_type (str): 设备类型 (camera, imu, pressure, femtobolt)
"""
if not self.device_coordinator:
self.logger.error('设备协调器未初始化,无法重启设备')
return False
try:
self.logger.info(f'开始重启 {device_type} 设备...')
# 调用设备协调器的重启方法
success = self.device_coordinator.restart_device(device_type)
if success:
self.logger.info(f'{device_type} 设备重启成功')
# 发送重启成功事件到前端
self.socketio.emit('device_restart_message', {
'device_type': device_type,
'message': f'{device_type} 设备重启成功!',
'timestamp': time.time()
}, namespace='/devices')
return True
else:
self.logger.error(f'{device_type} 设备重启失败!')
# 发送重启失败事件到前端
self.socketio.emit('device_restart_message', {
'device_type': device_type,
'message': f'{device_type} 设备重启失败!',
'timestamp': time.time()
}, namespace='/devices')
return False
except Exception as e:
self.logger.error(f'重启 {device_type} 设备时发生异常: {str(e)}')
# 发送重启异常事件到前端
self.socketio.emit('device_restart_error', {
'device_type': device_type,
'error': str(e),
'message': f'重启 {device_type} 设备时发生异常',
'timestamp': time.time()
}, namespace='/devices')
return False
def start_device_push_data(self):
"""开始设备数据推送"""
if self.is_pushing_data:
self.logger.warning('设备数据推送已在运行')
return
try:
self.logger.info('开始设备数据推送...')
self.is_pushing_data = True
# 并行启动真实设备管理器
failed_devices = []
device_threads = {}
device_results = {}
def initialize_device(device_name, manager):
"""设备初始化工作函数"""
try:
# 检查设备是否已连接,避免重复初始化
if hasattr(manager, 'is_connected') and manager.is_connected:
print(f"[DEBUG] {device_name} 已连接,跳过初始化")
# 如果已连接但未启动流,则启动流
if hasattr(manager, 'is_streaming') and not manager.is_streaming:
print(f"[DEBUG] {device_name} 已连接但未启动流,开始启动流")
manager.start_streaming()
device_results[device_name] = True
self.logger.info(f'{device_name}设备已连接,启动成功')
return
print(f"[DEBUG] 尝试初始化设备: {device_name}")
if manager.initialize():
print(f"[DEBUG] {device_name} 初始化成功,开始启动流")
manager.start_streaming()
device_results[device_name] = True
self.logger.info(f'{device_name}设备启动成功')
else:
print(f"[DEBUG] {device_name} 初始化失败")
device_results[device_name] = False
self.logger.error(f'{device_name}设备启动失败')
except Exception as e:
print(f"[DEBUG] {device_name} 初始化异常: {e}")
device_results[device_name] = False
self.logger.error(f'{device_name}设备启动异常: {e}')
# 为每个设备创建初始化线程
for device_name, manager in self.device_managers.items():
if manager is not None: # 确保管理器已初始化
thread = threading.Thread(
target=initialize_device,
args=(device_name, manager),
name=f'Init-{device_name}',
daemon=True
)
device_threads[device_name] = thread
thread.start()
else:
self.logger.warning(f'{device_name}管理器未初始化,跳过启动')
device_results[device_name] = False
# 等待所有设备初始化完成最多等待30秒
for device_name, thread in device_threads.items():
thread.join(timeout=30.0)
if thread.is_alive():
self.logger.warning(f'{device_name}设备初始化超时')
device_results[device_name] = False
# 收集失败的设备
for device_name, success in device_results.items():
if not success:
failed_devices.append(device_name)
# 输出启动结果摘要
successful_devices = [name for name, success in device_results.items() if success]
# 广播设备状态更新
for device_name, success in device_results.items():
self.broadcast_device_status(device_name, success)
if successful_devices:
self.logger.info(f'成功启动的设备: {", ".join(successful_devices)}')
if failed_devices:
self.logger.warning(f'启动失败的设备: {", ".join(failed_devices)}')
self.logger.info('设备数据推送已启动')
except Exception as e:
self.logger.error(f'启动设备数据推送失败: {e}')
self.is_pushing_data = False
# 不抛出异常,允许应用继续运行
def stop_device_push_data(self):
"""停止设备数据推送"""
if not self.is_pushing_data:
self.logger.warning('设备数据推送未运行')
return
try:
self.logger.info('停止设备数据推送...')
self.is_pushing_data = False
# 停止设备管理器
for device_name, manager in self.device_managers.items():
if manager is not None:
try:
manager.stop_streaming()
manager.disconnect()
self.logger.info(f'{device_name}设备已停止')
# 广播设备状态为未连接
self.broadcast_device_status(device_name, False)
except Exception as e:
self.logger.error(f'停止{device_name}设备失败: {e}')
# 即使停止失败也广播为未连接状态
self.broadcast_device_status(device_name, False)
self.logger.info('设备数据推送已停止')
except Exception as e:
self.logger.error(f'停止设备数据推送失败: {e}')
def broadcast_device_status(self, device_name: str, is_connected: bool):
"""广播单个设备状态"""
if self.socketio:
try:
status_data = {
'device_type': device_name,
'status': is_connected,
'timestamp': datetime.now().isoformat()
}
self.socketio.emit('device_status', status_data, namespace='/devices')
self.logger.info(f'广播设备状态: {device_name} -> {"已连接" if is_connected else "未连接"}')
except Exception as e:
self.logger.error(f'广播设备状态失败: {e}')
def broadcast_all_device_status(self):
"""广播所有设备状态"""
current_devices = self.device_coordinator.get_device_managers()
for device_name, manager in current_devices.items():
if manager is not None:
try:
# 检查设备是否连接使用is_connected属性
is_connected = hasattr(manager, 'is_connected') and getattr(manager, 'is_connected', False)
self.broadcast_device_status(device_name, is_connected)
except Exception as e:
self.logger.error(f'检查{device_name}设备状态失败: {e}')
self.broadcast_device_status(device_name, False)
def _on_device_status_change(self, device_name: str, is_connected: bool):
"""
设备状态变化回调函数
Args:
device_name: 设备名称
is_connected: 连接状态
"""
self.logger.info(f'设备状态变化: {device_name} -> {"已连接" if is_connected else "未连接"}')
self.broadcast_device_status(device_name, is_connected)
def _detection_worker(self, detection_id, duration):
"""检测工作线程"""
try:
self.logger.info(f'检测线程启动 - ID: {detection_id}, 持续时间: {duration}')
# 模拟检测过程
start_time = time.time()
while time.time() - start_time < duration:
if not self.current_detection:
break
# 发送检测进度
elapsed = time.time() - start_time
progress = min(100, (elapsed / duration) * 100)
if self.socketio:
self.socketio.emit('detection_progress', {
'detection_id': detection_id,
'progress': progress,
'elapsed_time': elapsed,
'remaining_time': max(0, duration - elapsed)
})
time.sleep(1)
# 检测完成
if self.current_detection and self.current_detection['id'] == detection_id:
self.db_manager.end_detection_session(detection_id)
self.current_detection = None
if self.socketio:
self.socketio.emit('detection_complete', {
'detection_id': detection_id,
'message': '检测已完成'
})
self.logger.info(f'检测完成 - ID: {detection_id}')
except Exception as e:
self.logger.error(f'检测线程异常: {e}')
if self.current_detection:
self.current_detection = None
def run(self, debug=None):
"""运行服务器"""
if debug is not None:
self.debug = debug
try:
self.logger.info('正在启动AppServer...')
self.init_app()
except KeyboardInterrupt:
self.logger.info('服务被用户中断')
except Exception as e:
self.logger.error(f'服务启动失败: {e}')
sys.exit(1)
finally:
self.logger.info('AppServer已停止')
def main():
"""主函数"""
# 解析命令行参数
parser = argparse.ArgumentParser(description='Body Balance Evaluation System Backend')
parser.add_argument('--host', default='0.0.0.0', help='Host address to bind to')
parser.add_argument('--port', type=int, default=5000, help='Port number to bind to')
parser.add_argument('--debug', action='store_true', help='Enable debug mode')
args = parser.parse_args()
# 创建并运行服务器
server = AppServer(host=args.host, port=args.port, debug=args.debug)
server.run()
if __name__ == '__main__':
main()