#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ AppServer类 - 主应用服务器 实现Flask和SocketIO服务,替代app.py功能 """ import os import sys import json import time import threading from datetime import datetime from flask import Flask, jsonify from flask import request as flask_request from flask_cors import CORS import logging from flask_socketio import SocketIO, emit import configparser import argparse # 添加当前目录到路径 sys.path.append(os.path.dirname(os.path.abspath(__file__))) # 导入模块 from database import DatabaseManager from utils import config as app_config from utils import DataValidator # 添加数据验证器导入 from devices.camera_manager import CameraManager from devices.imu_manager import IMUManager from devices.pressure_manager import PressureManager from devices.femtobolt_manager import FemtoBoltManager from devices.device_coordinator import DeviceCoordinator from devices.screen_recorder import RecordingManager from devices.utils.config_manager import ConfigManager from devices.utils.license_manager import LicenseManager, LicenseStatus class AppServer: """主应用服务器类""" def __init__(self, host='0.0.0.0', port=5000, debug=False): """ 初始化应用服务器 Args: host: 服务器主机 port: 服务器端口 debug: 调试模式 """ self.host = host self.port = port self.debug = debug # 初始化日志 self._init_logging() # Flask应用 self.app = Flask(__name__) self.app.config['SECRET_KEY'] = 'body-balance-detection-system-2024' # SocketIO self._init_socketio() # CORS配置 CORS(self.app, origins='*', supports_credentials=True, allow_headers=['Content-Type', 'Authorization'], methods=['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS']) # 配置管理 self._init_config() # 全局变量 self.db_manager = None self.current_detection = None self.detection_thread = None # 授权管理 self.license_manager = None self.license_status = None # 数据推送状态 self.is_pushing_data = False # 设备管理器 self.config_manager = None self.device_coordinator = None self.device_managers = { 'camera': None, 'femtobolt': None, 'imu': None, 'pressure': None } # 注册路由和事件 self._register_routes() self._register_socketio_events() def _init_logging(self): """初始化日志配置""" # 日志目录 if getattr(sys, 'frozen', False): # 打包后的可执行文件 log_dir = os.path.join(os.path.dirname(sys.executable), 'logs') else: # 开发环境 log_dir = 'logs' # 创建日志目录 os.makedirs(log_dir, exist_ok=True) log_file = os.path.join(log_dir, 'backend.log') # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(log_file, encoding='utf-8'), logging.StreamHandler() ] ) # 设置werkzeug日志级别为WARNING,过滤掉INFO级别的访问日志 logging.getLogger('werkzeug').setLevel(logging.WARNING) self.logger = logging.getLogger(__name__) def _init_socketio(self): """初始化SocketIO""" try: self.socketio = SocketIO( self.app, cors_allowed_origins='*', async_mode='threading', logger=False, engineio_logger=False, ping_timeout=60, ping_interval=25, manage_session=False, always_connect=False, transports=['polling', 'websocket'], # 优先使用polling allow_upgrades=True, # 允许升级到websocket cookie=None # 禁用cookie ) self.logger.info('SocketIO初始化成功') except Exception as e: self.logger.error(f'SocketIO初始化失败: {e}') self.socketio = None # 设置SocketIO日志级别 logging.getLogger('socketio').setLevel(logging.WARNING) logging.getLogger('engineio').setLevel(logging.WARNING) def _init_config(self): """初始化配置""" self.config = configparser.ConfigParser() # 配置文件路径 if getattr(sys, 'frozen', False): # 打包后的可执行文件 config_path = os.path.join(os.path.dirname(sys.executable), 'config.ini') else: # 开发环境 config_path = os.path.join(os.path.dirname(__file__), 'config.ini') self.config.read(config_path, encoding='utf-8') camera1_index = self.config.get('CAMERA1', 'device_index', fallback=None) camera2_index = self.config.get('CAMERA2', 'device_index', fallback=None) print(f"相机1设备号: {camera1_index}, 相机2设备号: {camera2_index}") def init_app(self): """初始化应用组件""" try: # 初始化配置管理器 self.logger.info('正在初始化配置管理器...') self.config_manager = ConfigManager() self.logger.info('配置管理器初始化完成') # 初始化授权管理器 self.logger.info('正在初始化授权管理器...') self.license_manager = LicenseManager(self.config_manager) self.license_status = self.license_manager.get_license_status() # 检查开发模式 dev_mode = self.config_manager.get_config_value('LICENSE', 'dev_mode', 'False').lower() == 'true' if dev_mode: self.logger.warning('开发模式已启用,跳过授权检查') self.license_status = LicenseStatus( valid=True, message="开发模式", license_type="dev", features={"recording": True, "export": True} ) else: # 记录授权状态 if self.license_status.valid: self.logger.info(f'授权验证成功 - 类型: {self.license_status.license_type}, ' f'到期时间: {self.license_status.expires_at}, ' f'授权ID: {self.license_status.license_id}') else: self.logger.warning(f'授权验证失败: {self.license_status.message}') # 将授权状态注入到Flask应用上下文 self.app.license_status = self.license_status self.logger.info('授权管理器初始化完成') # 初始化数据库管理器 self.logger.info('正在初始化数据库管理器...') db_path = self.config_manager.get_config_value('DATABASE', 'path', fallback=None) self.logger.info(f'数据库文件路径: {db_path}') # 确保数据库目录存在 try: os.makedirs(os.path.dirname(db_path), exist_ok=True) except Exception as e: self.logger.error(f'创建数据库目录失败: {e}') raise self.db_manager = DatabaseManager(db_path) self.db_manager.init_database() self.logger.info('数据库管理器初始化完成') # 初始化设备协调器(统一管理所有设备) self.logger.info('正在初始化设备协调器...') self.device_coordinator = DeviceCoordinator(self.socketio) # 设置状态变化回调 self.device_coordinator.set_status_change_callback(self._on_device_status_change) # 调用初始化方法来初始化设备 if self.device_coordinator.initialize(): self.logger.info('设备协调器初始化完成') # 获取设备管理器实例 self.device_managers = self.device_coordinator.get_device_managers() # 为每个设备添加状态变化回调(双重保险) for device_name, manager in self.device_managers.items(): if manager and hasattr(manager, 'add_status_change_callback'): manager.add_status_change_callback(self._on_device_status_change) self.logger.info(f'已获取设备管理器: {list(self.device_managers.keys())}') else: self.logger.warning('设备协调器初始化失败,但系统将继续运行') self.device_managers = {} # 初始化为空字典以避免后续错误 # 初始化录制管理器 self.logger.info('正在初始化录制管理器...') camera1_manager = self.device_managers.get('camera1') camera2_manager = self.device_managers.get('camera2') femtobolt_manager = self.device_managers.get('femtobolt') pressure_manager = self.device_managers.get('pressure') # 录制管理器当前采用屏幕区域截取方式进行相机录制,不依赖 CameraManager # 但保留其他设备管理器以便后续扩展(如FemtoBolt、压力传感器) self.recording_manager = RecordingManager( camera_manager=None, db_manager=self.db_manager, femtobolt_manager=femtobolt_manager, pressure_manager=pressure_manager, config_manager=self.config_manager ) self.logger.info('录制管理器初始化完成') # 启动Flask应用 host = self.host port = self.port debug = self.debug self.logger.info(f'启动Flask应用 - Host: {host}, Port: {port}, Debug: {debug}') if self.socketio: self.socketio.run(self.app, host=host, port=port, debug=debug, allow_unsafe_werkzeug=True) else: self.app.run(host=host, port=port, debug=debug) except Exception as e: self.logger.error(f'应用初始化失败: {e}') raise def require_license(self, feature=None): """ 授权检查装饰器 Args: feature: 需要检查的特定功能,如 'recording', 'export' 等 """ from functools import wraps def decorator(f): @wraps(f) def wrapper(*args, **kwargs): # 获取当前授权状态 if hasattr(self.app, 'license_status'): status = self.app.license_status else: status = LicenseStatus(valid=False, message="授权状态未初始化") # 检查基本授权 if not status.valid: return jsonify({ 'success': False, 'error': f'授权验证失败: {status.message}', 'license_required': True, 'license_type': status.license_type }), 403 # 检查特定功能授权 if feature and not status.features.get(feature, True): return jsonify({ 'success': False, 'error': f'未授权功能: {feature}', 'license_required': True, 'feature_required': feature }), 403 return f(*args, **kwargs) return wrapper return decorator def get_license_info(self): """获取授权信息""" if not self.license_manager: return { 'valid': False, 'message': '授权管理器未初始化', 'machine_id': None } # 刷新授权状态 status = self.license_manager.get_license_status(force_reload=True) machine_id = self.license_manager.get_machine_id() return { 'valid': status.valid, 'message': status.message, 'license_type': status.license_type, 'license_id': status.license_id, 'expires_at': status.expires_at.isoformat() if status.expires_at else None, 'features': status.features, 'machine_id': machine_id } def _register_routes(self): """注册Flask路由""" # ==================== 基础API ==================== @self.app.route('/health', methods=['GET']) def health_check(): """健康检查""" return jsonify({ 'status': 'healthy', 'timestamp': datetime.now().isoformat(), 'version': '1.0.0' }) # ==================== 授权API ==================== @self.app.route('/api/license/info', methods=['GET']) def get_license_info(): """获取授权信息""" try: if not self.license_status: return jsonify({ 'success': False, 'error': '授权管理器未初始化' }), 500 return jsonify({ 'success': True, 'data': { 'valid': self.license_status.valid, 'message': self.license_status.message, 'license_type': self.license_status.license_type, 'license_id': self.license_status.license_id, 'expires_at': self.license_status.expires_at.isoformat() if self.license_status.expires_at else None, 'features': self.license_status.features, 'machine_id': self.license_manager.get_machine_id() if self.license_manager else None } }) except Exception as e: self.logger.error(f'获取授权信息失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 @self.app.route('/api/license/activation-request', methods=['POST']) def generate_activation_request(): """生成离线激活请求文件""" try: if not self.license_manager: return jsonify({ 'success': False, 'error': '授权管理器未初始化' }), 500 data = flask_request.get_json() company_name = data.get('company_name', '未知公司') contact_info = data.get('contact_info', '') request_file = self.license_manager.generate_activation_request( company_name=company_name, contact_info=contact_info ) # 读取文件内容返回给前端便于直接下载 content_text = None try: with open(request_file, 'r', encoding='utf-8') as f: content_text = f.read() except Exception as e: self.logger.warning(f'读取激活请求文件失败: {e}') return jsonify({ 'success': True, 'data': { 'request_file': request_file, 'content': content_text, 'machine_id': self.license_manager.get_machine_id() if self.license_manager else None, 'message': f'激活请求文件已生成: {request_file}' } }) except Exception as e: self.logger.error(f'生成激活请求失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 @self.app.route('/api/license/verify', methods=['POST']) def verify_license_file(): """验证授权文件""" try: if not self.license_manager: return jsonify({ 'success': False, 'error': '授权管理器未初始化' }), 500 # 检查是否有文件上传 if 'license_file' not in flask_request.files: return jsonify({ 'success': False, 'error': '未找到授权文件' }), 400 file = flask_request.files['license_file'] if file.filename == '': return jsonify({ 'success': False, 'error': '未选择文件' }), 400 # 保存临时文件 import tempfile with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as temp_file: file.save(temp_file.name) temp_path = temp_file.name try: # 验证授权文件 is_valid, message = self.license_manager.verify_license_file(temp_path) if is_valid: # 覆盖系统授权文件为上传的文件 try: license_path_cfg = self.config_manager.get_config_value('LICENSE', 'path', 'data/license.json') if self.config_manager else 'data/license.json' # 解析目标路径为绝对路径 if not os.path.isabs(license_path_cfg): base_dir = os.path.dirname(os.path.abspath(__file__)) license_path_cfg = os.path.join(base_dir, license_path_cfg) os.makedirs(os.path.dirname(license_path_cfg), exist_ok=True) # 移动/覆盖授权文件 import shutil shutil.copyfile(temp_path, license_path_cfg) except Exception as e: self.logger.error(f'保存授权文件失败: {e}') return jsonify({'success': False, 'error': f'保存授权文件失败: {str(e)}'}), 500 # 更新授权状态(强制刷新) self.license_status = self.license_manager.get_license_status(force_reload=True) return jsonify({ 'success': True, 'data': { 'valid': True, 'message': message, 'license_info': { 'license_type': self.license_status.license_type, 'license_id': self.license_status.license_id, 'expires_at': self.license_status.expires_at.isoformat() if self.license_status.expires_at else None, 'features': self.license_status.features } } }) else: return jsonify({ 'success': False, 'error': message }), 400 finally: # 清理临时文件 try: os.unlink(temp_path) except: pass except Exception as e: self.logger.error(f'验证授权文件失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 @self.app.route('/api/license/activate-package', methods=['POST']) def activate_license_package(): """上传授权包ZIP,解压到配置指定目录并刷新授权""" try: if not self.license_manager: return jsonify({'success': False, 'error': '授权管理器未初始化'}), 500 if 'package_zip' not in flask_request.files: return jsonify({'success': False, 'error': '未找到授权包压缩文件'}), 400 file = flask_request.files['package_zip'] if file.filename == '': return jsonify({'success': False, 'error': '未选择文件'}), 400 import tempfile, zipfile, shutil with tempfile.NamedTemporaryFile(suffix='.zip', delete=False) as temp_file: file.save(temp_file.name) temp_path = temp_file.name try: # 目标目录:LICENSE.path 的目录 license_path_cfg = self.config_manager.get_config_value('LICENSE', 'path', 'data/license.json') if self.config_manager else 'data/license.json' if not os.path.isabs(license_path_cfg): base_dir = os.path.dirname(os.path.abspath(__file__)) license_path_cfg = os.path.join(base_dir, license_path_cfg) target_dir = os.path.dirname(license_path_cfg) os.makedirs(target_dir, exist_ok=True) # 安全解压,防止路径遍历 with zipfile.ZipFile(temp_path, 'r') as zip_ref: for member in zip_ref.namelist(): safe_member = os.path.normpath(member) if ('..' in safe_member) or safe_member.startswith('/') or safe_member.startswith('\\') or os.path.isabs(safe_member): return jsonify({'success': False, 'error': '压缩包包含非法路径'}), 400 zip_ref.extractall(target_dir) # 刷新授权 self.license_status = self.license_manager.get_license_status(force_reload=True) return jsonify({ 'success': True, 'data': { 'message': '激活包已安装,请重启系统生效', 'valid': self.license_status.valid, 'license_type': self.license_status.license_type, 'expires_at': self.license_status.expires_at.isoformat() if self.license_status.expires_at else None } }) finally: try: os.unlink(temp_path) except Exception: pass except Exception as e: self.logger.error(f'激活包安装失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 # ==================== 静态文件服务 ==================== @self.app.route('/', methods=['GET']) def serve_static_files(filename): try: # 读取配置中的文件存储根目录(优先使用 FILEPATH.path) cfg_dir = self.config_manager.get_config_value('FILEPATH', 'path', fallback=None) if not cfg_dir: return jsonify({'error': '未配置文件存储路径'}), 500 # 规范化允许目录路径,消除末尾斜杠、大小写和符号链接影响 cfg_dir = os.path.abspath(os.path.realpath(os.path.normpath(cfg_dir))) # 安全检查:防止路径遍历攻击 safe_path = os.path.normpath(filename) if ( '..' in safe_path or safe_path.startswith('/') or # POSIX绝对路径 safe_path.startswith('\\\\') or # Windows UNC路径 os.path.isabs(safe_path) # Windows盘符绝对路径 ): return jsonify({'error': '非法路径'}), 400 file_path = os.path.join(cfg_dir, safe_path) # 检查文件是否存在 if not os.path.exists(file_path): return jsonify({'error': '文件不存在'}), 404 # 检查是否在允许的目录内 if os.path.commonpath([cfg_dir, file_path]) != cfg_dir: return jsonify({'error': '访问被拒绝'}), 403 # 返回文件 from flask import send_file # 为视频文件设置正确的MIME类型和响应头 if file_path.lower().endswith(('.mp4', '.webm', '.avi', '.mov')): response = send_file(file_path, mimetype='video/mp4') # 添加支持视频流播放的响应头 response.headers['Accept-Ranges'] = 'bytes' response.headers['Content-Type'] = 'video/mp4' return response else: return send_file(file_path) except Exception as e: self.logger.error(f'静态文件服务错误: {e}') return jsonify({'error': '服务器内部错误'}), 500 @self.app.route('/test-socketio') def test_socketio(): """测试SocketIO连接""" return '

