BodyBalanceEvaluation/backend/device_manager.py
2025-08-20 10:30:51 +08:00

3629 lines
171 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
设备管理模块
负责摄像头、IMU传感器和压力传感器的连接和数据采集
以及视频推流功能
"""
import cv2
import numpy as np
import time
import threading
import json
import queue
import base64
import gc
import os
import psutil
import configparser
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple
from concurrent.futures import ThreadPoolExecutor
import logging
# 添加串口通信支持
import serial
# SMiTSense DLL支持
import ctypes
from ctypes import Structure, c_int, c_float, c_char_p, c_void_p, c_uint32, c_uint16, POINTER, byref
# matplotlib相关导入用于深度图渲染
try:
from matplotlib.colors import LinearSegmentedColormap
import matplotlib.pyplot as plt
MATPLOTLIB_AVAILABLE = True
except ImportError:
MATPLOTLIB_AVAILABLE = False
print("警告: matplotlib库未安装将使用默认深度图渲染")
# 数据库管理
# from backend.app import get_detection_sessions
from database import DatabaseManager
# FemtoBolt深度相机支持
try:
import pykinect_azure as pykinect
# 重新启用FemtoBolt功能使用正确的Orbbec SDK K4A Wrapper路径
FEMTOBOLT_AVAILABLE = True
print("信息: pykinect_azure库已安装FemtoBolt深度相机功能已启用")
print("使用Orbbec SDK K4A Wrapper以确保与FemtoBolt设备的兼容性")
except ImportError:
FEMTOBOLT_AVAILABLE = False
print("警告: pykinect_azure库未安装FemtoBolt深度相机功能将不可用")
print("请使用以下命令安装: pip install pykinect_azure")
logger = logging.getLogger(__name__)
class DeviceManager:
"""设备管理器"""
def __init__(self, db_manager: DatabaseManager = None):
self.camera = None
self.femtobolt_camera = None
self.imu_device = None
self.pressure_device = None
self.device_status = {
'camera': False,
'femtobolt': False,
'imu': False,
'pressure': False
}
self.calibration_data = {}
self.data_lock = threading.Lock()
self.camera_lock = threading.Lock() # 摄像头访问锁
self.latest_data = {}
# 数据库连接
self.db_manager = db_manager
# 推流状态和线程
self.camera_streaming = False
self.femtobolt_streaming = False
self.imu_streaming = False
self.pressure_streaming = False
self.camera_streaming_thread = None
self.femtobolt_streaming_thread = None
self.imu_thread = None
self.pressure_thread = None
self.streaming_stop_event = threading.Event()
# 全局帧缓存机制
self.frame_cache = {}
self.frame_cache_lock = threading.RLock() # 可重入锁
self.max_cache_size = 10 # 最大缓存帧数
self.cache_timeout = 5.0 # 缓存超时时间(秒)
# 同步录制状态
self.sync_recording = False
self.current_session_id = None
self.current_patient_id = None
self.recording_start_time = None
# 三路视频录制器
self.feet_video_writer = None
self.body_video_writer = None
self.screen_video_writer = None
# 录制线程和控制
self.feet_recording_thread = None
self.body_recording_thread = None
self.screen_recording_thread = None
self.recording_stop_event = threading.Event()
# 屏幕录制队列
self.screen_frame_queue = queue.Queue(maxsize=100)
# 兼容旧版录制状态
self.recording = False
self.video_writer = None
# FemtoBolt相机相关
self.femtobolt_config = None
self.femtobolt_recording = False
self.femtobolt_color_writer = None
self.femtobolt_depth_writer = None
# WebSocket连接用于推流
self.socketio = None
# 延迟设备初始化,避免启动时阻塞
# self._init_devices() # 注释掉自动初始化,改为按需初始化
def _init_devices(self):
"""初始化所有设备"""
# 分别初始化各个设备,单个设备失败不影响其他设备
# try:
# self._init_camera()
# except Exception as e:
# logger.error(f'摄像头初始化失败: {e}')
try:
if FEMTOBOLT_AVAILABLE:
self._init_femtobolt_camera()
except Exception as e:
logger.error(f'FemtoBolt深度相机初始化失败: {e}')
try:
logger.error('IMU传感器初始化!!!!!!!!!!!!!!!!')
self._init_imu()
except Exception as e:
logger.error(f'IMU传感器初始化失败: {e}')
try:
self._init_pressure_sensor()
except Exception as e:
logger.error(f'压力传感器初始化失败: {e}')
logger.info('设备初始化完成')
def _init_camera(self):
"""初始化足部监视摄像头"""
try:
# 从数据库读取摄像头设备索引配置
device_index = 0 # 默认值
if self.db_manager:
try:
monitor_config = self.db_manager.get_system_setting('monitor_device_index')
if monitor_config:
device_index = int(monitor_config)
logger.info(f'从数据库读取摄像头设备索引: {device_index}')
else:
logger.info('数据库中未找到monitor_device_index配置使用默认值0')
except Exception as e:
logger.warning(f'读取摄像头设备索引配置失败使用默认值0: {e}')
else:
logger.warning('数据库管理器未初始化使用默认摄像头索引0')
self.device_status['camera'] = True
except Exception as e:
logger.error(f'摄像头初始化异常: {e}')
self.camera = None
def _init_femtobolt_camera(self):
"""初始化FemtoBolt深度相机"""
if not FEMTOBOLT_AVAILABLE:
logger.warning('FemtoBolt深度相机库未安装跳过初始化')
self.femtobolt_camera = None
self.device_status['femtobolt'] = False
return
try:
# 初始化pykinect_azure库优先使用指定SDK路径
# 首先尝试手动指定路径(优先级最高)
sdk_paths = self._get_femtobolt_sdk_paths()
for sdk_path in sdk_paths:
if os.path.exists(sdk_path):
try:
pykinect.initialize_libraries(track_body=False, module_k4a_path=sdk_path)
logger.info(f'✓ 成功使用FemtoBolt SDK: {sdk_path}')
break
except Exception as e:
logger.warning(f'✗ FemtoBolt SDK路径失败: {sdk_path} - {e}')
continue
# 配置FemtoBolt设备参数
self.femtobolt_config = pykinect.default_configuration
# logger.info('FemtoBolt配置参数。。。。。。。。。。。。。。。。。')
# logger.warning(pykinect.default_configuration)
# 从config.ini读取配置
import configparser
config = configparser.ConfigParser()
config.read(os.path.join(os.path.dirname(__file__), 'config.ini'))
# color_res_str = config.get('DEFAULT', 'femtobolt_color_resolution', fallback='1080P')
# depth_range_min = config.getint('DEFAULT', 'femtobolt_depth_range_min', fallback=500)
# depth_range_max = config.getint('DEFAULT', 'femtobolt_depth_range_max', fallback=4500)
# # 解析分辨率配置,分为宽度和高度
# resolution_map = {
# '1024x1024': (1024, 1024),
# '1920x1080': (1920, 1080),
# '1280x720': (1280, 720),
# '720x720': (720, 720)
# }
# width, height = resolution_map.get(color_res_str, (1920, 1080))
# 假设SDK支持设置宽高参数示例代码如下需根据实际SDK调整
# if hasattr(self.femtobolt_config, 'color_resolution_width') and hasattr(self.femtobolt_config, 'color_resolution_height'):
# self.femtobolt_config.color_resolution_width = width
# self.femtobolt_config.color_resolution_height = height
# else:
# logger.info('FemtoBolt存在分辨率参数。。。。。。。。。。。。。。。。。')
# # 兼容原有枚举设置
# if color_res_str == '720P':
# self.femtobolt_config.color_resolution = pykinect.K4A_COLOR_RESOLUTION_720P
# elif color_res_str == '1080P':
# self.femtobolt_config.color_resolution = pykinect.K4A_COLOR_RESOLUTION_1080P
# else:
# self.femtobolt_config.color_resolution = pykinect.K4A_COLOR_RESOLUTION_1080P
self.femtobolt_config.depth_mode = pykinect.K4A_DEPTH_MODE_NFOV_UNBINNED
# self.femtobolt_config.depth_mode = pykinect.K4A_DEPTH_MODE_NFOV_UNBINNED
self.femtobolt_config.camera_fps = pykinect.K4A_FRAMES_PER_SECOND_15
self.femtobolt_config.synchronized_images_only = False
self.femtobolt_config.color_resolution = 0
# 视效范围参数示例假设SDK支持depth_range_min和depth_range_max
# 直接尝试启动设备pykinect_azure库没有设备数量检测API
# logger.info('准备启动FemtoBolt设备...')
# 启动FemtoBolt设备
# logger.info(f'尝试启动FemtoBolt设备...,参数详情是{self.femtobolt_config}')
self.femtobolt_camera = pykinect.start_device(config=self.femtobolt_config)
if self.femtobolt_camera:
self.device_status['femtobolt'] = True
logger.info('✓ FemtoBolt深度相机初始化成功!')
else:
raise Exception('设备启动返回None')
except Exception as e:
logger.warning(f'FemtoBolt深度相机初始化失败: {e}')
logger.warning('FemtoBolt深度相机功能将不可用但不影响其他功能')
logger.warning('可能的解决方案:')
logger.warning('1. 检查FemtoBolt设备是否正确连接并被识别')
logger.warning('2. 安装Orbbec官方的K4A兼容驱动程序')
logger.warning('3. 确保没有其他应用程序占用设备')
logger.warning('4. 尝试重新插拔设备或重启计算机')
logger.warning('5. 考虑使用Orbbec原生SDK而非Azure Kinect SDK')
self.femtobolt_camera = None
self.device_status['femtobolt'] = False
# 不再抛出异常,让系统继续运行其他功能
def _get_femtobolt_sdk_paths(self) -> List[str]:
"""获取FemtoBolt SDK可能的路径列表"""
import platform
sdk_paths = []
if platform.system() == "Windows":
# 优先使用Orbbec SDK K4A Wrapper与azure_kinect_image_example.py一致
base_dir = os.path.dirname(os.path.abspath(__file__))
dll_path = os.path.join(base_dir, "dll","femtobolt","bin", "k4a.dll")
sdk_paths.append(dll_path)
return sdk_paths
def _init_imu(self):
"""初始化IMU传感器"""
logger.info('开始初始化IMU传感器...')
try:
# 从config.ini读取串口配置
config = configparser.ConfigParser()
# 优先读取根目录config.ini否则读取backend/config.ini
root_config_path = os.path.join(os.path.dirname(__file__), 'config.ini')
app_root_config_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'config.ini')
logger.debug(f'尝试读取配置文件: {root_config_path}, {app_root_config_path}')
read_files = config.read([app_root_config_path, root_config_path], encoding='utf-8')
logger.debug(f'成功读取的配置文件: {read_files}')
if not read_files:
logger.warning('未能读取到config.ini将使用默认串口配置COM7@9600')
imu_port = config.get('DEVICES', 'imu_port', fallback='COM7')
imu_baudrate = config.getint('DEVICES', 'imu_baudrate', fallback=9600)
logger.info(f'从配置文件读取IMU串口配置: {imu_port}@{imu_baudrate}')
# 初始化真实IMU设备
logger.debug('创建RealIMUDevice实例...')
self.imu_device = RealIMUDevice(port=imu_port, baudrate=imu_baudrate)
# 测试读取数据
logger.debug('测试IMU设备数据读取...')
test_data = self.imu_device.read_data()
logger.debug(f'IMU设备测试数据: {test_data}')
self.device_status['imu'] = True
logger.info(f'IMU传感器初始化成功真实设备: {imu_port}@{imu_baudrate}')
except Exception as e:
logger.error(f'IMU传感器初始化失败: {e}', exc_info=True)
self.imu_device = None
self.device_status['imu'] = False
def _init_pressure_sensor(self):
"""初始化压力传感器"""
try:
# 优先尝试连接真实设备
try:
self.pressure_device = RealPressureDevice()
self.device_status['pressure'] = True
logger.info('压力传感器初始化成功(真实设备)')
return
except Exception as real_e:
logger.warning(f'真实压力传感器初始化失败,使用模拟设备。原因: {real_e}')
# 回退到模拟设备
self.pressure_device = MockPressureDevice()
self.device_status['pressure'] = True
logger.info('压力传感器初始化成功(模拟)')
except Exception as e:
logger.error(f'压力传感器初始化失败: {e}')
self.pressure_device = None
def get_device_status(self) -> Dict[str, bool]:
"""获取设备状态"""
return self.device_status.copy()
def get_connected_devices(self) -> List[str]:
"""获取已连接的设备列表"""
return [device for device, status in self.device_status.items() if status]
def refresh_devices(self):
"""刷新设备连接"""
logger.info('刷新设备连接...')
# 使用锁保护摄像头重新初始化
with self.camera_lock:
if self.camera:
self.camera.release()
self.camera = None
self._init_devices()
def calibrate_devices(self) -> Dict[str, Any]:
"""校准设备"""
calibration_result = {}
try:
# 摄像头校准
# if self.device_status['camera']:
# camera_calibration = self._calibrate_camera()
# calibration_result['camera'] = camera_calibration
# IMU校准
if self.device_status['imu']:
imu_calibration = self._calibrate_imu()
calibration_result['imu'] = imu_calibration
# 压力传感器校准
if self.device_status['pressure']:
pressure_calibration = self._calibrate_pressure()
calibration_result['pressure'] = pressure_calibration
self.calibration_data = calibration_result
logger.info('设备校准完成')
except Exception as e:
logger.error(f'设备校准失败: {e}')
raise
return calibration_result
def _calibrate_camera(self) -> Dict[str, Any]:
"""校准摄像头"""
if not self.camera or not self.camera.isOpened():
return {'status': 'failed', 'error': '摄像头未连接'}
try:
# 获取几帧图像进行校准
frames = []
for _ in range(10):
ret, frame = self.camera.read()
if ret:
frames.append(frame)
time.sleep(0.1)
if not frames:
return {'status': 'failed', 'error': '无法获取图像'}
# 计算平均亮度和对比度
avg_brightness = np.mean([np.mean(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)) for frame in frames])
calibration = {
'status': 'success',
'brightness': float(avg_brightness),
'resolution': (int(self.camera.get(cv2.CAP_PROP_FRAME_WIDTH)),
int(self.camera.get(cv2.CAP_PROP_FRAME_HEIGHT))),
'fps': float(self.camera.get(cv2.CAP_PROP_FPS)),
'timestamp': datetime.now().isoformat()
}
return calibration
except Exception as e:
return {'status': 'failed', 'error': str(e)}
def _calibrate_imu(self) -> Dict[str, Any]:
"""标准校准:采样较多帧,计算稳定零点偏移"""
if not self.imu_device:
return {'status': 'failed', 'error': 'IMU设备未连接'}
try:
samples = []
for _ in range(100):
data = self.imu_device.read_data(apply_calibration=False)
if data and 'head_pose' in data:
samples.append(data['head_pose'])
time.sleep(0.01)
if not samples:
return {'status': 'failed', 'error': '无法获取IMU数据进行校准'}
head_pose_offset = {
'rotation': float(np.mean([s['rotation'] for s in samples])),
'tilt': float(np.mean([s['tilt'] for s in samples])),
'pitch': float(np.mean([s['pitch'] for s in samples]))
}
calibration = {
'status': 'success',
'head_pose_offset': head_pose_offset,
'timestamp': datetime.now().isoformat()
}
if hasattr(self.imu_device, 'set_calibration'):
self.imu_device.set_calibration(calibration)
return calibration
except Exception as e:
return {'status': 'failed', 'error': str(e)}
def _quick_calibrate_imu(self) -> Dict[str, Any]:
"""快速校准:采样少量帧,以当前姿态为零点(用于每次推流启动)"""
if not self.imu_device:
return {'status': 'failed', 'error': 'IMU设备未连接'}
try:
samples = []
for _ in range(10): # 少量采样,加快启动
data = self.imu_device.read_data(apply_calibration=False)
if data and 'head_pose' in data:
samples.append(data['head_pose'])
time.sleep(0.01)
if not samples:
return {'status': 'failed', 'error': '无法获取IMU数据进行快速校准'}
head_pose_offset = {
'rotation': float(np.median([s['rotation'] for s in samples])),
'tilt': float(np.median([s['tilt'] for s in samples])),
'pitch': float(np.median([s['pitch'] for s in samples]))
}
calibration = {
'status': 'success',
'head_pose_offset': head_pose_offset,
'timestamp': datetime.now().isoformat()
}
if hasattr(self.imu_device, 'set_calibration'):
self.imu_device.set_calibration(calibration)
return calibration
except Exception as e:
return {'status': 'failed', 'error': str(e)}
def _calibrate_pressure(self) -> Dict[str, Any]:
"""校准压力传感器"""
if not self.pressure_device:
return {'status': 'failed', 'error': '压力传感器未连接'}
try:
# 收集零压力数据
samples = []
for _ in range(50):
data = self.pressure_device.read_data()
samples.append(data)
time.sleep(0.02)
# 计算零点偏移
zero_offset = {
'left_foot': np.mean([s['left_foot'] for s in samples]),
'right_foot': np.mean([s['right_foot'] for s in samples])
}
calibration = {
'status': 'success',
'zero_offset': zero_offset,
'timestamp': datetime.now().isoformat()
}
return calibration
except Exception as e:
return {'status': 'failed', 'error': str(e)}
def collect_data(self, session_id: str, patient_id: str, screen_image_base64: str = None) -> Dict[str, Any]:
# 实例化VideoStreamManagerVideoStreamManager类在同一文件中定义
video_stream_manager = VideoStreamManager(device_manager=self)
"""采集所有设备数据并保存到指定目录结构
Args:
session_id: 检测会话ID
patient_id: 患者ID
screen_image_base64: 前端界面截图的base64数据
Returns:
Dict: 包含所有采集数据的字典符合detection_data表结构
"""
# 生成采集时间戳
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f')[:-3] # 精确到毫秒
# 创建数据存储目录
data_dir = Path(f'data/patients/{patient_id}/{session_id}/{timestamp}')
data_dir.mkdir(parents=True, exist_ok=True)
# 设置目录权限为777完全权限
try:
import os
import stat
os.chmod(str(data_dir), stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) # 777权限
logger.debug(f"已设置目录权限为777: {data_dir}")
except Exception as perm_error:
logger.warning(f"设置目录权限失败: {perm_error},但目录创建成功")
# 初始化数据字典
data = {
'session_id': session_id,
'head_pose': None,
'body_pose': None,
'body_image': None,
'foot_data': None,
'foot_image': None,
'foot_data_image': None,
'screen_image': None,
'timestamp': timestamp
}
try:
# # 1. 采集头部姿态数据从IMU设备获取
# if self.device_status['imu']:
# head_pose_data = self._collect_head_pose_data()
# if head_pose_data:
# data['head_pose'] = json.dumps(head_pose_data)
# logger.debug(f'头部姿态数据采集成功: {session_id}')
# # 2. 采集身体姿态数据从FemtoBolt深度相机获取
# if self.device_status['femtobolt']:
# body_pose_data = self._collect_body_pose_data()
# if body_pose_data:
# data['body_pose'] = json.dumps(body_pose_data)
# logger.debug(f'身体姿态数据采集成功: {session_id}')
# 3. 采集身体视频截图从FemtoBolt深度相机获取
if self.device_status['femtobolt']:
try:
body_image_path = video_stream_manager._capture_body_image(data_dir, self)
if body_image_path:
data['body_image'] = str(body_image_path)
logger.debug(f'身体截图保存成功: {body_image_path}')
except Exception as e:
logger.error(f'调用_video_stream_manager._capture_body_image异常: {e}')
# # 4. 采集足部压力数据(从压力传感器获取)
# if self.device_status['pressure']:
# foot_data = self._collect_foot_pressure_data()
# if foot_data:
# data['foot_data'] = json.dumps(foot_data)
# logger.debug(f'足部压力数据采集成功: {session_id}')
# 5. 采集足部监测视频截图(从摄像头获取)
if self.device_status['camera']:
foot_image_path = video_stream_manager._capture_foot_image(data_dir,self)
if foot_image_path:
data['foot_image'] = str(foot_image_path)
logger.debug(f'足部截图保存成功: {foot_image_path}')
# # 6. 生成足底压力数据图(从压力传感器数据生成)
# if self.device_status['pressure']:
# foot_data_image_path = self._generate_foot_pressure_image(data_dir)
# if foot_data_image_path:
# data['foot_data_image'] = str(foot_data_image_path)
# logger.debug(f'足底压力数据图生成成功: {foot_data_image_path}')
# 7. 保存屏幕录制截图从前端传入的base64数据
if screen_image_base64:
try:
# logger.debug(f'屏幕截图保存.................{screen_image_base64}')
# 保存屏幕截图的base64数据为图片文件
screen_image_path = None
if screen_image_base64:
try:
if screen_image_base64.startswith('data:image/'):
base64_data = screen_image_base64.split(',')[1]
else:
base64_data = screen_image_base64
image_data = base64.b64decode(base64_data)
image_path = data_dir / 'screen_image.png'
with open(image_path, 'wb') as f:
f.write(image_data)
abs_image_path = image_path.resolve()
abs_cwd = Path.cwd().resolve()
screen_image_path = str(abs_image_path.relative_to(abs_cwd))
logger.debug(f'屏幕截图保存成功: {screen_image_path}')
except Exception as e:
logger.error(f'屏幕截图保存失败: {e}')
import traceback
logger.error(traceback.format_exc())
if screen_image_path:
data['screen_image'] = str(screen_image_path)
logger.debug(f'屏幕截图保存成功: {screen_image_path}')
except Exception as e:
logger.error(f'屏幕截图保存失败: {e}')
import traceback
logger.error(traceback.format_exc())
# 更新最新数据
with self.data_lock:
self.latest_data = data.copy()
logger.debug(f'数据采集完成: {session_id}, 时间戳: {timestamp}')
except Exception as e:
logger.error(f'数据采集失败: {e}')
return data
def start_femtobolt_stream(self):
"""开始FemtoBolt深度相机推流"""
if not FEMTOBOLT_AVAILABLE or self.femtobolt_camera is None:
logger.error('FemtoBolt深度相机未初始化')
return False
try:
# 检查是否已经在推流
if self.femtobolt_streaming:
logger.warning('FemtoBolt深度相机推流已在运行')
return True
# 重置停止事件
self.streaming_stop_event.clear()
# 设置推流标志
self.femtobolt_streaming = True
# 启动推流线程
self.femtobolt_streaming_thread = threading.Thread(
target=self._femtobolt_streaming_thread,
daemon=True,
name='FemtoBoltStreamingThread'
)
self.femtobolt_streaming_thread.start()
# logger.info('FemtoBolt深度相机推流已开始')
return True
except Exception as e:
logger.error(f'FemtoBolt深度相机推流启动失败: {e}')
self.femtobolt_streaming = False
return False
def stop_femtobolt_stream(self):
"""停止FemtoBolt深度相机推流"""
self.femtobolt_streaming = False
logger.debug('FemtoBolt深度相机推流已停止')
def set_socketio(self, socketio):
"""设置WebSocket连接"""
self.socketio = socketio
def start_imu_streaming(self):
"""启动IMU头部姿态数据推流"""
try:
if self.imu_streaming:
logger.warning('IMU数据推流已在运行')
return True
if not self.imu_device:
logger.error('IMU设备未初始化')
return False
# 在启动推流前进行快速零点校准(自动以当前姿态为基准)
logger.info('正在进行IMU零点校准...')
calibration_result = self._quick_calibrate_imu()
if calibration_result.get('status') == 'success':
logger.info(f'IMU零点校准完成: {calibration_result["head_pose_offset"]}')
else:
logger.warning(f'IMU零点校准失败将使用默认零偏移: {calibration_result.get("error", "未知错误")}')
self.imu_streaming = True
self.imu_thread = threading.Thread(target=self._imu_streaming_thread, daemon=True)
self.imu_thread.start()
logger.info('IMU头部姿态数据推流已启动')
return True
except Exception as e:
logger.error(f'启动IMU数据推流失败: {e}')
self.imu_streaming = False
return False
def stop_imu_streaming(self):
"""停止IMU头部姿态数据推流"""
try:
if not self.imu_streaming:
logger.warning('IMU数据推流未运行')
return True
self.imu_streaming = False
if self.imu_thread and self.imu_thread.is_alive():
self.imu_thread.join(timeout=2)
logger.info('IMU头部姿态数据推流已停止')
return True
except Exception as e:
logger.error(f'停止IMU数据推流失败: {e}')
return False
def start_pressure_streaming(self):
"""启动压力传感器足部压力数据推流"""
try:
if self.pressure_streaming:
logger.warning('压力传感器数据推流已在运行')
return True
# 确保设备已初始化(懒加载+自动重连)
if not self.pressure_device:
try:
self._init_pressure_sensor()
except Exception as init_e:
logger.error(f'压力传感器设备初始化失败: {init_e}')
return False
else:
# 如果是真实设备且未连接,尝试重连
try:
if hasattr(self.pressure_device, 'is_connected') and not getattr(self.pressure_device, 'is_connected', True):
logger.info('检测到压力设备未连接,尝试重新初始化...')
self._init_pressure_sensor()
except Exception as reinit_e:
logger.warning(f'压力设备重连失败: {reinit_e}')
# 再次确认
if not self.pressure_device:
logger.error('压力传感器设备未初始化')
return False
self.pressure_streaming = True
self.pressure_thread = threading.Thread(target=self._pressure_streaming_thread, daemon=True)
self.pressure_thread.start()
logger.info('压力传感器足部压力数据推流已启动')
return True
except Exception as e:
logger.error(f'启动压力传感器数据推流失败: {e}')
self.pressure_streaming = False
return False
def stop_pressure_streaming(self):
"""停止压力传感器足部压力数据推流"""
try:
if not self.pressure_streaming:
logger.warning('压力传感器数据推流未运行')
return True
self.pressure_streaming = False
if self.pressure_thread and self.pressure_thread.is_alive():
self.pressure_thread.join(timeout=2)
# 关闭压力设备连接
if self.pressure_device and hasattr(self.pressure_device, 'close'):
try:
self.pressure_device.close()
except Exception as close_e:
logger.warning(f'关闭压力设备连接失败: {close_e}')
logger.info('压力传感器足部压力数据推流已停止')
return True
except Exception as e:
logger.error(f'停止压力传感器数据推流失败: {e}')
return False
# def _femtobolt_streaming_thread(self):
# import matplotlib
# matplotlib.use("Agg") # 无GUI后端
# import matplotlib.pyplot as plt
# from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas
# from matplotlib.colors import LinearSegmentedColormap
# frame_count = 0
# try:
# # 读取一次配置避免每帧IO
# config = configparser.ConfigParser()
# config.read('config.ini', encoding='utf-8')
# try:
# depth_range_min = int(config.get('DEFAULT', 'femtobolt_depth_range_min', fallback='1400'))
# depth_range_max = int(config.get('DEFAULT', 'femtobolt_depth_range_max', fallback='1900'))
# except Exception:
# depth_range_min = None
# depth_range_max = None
# # 如果可以用matplotlib提前初始化绘图对象
# if MATPLOTLIB_AVAILABLE and depth_range_min is not None and depth_range_max is not None:
# colors = ['fuchsia', 'red', 'yellow', 'lime', 'cyan', 'blue'] * 4
# mcmap = LinearSegmentedColormap.from_list("custom_cmap", colors)
# # 创建独立figure和axes
# fig, ax = plt.subplots(figsize=(7, 7))
# canvas = FigureCanvas(fig)
# # 灰色背景(假设分辨率不会超过)
# max_h, max_w = 1080, 1920
# background = np.ones((max_h, max_w)) * 0.5
# ax.imshow(background, origin='lower', cmap='gray', alpha=0.3)
# ax.grid(True, which='both', axis='both',
# color='white', linestyle='-', linewidth=1, zorder=0)
# ax.set_axis_off()
# plt.tight_layout(pad=0)
# contour = None # 用于保存等高线对象
# while self.femtobolt_streaming and not self.streaming_stop_event.is_set():
# if self.femtobolt_camera and self.socketio:
# try:
# capture = self.femtobolt_camera.update()
# if capture is not None:
# ret, depth_image = capture.get_depth_image()
# if ret and depth_image is not None:
# if MATPLOTLIB_AVAILABLE and depth_range_min is not None and depth_range_max is not None:
# # 数据过滤
# depth_image = depth_image.copy()
# depth_image[(depth_image > depth_range_max) |
# (depth_image < depth_range_min)] = 0
# depth_masked = np.ma.masked_equal(depth_image, 0)
# # 删除旧等高线
# if contour:
# for coll in contour.collections:
# coll.remove()
# # 绘制新等高线
# contour = ax.contourf(
# depth_masked,
# levels=200,
# cmap=mcmap,
# vmin=depth_range_min,
# vmax=depth_range_max,
# origin='upper',
# zorder=2
# )
# # 渲染到numpy
# canvas.draw()
# img = np.frombuffer(canvas.tostring_rgb(), dtype=np.uint8)
# img = img.reshape(fig.canvas.get_width_height()[::-1] + (3,))
# depth_colored = img
# else:
# # OpenCV伪彩模式
# depth_normalized = np.clip(depth_image, depth_range_min, depth_range_max)
# depth_normalized = ((depth_normalized - depth_range_min) /
# (depth_range_max - depth_range_min) * 255).astype(np.uint8)
# depth_colored = cv2.applyColorMap(depth_normalized, cv2.COLORMAP_JET)
# mask_outside = (depth_image < depth_range_min) | (depth_image > depth_range_max)
# depth_colored[mask_outside] = [0, 0, 0]
# # 裁剪
# height, width = depth_colored.shape[:2]
# target_width = height // 2
# if width > target_width:
# left = (width - target_width) // 2
# right = left + target_width
# depth_colored = depth_colored[:, left:right]
# # 缓存帧
# self._save_frame_to_cache(depth_colored.copy(), 'femtobolt')
# # 发送
# success, buffer = cv2.imencode('.jpg', depth_colored, [int(cv2.IMWRITE_JPEG_QUALITY), 80])
# if success and self.socketio:
# jpg_as_text = base64.b64encode(buffer).decode('utf-8')
# self.socketio.emit('depth_camera_frame', {
# 'image': jpg_as_text,
# 'frame_id': frame_count,
# 'timestamp': time.time()
# })
# frame_count += 1
# else:
# time.sleep(0.01)
# except Exception as e:
# logger.debug(f'FemtoBolt帧推送失败: {e}')
# time.sleep(0.1)
# time.sleep(1 / 30) # 30 FPS
# except Exception as e:
# logger.debug(f'FemtoBolt推流线程异常: {e}')
# finally:
# self.femtobolt_streaming = False
# def _femtobolt_streaming_thread(self):
# """FemtoBolt深度相机推流线程优化版本"""
# frame_count = 0
# import matplotlib
# matplotlib.use("Agg") # 使用无GUI的Agg后端加速渲染
# import matplotlib.pyplot as plt
# from matplotlib.colors import LinearSegmentedColormap
# from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas
# try:
# # 读取深度范围配置(只读一次)
# config = configparser.ConfigParser()
# config.read('config.ini')
# try:
# depth_range_min = int(config.get('DEFAULT', 'femtobolt_depth_range_min', fallback='1400'))
# depth_range_max = int(config.get('DEFAULT', 'femtobolt_depth_range_max', fallback='1900'))
# except Exception:
# depth_range_min = None
# depth_range_max = None
# # 如果启用matplotlib模式提前准备绘图对象
# if MATPLOTLIB_AVAILABLE and depth_range_min is not None and depth_range_max is not None:
# colors = ['fuchsia', 'red', 'yellow', 'lime', 'cyan', 'blue'] * 4
# mcmap = LinearSegmentedColormap.from_list("custom_cmap", colors)
# fig, ax = plt.subplots(figsize=(7, 7))
# canvas = FigureCanvas(fig)
# background = np.ones((720, 1280)) * 0.5 # 假设最大分辨率,后面裁剪
# bg_img = ax.imshow(background, origin='lower', cmap='gray', alpha=0.3)
# ax.grid(True, which='both', axis='both', color='white', linestyle='-', linewidth=1, zorder=0)
# contour = None # 等高线对象占位
# ax.set_axis_off()
# plt.tight_layout(pad=0)
# while self.femtobolt_streaming and not self.streaming_stop_event.is_set():
# if self.femtobolt_camera and self.socketio:
# try:
# capture = self.femtobolt_camera.update()
# if capture is not None:
# ret, depth_image = capture.get_depth_image()
# if ret and depth_image is not None:
# if MATPLOTLIB_AVAILABLE and depth_range_min is not None and depth_range_max is not None:
# # 过滤范围外值
# depth_image = depth_image.copy()
# depth_image[(depth_image > depth_range_max) | (depth_image < depth_range_min)] = 0
# depth_masked = np.ma.masked_equal(depth_image, 0)
# # 清理旧的等高线
# if contour:
# for coll in contour.collections:
# coll.remove()
# # 绘制新的等高线
# contour = ax.contourf(
# depth_masked, levels=200, cmap=mcmap,
# vmin=depth_range_min, vmax=depth_range_max,
# origin='upper', zorder=2
# )
# # 渲染到 numpy
# canvas.draw()
# img = np.frombuffer(canvas.tostring_rgb(), dtype=np.uint8)
# img = img.reshape(fig.canvas.get_width_height()[::-1] + (3,))
# depth_colored = img
# else:
# # OpenCV 伪彩
# depth_normalized = np.clip(depth_image, depth_range_min, depth_range_max)
# depth_normalized = ((depth_normalized - depth_range_min) /
# (depth_range_max - depth_range_min) * 255).astype(np.uint8)
# depth_colored = cv2.applyColorMap(depth_normalized, cv2.COLORMAP_JET)
# mask_outside = (depth_image < depth_range_min) | (depth_image > depth_range_max)
# depth_colored[mask_outside] = [0, 0, 0]
# # 裁剪
# height, width = depth_colored.shape[:2]
# target_width = height // 2
# if width > target_width:
# left = (width - target_width) // 2
# right = left + target_width
# depth_colored = depth_colored[:, left:right]
# # 保存到缓存
# self._save_frame_to_cache(depth_colored.copy(), 'femtobolt')
# # 编码并推送
# success, buffer = cv2.imencode('.jpg', depth_colored, [int(cv2.IMWRITE_JPEG_QUALITY), 80])
# if success and self.socketio:
# jpg_as_text = base64.b64encode(buffer).decode('utf-8')
# self.socketio.emit('depth_camera_frame', {
# 'image': jpg_as_text,
# 'frame_id': frame_count,
# 'timestamp': time.time()
# })
# frame_count += 1
# else:
# time.sleep(0.01)
# except Exception as e:
# logger.debug(f'FemtoBolt帧推送失败: {e}')
# time.sleep(0.1)
# time.sleep(1 / 30) # 控制帧率
# except Exception as e:
# logger.debug(f'FemtoBolt推流线程异常: {e}')
# finally:
# self.femtobolt_streaming = False
# def _femtobolt_streaming_thread(self):
"""FemtoBolt深度相机推流线程"""
frame_count = 0
try:
while self.femtobolt_streaming and not self.streaming_stop_event.is_set():
if self.femtobolt_camera and self.socketio:
try:
# 获取FemtoBolt帧
capture = self.femtobolt_camera.update()
# 检查capture是否有效并获取彩色深度图像
if capture is not None:
ret, depth_image = capture.get_depth_image()
height2, width2 = depth_image.shape[:2]
# logger.debug(f'FemtoBolt原始帧宽: {width2}')
# logger.debug(f'FemtoBolt原始帧高: {height2}')
if ret and depth_image is not None:
# 读取config.ini中的深度范围配置
import configparser
config = configparser.ConfigParser()
config.read('config.ini')
try:
depth_range_min = int(config.get('DEFAULT', 'femtobolt_depth_range_min', fallback='1400'))
depth_range_max = int(config.get('DEFAULT', 'femtobolt_depth_range_max', fallback='1900'))
except Exception:
depth_range_min = None
depth_range_max = None
# 使用matplotlib渲染深度图参考display_x.py
if MATPLOTLIB_AVAILABLE and depth_range_min is not None and depth_range_max is not None:
import numpy as np
import cv2
# 假设 depth_image 已经是 np.uint16 格式
depth_image = depth_image.copy()
depth_image[depth_image > depth_range_max] = 0
depth_image[depth_image < depth_range_min] = 0
# 归一化到 0-255
depth_normalized = np.clip(depth_image, depth_range_min, depth_range_max)
depth_normalized = ((depth_normalized - depth_range_min) /(depth_range_max - depth_range_min) * 255).astype(np.uint8)
# 用 OpenCV 生成彩色映射(用 COLORMAP_JET 或自定义 LUT 代替 LinearSegmentedColormap
# === 对比度增强 ===
alpha = 1.5 # 对比度增益 (>1 增强对比1.5 比较明显)
beta = 0 # 亮度偏移
depth_normalized = cv2.convertScaleAbs(depth_normalized, alpha=alpha, beta=beta)
# 可选:伽马校正,增强中间层次感
gamma = 0.8 # <1 提亮暗部, >1 压暗暗部
look_up_table = np.array([((i / 255.0) ** gamma) * 255 for i in range(256)]).astype("uint8")
depth_normalized = cv2.LUT(depth_normalized, look_up_table)
depth_colored = cv2.applyColorMap(depth_normalized, cv2.COLORMAP_JET)
# 创建灰色背景
rows, cols = depth_colored.shape[:2]
background = np.ones_like(depth_colored, dtype=np.uint8) * 128 # 灰色
# 画网格(和 matplotlib 网格类似)
rows, cols = depth_colored.shape[:2]
grid_color = (255, 255, 255) # 白色
line_thickness = 1
grid_bg = np.zeros_like(depth_colored)
cell_size = 50 # 可以根据原 contourf 分辨率调
for x in range(0, cols, cell_size):
cv2.line(grid_bg, (x, 0), (x, rows), grid_color, line_thickness)
for y in range(0, rows, cell_size):
cv2.line(grid_bg, (0, y), (cols, y), grid_color, line_thickness)
bg_with_grid = background.copy()
mask_grid = (grid_bg.sum(axis=2) > 0)
depth_colored[mask_grid] = grid_bg[mask_grid]
else:
# 如果没有matplotlib则使用原有OpenCV伪彩色映射
depth_normalized = np.clip(depth_image, depth_range_min, depth_range_max)
depth_normalized = ((depth_normalized - depth_range_min) / (depth_range_max - depth_range_min) * 255).astype(np.uint8)
depth_colored = cv2.applyColorMap(depth_normalized, cv2.COLORMAP_JET)
mask_outside = (depth_image < depth_range_min) | (depth_image > depth_range_max)
depth_colored[mask_outside] = [0, 0, 0]
height, width = depth_colored.shape[:2]
# logger.debug(f'FemtoBolt帧宽: {width}')
# logger.debug(f'FemtoBolt帧高: {height}')
target_width = height // 2
if width > target_width:
left = (width - target_width) // 2
right = left + target_width
depth_colored = depth_colored[:, left:right]
height1, width1 = depth_colored.shape[:2]
# logger.debug(f'FemtoBolt帧裁剪完以后得宽: {width1}')
# logger.debug(f'FemtoBolt帧裁剪完以后得宽: {height1}')
# 保存处理好的身体帧到全局缓存
self._save_frame_to_cache(depth_colored.copy(), 'femtobolt')
success, buffer = cv2.imencode('.jpg', depth_colored, [int(cv2.IMWRITE_JPEG_QUALITY), 80])
if success and self.socketio:
jpg_as_text = base64.b64encode(buffer).decode('utf-8')
self.socketio.emit('depth_camera_frame', {
'image': jpg_as_text,
'frame_id': frame_count,
'timestamp': time.time()
})
frame_count += 1
else:
# 如果没有获取到有效帧,短暂等待后继续
time.sleep(0.01)
except Exception as e:
logger.debug(f'FemtoBolt帧推送失败: {e}')
# 发生错误时短暂等待,避免快速循环
time.sleep(0.1)
# 控制帧率
time.sleep(1/30) # 30 FPS
except Exception as e:
logger.debug(f'FemtoBolt推流线程异常: {e}')
finally:
self.femtobolt_streaming = False
def _femtobolt_streaming_thread(self):
"""FemtoBolt深度相机推流线程"""
frame_count = 0
try:
while self.femtobolt_streaming and not self.streaming_stop_event.is_set():
if self.femtobolt_camera and self.socketio:
try:
capture = self.femtobolt_camera.update()
if capture is not None:
ret, depth_image = capture.get_depth_image()
if ret and depth_image is not None:
import configparser
config = configparser.ConfigParser()
config.read('config.ini')
try:
depth_range_min = int(config.get('DEFAULT', 'femtobolt_depth_range_min', fallback='1400'))
depth_range_max = int(config.get('DEFAULT', 'femtobolt_depth_range_max', fallback='1900'))
except Exception:
depth_range_min = None
depth_range_max = None
if MATPLOTLIB_AVAILABLE and depth_range_min is not None and depth_range_max is not None:
import numpy as np
import cv2
depth_image = depth_image.copy()
# === 生成灰色背景 + 白色网格 ===
rows, cols = depth_image.shape[:2]
background = np.ones((rows, cols, 3), dtype=np.uint8) * 128
cell_size = 50
grid_color = (255, 255, 255)
grid_bg = np.zeros_like(background)
for x in range(0, cols, cell_size):
cv2.line(grid_bg, (x, 0), (x, rows), grid_color, 1)
for y in range(0, rows, cell_size):
cv2.line(grid_bg, (0, y), (cols, y), grid_color, 1)
mask_grid = (grid_bg.sum(axis=2) > 0)
background[mask_grid] = grid_bg[mask_grid]
# === 处理深度图满足区间的部分 ===
depth_clipped = depth_image.copy()
depth_clipped[depth_clipped < depth_range_min] = 0
depth_clipped[depth_clipped > depth_range_max] = 0
depth_normalized = np.clip(depth_clipped, depth_range_min, depth_range_max)
depth_normalized = ((depth_normalized - depth_range_min) / (depth_range_max - depth_range_min) * 255).astype(np.uint8)
# 对比度和伽马校正
alpha, beta, gamma = 1.5, 0, 0.8
depth_normalized = cv2.convertScaleAbs(depth_normalized, alpha=alpha, beta=beta)
lut = np.array([((i / 255.0) ** gamma) * 255 for i in range(256)]).astype("uint8")
depth_normalized = cv2.LUT(depth_normalized, lut)
# 伪彩色
depth_colored = cv2.applyColorMap(depth_normalized, cv2.COLORMAP_JET)
# 将有效深度覆盖到灰色背景上
mask_valid = (depth_clipped > 0)
for c in range(3):
background[:, :, c][mask_valid] = depth_colored[:, :, c][mask_valid]
depth_colored_final = background
else:
# 没有matplotlib则使用原OpenCV伪彩色
depth_normalized = np.clip(depth_image, depth_range_min, depth_range_max)
depth_normalized = ((depth_normalized - depth_range_min) / (depth_range_max - depth_range_min) * 255).astype(np.uint8)
depth_colored_final = cv2.applyColorMap(depth_normalized, cv2.COLORMAP_JET)
mask_outside = (depth_image < depth_range_min) | (depth_image > depth_range_max)
depth_colored_final[mask_outside] = [128, 128, 128] # 灰色
# 裁剪宽度
height, width = depth_colored_final.shape[:2]
target_width = height // 2
if width > target_width:
left = (width - target_width) // 2
right = left + target_width
depth_colored_final = depth_colored_final[:, left:right]
# 保存到缓存
self._save_frame_to_cache(depth_colored_final.copy(), 'femtobolt')
# 推送SocketIO
success, buffer = cv2.imencode('.jpg', depth_colored_final, [int(cv2.IMWRITE_JPEG_QUALITY), 80])
if success and self.socketio:
import base64, time
jpg_as_text = base64.b64encode(buffer).decode('utf-8')
self.socketio.emit('depth_camera_frame', {
'image': jpg_as_text,
'frame_id': frame_count,
'timestamp': time.time()
})
frame_count += 1
else:
time.sleep(0.01)
else:
time.sleep(0.01)
except Exception as e:
logger.debug(f'FemtoBolt帧推送失败: {e}')
time.sleep(0.1)
time.sleep(1/30) # 30 FPS
except Exception as e:
logger.debug(f'FemtoBolt推流线程异常: {e}')
finally:
self.femtobolt_streaming = False
def _imu_streaming_thread(self):
"""IMU头部姿态数据推流线程"""
# logger.info('IMU头部姿态数据推流线程已启动')
try:
loop_count = 0
while self.imu_streaming and self.socketio:
try:
loop_count += 1
# 从IMU设备读取数据
imu_data = self.imu_device.read_data()
if imu_data and 'head_pose' in imu_data:
# 直接使用设备提供的头部姿态数据,减少数据包装
head_pose = imu_data['head_pose']
# logger.warning(f'推送数据{head_pose}')
# 优化:直接发送最精简的数据格式,避免重复时间戳
rotation = head_pose.get('rotation')
tilt = head_pose.get('tilt')
pitch = head_pose.get('pitch')
try:
rotation = round(float(rotation), 2) if rotation is not None else rotation
except Exception:
pass
try:
tilt = round(float(tilt), 2) if tilt is not None else tilt
except Exception:
pass
try:
pitch = round(float(pitch), 2) if pitch is not None else pitch
except Exception:
pass
self.socketio.emit('imu_data', {
'rotation': rotation, # 旋转角:左旋(-), 右旋(+)
'tilt': tilt, # 倾斜角:左倾(-), 右倾(+)
'pitch': pitch, # 俯仰角:俯角(-), 仰角(+)
})
# 优化提高数据发送频率到30Hz降低延时
time.sleep(0.033)
except Exception as e:
# 减少异常日志的详细程度
logger.warning(f'IMU数据推流异常: {e}')
time.sleep(0.033)
except Exception as e:
logger.error(f'IMU推流线程异常: {e}', exc_info=True)
finally:
logger.info('IMU头部姿态数据推流线程已结束')
def _pressure_streaming_thread(self):
"""压力传感器足部压力数据推流线程"""
logger.info('压力传感器足部压力数据推流线程已启动')
try:
while self.pressure_streaming and self.socketio:
try:
# 从压力传感器设备读取数据
pressure_data = self.pressure_device.read_data()
if pressure_data and 'foot_pressure' in pressure_data:
foot_pressure = pressure_data['foot_pressure']
# logger.error(f"压力传感器数据{foot_pressure}")
# 获取各区域压力值
left_front = foot_pressure['left_front']
left_rear = foot_pressure['left_rear']
right_front = foot_pressure['right_front']
right_rear = foot_pressure['right_rear']
left_total = foot_pressure['left_total']
right_total = foot_pressure['right_total']
# 计算总压力
total_pressure = left_total + right_total
# 计算平衡比例(左脚压力占总压力的比例)
balance_ratio = left_total / total_pressure if total_pressure > 0 else 0.5
# 计算压力中心偏移
pressure_center_offset = (balance_ratio - 0.5) * 100 # 转换为百分比
# 计算前后足压力分布
left_front_ratio = left_front / left_total if left_total > 0 else 0.5
right_front_ratio = right_front / right_total if right_total > 0 else 0.5
# 构建完整的足部压力数据
complete_pressure_data = {
# 分区压力值
'pressure_zones': {
'left_front': left_front,
'left_rear': left_rear,
'right_front': right_front,
'right_rear': right_rear,
'left_total': left_total,
'right_total': right_total,
'total_pressure': total_pressure
},
# 平衡分析
'balance_analysis': {
'balance_ratio': round(balance_ratio, 3),
'pressure_center_offset': round(pressure_center_offset, 2),
'balance_status': 'balanced' if abs(pressure_center_offset) < 10 else 'unbalanced',
'left_front_ratio': round(left_front_ratio, 3),
'right_front_ratio': round(right_front_ratio, 3)
},
# 压力图片
'pressure_image': pressure_data.get('pressure_image', ''),
'timestamp': pressure_data['timestamp']
}
# 通过WebSocket发送足部压力数据
self.socketio.emit('pressure_data', {
'foot_pressure': complete_pressure_data,
'timestamp': datetime.now().isoformat()
})
# 控制数据发送频率20Hz
time.sleep(0.05)
except Exception as e:
logger.error(f'压力传感器数据推流异常: {e}')
time.sleep(0.1)
except Exception as e:
logger.error(f'压力传感器推流线程异常: {e}')
finally:
logger.info('压力传感器足部压力数据推流线程已结束')
def start_recording(self, session_id: str, patient_id: str) -> Dict[str, Any]:
video_manager=VideoStreamManager()
"""启动同步录制
Args:
session_id: 检测会话ID
patient_id: 患者ID
Returns:
Dict: 录制启动状态和信息
{
'success': bool,
'session_id': str,
'patient_id': str,
'recording_start_time': str,
'video_paths': {
'feet_video': str,
'body_video': str,
'screen_video': str
},
'message': str
}
"""
result = {
'success': False,
'session_id': session_id,
'patient_id': patient_id,
'recording_start_time': None,
'video_paths': {
'feet_video': None,
'screen_video': None
},
'message': ''
}
try:
# 检查是否已在录制
if self.sync_recording:
result['message'] = f'已在录制中当前会话ID: {self.current_session_id}'
return result
# 设置录制参数
self.current_session_id = session_id
self.current_patient_id = patient_id
self.recording_start_time = datetime.now()
# 创建存储目录
base_path = os.path.join('data', 'patients', patient_id, session_id)
try:
os.makedirs(base_path, exist_ok=True)
logger.info(f'录制目录创建成功: {base_path}')
# 设置目录权限为777所有用户完全权限
try:
import stat
import subprocess
import platform
# 在Windows系统上使用icacls命令设置更详细的权限
if platform.system() == 'Windows':
try:
# 为Users用户组授予完全控制权限
subprocess.run([
'icacls', base_path, '/grant', 'Users:(OI)(CI)F'
], check=True, capture_output=True, text=True)
# 为Everyone用户组授予完全控制权限
subprocess.run([
'icacls', base_path, '/grant', 'Everyone:(OI)(CI)F'
], check=True, capture_output=True, text=True)
logger.info(f"已设置Windows目录权限Users和Everyone完全控制: {base_path}")
except subprocess.CalledProcessError as icacls_error:
logger.warning(f"Windows权限设置失败: {icacls_error}")
else:
logger.info(f"已设置目录权限为777: {base_path}")
except Exception as perm_error:
logger.warning(f"设置目录权限失败: {perm_error},但目录创建成功")
except Exception as dir_error:
logger.error(f'创建录制目录失败: {base_path}, 错误: {dir_error}')
result['success'] = False
result['message'] = f'创建录制目录失败: {dir_error}'
return result
# 定义视频文件路径
feet_video_path = os.path.join(base_path, 'feet.mp4')
screen_video_path = os.path.join(base_path, 'screen.webm')
result['video_paths']['feet_video'] = feet_video_path
result['video_paths']['screen_video'] = screen_video_path
# 更新数据库中的视频路径
if self.db_manager:
try:
# 更新会话状态为录制中
if not self.db_manager.update_session_status(session_id, 'recording'):
logger.error(f'更新会话状态为录制中失败 - 会话ID: {session_id}')
# 更新视频文件路径
self.db_manager.update_session_normal_video_path(session_id, feet_video_path)
self.db_manager.update_session_screen_video_path(session_id, screen_video_path)
logger.debug(f'数据库视频路径更新成功 - 会话ID: {session_id}')
except Exception as db_error:
logger.error(f'更新数据库视频路径失败: {db_error}')
# 数据库更新失败不影响录制启动,继续执行
# 视频编码参数
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
fps = 30
# 初始化视频写入器
if self.device_status['camera']:
target_width,target_height = video_manager.MAX_FRAME_SIZE
self.feet_video_writer = cv2.VideoWriter(feet_video_path, fourcc, fps, (target_width, target_height))
# 检查视频写入器是否初始化成功
if self.feet_video_writer.isOpened():
logger.info(f'脚部视频写入器初始化成功: {feet_video_path}')
else:
logger.error(f'脚部视频写入器初始化失败: {feet_video_path}')
# # 获取摄像头分辨率
# if self.camera and self.camera.isOpened():
# target_width,target_height = video_manager.MAX_FRAME_SIZE
# self.feet_video_writer = cv2.VideoWriter(
# feet_video_path, fourcc, fps, (target_width, target_height)
# )
# # 检查视频写入器是否初始化成功
# if self.feet_video_writer.isOpened():
# logger.info(f'脚部视频写入器初始化成功: {feet_video_path}')
# else:
# logger.error(f'脚部视频写入器初始化失败: {feet_video_path}')
# else:
# logger.error('摄像头未打开,无法初始化脚部视频写入器')
else:
logger.warning('摄像头设备未启用,跳过脚部视频写入器初始化')
# # 屏幕录制写入器(默认分辨率,后续根据实际帧调整)
# self.screen_video_writer = cv2.VideoWriter(
# screen_video_path, fourcc, fps, (1920, 1080)
# )
# 重置停止事件
self.recording_stop_event.clear()
self.sync_recording = True
# 启动录制线程
if self.feet_video_writer:
self.feet_recording_thread = threading.Thread(
target=self._feet_recording_thread,
daemon=True,
name='FeetRecordingThread'
)
self.feet_recording_thread.start()
# #屏幕录制
# if self.screen_video_writer:
# self.screen_recording_thread = threading.Thread(
# target=self._screen_recording_thread,
# daemon=True,
# name='ScreenRecordingThread'
# )
# self.screen_recording_thread.start()
# 设置录制状态
result['success'] = True
result['recording_start_time'] = self.recording_start_time.isoformat()
result['message'] = '同步录制已启动'
logger.debug(f'同步录制已启动 - 会话ID: {session_id}, 患者ID: {patient_id}')
except Exception as e:
logger.error(f'启动同步录制失败: {e}')
result['message'] = f'启动录制失败: {str(e)}'
# 清理已创建的写入器
self._cleanup_video_writers()
return result
def stop_recording(self, session_id: str, video_data_base64) -> Dict[str, Any]:
"""停止同步录制
Args:
session_id: 检测会话ID
video_data_base64: 屏幕录制视频的base64编码数据可选
Returns:
Dict: 录制停止状态和信息
"""
result = {
'success': False,
'session_id': session_id,
'recording_duration': 0,
'video_files': [],
'message': ''
}
try:
# 检查录制状态
if not self.sync_recording:
result['message'] = '当前没有进行录制'
return result
if self.current_session_id != session_id:
result['message'] = f'会话ID不匹配当前录制会话: {self.current_session_id}'
return result
# 设置停止事件
self.recording_stop_event.set()
session_data = self.db_manager.get_session_data(session_id)
base_path = os.path.join('data', 'patients', session_data['patient_id'], session_id)
# 定义视频文件路径
feet_video_path = os.path.join(base_path, 'feet.mp4')
body_video_path = os.path.join(base_path, 'body.mp4')
screen_video_path = os.path.join(base_path, 'screen.webm')
# 等待录制线程结束
threads_to_join = [
(self.feet_recording_thread, 'feet'),
(self.body_recording_thread, 'body')
]
logger.info(f"正在停止录制线程 - 会话ID: {session_id}")
for thread, name in threads_to_join:
if thread and thread.is_alive():
logger.debug(f"等待{name}录制线程结束...")
thread.join(timeout=3)
if thread.is_alive():
logger.warning(f'{name}录制线程未能在3秒内正常结束可能存在阻塞')
else:
logger.debug(f'{name}录制线程已正常结束')
else:
logger.debug(f'{name}录制线程未运行或已结束')
# 计算录制时长
if self.recording_start_time:
duration = (datetime.now() - self.recording_start_time).total_seconds()
result['recording_duration'] = duration
# 清理视频写入器并收集文件信息
# video_files = self._cleanup_video_writers()
# 保存传入的屏幕录制视频数据,替代原有屏幕录制视频保存逻辑
# video_bytes = base64.b64decode(video_data_base64)
with open(screen_video_path, 'wb') as f:
f.write(video_data_base64)
# video_files.append(screen_video_path)
logger.info(f'屏幕录制视频保存成功,路径: {screen_video_path}, 文件大小: {os.path.getsize(screen_video_path)} 字节')
result['video_files'] = screen_video_path
# 更新数据库中的会话信息
if self.db_manager and result['recording_duration'] > 0:
try:
duration_seconds = int(result['recording_duration'])
self.db_manager.update_session_duration(session_id, duration_seconds)
self.db_manager.update_session_normal_video_path(session_id, feet_video_path)
self.db_manager.update_session_femtobolt_video_path(session_id, body_video_path)
self.db_manager.update_session_screen_video_path(session_id, screen_video_path)
# 更新会话状态为已完成
if self.db_manager.update_session_status(session_id, 'completed'):
logger.debug(f'数据库会话信息更新成功 - 会话ID: {session_id}, 持续时间: {duration_seconds}')
else:
logger.error(f'更新会话状态为已完成失败 - 会话ID: {session_id}')
except Exception as db_error:
logger.error(f'更新数据库会话信息失败: {db_error}')
# 重置录制状态
self.sync_recording = False
self.current_session_id = None
self.current_patient_id = None
self.recording_start_time = None
result['success'] = True
result['message'] = '同步录制已停止'
logger.debug(f'同步录制已停止 - 会话ID: {session_id}, 录制时长: {result["recording_duration"]:.2f}')
except Exception as e:
logger.error(f'停止同步录制失败: {e}', exc_info=True)
result['message'] = f'停止录制失败: {str(e)}'
return result
def add_screen_frame(self, frame_data: str):
"""添加屏幕录制帧
Args:
frame_data: base64编码的屏幕截图数据
"""
if self.sync_recording and not self.screen_frame_queue.full():
try:
self.screen_frame_queue.put(frame_data, block=False)
except:
# 队列满时丢弃帧
pass
def _feet_recording_thread(self):
"""足部视频录制线程"""
consecutive_failures = 0
max_consecutive_failures = 10
# logger.info(f"足部录制线程已启动 - 会话ID: {self.current_session_id}")
logger.info(f"视频写入器状态: {self.feet_video_writer.isOpened() if self.feet_video_writer else 'None'}")
try:
while self.sync_recording and not self.recording_stop_event.is_set():
if self.feet_video_writer:
# 从全局缓存获取最新帧
frame, frame_timestamp = self._get_latest_frame_from_cache('camera')
# 详细记录帧获取情况
if frame is not None:
logger.debug(f"成功获取帧 - 尺寸: {frame.shape}, 数据类型: {frame.dtype}, 时间戳: {frame_timestamp}")
# 检查视频写入器状态
if not self.feet_video_writer.isOpened():
logger.error(f"脚部视频写入器已关闭,无法写入帧 - 会话ID: {self.current_session_id}")
break
try:
# 复制帧数据避免引用问题
image = frame.copy()
# 写入录制文件
write_success = self.feet_video_writer.write(image)
# 检查写入是否成功
if write_success is False:
logger.error(f"视频帧写入返回False - 可能写入失败")
consecutive_failures += 1
else:
consecutive_failures = 0 # 重置失败计数
# 记录录制统计
if hasattr(self, 'recording_frame_count'):
self.recording_frame_count += 1
else:
self.recording_frame_count = 1
except Exception as write_error:
logger.error(f"写入脚部视频帧异常: {write_error}")
consecutive_failures += 1
if consecutive_failures >= 10:
logger.error("连续写入失败次数过多,停止录制")
break
else:
logger.warning(f"从缓存获取的帧为None - 连续失败{consecutive_failures + 1}")
consecutive_failures += 1
if consecutive_failures <= 3:
logger.warning(f"录制线程无法从缓存获取帧 (连续失败{consecutive_failures}次)")
elif consecutive_failures == max_consecutive_failures:
logger.error(f"录制线程连续失败{max_consecutive_failures}次,可能缓存无数据或推流已停止")
# 等待一段时间再重试
time.sleep(0.1)
else:
logger.error("足部视频写入器未初始化")
break
# 检查连续失败情况
if consecutive_failures >= max_consecutive_failures:
logger.error(f"连续失败次数达到上限({max_consecutive_failures}),停止录制")
break
time.sleep(1/30) # 30 FPS
except Exception as e:
logger.error(f'足部录制线程异常: {e}')
finally:
logger.info(f"足部录制线程已结束 - 会话ID: {self.current_session_id}, 总录制帧数: {getattr(self, 'recording_frame_count', 0)}")
# 确保视频写入器被正确关闭
if self.feet_video_writer:
self.feet_video_writer.release()
self.feet_video_writer = None
logger.debug("足部视频写入器已释放")
def _body_recording_thread(self):
"""身体视频录制线程"""
consecutive_failures = 0
max_consecutive_failures = 10
# logger.info(f"身体录制线程启动 - 会话ID: {self.current_session_id}")
try:
while self.sync_recording and not self.recording_stop_event.is_set():
if self.body_video_writer:
# 从全局缓存获取最新帧
frame, frame_timestamp = self._get_latest_frame_from_cache('femtobolt')
if frame is not None:
# 检查视频写入器状态
if not self.body_video_writer.isOpened():
logger.error(f"身体视频写入器已关闭,无法写入帧 - 会话ID: {self.current_session_id}")
break
# 添加帧信息日志
logger.debug(f"获取到身体帧 - 形状: {frame.shape}, 数据类型: {frame.dtype}, 时间戳: {frame_timestamp}")
try:
# 复制帧数据避免引用问题
image = frame.copy()
# 检查图像有效性
if image is None or image.size == 0:
logger.warning(f"身体帧数据无效 - 会话ID: {self.current_session_id}")
consecutive_failures += 1
continue
# 确保图像数据类型正确
if image.dtype != np.uint8:
logger.debug(f"转换身体帧数据类型从 {image.dtype} 到 uint8")
image = image.astype(np.uint8)
# 确保图像是3通道BGR格式
if len(image.shape) != 3 or image.shape[2] != 3:
logger.warning(f"身体帧格式异常: {image.shape}期望3通道BGR格式")
consecutive_failures += 1
continue
# 检查并调整图像分辨率以匹配视频写入器
current_height, current_width = image.shape[:2]
expected_width, expected_height = 288, 576 # 默认期望分辨率
if current_width != expected_width or current_height != expected_height:
logger.debug(f"调整身体帧分辨率从 {current_width}x{current_height}{expected_width}x{expected_height}")
image = cv2.resize(image, (expected_width, expected_height))
# 确保图像数据连续性OpenCV要求
if not image.flags['C_CONTIGUOUS']:
logger.debug("转换身体帧为连续内存布局")
image = np.ascontiguousarray(image)
# 写入录制文件
logger.debug(f"尝试写入身体视频帧 - 图像形状: {image.shape}, 数据类型: {image.dtype}, 连续性: {image.flags['C_CONTIGUOUS']}")
write_success = self.body_video_writer.write(image)
# 检查写入是否成功 - cv2.VideoWriter.write()可能返回None、False或True
if write_success is False:
consecutive_failures += 1
logger.warning(f"身体视频帧写入明确失败 - 会话ID: {self.current_session_id}, 连续失败次数: {consecutive_failures}, 图像形状: {image.shape}, 写入器状态: {self.body_video_writer.isOpened()}")
if consecutive_failures >= max_consecutive_failures:
logger.error(f"身体视频写入连续失败{max_consecutive_failures}次,停止录制")
break
elif write_success is None:
# 某些OpenCV版本可能返回None这通常表示写入失败
consecutive_failures += 1
logger.warning(f"身体视频帧写入返回None - 会话ID: {self.current_session_id}, 连续失败次数: {consecutive_failures}, 可能是编解码器问题")
if consecutive_failures >= max_consecutive_failures:
logger.error(f"身体视频写入连续返回None {max_consecutive_failures}次,停止录制")
break
else:
consecutive_failures = 0
logger.debug(f"成功写入身体视频帧 - 会话ID: {self.current_session_id}")
# 释放图像内存
# del image
except Exception as e:
consecutive_failures += 1
logger.error(f'身体视频帧写入异常: {e}, 连续失败次数: {consecutive_failures}, 帧形状: {frame.shape if frame is not None else "None"}')
if consecutive_failures >= max_consecutive_failures:
logger.error(f"身体视频写入连续异常{max_consecutive_failures}次,停止录制")
break
else:
# 没有可用帧,短暂等待
logger.debug(f"未获取到身体帧,等待中... - 会话ID: {self.current_session_id}")
time.sleep(0.01)
continue
else:
logger.warning(f"身体视频写入器未初始化 - 会话ID: {self.current_session_id}")
time.sleep(0.1)
continue
# 控制录制帧率
time.sleep(1/30) # 30 FPS
except Exception as e:
logger.error(f'身体录制线程异常: {e}')
finally:
logger.info(f"身体录制线程结束 - 会话ID: {self.current_session_id}")
def _screen_recording_thread(self):
"""屏幕录制线程"""
try:
while self.sync_recording and not self.recording_stop_event.is_set():
try:
# 从队列获取屏幕帧
frame_data = self.screen_frame_queue.get(timeout=1)
# 解码base64图像
image_data = base64.b64decode(frame_data)
nparr = np.frombuffer(image_data, np.uint8)
frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if frame is not None and self.screen_video_writer:
# 调整到录制分辨率
frame = cv2.resize(frame, (1920, 1080))
self.screen_video_writer.write(frame)
except queue.Empty:
continue
except Exception as e:
logger.error(f'屏幕录制帧处理失败: {e}')
except Exception as e:
logger.error(f'屏幕录制线程异常: {e}')
def _cleanup_video_writers(self) -> List[str]:
"""清理视频写入器并返回文件列表"""
video_files = []
try:
if self.feet_video_writer:
self.feet_video_writer.release()
self.feet_video_writer = None
if self.current_patient_id and self.current_session_id:
feet_path = os.path.join('data', 'patients', self.current_patient_id,
self.current_session_id, 'feet.mp4')
if os.path.exists(feet_path):
video_files.append(feet_path)
if self.body_video_writer:
self.body_video_writer.release()
self.body_video_writer = None
if self.current_patient_id and self.current_session_id:
body_path = os.path.join('data', 'patients', self.current_patient_id,
self.current_session_id, 'body.mp4')
if os.path.exists(body_path):
video_files.append(body_path)
if self.screen_video_writer:
self.screen_video_writer.release()
self.screen_video_writer = None
if self.current_patient_id and self.current_session_id:
screen_path = os.path.join('data', 'patients', self.current_patient_id,
self.current_session_id, 'screen.mp4')
if os.path.exists(screen_path):
video_files.append(screen_path)
except Exception as e:
logger.error(f'清理视频写入器失败: {e}')
return video_files
def _save_frame_to_cache(self, frame, frame_type='camera'):
"""保存帧到全局缓存"""
try:
import time
with self.frame_cache_lock:
current_time = time.time()
# 清理过期帧
self._cleanup_expired_frames()
# 如果缓存已满,移除最旧的帧
if frame_type in self.frame_cache and len(self.frame_cache[frame_type]) >= self.max_cache_size:
oldest_key = min(self.frame_cache[frame_type].keys())
del self.frame_cache[frame_type][oldest_key]
# 初始化帧类型缓存
if frame_type not in self.frame_cache:
self.frame_cache[frame_type] = {}
# 保存帧(深拷贝避免引用问题)
frame_data = {
'frame': frame.copy(),
'timestamp': current_time,
'frame_id': len(self.frame_cache[frame_type])
}
self.frame_cache[frame_type][current_time] = frame_data
# logger.debug(f'成功保存帧到缓存: {frame_type}, 缓存大小: {len(self.frame_cache[frame_type])}, 帧尺寸: {frame.shape}')
except Exception as e:
logger.error(f'保存帧到缓存失败: {e}')
def _get_latest_frame_from_cache(self, frame_type='camera'):
"""从缓存获取最新帧"""
try:
import time
with self.frame_cache_lock:
# logger.debug(f'尝试从缓存获取帧: {frame_type}')
if frame_type not in self.frame_cache:
logger.debug(f'缓存中不存在帧类型: {frame_type}, 可用类型: {list(self.frame_cache.keys())}')
return None, None
if not self.frame_cache[frame_type]:
logger.debug(f'帧类型 {frame_type} 的缓存为空')
return None, None
# 清理过期帧
self._cleanup_expired_frames()
if not self.frame_cache[frame_type]:
logger.debug(f'清理过期帧后,帧类型 {frame_type} 的缓存为空')
return None, None
# 获取最新帧
latest_timestamp = max(self.frame_cache[frame_type].keys())
frame_data = self.frame_cache[frame_type][latest_timestamp]
current_time = time.time()
frame_age = current_time - frame_data['timestamp']
# logger.debug(f'成功获取最新帧: {frame_type}, 帧龄: {frame_age:.2f}秒, 缓存大小: {len(self.frame_cache[frame_type])}')
return frame_data['frame'].copy(), frame_data['timestamp']
except Exception as e:
logger.error(f'从缓存获取帧失败: {e}')
return None, None
def _cleanup_expired_frames(self):
"""清理过期的缓存帧"""
try:
import time
current_time = time.time()
for frame_type in list(self.frame_cache.keys()):
expired_keys = []
for timestamp in self.frame_cache[frame_type].keys():
if current_time - timestamp > self.cache_timeout:
expired_keys.append(timestamp)
# 删除过期帧
for key in expired_keys:
del self.frame_cache[frame_type][key]
# if expired_keys:
# logger.debug(f'清理了 {len(expired_keys)} 个过期帧: {frame_type}')
except Exception as e:
logger.error(f'清理过期帧失败: {e}')
class RealIMUDevice:
"""真实IMU设备通过串口读取姿态数据"""
def __init__(self, port, baudrate):
self.port = port
self.baudrate = baudrate
self.ser = None
self.buffer = bytearray()
self.calibration_data = None
self.head_pose_offset = {'rotation': 0, 'tilt': 0, 'pitch': 0}
self.last_data = {
'roll': 0.0,
'pitch': 0.0,
'yaw': 0.0,
'temperature': 25.0
}
logger.debug(f'RealIMUDevice 初始化: port={self.port}, baudrate={self.baudrate}')
self._connect()
def _connect(self):
try:
logger.debug(f'尝试打开串口: {self.port} @ {self.baudrate}')
self.ser = serial.Serial(self.port, self.baudrate, timeout=1)
if hasattr(self.ser, 'reset_input_buffer'):
try:
self.ser.reset_input_buffer()
logger.debug('已清空串口输入缓冲区')
except Exception as e:
logger.debug(f'重置串口输入缓冲区失败: {e}')
logger.info(f'IMU设备连接成功: {self.port} @ {self.baudrate}bps')
except Exception as e:
# logger.error(f'IMU设备连接失败: {e}', exc_info=True)
self.ser = None
def set_calibration(self, calibration: Dict[str, Any]):
self.calibration_data = calibration
if 'head_pose_offset' in calibration:
self.head_pose_offset = calibration['head_pose_offset']
logger.debug(f'应用IMU校准数据: {self.head_pose_offset}')
def apply_calibration(self, raw_data: Dict[str, Any]) -> Dict[str, Any]:
"""应用校准:将当前姿态减去初始偏移,得到相对于初始姿态的变化量"""
if not raw_data or 'head_pose' not in raw_data:
return raw_data
# 应用校准偏移
calibrated_data = raw_data.copy()
head_pose = raw_data['head_pose'].copy()
# 减去基准值(零点偏移)
head_pose['rotation'] = head_pose['rotation'] - self.head_pose_offset['rotation']
head_pose['tilt'] = head_pose['tilt'] - self.head_pose_offset['tilt']
head_pose['pitch'] = head_pose['pitch'] - self.head_pose_offset['pitch']
calibrated_data['head_pose'] = head_pose
return calibrated_data
@staticmethod
def _checksum(data: bytes) -> int:
return sum(data[:-1]) & 0xFF
def _parse_packet(self, data: bytes) -> Optional[Dict[str, float]]:
if len(data) != 11:
logger.debug(f'无效数据包长度: {len(data)}')
return None
if data[0] != 0x55:
logger.debug(f'错误的包头: 0x{data[0]:02X}')
return None
if self._checksum(data) != data[-1]:
logger.debug(f'校验和错误: 期望{self._checksum(data):02X}, 实际{data[-1]:02X}')
return None
packet_type = data[1]
vals = [int.from_bytes(data[i:i+2], 'little', signed=True) for i in range(2, 10, 2)]
if packet_type == 0x53: # 姿态角单位0.01°
pitchl, rxl, yawl, temp = vals # 注意这里 vals 已经是有符号整数
# 使用第一段代码里的比例系数
k_angle = 180.0
roll = -round(rxl / 32768.0 * k_angle,2)
pitch = -round(pitchl / 32768.0 * k_angle,2)
yaw = -round(yawl / 32768.0 * k_angle,2)
temp = temp / 100.0
self.last_data = {
'roll': roll,
'pitch': pitch,
'yaw': yaw,
'temperature': temp
}
# logger.debug(f'解析姿态角包: roll={roll}, pitch={pitch}, yaw={yaw}, temp={temp}')
return self.last_data
else:
# logger.debug(f'忽略的数据包类型: 0x{packet_type:02X}')
return None
def read_data(self, apply_calibration: bool = True) -> Dict[str, Any]:
if not self.ser or not getattr(self.ser, 'is_open', False):
logger.warning('IMU串口未连接尝试重新连接...')
self._connect()
return {
'head_pose': {
'rotation': self.last_data['yaw'],
'tilt': self.last_data['roll'],
'pitch': self.last_data['pitch']
},
'temperature': self.last_data['temperature'],
'timestamp': datetime.now().isoformat()
}
try:
bytes_waiting = self.ser.in_waiting
if bytes_waiting:
# logger.debug(f'串口缓冲区待读字节: {bytes_waiting}')
chunk = self.ser.read(bytes_waiting)
# logger.debug(f'读取到字节: {len(chunk)}')
self.buffer.extend(chunk)
while len(self.buffer) >= 11:
if self.buffer[0] != 0x55:
dropped = self.buffer.pop(0)
logger.debug(f'丢弃无效字节: 0x{dropped:02X}')
continue
packet = bytes(self.buffer[:11])
parsed = self._parse_packet(packet)
del self.buffer[:11]
if parsed is not None:
raw = {
'head_pose': {
'rotation': parsed['yaw'], # rotation = roll
'tilt': parsed['roll'], # tilt = yaw
'pitch': parsed['pitch'] # pitch = pitch
},
'temperature': parsed['temperature'],
'timestamp': datetime.now().isoformat()
}
# logger.debug(f'映射后的头部姿态: {raw}')
return self.apply_calibration(raw) if apply_calibration else raw
raw = {
'head_pose': {
'rotation': self.last_data['yaw'],
'tilt': self.last_data['roll'],
'pitch': self.last_data['pitch']
},
'temperature': self.last_data['temperature'],
'timestamp': datetime.now().isoformat()
}
return self.apply_calibration(raw) if apply_calibration else raw
except Exception as e:
logger.error(f'IMU数据读取异常: {e}', exc_info=True)
raw = {
'head_pose': {
'rotation': self.last_data['yaw'],
'tilt': self.last_data['roll'],
'pitch': self.last_data['pitch']
},
'temperature': self.last_data['temperature'],
'timestamp': datetime.now().isoformat()
}
return self.apply_calibration(raw) if apply_calibration else raw
def __del__(self):
try:
if self.ser and getattr(self.ser, 'is_open', False):
self.ser.close()
logger.info('IMU设备串口已关闭')
except Exception:
pass
class MockIMUDevice:
"""模拟IMU设备"""
def __init__(self):
self.noise_level = 0.1
self.calibration_data = None # 校准数据
self.head_pose_offset = {'rotation': 0, 'tilt': 0, 'pitch': 0} # 头部姿态零点偏移
def set_calibration(self, calibration: Dict[str, Any]):
"""设置校准数据"""
self.calibration_data = calibration
if 'head_pose_offset' in calibration:
self.head_pose_offset = calibration['head_pose_offset']
def apply_calibration(self, raw_data: Dict[str, Any]) -> Dict[str, Any]:
"""应用校准:将当前姿态减去初始偏移,得到相对姿态"""
if not raw_data or 'head_pose' not in raw_data:
return raw_data
calibrated_data = raw_data.copy()
head_pose = raw_data['head_pose'].copy()
head_pose['rotation'] = head_pose['rotation'] - self.head_pose_offset['rotation']
head_pose['tilt'] = head_pose['tilt'] - self.head_pose_offset['tilt']
head_pose['pitch'] = head_pose['pitch'] - self.head_pose_offset['pitch']
calibrated_data['head_pose'] = head_pose
return calibrated_data
def read_data(self, apply_calibration: bool = True) -> Dict[str, Any]:
"""读取IMU数据"""
# 生成头部姿态角度数据,角度范围(-90°, +90°)
# 使用正弦波模拟自然的头部运动,添加随机噪声
import time
current_time = time.time()
# 旋转角(左旋为负,右旋为正)
rotation_angle = 30 * np.sin(current_time * 0.5) + np.random.normal(0, self.noise_level * 5)
rotation_angle = np.clip(rotation_angle, -90, 90)
# 倾斜角(左倾为负,右倾为正)
tilt_angle = 20 * np.sin(current_time * 0.3 + np.pi/4) + np.random.normal(0, self.noise_level * 5)
tilt_angle = np.clip(tilt_angle, -90, 90)
# 俯仰角(俯角为负,仰角为正)
pitch_angle = 15 * np.sin(current_time * 0.7 + np.pi/2) + np.random.normal(0, self.noise_level * 5)
pitch_angle = np.clip(pitch_angle, -90, 90)
# 生成原始数据
raw_data = {
'head_pose': {
'rotation': rotation_angle, # 旋转角:左旋(-), 右旋(+)
'tilt': tilt_angle, # 倾斜角:左倾(-), 右倾(+)
'pitch': pitch_angle # 俯仰角:俯角(-), 仰角(+)
},
'timestamp': datetime.now().isoformat()
}
# 应用校准并返回
return self.apply_calibration(raw_data) if apply_calibration else raw_data
class RealPressureDevice:
"""真实SMiTSense压力传感器设备"""
def __init__(self, dll_path=None):
"""初始化SMiTSense压力传感器
Args:
dll_path: DLL文件路径如果为None则使用默认路径
"""
self.dll = None
self.device_handle = None
self.is_connected = False
self.rows = 0
self.cols = 0
self.frame_size = 0
self.buf = None
# 设置DLL路径 - 使用正确的DLL文件名
if dll_path is None:
# 尝试多个可能的DLL文件名
dll_candidates = [
os.path.join(os.path.dirname(__file__), 'dll', 'smitsense', 'SMiTSenseUsbWrapper.dll'),
os.path.join(os.path.dirname(__file__), 'dll', 'smitsense', 'SMiTSenseUsb-F3.0.dll')
]
dll_path = None
for candidate in dll_candidates:
if os.path.exists(candidate):
dll_path = candidate
break
if dll_path is None:
raise FileNotFoundError(f"未找到SMiTSense DLL文件检查路径: {dll_candidates}")
self.dll_path = dll_path
logger.info(f'初始化真实压力传感器设备DLL路径: {dll_path}')
try:
self._load_dll()
self._initialize_device()
except Exception as e:
logger.error(f'压力传感器初始化失败: {e}')
# 如果真实设备初始化失败,可以选择降级为模拟设备
raise
def _load_dll(self):
"""加载SMiTSense DLL并设置函数签名"""
try:
if not os.path.exists(self.dll_path):
raise FileNotFoundError(f"DLL文件未找到: {self.dll_path}")
# 加载DLL
self.dll = ctypes.WinDLL(self.dll_path)
logger.info(f"成功加载DLL: {self.dll_path}")
# 设置函数签名基于testsmit.py的工作代码
self.dll.SMiTSenseUsb_Init.argtypes = [ctypes.c_int]
self.dll.SMiTSenseUsb_Init.restype = ctypes.c_int
self.dll.SMiTSenseUsb_ScanDevices.argtypes = [ctypes.POINTER(ctypes.c_int)]
self.dll.SMiTSenseUsb_ScanDevices.restype = ctypes.c_int
self.dll.SMiTSenseUsb_OpenAndStart.argtypes = [
ctypes.c_int,
ctypes.POINTER(ctypes.c_uint16),
ctypes.POINTER(ctypes.c_uint16)
]
self.dll.SMiTSenseUsb_OpenAndStart.restype = ctypes.c_int
self.dll.SMiTSenseUsb_GetLatestFrame.argtypes = [
ctypes.POINTER(ctypes.c_uint16),
ctypes.c_int
]
self.dll.SMiTSenseUsb_GetLatestFrame.restype = ctypes.c_int
self.dll.SMiTSenseUsb_StopAndClose.argtypes = []
self.dll.SMiTSenseUsb_StopAndClose.restype = ctypes.c_int
logger.info("DLL函数签名设置完成")
except Exception as e:
logger.error(f"加载DLL失败: {e}")
raise
def _initialize_device(self):
"""初始化设备连接"""
try:
# 初始化USB连接
ret = self.dll.SMiTSenseUsb_Init(0)
if ret != 0:
raise RuntimeError(f"USB初始化失败: {ret}")
# 扫描设备
count = ctypes.c_int()
ret = self.dll.SMiTSenseUsb_ScanDevices(ctypes.byref(count))
if ret != 0 or count.value == 0:
raise RuntimeError(f"设备扫描失败或未找到设备: {ret}, count: {count.value}")
logger.info(f"发现 {count.value} 个SMiTSense设备")
# 打开并启动第一个设备
rows = ctypes.c_uint16()
cols = ctypes.c_uint16()
ret = self.dll.SMiTSenseUsb_OpenAndStart(0, ctypes.byref(rows), ctypes.byref(cols))
if ret != 0:
raise RuntimeError(f"设备启动失败: {ret}")
self.rows = rows.value
self.cols = cols.value
self.frame_size = self.rows * self.cols
self.buf_type = ctypes.c_uint16 * self.frame_size
self.buf = self.buf_type()
self.is_connected = True
logger.info(f"SMiTSense压力传感器初始化成功: {self.rows}行 x {self.cols}")
except Exception as e:
logger.error(f"设备初始化失败: {e}")
raise
def read_data(self) -> Dict[str, Any]:
"""读取压力数据并转换为与MockPressureDevice兼容的格式"""
try:
if not self.is_connected or not self.dll:
logger.error("设备未连接")
return self._get_empty_data()
# 读取原始压力数据
ret = self.dll.SMiTSenseUsb_GetLatestFrame(self.buf, self.frame_size)
if ret != 0:
logger.warning(f"读取数据帧失败: {ret}")
return self._get_empty_data()
# 转换为numpy数组
raw_data = np.frombuffer(self.buf, dtype=np.uint16).reshape((self.rows, self.cols))
# 计算足部区域压力 (基于传感器的实际布局)
foot_zones = self._calculate_foot_pressure_zones(raw_data)
# 生成压力图像
pressure_image_base64 = self._generate_pressure_image(
foot_zones['left_front'],
foot_zones['left_rear'],
foot_zones['right_front'],
foot_zones['right_rear'],
raw_data
)
return {
'foot_pressure': {
'left_front': round(foot_zones['left_front'], 2),
'left_rear': round(foot_zones['left_rear'], 2),
'right_front': round(foot_zones['right_front'], 2),
'right_rear': round(foot_zones['right_rear'], 2),
'left_total': round(foot_zones['left_total'], 2),
'right_total': round(foot_zones['right_total'], 2)
},
'pressure_image': pressure_image_base64,
'timestamp': datetime.now().isoformat()
}
except Exception as e:
logger.error(f"读取压力数据异常: {e}")
return self._get_empty_data()
def _calculate_foot_pressure_zones(self, raw_data):
"""计算足部区域压力,返回百分比:
- 左足、右足:相对于双足总压的百分比
- 左前、左后:相对于左足总压的百分比
- 右前、右后:相对于右足总压的百分比
基于原始矩阵按行列各等分为四象限(上半部为前、下半部为后,左半部为左、右半部为右)。
"""
try:
# 防护:空数据
if raw_data is None:
raise ValueError("raw_data is None")
# 转为浮点以避免 uint16 溢出
rd = np.asarray(raw_data, dtype=np.float64)
rows, cols = rd.shape if rd.ndim == 2 else (0, 0)
if rows == 0 or cols == 0:
raise ValueError("raw_data has invalid shape")
# 行列对半分(上=前,下=后;左=左,右=右)
mid_r = rows // 2
mid_c = cols // 2
# 四象限求和
left_front = float(np.sum(rd[:mid_r, :mid_c], dtype=np.float64))
left_rear = float(np.sum(rd[mid_r:, :mid_c], dtype=np.float64))
right_front = float(np.sum(rd[:mid_r, mid_c:], dtype=np.float64))
right_rear = float(np.sum(rd[mid_r:, mid_c:], dtype=np.float64))
# 绝对总压
left_total_abs = left_front + left_rear
right_total_abs = right_front + right_rear
total_abs = left_total_abs + right_total_abs
# 左右足占比(相对于双足总压)
left_total_pct = float((left_total_abs / total_abs * 100) if total_abs > 0 else 0)
right_total_pct = float((right_total_abs / total_abs * 100) if total_abs > 0 else 0)
# 前后占比(相对于各自单足总压)
left_front_pct = float((left_front / left_total_abs * 100) if left_total_abs > 0 else 0)
left_rear_pct = float((left_rear / left_total_abs * 100) if left_total_abs > 0 else 0)
right_front_pct = float((right_front / right_total_abs * 100) if right_total_abs > 0 else 0)
right_rear_pct = float((right_rear / right_total_abs * 100) if right_total_abs > 0 else 0)
return {
'left_front': round(left_front_pct),
'left_rear': round(left_rear_pct),
'right_front': round(right_front_pct),
'right_rear': round(right_rear_pct),
'left_total': round(left_total_pct),
'right_total': round(right_total_pct),
'total_pressure': round(total_abs)
}
except Exception as e:
logger.error(f"计算足部区域压力异常: {e}")
return {
'left_front': 0, 'left_rear': 0, 'right_front': 0, 'right_rear': 0,
'left_total': 0, 'right_total': 0, 'total_pressure': 0
}
def _generate_pressure_image(self, left_front, left_rear, right_front, right_rear, raw_data=None) -> str:
"""生成足部压力图片的base64数据"""
try:
if MATPLOTLIB_AVAILABLE and raw_data is not None:
# 使用原始数据生成更详细的热力图
return self._generate_heatmap_image(raw_data)
else:
# 降级到简单的区域显示图
return self._generate_simple_pressure_image(left_front, left_rear, right_front, right_rear)
except Exception as e:
logger.warning(f"生成压力图片失败: {e}")
return ""
def _generate_heatmap_image(self, raw_data) -> str:
"""生成基于原始数据的热力图OpenCV实现固定范围映射效果与matplotlib一致"""
try:
import cv2
import numpy as np
import base64
from io import BytesIO
from PIL import Image
# 固定映射范围(与 matplotlib vmin/vmax 一致)
vmin, vmax = 0, 1000
norm_data = np.clip((raw_data - vmin) / (vmax - vmin) * 255, 0, 255).astype(np.uint8)
# 应用 jet 颜色映射
heatmap = cv2.applyColorMap(norm_data, cv2.COLORMAP_JET)
# OpenCV 生成的是 BGR转成 RGB
heatmap_rgb = cv2.cvtColor(heatmap, cv2.COLOR_BGR2RGB)
# 转成 Pillow Image
img = Image.fromarray(heatmap_rgb)
# 输出为 Base64 PNG
buffer = BytesIO()
img.save(buffer, format="PNG")
buffer.seek(0)
image_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8")
return f"data:image/png;base64,{image_base64}"
except Exception as e:
logger.warning(f"生成热力图失败: {e}")
return self._generate_simple_pressure_image(0, 0, 0, 0)
# def _generate_heatmap_image(self, raw_data) -> str:
# """生成基于原始数据的热力图"""
# try:
# import matplotlib
# matplotlib.use('Agg')
# import matplotlib.pyplot as plt
# from io import BytesIO
# # 参考 tests/testsmit.py 的渲染方式:使用 jet 色图、nearest 插值、固定范围并关闭坐标轴
# fig, ax = plt.subplots()
# im = ax.imshow(raw_data, cmap='jet', interpolation='nearest', vmin=0, vmax=1000)
# ax.axis('off')
# # 紧凑布局并导出为 base64
# from io import BytesIO
# buffer = BytesIO()
# plt.savefig(buffer, format='png', bbox_inches='tight', dpi=100, pad_inches=0, facecolor='black')
# buffer.seek(0)
# image_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
# plt.close(fig)
# return f"data:image/png;base64,{image_base64}"
# except Exception as e:
# logger.warning(f"生成热力图失败: {e}")
# return self._generate_simple_pressure_image(0, 0, 0, 0)
def _generate_simple_pressure_image(self, left_front, left_rear, right_front, right_rear) -> str:
"""生成简单的足部压力区域图"""
try:
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from io import BytesIO
# 创建图形
fig, ax = plt.subplots(1, 1, figsize=(6, 8))
ax.set_xlim(0, 10)
ax.set_ylim(0, 12)
ax.set_aspect('equal')
ax.axis('off')
# 定义颜色映射
max_pressure = max(left_front, left_rear, right_front, right_rear)
if max_pressure > 0:
left_front_color = plt.cm.Reds(left_front / max_pressure)
left_rear_color = plt.cm.Reds(left_rear / max_pressure)
right_front_color = plt.cm.Reds(right_front / max_pressure)
right_rear_color = plt.cm.Reds(right_rear / max_pressure)
else:
left_front_color = left_rear_color = right_front_color = right_rear_color = 'lightgray'
# 绘制足部区域
left_front_rect = patches.Rectangle((1, 6), 2, 4, linewidth=1, edgecolor='black', facecolor=left_front_color)
left_rear_rect = patches.Rectangle((1, 2), 2, 4, linewidth=1, edgecolor='black', facecolor=left_rear_color)
right_front_rect = patches.Rectangle((7, 6), 2, 4, linewidth=1, edgecolor='black', facecolor=right_front_color)
right_rear_rect = patches.Rectangle((7, 2), 2, 4, linewidth=1, edgecolor='black', facecolor=right_rear_color)
ax.add_patch(left_front_rect)
ax.add_patch(left_rear_rect)
ax.add_patch(right_front_rect)
ax.add_patch(right_rear_rect)
# 添加标签
ax.text(2, 8, f'{left_front:.1f}', ha='center', va='center', fontsize=10, weight='bold')
ax.text(2, 4, f'{left_rear:.1f}', ha='center', va='center', fontsize=10, weight='bold')
ax.text(8, 8, f'{right_front:.1f}', ha='center', va='center', fontsize=10, weight='bold')
ax.text(8, 4, f'{right_rear:.1f}', ha='center', va='center', fontsize=10, weight='bold')
ax.text(2, 0.5, '左足', ha='center', va='center', fontsize=12, weight='bold')
ax.text(8, 0.5, '右足', ha='center', va='center', fontsize=12, weight='bold')
# 保存为base64
buffer = BytesIO()
plt.savefig(buffer, format='png', bbox_inches='tight', dpi=100, facecolor='black')
buffer.seek(0)
image_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
plt.close(fig)
return f"data:image/png;base64,{image_base64}"
except Exception as e:
logger.warning(f"生成简单压力图片失败: {e}")
return ""
def _get_empty_data(self):
"""返回空的压力数据"""
return {
'foot_pressure': {
'left_front': 0.0,
'left_rear': 0.0,
'right_front': 0.0,
'right_rear': 0.0,
'left_total': 0.0,
'right_total': 0.0
},
'pressure_image': "",
'timestamp': datetime.now().isoformat()
}
def close(self):
"""显式关闭压力传感器连接"""
try:
if self.is_connected and self.dll:
self.dll.SMiTSenseUsb_StopAndClose()
self.is_connected = False
logger.info('SMiTSense压力传感器连接已关闭')
except Exception as e:
logger.error(f'关闭压力传感器连接异常: {e}')
def __del__(self):
"""析构函数,确保资源清理"""
self.close()
class MockPressureDevice:
"""模拟压力传感器设备模拟真实SMiTSense设备的行为"""
def __init__(self):
self.base_pressure = 500 # 基础压力值
self.noise_level = 10
self.rows = 4 # 模拟传感器矩阵行数
self.cols = 4 # 模拟传感器矩阵列数
self.time_offset = np.random.random() * 10 # 随机时间偏移,让每个实例的波形不同
def read_data(self) -> Dict[str, Any]:
"""读取压力数据,模拟基于矩阵数据的真实设备行为"""
try:
# 生成模拟的传感器矩阵数据
raw_data = self._generate_simulated_matrix_data()
# 使用与真实设备相同的计算逻辑
foot_zones = self._calculate_foot_pressure_zones(raw_data)
# 生成压力图像
pressure_image_base64 = self._generate_pressure_image(
foot_zones['left_front'],
foot_zones['left_rear'],
foot_zones['right_front'],
foot_zones['right_rear'],
raw_data
)
return {
'foot_pressure': {
'left_front': round(foot_zones['left_front'], 2),
'left_rear': round(foot_zones['left_rear'], 2),
'right_front': round(foot_zones['right_front'], 2),
'right_rear': round(foot_zones['right_rear'], 2),
'left_total': round(foot_zones['left_total'], 2),
'right_total': round(foot_zones['right_total'], 2)
},
'pressure_image': pressure_image_base64,
'timestamp': datetime.now().isoformat()
}
except Exception as e:
logger.error(f"模拟压力设备读取数据异常: {e}")
return self._get_empty_data()
def _generate_simulated_matrix_data(self):
"""生成模拟的传感器矩阵数据,模拟真实的足部压力分布"""
import time
current_time = time.time() + self.time_offset
# 创建4x4的传感器矩阵
matrix_data = np.zeros((self.rows, self.cols))
# 模拟动态的压力分布,使用正弦波叠加噪声
for i in range(self.rows):
for j in range(self.cols):
# 基础压力值,根据传感器位置不同
base_value = self.base_pressure * (0.3 + 0.7 * np.random.random())
# 添加时间变化(模拟人体重心变化)
time_variation = np.sin(current_time * 0.5 + i * 0.5 + j * 0.3) * 0.3
# 添加噪声
noise = np.random.normal(0, self.noise_level)
# 确保压力值非负
matrix_data[i, j] = max(0, base_value * (1 + time_variation) + noise)
return matrix_data
def _calculate_foot_pressure_zones(self, raw_data):
"""计算足部区域压力,返回百分比:
- 左足、右足:相对于双足总压的百分比
- 左前、左后:相对于左足总压的百分比
- 右前、右后:相对于右足总压的百分比
基于原始矩阵按行列各等分为四象限(上半部为前、下半部为后,左半部为左、右半部为右)。
"""
try:
# 防护:空数据
if raw_data is None:
raise ValueError("raw_data is None")
# 转为浮点以避免 uint16 溢出
rd = np.asarray(raw_data, dtype=np.float64)
rows, cols = rd.shape if rd.ndim == 2 else (0, 0)
if rows == 0 or cols == 0:
raise ValueError("raw_data has invalid shape")
# 行列对半分(上=前,下=后;左=左,右=右)
mid_r = rows // 2
mid_c = cols // 2
# 四象限求和
left_front = float(np.sum(rd[:mid_r, :mid_c], dtype=np.float64))
left_rear = float(np.sum(rd[mid_r:, :mid_c], dtype=np.float64))
right_front = float(np.sum(rd[:mid_r, mid_c:], dtype=np.float64))
right_rear = float(np.sum(rd[mid_r:, mid_c:], dtype=np.float64))
# 绝对总压
left_total_abs = left_front + left_rear
right_total_abs = right_front + right_rear
total_abs = left_total_abs + right_total_abs
# 左右足占比(相对于双足总压)
left_total_pct = float((left_total_abs / total_abs * 100) if total_abs > 0 else 0)
right_total_pct = float((right_total_abs / total_abs * 100) if total_abs > 0 else 0)
# 前后占比(相对于各自单足总压)
left_front_pct = float((left_front / left_total_abs * 100) if left_total_abs > 0 else 0)
left_rear_pct = float((left_rear / left_total_abs * 100) if left_total_abs > 0 else 0)
right_front_pct = float((right_front / right_total_abs * 100) if right_total_abs > 0 else 0)
right_rear_pct = float((right_rear / right_total_abs * 100) if right_total_abs > 0 else 0)
return {
'left_front': left_front_pct,
'left_rear': left_rear_pct,
'right_front': right_front_pct,
'right_rear': right_rear_pct,
'left_total': left_total_pct,
'right_total': right_total_pct,
'total_pressure': float(total_abs)
}
except Exception as e:
logger.error(f"计算足部区域压力异常: {e}")
return {
'left_front': 0, 'left_rear': 0, 'right_front': 0, 'right_rear': 0,
'left_total': 0, 'right_total': 0, 'total_pressure': 0
}
def _generate_pressure_image(self, left_front, left_rear, right_front, right_rear, raw_data=None) -> str:
"""生成足部压力图片的base64数据"""
try:
if MATPLOTLIB_AVAILABLE and raw_data is not None:
# 使用原始数据生成更详细的热力图
return self._generate_heatmap_image(raw_data)
else:
# 降级到简单的区域显示图
return self._generate_simple_pressure_image(left_front, left_rear, right_front, right_rear)
except Exception as e:
logger.warning(f"生成模拟压力图片失败: {e}")
return ""
def _generate_heatmap_image(self, raw_data) -> str:
"""生成基于原始数据的热力图"""
try:
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from io import BytesIO
# 参考 tests/testsmit.py 的渲染方式:使用 jet 色图、nearest 插值、固定范围并关闭坐标轴
fig, ax = plt.subplots()
im = ax.imshow(raw_data, cmap='jet', interpolation='nearest', vmin=0, vmax=1000)
ax.axis('off')
# 紧凑布局并导出为 base64
from io import BytesIO
buffer = BytesIO()
plt.savefig(buffer, format='png', bbox_inches='tight', dpi=100, pad_inches=0)
buffer.seek(0)
image_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
plt.close(fig)
return f"data:image/png;base64,{image_base64}"
except Exception as e:
logger.warning(f"生成热力图失败: {e}")
return self._generate_simple_pressure_image(0, 0, 0, 0)
def _generate_simple_pressure_image(self, left_front, left_rear, right_front, right_rear) -> str:
"""生成简单的足部压力区域图"""
try:
import matplotlib
matplotlib.use('Agg') # 设置非交互式后端避免Tkinter错误
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from io import BytesIO
# 临时禁用PIL的调试日志
pil_logger = logging.getLogger('PIL')
original_level = pil_logger.level
pil_logger.setLevel(logging.WARNING)
# 创建图形
fig, ax = plt.subplots(1, 1, figsize=(6, 8))
ax.set_xlim(0, 10)
ax.set_ylim(0, 12)
ax.set_aspect('equal')
ax.axis('off')
# 定义颜色映射(根据压力值)
max_pressure = max(left_front, left_rear, right_front, right_rear)
if max_pressure > 0:
left_front_color = plt.cm.Reds(left_front / max_pressure)
left_rear_color = plt.cm.Reds(left_rear / max_pressure)
right_front_color = plt.cm.Reds(right_front / max_pressure)
right_rear_color = plt.cm.Reds(right_rear / max_pressure)
else:
left_front_color = left_rear_color = right_front_color = right_rear_color = 'lightgray'
# 绘制左脚
left_front_rect = patches.Rectangle((1, 6), 2, 4, linewidth=1, edgecolor='black', facecolor=left_front_color)
left_rear_rect = patches.Rectangle((1, 2), 2, 4, linewidth=1, edgecolor='black', facecolor=left_rear_color)
# 绘制右脚
right_front_rect = patches.Rectangle((7, 6), 2, 4, linewidth=1, edgecolor='black', facecolor=right_front_color)
right_rear_rect = patches.Rectangle((7, 2), 2, 4, linewidth=1, edgecolor='black', facecolor=right_rear_color)
# 添加到图形
ax.add_patch(left_front_rect)
ax.add_patch(left_rear_rect)
ax.add_patch(right_front_rect)
ax.add_patch(right_rear_rect)
# 添加标签
ax.text(2, 8, f'{left_front:.1f}', ha='center', va='center', fontsize=10, weight='bold')
ax.text(2, 4, f'{left_rear:.1f}', ha='center', va='center', fontsize=10, weight='bold')
ax.text(8, 8, f'{right_front:.1f}', ha='center', va='center', fontsize=10, weight='bold')
ax.text(8, 4, f'{right_rear:.1f}', ha='center', va='center', fontsize=10, weight='bold')
ax.text(2, 0.5, '左足', ha='center', va='center', fontsize=12, weight='bold')
ax.text(8, 0.5, '右足', ha='center', va='center', fontsize=12, weight='bold')
# 保存为base64
buffer = BytesIO()
plt.savefig(buffer, format='png', bbox_inches='tight', dpi=100, facecolor='white')
buffer.seek(0)
image_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
plt.close(fig)
# 恢复PIL的日志级别
pil_logger.setLevel(original_level)
return f"data:image/png;base64,{image_base64}"
except Exception as e:
# 确保在异常情况下也恢复PIL的日志级别
try:
pil_logger.setLevel(original_level)
except:
pass
logger.warning(f"生成压力图片失败: {e}")
# 返回一个简单的占位符base64图片
return ""
def _get_empty_data(self):
"""返回空的压力数据"""
return {
'foot_pressure': {
'left_front': 0.0,
'left_rear': 0.0,
'right_front': 0.0,
'right_rear': 0.0,
'left_total': 0.0,
'right_total': 0.0
},
'pressure_image': "",
'timestamp': datetime.now().isoformat()
}
class VideoStreamManager:
"""视频推流管理器"""
def __init__(self, socketio=None, device_manager=None):
self.socketio = socketio
self.device_manager = device_manager
self.device_index = None
self.video_thread = None
self.video_running = False
# # 用于异步编码的线程池和队列
self.encoding_executor = ThreadPoolExecutor(max_workers=2)
self.frame_queue = queue.Queue(maxsize=1) # 只保留最新的一帧
# 内存优化配置
self.frame_skip_counter = 0
self.FRAME_SKIP_RATIO = 1 # 每3帧发送1帧减少网络和内存压力
self.MAX_FRAME_SIZE = (640, 480) # 进一步减小帧尺寸以节省内存
self.MAX_MEMORY_USAGE = 200 * 1024 * 1024 # 200MB内存限制
self.memory_check_counter = 0
# 移除了MEMORY_CHECK_INTERVAL改为每30帧检查一次内存
# 读取RTSP配置
self._load_rtsp_config()
def _load_rtsp_config(self):
"""加载RTSP配置"""
start_time = time.time()
logger.info(f'[TIMING] 开始加载RTSP配置 - {datetime.now().strftime("%H:%M:%S.%f")[:-3]}')
try:
config = configparser.ConfigParser()
config_path = os.path.join(os.path.dirname(__file__), 'config.ini')
config.read(config_path, encoding='utf-8')
device_index_str = config.get('DEVICES', 'camera_index', fallback='0')
self.device_index = int(device_index_str) if device_index_str else 0
end_time = time.time()
logger.info(f'[TIMING] RTSP配置加载完成设备号: {self.device_index} - 耗时: {(end_time - start_time) * 1000:.2f}ms')
except Exception as e:
end_time = time.time()
logger.error(f'[TIMING] 视频监控设备配置失败: {e} - 耗时: {(end_time - start_time) * 1000:.2f}ms')
self.device_index = None
def get_memory_usage(self):
"""获取当前进程内存使用量(字节)"""
try:
process = psutil.Process(os.getpid())
return process.memory_info().rss
except:
return 0
def async_encode_frame(self, frame, frame_count):
"""异步编码帧 - 内存优化版本"""
try:
# 内存检查
self.memory_check_counter += 1
if self.memory_check_counter >= self.MEMORY_CHECK_INTERVAL:
self.memory_check_counter = 0
current_memory = self.get_memory_usage()
if current_memory > self.MAX_MEMORY_USAGE:
logger.warning(f"内存使用过高: {current_memory / 1024 / 1024:.2f}MB强制清理")
gc.collect()
# 如果内存仍然过高,跳过此帧
if self.get_memory_usage() > self.MAX_MEMORY_USAGE:
del frame
return
# 更激进的图像尺寸压缩以节省内存
height, width = frame.shape[:2]
target_width, target_height = self.MAX_FRAME_SIZE
if width > target_width or height > target_height:
# 计算缩放比例,保持宽高比
scale_w = target_width / width
scale_h = target_height / height
scale = min(scale_w, scale_h)
new_width = int(width * scale)
new_height = int(height * scale)
# 使用更快的插值方法减少CPU使用
frame = cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_AREA)
self.device_manager._save_frame_to_cache(frame, 'camera')
# 优化JPEG编码参数优先考虑速度和内存
encode_param = [
int(cv2.IMWRITE_JPEG_QUALITY), 50, # 进一步降低质量以减少内存使用
int(cv2.IMWRITE_JPEG_OPTIMIZE), 1, # 启用优化
int(cv2.IMWRITE_JPEG_PROGRESSIVE), 0 # 禁用渐进式以减少内存
]
success, buffer = cv2.imencode('.jpg', frame, encode_param)
if not success:
logger.error('图像编码失败')
return
# 立即释放frame内存
del frame
jpg_as_text = base64.b64encode(buffer).decode('utf-8')
# 立即释放buffer内存
del buffer
# 发送数据
if self.socketio:
self.socketio.emit('video_frame', {
'image': jpg_as_text,
'frame_id': frame_count,
'timestamp': time.time()
})
# 立即释放base64字符串
del jpg_as_text
except Exception as e:
logger.error(f'异步编码帧失败: {e}')
finally:
# 定期强制垃圾回收
if self.memory_check_counter % 10 == 0:
gc.collect()
def frame_encoding_worker(self):
"""帧编码工作线程"""
while self.video_running:
try:
# 从队列获取帧
frame, frame_count = self.frame_queue.get(timeout=1)
# 提交到线程池进行异步编码
self.encoding_executor.submit(self.async_encode_frame, frame, frame_count)
except queue.Empty:
continue
except Exception as e:
logger.error(f'帧编码工作线程异常: {e}')
def generate_test_frame(self, frame_count):
"""生成测试帧"""
width, height = self.MAX_FRAME_SIZE
# 创建黑色背景
frame = np.zeros((height, width, 3), dtype=np.uint8)
# 添加动态元素
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
# 添加时间戳
cv2.putText(frame, timestamp, (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)
# 添加帧计数
cv2.putText(frame, f'TEST Frame: {frame_count}', (10, 120), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)
# 添加移动的圆形
center_x = int(320 + 200 * np.sin(frame_count * 0.1))
center_y = int(240 + 100 * np.cos(frame_count * 0.1))
cv2.circle(frame, (center_x, center_y), 30, (255, 0, 0), -1)
# 添加变化的矩形
rect_size = int(50 + 30 * np.sin(frame_count * 0.05))
cv2.rectangle(frame, (500, 200), (500 + rect_size, 200 + rect_size), (0, 0, 255), -1)
return frame
def generate_video_frames(self):
"""生成视频监控帧"""
t0 = time.time()
frame_count = 0
error_count = 0
use_test_mode = False
first_frame_sent = False
last_frame_time = time.time()
width,height=self.MAX_FRAME_SIZE
# logger.info(f'[TIMING] 进入generate_video_frames - {datetime.now().strftime("%H:%M:%S.%f")[:-3]}')
try:
t_open_start = time.time()
# logger.info(f'[TIMING] 开始打开VideoCapture({self.device_index})')
# 依次尝试不同后端选择最快可用的Windows推荐优先MSMF然后DSHOW
backends = [
(cv2.CAP_MSMF, 'MSMF'),
(cv2.CAP_DSHOW, 'DSHOW'),
(cv2.CAP_ANY, 'ANY')
]
cap = None
selected_backend = None
for api, name in backends:
try:
t_try = time.time()
logger.info(f'[TIMING] 尝试后端: {name}')
tmp = cv2.VideoCapture(self.device_index, api)
create_ms = (time.time() - t_try) * 1000
# logger.info(f'[TIMING] 后端{name} 创建VideoCapture耗时: {create_ms:.2f}ms')
if tmp.isOpened():
cap = tmp
selected_backend = name
# logger.info(f'[TIMING] 选择后端{name} 打开成功')
break
else:
tmp.release()
logger.info(f'[TIMING] 后端{name} 打开失败')
except Exception as e:
logger.warning(f'[TIMING] 后端{name} 异常: {e}')
# logger.info(f'[TIMING] VideoCapture对象创建耗时: {(time.time()-t_open_start)*1000:.2f}ms选用后端: {selected_backend}')
t_open_check = time.time()
if cap is None or not cap.isOpened():
logger.warning(f'[TIMING] 无法打开视频监控流: {self.device_index}切换到测试模式isOpened检查耗时: {(time.time()-t_open_check)*1000:.2f}ms')
use_test_mode = True
if self.socketio:
self.socketio.emit('video_status', {'status': 'started', 'message': '使用测试视频源'})
else:
# 设置相机属性(逐项记录耗时与是否成功)
total_set_start = time.time()
# 先设置编码
t_prop = time.time()
ok_fourcc = cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M','J','P','G'))
# logger.info(f'[TIMING] 设置FOURCC=MJPG 返回: {ok_fourcc} 耗时: {(time.time()-t_prop)*1000:.2f}ms')
# 再设置分辨率
t_prop = time.time()
ok_w = cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
# logger.info(f'[TIMING] 设置宽度={width} 返回: {ok_w} 耗时: {(time.time()-t_prop)*1000:.2f}ms')
t_prop = time.time()
ok_h = cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
# logger.info(f'[TIMING] 设置高度={height} 返回: {ok_h} 耗时: {(time.time()-t_prop)*1000:.2f}ms')
# 最后设置帧率和缓冲
t_prop = time.time()
ok_fps = cap.set(cv2.CAP_PROP_FPS, 30) # 先用30fps更兼容
# logger.info(f'[TIMING] 设置FPS=30 返回: {ok_fps} 耗时: {(time.time()-t_prop)*1000:.2f}ms')
t_prop = time.time()
ok_buf = cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) # 使用极小缓冲区(不支持的后端会忽略)
# logger.info(f'[TIMING] 设置BUFFERSIZE=1 返回: {ok_buf} 耗时: {(time.time()-t_prop)*1000:.2f}ms')
# logger.info(f'[TIMING] 设置相机属性耗时: {(time.time()-total_set_start)*1000:.2f}ms')
# 拉一帧,触发真实初始化
t_first_read = time.time()
warmup_ok, _ = cap.read()
# logger.info(f'[TIMING] 首帧读取耗时: {(time.time()-t_first_read)*1000:.2f}ms, 成功: {warmup_ok}')
if self.socketio:
self.socketio.emit('video_status', {'status': 'started', 'message': f'使用视频监控视频源({selected_backend or "unknown"}'})
self.video_running = True
# logger.info(f'[TIMING] generate_video_frames初始化总耗时: {(time.time()-t0)*1000:.2f}ms')
# # 启动帧编码工作线程
# encoding_thread = threading.Thread(target=self.frame_encoding_worker)
# encoding_thread.daemon = True
# encoding_thread.start()
while self.video_running:
if use_test_mode:
# 使用测试模式生成帧
frame = self.generate_test_frame(frame_count)
ret = True
else:
# 使用视频监控流,添加帧跳过机制减少延迟
ret, frame = cap.read()
if not ret:
error_count += 1
logger.debug(f'视频监控读取帧失败(第{error_count}次),尝试重连...')
if 'cap' in locals():
cap.release()
if error_count > 5:
logger.debug('视频监控连接失败次数过多,切换到测试模式')
use_test_mode = True
if self.socketio:
self.socketio.emit('video_status', {'status': 'switched', 'message': '已切换到测试视频源'})
continue
# 立即重连,不等待
cap = cv2.VideoCapture(self.device_index)
if cap.isOpened():
# 重连时应用相同的激进实时设置
cap.set(cv2.CAP_PROP_BUFFERSIZE, 0)
cap.set(cv2.CAP_PROP_FPS, 60)
cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc('M', 'J', 'P', 'G'))
cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
continue
error_count = 0 # 重置错误计数
# 内存优化的帧跳过策略
# 减少跳帧数量,避免过度内存使用
skip_count = 0
while skip_count < 3: # 减少到最多跳过3帧
temp_ret, temp_frame = cap.read()
if temp_ret:
# 立即释放之前的帧
if 'frame' in locals():
del frame
frame = temp_frame
skip_count += 1
else:
break
# 降低帧率以减少内存压力
current_time = time.time()
if current_time - last_frame_time < 1/20: # 降低到20fps最大频率
continue
last_frame_time = current_time
frame_count += 1
# 实现帧跳过以减少内存和网络压力
self.frame_skip_counter += 1
if self.frame_skip_counter % (self.FRAME_SKIP_RATIO + 1) != 0:
# 跳过此帧,立即释放内存
del frame
continue
try:
# 直接在主循环中执行帧处理逻辑(替代异步工作线程)
# 内存检查
self.memory_check_counter += 1
if self.memory_check_counter % 30 == 0:
memory_usage = psutil.virtual_memory().percent
if memory_usage > 85:
logger.warning(f'内存使用率过高: {memory_usage}%,跳过当前帧')
del frame
continue
# 按照MAX_FRAME_SIZE裁剪帧
cropped_frame = frame.copy()
width, height = self.MAX_FRAME_SIZE
if cropped_frame.shape[1] > width or cropped_frame.shape[0] > height:
# 计算裁剪区域(居中裁剪)
start_x = max(0, (cropped_frame.shape[1] - width) // 2)
start_y = max(0, (cropped_frame.shape[0] - height) // 2)
end_x = min(cropped_frame.shape[1], start_x + width)
end_y = min(cropped_frame.shape[0], start_y + height)
cropped_frame = cropped_frame[start_y:end_y, start_x:end_x]
# 保存帧到全局缓存
if self.device_manager:
self.device_manager._save_frame_to_cache(cropped_frame, 'camera')
# 每1000帧记录一次缓存保存状态
# if frame_count % 1000 == 0:
# logger.debug(f"视频推流已保存第 {frame_count} 帧到全局缓存")
else:
logger.warning("VideoStreamManager未关联DeviceManager无法保存帧到缓存")
# JPEG编码和socketio发送
try:
# 使用较低的JPEG质量以节省内存
encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 70]
result, buffer = cv2.imencode('.jpg', cropped_frame, encode_param)
if result:
# 转换为base64字符串
jpg_as_text = base64.b64encode(buffer).decode('utf-8')
# 立即释放buffer内存
del buffer
# 发送数据
if self.socketio:
self.socketio.emit('video_frame', {
'image': jpg_as_text,
'frame_id': frame_count,
'timestamp': time.time()
})
if not first_frame_sent:
first_frame_sent = True
# logger.info(f'[TIMING] 第一帧已发送 - 总耗时: {(time.time()-t0)*1000:.2f}ms')
# 立即释放base64字符串
del jpg_as_text
except Exception as e:
logger.error(f'帧编码失败: {e}')
# 立即释放帧内存
del frame
del cropped_frame
if frame_count % 60 == 0: # 每60帧记录一次
# 定期强制垃圾回收
gc.collect()
except Exception as e:
logger.error(f'帧队列处理失败: {e}')
except Exception as e:
# logger.error(f'监控视频推流异常: {e}')
if self.socketio:
self.socketio.emit('video_status', {'status': 'error', 'message': f'推流异常: {str(e)}'})
finally:
if 'cap' in locals():
cap.release()
self.video_running = False
def start_video_stream(self):
"""启动视频监控推流"""
try:
if self.video_thread and self.video_thread.is_alive():
logger.warning('视频监控线程已在运行')
return {'status': 'already_running', 'message': '视频监控已在运行'}
t_start = time.time()
logger.info(f'[TIMING] 准备启动视频监控线程,设备号: {self.device_index} - {datetime.now().strftime("%H:%M:%S.%f")[:-3]}')
self.video_thread = threading.Thread(target=self.generate_video_frames, name='VideoStreamThread')
self.video_thread.daemon = True
self.video_thread.start()
self.video_running = True
# logger.info(f'[TIMING] 视频监控线程创建完成,耗时: {(time.time()-t_start)*1000:.2f}ms')
return {'status': 'started', 'message': '视频监控线程已启动'}
except Exception as e:
logger.error(f'[TIMING] 视频监控线程启动失败: {e}')
return {'status': 'error', 'message': f'视频监控线程启动失败: {str(e)}'}
def stop_video_stream(self):
"""停止视频监控推流"""
try:
self.video_running = False
logger.info('视频监控推流已停止')
return {'status': 'stopped', 'message': '视频监控推流已停止'}
except Exception as e:
logger.error(f'停止视频监控推流失败: {e}')
return {'status': 'error', 'message': f'停止失败: {str(e)}'}
def is_streaming(self):
"""检查是否正在推流"""
return self.video_running
def get_stream_status(self):
"""获取推流状态"""
return {
'running': self.video_running,
'device_index': self.device_index,
'thread_alive': self.video_thread.is_alive() if self.video_thread else False
}
def _collect_head_pose_data(self) -> Dict[str, Any]:
"""采集头部姿态数据从IMU设备获取"""
try:
# 模拟IMU头部姿态数据
head_pose = {
'roll': np.random.uniform(-30, 30),
'pitch': np.random.uniform(-30, 30),
'yaw': np.random.uniform(-180, 180),
'acceleration': {
'x': np.random.uniform(-2, 2),
'y': np.random.uniform(-2, 2),
'z': np.random.uniform(8, 12)
},
'gyroscope': {
'x': np.random.uniform(-5, 5),
'y': np.random.uniform(-5, 5),
'z': np.random.uniform(-5, 5)
},
'timestamp': datetime.now().isoformat()
}
return head_pose
except Exception as e:
logger.error(f'头部姿态数据采集失败: {e}')
return None
def _collect_body_pose_data(self) -> Dict[str, Any]:
"""采集身体姿态数据从FemtoBolt深度相机获取"""
try:
# 模拟身体姿态关键点数据
body_pose = {
'keypoints': {
'head': {'x': 320, 'y': 100, 'confidence': 0.95},
'neck': {'x': 320, 'y': 150, 'confidence': 0.92},
'left_shoulder': {'x': 280, 'y': 180, 'confidence': 0.88},
'right_shoulder': {'x': 360, 'y': 180, 'confidence': 0.90},
'left_elbow': {'x': 250, 'y': 220, 'confidence': 0.85},
'right_elbow': {'x': 390, 'y': 220, 'confidence': 0.87},
'left_wrist': {'x': 220, 'y': 260, 'confidence': 0.82},
'right_wrist': {'x': 420, 'y': 260, 'confidence': 0.84},
'spine': {'x': 320, 'y': 250, 'confidence': 0.93},
'left_hip': {'x': 300, 'y': 350, 'confidence': 0.89},
'right_hip': {'x': 340, 'y': 350, 'confidence': 0.91},
'left_knee': {'x': 290, 'y': 450, 'confidence': 0.86},
'right_knee': {'x': 350, 'y': 450, 'confidence': 0.88},
'left_ankle': {'x': 285, 'y': 550, 'confidence': 0.83},
'right_ankle': {'x': 355, 'y': 550, 'confidence': 0.85}
},
'balance_score': np.random.uniform(0.6, 1.0),
'center_of_mass': {'x': 320, 'y': 350},
'timestamp': datetime.now().isoformat()
}
return body_pose
except Exception as e:
logger.error(f'身体姿态数据采集失败: {e}')
return None
def _capture_body_image(self, data_dir: Path, device_manager) -> Optional[str]:
"""采集身体视频截图从FemtoBolt深度相机获取"""
try:
image = None
# 检查是否有device_manager实例且FemtoBolt深度相机可用
if (device_manager is not None and
FEMTOBOLT_AVAILABLE and
hasattr(device_manager, 'femtobolt_camera') and
device_manager.femtobolt_camera is not None):
# 从FemtoBolt深度相机获取真实图像
logger.info('正在从FemtoBolt深度相机获取身体图像...')
capture = device_manager.femtobolt_camera.update()
if capture is not None:
# 获取深度图像
ret, depth_image = capture.get_depth_image()
if ret and depth_image is not None:
# 读取config.ini中的深度范围配置
import configparser
config = configparser.ConfigParser()
config.read('config.ini')
try:
depth_range_min = int(config.get('DEFAULT', 'femtobolt_depth_range_min', fallback='1400'))
depth_range_max = int(config.get('DEFAULT', 'femtobolt_depth_range_max', fallback='1900'))
except Exception:
depth_range_min = None
depth_range_max = None
# 优化深度图彩色映射范围外用黑色区间内用Jet模型从蓝色到黄色到红色渐变
if depth_range_min is not None and depth_range_max is not None:
# 归一化深度值到0-255范围
depth_normalized = np.clip(depth_image, depth_range_min, depth_range_max)
depth_normalized = ((depth_normalized - depth_range_min) / (depth_range_max - depth_range_min) * 255).astype(np.uint8)
# 应用OpenCV的COLORMAP_JET进行伪彩色映射
depth_colored = cv2.applyColorMap(depth_normalized, cv2.COLORMAP_JET)
# 范围外用黑色
mask_outside = (depth_image < depth_range_min) | (depth_image > depth_range_max)
depth_colored[mask_outside] = [0, 0, 0] # BGR黑色
else:
# 如果没有配置,使用默认伪彩色映射
depth_colored = cv2.convertScaleAbs(depth_image, alpha=0.03)
depth_colored = cv2.applyColorMap(depth_colored, cv2.COLORMAP_JET)
# 转换颜色格式(如果需要)
if len(depth_colored.shape) == 3 and depth_colored.shape[2] == 4:
depth_colored = cv2.cvtColor(depth_colored, cv2.COLOR_BGRA2BGR)
elif len(depth_colored.shape) == 3 and depth_colored.shape[2] == 3:
pass
# 预处理裁剪成宽460高819保持高度不裁剪宽度从中间裁剪
height, width = depth_colored.shape[:2]
target_width = 460
target_height = 819
# 计算宽度裁剪起点
if width > target_width:
left = (width - target_width) // 2
right = left + target_width
cropped_image = depth_colored[:, left:right]
else:
cropped_image = depth_colored
# 如果高度不足target_height进行上下填充黑边
cropped_height = cropped_image.shape[0]
if cropped_height < target_height:
pad_top = (target_height - cropped_height) // 2
pad_bottom = target_height - cropped_height - pad_top
cropped_image = cv2.copyMakeBorder(cropped_image, pad_top, pad_bottom, 0, 0, cv2.BORDER_CONSTANT, value=[0,0,0])
elif cropped_height > target_height:
# 如果高度超过target_height裁剪高度中间部分
top = (cropped_height - target_height) // 2
cropped_image = cropped_image[top:top+target_height, :]
# 最终调整大小保持宽460高819
image = cv2.resize(cropped_image, (target_width, target_height))
logger.info(f'成功获取FemtoBolt深度图像尺寸: {image.shape}')
else:
logger.warning('无法从FemtoBolt获取深度图像使用模拟图像')
# 使用模拟图像作为备用
image = np.zeros((819, 460, 3), dtype=np.uint8)
cv2.rectangle(image, (50, 50), (410, 769), (0, 255, 0), 2)
cv2.putText(image, 'FemtoBolt Unavailable', (75, 400), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
else:
logger.warning('FemtoBolt capture为None使用模拟图像')
# 使用模拟图像作为备用
image = np.zeros((819, 460, 3), dtype=np.uint8)
cv2.rectangle(image, (50, 50), (410, 769), (0, 255, 0), 2)
cv2.putText(image, 'Capture Failed', (120, 400), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
else:
logger.warning('FemtoBolt深度相机不可用使用模拟图像')
# 使用模拟图像作为备用
image = np.zeros((819, 460, 3), dtype=np.uint8)
cv2.rectangle(image, (50, 50), (410, 769), (0, 255, 0), 2)
cv2.putText(image, 'Camera Not Available', (60, 400), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
# 保存图片
image_path = data_dir / 'body_image.jpg'
cv2.imwrite(str(image_path), image)
logger.info(f'身体图像已保存到: {image_path}')
return image_path
except Exception as e:
logger.error(f'身体截图保存失败: {e}')
return None
def _collect_foot_pressure_data(self) -> Dict[str, Any]:
"""采集足部压力数据(从压力传感器获取)"""
try:
# 模拟压力传感器数据
pressure_data = {
'left_foot': {
'heel': np.random.uniform(0, 100),
'arch': np.random.uniform(0, 50),
'ball': np.random.uniform(0, 80),
'toes': np.random.uniform(0, 60),
'total_pressure': 0
},
'right_foot': {
'heel': np.random.uniform(0, 100),
'arch': np.random.uniform(0, 50),
'ball': np.random.uniform(0, 80),
'toes': np.random.uniform(0, 60),
'total_pressure': 0
},
'balance_ratio': 0,
'timestamp': datetime.now().isoformat()
}
# 计算总压力和平衡比例
left_total = sum(pressure_data['left_foot'][key] for key in ['heel', 'arch', 'ball', 'toes'])
right_total = sum(pressure_data['right_foot'][key] for key in ['heel', 'arch', 'ball', 'toes'])
pressure_data['left_foot']['total_pressure'] = left_total
pressure_data['right_foot']['total_pressure'] = right_total
if left_total + right_total > 0:
pressure_data['balance_ratio'] = left_total / (left_total + right_total)
return pressure_data
except Exception as e:
logger.error(f'足部压力数据采集失败: {e}')
return None
def _capture_foot_image(self, data_dir: Path, device_manager=None) -> Optional[str]:
"""采集足部监测视频截图(从全局缓存获取)"""
try:
image = None
# 直接使用self获取缓存帧
logger.info('正在从全局缓存获取最新图像...')
# 从全局缓存获取最新帧
frame, frame_timestamp = device_manager._get_latest_frame_from_cache('camera')
#frame, frame_count = self.frame_queue.get(timeout=1)
if frame is not None:
# 使用缓存中的图像
image = frame.copy() # 复制帧数据避免引用问题
current_time = time.time()
frame_age = current_time - frame_timestamp if frame_timestamp else 0
logger.info(f'成功获取缓存图像,尺寸: {image.shape},帧龄: {frame_age:.2f}')
else:
logger.warning('缓存中无可用图像,使用模拟图像')
image = np.zeros((480, 640, 3), dtype=np.uint8)
cv2.rectangle(image, (50, 50), (590, 430), (0, 255, 0), 2)
cv2.putText(image, 'No Cached Frame', (120, 250), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
# 保存图片
image_path = data_dir / 'foot_image.jpg'
cv2.imwrite(str(image_path), image)
logger.info(f'足部图像已保存到: {image_path}')
return image_path
except Exception as e:
logger.error(f'足部截图保存失败: {e}')
# 即使出错也要保存一个模拟图像
try:
image = np.zeros((480, 640, 3), dtype=np.uint8)
cv2.rectangle(image, (50, 50), (590, 430), (255, 0, 0), 2)
cv2.putText(image, 'Error Occurred', (180, 250), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
image_path = data_dir / 'foot_image.jpg'
cv2.imwrite(str(image_path), image)
return image_path
except Exception:
return None
def _generate_foot_pressure_image(self, data_dir: Path) -> Optional[str]:
"""生成足底压力数据图(从压力传感器数据生成)"""
try:
# 创建压力分布热力图
fig_width, fig_height = 400, 600
pressure_map = np.zeros((fig_height, fig_width, 3), dtype=np.uint8)
# 模拟左脚压力分布
left_foot_x = fig_width // 4
left_foot_y = fig_height // 2
# 模拟右脚压力分布
right_foot_x = 3 * fig_width // 4
right_foot_y = fig_height // 2
# 绘制压力点(用不同颜色表示压力大小)
for i in range(20):
x = np.random.randint(left_foot_x - 50, left_foot_x + 50)
y = np.random.randint(left_foot_y - 100, left_foot_y + 100)
pressure = np.random.randint(0, 255)
cv2.circle(pressure_map, (x, y), 5, (0, pressure, 255 - pressure), -1)
x = np.random.randint(right_foot_x - 50, right_foot_x + 50)
y = np.random.randint(right_foot_y - 100, right_foot_y + 100)
pressure = np.random.randint(0, 255)
cv2.circle(pressure_map, (x, y), 5, (0, pressure, 255 - pressure), -1)
# 保存图片
image_path = data_dir / 'foot_data_image.jpg'
cv2.imwrite(str(image_path), pressure_map)
return str(image_path.relative_to(Path.cwd()))
except Exception as e:
logger.error(f'足底压力数据图生成失败: {e}')
return None
def _save_screen_image(self, data_dir: Path, screen_image_base64: str) -> Optional[str]:
"""保存屏幕录制截图从前端传入的base64数据"""
try:
# 解码base64数据
if screen_image_base64.startswith('data:image/'):
# 移除data:image/jpeg;base64,前缀
base64_data = screen_image_base64.split(',')[1]
else:
base64_data = screen_image_base64
# 解码并保存图片
image_data = base64.b64decode(base64_data)
image_path = data_dir / 'screen_image.jpg'
with open(image_path, 'wb') as f:
f.write(image_data)
return str(image_path.relative_to(Path.cwd()))
except Exception as e:
logger.error(f'屏幕截图保存失败: {e}')
return None