BodyBalanceEvaluation/backend/app.py
2025-07-30 19:09:15 +08:00

1156 lines
40 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
平衡体态检测系统 - 后端服务
基于Flask的本地API服务
"""
import os
import sys
import json
import time
import threading
from datetime import datetime
from flask import Flask, request, jsonify, send_file
from flask_cors import CORS
import sqlite3
import logging
from pathlib import Path
from flask_socketio import SocketIO, emit
import cv2
import base64
import configparser
from concurrent.futures import ThreadPoolExecutor
import queue
import gc
import psutil
import os
# 添加当前目录到Python路径
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
# 导入自定义模块
from database import DatabaseManager
from device_manager import DeviceManager
from detection_engine import DetectionEngine
from data_processor import DataProcessor
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('logs/backend.log', encoding='utf-8'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# 创建Flask应用
app = Flask(__name__)
app.config['SECRET_KEY'] = 'body-balance-detection-system-2024'
socketio = SocketIO(app, cors_allowed_origins='*', async_mode='threading')
# 启用CORS支持
CORS(app, origins='*', supports_credentials=True, allow_headers=['Content-Type', 'Authorization'], methods=['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS'])
# 读取RTSP配置
config = configparser.ConfigParser()
config.read(os.path.join(os.path.dirname(__file__), 'config.ini'), encoding='utf-8')
rtsp_url = config.get('CAMERA', 'rtsp_url', fallback=None)
# 全局变量
db_manager = None
device_manager = None
detection_engine = None
data_processor = None
current_detection = None
detection_thread = None
rtsp_thread = None
rtsp_running = False
# 用于异步编码的线程池和队列
encoding_executor = ThreadPoolExecutor(max_workers=2)
frame_queue = queue.Queue(maxsize=1) # 只保留最新的一帧
# 内存优化配置
frame_skip_counter = 0
FRAME_SKIP_RATIO = 1 # 每3帧发送1帧减少网络和内存压力
MAX_FRAME_SIZE = (640, 480) # 进一步减小帧尺寸以节省内存
MAX_MEMORY_USAGE = 200 * 1024 * 1024 # 100MB内存限制
memory_check_counter = 0
MEMORY_CHECK_INTERVAL = 50 # 每50帧检查一次内存
def get_memory_usage():
"""获取当前进程内存使用量(字节)"""
try:
process = psutil.Process(os.getpid())
return process.memory_info().rss
except:
return 0
def async_encode_frame(frame, frame_count):
"""异步编码帧 - 内存优化版本"""
global memory_check_counter
try:
# 内存检查
memory_check_counter += 1
if memory_check_counter >= MEMORY_CHECK_INTERVAL:
memory_check_counter = 0
current_memory = get_memory_usage()
if current_memory > MAX_MEMORY_USAGE:
logger.warning(f"内存使用过高: {current_memory / 1024 / 1024:.2f}MB强制清理")
gc.collect()
# 如果内存仍然过高,跳过此帧
if get_memory_usage() > MAX_MEMORY_USAGE:
del frame
return
# 更激进的图像尺寸压缩以节省内存
height, width = frame.shape[:2]
target_width, target_height = MAX_FRAME_SIZE
if width > target_width or height > target_height:
# 计算缩放比例,保持宽高比
scale_w = target_width / width
scale_h = target_height / height
scale = min(scale_w, scale_h)
new_width = int(width * scale)
new_height = int(height * scale)
# 使用更快的插值方法减少CPU使用
frame = cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_AREA)
# 优化JPEG编码参数优先考虑速度和内存
encode_param = [
int(cv2.IMWRITE_JPEG_QUALITY), 50, # 进一步降低质量以减少内存使用
int(cv2.IMWRITE_JPEG_OPTIMIZE), 1, # 启用优化
int(cv2.IMWRITE_JPEG_PROGRESSIVE), 0 # 禁用渐进式以减少内存
]
success, buffer = cv2.imencode('.jpg', frame, encode_param)
if not success:
logger.error('图像编码失败')
return
# 立即释放frame内存
del frame
jpg_as_text = base64.b64encode(buffer).decode('utf-8')
# 立即释放buffer内存
del buffer
# 发送数据
socketio.emit('rtsp_frame', {
'image': jpg_as_text,
'frame_id': frame_count,
'timestamp': time.time()
})
# 立即释放base64字符串
del jpg_as_text
except Exception as e:
logger.error(f'异步编码帧失败: {e}')
finally:
# 定期强制垃圾回收
if memory_check_counter % 10 == 0:
gc.collect()
def frame_encoding_worker():
"""帧编码工作线程"""
while rtsp_running:
try:
# 从队列获取帧
frame, frame_count = frame_queue.get(timeout=1)
# 提交到线程池进行异步编码
encoding_executor.submit(async_encode_frame, frame, frame_count)
except queue.Empty:
continue
except Exception as e:
logger.error(f'帧编码工作线程异常: {e}')
def init_app():
"""初始化应用"""
global db_manager, device_manager, detection_engine, data_processor
try:
# 创建必要的目录
os.makedirs('logs', exist_ok=True)
os.makedirs('data', exist_ok=True)
os.makedirs('exports', exist_ok=True)
os.makedirs('videos', exist_ok=True)
# 初始化数据库
db_manager = DatabaseManager('data/body_balance.db')
db_manager.init_database()
# 初始化设备管理器
device_manager = DeviceManager()
# 初始化检测引擎
detection_engine = DetectionEngine()
# 初始化数据处理器
data_processor = DataProcessor()
logger.info('应用初始化完成')
except Exception as e:
logger.error(f'应用初始化失败: {e}')
raise
# ==================== 基础API ====================
@app.route('/health', methods=['GET'])
def health_check():
"""健康检查"""
return jsonify({
'status': 'ok',
'timestamp': datetime.now().isoformat(),
'version': '1.0.0'
})
@app.route('/api/health', methods=['GET'])
def api_health_check():
"""API健康检查"""
return jsonify({
'status': 'ok',
'timestamp': datetime.now().isoformat(),
'version': '1.0.0'
})
# ==================== 认证API ====================
@app.route('/api/auth/login', methods=['POST'])
def login():
"""用户登录"""
try:
data = request.get_json()
username = data.get('username')
password = data.get('password')
remember = data.get('remember', False)
# 简单的模拟登录验证
if username and password:
# 这里可以添加真实的用户验证逻辑
# 目前使用模拟数据
user_data = {
'id': 1,
'username': username,
'name': '医生',
'role': 'doctor',
'avatar': ''
}
# 生成简单的token实际项目中应使用JWT等安全token
token = f"token_{username}_{int(time.time())}"
return jsonify({
'success': True,
'data': {
'token': token,
'user': user_data
},
'message': '登录成功'
})
else:
return jsonify({
'success': False,
'message': '用户名或密码不能为空'
}), 400
except Exception as e:
logger.error(f'登录失败: {e}')
return jsonify({'success': False, 'message': '登录失败'}), 500
@app.route('/api/auth/register', methods=['POST'])
def register():
"""用户注册"""
try:
data = request.get_json()
username = data.get('username')
password = data.get('password')
email = data.get('email')
# 简单的模拟注册
if username and password:
return jsonify({
'success': True,
'message': '注册成功,请登录'
})
else:
return jsonify({
'success': False,
'message': '用户名和密码不能为空'
}), 400
except Exception as e:
logger.error(f'注册失败: {e}')
return jsonify({'success': False, 'message': '注册失败'}), 500
@app.route('/api/auth/logout', methods=['POST'])
def logout():
"""用户退出"""
try:
return jsonify({
'success': True,
'message': '退出成功'
})
except Exception as e:
logger.error(f'退出失败: {e}')
return jsonify({'success': False, 'message': '退出失败'}), 500
@app.route('/api/auth/verify', methods=['GET'])
def verify_token():
"""验证token"""
try:
# 简单的token验证实际项目中应验证JWT等
auth_header = request.headers.get('Authorization')
if auth_header and auth_header.startswith('Bearer '):
token = auth_header.split(' ')[1]
# 这里可以添加真实的token验证逻辑
return jsonify({
'success': True,
'data': {'valid': True}
})
else:
return jsonify({
'success': True,
'data': {'valid': True} # 暂时总是返回有效
})
except Exception as e:
logger.error(f'验证token失败: {e}')
return jsonify({'success': False, 'message': '验证失败'}), 500
@app.route('/api/auth/forgot-password', methods=['POST'])
def forgot_password():
"""忘记密码"""
try:
data = request.get_json()
email = data.get('email')
if email:
return jsonify({
'success': True,
'message': '重置密码邮件已发送'
})
else:
return jsonify({
'success': False,
'message': '邮箱不能为空'
}), 400
except Exception as e:
logger.error(f'忘记密码处理失败: {e}')
return jsonify({'success': False, 'message': '处理失败'}), 500
@app.route('/api/auth/reset-password', methods=['POST'])
def reset_password():
"""重置密码"""
try:
data = request.get_json()
token = data.get('token')
password = data.get('password')
if token and password:
return jsonify({
'success': True,
'message': '密码重置成功'
})
else:
return jsonify({
'success': False,
'message': '参数不完整'
}), 400
except Exception as e:
logger.error(f'重置密码失败: {e}')
return jsonify({'success': False, 'message': '重置失败'}), 500
@app.route('/api/system/info', methods=['GET'])
def get_system_info():
"""获取系统信息"""
try:
info = {
'version': '1.0.0',
'start_time': datetime.now().isoformat(),
'database_status': 'connected' if db_manager else 'disconnected',
'device_count': len(device_manager.get_connected_devices()) if device_manager else 0
}
return jsonify({'success': True, 'data': info})
except Exception as e:
logger.error(f'获取系统信息失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
# ==================== 设备管理API ====================
@app.route('/api/devices/status', methods=['GET'])
def get_device_status():
"""获取设备状态"""
try:
if not device_manager:
return jsonify({'camera': False, 'imu': False, 'pressure': False})
status = device_manager.get_device_status()
return jsonify(status)
except Exception as e:
logger.error(f'获取设备状态失败: {e}')
return jsonify({'camera': False, 'imu': False, 'pressure': False})
@app.route('/api/devices/refresh', methods=['POST'])
def refresh_devices():
"""刷新设备连接"""
try:
if device_manager:
device_manager.refresh_devices()
return jsonify({'success': True, 'message': '设备已刷新'})
except Exception as e:
logger.error(f'刷新设备失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/devices/calibrate', methods=['POST'])
def calibrate_devices():
"""校准设备"""
try:
if device_manager:
result = device_manager.calibrate_devices()
return jsonify({'success': True, 'data': result})
return jsonify({'success': False, 'error': '设备管理器未初始化'}), 500
except Exception as e:
logger.error(f'设备校准失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
# ==================== 患者管理API ====================
@app.route('/api/patients', methods=['GET'])
def get_patients():
"""获取患者列表"""
try:
page = int(request.args.get('page', 1))
size = int(request.args.get('size', 10))
keyword = request.args.get('keyword', '')
patients = db_manager.get_patients(page, size, keyword)
total = db_manager.get_patients_count(keyword)
return jsonify({
'success': True,
'data': {
'patients': patients,
'total': total,
'page': page,
'size': size
}
})
except Exception as e:
logger.error(f'获取患者列表失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/patients', methods=['POST'])
def create_patient():
"""创建患者"""
try:
data = request.get_json()
patient_id = db_manager.create_patient(data)
return jsonify({
'success': True,
'data': {'patient_id': patient_id},
'message': '患者创建成功'
})
except Exception as e:
logger.error(f'创建患者失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/patients/<patient_id>', methods=['PUT'])
def update_patient(patient_id):
"""更新患者信息"""
try:
data = request.get_json()
db_manager.update_patient(patient_id, data)
return jsonify({'success': True, 'message': '患者信息更新成功'})
except Exception as e:
logger.error(f'更新患者信息失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/patients/<patient_id>', methods=['DELETE'])
def delete_patient(patient_id):
"""删除患者"""
try:
db_manager.delete_patient(patient_id)
return jsonify({'success': True, 'message': '患者删除成功'})
except Exception as e:
logger.error(f'删除患者失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
# ==================== 检测管理API ====================
@app.route('/api/detection/start', methods=['POST'])
def start_detection():
"""开始检测"""
global current_detection, detection_thread
try:
if current_detection and current_detection.get('status') == 'running':
return jsonify({'success': False, 'error': '检测已在进行中'}), 400
data = request.get_json()
patient_id = data.get('patientId')
settings = data.get('settings', {})
# 创建检测会话
session_id = db_manager.create_detection_session(patient_id, settings)
# 初始化检测状态
current_detection = {
'session_id': session_id,
'patient_id': patient_id,
'status': 'running',
'start_time': datetime.now(),
'settings': settings,
'data_points': 0
}
# 启动检测线程
detection_thread = threading.Thread(
target=run_detection,
args=(session_id, settings)
)
detection_thread.start()
return jsonify({
'success': True,
'data': {'session_id': session_id},
'message': '检测已开始'
})
except Exception as e:
logger.error(f'开始检测失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/detection/stop', methods=['POST'])
def stop_detection():
"""停止检测"""
global current_detection
try:
if not current_detection or current_detection.get('status') != 'running':
return jsonify({'success': False, 'error': '没有正在进行的检测'}), 400
# 更新检测状态
current_detection['status'] = 'stopped'
current_detection['end_time'] = datetime.now()
# 等待检测线程结束
if detection_thread and detection_thread.is_alive():
detection_thread.join(timeout=5)
return jsonify({'success': True, 'message': '检测已停止'})
except Exception as e:
logger.error(f'停止检测失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/detection/status', methods=['GET'])
def get_detection_status():
"""获取检测状态"""
try:
if not current_detection:
return jsonify({
'success': True,
'data': {'status': 'idle'}
})
# 计算运行时间
if current_detection.get('status') == 'running':
elapsed = (datetime.now() - current_detection['start_time']).total_seconds()
current_detection['elapsed_time'] = int(elapsed)
return jsonify({
'success': True,
'data': current_detection
})
except Exception as e:
logger.error(f'获取检测状态失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/detection/data', methods=['GET'])
def get_realtime_data():
"""获取实时检测数据"""
try:
if not current_detection or current_detection.get('status') != 'running':
return jsonify({'success': False, 'error': '没有正在进行的检测'})
# 获取最新的检测数据
session_id = current_detection['session_id']
data = detection_engine.get_latest_data(session_id)
return jsonify({
'success': True,
'data': data
})
except Exception as e:
logger.error(f'获取实时数据失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
# ==================== 数据分析API ====================
@app.route('/api/analysis/session/<session_id>', methods=['GET'])
def analyze_session(session_id):
"""分析检测会话数据"""
try:
# 获取会话数据
session_data = db_manager.get_session_data(session_id)
if not session_data:
return jsonify({'success': False, 'error': '会话不存在'}), 404
# 进行数据分析
analysis_result = data_processor.analyze_session(session_data)
# 保存分析结果
db_manager.save_analysis_result(session_id, analysis_result)
return jsonify({
'success': True,
'data': analysis_result
})
except Exception as e:
logger.error(f'分析会话数据失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/export/report/<session_id>', methods=['GET'])
def export_report(session_id):
"""导出检测报告"""
try:
# 生成报告
report_path = data_processor.generate_report(session_id)
if not os.path.exists(report_path):
return jsonify({'success': False, 'error': '报告生成失败'}), 500
return send_file(
report_path,
as_attachment=True,
download_name=f'detection_report_{session_id}.pdf'
)
except Exception as e:
logger.error(f'导出报告失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
# ==================== 历史记录API ====================
@app.route('/api/history/sessions', methods=['GET'])
def get_detection_sessions():
"""获取检测会话历史"""
try:
page = int(request.args.get('page', 1))
size = int(request.args.get('size', 10))
patient_id = request.args.get('patient_id')
sessions = db_manager.get_detection_sessions(page, size, patient_id)
total = db_manager.get_sessions_count(patient_id)
return jsonify({
'success': True,
'data': {
'sessions': sessions,
'total': total,
'page': page,
'size': size
}
})
except Exception as e:
logger.error(f'获取检测历史失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
def run_detection(session_id, settings):
"""运行检测的后台线程"""
global current_detection
try:
logger.info(f'开始检测会话: {session_id}')
# 检测循环
while (current_detection and
current_detection.get('status') == 'running'):
# 采集数据
if device_manager:
data = device_manager.collect_data()
if data:
# 保存数据到数据库
db_manager.save_detection_data(session_id, data)
current_detection['data_points'] += 1
# 根据采样频率控制循环间隔
frequency = settings.get('frequency', 60)
time.sleep(1.0 / frequency)
# 检查是否达到设定时长
duration = settings.get('duration', 0)
if duration > 0:
elapsed = (datetime.now() - current_detection['start_time']).total_seconds()
if elapsed >= duration:
current_detection['status'] = 'completed'
break
# 更新会话状态
if current_detection:
db_manager.update_session_status(
session_id,
current_detection['status'],
current_detection.get('data_points', 0)
)
logger.info(f'检测会话完成: {session_id}')
except Exception as e:
logger.error(f'检测线程异常: {e}')
if current_detection:
current_detection['status'] = 'error'
current_detection['error'] = str(e)
# ==================== 截图保存API ====================
@app.route('/api/screenshots/save', methods=['POST'])
def save_screenshot():
"""保存截图"""
try:
data = request.get_json()
# 验证必需参数
required_fields = ['patientId', 'patientName', 'sessionId', 'imageData']
for field in required_fields:
if not data.get(field):
return jsonify({
'success': False,
'message': f'缺少必需参数: {field}'
}), 400
patient_id = data['patientId']
patient_name = data['patientName']
session_id = data['sessionId']
image_data = data['imageData']
# 验证base64图片数据格式
if not image_data.startswith('data:image/'):
return jsonify({
'success': False,
'message': '无效的图片数据格式'
}), 400
# 提取base64数据
try:
header, encoded = image_data.split(',', 1)
image_bytes = base64.b64decode(encoded)
except Exception as e:
return jsonify({
'success': False,
'message': f'图片数据解码失败: {str(e)}'
}), 400
# 创建文件夹结构
screenshots_dir = Path('screenshots')
patient_dir = screenshots_dir / f'{patient_id}_{patient_name}'
session_dir = patient_dir / f'{session_id}'
# 确保目录存在
session_dir.mkdir(parents=True, exist_ok=True)
# 生成文件名4位流水号
existing_files = list(session_dir.glob(f'{patient_id}_{session_id}_*.png'))
next_number = len(existing_files) + 1
filename = f'{patient_id}_{session_id}_{next_number:04d}.png'
filepath = session_dir / filename
# 保存图片文件
with open(filepath, 'wb') as f:
f.write(image_bytes)
# 记录到数据库(如果需要)
try:
# 这里可以添加数据库记录逻辑
# db_manager.save_screenshot_record(patient_id, session_id, str(filepath))
pass
except Exception as e:
logger.warning(f'保存截图记录到数据库失败: {e}')
logger.info(f'截图保存成功: {filepath}')
return jsonify({
'success': True,
'message': '截图保存成功',
'filepath': str(filepath),
'filename': filename
})
except Exception as e:
logger.error(f'保存截图失败: {e}')
return jsonify({
'success': False,
'message': f'保存截图失败: {str(e)}'
}), 500
# ==================== 录像保存API ====================
@app.route('/api/recordings/save', methods=['POST'])
def save_recording():
"""保存录像"""
try:
data = request.get_json()
# 验证必需参数
required_fields = ['patientId', 'patientName', 'sessionId', 'videoData']
for field in required_fields:
if not data.get(field):
return jsonify({
'success': False,
'message': f'缺少必需参数: {field}'
}), 400
patient_id = data['patientId']
patient_name = data['patientName']
session_id = data['sessionId']
video_data = data['videoData']
mime_type = data.get('mimeType', 'video/webm;codecs=vp9') # 默认webm格式
# 验证base64视频数据格式
if not video_data.startswith('data:video/'):
return jsonify({
'success': False,
'message': '无效的视频数据格式'
}), 400
# 提取base64数据
try:
header, encoded = video_data.split(',', 1)
video_bytes = base64.b64decode(encoded)
except Exception as e:
return jsonify({
'success': False,
'message': f'视频数据解码失败: {str(e)}'
}), 400
# 创建文件夹结构(与截图保存相同的结构)
recordings_dir = Path('screenshots') # 使用同一个根目录
patient_dir = recordings_dir / f'{patient_id}_{patient_name}'
session_dir = patient_dir / f'{session_id}'
# 确保目录存在
session_dir.mkdir(parents=True, exist_ok=True)
# 根据mimeType确定文件扩展名
if 'mp4' in mime_type:
file_extension = 'mp4'
elif 'webm' in mime_type:
file_extension = 'webm'
else:
file_extension = 'webm' # 默认扩展名
# 生成文件名
filename = f'{patient_id}_{session_id}_recording.{file_extension}'
filepath = session_dir / filename
logger.info(f'录像格式: {mime_type}, 文件扩展名: {file_extension}')
# 保存视频文件
with open(filepath, 'wb') as f:
f.write(video_bytes)
# 记录到数据库(如果需要)
try:
# 这里可以添加数据库记录逻辑
# db_manager.save_recording_record(patient_id, session_id, str(filepath))
pass
except Exception as e:
logger.warning(f'保存录像记录到数据库失败: {e}')
logger.info(f'录像保存成功: {filepath}')
return jsonify({
'success': True,
'message': '录像保存成功',
'filepath': str(filepath),
'filename': filename
})
except Exception as e:
logger.error(f'保存录像失败: {e}')
return jsonify({
'success': False,
'message': f'保存录像失败: {str(e)}'
}), 500
# ==================== 错误处理 ====================
@app.errorhandler(404)
def not_found(error):
return jsonify({'success': False, 'error': 'API接口不存在'}), 404
@app.errorhandler(500)
def internal_error(error):
return jsonify({'success': False, 'error': '服务器内部错误'}), 500
if __name__ == '__main__':
import argparse
# 解析命令行参数
parser = argparse.ArgumentParser(description='Body Balance Evaluation System Backend')
parser.add_argument('--host', default=None, help='Host address to bind to')
parser.add_argument('--port', type=int, default=None, help='Port number to bind to')
parser.add_argument('--debug', action='store_true', help='Enable debug mode')
args = parser.parse_args()
try:
# 初始化应用
init_app()
# 确定主机和端口
host = args.host if args.host else config.get('SERVER', 'host', fallback='127.0.0.1')
port = args.port if args.port else config.getint('SERVER', 'port', fallback=5000)
debug = args.debug if args.debug else config.getboolean('APP', 'debug', fallback=False)
# 启动Flask+SocketIO服务
logger.info(f'启动后端服务... Host: {host}, Port: {port}, Debug: {debug}')
socketio.run(app,
host=host,
port=port,
debug=debug,
use_reloader=False, # 禁用热重载以避免进程问题
log_output=True, # 启用详细日志
allow_unsafe_werkzeug=True
)
except KeyboardInterrupt:
logger.info('服务被用户中断')
except Exception as e:
logger.error(f'服务启动失败: {e}')
sys.exit(1)
finally:
logger.info('后端服务已停止')
# ==================== WebSocket 实时推送RTSP帧 ====================
def generate_test_frame(frame_count):
"""生成测试帧"""
import numpy as np
width, height = 640, 480
# 创建黑色背景
frame = np.zeros((height, width, 3), dtype=np.uint8)
# 添加动态元素
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
# 添加时间戳
cv2.putText(frame, timestamp, (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)
# 添加帧计数
cv2.putText(frame, f'TEST Frame: {frame_count}', (10, 120), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)
# 添加移动的圆形
center_x = int(320 + 200 * np.sin(frame_count * 0.1))
center_y = int(240 + 100 * np.cos(frame_count * 0.1))
cv2.circle(frame, (center_x, center_y), 30, (255, 0, 0), -1)
# 添加变化的矩形
rect_size = int(50 + 30 * np.sin(frame_count * 0.05))
cv2.rectangle(frame, (500, 200), (500 + rect_size, 200 + rect_size), (0, 0, 255), -1)
return frame
def generate_rtsp_frames():
global rtsp_running
frame_count = 0
error_count = 0
use_test_mode = False
last_frame_time = time.time()
logger.info(f'开始生成RTSP帧URL: {rtsp_url}')
try:
cap = cv2.VideoCapture(rtsp_url)
if not cap.isOpened():
logger.warning(f'无法打开RTSP流: {rtsp_url},切换到测试模式')
use_test_mode = True
socketio.emit('rtsp_status', {'status': 'started', 'message': '使用测试视频源'})
else:
# 最激进的实时优化设置
cap.set(cv2.CAP_PROP_BUFFERSIZE, 0) # 完全禁用缓冲区
cap.set(cv2.CAP_PROP_FPS, 60) # 提高帧率到60fps
cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M', 'J', 'P', 'G')) # MJPEG编码
# 设置更低的分辨率以减少处理时间
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
logger.info('RTSP流已打开开始推送帧激进实时模式')
socketio.emit('rtsp_status', {'status': 'started', 'message': '使用RTSP视频源激进实时模式'})
rtsp_running = True
# 启动帧编码工作线程
encoding_thread = threading.Thread(target=frame_encoding_worker)
encoding_thread.daemon = True
encoding_thread.start()
while rtsp_running:
if use_test_mode:
# 使用测试模式生成帧
frame = generate_test_frame(frame_count)
ret = True
else:
# 使用RTSP流添加帧跳过机制减少延迟
ret, frame = cap.read()
if not ret:
error_count += 1
logger.warning(f'RTSP读取帧失败{error_count}次),尝试重连...')
if 'cap' in locals():
cap.release()
if error_count > 5:
logger.warning('RTSP连接失败次数过多切换到测试模式')
use_test_mode = True
socketio.emit('rtsp_status', {'status': 'switched', 'message': '已切换到测试视频源'})
continue
# 立即重连,不等待
cap = cv2.VideoCapture(rtsp_url)
if cap.isOpened():
# 重连时应用相同的激进实时设置
cap.set(cv2.CAP_PROP_BUFFERSIZE, 0)
cap.set(cv2.CAP_PROP_FPS, 60)
cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M', 'J', 'P', 'G'))
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
continue
error_count = 0 # 重置错误计数
# 内存优化的帧跳过策略
# 减少跳帧数量,避免过度内存使用
skip_count = 0
while skip_count < 3: # 减少到最多跳过3帧
temp_ret, temp_frame = cap.read()
if temp_ret:
# 立即释放之前的帧
if 'frame' in locals():
del frame
frame = temp_frame
skip_count += 1
else:
break
# 降低帧率以减少内存压力
current_time = time.time()
if current_time - last_frame_time < 1/20: # 降低到20fps最大频率
continue
last_frame_time = current_time
frame_count += 1
# 实现帧跳过以减少内存和网络压力
global frame_skip_counter
frame_skip_counter += 1
if frame_skip_counter % (FRAME_SKIP_RATIO + 1) != 0:
# 跳过此帧,立即释放内存
del frame
continue
try:
# 将帧放入队列进行异步处理
try:
# 非阻塞方式放入队列,如果队列满了就丢弃旧帧
frame_queue.put_nowait((frame.copy(), frame_count))
except queue.Full:
# 队列满了,清空队列并放入新帧
try:
old_frame, _ = frame_queue.get_nowait()
del old_frame # 立即释放旧帧内存
except queue.Empty:
pass
frame_queue.put_nowait((frame.copy(), frame_count))
# 立即释放原始帧内存
del frame
if frame_count % 60 == 0: # 每60帧记录一次
logger.info(f'已推送 {frame_count} 帧到编码队列,跳过率: {FRAME_SKIP_RATIO}/{FRAME_SKIP_RATIO+1}')
# 定期强制垃圾回收
import gc
gc.collect()
except Exception as e:
logger.error(f'帧队列处理失败: {e}')
# 移除sleep以获得最大处理速度让系统自然限制帧率
# time.sleep(1/60) # 注释掉sleep以获得最大实时性
except Exception as e:
logger.error(f'RTSP推流异常: {e}')
socketio.emit('rtsp_status', {'status': 'error', 'message': f'推流异常: {str(e)}'})
finally:
if 'cap' in locals():
cap.release()
rtsp_running = False
logger.info(f'RTSP推流结束总共推送了 {frame_count}')
@socketio.on('start_rtsp')
def handle_start_rtsp(data=None):
global rtsp_thread, rtsp_running
logger.info(f'收到start_rtsp事件客户端ID: {request.sid}, 数据: {data}')
try:
if rtsp_thread and rtsp_thread.is_alive():
logger.warning('RTSP线程已在运行')
emit('rtsp_status', {'status': 'already_running', 'message': 'RTSP已在运行'})
return
if not rtsp_url:
logger.error('RTSP URL未配置')
emit('rtsp_status', {'status': 'error', 'message': 'RTSP URL未配置'})
return
logger.info(f'启动RTSP线程URL: {rtsp_url}')
rtsp_thread = threading.Thread(target=generate_rtsp_frames)
rtsp_thread.daemon = True
rtsp_thread.start()
logger.info('RTSP线程已启动')
emit('rtsp_status', {'status': 'started', 'message': 'RTSP推流已启动'})
except Exception as e:
logger.error(f'启动RTSP失败: {e}')
emit('rtsp_status', {'status': 'error', 'message': f'启动失败: {str(e)}'})
@socketio.on('stop_rtsp')
def handle_stop_rtsp(data=None):
global rtsp_running
logger.info(f'收到stop_rtsp事件客户端ID: {request.sid}, 数据: {data}')
try:
rtsp_running = False
logger.info('RTSP推流已停止')
emit('rtsp_status', {'status': 'stopped', 'message': 'RTSP推流已停止'})
except Exception as e:
logger.error(f'停止RTSP失败: {e}')
emit('rtsp_status', {'status': 'error', 'message': f'停止失败: {str(e)}'})
@socketio.on('connect')
def handle_connect():
logger.info(f'客户端连接: {request.sid}')
emit('connect_status', {'status': 'connected', 'sid': request.sid, 'message': '连接成功'})
@socketio.on('disconnect')
def handle_disconnect():
logger.info(f'客户端断开连接: {request.sid}')
@socketio.on('ping')
def handle_ping(data=None):
logger.info(f'收到ping事件客户端ID: {request.sid}, 数据: {data}')
emit('pong', {'timestamp': time.time(), 'message': 'pong'})