#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
设备管理模块
负责摄像头IMU传感器和压力传感器的连接和数据采集
2025-07-31 17:23:05 +08:00
以及视频推流功能
2025-07-28 11:59:56 +08:00
"""
import cv2
import numpy as np
import time
import threading
import json
import queue
import base64
import gc
import os
import psutil
import configparser
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple
from concurrent.futures import ThreadPoolExecutor
import logging
# 数据库管理
# from backend.app import get_detection_sessions
from database import DatabaseManager
# FemtoBolt深度相机支持
try:
import pykinect_azure as pykinect
    # 重新启用FemtoBolt功能,使用正确的Orbbec SDK K4A Wrapper路径
    FEMTOBOLT_AVAILABLE = True
    print("信息: pykinect_azure库已安装,FemtoBolt深度相机功能已启用")
    print("使用Orbbec SDK K4A Wrapper以确保与FemtoBolt设备的兼容性")
except ImportError:
    FEMTOBOLT_AVAILABLE = False
    print("警告: pykinect_azure库未安装,FemtoBolt深度相机功能将不可用")
    print("请使用以下命令安装: pip install pykinect_azure")
logger = logging.getLogger(__name__)
class DeviceManager:
"""设备管理器"""
def __init__(self, db_manager: DatabaseManager = None):
self.camera = None
self.femtobolt_camera = None
self.imu_device = None
self.pressure_device = None
self.device_status = {
'camera': False,
'femtobolt': False,
'imu': False,
'pressure': False
}
self.calibration_data = {}
self.data_lock = threading.Lock()
self.latest_data = {}
# 数据库连接
self.db_manager = db_manager
# 推流状态和线程
self.camera_streaming = False
self.femtobolt_streaming = False
self.camera_streaming_thread = None
self.femtobolt_streaming_thread = None
self.streaming_stop_event = threading.Event()
# 同步录制状态
self.sync_recording = False
self.current_session_id = None
self.current_patient_id = None
self.recording_start_time = None
# 三路视频录制器
self.feet_video_writer = None
self.body_video_writer = None
self.screen_video_writer = None
# 录制线程和控制
self.feet_recording_thread = None
self.body_recording_thread = None
self.screen_recording_thread = None
self.recording_stop_event = threading.Event()
# 屏幕录制队列
self.screen_frame_queue = queue.Queue(maxsize=100)
# 兼容旧版录制状态
self.recording = False
self.video_writer = None
# FemtoBolt相机相关
self.femtobolt_config = None
self.femtobolt_recording = False
self.femtobolt_color_writer = None
self.femtobolt_depth_writer = None
# WebSocket连接用于推流
self.socketio = None
# 初始化设备
self._init_devices()
def _init_devices(self):
"""初始化所有设备"""
try:
self._init_camera()
self._init_femtobolt_camera()
self._init_imu()
self._init_pressure_sensor()
logger.info('设备初始化完成')
except Exception as e:
logger.error(f'设备初始化失败: {e}')
def _init_camera(self):
"""初始化足部监视摄像头"""
try:
# 从数据库读取摄像头设备索引配置
device_index = 0 # 默认值
if self.db_manager:
try:
monitor_config = self.db_manager.get_system_setting('monitor_device_index')
if monitor_config:
device_index = int(monitor_config)
logger.info(f'从数据库读取摄像头设备索引: {device_index}')
else:
logger.info('数据库中未找到monitor_device_index配置使用默认值0')
except Exception as e:
logger.warning(f'读取摄像头设备索引配置失败使用默认值0: {e}')
else:
logger.warning('数据库管理器未初始化使用默认摄像头索引0')
# 尝试连接指定索引的摄像头
self.camera = cv2.VideoCapture(device_index)
if self.camera.isOpened():
# 设置摄像头参数
self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
self.camera.set(cv2.CAP_PROP_FPS, 30)
self.device_status['camera'] = True
logger.info(f'摄像头初始化成功,设备索引: {device_index}')
else:
logger.warning(f'摄像头连接失败,设备索引: {device_index}')
self.camera = None
except Exception as e:
logger.error(f'摄像头初始化异常: {e}')
self.camera = None
def _init_femtobolt_camera(self):
"""初始化FemtoBolt深度相机"""
if not FEMTOBOLT_AVAILABLE:
logger.warning('FemtoBolt深度相机库未安装跳过初始化')
self.femtobolt_camera = None
self.device_status['femtobolt'] = False
return
try:
# 初始化pykinect_azure库优先使用指定SDK路径
sdk_initialized = False
# 首先尝试手动指定路径(优先级最高)
sdk_paths = self._get_femtobolt_sdk_paths()
for sdk_path in sdk_paths:
if os.path.exists(sdk_path):
try:
logger.info(f'尝试使用FemtoBolt SDK路径: {sdk_path}')
pykinect.initialize_libraries(track_body=False, module_k4a_path=sdk_path)
logger.info(f'✓ 成功使用FemtoBolt SDK: {sdk_path}')
sdk_initialized = True
break
except Exception as e:
logger.warning(f'✗ FemtoBolt SDK路径失败: {sdk_path} - {e}')
continue
# 如果手动指定路径失败,尝试自动检测
if not sdk_initialized:
try:
logger.info('尝试自动检测FemtoBolt SDK...')
pykinect.initialize_libraries(track_body=False)
logger.info('✓ 自动检测FemtoBolt SDK成功')
sdk_initialized = True
except Exception as e:
logger.warning(f'✗ 自动检测失败: {e}')
if not sdk_initialized:
                logger.warning('无法初始化FemtoBolt深度相机SDK,相关功能将不可用')
self.femtobolt_camera = None
self.device_status['femtobolt'] = False
return
# 配置FemtoBolt设备参数
            self.femtobolt_config = pykinect.default_configuration
            logger.info('加载FemtoBolt默认配置参数')
            logger.debug(pykinect.default_configuration)
            # 从config.ini读取配置(configparser已在模块顶部导入)
            config = configparser.ConfigParser()
            config.read(os.path.join(os.path.dirname(__file__), '..', 'config.ini'))
color_res_str = config.get('DEFAULT', 'femtobolt_color_resolution', fallback='1080P')
depth_range_min = config.getint('DEFAULT', 'femtobolt_depth_range_min', fallback=500)
depth_range_max = config.getint('DEFAULT', 'femtobolt_depth_range_max', fallback=4500)
# 解析分辨率配置,分为宽度和高度
resolution_map = {
'1024x1024': (1024, 1024),
'1920x1080': (1920, 1080),
'1280x720': (1280, 720),
'720x720': (720, 720)
}
width, height = resolution_map.get(color_res_str, (1920, 1080))
            # 假设SDK支持设置宽高参数,示例代码如下,需根据实际SDK调整
if hasattr(self.femtobolt_config, 'color_resolution_width') and hasattr(self.femtobolt_config, 'color_resolution_height'):
self.femtobolt_config.color_resolution_width = width
self.femtobolt_config.color_resolution_height = height
            else:
                logger.info('当前SDK配置不支持直接设置分辨率宽高,改用预设分辨率枚举')
# 兼容原有枚举设置
if color_res_str == '720P':
self.femtobolt_config.color_resolution = pykinect.K4A_COLOR_RESOLUTION_720P
elif color_res_str == '1080P':
self.femtobolt_config.color_resolution = pykinect.K4A_COLOR_RESOLUTION_1080P
else:
self.femtobolt_config.color_resolution = pykinect.K4A_COLOR_RESOLUTION_1080P
# self.femtobolt_config.depth_mode = pykinect.K4A_DEPTH_MODE_WFOV_2X2BINNED
self.femtobolt_config.depth_mode = pykinect.K4A_DEPTH_MODE_NFOV_UNBINNED
self.femtobolt_config.camera_fps = pykinect.K4A_FRAMES_PER_SECOND_30
self.femtobolt_config.synchronized_images_only = True
            # 视效范围参数示例,假设SDK支持depth_range_min和depth_range_max
            # 直接尝试启动设备,pykinect_azure库没有设备数量检测API
            logger.info('尝试启动FemtoBolt设备...')
self.femtobolt_camera = pykinect.start_device(config=self.femtobolt_config)
if self.femtobolt_camera:
self.device_status['femtobolt'] = True
logger.info('✓ FemtoBolt深度相机初始化成功!')
else:
raise Exception('设备启动返回None')
except Exception as e:
logger.warning(f'FemtoBolt深度相机初始化失败: {e}')
logger.warning('FemtoBolt深度相机功能将不可用但不影响其他功能')
logger.warning('可能的解决方案:')
logger.warning('1. 检查FemtoBolt设备是否正确连接并被识别')
logger.warning('2. 安装Orbbec官方的K4A兼容驱动程序')
logger.warning('3. 确保没有其他应用程序占用设备')
logger.warning('4. 尝试重新插拔设备或重启计算机')
logger.warning('5. 考虑使用Orbbec原生SDK而非Azure Kinect SDK')
self.femtobolt_camera = None
self.device_status['femtobolt'] = False
# 不再抛出异常,让系统继续运行其他功能
def _get_femtobolt_sdk_paths(self) -> List[str]:
"""获取FemtoBolt SDK可能的路径列表"""
import platform
sdk_paths = []
if platform.system() == "Windows":
# 优先使用Orbbec SDK K4A Wrapper与azure_kinect_image_example.py一致
base_dir = os.path.dirname(os.path.abspath(__file__))
dll_path = os.path.join(base_dir, "dll", "bin", "k4a.dll")
orbbec_paths = []
if os.path.exists(dll_path):
orbbec_paths.append(dll_path)
# orbbec_paths = [
# r"D:\BodyBalanceEvaluation\backend\dll\bin\k4a.dll",
# ]
# Azure Kinect SDK标准安装路径备用
standard_paths = [
r"C:\Program Files\Azure Kinect SDK v1.4.1\sdk\windows-desktop\amd64\release\bin\k4a.dll",
r"C:\Program Files\Azure Kinect SDK v1.4.0\sdk\windows-desktop\amd64\release\bin\k4a.dll",
r"C:\Program Files\Azure Kinect SDK v1.4.2\sdk\windows-desktop\amd64\release\bin\k4a.dll",
]
# 优先检查Orbbec路径
for path in orbbec_paths:
if os.path.exists(path):
sdk_paths.append(path)
# 然后检查标准路径
for path in standard_paths:
if os.path.exists(path):
sdk_paths.append(path)
# 项目内的dll目录
project_dll_path = os.path.join(os.path.dirname(__file__), "dll", "k4a.dll")
if os.path.exists(project_dll_path):
sdk_paths.append(project_dll_path)
return sdk_paths
def _init_imu(self):
"""初始化IMU传感器"""
try:
# 这里应该连接实际的IMU设备
# 目前使用模拟数据
self.imu_device = MockIMUDevice()
self.device_status['imu'] = True
            logger.info('IMU传感器初始化成功(模拟)')
except Exception as e:
logger.error(f'IMU传感器初始化失败: {e}')
self.imu_device = None
def _init_pressure_sensor(self):
"""初始化压力传感器"""
try:
# 这里应该连接实际的压力传感器
# 目前使用模拟数据
self.pressure_device = MockPressureDevice()
self.device_status['pressure'] = True
logger.info('压力传感器初始化成功(模拟)')
except Exception as e:
logger.error(f'压力传感器初始化失败: {e}')
self.pressure_device = None
def get_device_status(self) -> Dict[str, bool]:
"""获取设备状态"""
return self.device_status.copy()
def get_connected_devices(self) -> List[str]:
"""获取已连接的设备列表"""
return [device for device, status in self.device_status.items() if status]
def refresh_devices(self):
"""刷新设备连接"""
logger.info('刷新设备连接...')
# 重新初始化所有设备
if self.camera:
self.camera.release()
self._init_devices()
def calibrate_devices(self) -> Dict[str, Any]:
"""校准设备"""
calibration_result = {}
try:
# 摄像头校准
if self.device_status['camera']:
camera_calibration = self._calibrate_camera()
calibration_result['camera'] = camera_calibration
# IMU校准
if self.device_status['imu']:
imu_calibration = self._calibrate_imu()
calibration_result['imu'] = imu_calibration
# 压力传感器校准
if self.device_status['pressure']:
pressure_calibration = self._calibrate_pressure()
calibration_result['pressure'] = pressure_calibration
self.calibration_data = calibration_result
logger.info('设备校准完成')
except Exception as e:
logger.error(f'设备校准失败: {e}')
raise
return calibration_result
def _calibrate_camera(self) -> Dict[str, Any]:
"""校准摄像头"""
if not self.camera or not self.camera.isOpened():
return {'status': 'failed', 'error': '摄像头未连接'}
try:
# 获取几帧图像进行校准
frames = []
for _ in range(10):
ret, frame = self.camera.read()
if ret:
frames.append(frame)
time.sleep(0.1)
if not frames:
return {'status': 'failed', 'error': '无法获取图像'}
# 计算平均亮度和对比度
avg_brightness = np.mean([np.mean(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)) for frame in frames])
calibration = {
'status': 'success',
'brightness': float(avg_brightness),
'resolution': (int(self.camera.get(cv2.CAP_PROP_FRAME_WIDTH)),
int(self.camera.get(cv2.CAP_PROP_FRAME_HEIGHT))),
'fps': float(self.camera.get(cv2.CAP_PROP_FPS)),
'timestamp': datetime.now().isoformat()
}
return calibration
except Exception as e:
return {'status': 'failed', 'error': str(e)}
def _calibrate_imu(self) -> Dict[str, Any]:
"""校准IMU传感器"""
if not self.imu_device:
return {'status': 'failed', 'error': 'IMU设备未连接'}
try:
# 收集静态数据进行零点校准
samples = []
for _ in range(100):
data = self.imu_device.read_data()
samples.append(data)
time.sleep(0.01)
# 计算零点偏移
accel_offset = {
'x': np.mean([s['accel']['x'] for s in samples]),
'y': np.mean([s['accel']['y'] for s in samples]),
'z': np.mean([s['accel']['z'] for s in samples]) - 9.8 # 重力补偿
}
gyro_offset = {
'x': np.mean([s['gyro']['x'] for s in samples]),
'y': np.mean([s['gyro']['y'] for s in samples]),
'z': np.mean([s['gyro']['z'] for s in samples])
}
calibration = {
'status': 'success',
'accel_offset': accel_offset,
'gyro_offset': gyro_offset,
'timestamp': datetime.now().isoformat()
}
return calibration
except Exception as e:
return {'status': 'failed', 'error': str(e)}
def _calibrate_pressure(self) -> Dict[str, Any]:
"""校准压力传感器"""
if not self.pressure_device:
return {'status': 'failed', 'error': '压力传感器未连接'}
try:
# 收集零压力数据
samples = []
for _ in range(50):
data = self.pressure_device.read_data()
samples.append(data)
time.sleep(0.02)
# 计算零点偏移
zero_offset = {
'left_foot': np.mean([s['left_foot'] for s in samples]),
'right_foot': np.mean([s['right_foot'] for s in samples])
}
calibration = {
'status': 'success',
'zero_offset': zero_offset,
'timestamp': datetime.now().isoformat()
}
return calibration
except Exception as e:
return {'status': 'failed', 'error': str(e)}
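    # 示例草图(假设性,非现有调用链的一部分):演示 _calibrate_pressure 得到的 zero_offset
    # 通常如何应用到原始压力样本上;方法名 _apply_pressure_calibration 为说明用途虚构。
    def _apply_pressure_calibration(self, sample: Dict[str, Any]) -> Dict[str, Any]:
        """将压力校准的零点偏移应用到原始样本(未校准时原样返回)"""
        offsets = self.calibration_data.get('pressure', {}).get('zero_offset')
        if not offsets:
            return sample
        return {
            'left_foot': max(0.0, sample['left_foot'] - offsets['left_foot']),
            'right_foot': max(0.0, sample['right_foot'] - offsets['right_foot']),
            'timestamp': sample.get('timestamp')
        }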
def collect_data(self, session_id: str, patient_id: str, screen_image_base64: str = None) -> Dict[str, Any]:
"""采集所有设备数据并保存到指定目录结构
Args:
session_id: 检测会话ID
patient_id: 患者ID
screen_image_base64: 前端界面截图的base64数据
Returns:
Dict: 包含所有采集数据的字典符合detection_data表结构
"""
# 生成采集时间戳
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f')[:-3] # 精确到毫秒
# 创建数据存储目录
data_dir = Path(f'data/patients/{patient_id}/{session_id}/{timestamp}')
data_dir.mkdir(parents=True, exist_ok=True)
# 初始化数据字典
data = {
'session_id': session_id,
'head_pose': None,
'body_pose': None,
'body_image': None,
'foot_data': None,
'foot_image': None,
'foot_data_image': None,
'screen_image': None,
'timestamp': timestamp
}
try:
# # 1. 采集头部姿态数据从IMU设备获取
# if self.device_status['imu']:
# head_pose_data = self._collect_head_pose_data()
# if head_pose_data:
# data['head_pose'] = json.dumps(head_pose_data)
# logger.debug(f'头部姿态数据采集成功: {session_id}')
# # 2. 采集身体姿态数据从FemtoBolt深度相机获取
# if self.device_status['femtobolt']:
# body_pose_data = self._collect_body_pose_data()
# if body_pose_data:
# data['body_pose'] = json.dumps(body_pose_data)
# logger.debug(f'身体姿态数据采集成功: {session_id}')
# # 3. 采集身体视频截图从FemtoBolt深度相机获取
# if self.device_status['femtobolt']:
# body_image_path = self._capture_body_image(data_dir)
# if body_image_path:
# data['body_image'] = str(body_image_path)
# logger.debug(f'身体截图保存成功: {body_image_path}')
# # 4. 采集足部压力数据(从压力传感器获取)
# if self.device_status['pressure']:
# foot_data = self._collect_foot_pressure_data()
# if foot_data:
# data['foot_data'] = json.dumps(foot_data)
# logger.debug(f'足部压力数据采集成功: {session_id}')
# # 5. 采集足部监测视频截图(从摄像头获取)
# if self.device_status['camera']:
# foot_image_path = self._capture_foot_image(data_dir)
# if foot_image_path:
# data['foot_image'] = str(foot_image_path)
# logger.debug(f'足部截图保存成功: {foot_image_path}')
# # 6. 生成足底压力数据图(从压力传感器数据生成)
# if self.device_status['pressure']:
# foot_data_image_path = self._generate_foot_pressure_image(data_dir)
# if foot_data_image_path:
# data['foot_data_image'] = str(foot_data_image_path)
# logger.debug(f'足底压力数据图生成成功: {foot_data_image_path}')
            # 7. 保存屏幕录制截图(从前端传入的base64数据)
            if screen_image_base64:
                try:
                    # 去掉 data:image/...;base64, 前缀后解码,并保存为图片文件
                    if screen_image_base64.startswith('data:image/'):
                        base64_data = screen_image_base64.split(',')[1]
                    else:
                        base64_data = screen_image_base64
                    image_data = base64.b64decode(base64_data)
                    image_path = data_dir / 'screen_image.png'
                    with open(image_path, 'wb') as f:
                        f.write(image_data)
                    screen_image_path = str(image_path.resolve().relative_to(Path.cwd().resolve()))
                    data['screen_image'] = screen_image_path
                    logger.debug(f'屏幕截图保存成功: {screen_image_path}')
                except Exception as e:
                    logger.error(f'屏幕截图保存失败: {e}')
                    import traceback
                    logger.error(traceback.format_exc())
# 更新最新数据
with self.data_lock:
self.latest_data = data.copy()
logger.debug(f'数据采集完成: {session_id}, 时间戳: {timestamp}')
except Exception as e:
logger.error(f'数据采集失败: {e}')
return data
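    # collect_data 返回的字典字段与 detection_data 表结构对应:
    #   session_id / head_pose / body_pose / body_image / foot_data /
    #   foot_image / foot_data_image / screen_image / timestamp
    # 当前实现中仅 screen_image 会被实际写入,其余采集步骤暂以注释形式保留。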
def start_video_recording(self, output_path: str) -> bool:
"""开始视频录制"""
if not self.camera or not self.camera.isOpened():
return False
try:
# 获取摄像头参数
width = int(self.camera.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(self.camera.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(self.camera.get(cv2.CAP_PROP_FPS))
# 创建视频写入器
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
self.video_writer = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
if self.video_writer.isOpened():
self.recording = True
logger.info(f'开始视频录制: {output_path}')
return True
else:
logger.error('视频写入器创建失败')
return False
except Exception as e:
logger.error(f'开始视频录制失败: {e}')
return False
def stop_video_recording(self):
"""停止视频录制"""
if hasattr(self, 'video_writer') and self.video_writer:
self.video_writer.release()
self.video_writer = None
self.recording = False
logger.info('视频录制已停止')
def start_femtobolt_recording(self, filename=None):
"""开始FemtoBolt深度相机录制"""
if not FEMTOBOLT_AVAILABLE or self.femtobolt_camera is None:
logger.error('FemtoBolt深度相机未初始化无法录制')
return False
try:
if filename is None:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f'femtobolt_recording_{timestamp}'
# 确保录制目录存在
os.makedirs('recordings', exist_ok=True)
# 创建彩色和深度视频文件路径
color_filepath = os.path.join('recordings', f'{filename}_color.mp4')
depth_filepath = os.path.join('recordings', f'{filename}_depth.mp4')
# 设置视频参数基于FemtoBolt配置
if self.femtobolt_config.color_resolution == pykinect.K4A_COLOR_RESOLUTION_1080P:
width, height = 1920, 1080
elif self.femtobolt_config.color_resolution == pykinect.K4A_COLOR_RESOLUTION_720P:
width, height = 1280, 720
else:
width, height = 1920, 1080 # 默认
fps = 30 # 默认30fps
# 创建视频写入器
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
self.femtobolt_color_writer = cv2.VideoWriter(color_filepath, fourcc, fps, (width, height))
self.femtobolt_depth_writer = cv2.VideoWriter(depth_filepath, fourcc, fps, (width, height))
if self.femtobolt_color_writer.isOpened() and self.femtobolt_depth_writer.isOpened():
self.femtobolt_recording = True
self.femtobolt_recording_filename = filename
logger.info(f'开始FemtoBolt录制: {filename}')
return True
else:
logger.error('FemtoBolt视频写入器创建失败')
if self.femtobolt_color_writer:
self.femtobolt_color_writer.release()
if self.femtobolt_depth_writer:
self.femtobolt_depth_writer.release()
return False
except Exception as e:
logger.error(f'FemtoBolt开始录制失败: {e}')
return False
def stop_femtobolt_recording(self):
"""停止FemtoBolt深度相机录制"""
if self.femtobolt_recording:
self.femtobolt_recording = False
if hasattr(self, 'femtobolt_color_writer') and self.femtobolt_color_writer:
self.femtobolt_color_writer.release()
self.femtobolt_color_writer = None
if hasattr(self, 'femtobolt_depth_writer') and self.femtobolt_depth_writer:
self.femtobolt_depth_writer.release()
self.femtobolt_depth_writer = None
logger.info('FemtoBolt视频录制已停止')
def start_camera_stream(self):
"""开始摄像头推流"""
if self.camera is None:
logger.error('摄像头未初始化')
return False
try:
self.camera_streaming = True
logger.info('摄像头推流已开始')
return True
except Exception as e:
logger.error(f'摄像头推流启动失败: {e}')
return False
def stop_camera_stream(self):
"""停止摄像头推流"""
self.camera_streaming = False
logger.info('摄像头推流已停止')
def start_femtobolt_stream(self):
"""开始FemtoBolt深度相机推流"""
if not FEMTOBOLT_AVAILABLE or self.femtobolt_camera is None:
logger.error('FemtoBolt深度相机未初始化')
return False
try:
# 检查是否已经在推流
if self.femtobolt_streaming:
logger.warning('FemtoBolt深度相机推流已在运行')
return True
# 重置停止事件
self.streaming_stop_event.clear()
# 设置推流标志
self.femtobolt_streaming = True
# 启动推流线程
self.femtobolt_streaming_thread = threading.Thread(
target=self._femtobolt_streaming_thread,
daemon=True,
name='FemtoBoltStreamingThread'
)
self.femtobolt_streaming_thread.start()
logger.info('FemtoBolt深度相机推流已开始')
return True
except Exception as e:
logger.error(f'FemtoBolt深度相机推流启动失败: {e}')
self.femtobolt_streaming = False
return False
def stop_femtobolt_stream(self):
"""停止FemtoBolt深度相机推流"""
self.femtobolt_streaming = False
logger.debug('FemtoBolt深度相机推流已停止')
def record_femtobolt_frame(self, color_image, depth_image):
"""录制FemtoBolt帧到视频文件"""
if not self.femtobolt_recording:
return
try:
if hasattr(self, 'femtobolt_color_writer') and self.femtobolt_color_writer and color_image is not None:
# 确保图像尺寸正确
if color_image.shape[:2] != (1080, 1920): # height, width
color_image = cv2.resize(color_image, (1920, 1080))
self.femtobolt_color_writer.write(color_image)
if hasattr(self, 'femtobolt_depth_writer') and self.femtobolt_depth_writer and depth_image is not None:
# 将深度图像转换为3通道格式用于视频录制
depth_normalized = cv2.normalize(depth_image, None, 0, 255, cv2.NORM_MINMAX, dtype=cv2.CV_8U)
depth_colored = cv2.applyColorMap(depth_normalized, cv2.COLORMAP_JET)
# 确保图像尺寸正确
if depth_colored.shape[:2] != (1080, 1920): # height, width
depth_colored = cv2.resize(depth_colored, (1920, 1080))
self.femtobolt_depth_writer.write(depth_colored)
except Exception as e:
logger.error(f'录制FemtoBolt帧失败: {e}')
def set_socketio(self, socketio):
"""设置WebSocket连接用于推流"""
self.socketio = socketio
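    # 说明:推流线程通过Socket.IO发送两类事件,负载结构一致:
    #   'video_frame'        -> {'image': base64编码的JPEG, 'frame_id': int, 'timestamp': float}
    #   'depth_camera_frame' -> {'image': base64编码的JPEG, 'frame_id': int, 'timestamp': float}
    # 以下为假设性的python-socketio客户端消费示例(服务地址等均为假设,仅作参考):
    #   import socketio
    #   sio = socketio.Client()
    #   @sio.on('video_frame')
    #   def on_video_frame(data):
    #       jpg_bytes = base64.b64decode(data['image'])
    #   sio.connect('http://127.0.0.1:5000')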
def start_streaming(self) -> Dict[str, bool]:
"""启动视频推流
Returns:
Dict: 推流启动状态
{
'camera_streaming': bool,
'femtobolt_streaming': bool
}
"""
result = {
'camera_streaming': False,
'femtobolt_streaming': False
}
try:
# 重置停止事件
self.streaming_stop_event.clear()
# 启动足部监视摄像头推流
if self.device_status['camera'] and not self.camera_streaming:
self.camera_streaming = True
self.camera_streaming_thread = threading.Thread(
target=self._camera_streaming_thread,
daemon=True,
name='CameraStreamingThread'
)
self.camera_streaming_thread.start()
result['camera_streaming'] = True
logger.debug('足部监视摄像头推流已启动')
# 启动FemtoBolt深度相机推流
if self.device_status['femtobolt'] and not self.femtobolt_streaming:
self.femtobolt_streaming = True
self.femtobolt_streaming_thread = threading.Thread(
target=self._femtobolt_streaming_thread,
daemon=True,
name='FemtoBoltStreamingThread'
)
self.femtobolt_streaming_thread.start()
result['femtobolt_streaming'] = True
logger.debug('FemtoBolt深度相机推流已启动')
except Exception as e:
logger.warning(f'启动推流失败: {e}')
return result
def stop_streaming(self) -> bool:
"""停止所有视频推流
Returns:
bool: 停止操作是否成功
"""
try:
# 设置停止事件
self.streaming_stop_event.set()
# 停止摄像头推流
if self.camera_streaming:
self.camera_streaming = False
if self.camera_streaming_thread and self.camera_streaming_thread.is_alive():
self.camera_streaming_thread.join(timeout=2)
logger.debug('足部监视摄像头推流已停止')
# 停止FemtoBolt推流
if self.femtobolt_streaming:
self.femtobolt_streaming = False
if self.femtobolt_streaming_thread and self.femtobolt_streaming_thread.is_alive():
self.femtobolt_streaming_thread.join(timeout=2)
logger.debug('FemtoBolt深度相机推流已停止')
return True
except Exception as e:
logger.warning(f'停止推流失败: {e}')
return False
def _camera_streaming_thread(self):
"""足部监视摄像头推流线程"""
frame_count = 0
try:
while self.camera_streaming and not self.streaming_stop_event.is_set():
if self.camera and self.camera.isOpened():
ret, frame = self.camera.read()
if ret and self.socketio:
# 编码并推送帧
try:
# 调整帧大小以减少网络负载
height, width = frame.shape[:2]
if width > 640:
scale = 640 / width
new_width = 640
new_height = int(height * scale)
frame = cv2.resize(frame, (new_width, new_height))
# JPEG编码
encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 80]
success, buffer = cv2.imencode('.jpg', frame, encode_param)
if success:
jpg_as_text = base64.b64encode(buffer).decode('utf-8')
self.socketio.emit('video_frame', {
'image': jpg_as_text,
'frame_id': frame_count,
'timestamp': time.time()
})
frame_count += 1
except Exception as e:
logger.debug(f'摄像头帧推送失败: {e}')
                else:
                    # 摄像头不可用时短暂等待,避免空转
                    time.sleep(0.1)
                # 控制帧率
                # time.sleep(1/30)  # 30 FPS
except Exception as e:
logger.debug(f'摄像头推流线程异常: {e}')
finally:
self.camera_streaming = False
def _femtobolt_streaming_thread(self):
"""FemtoBolt深度相机推流线程"""
frame_count = 0
try:
while self.femtobolt_streaming and not self.streaming_stop_event.is_set():
if self.femtobolt_camera and self.socketio:
try:
# 获取FemtoBolt帧
capture = self.femtobolt_camera.update()
# 检查capture是否有效并获取彩色深度图像
if capture is not None:
ret, depth_image = capture.get_depth_image()
if ret and depth_image is not None:
                                # 读取config.ini中的深度范围配置(configparser已在模块顶部导入)
                                config = configparser.ConfigParser()
                                config.read('config.ini')
try:
depth_range_min = int(config.get('DEFAULT', 'femtobolt_depth_range_min', fallback='1400'))
depth_range_max = int(config.get('DEFAULT', 'femtobolt_depth_range_max', fallback='1900'))
except Exception:
depth_range_min = None
depth_range_max = None
# 优化深度图彩色映射范围外用黑色区间内用Jet模型从蓝色到黄色到红色渐变
if depth_range_min is not None and depth_range_max is not None:
# 归一化深度值到0-255范围
depth_normalized = np.clip(depth_image, depth_range_min, depth_range_max)
depth_normalized = ((depth_normalized - depth_range_min) / (depth_range_max - depth_range_min) * 255).astype(np.uint8)
# 应用OpenCV的COLORMAP_JET进行伪彩色映射
depth_colored = cv2.applyColorMap(depth_normalized, cv2.COLORMAP_JET)
# 范围外用黑色
mask_outside = (depth_image < depth_range_min) | (depth_image > depth_range_max)
depth_colored[mask_outside] = [0, 0, 0] # BGR黑色
else:
# 如果没有配置,使用默认伪彩色映射
depth_colored = cv2.convertScaleAbs(depth_image, alpha=0.03)
depth_colored = cv2.applyColorMap(depth_colored, cv2.COLORMAP_JET)
# 转换颜色格式(如果需要)
if len(depth_colored.shape) == 3 and depth_colored.shape[2] == 4:
depth_colored = cv2.cvtColor(depth_colored, cv2.COLOR_BGRA2BGR)
elif len(depth_colored.shape) == 3 and depth_colored.shape[2] == 3:
pass
# 预处理裁剪成宽460高819保持高度不裁剪宽度从中间裁剪
height, width = depth_colored.shape[:2]
target_width = 460
target_height = 819
# 计算裁剪区域的纵向起点,保持高度不裁剪,纵向居中裁剪或上下填充(这里保持高度不裁剪,故不裁剪高度)
# 计算宽度裁剪起点
if width > target_width:
left = (width - target_width) // 2
right = left + target_width
cropped_image = depth_colored[:, left:right]
else:
cropped_image = depth_colored
# 如果高度不足target_height进行上下填充黑边
cropped_height = cropped_image.shape[0]
if cropped_height < target_height:
pad_top = (target_height - cropped_height) // 2
pad_bottom = target_height - cropped_height - pad_top
cropped_image = cv2.copyMakeBorder(cropped_image, pad_top, pad_bottom, 0, 0, cv2.BORDER_CONSTANT, value=[0,0,0])
elif cropped_height > target_height:
# 如果高度超过target_height裁剪高度中间部分
top = (cropped_height - target_height) // 2
cropped_image = cropped_image[top:top+target_height, :]
# 最终调整大小保持宽460高819
depth_colored = cv2.resize(cropped_image, (target_width, target_height))
# JPEG编码
encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 80]
success, buffer = cv2.imencode('.jpg', depth_colored, encode_param)
if success and self.socketio:
jpg_as_text = base64.b64encode(buffer).decode('utf-8')
self.socketio.emit('depth_camera_frame', {
'image': jpg_as_text,
'frame_id': frame_count,
'timestamp': time.time()
})
frame_count += 1
else:
# 如果没有获取到有效帧,短暂等待后继续
time.sleep(0.01)
except Exception as e:
logger.debug(f'FemtoBolt帧推送失败: {e}')
# 发生错误时短暂等待,避免快速循环
time.sleep(0.1)
# 控制帧率
time.sleep(1/30) # 30 FPS
except Exception as e:
logger.debug(f'FemtoBolt推流线程异常: {e}')
finally:
self.femtobolt_streaming = False
def start_recording(self, session_id: str, patient_id: str) -> Dict[str, Any]:
"""启动同步录制
Args:
session_id: 检测会话ID
patient_id: 患者ID
Returns:
Dict: 录制启动状态和信息
{
'success': bool,
'session_id': str,
'patient_id': str,
'recording_start_time': str,
'video_paths': {
'feet_video': str,
'body_video': str,
'screen_video': str
},
'message': str
}
"""
result = {
'success': False,
'session_id': session_id,
'patient_id': patient_id,
'recording_start_time': None,
'video_paths': {
'feet_video': None,
'body_video': None,
'screen_video': None
},
'message': ''
}
try:
# 检查是否已在录制
if self.sync_recording:
result['message'] = f'已在录制中当前会话ID: {self.current_session_id}'
return result
# 设置录制参数
self.current_session_id = session_id
self.current_patient_id = patient_id
self.recording_start_time = datetime.now()
# 创建存储目录
base_path = os.path.join('data', 'patients', patient_id, session_id)
os.makedirs(base_path, exist_ok=True)
# 定义视频文件路径
feet_video_path = os.path.join(base_path, 'feet.mp4')
body_video_path = os.path.join(base_path, 'body.mp4')
screen_video_path = os.path.join(base_path, 'screen.mp4')
result['video_paths']['feet_video'] = feet_video_path
result['video_paths']['body_video'] = body_video_path
result['video_paths']['screen_video'] = screen_video_path
# 更新数据库中的视频路径
if self.db_manager:
try:
# 更新会话状态为录制中
self.db_manager.update_session_status(session_id, 'recording')
# 更新视频文件路径
self.db_manager.update_session_normal_video_path(session_id, feet_video_path)
self.db_manager.update_session_femtobolt_video_path(session_id, body_video_path)
self.db_manager.update_session_screen_video_path(session_id, screen_video_path)
logger.debug(f'数据库视频路径更新成功 - 会话ID: {session_id}')
except Exception as db_error:
logger.error(f'更新数据库视频路径失败: {db_error}')
# 数据库更新失败不影响录制启动,继续执行
# 视频编码参数
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
fps = 30
# 初始化视频写入器
if self.device_status['camera']:
# 获取摄像头分辨率
if self.camera and self.camera.isOpened():
width = int(self.camera.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(self.camera.get(cv2.CAP_PROP_FRAME_HEIGHT))
self.feet_video_writer = cv2.VideoWriter(
feet_video_path, fourcc, fps, (width, height)
)
if self.device_status['femtobolt']:
# FemtoBolt默认分辨率
self.body_video_writer = cv2.VideoWriter(
body_video_path, fourcc, fps, (1280, 720)
)
# # 屏幕录制写入器(默认分辨率,后续根据实际帧调整)
# self.screen_video_writer = cv2.VideoWriter(
# screen_video_path, fourcc, fps, (1920, 1080)
# )
# 重置停止事件
self.recording_stop_event.clear()
# 启动录制线程
if self.feet_video_writer:
self.feet_recording_thread = threading.Thread(
target=self._feet_recording_thread,
daemon=True,
name='FeetRecordingThread'
)
self.feet_recording_thread.start()
if self.body_video_writer:
self.body_recording_thread = threading.Thread(
target=self._body_recording_thread,
daemon=True,
name='BodyRecordingThread'
)
self.body_recording_thread.start()
# #屏幕录制
# if self.screen_video_writer:
# self.screen_recording_thread = threading.Thread(
# target=self._screen_recording_thread,
# daemon=True,
# name='ScreenRecordingThread'
# )
# self.screen_recording_thread.start()
# 设置录制状态
self.sync_recording = True
result['success'] = True
result['recording_start_time'] = self.recording_start_time.isoformat()
result['message'] = '同步录制已启动'
logger.debug(f'同步录制已启动 - 会话ID: {session_id}, 患者ID: {patient_id}')
except Exception as e:
logger.error(f'启动同步录制失败: {e}')
result['message'] = f'启动录制失败: {str(e)}'
# 清理已创建的写入器
self._cleanup_video_writers()
return result
def stop_recording(self, session_id: str, video_data_base64: str = None) -> Dict[str, Any]:
"""停止同步录制
Args:
session_id: 检测会话ID
video_data_base64: 屏幕录制视频的base64编码数据可选
Returns:
Dict: 录制停止状态和信息
"""
result = {
'success': False,
'session_id': session_id,
'recording_duration': 0,
'video_files': [],
'message': ''
}
try:
# 检查录制状态
if not self.sync_recording:
result['message'] = '当前没有进行录制'
return result
if self.current_session_id != session_id:
result['message'] = f'会话ID不匹配当前录制会话: {self.current_session_id}'
return result
# 设置停止事件
self.recording_stop_event.set()
session_data = self.db_manager.get_session_data(session_id)
base_path = os.path.join('data', 'patients', session_data['patient_id'], session_id)
# 定义视频文件路径
feet_video_path = os.path.join(base_path, 'feet.mp4')
body_video_path = os.path.join(base_path, 'body.mp4')
screen_video_path = os.path.join(base_path, 'screen.mp4')
# 等待录制线程结束
threads_to_join = [
(self.feet_recording_thread, 'feet'),
(self.body_recording_thread, 'body')
]
for thread, name in threads_to_join:
if thread and thread.is_alive():
thread.join(timeout=3)
if thread.is_alive():
logger.debug(f'{name}录制线程未能正常结束')
# 计算录制时长
if self.recording_start_time:
duration = (datetime.now() - self.recording_start_time).total_seconds()
result['recording_duration'] = duration
# 清理视频写入器并收集文件信息
video_files = self._cleanup_video_writers()
# 保存传入的屏幕录制视频数据,替代原有屏幕录制视频保存逻辑
if video_data_base64:
try:
video_bytes = base64.b64decode(video_data_base64)
with open(screen_video_path, 'wb') as f:
f.write(video_bytes)
video_files.append(screen_video_path)
logger.info(f'屏幕录制视频保存成功,路径: {screen_video_path}, 文件大小: {os.path.getsize(screen_video_path)} 字节')
except Exception as e:
logger.error(f'保存屏幕录制视频失败: {e}', exc_info=True)
logger.debug(f'视频数据长度: {len(video_data_base64)}')
raise
result['video_files'] = video_files
# 更新数据库中的会话信息
if self.db_manager and result['recording_duration'] > 0:
try:
duration_seconds = int(result['recording_duration'])
self.db_manager.update_session_duration(session_id, duration_seconds)
self.db_manager.update_session_normal_video_path(session_id, feet_video_path)
self.db_manager.update_session_femtobolt_video_path(session_id, body_video_path)
self.db_manager.update_session_screen_video_path(session_id, screen_video_path)
self.db_manager.update_session_status(session_id, 'completed')
logger.debug(f'数据库会话信息更新成功 - 会话ID: {session_id}, 持续时间: {duration_seconds}')
except Exception as db_error:
logger.error(f'更新数据库会话信息失败: {db_error}')
# 重置录制状态
self.sync_recording = False
self.current_session_id = None
self.current_patient_id = None
self.recording_start_time = None
result['success'] = True
result['message'] = '同步录制已停止'
logger.debug(f'同步录制已停止 - 会话ID: {session_id}, 录制时长: {result["recording_duration"]:.2f}')
except Exception as e:
logger.error(f'停止同步录制失败: {e}', exc_info=True)
result['message'] = f'停止录制失败: {str(e)}'
return result
def add_screen_frame(self, frame_data: str):
"""添加屏幕录制帧
Args:
frame_data: base64编码的屏幕截图数据
"""
if self.sync_recording and not self.screen_frame_queue.full():
try:
self.screen_frame_queue.put(frame_data, block=False)
            except queue.Full:
                # 队列满时丢弃帧
                pass
def _feet_recording_thread(self):
"""足部视频录制线程"""
try:
while self.sync_recording and not self.recording_stop_event.is_set():
if self.camera and self.camera.isOpened() and self.feet_video_writer:
ret, frame = self.camera.read()
if ret:
self.feet_video_writer.write(frame)
time.sleep(1/30) # 30 FPS
except Exception as e:
logger.error(f'足部录制线程异常: {e}')
def _body_recording_thread(self):
"""身体视频录制线程"""
try:
while self.sync_recording and not self.recording_stop_event.is_set():
if self.femtobolt_camera and self.body_video_writer:
try:
capture = self.femtobolt_camera.update()
if capture.color is not None:
# 转换颜色格式
color_image = capture.color
color_image = cv2.cvtColor(color_image, cv2.COLOR_BGRA2BGR)
# 调整到录制分辨率
color_image = cv2.resize(color_image, (1280, 720))
self.body_video_writer.write(color_image)
except Exception as e:
logger.error(f'FemtoBolt录制帧处理失败: {e}')
time.sleep(1/30) # 30 FPS
except Exception as e:
logger.error(f'身体录制线程异常: {e}')
def _screen_recording_thread(self):
"""屏幕录制线程"""
try:
while self.sync_recording and not self.recording_stop_event.is_set():
try:
# 从队列获取屏幕帧
frame_data = self.screen_frame_queue.get(timeout=1)
# 解码base64图像
image_data = base64.b64decode(frame_data)
nparr = np.frombuffer(image_data, np.uint8)
frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if frame is not None and self.screen_video_writer:
# 调整到录制分辨率
frame = cv2.resize(frame, (1920, 1080))
self.screen_video_writer.write(frame)
except queue.Empty:
continue
except Exception as e:
logger.error(f'屏幕录制帧处理失败: {e}')
except Exception as e:
logger.error(f'屏幕录制线程异常: {e}')
def _cleanup_video_writers(self) -> List[str]:
"""清理视频写入器并返回文件列表"""
video_files = []
try:
if self.feet_video_writer:
self.feet_video_writer.release()
self.feet_video_writer = None
if self.current_patient_id and self.current_session_id:
feet_path = os.path.join('data', 'patients', self.current_patient_id,
self.current_session_id, 'feet.mp4')
if os.path.exists(feet_path):
video_files.append(feet_path)
if self.body_video_writer:
self.body_video_writer.release()
self.body_video_writer = None
if self.current_patient_id and self.current_session_id:
body_path = os.path.join('data', 'patients', self.current_patient_id,
self.current_session_id, 'body.mp4')
if os.path.exists(body_path):
video_files.append(body_path)
if self.screen_video_writer:
self.screen_video_writer.release()
self.screen_video_writer = None
if self.current_patient_id and self.current_session_id:
screen_path = os.path.join('data', 'patients', self.current_patient_id,
self.current_session_id, 'screen.mp4')
if os.path.exists(screen_path):
video_files.append(screen_path)
except Exception as e:
logger.error(f'清理视频写入器失败: {e}')
return video_files
def cleanup(self):
"""清理资源"""
try:
# 停止推流
self.stop_streaming()
# 停止录制
if self.sync_recording:
self.stop_recording(self.current_session_id)
if self.camera:
self.camera.release()
if hasattr(self, 'video_writer') and self.video_writer:
self.video_writer.release()
# 清理FemtoBolt录像写入器
if hasattr(self, 'femtobolt_color_writer') and self.femtobolt_color_writer:
self.femtobolt_color_writer.release()
if hasattr(self, 'femtobolt_depth_writer') and self.femtobolt_depth_writer:
self.femtobolt_depth_writer.release()
# 清理同步录制写入器
if self.feet_video_writer:
self.feet_video_writer.release()
if self.body_video_writer:
self.body_video_writer.release()
if self.screen_video_writer:
self.screen_video_writer.release()
if self.femtobolt_camera:
self.femtobolt_camera = None
logger.debug('设备资源已清理')
except Exception as e:
logger.error(f'清理设备资源失败: {e}')
class MockIMUDevice:
"""模拟IMU设备"""
def __init__(self):
self.noise_level = 0.1
def read_data(self) -> Dict[str, Any]:
"""读取IMU数据"""
return {
'accel': {
'x': np.random.normal(0, self.noise_level),
'y': np.random.normal(0, self.noise_level),
'z': np.random.normal(9.8, self.noise_level) # 重力加速度
},
'gyro': {
'x': np.random.normal(0, self.noise_level),
'y': np.random.normal(0, self.noise_level),
'z': np.random.normal(0, self.noise_level)
},
'temperature': np.random.normal(25, 2),
'timestamp': datetime.now().isoformat()
}
class MockPressureDevice:
"""模拟压力传感器设备"""
def __init__(self):
self.base_pressure = 500 # 基础压力值
self.noise_level = 10
def read_data(self) -> Dict[str, Any]:
"""读取压力数据"""
# 模拟轻微的左右脚压力差异
left_pressure = self.base_pressure + np.random.normal(0, self.noise_level)
right_pressure = self.base_pressure + np.random.normal(0, self.noise_level)
return {
'left_foot': max(0, left_pressure),
'right_foot': max(0, right_pressure),
'timestamp': datetime.now().isoformat()
}
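# 示例草图(假设性):接入真实传感器时,可实现与Mock设备相同的 read_data() 接口。
# 下例假设IMU通过串口输出 "ax,ay,az,gx,gy,gz" 格式的CSV行;串口号、波特率与数据格式
# 均为假设,并非本项目的已有实现,依赖可选的 pyserial 库。
class SerialIMUDevice:
    """假设性的串口IMU设备,接口与MockIMUDevice保持一致"""
    def __init__(self, port: str = 'COM3', baudrate: int = 115200):
        import serial  # pyserial,可选依赖
        self.ser = serial.Serial(port, baudrate, timeout=1)
    def read_data(self) -> Dict[str, Any]:
        """读取一帧IMU数据,返回结构与MockIMUDevice.read_data()一致"""
        line = self.ser.readline().decode('ascii', errors='ignore').strip()
        ax, ay, az, gx, gy, gz = (float(v) for v in line.split(','))
        return {
            'accel': {'x': ax, 'y': ay, 'z': az},
            'gyro': {'x': gx, 'y': gy, 'z': gz},
            'temperature': None,
            'timestamp': datetime.now().isoformat()
        }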
class VideoStreamManager:
"""视频推流管理器"""
def __init__(self, socketio=None):
self.socketio = socketio
self.device_index = None
self.video_thread = None
self.video_running = False
# 用于异步编码的线程池和队列
self.encoding_executor = ThreadPoolExecutor(max_workers=2)
self.frame_queue = queue.Queue(maxsize=1) # 只保留最新的一帧
# 内存优化配置
self.frame_skip_counter = 0
        self.FRAME_SKIP_RATIO = 1  # 每2帧发送1帧,减少网络和内存压力
self.MAX_FRAME_SIZE = (640, 480) # 进一步减小帧尺寸以节省内存
self.MAX_MEMORY_USAGE = 200 * 1024 * 1024 # 200MB内存限制
self.memory_check_counter = 0
self.MEMORY_CHECK_INTERVAL = 50 # 每50帧检查一次内存
        # 读取摄像头设备配置
        self._load_rtsp_config()
    def _load_rtsp_config(self):
        """从config.ini的CAMERA节加载摄像头设备配置"""
try:
config = configparser.ConfigParser()
config_path = os.path.join(os.path.dirname(__file__), '..', 'config.ini')
config.read(config_path, encoding='utf-8')
device_index_str = config.get('CAMERA', 'device_index', fallback='0')
self.device_index = int(device_index_str) if device_index_str else 0
logger.info(f'视频监控设备配置加载完成,设备号: {self.device_index}')
except Exception as e:
logger.error(f'视频监控设备配置失败: {e}')
self.device_index = None
def get_memory_usage(self):
"""获取当前进程内存使用量(字节)"""
try:
process = psutil.Process(os.getpid())
return process.memory_info().rss
        except Exception:
            return 0
def async_encode_frame(self, frame, frame_count):
"""异步编码帧 - 内存优化版本"""
try:
# 内存检查
self.memory_check_counter += 1
if self.memory_check_counter >= self.MEMORY_CHECK_INTERVAL:
self.memory_check_counter = 0
current_memory = self.get_memory_usage()
if current_memory > self.MAX_MEMORY_USAGE:
logger.warning(f"内存使用过高: {current_memory / 1024 / 1024:.2f}MB强制清理")
gc.collect()
# 如果内存仍然过高,跳过此帧
if self.get_memory_usage() > self.MAX_MEMORY_USAGE:
del frame
return
# 更激进的图像尺寸压缩以节省内存
height, width = frame.shape[:2]
target_width, target_height = self.MAX_FRAME_SIZE
if width > target_width or height > target_height:
# 计算缩放比例,保持宽高比
scale_w = target_width / width
scale_h = target_height / height
scale = min(scale_w, scale_h)
new_width = int(width * scale)
new_height = int(height * scale)
# 使用更快的插值方法减少CPU使用
frame = cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_AREA)
# 优化JPEG编码参数优先考虑速度和内存
encode_param = [
int(cv2.IMWRITE_JPEG_QUALITY), 50, # 进一步降低质量以减少内存使用
int(cv2.IMWRITE_JPEG_OPTIMIZE), 1, # 启用优化
int(cv2.IMWRITE_JPEG_PROGRESSIVE), 0 # 禁用渐进式以减少内存
]
success, buffer = cv2.imencode('.jpg', frame, encode_param)
if not success:
logger.error('图像编码失败')
return
# 立即释放frame内存
del frame
jpg_as_text = base64.b64encode(buffer).decode('utf-8')
# 立即释放buffer内存
del buffer
# 发送数据
if self.socketio:
self.socketio.emit('video_frame', {
'image': jpg_as_text,
'frame_id': frame_count,
'timestamp': time.time()
})
# 立即释放base64字符串
del jpg_as_text
except Exception as e:
logger.error(f'异步编码帧失败: {e}')
finally:
# 定期强制垃圾回收
if self.memory_check_counter % 10 == 0:
gc.collect()
def frame_encoding_worker(self):
"""帧编码工作线程"""
while self.video_running:
try:
# 从队列获取帧
frame, frame_count = self.frame_queue.get(timeout=1)
# 提交到线程池进行异步编码
self.encoding_executor.submit(self.async_encode_frame, frame, frame_count)
except queue.Empty:
continue
except Exception as e:
logger.error(f'帧编码工作线程异常: {e}')
def generate_test_frame(self, frame_count):
"""生成测试帧"""
width, height = 640, 480
# 创建黑色背景
frame = np.zeros((height, width, 3), dtype=np.uint8)
# 添加动态元素
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
# 添加时间戳
cv2.putText(frame, timestamp, (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)
# 添加帧计数
cv2.putText(frame, f'TEST Frame: {frame_count}', (10, 120), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)
# 添加移动的圆形
center_x = int(320 + 200 * np.sin(frame_count * 0.1))
center_y = int(240 + 100 * np.cos(frame_count * 0.1))
cv2.circle(frame, (center_x, center_y), 30, (255, 0, 0), -1)
# 添加变化的矩形
rect_size = int(50 + 30 * np.sin(frame_count * 0.05))
cv2.rectangle(frame, (500, 200), (500 + rect_size, 200 + rect_size), (0, 0, 255), -1)
return frame
def generate_video_frames(self):
"""生成视频监控帧"""
frame_count = 0
error_count = 0
use_test_mode = False
last_frame_time = time.time()
logger.debug(f'开始生成视频监控帧,设备号: {self.device_index}')
try:
cap = cv2.VideoCapture(self.device_index)
if not cap.isOpened():
logger.debug(f'无法打开视频监控流: {self.device_index},切换到测试模式')
use_test_mode = True
if self.socketio:
self.socketio.emit('video_status', {'status': 'started', 'message': '使用测试视频源'})
else:
# 最激进的实时优化设置
cap.set(cv2.CAP_PROP_BUFFERSIZE, 0) # 完全禁用缓冲区
cap.set(cv2.CAP_PROP_FPS, 60) # 提高帧率到60fps
cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M', 'J', 'P', 'G')) # MJPEG编码
# 设置更低的分辨率以减少处理时间
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
logger.debug('视频监控流已打开,开始推送帧(激进实时模式)')
if self.socketio:
self.socketio.emit('video_status', {'status': 'started', 'message': '使用视频监控视频源(激进实时模式)'})
self.video_running = True
# 启动帧编码工作线程
encoding_thread = threading.Thread(target=self.frame_encoding_worker)
encoding_thread.daemon = True
encoding_thread.start()
while self.video_running:
if use_test_mode:
# 使用测试模式生成帧
frame = self.generate_test_frame(frame_count)
ret = True
else:
# 使用视频监控流,添加帧跳过机制减少延迟
ret, frame = cap.read()
if not ret:
error_count += 1
logger.debug(f'视频监控读取帧失败(第{error_count}次),尝试重连...')
if 'cap' in locals():
cap.release()
if error_count > 5:
logger.debug('视频监控连接失败次数过多,切换到测试模式')
use_test_mode = True
if self.socketio:
self.socketio.emit('video_status', {'status': 'switched', 'message': '已切换到测试视频源'})
continue
# 立即重连,不等待
cap = cv2.VideoCapture(self.device_index)
if cap.isOpened():
# 重连时应用相同的激进实时设置
cap.set(cv2.CAP_PROP_BUFFERSIZE, 0)
cap.set(cv2.CAP_PROP_FPS, 60)
cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M', 'J', 'P', 'G'))
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
continue
error_count = 0 # 重置错误计数
# 内存优化的帧跳过策略
# 减少跳帧数量,避免过度内存使用
skip_count = 0
while skip_count < 3: # 减少到最多跳过3帧
temp_ret, temp_frame = cap.read()
if temp_ret:
# 立即释放之前的帧
if 'frame' in locals():
del frame
frame = temp_frame
skip_count += 1
else:
break
# 降低帧率以减少内存压力
current_time = time.time()
if current_time - last_frame_time < 1/20: # 降低到20fps最大频率
continue
last_frame_time = current_time
frame_count += 1
# 实现帧跳过以减少内存和网络压力
self.frame_skip_counter += 1
if self.frame_skip_counter % (self.FRAME_SKIP_RATIO + 1) != 0:
# 跳过此帧,立即释放内存
del frame
continue
try:
# 将帧放入队列进行异步处理
try:
# 非阻塞方式放入队列,如果队列满了就丢弃旧帧
self.frame_queue.put_nowait((frame.copy(), frame_count))
except queue.Full:
# 队列满了,清空队列并放入新帧
try:
old_frame, _ = self.frame_queue.get_nowait()
del old_frame # 立即释放旧帧内存
except queue.Empty:
pass
self.frame_queue.put_nowait((frame.copy(), frame_count))
# 立即释放原始帧内存
del frame
                    if frame_count % 60 == 0:
                        # 每60帧定期强制垃圾回收
                        gc.collect()
except Exception as e:
logger.error(f'帧队列处理失败: {e}')
except Exception as e:
# logger.error(f'监控视频推流异常: {e}')
if self.socketio:
self.socketio.emit('video_status', {'status': 'error', 'message': f'推流异常: {str(e)}'})
finally:
if 'cap' in locals():
cap.release()
self.video_running = False
def start_video_stream(self):
"""启动视频监控推流"""
try:
if self.video_thread and self.video_thread.is_alive():
logger.warning('视频监控线程已在运行')
return {'status': 'already_running', 'message': '视频监控已在运行'}
            if self.device_index is None:
                logger.error('视频监控相机未配置')
                return {'status': 'error', 'message': '视频监控相机未配置'}
            logger.info(f'启动视频监控线程,设备号: {self.device_index}')
self.video_thread = threading.Thread(target=self.generate_video_frames)
self.video_thread.daemon = True
self.video_thread.start()
self.video_running = True
logger.info('视频监控线程已启动')
return {'status': 'started', 'message': '视频监控线程已启动'}
except Exception as e:
logger.error(f'视频监控线程启动失败: {e}')
return {'status': 'error', 'message': f'视频监控线程启动失败: {str(e)}'}
def stop_video_stream(self):
"""停止视频监控推流"""
try:
self.video_running = False
logger.info('视频监控推流已停止')
return {'status': 'stopped', 'message': '视频监控推流已停止'}
except Exception as e:
logger.error(f'停止视频监控推流失败: {e}')
return {'status': 'error', 'message': f'停止失败: {str(e)}'}
def is_streaming(self):
"""检查是否正在推流"""
return self.video_running
def get_stream_status(self):
"""获取推流状态"""
return {
'running': self.video_running,
'device_index': self.device_index,
'thread_alive': self.video_thread.is_alive() if self.video_thread else False
}
def _collect_head_pose_data(self) -> Dict[str, Any]:
"""采集头部姿态数据从IMU设备获取"""
try:
# 模拟IMU头部姿态数据
head_pose = {
'roll': np.random.uniform(-30, 30),
'pitch': np.random.uniform(-30, 30),
'yaw': np.random.uniform(-180, 180),
'acceleration': {
'x': np.random.uniform(-2, 2),
'y': np.random.uniform(-2, 2),
'z': np.random.uniform(8, 12)
},
'gyroscope': {
'x': np.random.uniform(-5, 5),
'y': np.random.uniform(-5, 5),
'z': np.random.uniform(-5, 5)
},
'timestamp': datetime.now().isoformat()
}
return head_pose
except Exception as e:
logger.error(f'头部姿态数据采集失败: {e}')
return None
def _collect_body_pose_data(self) -> Dict[str, Any]:
"""采集身体姿态数据从FemtoBolt深度相机获取"""
try:
# 模拟身体姿态关键点数据
body_pose = {
'keypoints': {
'head': {'x': 320, 'y': 100, 'confidence': 0.95},
'neck': {'x': 320, 'y': 150, 'confidence': 0.92},
'left_shoulder': {'x': 280, 'y': 180, 'confidence': 0.88},
'right_shoulder': {'x': 360, 'y': 180, 'confidence': 0.90},
'left_elbow': {'x': 250, 'y': 220, 'confidence': 0.85},
'right_elbow': {'x': 390, 'y': 220, 'confidence': 0.87},
'left_wrist': {'x': 220, 'y': 260, 'confidence': 0.82},
'right_wrist': {'x': 420, 'y': 260, 'confidence': 0.84},
'spine': {'x': 320, 'y': 250, 'confidence': 0.93},
'left_hip': {'x': 300, 'y': 350, 'confidence': 0.89},
'right_hip': {'x': 340, 'y': 350, 'confidence': 0.91},
'left_knee': {'x': 290, 'y': 450, 'confidence': 0.86},
'right_knee': {'x': 350, 'y': 450, 'confidence': 0.88},
'left_ankle': {'x': 285, 'y': 550, 'confidence': 0.83},
'right_ankle': {'x': 355, 'y': 550, 'confidence': 0.85}
},
'balance_score': np.random.uniform(0.6, 1.0),
'center_of_mass': {'x': 320, 'y': 350},
'timestamp': datetime.now().isoformat()
}
return body_pose
except Exception as e:
logger.error(f'身体姿态数据采集失败: {e}')
return None
def _capture_body_image(self, data_dir: Path) -> Optional[str]:
"""采集身体视频截图从FemtoBolt深度相机获取"""
try:
# 模拟从FemtoBolt深度相机获取图像
# 实际实现中应该从深度相机获取真实图像
image = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
# 保存图片
image_path = data_dir / 'body_image.jpg'
cv2.imwrite(str(image_path), image)
return str(image_path.relative_to(Path.cwd()))
except Exception as e:
logger.error(f'身体截图保存失败: {e}')
return None
def _collect_foot_pressure_data(self) -> Dict[str, Any]:
"""采集足部压力数据(从压力传感器获取)"""
try:
# 模拟压力传感器数据
pressure_data = {
'left_foot': {
'heel': np.random.uniform(0, 100),
'arch': np.random.uniform(0, 50),
'ball': np.random.uniform(0, 80),
'toes': np.random.uniform(0, 60),
'total_pressure': 0
},
'right_foot': {
'heel': np.random.uniform(0, 100),
'arch': np.random.uniform(0, 50),
'ball': np.random.uniform(0, 80),
'toes': np.random.uniform(0, 60),
'total_pressure': 0
},
'balance_ratio': 0,
'timestamp': datetime.now().isoformat()
}
# 计算总压力和平衡比例
left_total = sum(pressure_data['left_foot'][key] for key in ['heel', 'arch', 'ball', 'toes'])
right_total = sum(pressure_data['right_foot'][key] for key in ['heel', 'arch', 'ball', 'toes'])
pressure_data['left_foot']['total_pressure'] = left_total
pressure_data['right_foot']['total_pressure'] = right_total
if left_total + right_total > 0:
pressure_data['balance_ratio'] = left_total / (left_total + right_total)
return pressure_data
except Exception as e:
logger.error(f'足部压力数据采集失败: {e}')
return None
def _capture_foot_image(self, data_dir: Path) -> Optional[str]:
"""采集足部监测视频截图(从摄像头获取)"""
try:
if self.camera is not None:
ret, frame = self.camera.read()
if ret:
# 保存图片
image_path = data_dir / 'foot_image.jpg'
cv2.imwrite(str(image_path), frame)
return str(image_path.relative_to(Path.cwd()))
# 如果摄像头不可用,生成模拟图像
image = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
image_path = data_dir / 'foot_image.jpg'
cv2.imwrite(str(image_path), image)
return str(image_path.relative_to(Path.cwd()))
except Exception as e:
logger.error(f'足部截图保存失败: {e}')
return None
def _generate_foot_pressure_image(self, data_dir: Path) -> Optional[str]:
"""生成足底压力数据图(从压力传感器数据生成)"""
try:
# 创建压力分布热力图
fig_width, fig_height = 400, 600
pressure_map = np.zeros((fig_height, fig_width, 3), dtype=np.uint8)
# 模拟左脚压力分布
left_foot_x = fig_width // 4
left_foot_y = fig_height // 2
# 模拟右脚压力分布
right_foot_x = 3 * fig_width // 4
right_foot_y = fig_height // 2
# 绘制压力点(用不同颜色表示压力大小)
for i in range(20):
x = np.random.randint(left_foot_x - 50, left_foot_x + 50)
y = np.random.randint(left_foot_y - 100, left_foot_y + 100)
pressure = np.random.randint(0, 255)
cv2.circle(pressure_map, (x, y), 5, (0, pressure, 255 - pressure), -1)
x = np.random.randint(right_foot_x - 50, right_foot_x + 50)
y = np.random.randint(right_foot_y - 100, right_foot_y + 100)
pressure = np.random.randint(0, 255)
cv2.circle(pressure_map, (x, y), 5, (0, pressure, 255 - pressure), -1)
# 保存图片
image_path = data_dir / 'foot_data_image.jpg'
cv2.imwrite(str(image_path), pressure_map)
return str(image_path.relative_to(Path.cwd()))
except Exception as e:
logger.error(f'足底压力数据图生成失败: {e}')
return None
def _save_screen_image(self, data_dir: Path, screen_image_base64: str) -> Optional[str]:
"""保存屏幕录制截图从前端传入的base64数据"""
try:
# 解码base64数据
if screen_image_base64.startswith('data:image/'):
# 移除data:image/jpeg;base64,前缀
base64_data = screen_image_base64.split(',')[1]
else:
base64_data = screen_image_base64
# 解码并保存图片
image_data = base64.b64decode(base64_data)
image_path = data_dir / 'screen_image.jpg'
with open(image_path, 'wb') as f:
f.write(image_data)
return str(image_path.relative_to(Path.cwd()))
except Exception as e:
logger.error(f'屏幕截图保存失败: {e}')
return None
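# --- 使用示例(假设性草图,仅作演示) ---
# 演示如何将DeviceManager接入Flask-SocketIO服务并启动推流;实际接线在backend/app.py中,
# 此处的应用名、端口与DatabaseManager构造参数均为假设。
if __name__ == '__main__':
    from flask import Flask
    from flask_socketio import SocketIO

    app = Flask(__name__)
    socketio = SocketIO(app, cors_allowed_origins='*')

    db = DatabaseManager()  # 假设使用默认构造参数
    manager = DeviceManager(db_manager=db)
    manager.set_socketio(socketio)  # 帧通过 'video_frame' / 'depth_camera_frame' 事件推送
    manager.start_streaming()
    try:
        socketio.run(app, host='0.0.0.0', port=5000)
    finally:
        manager.cleanup()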