BodyBalanceEvaluation/backend/device_manager.py

2719 lines
130 KiB
Python
Raw Normal View History

2025-07-28 11:59:56 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
设备管理模块
负责摄像头IMU传感器和压力传感器的连接和数据采集
2025-07-31 17:23:05 +08:00
以及视频推流功能
2025-07-28 11:59:56 +08:00
"""
import cv2
import numpy as np
import time
import threading
import json
2025-07-31 17:23:05 +08:00
import queue
import base64
import gc
import os
import psutil
import configparser
2025-07-28 11:59:56 +08:00
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple
2025-07-31 17:23:05 +08:00
from concurrent.futures import ThreadPoolExecutor
2025-07-28 11:59:56 +08:00
import logging
2025-08-11 09:23:04 +08:00
# matplotlib相关导入用于深度图渲染
try:
from matplotlib.colors import LinearSegmentedColormap
import matplotlib.pyplot as plt
MATPLOTLIB_AVAILABLE = True
except ImportError:
MATPLOTLIB_AVAILABLE = False
print("警告: matplotlib库未安装将使用默认深度图渲染")
# 数据库管理
2025-08-06 08:48:38 +08:00
# from backend.app import get_detection_sessions
from database import DatabaseManager
# FemtoBolt深度相机支持
try:
import pykinect_azure as pykinect
# 重新启用FemtoBolt功能使用正确的Orbbec SDK K4A Wrapper路径
2025-08-06 14:51:42 +08:00
FEMTOBOLT_AVAILABLE = True
print("信息: pykinect_azure库已安装FemtoBolt深度相机功能已启用")
print("使用Orbbec SDK K4A Wrapper以确保与FemtoBolt设备的兼容性")
except ImportError:
FEMTOBOLT_AVAILABLE = False
print("警告: pykinect_azure库未安装FemtoBolt深度相机功能将不可用")
print("请使用以下命令安装: pip install pykinect_azure")
2025-07-28 11:59:56 +08:00
logger = logging.getLogger(__name__)
class DeviceManager:
"""设备管理器"""
def __init__(self, db_manager: DatabaseManager = None):
2025-07-28 11:59:56 +08:00
self.camera = None
self.femtobolt_camera = None
2025-07-28 11:59:56 +08:00
self.imu_device = None
self.pressure_device = None
self.device_status = {
'camera': False,
'femtobolt': False,
2025-07-28 11:59:56 +08:00
'imu': False,
'pressure': False
}
self.calibration_data = {}
self.data_lock = threading.Lock()
2025-08-06 14:51:42 +08:00
self.camera_lock = threading.Lock() # 摄像头访问锁
2025-07-28 11:59:56 +08:00
self.latest_data = {}
# 数据库连接
self.db_manager = db_manager
# 推流状态和线程
self.camera_streaming = False
self.femtobolt_streaming = False
self.camera_streaming_thread = None
self.femtobolt_streaming_thread = None
self.streaming_stop_event = threading.Event()
2025-08-06 14:51:42 +08:00
# 全局帧缓存机制
self.frame_cache = {}
self.frame_cache_lock = threading.RLock() # 可重入锁
self.max_cache_size = 10 # 最大缓存帧数
self.cache_timeout = 5.0 # 缓存超时时间(秒)
# 同步录制状态
self.sync_recording = False
self.current_session_id = None
self.current_patient_id = None
self.recording_start_time = None
# 三路视频录制器
self.feet_video_writer = None
self.body_video_writer = None
self.screen_video_writer = None
# 录制线程和控制
self.feet_recording_thread = None
self.body_recording_thread = None
self.screen_recording_thread = None
self.recording_stop_event = threading.Event()
# 屏幕录制队列
self.screen_frame_queue = queue.Queue(maxsize=100)
# 兼容旧版录制状态
self.recording = False
self.video_writer = None
# FemtoBolt相机相关
self.femtobolt_config = None
self.femtobolt_recording = False
self.femtobolt_color_writer = None
self.femtobolt_depth_writer = None
# WebSocket连接用于推流
self.socketio = None
2025-07-28 11:59:56 +08:00
# 初始化设备
self._init_devices()
2025-08-06 14:51:42 +08:00
2025-07-28 11:59:56 +08:00
def _init_devices(self):
"""初始化所有设备"""
try:
self._init_camera()
self._init_femtobolt_camera()
2025-07-28 11:59:56 +08:00
self._init_imu()
self._init_pressure_sensor()
logger.info('设备初始化完成')
except Exception as e:
logger.error(f'设备初始化失败: {e}')
def _init_camera(self):
"""初始化足部监视摄像头"""
2025-07-28 11:59:56 +08:00
try:
# 从数据库读取摄像头设备索引配置
device_index = 0 # 默认值
if self.db_manager:
try:
monitor_config = self.db_manager.get_system_setting('monitor_device_index')
if monitor_config:
device_index = int(monitor_config)
logger.info(f'从数据库读取摄像头设备索引: {device_index}')
else:
logger.info('数据库中未找到monitor_device_index配置使用默认值0')
except Exception as e:
logger.warning(f'读取摄像头设备索引配置失败使用默认值0: {e}')
else:
logger.warning('数据库管理器未初始化使用默认摄像头索引0')
# 尝试连接指定索引的摄像头
2025-08-11 09:23:04 +08:00
# self.camera = cv2.VideoCapture(device_index)
# if self.camera.isOpened():
# # 设置摄像头参数
# self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
# self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
# self.camera.set(cv2.CAP_PROP_FPS, 30)
# # 设置缓冲区大小为1避免帧积累
# self.camera.set(cv2.CAP_PROP_BUFFERSIZE, 1)
2025-07-28 11:59:56 +08:00
2025-08-11 09:23:04 +08:00
self.device_status['camera'] = True
# logger.info(f'摄像头初始化成功,设备索引: {device_index}')
# else:
# logger.warning(f'摄像头连接失败,设备索引: {device_index}')
# self.camera = None
2025-07-28 11:59:56 +08:00
except Exception as e:
logger.error(f'摄像头初始化异常: {e}')
self.camera = None
def _init_femtobolt_camera(self):
"""初始化FemtoBolt深度相机"""
if not FEMTOBOLT_AVAILABLE:
logger.warning('FemtoBolt深度相机库未安装跳过初始化')
self.femtobolt_camera = None
self.device_status['femtobolt'] = False
return
try:
# 初始化pykinect_azure库优先使用指定SDK路径
# 首先尝试手动指定路径(优先级最高)
sdk_paths = self._get_femtobolt_sdk_paths()
for sdk_path in sdk_paths:
if os.path.exists(sdk_path):
try:
pykinect.initialize_libraries(track_body=False, module_k4a_path=sdk_path)
logger.info(f'✓ 成功使用FemtoBolt SDK: {sdk_path}')
break
except Exception as e:
logger.warning(f'✗ FemtoBolt SDK路径失败: {sdk_path} - {e}')
continue
# 配置FemtoBolt设备参数
self.femtobolt_config = pykinect.default_configuration
logger.info('FemtoBolt配置参数。。。。。。。。。。。。。。。。。')
logger.warning(pykinect.default_configuration)
# 从config.ini读取配置
import configparser
config = configparser.ConfigParser()
config.read(os.path.join(os.path.dirname(__file__), '..', 'config.ini'))
2025-08-11 09:23:04 +08:00
# color_res_str = config.get('DEFAULT', 'femtobolt_color_resolution', fallback='1080P')
# depth_range_min = config.getint('DEFAULT', 'femtobolt_depth_range_min', fallback=500)
# depth_range_max = config.getint('DEFAULT', 'femtobolt_depth_range_max', fallback=4500)
2025-08-11 09:23:04 +08:00
# # 解析分辨率配置,分为宽度和高度
# resolution_map = {
# '1024x1024': (1024, 1024),
# '1920x1080': (1920, 1080),
# '1280x720': (1280, 720),
# '720x720': (720, 720)
# }
# width, height = resolution_map.get(color_res_str, (1920, 1080))
# 假设SDK支持设置宽高参数示例代码如下需根据实际SDK调整
2025-08-11 09:23:04 +08:00
# if hasattr(self.femtobolt_config, 'color_resolution_width') and hasattr(self.femtobolt_config, 'color_resolution_height'):
# self.femtobolt_config.color_resolution_width = width
# self.femtobolt_config.color_resolution_height = height
# else:
# logger.info('FemtoBolt存在分辨率参数。。。。。。。。。。。。。。。。。')
# # 兼容原有枚举设置
# if color_res_str == '720P':
# self.femtobolt_config.color_resolution = pykinect.K4A_COLOR_RESOLUTION_720P
# elif color_res_str == '1080P':
# self.femtobolt_config.color_resolution = pykinect.K4A_COLOR_RESOLUTION_1080P
# else:
# self.femtobolt_config.color_resolution = pykinect.K4A_COLOR_RESOLUTION_1080P
self.femtobolt_config.depth_mode = pykinect.K4A_DEPTH_MODE_NFOV_UNBINNED
2025-08-11 09:23:04 +08:00
# self.femtobolt_config.depth_mode = pykinect.K4A_DEPTH_MODE_NFOV_UNBINNED
self.femtobolt_config.camera_fps = pykinect.K4A_FRAMES_PER_SECOND_15
self.femtobolt_config.synchronized_images_only = False
self.femtobolt_config.color_resolution = 0
# 视效范围参数示例假设SDK支持depth_range_min和depth_range_max
2025-08-07 14:49:18 +08:00
# 直接尝试启动设备pykinect_azure库没有设备数量检测API
2025-08-11 09:23:04 +08:00
# logger.info('准备启动FemtoBolt设备...')
2025-08-07 14:49:18 +08:00
# 启动FemtoBolt设备
2025-08-11 09:23:04 +08:00
logger.info(f'尝试启动FemtoBolt设备...,参数详情是{self.femtobolt_config}')
2025-08-07 14:49:18 +08:00
self.femtobolt_camera = pykinect.start_device(config=self.femtobolt_config)
if self.femtobolt_camera:
self.device_status['femtobolt'] = True
logger.info('✓ FemtoBolt深度相机初始化成功!')
else:
raise Exception('设备启动返回None')
except Exception as e:
logger.warning(f'FemtoBolt深度相机初始化失败: {e}')
logger.warning('FemtoBolt深度相机功能将不可用但不影响其他功能')
logger.warning('可能的解决方案:')
logger.warning('1. 检查FemtoBolt设备是否正确连接并被识别')
logger.warning('2. 安装Orbbec官方的K4A兼容驱动程序')
logger.warning('3. 确保没有其他应用程序占用设备')
logger.warning('4. 尝试重新插拔设备或重启计算机')
logger.warning('5. 考虑使用Orbbec原生SDK而非Azure Kinect SDK')
self.femtobolt_camera = None
self.device_status['femtobolt'] = False
# 不再抛出异常,让系统继续运行其他功能
def _get_femtobolt_sdk_paths(self) -> List[str]:
"""获取FemtoBolt SDK可能的路径列表"""
import platform
sdk_paths = []
if platform.system() == "Windows":
# 优先使用Orbbec SDK K4A Wrapper与azure_kinect_image_example.py一致
2025-08-06 08:48:38 +08:00
base_dir = os.path.dirname(os.path.abspath(__file__))
2025-08-11 09:23:04 +08:00
dll_path = os.path.join(base_dir, "dll","femtobolt","bin", "k4a.dll")
sdk_paths.append(dll_path)
return sdk_paths
2025-07-28 11:59:56 +08:00
def _init_imu(self):
"""初始化IMU传感器"""
try:
# 这里应该连接实际的IMU设备
# 目前使用模拟数据
self.imu_device = MockIMUDevice()
self.device_status['imu'] = True
logger.info('IMU传感器初始化成功模拟')
except Exception as e:
logger.error(f'IMU传感器初始化失败: {e}')
self.imu_device = None
def _init_pressure_sensor(self):
"""初始化压力传感器"""
try:
# 这里应该连接实际的压力传感器
# 目前使用模拟数据
self.pressure_device = MockPressureDevice()
self.device_status['pressure'] = True
logger.info('压力传感器初始化成功(模拟)')
except Exception as e:
logger.error(f'压力传感器初始化失败: {e}')
self.pressure_device = None
def get_device_status(self) -> Dict[str, bool]:
"""获取设备状态"""
return self.device_status.copy()
def get_connected_devices(self) -> List[str]:
"""获取已连接的设备列表"""
return [device for device, status in self.device_status.items() if status]
def refresh_devices(self):
"""刷新设备连接"""
logger.info('刷新设备连接...')
2025-08-06 14:51:42 +08:00
# 使用锁保护摄像头重新初始化
with self.camera_lock:
if self.camera:
self.camera.release()
self.camera = None
2025-07-28 11:59:56 +08:00
self._init_devices()
def calibrate_devices(self) -> Dict[str, Any]:
"""校准设备"""
calibration_result = {}
try:
# 摄像头校准
2025-08-07 14:38:08 +08:00
# if self.device_status['camera']:
# camera_calibration = self._calibrate_camera()
# calibration_result['camera'] = camera_calibration
2025-07-28 11:59:56 +08:00
# IMU校准
if self.device_status['imu']:
imu_calibration = self._calibrate_imu()
calibration_result['imu'] = imu_calibration
# 压力传感器校准
if self.device_status['pressure']:
pressure_calibration = self._calibrate_pressure()
calibration_result['pressure'] = pressure_calibration
self.calibration_data = calibration_result
logger.info('设备校准完成')
except Exception as e:
logger.error(f'设备校准失败: {e}')
raise
return calibration_result
def _calibrate_camera(self) -> Dict[str, Any]:
"""校准摄像头"""
if not self.camera or not self.camera.isOpened():
return {'status': 'failed', 'error': '摄像头未连接'}
try:
# 获取几帧图像进行校准
frames = []
for _ in range(10):
ret, frame = self.camera.read()
if ret:
frames.append(frame)
time.sleep(0.1)
if not frames:
return {'status': 'failed', 'error': '无法获取图像'}
# 计算平均亮度和对比度
avg_brightness = np.mean([np.mean(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)) for frame in frames])
calibration = {
'status': 'success',
'brightness': float(avg_brightness),
'resolution': (int(self.camera.get(cv2.CAP_PROP_FRAME_WIDTH)),
int(self.camera.get(cv2.CAP_PROP_FRAME_HEIGHT))),
'fps': float(self.camera.get(cv2.CAP_PROP_FPS)),
'timestamp': datetime.now().isoformat()
}
return calibration
except Exception as e:
return {'status': 'failed', 'error': str(e)}
def _calibrate_imu(self) -> Dict[str, Any]:
"""校准IMU传感器"""
if not self.imu_device:
return {'status': 'failed', 'error': 'IMU设备未连接'}
try:
# 收集静态数据进行零点校准
samples = []
for _ in range(100):
data = self.imu_device.read_data()
samples.append(data)
time.sleep(0.01)
# 计算零点偏移
accel_offset = {
'x': np.mean([s['accel']['x'] for s in samples]),
'y': np.mean([s['accel']['y'] for s in samples]),
'z': np.mean([s['accel']['z'] for s in samples]) - 9.8 # 重力补偿
}
gyro_offset = {
'x': np.mean([s['gyro']['x'] for s in samples]),
'y': np.mean([s['gyro']['y'] for s in samples]),
'z': np.mean([s['gyro']['z'] for s in samples])
}
# 计算头部姿态零点偏移(正立状态为标准零位)
head_pose_offset = {
'rotation': np.mean([s['head_pose']['rotation'] for s in samples if 'head_pose' in s]),
'tilt': np.mean([s['head_pose']['tilt'] for s in samples if 'head_pose' in s]),
'pitch': np.mean([s['head_pose']['pitch'] for s in samples if 'head_pose' in s])
}
2025-07-28 11:59:56 +08:00
calibration = {
'status': 'success',
'accel_offset': accel_offset,
'gyro_offset': gyro_offset,
'head_pose_offset': head_pose_offset, # 头部姿态零点偏移
2025-07-28 11:59:56 +08:00
'timestamp': datetime.now().isoformat()
}
# 保存校准数据到设备实例
if hasattr(self.imu_device, 'set_calibration'):
self.imu_device.set_calibration(calibration)
2025-07-28 11:59:56 +08:00
return calibration
except Exception as e:
return {'status': 'failed', 'error': str(e)}
def _calibrate_pressure(self) -> Dict[str, Any]:
"""校准压力传感器"""
if not self.pressure_device:
return {'status': 'failed', 'error': '压力传感器未连接'}
try:
# 收集零压力数据
samples = []
for _ in range(50):
data = self.pressure_device.read_data()
samples.append(data)
time.sleep(0.02)
# 计算零点偏移
zero_offset = {
'left_foot': np.mean([s['left_foot'] for s in samples]),
'right_foot': np.mean([s['right_foot'] for s in samples])
}
calibration = {
'status': 'success',
'zero_offset': zero_offset,
'timestamp': datetime.now().isoformat()
}
return calibration
except Exception as e:
return {'status': 'failed', 'error': str(e)}
def collect_data(self, session_id: str, patient_id: str, screen_image_base64: str = None) -> Dict[str, Any]:
2025-08-06 14:51:42 +08:00
# 实例化VideoStreamManagerVideoStreamManager类在同一文件中定义
2025-08-07 14:38:08 +08:00
video_stream_manager = VideoStreamManager(device_manager=self)
"""采集所有设备数据并保存到指定目录结构
2025-07-28 11:59:56 +08:00
Args:
session_id: 检测会话ID
patient_id: 患者ID
screen_image_base64: 前端界面截图的base64数据
2025-07-28 11:59:56 +08:00
Returns:
Dict: 包含所有采集数据的字典符合detection_data表结构
"""
# 生成采集时间戳
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f')[:-3] # 精确到毫秒
# 创建数据存储目录
data_dir = Path(f'data/patients/{patient_id}/{session_id}/{timestamp}')
2025-08-11 09:23:04 +08:00
data_dir.mkdir(parents=True, exist_ok=True)
2025-08-07 14:38:08 +08:00
2025-08-11 09:23:04 +08:00
# 设置目录权限为777完全权限
try:
import os
import stat
os.chmod(str(data_dir), stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) # 777权限
logger.debug(f"已设置目录权限为777: {data_dir}")
except Exception as perm_error:
logger.warning(f"设置目录权限失败: {perm_error},但目录创建成功")
# 初始化数据字典
data = {
'session_id': session_id,
'head_pose': None,
'body_pose': None,
'body_image': None,
'foot_data': None,
'foot_image': None,
'foot_data_image': None,
'screen_image': None,
'timestamp': timestamp
}
try:
2025-08-06 08:48:38 +08:00
# # 1. 采集头部姿态数据从IMU设备获取
# if self.device_status['imu']:
# head_pose_data = self._collect_head_pose_data()
# if head_pose_data:
# data['head_pose'] = json.dumps(head_pose_data)
# logger.debug(f'头部姿态数据采集成功: {session_id}')
# # 2. 采集身体姿态数据从FemtoBolt深度相机获取
# if self.device_status['femtobolt']:
# body_pose_data = self._collect_body_pose_data()
# if body_pose_data:
# data['body_pose'] = json.dumps(body_pose_data)
# logger.debug(f'身体姿态数据采集成功: {session_id}')
2025-08-06 14:51:42 +08:00
# 3. 采集身体视频截图从FemtoBolt深度相机获取
if self.device_status['femtobolt']:
try:
body_image_path = video_stream_manager._capture_body_image(data_dir, self)
if body_image_path:
data['body_image'] = str(body_image_path)
logger.debug(f'身体截图保存成功: {body_image_path}')
except Exception as e:
logger.error(f'调用_video_stream_manager._capture_body_image异常: {e}')
2025-08-06 08:48:38 +08:00
# # 4. 采集足部压力数据(从压力传感器获取)
# if self.device_status['pressure']:
# foot_data = self._collect_foot_pressure_data()
# if foot_data:
# data['foot_data'] = json.dumps(foot_data)
# logger.debug(f'足部压力数据采集成功: {session_id}')
2025-08-06 14:51:42 +08:00
# 5. 采集足部监测视频截图(从摄像头获取)
if self.device_status['camera']:
2025-08-07 14:38:08 +08:00
foot_image_path = video_stream_manager._capture_foot_image(data_dir,self)
2025-08-06 14:51:42 +08:00
if foot_image_path:
data['foot_image'] = str(foot_image_path)
logger.debug(f'足部截图保存成功: {foot_image_path}')
2025-08-06 08:48:38 +08:00
# # 6. 生成足底压力数据图(从压力传感器数据生成)
# if self.device_status['pressure']:
# foot_data_image_path = self._generate_foot_pressure_image(data_dir)
# if foot_data_image_path:
# data['foot_data_image'] = str(foot_data_image_path)
# logger.debug(f'足底压力数据图生成成功: {foot_data_image_path}')
# 7. 保存屏幕录制截图从前端传入的base64数据
if screen_image_base64:
2025-08-06 08:48:38 +08:00
try:
2025-08-06 14:51:42 +08:00
# logger.debug(f'屏幕截图保存.................{screen_image_base64}')
2025-08-06 08:48:38 +08:00
# 保存屏幕截图的base64数据为图片文件
screen_image_path = None
if screen_image_base64:
try:
if screen_image_base64.startswith('"
"""生成足部压力图片的base64数据"""
try:
import base64
from io import BytesIO
2025-08-07 14:38:08 +08:00
import matplotlib
matplotlib.use('Agg') # 设置非交互式后端避免Tkinter错误
import matplotlib.pyplot as plt
import matplotlib.patches as patches
2025-08-07 14:38:08 +08:00
import logging
# 临时禁用PIL的调试日志
pil_logger = logging.getLogger('PIL')
original_level = pil_logger.level
pil_logger.setLevel(logging.WARNING)
# 创建图形
fig, ax = plt.subplots(1, 1, figsize=(6, 8))
ax.set_xlim(0, 10)
ax.set_ylim(0, 12)
ax.set_aspect('equal')
ax.axis('off')
# 定义颜色映射(根据压力值)
max_pressure = max(left_front, left_rear, right_front, right_rear)
if max_pressure > 0:
left_front_color = plt.cm.Reds(left_front / max_pressure)
left_rear_color = plt.cm.Reds(left_rear / max_pressure)
right_front_color = plt.cm.Reds(right_front / max_pressure)
right_rear_color = plt.cm.Reds(right_rear / max_pressure)
else:
left_front_color = left_rear_color = right_front_color = right_rear_color = 'lightgray'
# 绘制左脚
left_front_rect = patches.Rectangle((1, 6), 2, 4, linewidth=1, edgecolor='black', facecolor=left_front_color)
left_rear_rect = patches.Rectangle((1, 2), 2, 4, linewidth=1, edgecolor='black', facecolor=left_rear_color)
# 绘制右脚
right_front_rect = patches.Rectangle((7, 6), 2, 4, linewidth=1, edgecolor='black', facecolor=right_front_color)
right_rear_rect = patches.Rectangle((7, 2), 2, 4, linewidth=1, edgecolor='black', facecolor=right_rear_color)
# 添加到图形
ax.add_patch(left_front_rect)
ax.add_patch(left_rear_rect)
ax.add_patch(right_front_rect)
ax.add_patch(right_rear_rect)
# 添加标签
ax.text(2, 8, f'{left_front:.1f}', ha='center', va='center', fontsize=10, weight='bold')
ax.text(2, 4, f'{left_rear:.1f}', ha='center', va='center', fontsize=10, weight='bold')
ax.text(8, 8, f'{right_front:.1f}', ha='center', va='center', fontsize=10, weight='bold')
ax.text(8, 4, f'{right_rear:.1f}', ha='center', va='center', fontsize=10, weight='bold')
ax.text(2, 0.5, '左足', ha='center', va='center', fontsize=12, weight='bold')
ax.text(8, 0.5, '右足', ha='center', va='center', fontsize=12, weight='bold')
# 保存为base64
buffer = BytesIO()
plt.savefig(buffer, format='png', bbox_inches='tight', dpi=100, facecolor='white')
buffer.seek(0)
image_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
plt.close(fig)
2025-08-07 14:38:08 +08:00
# 恢复PIL的日志级别
pil_logger.setLevel(original_level)
return f"data:image/png;base64,{image_base64}"
except Exception as e:
2025-08-07 14:38:08 +08:00
# 确保在异常情况下也恢复PIL的日志级别
try:
pil_logger.setLevel(original_level)
except:
pass
logger.warning(f"生成压力图片失败: {e}")
# 返回一个简单的占位符base64图片
return ""
2025-07-31 17:23:05 +08:00
class VideoStreamManager:
"""视频推流管理器"""
2025-08-07 14:38:08 +08:00
def __init__(self, socketio=None, device_manager=None):
2025-07-31 17:23:05 +08:00
self.socketio = socketio
2025-08-07 14:38:08 +08:00
self.device_manager = device_manager
2025-08-02 16:52:17 +08:00
self.device_index = None
self.video_thread = None
self.video_running = False
2025-07-31 17:23:05 +08:00
2025-08-11 09:23:04 +08:00
# # 用于异步编码的线程池和队列
2025-07-31 17:23:05 +08:00
self.encoding_executor = ThreadPoolExecutor(max_workers=2)
self.frame_queue = queue.Queue(maxsize=1) # 只保留最新的一帧
# 内存优化配置
self.frame_skip_counter = 0
self.FRAME_SKIP_RATIO = 1 # 每3帧发送1帧减少网络和内存压力
self.MAX_FRAME_SIZE = (640, 480) # 进一步减小帧尺寸以节省内存
self.MAX_MEMORY_USAGE = 200 * 1024 * 1024 # 200MB内存限制
self.memory_check_counter = 0
2025-08-11 09:23:04 +08:00
# 移除了MEMORY_CHECK_INTERVAL改为每30帧检查一次内存
2025-07-31 17:23:05 +08:00
# 读取RTSP配置
self._load_rtsp_config()
def _load_rtsp_config(self):
"""加载RTSP配置"""
try:
config = configparser.ConfigParser()
2025-08-02 16:52:17 +08:00
config_path = os.path.join(os.path.dirname(__file__), '..', 'config.ini')
2025-07-31 17:23:05 +08:00
config.read(config_path, encoding='utf-8')
2025-08-02 16:52:17 +08:00
device_index_str = config.get('CAMERA', 'device_index', fallback='0')
self.device_index = int(device_index_str) if device_index_str else 0
logger.info(f'视频监控设备配置加载完成,设备号: {self.device_index}')
2025-07-31 17:23:05 +08:00
except Exception as e:
2025-08-02 16:52:17 +08:00
logger.error(f'视频监控设备配置失败: {e}')
self.device_index = None
2025-07-31 17:23:05 +08:00
def get_memory_usage(self):
"""获取当前进程内存使用量(字节)"""
try:
process = psutil.Process(os.getpid())
return process.memory_info().rss
except:
return 0
def async_encode_frame(self, frame, frame_count):
"""异步编码帧 - 内存优化版本"""
try:
# 内存检查
self.memory_check_counter += 1
if self.memory_check_counter >= self.MEMORY_CHECK_INTERVAL:
self.memory_check_counter = 0
current_memory = self.get_memory_usage()
if current_memory > self.MAX_MEMORY_USAGE:
logger.warning(f"内存使用过高: {current_memory / 1024 / 1024:.2f}MB强制清理")
gc.collect()
# 如果内存仍然过高,跳过此帧
if self.get_memory_usage() > self.MAX_MEMORY_USAGE:
del frame
return
# 更激进的图像尺寸压缩以节省内存
height, width = frame.shape[:2]
target_width, target_height = self.MAX_FRAME_SIZE
if width > target_width or height > target_height:
# 计算缩放比例,保持宽高比
scale_w = target_width / width
scale_h = target_height / height
scale = min(scale_w, scale_h)
new_width = int(width * scale)
new_height = int(height * scale)
# 使用更快的插值方法减少CPU使用
frame = cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_AREA)
2025-08-11 09:23:04 +08:00
self.device_manager._save_frame_to_cache(frame, 'camera')
2025-07-31 17:23:05 +08:00
# 优化JPEG编码参数优先考虑速度和内存
encode_param = [
int(cv2.IMWRITE_JPEG_QUALITY), 50, # 进一步降低质量以减少内存使用
int(cv2.IMWRITE_JPEG_OPTIMIZE), 1, # 启用优化
int(cv2.IMWRITE_JPEG_PROGRESSIVE), 0 # 禁用渐进式以减少内存
]
success, buffer = cv2.imencode('.jpg', frame, encode_param)
if not success:
logger.error('图像编码失败')
return
# 立即释放frame内存
del frame
jpg_as_text = base64.b64encode(buffer).decode('utf-8')
# 立即释放buffer内存
del buffer
# 发送数据
if self.socketio:
2025-08-02 16:52:17 +08:00
self.socketio.emit('video_frame', {
2025-07-31 17:23:05 +08:00
'image': jpg_as_text,
'frame_id': frame_count,
'timestamp': time.time()
})
# 立即释放base64字符串
del jpg_as_text
except Exception as e:
logger.error(f'异步编码帧失败: {e}')
finally:
# 定期强制垃圾回收
if self.memory_check_counter % 10 == 0:
gc.collect()
def frame_encoding_worker(self):
"""帧编码工作线程"""
2025-08-02 16:52:17 +08:00
while self.video_running:
2025-07-31 17:23:05 +08:00
try:
# 从队列获取帧
frame, frame_count = self.frame_queue.get(timeout=1)
2025-08-11 09:23:04 +08:00
2025-07-31 17:23:05 +08:00
# 提交到线程池进行异步编码
self.encoding_executor.submit(self.async_encode_frame, frame, frame_count)
except queue.Empty:
continue
except Exception as e:
logger.error(f'帧编码工作线程异常: {e}')
2025-07-31 17:23:05 +08:00
def generate_test_frame(self, frame_count):
"""生成测试帧"""
2025-08-07 14:38:08 +08:00
width, height = self.MAX_FRAME_SIZE
2025-07-31 17:23:05 +08:00
# 创建黑色背景
frame = np.zeros((height, width, 3), dtype=np.uint8)
# 添加动态元素
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
# 添加时间戳
cv2.putText(frame, timestamp, (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)
# 添加帧计数
cv2.putText(frame, f'TEST Frame: {frame_count}', (10, 120), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)
# 添加移动的圆形
center_x = int(320 + 200 * np.sin(frame_count * 0.1))
center_y = int(240 + 100 * np.cos(frame_count * 0.1))
cv2.circle(frame, (center_x, center_y), 30, (255, 0, 0), -1)
# 添加变化的矩形
rect_size = int(50 + 30 * np.sin(frame_count * 0.05))
cv2.rectangle(frame, (500, 200), (500 + rect_size, 200 + rect_size), (0, 0, 255), -1)
return frame
2025-08-02 16:52:17 +08:00
def generate_video_frames(self):
"""生成视频监控帧"""
2025-07-31 17:23:05 +08:00
frame_count = 0
error_count = 0
use_test_mode = False
last_frame_time = time.time()
2025-08-07 14:38:08 +08:00
width,height=self.MAX_FRAME_SIZE
logger.debug(f'开始生成视频监控帧,设备号: {self.device_index}')
2025-07-31 17:23:05 +08:00
try:
2025-08-02 16:52:17 +08:00
cap = cv2.VideoCapture(self.device_index)
2025-07-31 17:23:05 +08:00
if not cap.isOpened():
logger.debug(f'无法打开视频监控流: {self.device_index},切换到测试模式')
2025-07-31 17:23:05 +08:00
use_test_mode = True
if self.socketio:
2025-08-02 16:52:17 +08:00
self.socketio.emit('video_status', {'status': 'started', 'message': '使用测试视频源'})
2025-07-31 17:23:05 +08:00
else:
# 最激进的实时优化设置
cap.set(cv2.CAP_PROP_BUFFERSIZE, 0) # 完全禁用缓冲区
cap.set(cv2.CAP_PROP_FPS, 60) # 提高帧率到60fps
cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M', 'J', 'P', 'G')) # MJPEG编码
# 设置更低的分辨率以减少处理时间
2025-08-07 14:38:08 +08:00
cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
2025-07-31 17:23:05 +08:00
if self.socketio:
2025-08-02 16:52:17 +08:00
self.socketio.emit('video_status', {'status': 'started', 'message': '使用视频监控视频源(激进实时模式)'})
2025-07-31 17:23:05 +08:00
2025-08-02 16:52:17 +08:00
self.video_running = True
2025-07-31 17:23:05 +08:00
2025-08-11 09:23:04 +08:00
# # 启动帧编码工作线程
# encoding_thread = threading.Thread(target=self.frame_encoding_worker)
# encoding_thread.daemon = True
# encoding_thread.start()
2025-07-31 17:23:05 +08:00
2025-08-02 16:52:17 +08:00
while self.video_running:
2025-07-31 17:23:05 +08:00
if use_test_mode:
# 使用测试模式生成帧
frame = self.generate_test_frame(frame_count)
ret = True
else:
2025-08-02 16:52:17 +08:00
# 使用视频监控流,添加帧跳过机制减少延迟
2025-07-31 17:23:05 +08:00
ret, frame = cap.read()
if not ret:
error_count += 1
logger.debug(f'视频监控读取帧失败(第{error_count}次),尝试重连...')
2025-07-31 17:23:05 +08:00
if 'cap' in locals():
cap.release()
if error_count > 5:
logger.debug('视频监控连接失败次数过多,切换到测试模式')
2025-07-31 17:23:05 +08:00
use_test_mode = True
if self.socketio:
2025-08-02 16:52:17 +08:00
self.socketio.emit('video_status', {'status': 'switched', 'message': '已切换到测试视频源'})
2025-07-31 17:23:05 +08:00
continue
# 立即重连,不等待
2025-08-02 16:52:17 +08:00
cap = cv2.VideoCapture(self.device_index)
2025-07-31 17:23:05 +08:00
if cap.isOpened():
# 重连时应用相同的激进实时设置
cap.set(cv2.CAP_PROP_BUFFERSIZE, 0)
cap.set(cv2.CAP_PROP_FPS, 60)
cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M', 'J', 'P', 'G'))
2025-08-07 14:38:08 +08:00
cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
2025-07-31 17:23:05 +08:00
continue
error_count = 0 # 重置错误计数
# 内存优化的帧跳过策略
# 减少跳帧数量,避免过度内存使用
skip_count = 0
while skip_count < 3: # 减少到最多跳过3帧
temp_ret, temp_frame = cap.read()
if temp_ret:
# 立即释放之前的帧
if 'frame' in locals():
del frame
frame = temp_frame
skip_count += 1
else:
break
# 降低帧率以减少内存压力
current_time = time.time()
if current_time - last_frame_time < 1/20: # 降低到20fps最大频率
continue
last_frame_time = current_time
frame_count += 1
# 实现帧跳过以减少内存和网络压力
self.frame_skip_counter += 1
if self.frame_skip_counter % (self.FRAME_SKIP_RATIO + 1) != 0:
# 跳过此帧,立即释放内存
del frame
continue
try:
2025-08-11 09:23:04 +08:00
# 直接在主循环中执行帧处理逻辑(替代异步工作线程)
# 内存检查
self.memory_check_counter += 1
if self.memory_check_counter % 30 == 0:
memory_usage = psutil.virtual_memory().percent
if memory_usage > 85:
logger.warning(f'内存使用率过高: {memory_usage}%,跳过当前帧')
del frame
continue
# 按照MAX_FRAME_SIZE裁剪帧
cropped_frame = frame.copy()
width, height = self.MAX_FRAME_SIZE
if cropped_frame.shape[1] > width or cropped_frame.shape[0] > height:
# 计算裁剪区域(居中裁剪)
start_x = max(0, (cropped_frame.shape[1] - width) // 2)
start_y = max(0, (cropped_frame.shape[0] - height) // 2)
end_x = min(cropped_frame.shape[1], start_x + width)
end_y = min(cropped_frame.shape[0], start_y + height)
cropped_frame = cropped_frame[start_y:end_y, start_x:end_x]
2025-08-07 14:38:08 +08:00
# 保存帧到全局缓存
if self.device_manager:
2025-08-11 09:23:04 +08:00
self.device_manager._save_frame_to_cache(cropped_frame, 'camera')
2025-08-07 14:38:08 +08:00
# 每1000帧记录一次缓存保存状态
if frame_count % 1000 == 0:
logger.debug(f"视频推流已保存第 {frame_count} 帧到全局缓存")
else:
logger.warning("VideoStreamManager未关联DeviceManager无法保存帧到缓存")
2025-08-11 09:23:04 +08:00
# JPEG编码和socketio发送
2025-07-31 17:23:05 +08:00
try:
2025-08-11 09:23:04 +08:00
# 使用较低的JPEG质量以节省内存
encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 70]
result, buffer = cv2.imencode('.jpg', cropped_frame, encode_param)
if result:
# 转换为base64字符串
jpg_as_text = base64.b64encode(buffer).decode('utf-8')
# 立即释放buffer内存
del buffer
# 发送数据
if self.socketio:
self.socketio.emit('video_frame', {
'image': jpg_as_text,
'frame_id': frame_count,
'timestamp': time.time()
})
# 立即释放base64字符串
del jpg_as_text
2025-07-31 17:23:05 +08:00
2025-08-11 09:23:04 +08:00
except Exception as e:
logger.error(f'帧编码失败: {e}')
# 立即释放帧内存
2025-07-31 17:23:05 +08:00
del frame
2025-08-11 09:23:04 +08:00
del cropped_frame
2025-07-31 17:23:05 +08:00
if frame_count % 60 == 0: # 每60帧记录一次
2025-07-31 17:23:05 +08:00
# 定期强制垃圾回收
gc.collect()
except Exception as e:
logger.error(f'帧队列处理失败: {e}')
except Exception as e:
# logger.error(f'监控视频推流异常: {e}')
2025-07-31 17:23:05 +08:00
if self.socketio:
2025-08-02 16:52:17 +08:00
self.socketio.emit('video_status', {'status': 'error', 'message': f'推流异常: {str(e)}'})
2025-07-31 17:23:05 +08:00
finally:
if 'cap' in locals():
cap.release()
2025-08-02 16:52:17 +08:00
self.video_running = False
2025-07-31 17:23:05 +08:00
2025-08-02 16:52:17 +08:00
def start_video_stream(self):
"""启动视频监控推流"""
2025-07-31 17:23:05 +08:00
try:
2025-08-02 16:52:17 +08:00
if self.video_thread and self.video_thread.is_alive():
logger.warning('视频监控线程已在运行')
return {'status': 'already_running', 'message': '视频监控已在运行'}
2025-08-11 09:23:04 +08:00
# logger.error(f'视频监控相机未配置2222222222{self.device_index}')
# if not self.device_index:
# logger.error('视频监控相机未配置')
# return {'status': 'error', 'message': '视频监控相机未配置'}
2025-07-31 17:23:05 +08:00
2025-08-02 16:52:17 +08:00
logger.info(f'视频启动监控线程,设备号: {self.device_index}')
self.video_thread = threading.Thread(target=self.generate_video_frames)
self.video_thread.daemon = True
self.video_thread.start()
self.video_running = True
2025-07-31 17:23:05 +08:00
2025-08-02 16:52:17 +08:00
logger.info('视频监控线程已启动')
return {'status': 'started', 'message': '视频监控线程已启动'}
2025-07-31 17:23:05 +08:00
except Exception as e:
2025-08-02 16:52:17 +08:00
logger.error(f'视频监控线程启动失败: {e}')
return {'status': 'error', 'message': f'视频监控线程启动失败: {str(e)}'}
2025-07-31 17:23:05 +08:00
2025-08-02 16:52:17 +08:00
def stop_video_stream(self):
"""停止视频监控推流"""
2025-07-31 17:23:05 +08:00
try:
2025-08-02 16:52:17 +08:00
self.video_running = False
logger.info('视频监控推流已停止')
return {'status': 'stopped', 'message': '视频监控推流已停止'}
2025-07-31 17:23:05 +08:00
except Exception as e:
2025-08-02 16:52:17 +08:00
logger.error(f'停止视频监控推流失败: {e}')
2025-07-31 17:23:05 +08:00
return {'status': 'error', 'message': f'停止失败: {str(e)}'}
def is_streaming(self):
"""检查是否正在推流"""
2025-08-02 16:52:17 +08:00
return self.video_running
2025-07-31 17:23:05 +08:00
def get_stream_status(self):
"""获取推流状态"""
return {
2025-08-02 16:52:17 +08:00
'running': self.video_running,
'device_index': self.device_index,
'thread_alive': self.video_thread.is_alive() if self.video_thread else False
2025-07-31 17:23:05 +08:00
}
2025-07-31 17:23:05 +08:00
def _collect_head_pose_data(self) -> Dict[str, Any]:
"""采集头部姿态数据从IMU设备获取"""
2025-07-31 17:23:05 +08:00
try:
# 模拟IMU头部姿态数据
head_pose = {
'roll': np.random.uniform(-30, 30),
'pitch': np.random.uniform(-30, 30),
'yaw': np.random.uniform(-180, 180),
'acceleration': {
'x': np.random.uniform(-2, 2),
'y': np.random.uniform(-2, 2),
'z': np.random.uniform(8, 12)
},
'gyroscope': {
'x': np.random.uniform(-5, 5),
'y': np.random.uniform(-5, 5),
'z': np.random.uniform(-5, 5)
},
'timestamp': datetime.now().isoformat()
}
return head_pose
except Exception as e:
logger.error(f'头部姿态数据采集失败: {e}')
return None
def _collect_body_pose_data(self) -> Dict[str, Any]:
"""采集身体姿态数据从FemtoBolt深度相机获取"""
try:
# 模拟身体姿态关键点数据
body_pose = {
'keypoints': {
'head': {'x': 320, 'y': 100, 'confidence': 0.95},
'neck': {'x': 320, 'y': 150, 'confidence': 0.92},
'left_shoulder': {'x': 280, 'y': 180, 'confidence': 0.88},
'right_shoulder': {'x': 360, 'y': 180, 'confidence': 0.90},
'left_elbow': {'x': 250, 'y': 220, 'confidence': 0.85},
'right_elbow': {'x': 390, 'y': 220, 'confidence': 0.87},
'left_wrist': {'x': 220, 'y': 260, 'confidence': 0.82},
'right_wrist': {'x': 420, 'y': 260, 'confidence': 0.84},
'spine': {'x': 320, 'y': 250, 'confidence': 0.93},
'left_hip': {'x': 300, 'y': 350, 'confidence': 0.89},
'right_hip': {'x': 340, 'y': 350, 'confidence': 0.91},
'left_knee': {'x': 290, 'y': 450, 'confidence': 0.86},
'right_knee': {'x': 350, 'y': 450, 'confidence': 0.88},
'left_ankle': {'x': 285, 'y': 550, 'confidence': 0.83},
'right_ankle': {'x': 355, 'y': 550, 'confidence': 0.85}
},
'balance_score': np.random.uniform(0.6, 1.0),
'center_of_mass': {'x': 320, 'y': 350},
'timestamp': datetime.now().isoformat()
}
return body_pose
except Exception as e:
logger.error(f'身体姿态数据采集失败: {e}')
return None
2025-08-06 14:51:42 +08:00
def _capture_body_image(self, data_dir: Path, device_manager) -> Optional[str]:
"""采集身体视频截图从FemtoBolt深度相机获取"""
try:
2025-08-06 14:51:42 +08:00
image = None
# 检查是否有device_manager实例且FemtoBolt深度相机可用
if (device_manager is not None and
FEMTOBOLT_AVAILABLE and
hasattr(device_manager, 'femtobolt_camera') and
device_manager.femtobolt_camera is not None):
# 从FemtoBolt深度相机获取真实图像
logger.info('正在从FemtoBolt深度相机获取身体图像...')
capture = device_manager.femtobolt_camera.update()
if capture is not None:
# 获取深度图像
ret, depth_image = capture.get_depth_image()
if ret and depth_image is not None:
# 读取config.ini中的深度范围配置
import configparser
config = configparser.ConfigParser()
config.read('config.ini')
try:
depth_range_min = int(config.get('DEFAULT', 'femtobolt_depth_range_min', fallback='1400'))
depth_range_max = int(config.get('DEFAULT', 'femtobolt_depth_range_max', fallback='1900'))
except Exception:
depth_range_min = None
depth_range_max = None
# 优化深度图彩色映射范围外用黑色区间内用Jet模型从蓝色到黄色到红色渐变
if depth_range_min is not None and depth_range_max is not None:
# 归一化深度值到0-255范围
depth_normalized = np.clip(depth_image, depth_range_min, depth_range_max)
depth_normalized = ((depth_normalized - depth_range_min) / (depth_range_max - depth_range_min) * 255).astype(np.uint8)
# 应用OpenCV的COLORMAP_JET进行伪彩色映射
depth_colored = cv2.applyColorMap(depth_normalized, cv2.COLORMAP_JET)
# 范围外用黑色
mask_outside = (depth_image < depth_range_min) | (depth_image > depth_range_max)
depth_colored[mask_outside] = [0, 0, 0] # BGR黑色
else:
# 如果没有配置,使用默认伪彩色映射
depth_colored = cv2.convertScaleAbs(depth_image, alpha=0.03)
depth_colored = cv2.applyColorMap(depth_colored, cv2.COLORMAP_JET)
# 转换颜色格式(如果需要)
if len(depth_colored.shape) == 3 and depth_colored.shape[2] == 4:
depth_colored = cv2.cvtColor(depth_colored, cv2.COLOR_BGRA2BGR)
elif len(depth_colored.shape) == 3 and depth_colored.shape[2] == 3:
pass
# 预处理裁剪成宽460高819保持高度不裁剪宽度从中间裁剪
height, width = depth_colored.shape[:2]
target_width = 460
target_height = 819
# 计算宽度裁剪起点
if width > target_width:
left = (width - target_width) // 2
right = left + target_width
cropped_image = depth_colored[:, left:right]
else:
cropped_image = depth_colored
# 如果高度不足target_height进行上下填充黑边
cropped_height = cropped_image.shape[0]
if cropped_height < target_height:
pad_top = (target_height - cropped_height) // 2
pad_bottom = target_height - cropped_height - pad_top
cropped_image = cv2.copyMakeBorder(cropped_image, pad_top, pad_bottom, 0, 0, cv2.BORDER_CONSTANT, value=[0,0,0])
elif cropped_height > target_height:
# 如果高度超过target_height裁剪高度中间部分
top = (cropped_height - target_height) // 2
cropped_image = cropped_image[top:top+target_height, :]
# 最终调整大小保持宽460高819
image = cv2.resize(cropped_image, (target_width, target_height))
logger.info(f'成功获取FemtoBolt深度图像尺寸: {image.shape}')
else:
logger.warning('无法从FemtoBolt获取深度图像使用模拟图像')
# 使用模拟图像作为备用
image = np.zeros((819, 460, 3), dtype=np.uint8)
cv2.rectangle(image, (50, 50), (410, 769), (0, 255, 0), 2)
cv2.putText(image, 'FemtoBolt Unavailable', (75, 400), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
else:
logger.warning('FemtoBolt capture为None使用模拟图像')
# 使用模拟图像作为备用
image = np.zeros((819, 460, 3), dtype=np.uint8)
cv2.rectangle(image, (50, 50), (410, 769), (0, 255, 0), 2)
cv2.putText(image, 'Capture Failed', (120, 400), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
else:
logger.warning('FemtoBolt深度相机不可用使用模拟图像')
# 使用模拟图像作为备用
image = np.zeros((819, 460, 3), dtype=np.uint8)
cv2.rectangle(image, (50, 50), (410, 769), (0, 255, 0), 2)
cv2.putText(image, 'Camera Not Available', (60, 400), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
2025-07-31 17:23:05 +08:00
# 保存图片
image_path = data_dir / 'body_image.jpg'
cv2.imwrite(str(image_path), image)
2025-08-06 14:51:42 +08:00
logger.info(f'身体图像已保存到: {image_path}')
2025-07-31 17:23:05 +08:00
2025-08-06 14:51:42 +08:00
return image_path
except Exception as e:
logger.error(f'身体截图保存失败: {e}')
return None
def _collect_foot_pressure_data(self) -> Dict[str, Any]:
"""采集足部压力数据(从压力传感器获取)"""
try:
# 模拟压力传感器数据
pressure_data = {
'left_foot': {
'heel': np.random.uniform(0, 100),
'arch': np.random.uniform(0, 50),
'ball': np.random.uniform(0, 80),
'toes': np.random.uniform(0, 60),
'total_pressure': 0
},
'right_foot': {
'heel': np.random.uniform(0, 100),
'arch': np.random.uniform(0, 50),
'ball': np.random.uniform(0, 80),
'toes': np.random.uniform(0, 60),
'total_pressure': 0
},
'balance_ratio': 0,
'timestamp': datetime.now().isoformat()
}
# 计算总压力和平衡比例
left_total = sum(pressure_data['left_foot'][key] for key in ['heel', 'arch', 'ball', 'toes'])
right_total = sum(pressure_data['right_foot'][key] for key in ['heel', 'arch', 'ball', 'toes'])
pressure_data['left_foot']['total_pressure'] = left_total
pressure_data['right_foot']['total_pressure'] = right_total
if left_total + right_total > 0:
pressure_data['balance_ratio'] = left_total / (left_total + right_total)
return pressure_data
except Exception as e:
logger.error(f'足部压力数据采集失败: {e}')
return None
2025-08-07 14:38:08 +08:00
def _capture_foot_image(self, data_dir: Path, device_manager=None) -> Optional[str]:
2025-08-06 14:51:42 +08:00
"""采集足部监测视频截图(从全局缓存获取)"""
try:
2025-08-06 14:51:42 +08:00
image = None
2025-08-07 14:38:08 +08:00
# 直接使用self获取缓存帧
logger.info('正在从全局缓存获取最新图像...')
# 从全局缓存获取最新帧
frame, frame_timestamp = device_manager._get_latest_frame_from_cache('camera')
#frame, frame_count = self.frame_queue.get(timeout=1)
if frame is not None:
# 使用缓存中的图像
image = frame.copy() # 复制帧数据避免引用问题
current_time = time.time()
frame_age = current_time - frame_timestamp if frame_timestamp else 0
logger.info(f'成功获取缓存图像,尺寸: {image.shape},帧龄: {frame_age:.2f}')
2025-08-06 14:51:42 +08:00
else:
2025-08-07 14:38:08 +08:00
logger.warning('缓存中无可用图像,使用模拟图像')
2025-08-06 14:51:42 +08:00
image = np.zeros((480, 640, 3), dtype=np.uint8)
cv2.rectangle(image, (50, 50), (590, 430), (0, 255, 0), 2)
2025-08-07 14:38:08 +08:00
cv2.putText(image, 'No Cached Frame', (120, 250), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
2025-08-06 14:51:42 +08:00
# 保存图片
image_path = data_dir / 'foot_image.jpg'
cv2.imwrite(str(image_path), image)
2025-08-06 14:51:42 +08:00
logger.info(f'足部图像已保存到: {image_path}')
2025-08-06 14:51:42 +08:00
return image_path
except Exception as e:
logger.error(f'足部截图保存失败: {e}')
2025-08-06 14:51:42 +08:00
# 即使出错也要保存一个模拟图像
try:
image = np.zeros((480, 640, 3), dtype=np.uint8)
cv2.rectangle(image, (50, 50), (590, 430), (255, 0, 0), 2)
cv2.putText(image, 'Error Occurred', (180, 250), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
image_path = data_dir / 'foot_image.jpg'
cv2.imwrite(str(image_path), image)
return image_path
except Exception:
return None
def _generate_foot_pressure_image(self, data_dir: Path) -> Optional[str]:
"""生成足底压力数据图(从压力传感器数据生成)"""
try:
# 创建压力分布热力图
fig_width, fig_height = 400, 600
pressure_map = np.zeros((fig_height, fig_width, 3), dtype=np.uint8)
# 模拟左脚压力分布
left_foot_x = fig_width // 4
left_foot_y = fig_height // 2
# 模拟右脚压力分布
right_foot_x = 3 * fig_width // 4
right_foot_y = fig_height // 2
# 绘制压力点(用不同颜色表示压力大小)
for i in range(20):
x = np.random.randint(left_foot_x - 50, left_foot_x + 50)
y = np.random.randint(left_foot_y - 100, left_foot_y + 100)
pressure = np.random.randint(0, 255)
cv2.circle(pressure_map, (x, y), 5, (0, pressure, 255 - pressure), -1)
x = np.random.randint(right_foot_x - 50, right_foot_x + 50)
y = np.random.randint(right_foot_y - 100, right_foot_y + 100)
pressure = np.random.randint(0, 255)
cv2.circle(pressure_map, (x, y), 5, (0, pressure, 255 - pressure), -1)
# 保存图片
image_path = data_dir / 'foot_data_image.jpg'
cv2.imwrite(str(image_path), pressure_map)
return str(image_path.relative_to(Path.cwd()))
except Exception as e:
logger.error(f'足底压力数据图生成失败: {e}')
return None
def _save_screen_image(self, data_dir: Path, screen_image_base64: str) -> Optional[str]:
"""保存屏幕录制截图从前端传入的base64数据"""
try:
# 解码base64数据
if screen_image_base64.startswith('data:image/'):
# 移除data:image/jpeg;base64,前缀
base64_data = screen_image_base64.split(',')[1]
else:
base64_data = screen_image_base64
# 解码并保存图片
image_data = base64.b64decode(base64_data)
image_path = data_dir / 'screen_image.jpg'
2025-07-31 17:23:05 +08:00
with open(image_path, 'wb') as f:
f.write(image_data)
2025-07-31 17:23:05 +08:00
return str(image_path.relative_to(Path.cwd()))
2025-07-31 17:23:05 +08:00
except Exception as e:
logger.error(f'屏幕截图保存失败: {e}')
return None