SocketIO Test Page

' @self.app.route('/api/health', methods=['GET']) def api_health_check(): """API健康检查""" return jsonify({ 'success': True, 'message': '后端服务运行正常', 'timestamp': datetime.now().isoformat(), 'database': self.db_manager is not None, 'config_manager': self.config_manager is not None, 'device_coordinator': self.device_coordinator is not None, 'device_managers': {name: manager is not None for name, manager in self.device_managers.items()} }) # ==================== 认证API ==================== @self.app.route('/api/auth/login', methods=['POST']) def login(): """用户登录""" try: # 检查Content-Type if not flask_request.is_json: return jsonify({'success': False, 'message': '请求Content-Type必须为application/json'}), 415 data = flask_request.get_json(force=True) if not data: return jsonify({'success': False, 'message': '请求数据为空'}), 400 username = data.get('username') password = data.get('password') if not username or not password: return jsonify({'success': False, 'message': '用户名和密码不能为空'}), 400 # 验证用户 user = self.db_manager.authenticate_user(username, password) if user: # 检查用户是否已激活 if not user['is_active']: return jsonify({ 'success': False, 'message': '账户未激活,请联系管理员审核' }), 403 # 构建用户数据 user_data = { 'id': user['id'], 'username': user['username'], 'name': user['name'], 'role': 'admin' if user['user_type'] == 'admin' else 'user', 'user_type': user['user_type'], 'avatar': '', 'phone': user['phone'] } # 生成token(实际项目中应使用JWT等安全token) token = f"token_{user['username']}_{int(time.time())}" self.logger.info(f'用户 {username} 登录成功') return jsonify({ 'success': True, 'data': { 'token': token, 'user': user_data }, 'message': '登录成功' }) else: self.logger.warning(f'用户 {username} 登录失败:用户名或密码错误') return jsonify({ 'success': False, 'message': '用户名或密码错误' }), 401 except Exception as e: self.logger.error(f'登录失败: {e}') return jsonify({'success': False, 'message': '登录失败'}), 500 @self.app.route('/api/auth/register', methods=['POST']) def register(): """用户注册""" try: # 检查Content-Type if not flask_request.is_json: return jsonify({'success': False, 'message': '请求Content-Type必须为application/json'}), 415 data = flask_request.get_json(force=True) username = data.get('username') password = data.get('password') name = data.get('name') or data.get('email', '') phone = data.get('phone') if not username or not password: return jsonify({ 'success': False, 'message': '用户名和密码不能为空' }), 400 if len(password) < 6: return jsonify({ 'success': False, 'message': '密码长度不能少于6位' }), 400 # 构建用户数据字典 user_data = { 'username': username, 'password': password, 'name': name, 'phone': phone } # 使用数据库注册用户 result = self.db_manager.register_user(user_data) if result['success']: self.logger.info(f'用户 {username} 注册成功,等待管理员审核') return jsonify({ 'success': True, 'message': '注册成功,请等待管理员审核后登录' }) else: return jsonify({ 'success': False, 'message': result['message'] }), 400 except Exception as e: self.logger.error(f'注册失败: {e}') return jsonify({'success': False, 'message': '注册失败'}), 500 @self.app.route('/api/auth/logout', methods=['POST']) def logout(): """用户登出""" try: # 这里可以添加token失效逻辑 return jsonify({'success': True, 'message': '登出成功'}) except Exception as e: self.logger.error(f'登出失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 @self.app.route('/api/auth/verify', methods=['GET']) def verify_token(): """验证token""" try: # 从请求头获取token auth_header = flask_request.headers.get('Authorization') if not auth_header or not auth_header.startswith('Bearer '): return jsonify({ 'success': False, 'message': '未提供有效的认证信息', 'data': {'valid': False} }), 401 token = auth_header.split(' ')[1] # 这里应该验证JWT token,简化处理 if token.startswith('token_'): return jsonify({ 'success': True, 'message': 'Token有效', 'data': {'valid': True} }) else: return jsonify({ 'success': False, 'message': 'Token无效', 'data': {'valid': False} }), 401 except Exception as e: self.logger.error(f'Token验证失败: {e}') return jsonify({ 'success': False, 'message': '验证失败', 'data': {'valid': False} }), 500 @self.app.route('/api/auth/forgot-password', methods=['POST']) def forgot_password(): """忘记密码 - 根据用户名和手机号找回密码""" try: data = flask_request.get_json() username = data.get('username') phone = data.get('phone') if not username: return jsonify({ 'success': False, 'error': '请输入用户名' }), 400 if not phone: return jsonify({ 'success': False, 'error': '请输入手机号码' }), 400 # 验证手机号格式 import re phone_pattern = r'^1[3-9]\d{9}$' if not re.match(phone_pattern, phone): return jsonify({ 'success': False, 'error': '手机号格式不正确' }), 400 # 查询用户信息 conn = self.db_manager.get_connection() cursor = conn.cursor() cursor.execute(''' SELECT username, password, phone FROM users WHERE username = ? AND phone = ? ''', (username, phone)) user = cursor.fetchone() if user: # 用户存在且手机号匹配,返回数据库中存储的实际密码 actual_password = user['password'] self.logger.info(f'用户 {username} 密码查询成功') return jsonify({ 'success': True, 'password': actual_password, # 返回数据库中存储的实际密码 'message': '密码找回成功' }) else: # 检查用户是否存在 cursor.execute('SELECT username FROM users WHERE username = ?', (username,)) user_exists = cursor.fetchone() if not user_exists: return jsonify({ 'success': False, 'error': '用户不存在' }), 400 else: return jsonify({ 'success': False, 'error': '手机号不匹配' }), 400 except Exception as e: self.logger.error(f'忘记密码处理失败: {e}') return jsonify({'success': False, 'error': '处理失败'}), 500 # ==================== 用户管理API ==================== @self.app.route('/api/users', methods=['GET']) def get_users(): """获取用户列表""" try: page = int(flask_request.args.get('page', 1)) size = int(flask_request.args.get('size', 10)) users = self.db_manager.get_users(page, size) total = self.db_manager.get_users_count() return jsonify({ 'success': True, 'data': { 'users': users, 'total': total, 'page': page, 'size': size } }) except Exception as e: self.logger.error(f'获取用户列表失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 @self.app.route('/api/users//approve', methods=['POST']) def approve_user(user_id): """审核用户""" try: data = flask_request.get_json() status = data.get('status') # 'approved' 或 'rejected' if status not in ['approved', 'rejected']: return jsonify({'success': False, 'error': '无效的审核状态'}), 400 # 使用数据库层已有的审核方法 try: self.db_manager.approve_user(user_id, approved=(status == 'approved')) return jsonify({ 'success': True, 'message': f'用户已{"通过" if status == "approved" else "拒绝"}审核' }) except Exception: return jsonify({'success': False, 'error': '用户不存在或审核失败'}), 404 except Exception as e: self.logger.error(f'审核用户失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 @self.app.route('/api/users/', methods=['DELETE']) def delete_user(user_id): """删除用户""" try: result = self.db_manager.delete_user(user_id) if result: return jsonify({'success': True, 'message': '用户已删除'}) else: return jsonify({'success': False, 'error': '用户不存在'}), 404 except Exception as e: self.logger.error(f'删除用户失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 # ==================== 患者管理API ==================== @self.app.route('/api/patients', methods=['GET', 'POST']) def handle_patients(): """患者管理""" if flask_request.method == 'GET': # 获取患者列表 try: page = int(flask_request.args.get('page', 1)) size = int(flask_request.args.get('size', 10)) search = flask_request.args.get('search', '') patients = self.db_manager.get_patients(page, size, search) total = self.db_manager.get_patients_count(search) return jsonify({ 'success': True, 'data': { 'patients': patients, 'total': total, 'page': page, 'size': size } }) except Exception as e: self.logger.error(f'获取患者列表失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 elif flask_request.method == 'POST': # 创建新患者 try: data = flask_request.get_json() # 验证患者数据 validation_result = DataValidator.validate_patient_data(data) if not validation_result['valid']: return jsonify({ 'success': False, 'error': '; '.join(validation_result['errors']) }), 400 # 准备患者数据 patient_data = { 'name': validation_result['data'].get('name'), 'gender': validation_result['data'].get('gender'), 'birth_date': validation_result['data'].get('birth_date'), 'nationality': validation_result['data'].get('nationality'), 'residence': validation_result['data'].get('residence'), 'height': validation_result['data'].get('height'), 'weight': validation_result['data'].get('weight'), 'shoe_size': validation_result['data'].get('shoe_size'), 'phone': validation_result['data'].get('phone'), 'email': validation_result['data'].get('email'), 'occupation': validation_result['data'].get('occupation'), 'workplace': validation_result['data'].get('workplace'), 'idcode': validation_result['data'].get('idcode'), 'medical_history': validation_result['data'].get('medical_history'), 'notes': validation_result['data'].get('notes') } # 创建患者 patient_id = self.db_manager.create_patient(patient_data) # 获取创建的患者信息 patient = self.db_manager.get_patient(patient_id) return jsonify({ 'success': True, 'message': '患者创建成功', 'data': { 'patient_id': patient_id, 'patient': patient } }) except Exception as e: self.logger.error(f'创建患者失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 @self.app.route('/api/patients/', methods=['GET', 'PUT', 'DELETE']) def handle_patient(patient_id): """单个患者操作""" if flask_request.method == 'GET': # 获取患者详情 try: patient = self.db_manager.get_patient(patient_id) return jsonify({'success': True, 'data': patient}) except Exception as e: self.logger.error(f'获取患者详情失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 elif flask_request.method == 'PUT': # 更新患者信息 try: data = flask_request.get_json() patient_data = { 'name': data.get('name'), 'gender': data.get('gender'), 'age': data.get('age'), 'birth_date': data.get('birth_date'), 'nationality': data.get('nationality'), 'residence': data.get('residence'), 'height': data.get('height'), 'weight': data.get('weight'), 'shoe_size': data.get('shoe_size'), 'phone': data.get('phone'), 'email': data.get('email'), 'occupation': data.get('occupation'), 'workplace': data.get('workplace'), 'idcode': data.get('idcode'), 'medical_history': data.get('medical_history'), 'notes': data.get('notes') } self.db_manager.update_patient(patient_id, patient_data) return jsonify({'success': True, 'message': '患者信息更新成功'}) except Exception as e: self.logger.error(f'更新患者信息失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 elif flask_request.method == 'DELETE': # 删除患者 try: result = self.db_manager.delete_patient(patient_id) return jsonify({'success': True, 'message': '患者已删除'}) except Exception as e: self.logger.error(f'删除患者失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 # ==================== 设备管理API ==================== @self.app.route('/api/devices/status', methods=['GET']) def get_device_status(): """获取设备状态""" try: if self.device_coordinator: status = self.device_coordinator.get_device_status() return jsonify({'success': True, 'data': status}) else: return jsonify({'success': False, 'error': '设备协调器未初始化'}), 500 except Exception as e: self.logger.error(f'获取设备状态失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 # ==================== 设备配置API ==================== @self.app.route('/api/config/devices', methods=['GET']) def get_all_device_configs(): """获取所有设备配置""" try: if self.config_manager: configs = self.config_manager.get_all_device_configs() return jsonify({ 'success': True, 'data': configs }) else: return jsonify({'success': False, 'error': '配置管理器未初始化'}), 500 except Exception as e: self.logger.error(f"获取设备配置失败: {e}") return jsonify({ 'success': False, 'message': f'获取设备配置失败: {str(e)}' }), 500 @self.app.route('/api/config/devices/all', methods=['POST']) def set_all_device_configs(): """批量设置所有设备配置""" try: if not self.config_manager: return jsonify({'success': False, 'error': '配置管理器未初始化'}), 500 data = flask_request.get_json() if not data: return jsonify({ 'success': False, 'message': '请求数据不能为空' }), 400 # 验证数据格式 supported_devices = ['camera', 'femtobolt','imu','pressure'] for device_name in data.keys(): if device_name not in supported_devices: return jsonify({ 'success': False, 'message': f'不支持的设备类型: {device_name},支持的设备类型: {", ".join(supported_devices)}' }), 400 result = self.config_manager.set_all_device_configs(data) status_code = 200 if result['success'] else 400 return jsonify(result), status_code except Exception as e: self.logger.error(f"批量设置设备配置失败: {e}") return jsonify({ 'success': False, 'message': f'批量设置设备配置失败: {str(e)}' }), 500 @self.app.route('/api/devices/calibrate', methods=['POST']) def calibrate_device(): """校准设备""" try: data = flask_request.get_json() device_type = data.get('device_type') if not device_type: return jsonify({'success': False, 'error': '设备类型不能为空'}), 400 if device_type in self.device_managers and self.device_managers[device_type]: device_manager = self.device_managers[device_type] if hasattr(device_manager, 'calibrate'): result = device_manager.calibrate() return jsonify({'success': True, 'data': result}) else: return jsonify({'success': False, 'error': f'{device_type}设备不支持校准'}), 400 else: return jsonify({'success': False, 'error': f'{device_type}设备管理器未初始化'}), 500 except Exception as e: self.logger.error(f'校准设备失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 @self.app.route('/api/devices/calibrate/imu', methods=['POST']) def calibrate_imu(): """校准IMU""" try: if self.device_managers['imu']: result = self.device_managers['imu'].calibrate() if result: return jsonify({'success': True, 'message': 'IMU校准成功'}) else: return jsonify({'success': False, 'error': 'IMU校准失败'}), 500 else: return jsonify({'success': False, 'error': 'IMU设备管理器未初始化'}), 500 except Exception as e: self.logger.error(f'IMU校准失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 # ==================== 检测API ==================== @self.app.route('/api/detection/start', methods=['POST']) def start_detection(): """开始检测""" try: if not self.db_manager or not self.device_coordinator: return jsonify({'success': False, 'error': '数据库管理器或设备管理器未初始化'}), 500 data = flask_request.get_json() patient_id = data.get('patient_id') creator_id = data.get('creator_id') if not patient_id or not creator_id: return jsonify({'success': False, 'error': '缺少患者ID或创建人ID'}), 400 # 调用create_detection_session方法,settings传空字典 session_id = self.db_manager.create_detection_session(patient_id, settings={}, creator_id=creator_id) self.logger.info('检测开始,设备连接监控已启动') # 启动设备连接监控 if self.device_coordinator: try: self.device_coordinator.start_all_connection_monitor() self.logger.info('检测开始,设备连接监控已启动') except Exception as monitor_error: self.logger.error(f'启动设备连接监控失败: {monitor_error}') return jsonify({'success': True, 'session_id': session_id}) except Exception as e: self.logger.error(f'开始检测失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 @self.app.route('/api/detection//has_data', methods=['GET']) def has_session_detection_data(session_id: str): """检查指定会话是否存在检测数据,用于判断单次检测是否有效""" try: if not self.db_manager: return jsonify({'success': False, 'error': '数据库管理器未初始化'}), 500 exists = self.db_manager.has_session_detection_data(session_id) return jsonify({'success': True, 'session_id': session_id, 'has_data': bool(exists)}) except Exception as e: self.logger.error(f'检查会话检测数据存在失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 @self.app.route('/api/detection//stop', methods=['POST']) def stop_detection(session_id): """停止检测""" try: if not self.db_manager or not self.device_coordinator: self.logger.error('数据库管理器或设备管理器未初始化') return jsonify({'success': False, 'error': '数据库管理器或设备管理器未初始化'}), 500 if not session_id: self.logger.error('缺少会话ID') return jsonify({ 'success': False, 'error': '缺少会话ID' }), 400 if self.db_manager.is_empty_session(session_id): # 删除空白会话 self.db_manager.delete_detection_session(session_id) self.logger.info(f'已删除空白会话: {session_id}') return jsonify({ 'success': True, 'message': '空白会话已删除' }) else: # 正常会话的停止流程:调用数据库层结束检测,自动计算时长并写入结束信息 data = flask_request.get_json() or {} diagnosis_info = data.get('diagnosis_info') treatment_info = data.get('treatment_info') suggestion_info = data.get('suggestion_info') success = self.db_manager.update_session_endcheck( session_id, diagnosis_info=diagnosis_info, treatment_info=treatment_info, suggestion_info=suggestion_info ) if success: self.logger.info(f'检测会话已结束检查 - 会话ID: {session_id}') return jsonify({ 'success': True, 'message': '检测已结束并已写入总结信息' }) else: self.logger.error('停止检测失败,更新会话状态失败') return jsonify({ 'success': False, 'error': '停止检测失败' }), 500 except Exception as e: self.logger.error(f'停止检测失败: {e}', exc_info=True) return jsonify({'success': False, 'error': str(e)}), 500 @self.app.route('/api/detection//start_record', methods=['POST']) def start_record(session_id): """开始视频录制""" try: if not self.db_manager or not self.device_coordinator: return jsonify({'success': False, 'error': '数据库管理器或设备管理器未初始化'}), 500 data = flask_request.get_json() patient_id = data.get('patient_id') screen_location = data.get('screen_location') # [0,0,1920,1080] femtobolt_location = data.get('femtobolt_location') # [0,0,640,480] camera1_location = data.get('camera1_location') # [0,0,640,480] camera2_location = data.get('camera2_location') # [0,0,640,480] if not patient_id: return jsonify({'success': False, 'error': '缺少患者ID'}), 400 # 开始视频录制 recording_response = None try: recording_response = self.recording_manager.start_recording(session_id, patient_id,screen_location,camera1_location,camera2_location,femtobolt_location) # 处理录制管理器返回的数据库更新信息 if recording_response and recording_response.get('success') and 'database_updates' in recording_response: db_updates = recording_response['database_updates'] try: # 保存检测视频记录(映射到 detection_video 表字段) video_paths = db_updates.get('video_paths', {}) video_record = { 'screen_video_path': video_paths.get('screen_video_path'), 'femtobolt_video_path': video_paths.get('femtobolt_video_path'), 'camera1_video_path': video_paths.get('camera1_video_path'), 'camera2_video_path': video_paths.get('camera2_video_path'), } try: self.db_manager.save_detection_video(db_updates['session_id'], video_record) except Exception as video_err: self.logger.error(f'保存检测视频记录失败: {video_err}') self.logger.info(f'数据库更新成功 - 会话ID: {db_updates["session_id"]}') except Exception as db_error: self.logger.error(f'处理数据库更新失败: {db_error}') except Exception as rec_e: self.logger.error(f'开始同步录制失败: {rec_e}') start_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') return jsonify({'success': True, 'session_id': session_id, 'detectionStartTime': start_time, 'recording': recording_response}) except Exception as e: self.logger.error(f'开始录制失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 @self.app.route('/api/detection//stop_record', methods=['POST']) def stop_record(session_id): """停止视频录制""" try: if not session_id: self.logger.error('缺少会话ID') return jsonify({ 'success': False, 'error': '缺少会话ID' }), 400 # 停止同步录制,传递视频数据 try: restrt = self.recording_manager.stop_recording(session_id) self.logger.info(f'停止录制结果: {restrt}') except Exception as rec_e: self.logger.error(f'停止同步录制失败: {rec_e}', exc_info=True) raise return jsonify({'success': True,'msg': '停止录制成功'}) except Exception as e: self.logger.error(f'停止检测失败: {e}', exc_info=True) return jsonify({'success': False, 'error': str(e)}), 500 @self.app.route('/api/detection//status', methods=['GET']) def get_detection_status(session_id): """获取检测状态""" try: if not self.db_manager: return jsonify({'success': False, 'error': '数据库管理器未初始化'}), 500 if not session_id: return jsonify({ 'success': False, 'error': '缺少会话ID' }), 400 # 获取会话数据 session_data = self.db_manager.get_session_data(session_id) if session_data: return jsonify({ 'success': True, 'data': session_data }) else: return jsonify({ 'success': False, 'error': '会话不存在' }), 404 except Exception as e: self.logger.error(f'获取检测状态失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 @self.app.route('/api/detection//save-info', methods=['POST']) def save_session_info(session_id): """保存会话信息(诊断、处理、建议、状态)""" try: if not self.db_manager: return jsonify({'success': False, 'error': '数据库管理器未初始化'}), 500 if not session_id: return jsonify({ 'success': False, 'error': '缺少会话ID' }), 400 # 获取请求数据 data = flask_request.get_json() or {} diagnosis_info = data.get('diagnosis_info') treatment_info = data.get('treatment_info') suggestion_info = data.get('suggestion_info') status = data.get('status') # 验证至少提供一个要更新的字段 if not any([diagnosis_info, treatment_info, suggestion_info, status]): return jsonify({ 'success': False, 'error': '至少需要提供一个要更新的字段(diagnosis_info, treatment_info, suggestion_info, status)' }), 400 # 调用数据库管理器的批量更新方法 self.db_manager.update_session_all_info( session_id=session_id, diagnosis_info=diagnosis_info, treatment_info=treatment_info, suggestion_info=suggestion_info, status=status ) # 构建更新信息反馈 updated_fields = [] if diagnosis_info is not None: updated_fields.append('诊断信息') if treatment_info is not None: updated_fields.append('处理信息') if suggestion_info is not None: updated_fields.append('建议信息') if status is not None: updated_fields.append(f'状态({status})') self.logger.info(f'会话信息保存成功: {session_id}, 更新字段: {", ".join(updated_fields)}') return jsonify({ 'success': True, 'message': f'会话信息保存成功,更新字段: {", ".join(updated_fields)}', 'data': { 'session_id': session_id, 'updated_fields': updated_fields } }) except Exception as e: self.logger.error(f'保存会话信息失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 @self.app.route('/api/detection//save-data', methods=['POST']) def save_detection_data(session_id): """采集检测数据""" try: if not self.db_manager: return jsonify({'success': False, 'error': '数据库管理器未初始化'}), 500 if not self.device_coordinator: return jsonify({'success': False, 'error': '设备管理器未初始化'}), 500 # 获取请求数据 data = flask_request.get_json() or {} # print(f"接收到的data数据: {data}") patient_id = data.get('patient_id') # 如果没有提供patient_id,从会话信息中获取 if not patient_id: session_data = self.db_manager.get_session_data(session_id) if not session_data: return jsonify({ 'success': False, 'error': '检测会话不存在' }), 404 patient_id = session_data.get('patient_id') if not patient_id: return jsonify({ 'success': False, 'error': '无法获取患者ID' }), 400 # 调用录制管理器保存检测截图到文件 collected_data = self.recording_manager.save_detection_images( session_id=session_id, patient_id=patient_id, detection_data=data ) # 将采集的数据保存到数据库 if collected_data: self.db_manager.save_detection_data(session_id, collected_data) self.logger.info(f'检测数据采集并保存成功: {session_id}') return jsonify({ 'success': True, 'data': { 'session_id': session_id, 'timestamp': collected_data.get('timestamp'), 'data_collected': bool(collected_data) }, 'message': '数据采集成功' }) except Exception as e: self.logger.error(f'采集检测数据失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 @self.app.route('/api/history/sessions', methods=['GET']) def get_detection_sessions(): """获取检测会话历史""" try: page = int(flask_request.args.get('page', 1)) size = int(flask_request.args.get('size', 10)) patient_id = flask_request.args.get('patient_id') sessions = self.db_manager.get_detection_sessions(page, size, patient_id) total = self.db_manager.get_sessions_count(patient_id) return jsonify({ 'success': True, 'data': { 'sessions': sessions, 'total': total, 'page': page, 'size': size } }) except Exception as e: self.logger.error(f'获取检测历史失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 @self.app.route('/api/history/sessions/', methods=['GET']) def get_session_data(session_id): """获取会话详细数据""" try: session_data = self.db_manager.get_session_data(session_id) if session_data is None: return jsonify({'success': False, 'error': '会话不存在'}), 404 return jsonify({ 'success': True, 'data': session_data }) except Exception as e: self.logger.error(f'获取会话数据失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 @self.app.route('/api/detection/data/details', methods=['GET']) def get_detection_data_by_ids(): """根据多个主键ID查询检测数据详情,ids为逗号分隔""" try: ids_param = flask_request.args.get('ids') if not ids_param: return jsonify({'success': False, 'error': '缺少ids参数'}), 400 ids = [i.strip() for i in ids_param.split(',') if i.strip()] data_list = self.db_manager.get_detection_data_by_ids(ids) return jsonify({'success': True, 'data': data_list}) except Exception as e: self.logger.error(f'批量获取检测数据失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 @self.app.route('/api/detection/data/', methods=['DELETE']) def delete_detection_data(data_id): """删除检测数据记录(支持单个或多个ID,多个用逗号分隔)""" try: if not data_id: return jsonify({'success': False, 'error': '未提供检测数据ID'}), 400 # 支持批量:逗号分隔 ids = [i.strip() for i in str(data_id).split(',') if i.strip()] payload = ids if len(ids) > 1 else (ids[0] if ids else data_id) success = self.db_manager.delete_detection_data(payload) if success: return jsonify({ 'success': True, 'message': '检测数据删除成功', 'deleted_ids': ids }) else: return jsonify({'success': False, 'error': '检测数据删除失败'}), 500 except ValueError as e: return jsonify({'success': False, 'error': str(e)}), 404 except Exception as e: self.logger.error(f'删除检测数据失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 @self.app.route('/api/detection/video/', methods=['DELETE']) def delete_detection_video(video_id): """删除检测视频记录(支持单个或多个ID,多个用逗号分隔)""" try: if not video_id: return jsonify({'success': False, 'error': '未提供检测视频ID'}), 400 # 支持批量:逗号分隔 ids = [i.strip() for i in str(video_id).split(',') if i.strip()] payload = ids if len(ids) > 1 else (ids[0] if ids else video_id) success = self.db_manager.delete_detection_video(payload) if success: return jsonify({ 'success': True, 'message': '检测视频删除成功', 'deleted_ids': ids }) else: return jsonify({'success': False, 'error': '检测视频删除失败'}), 500 except ValueError as e: return jsonify({'success': False, 'error': str(e)}), 404 except Exception as e: self.logger.error(f'删除检测视频失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 @self.app.route('/api/detection/sessions/', methods=['DELETE']) def delete_detection_session(session_id): """删除检测会话及其相关的检测数据""" try: self.db_manager.delete_detection_session(session_id) return jsonify({ 'success': True, 'message': '检测会话删除成功' }) except ValueError as e: return jsonify({'success': False, 'error': str(e)}), 404 except Exception as e: self.logger.error(f'删除检测会话失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 # ==================== 错误处理 ==================== @self.app.errorhandler(404) def not_found(error): return jsonify({'success': False, 'error': 'API接口不存在'}), 404 @self.app.errorhandler(500) def internal_error(error): return jsonify({'success': False, 'error': '服务器内部错误'}), 500 # ==================== SOCKET事件 ==================== def _register_socketio_events(self): """注册SocketIO事件""" if self.socketio is None: return # 注册统一设备命名空间的连接事件 @self.socketio.on('connect', namespace='/devices') def handle_devices_connect(): self.logger.info('设备命名空间客户端连接') emit('status', {'message': '设备命名空间连接成功'}, namespace='/devices') # 连接时发送当前所有设备的状态 self.broadcast_all_device_status() @self.socketio.on('disconnect', namespace='/devices') def handle_devices_disconnect(): self.logger.info('设备命名空间客户端断开连接') # 注册设备订阅事件 @self.socketio.on('subscribe_device', namespace='/devices') def handle_subscribe_device(data): """订阅特定设备数据""" device_type = data.get('device_type') if device_type in ['camera1', 'camera2', 'femtobolt', 'imu', 'pressure']: self.logger.info(f'客户端订阅{device_type}设备数据') emit('subscription_status', { 'device_type': device_type, 'status': 'subscribed', 'message': f'{device_type}设备数据订阅成功' }, namespace='/devices') else: emit('subscription_status', { 'device_type': device_type, 'status': 'error', 'message': '不支持的设备类型' }, namespace='/devices') @self.socketio.on('unsubscribe_device', namespace='/devices') def handle_unsubscribe_device(data): """取消订阅特定设备数据""" device_type = data.get('device_type') self.logger.info(f'客户端取消订阅{device_type}设备数据') emit('subscription_status', { 'device_type': device_type, 'status': 'unsubscribed', 'message': f'{device_type}设备数据取消订阅成功' }, namespace='/devices') @self.socketio.on('start_push_data', namespace='/devices') def handle_start_push_data(): """启动数据推送""" try: self.start_device_push_data() emit('test_status', {'status': 'started', 'message': '数据推送已开始'}, namespace='/devices') except Exception as e: emit('test_status', {'status': 'error', 'message': str(e)}, namespace='/devices') @self.socketio.on('stop_push_data', namespace='/devices') def handle_stop_push_data(): """停止数据推送""" try: self.stop_device_push_data() emit('test_status', {'status': 'stopped', 'message': '数据推送已停止'}, namespace='/devices') except Exception as e: emit('test_status', {'status': 'error', 'message': str(e)}, namespace='/devices') @self.socketio.on('restart_device', namespace='/devices') def handle_restart_device(data): """重启特定设备""" device_type = data.get('device_type') self.logger.info(f'客户端请求重启{device_type}设备') self.restart_device(device_type) def restart_device(self, device_type: str): """重启指定类型的设备 Args: device_type (str): 设备类型 (camera, imu, pressure, femtobolt) """ if not self.device_coordinator: self.logger.error('设备协调器未初始化,无法重启设备') return False try: self.logger.info(f'开始重启 {device_type} 设备...') # 调用设备协调器的重启方法 success = self.device_coordinator.restart_device(device_type) if success: self.logger.info(f'{device_type} 设备重启成功') # 发送重启成功事件到前端 self.socketio.emit('device_restart_message', { 'device_type': device_type, 'message': f'{device_type} 设备重启成功!', 'timestamp': time.time() }, namespace='/devices') return True else: self.logger.error(f'{device_type} 设备重启失败!') # 发送重启失败事件到前端 self.socketio.emit('device_restart_message', { 'device_type': device_type, 'message': f'{device_type} 设备重启失败!', 'timestamp': time.time() }, namespace='/devices') return False except Exception as e: self.logger.error(f'重启 {device_type} 设备时发生异常: {str(e)}') # 发送重启异常事件到前端 self.socketio.emit('device_restart_error', { 'device_type': device_type, 'error': str(e), 'message': f'重启 {device_type} 设备时发生异常', 'timestamp': time.time() }, namespace='/devices') return False def start_device_push_data(self): """开始设备数据推送""" if self.is_pushing_data: self.logger.warning('设备数据推送已在运行') return try: self.logger.info('开始设备数据推送...') self.is_pushing_data = True # 并行启动真实设备管理器 failed_devices = [] device_threads = {} device_results = {} def initialize_device(device_name, manager): """设备初始化工作函数""" try: # 检查设备是否已连接,避免重复初始化 if hasattr(manager, 'is_connected') and manager.is_connected: print(f"[DEBUG] {device_name} 已连接,跳过初始化") # 如果已连接但未启动流,则启动流 if hasattr(manager, 'is_streaming') and not manager.is_streaming: print(f"[DEBUG] {device_name} 已连接但未启动流,开始启动流") manager.start_streaming() device_results[device_name] = True self.logger.info(f'{device_name}设备已连接,启动成功') return print(f"[DEBUG] 尝试初始化设备: {device_name}") if manager.initialize(): print(f"[DEBUG] {device_name} 初始化成功,开始启动流") manager.start_streaming() device_results[device_name] = True self.logger.info(f'{device_name}设备启动成功') else: print(f"[DEBUG] {device_name} 初始化失败") device_results[device_name] = False self.logger.error(f'{device_name}设备启动失败') except Exception as e: print(f"[DEBUG] {device_name} 初始化异常: {e}") device_results[device_name] = False self.logger.error(f'{device_name}设备启动异常: {e}') # 为每个设备创建初始化线程 for device_name, manager in self.device_managers.items(): if manager is not None: # 确保管理器已初始化 thread = threading.Thread( target=initialize_device, args=(device_name, manager), name=f'Init-{device_name}', daemon=True ) device_threads[device_name] = thread thread.start() else: self.logger.warning(f'{device_name}管理器未初始化,跳过启动') device_results[device_name] = False # 等待所有设备初始化完成(最多等待30秒) for device_name, thread in device_threads.items(): thread.join(timeout=30.0) if thread.is_alive(): self.logger.warning(f'{device_name}设备初始化超时') device_results[device_name] = False # 收集失败的设备 for device_name, success in device_results.items(): if not success: failed_devices.append(device_name) # 输出启动结果摘要 successful_devices = [name for name, success in device_results.items() if success] # 广播设备状态更新 for device_name, success in device_results.items(): self.broadcast_device_status(device_name, success) if successful_devices: self.logger.info(f'成功启动的设备: {", ".join(successful_devices)}') if failed_devices: self.logger.warning(f'启动失败的设备: {", ".join(failed_devices)}') self.logger.info('设备数据推送已启动') except Exception as e: self.logger.error(f'启动设备数据推送失败: {e}') self.is_pushing_data = False # 不抛出异常,允许应用继续运行 def stop_device_push_data(self): """停止设备数据推送""" if not self.is_pushing_data: self.logger.warning('设备数据推送未运行') return try: self.logger.info('停止设备数据推送...') self.is_pushing_data = False # 停止设备管理器 for device_name, manager in self.device_managers.items(): if manager is not None: try: manager.stop_streaming() manager.disconnect() self.logger.info(f'{device_name}设备已停止') # 广播设备状态为未连接 self.broadcast_device_status(device_name, False) except Exception as e: self.logger.error(f'停止{device_name}设备失败: {e}') # 即使停止失败也广播为未连接状态 self.broadcast_device_status(device_name, False) self.logger.info('设备数据推送已停止') except Exception as e: self.logger.error(f'停止设备数据推送失败: {e}') def broadcast_device_status(self, device_name: str, is_connected: bool): """广播单个设备状态""" if self.socketio: try: status_data = { 'device_type': device_name, 'status': is_connected, 'timestamp': datetime.now().isoformat() } self.socketio.emit('device_status', status_data, namespace='/devices') self.logger.info(f'广播设备状态: {device_name} -> {"已连接" if is_connected else "未连接"}') except Exception as e: self.logger.error(f'广播设备状态失败: {e}') def broadcast_all_device_status(self): """广播所有设备状态""" current_devices = self.device_coordinator.get_device_managers() for device_name, manager in current_devices.items(): if manager is not None: try: # 检查设备是否连接(使用is_connected属性) is_connected = hasattr(manager, 'is_connected') and getattr(manager, 'is_connected', False) self.broadcast_device_status(device_name, is_connected) except Exception as e: self.logger.error(f'检查{device_name}设备状态失败: {e}') self.broadcast_device_status(device_name, False) def _on_device_status_change(self, device_name: str, is_connected: bool): """ 设备状态变化回调函数 Args: device_name: 设备名称 is_connected: 连接状态 """ self.logger.info(f'设备状态变化: {device_name} -> {"已连接" if is_connected else "未连接"}') self.broadcast_device_status(device_name, is_connected) def _detection_worker(self, detection_id, duration): """检测工作线程""" try: self.logger.info(f'检测线程启动 - ID: {detection_id}, 持续时间: {duration}秒') # 模拟检测过程 start_time = time.time() while time.time() - start_time < duration: if not self.current_detection: break # 发送检测进度 elapsed = time.time() - start_time progress = min(100, (elapsed / duration) * 100) if self.socketio: self.socketio.emit('detection_progress', { 'detection_id': detection_id, 'progress': progress, 'elapsed_time': elapsed, 'remaining_time': max(0, duration - elapsed) }) time.sleep(1) # 检测完成 if self.current_detection and self.current_detection['id'] == detection_id: self.db_manager.end_detection_session(detection_id) self.current_detection = None if self.socketio: self.socketio.emit('detection_complete', { 'detection_id': detection_id, 'message': '检测已完成' }) self.logger.info(f'检测完成 - ID: {detection_id}') except Exception as e: self.logger.error(f'检测线程异常: {e}') if self.current_detection: self.current_detection = None def run(self, debug=None): """运行服务器""" if debug is not None: self.debug = debug try: self.logger.info('正在启动AppServer...') self.init_app() except KeyboardInterrupt: self.logger.info('服务被用户中断') except Exception as e: self.logger.error(f'服务启动失败: {e}') sys.exit(1) finally: self.logger.info('AppServer已停止') def main(): """主函数""" # 解析命令行参数 parser = argparse.ArgumentParser(description='Body Balance Evaluation System Backend') parser.add_argument('--host', default='0.0.0.0', help='Host address to bind to') parser.add_argument('--port', type=int, default=5000, help='Port number to bind to') parser.add_argument('--debug', action='store_true', help='Enable debug mode') args = parser.parse_args() # 创建并运行服务器 server = AppServer(host=args.host, port=args.port, debug=args.debug) server.run() if __name__ == '__main__': main()