#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 平衡体态检测系统 - 后端服务 基于Flask的本地API服务 """ import os import sys import json import time import threading from datetime import datetime from flask import Flask, request, jsonify, send_file from flask_cors import CORS import sqlite3 import logging from pathlib import Path from flask_socketio import SocketIO, emit import cv2 import base64 import configparser from concurrent.futures import ThreadPoolExecutor import queue import gc import psutil import os # 添加当前目录到Python路径 sys.path.append(os.path.dirname(os.path.abspath(__file__))) # 导入自定义模块 from database import DatabaseManager from device_manager import DeviceManager from detection_engine import DetectionEngine from data_processor import DataProcessor from utils import config as app_config # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('logs/backend.log', encoding='utf-8'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # 创建Flask应用 app = Flask(__name__) app.config['SECRET_KEY'] = 'body-balance-detection-system-2024' socketio = SocketIO(app, cors_allowed_origins='*', async_mode='threading') # 启用CORS支持 CORS(app, origins='*', supports_credentials=True, allow_headers=['Content-Type', 'Authorization'], methods=['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS']) # 读取RTSP配置 config = configparser.ConfigParser() config.read(os.path.join(os.path.dirname(__file__), 'config.ini'), encoding='utf-8') rtsp_url = config.get('CAMERA', 'rtsp_url', fallback=None) # 全局变量 db_manager = None device_manager = None detection_engine = None data_processor = None current_detection = None detection_thread = None rtsp_thread = None rtsp_running = False # 用于异步编码的线程池和队列 encoding_executor = ThreadPoolExecutor(max_workers=2) frame_queue = queue.Queue(maxsize=1) # 只保留最新的一帧 # 内存优化配置 frame_skip_counter = 0 FRAME_SKIP_RATIO = 1 # 每3帧发送1帧,减少网络和内存压力 MAX_FRAME_SIZE = (640, 480) # 进一步减小帧尺寸以节省内存 MAX_MEMORY_USAGE = 200 * 1024 * 1024 # 100MB内存限制 memory_check_counter = 0 MEMORY_CHECK_INTERVAL = 50 # 每50帧检查一次内存 def get_memory_usage(): """获取当前进程内存使用量(字节)""" try: process = psutil.Process(os.getpid()) return process.memory_info().rss except: return 0 def async_encode_frame(frame, frame_count): """异步编码帧 - 内存优化版本""" global memory_check_counter try: # 内存检查 memory_check_counter += 1 if memory_check_counter >= MEMORY_CHECK_INTERVAL: memory_check_counter = 0 current_memory = get_memory_usage() if current_memory > MAX_MEMORY_USAGE: logger.warning(f"内存使用过高: {current_memory / 1024 / 1024:.2f}MB,强制清理") gc.collect() # 如果内存仍然过高,跳过此帧 if get_memory_usage() > MAX_MEMORY_USAGE: del frame return # 更激进的图像尺寸压缩以节省内存 height, width = frame.shape[:2] target_width, target_height = MAX_FRAME_SIZE if width > target_width or height > target_height: # 计算缩放比例,保持宽高比 scale_w = target_width / width scale_h = target_height / height scale = min(scale_w, scale_h) new_width = int(width * scale) new_height = int(height * scale) # 使用更快的插值方法减少CPU使用 frame = cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_AREA) # 优化JPEG编码参数:优先考虑速度和内存 encode_param = [ int(cv2.IMWRITE_JPEG_QUALITY), 50, # 进一步降低质量以减少内存使用 int(cv2.IMWRITE_JPEG_OPTIMIZE), 1, # 启用优化 int(cv2.IMWRITE_JPEG_PROGRESSIVE), 0 # 禁用渐进式以减少内存 ] success, buffer = cv2.imencode('.jpg', frame, encode_param) if not success: logger.error('图像编码失败') return # 立即释放frame内存 del frame jpg_as_text = base64.b64encode(buffer).decode('utf-8') # 立即释放buffer内存 del buffer # 发送数据 socketio.emit('rtsp_frame', { 'image': jpg_as_text, 'frame_id': frame_count, 'timestamp': time.time() }) # 立即释放base64字符串 del jpg_as_text except Exception as e: logger.error(f'异步编码帧失败: {e}') finally: # 定期强制垃圾回收 if memory_check_counter % 10 == 0: gc.collect() def frame_encoding_worker(): """帧编码工作线程""" while rtsp_running: try: # 从队列获取帧 frame, frame_count = frame_queue.get(timeout=1) # 提交到线程池进行异步编码 encoding_executor.submit(async_encode_frame, frame, frame_count) except queue.Empty: continue except Exception as e: logger.error(f'帧编码工作线程异常: {e}') def init_app(): """初始化应用""" global db_manager, device_manager, detection_engine, data_processor try: # 创建必要的目录 os.makedirs('logs', exist_ok=True) # 从配置文件读取数据库路径并创建目录 db_path = app_config.get('DATABASE', 'path', 'backend/data/body_balance.db') db_dir = os.path.dirname(db_path) os.makedirs(db_dir, exist_ok=True) # 初始化数据库 db_manager = DatabaseManager(db_path) db_manager.init_database() # 临时跳过设备初始化以避免卡住 # device_manager = DeviceManager() # detection_engine = DetectionEngine() # data_processor = DataProcessor() logger.info('跳过设备初始化,仅启动基础服务') logger.info('应用初始化完成') except Exception as e: logger.error(f'应用初始化失败: {e}') raise # ==================== 基础API ==================== @app.route('/health', methods=['GET']) def health_check(): """健康检查""" return jsonify({ 'status': 'ok', 'timestamp': datetime.now().isoformat(), 'version': '1.0.0' }) @app.route('/api/health', methods=['GET']) def api_health_check(): """API健康检查""" return jsonify({ 'status': 'ok', 'timestamp': datetime.now().isoformat(), 'version': '1.0.0' }) # ==================== 认证API ==================== @app.route('/api/auth/login', methods=['POST']) def login(): """用户登录""" try: data = request.get_json() username = data.get('username') password = data.get('password') remember = data.get('remember', False) if not username or not password: return jsonify({ 'success': False, 'message': '用户名或密码不能为空' }), 400 # 使用数据库验证用户 user = db_manager.authenticate_user(username, password) if user: # 检查用户是否已激活 if not user['is_active']: return jsonify({ 'success': False, 'message': '账户未激活,请联系管理员审核' }), 403 # 构建用户数据 user_data = { 'id': user['id'], 'username': user['username'], 'name': user['name'], 'role': 'admin' if user['user_type'] == 'admin' else 'user', 'user_type': user['user_type'], 'avatar': '' } # 生成token(实际项目中应使用JWT等安全token) token = f"token_{username}_{int(time.time())}" logger.info(f'用户 {username} 登录成功') return jsonify({ 'success': True, 'data': { 'token': token, 'user': user_data }, 'message': '登录成功' }) else: logger.warning(f'用户 {username} 登录失败:用户名或密码错误') return jsonify({ 'success': False, 'message': '用户名或密码错误' }), 401 except Exception as e: logger.error(f'登录失败: {e}') return jsonify({'success': False, 'message': '登录失败'}), 500 @app.route('/api/auth/register', methods=['POST']) def register(): """用户注册""" try: data = request.get_json() username = data.get('username') password = data.get('password') name = data.get('name') or data.get('email', '') phone = data.get('phone') if not username or not password: return jsonify({ 'success': False, 'message': '用户名和密码不能为空' }), 400 if len(password) < 6: return jsonify({ 'success': False, 'message': '密码长度不能少于6位' }), 400 # 构建用户数据字典 user_data = { 'username': username, 'password': password, 'name': name, 'phone': phone } # 使用数据库注册用户 result = db_manager.register_user(user_data) if result['success']: logger.info(f'用户 {username} 注册成功,等待管理员审核') return jsonify({ 'success': True, 'message': '注册成功,请等待管理员审核后登录' }) else: return jsonify({ 'success': False, 'message': result['message'] }), 400 except Exception as e: logger.error(f'注册失败: {e}') return jsonify({'success': False, 'message': '注册失败'}), 500 @app.route('/api/auth/logout', methods=['POST']) def logout(): """用户退出""" try: return jsonify({ 'success': True, 'message': '退出成功' }) except Exception as e: logger.error(f'退出失败: {e}') return jsonify({'success': False, 'message': '退出失败'}), 500 @app.route('/api/auth/verify', methods=['GET']) def verify_token(): """验证token""" try: # 简单的token验证(实际项目中应验证JWT等) auth_header = request.headers.get('Authorization') if auth_header and auth_header.startswith('Bearer '): token = auth_header.split(' ')[1] # 这里可以添加真实的token验证逻辑 return jsonify({ 'success': True, 'data': {'valid': True} }) else: return jsonify({ 'success': True, 'data': {'valid': True} # 暂时总是返回有效 }) except Exception as e: logger.error(f'验证token失败: {e}') return jsonify({'success': False, 'message': '验证失败'}), 500 @app.route('/api/auth/forgot-password', methods=['POST']) def forgot_password(): """忘记密码 - 根据用户名和手机号找回密码""" try: data = request.get_json() username = data.get('username') phone = data.get('phone') if not username: return jsonify({ 'success': False, 'error': '请输入用户名' }), 400 if not phone: return jsonify({ 'success': False, 'error': '请输入手机号码' }), 400 # 验证手机号格式 import re phone_pattern = r'^1[3-9]\d{9}$' if not re.match(phone_pattern, phone): return jsonify({ 'success': False, 'error': '手机号格式不正确' }), 400 # 查询用户信息 conn = db_manager.get_connection() cursor = conn.cursor() cursor.execute(''' SELECT username, password, phone FROM users WHERE username = ? AND phone = ? ''', (username, phone)) user = cursor.fetchone() if user: # 用户存在且手机号匹配,返回密码 # 注意:这里返回的是加密后的密码,实际应用中需要解密或重置 # 为了演示,我们假设有一个简单的解密方法 encrypted_password = user['password'] # 这里简化处理,实际应该有更安全的密码找回机制 # 由于使用MD5加密,无法直接解密,所以返回提示信息 return jsonify({ 'success': True, 'password': '1234567', # 演示用固定密码 'message': '密码找回成功' }) else: # 检查用户是否存在 cursor.execute('SELECT username FROM users WHERE username = ?', (username,)) user_exists = cursor.fetchone() if not user_exists: return jsonify({ 'success': False, 'error': '用户不存在' }), 400 else: return jsonify({ 'success': False, 'error': '手机号不匹配' }), 400 except Exception as e: logger.error(f'忘记密码处理失败: {e}') return jsonify({'success': False, 'error': '处理失败'}), 500 # ==================== 用户管理API ==================== @app.route('/api/users', methods=['GET']) def get_users(): """获取用户列表(管理员功能)""" try: # 这里应该验证管理员权限 page = int(request.args.get('page', 1)) size = int(request.args.get('size', 10)) status = request.args.get('status') # active, inactive, all users = db_manager.get_users(page, size, status) total = db_manager.get_user_count(status) return jsonify({ 'success': True, 'data': { 'users': users, 'total': total, 'page': page, 'size': size } }) except Exception as e: logger.error(f'获取用户列表失败: {e}') return jsonify({'success': False, 'message': '获取用户列表失败'}), 500 @app.route('/api/users//approve', methods=['POST']) def approve_user(user_id): """审核用户(管理员功能)""" try: # 这里应该验证管理员权限 data = request.get_json() approve = data.get('approve', True) result = db_manager.approve_user(user_id, approve) if result['success']: action = '审核通过' if approve else '审核拒绝' logger.info(f'用户 {user_id} {action}') return jsonify({ 'success': True, 'message': f'用户{action}成功' }) else: return jsonify({ 'success': False, 'message': result['message'] }), 400 except Exception as e: logger.error(f'审核用户失败: {e}') return jsonify({'success': False, 'message': '审核用户失败'}), 500 @app.route('/api/users/', methods=['DELETE']) def delete_user(user_id): """删除用户(管理员功能)""" try: # 这里应该验证管理员权限 result = db_manager.delete_user(user_id) if result['success']: logger.info(f'用户 {user_id} 删除成功') return jsonify({ 'success': True, 'message': '用户删除成功' }) else: return jsonify({ 'success': False, 'message': result['message'] }), 400 except Exception as e: logger.error(f'删除用户失败: {e}') return jsonify({'success': False, 'message': '删除用户失败'}), 500 @app.route('/api/auth/reset-password', methods=['POST']) def reset_password(): """重置密码""" try: data = request.get_json() token = data.get('token') password = data.get('password') if token and password: return jsonify({ 'success': True, 'message': '密码重置成功' }) else: return jsonify({ 'success': False, 'message': '参数不完整' }), 400 except Exception as e: logger.error(f'重置密码失败: {e}') return jsonify({'success': False, 'message': '重置失败'}), 500 @app.route('/api/system/info', methods=['GET']) def get_system_info(): """获取系统信息""" try: info = { 'version': '1.0.0', 'start_time': datetime.now().isoformat(), 'database_status': 'connected' if db_manager else 'disconnected', 'device_count': len(device_manager.get_connected_devices()) if device_manager else 0 } return jsonify({'success': True, 'data': info}) except Exception as e: logger.error(f'获取系统信息失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 # ==================== 设备管理API ==================== @app.route('/api/devices/status', methods=['GET']) def get_device_status(): """获取设备状态""" try: if not device_manager: return jsonify({'camera': False, 'imu': False, 'pressure': False}) status = device_manager.get_device_status() return jsonify(status) except Exception as e: logger.error(f'获取设备状态失败: {e}') return jsonify({'camera': False, 'imu': False, 'pressure': False}) @app.route('/api/devices/refresh', methods=['POST']) def refresh_devices(): """刷新设备连接""" try: if device_manager: device_manager.refresh_devices() return jsonify({'success': True, 'message': '设备已刷新'}) except Exception as e: logger.error(f'刷新设备失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/devices/calibrate', methods=['POST']) def calibrate_devices(): """校准设备""" try: if device_manager: result = device_manager.calibrate_devices() return jsonify({'success': True, 'data': result}) return jsonify({'success': False, 'error': '设备管理器未初始化'}), 500 except Exception as e: logger.error(f'设备校准失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 # ==================== 患者管理API ==================== @app.route('/api/patients', methods=['GET']) def get_patients(): """获取患者列表""" try: page = int(request.args.get('page', 1)) size = int(request.args.get('size', 10)) keyword = request.args.get('keyword', '') patients = db_manager.get_patients(page, size, keyword) total = db_manager.get_patients_count(keyword) return jsonify({ 'success': True, 'data': { 'patients': patients, 'total': total, 'page': page, 'size': size } }) except Exception as e: logger.error(f'获取患者列表失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/patients', methods=['POST']) def create_patient(): """创建患者""" try: data = request.get_json() patient_id = db_manager.create_patient(data) return jsonify({ 'success': True, 'data': {'patient_id': patient_id}, 'message': '患者创建成功' }) except Exception as e: logger.error(f'创建患者失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/patients/', methods=['PUT']) def update_patient(patient_id): """更新患者信息""" try: data = request.get_json() db_manager.update_patient(patient_id, data) return jsonify({'success': True, 'message': '患者信息更新成功'}) except Exception as e: logger.error(f'更新患者信息失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/patients/', methods=['DELETE']) def delete_patient(patient_id): """删除患者""" try: db_manager.delete_patient(patient_id) return jsonify({'success': True, 'message': '患者删除成功'}) except Exception as e: logger.error(f'删除患者失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 # ==================== 检测管理API ==================== @app.route('/api/detection/start', methods=['POST']) def start_detection(): """开始检测""" global current_detection, detection_thread try: if current_detection and current_detection.get('status') == 'running': return jsonify({'success': False, 'error': '检测已在进行中'}), 400 data = request.get_json() patient_id = data.get('patientId') settings = data.get('settings', {}) # 创建检测会话 session_id = db_manager.create_detection_session(patient_id, settings) # 初始化检测状态 current_detection = { 'session_id': session_id, 'patient_id': patient_id, 'status': 'running', 'start_time': datetime.now(), 'settings': settings, 'data_points': 0 } # 启动检测线程 detection_thread = threading.Thread( target=run_detection, args=(session_id, settings) ) detection_thread.start() return jsonify({ 'success': True, 'data': {'session_id': session_id}, 'message': '检测已开始' }) except Exception as e: logger.error(f'开始检测失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/detection/stop', methods=['POST']) def stop_detection(): """停止检测""" global current_detection try: if not current_detection or current_detection.get('status') != 'running': return jsonify({'success': False, 'error': '没有正在进行的检测'}), 400 # 更新检测状态 current_detection['status'] = 'stopped' current_detection['end_time'] = datetime.now() # 等待检测线程结束 if detection_thread and detection_thread.is_alive(): detection_thread.join(timeout=5) return jsonify({'success': True, 'message': '检测已停止'}) except Exception as e: logger.error(f'停止检测失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/detection/status', methods=['GET']) def get_detection_status(): """获取检测状态""" try: if not current_detection: return jsonify({ 'success': True, 'data': {'status': 'idle'} }) # 计算运行时间 if current_detection.get('status') == 'running': elapsed = (datetime.now() - current_detection['start_time']).total_seconds() current_detection['elapsed_time'] = int(elapsed) return jsonify({ 'success': True, 'data': current_detection }) except Exception as e: logger.error(f'获取检测状态失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/detection/data', methods=['GET']) def get_realtime_data(): """获取实时检测数据""" try: if not current_detection or current_detection.get('status') != 'running': return jsonify({'success': False, 'error': '没有正在进行的检测'}) # 获取最新的检测数据 session_id = current_detection['session_id'] data = detection_engine.get_latest_data(session_id) return jsonify({ 'success': True, 'data': data }) except Exception as e: logger.error(f'获取实时数据失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 # ==================== 数据分析API ==================== @app.route('/api/analysis/session/', methods=['GET']) def analyze_session(session_id): """分析检测会话数据""" try: # 获取会话数据 session_data = db_manager.get_session_data(session_id) if not session_data: return jsonify({'success': False, 'error': '会话不存在'}), 404 # 进行数据分析 analysis_result = data_processor.analyze_session(session_data) # 保存分析结果 db_manager.save_analysis_result(session_id, analysis_result) return jsonify({ 'success': True, 'data': analysis_result }) except Exception as e: logger.error(f'分析会话数据失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 @app.route('/api/export/report/', methods=['GET']) def export_report(session_id): """导出检测报告""" try: # 生成报告 report_path = data_processor.generate_report(session_id) if not os.path.exists(report_path): return jsonify({'success': False, 'error': '报告生成失败'}), 500 return send_file( report_path, as_attachment=True, download_name=f'detection_report_{session_id}.pdf' ) except Exception as e: logger.error(f'导出报告失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 # ==================== 历史记录API ==================== @app.route('/api/history/sessions', methods=['GET']) def get_detection_sessions(): """获取检测会话历史""" try: page = int(request.args.get('page', 1)) size = int(request.args.get('size', 10)) patient_id = request.args.get('patient_id') sessions = db_manager.get_detection_sessions(page, size, patient_id) total = db_manager.get_sessions_count(patient_id) return jsonify({ 'success': True, 'data': { 'sessions': sessions, 'total': total, 'page': page, 'size': size } }) except Exception as e: logger.error(f'获取检测历史失败: {e}') return jsonify({'success': False, 'error': str(e)}), 500 def run_detection(session_id, settings): """运行检测的后台线程""" global current_detection try: logger.info(f'开始检测会话: {session_id}') # 检测循环 while (current_detection and current_detection.get('status') == 'running'): # 采集数据 if device_manager: data = device_manager.collect_data() if data: # 保存数据到数据库 db_manager.save_detection_data(session_id, data) current_detection['data_points'] += 1 # 根据采样频率控制循环间隔 frequency = settings.get('frequency', 60) time.sleep(1.0 / frequency) # 检查是否达到设定时长 duration = settings.get('duration', 0) if duration > 0: elapsed = (datetime.now() - current_detection['start_time']).total_seconds() if elapsed >= duration: current_detection['status'] = 'completed' break # 更新会话状态 if current_detection: db_manager.update_session_status( session_id, current_detection['status'], current_detection.get('data_points', 0) ) logger.info(f'检测会话完成: {session_id}') except Exception as e: logger.error(f'检测线程异常: {e}') if current_detection: current_detection['status'] = 'error' current_detection['error'] = str(e) # ==================== 截图保存API ==================== @app.route('/api/screenshots/save', methods=['POST']) def save_screenshot(): """保存截图""" try: data = request.get_json() # 验证必需参数 required_fields = ['patientId', 'patientName', 'sessionId', 'imageData'] for field in required_fields: if not data.get(field): return jsonify({ 'success': False, 'message': f'缺少必需参数: {field}' }), 400 patient_id = data['patientId'] patient_name = data['patientName'] session_id = data['sessionId'] image_data = data['imageData'] # 验证base64图片数据格式 if not image_data.startswith('data:image/'): return jsonify({ 'success': False, 'message': '无效的图片数据格式' }), 400 # 提取base64数据 try: header, encoded = image_data.split(',', 1) image_bytes = base64.b64decode(encoded) except Exception as e: return jsonify({ 'success': False, 'message': f'图片数据解码失败: {str(e)}' }), 400 # 创建文件夹结构 screenshots_dir = Path('screenshots') patient_dir = screenshots_dir / f'{patient_id}_{patient_name}' session_dir = patient_dir / f'{session_id}' # 确保目录存在 session_dir.mkdir(parents=True, exist_ok=True) # 生成文件名(4位流水号) existing_files = list(session_dir.glob(f'{patient_id}_{session_id}_*.png')) next_number = len(existing_files) + 1 filename = f'{patient_id}_{session_id}_{next_number:04d}.png' filepath = session_dir / filename # 保存图片文件 with open(filepath, 'wb') as f: f.write(image_bytes) # 记录到数据库(如果需要) try: # 这里可以添加数据库记录逻辑 # db_manager.save_screenshot_record(patient_id, session_id, str(filepath)) pass except Exception as e: logger.warning(f'保存截图记录到数据库失败: {e}') logger.info(f'截图保存成功: {filepath}') return jsonify({ 'success': True, 'message': '截图保存成功', 'filepath': str(filepath), 'filename': filename }) except Exception as e: logger.error(f'保存截图失败: {e}') return jsonify({ 'success': False, 'message': f'保存截图失败: {str(e)}' }), 500 # ==================== 录像保存API ==================== @app.route('/api/recordings/save', methods=['POST']) def save_recording(): """保存录像""" try: data = request.get_json() # 验证必需参数 required_fields = ['patientId', 'patientName', 'sessionId', 'videoData'] for field in required_fields: if not data.get(field): return jsonify({ 'success': False, 'message': f'缺少必需参数: {field}' }), 400 patient_id = data['patientId'] patient_name = data['patientName'] session_id = data['sessionId'] video_data = data['videoData'] mime_type = data.get('mimeType', 'video/webm;codecs=vp9') # 默认webm格式 # 验证base64视频数据格式 if not video_data.startswith('data:video/'): return jsonify({ 'success': False, 'message': '无效的视频数据格式' }), 400 # 提取base64数据 try: header, encoded = video_data.split(',', 1) video_bytes = base64.b64decode(encoded) except Exception as e: return jsonify({ 'success': False, 'message': f'视频数据解码失败: {str(e)}' }), 400 # 创建文件夹结构(与截图保存相同的结构) recordings_dir = Path('screenshots') # 使用同一个根目录 patient_dir = recordings_dir / f'{patient_id}_{patient_name}' session_dir = patient_dir / f'{session_id}' # 确保目录存在 session_dir.mkdir(parents=True, exist_ok=True) # 根据mimeType确定文件扩展名 if 'mp4' in mime_type: file_extension = 'mp4' elif 'webm' in mime_type: file_extension = 'webm' else: file_extension = 'webm' # 默认扩展名 # 生成文件名 filename = f'{patient_id}_{session_id}_recording.{file_extension}' filepath = session_dir / filename logger.info(f'录像格式: {mime_type}, 文件扩展名: {file_extension}') # 保存视频文件 with open(filepath, 'wb') as f: f.write(video_bytes) # 记录到数据库(如果需要) try: # 这里可以添加数据库记录逻辑 # db_manager.save_recording_record(patient_id, session_id, str(filepath)) pass except Exception as e: logger.warning(f'保存录像记录到数据库失败: {e}') logger.info(f'录像保存成功: {filepath}') return jsonify({ 'success': True, 'message': '录像保存成功', 'filepath': str(filepath), 'filename': filename }) except Exception as e: logger.error(f'保存录像失败: {e}') return jsonify({ 'success': False, 'message': f'保存录像失败: {str(e)}' }), 500 # ==================== 错误处理 ==================== @app.errorhandler(404) def not_found(error): return jsonify({'success': False, 'error': 'API接口不存在'}), 404 @app.errorhandler(500) def internal_error(error): return jsonify({'success': False, 'error': '服务器内部错误'}), 500 if __name__ == '__main__': import argparse # 解析命令行参数 parser = argparse.ArgumentParser(description='Body Balance Evaluation System Backend') parser.add_argument('--host', default=None, help='Host address to bind to') parser.add_argument('--port', type=int, default=None, help='Port number to bind to') parser.add_argument('--debug', action='store_true', help='Enable debug mode') args = parser.parse_args() try: # 初始化应用 init_app() # 确定主机和端口 host = args.host if args.host else config.get('SERVER', 'host', fallback='127.0.0.1') port = args.port if args.port else config.getint('SERVER', 'port', fallback=5000) debug = args.debug if args.debug else config.getboolean('APP', 'debug', fallback=False) # 启动Flask+SocketIO服务 logger.info(f'启动后端服务... Host: {host}, Port: {port}, Debug: {debug}') socketio.run(app, host=host, port=port, debug=debug, use_reloader=False, # 禁用热重载以避免进程问题 log_output=True, # 启用详细日志 allow_unsafe_werkzeug=True ) except KeyboardInterrupt: logger.info('服务被用户中断') except Exception as e: logger.error(f'服务启动失败: {e}') sys.exit(1) finally: logger.info('后端服务已停止') # ==================== WebSocket 实时推送RTSP帧 ==================== def generate_test_frame(frame_count): """生成测试帧""" import numpy as np width, height = 640, 480 # 创建黑色背景 frame = np.zeros((height, width, 3), dtype=np.uint8) # 添加动态元素 timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] # 添加时间戳 cv2.putText(frame, timestamp, (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2) # 添加帧计数 cv2.putText(frame, f'TEST Frame: {frame_count}', (10, 120), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2) # 添加移动的圆形 center_x = int(320 + 200 * np.sin(frame_count * 0.1)) center_y = int(240 + 100 * np.cos(frame_count * 0.1)) cv2.circle(frame, (center_x, center_y), 30, (255, 0, 0), -1) # 添加变化的矩形 rect_size = int(50 + 30 * np.sin(frame_count * 0.05)) cv2.rectangle(frame, (500, 200), (500 + rect_size, 200 + rect_size), (0, 0, 255), -1) return frame def generate_rtsp_frames(): global rtsp_running frame_count = 0 error_count = 0 use_test_mode = False last_frame_time = time.time() logger.info(f'开始生成RTSP帧,URL: {rtsp_url}') try: cap = cv2.VideoCapture(rtsp_url) if not cap.isOpened(): logger.warning(f'无法打开RTSP流: {rtsp_url},切换到测试模式') use_test_mode = True socketio.emit('rtsp_status', {'status': 'started', 'message': '使用测试视频源'}) else: # 最激进的实时优化设置 cap.set(cv2.CAP_PROP_BUFFERSIZE, 0) # 完全禁用缓冲区 cap.set(cv2.CAP_PROP_FPS, 60) # 提高帧率到60fps cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M', 'J', 'P', 'G')) # MJPEG编码 # 设置更低的分辨率以减少处理时间 cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640) cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480) logger.info('RTSP流已打开,开始推送帧(激进实时模式)') socketio.emit('rtsp_status', {'status': 'started', 'message': '使用RTSP视频源(激进实时模式)'}) rtsp_running = True # 启动帧编码工作线程 encoding_thread = threading.Thread(target=frame_encoding_worker) encoding_thread.daemon = True encoding_thread.start() while rtsp_running: if use_test_mode: # 使用测试模式生成帧 frame = generate_test_frame(frame_count) ret = True else: # 使用RTSP流,添加帧跳过机制减少延迟 ret, frame = cap.read() if not ret: error_count += 1 logger.warning(f'RTSP读取帧失败(第{error_count}次),尝试重连...') if 'cap' in locals(): cap.release() if error_count > 5: logger.warning('RTSP连接失败次数过多,切换到测试模式') use_test_mode = True socketio.emit('rtsp_status', {'status': 'switched', 'message': '已切换到测试视频源'}) continue # 立即重连,不等待 cap = cv2.VideoCapture(rtsp_url) if cap.isOpened(): # 重连时应用相同的激进实时设置 cap.set(cv2.CAP_PROP_BUFFERSIZE, 0) cap.set(cv2.CAP_PROP_FPS, 60) cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M', 'J', 'P', 'G')) cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640) cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480) continue error_count = 0 # 重置错误计数 # 内存优化的帧跳过策略 # 减少跳帧数量,避免过度内存使用 skip_count = 0 while skip_count < 3: # 减少到最多跳过3帧 temp_ret, temp_frame = cap.read() if temp_ret: # 立即释放之前的帧 if 'frame' in locals(): del frame frame = temp_frame skip_count += 1 else: break # 降低帧率以减少内存压力 current_time = time.time() if current_time - last_frame_time < 1/20: # 降低到20fps最大频率 continue last_frame_time = current_time frame_count += 1 # 实现帧跳过以减少内存和网络压力 global frame_skip_counter frame_skip_counter += 1 if frame_skip_counter % (FRAME_SKIP_RATIO + 1) != 0: # 跳过此帧,立即释放内存 del frame continue try: # 将帧放入队列进行异步处理 try: # 非阻塞方式放入队列,如果队列满了就丢弃旧帧 frame_queue.put_nowait((frame.copy(), frame_count)) except queue.Full: # 队列满了,清空队列并放入新帧 try: old_frame, _ = frame_queue.get_nowait() del old_frame # 立即释放旧帧内存 except queue.Empty: pass frame_queue.put_nowait((frame.copy(), frame_count)) # 立即释放原始帧内存 del frame if frame_count % 60 == 0: # 每60帧记录一次 logger.info(f'已推送 {frame_count} 帧到编码队列,跳过率: {FRAME_SKIP_RATIO}/{FRAME_SKIP_RATIO+1}') # 定期强制垃圾回收 import gc gc.collect() except Exception as e: logger.error(f'帧队列处理失败: {e}') # 移除sleep以获得最大处理速度,让系统自然限制帧率 # time.sleep(1/60) # 注释掉sleep以获得最大实时性 except Exception as e: logger.error(f'RTSP推流异常: {e}') socketio.emit('rtsp_status', {'status': 'error', 'message': f'推流异常: {str(e)}'}) finally: if 'cap' in locals(): cap.release() rtsp_running = False logger.info(f'RTSP推流结束,总共推送了 {frame_count} 帧') @socketio.on('start_rtsp') def handle_start_rtsp(data=None): global rtsp_thread, rtsp_running logger.info(f'收到start_rtsp事件,客户端ID: {request.sid}, 数据: {data}') try: if rtsp_thread and rtsp_thread.is_alive(): logger.warning('RTSP线程已在运行') emit('rtsp_status', {'status': 'already_running', 'message': 'RTSP已在运行'}) return if not rtsp_url: logger.error('RTSP URL未配置') emit('rtsp_status', {'status': 'error', 'message': 'RTSP URL未配置'}) return logger.info(f'启动RTSP线程,URL: {rtsp_url}') rtsp_thread = threading.Thread(target=generate_rtsp_frames) rtsp_thread.daemon = True rtsp_thread.start() logger.info('RTSP线程已启动') emit('rtsp_status', {'status': 'started', 'message': 'RTSP推流已启动'}) except Exception as e: logger.error(f'启动RTSP失败: {e}') emit('rtsp_status', {'status': 'error', 'message': f'启动失败: {str(e)}'}) @socketio.on('stop_rtsp') def handle_stop_rtsp(data=None): global rtsp_running logger.info(f'收到stop_rtsp事件,客户端ID: {request.sid}, 数据: {data}') try: rtsp_running = False logger.info('RTSP推流已停止') emit('rtsp_status', {'status': 'stopped', 'message': 'RTSP推流已停止'}) except Exception as e: logger.error(f'停止RTSP失败: {e}') emit('rtsp_status', {'status': 'error', 'message': f'停止失败: {str(e)}'}) @socketio.on('connect') def handle_connect(): logger.info(f'客户端连接: {request.sid}') emit('connect_status', {'status': 'connected', 'sid': request.sid, 'message': '连接成功'}) @socketio.on('disconnect') def handle_disconnect(): logger.info(f'客户端断开连接: {request.sid}') @socketio.on('ping') def handle_ping(data=None): logger.info(f'收到ping事件,客户端ID: {request.sid}, 数据: {data}') emit('pong', {'timestamp': time.time(), 'message': 'pong'})