#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 综合录制管理器 支持屏幕录制和足部视频录制 """ import cv2 import numpy as np import pyautogui import threading import time from datetime import datetime import os import logging import json import base64 from pathlib import Path from typing import Optional, Dict, Any, List import sys import psutil import gc try: from .camera_manager import CameraManager from .femtobolt_manager import FemtoBoltManager from .pressure_manager import PressureManager except ImportError: from camera_manager import CameraManager from femtobolt_manager import FemtoBoltManager from pressure_manager import PressureManager class RecordingManager: def __init__(self, camera_manager: Optional[CameraManager] = None, db_manager=None, femtobolt_manager: Optional[FemtoBoltManager] = None, pressure_manager: Optional[PressureManager] = None): """ 初始化录制管理器 Args: camera_manager: 相机管理器实例 db_manager: 数据库管理器实例 femtobolt_manager: FemtoBolt深度相机管理器实例 pressure_manager: 压力传感器管理器实例 """ self.camera_manager = camera_manager self.db_manager = db_manager self.femtobolt_manager = femtobolt_manager self.pressure_manager = pressure_manager # 录制状态 self.sync_recording = False self.is_recording = False self.recording_stop_event = threading.Event() # 会话信息 self.current_session_id = None self.current_patient_id = None self.recording_start_time = None # 视频写入器 self.feet_video_writer = None self.screen_video_writer = None self.femtobolt_video_writer = None # 录制线程 self.feet_recording_thread = None self.screen_recording_thread = None self.femtobolt_recording_thread = None # 屏幕录制参数 self.screen_fps = 25 # 与VideoWriter的fps保持一致 self.screen_region = None self.camera_region = None self.femtobolt_region = None # 屏幕尺寸 self.screen_size = pyautogui.size() # 输出目录 self.screen_output_dir = None self.camera_output_dir = None self.femtobolt_output_dir = None # 视频参数 self.MAX_FRAME_SIZE = (1280, 720) # 最大帧尺寸 # CPU监控和性能优化参数 self.cpu_threshold = 80.0 # CPU使用率阈值 self.memory_threshold = 85.0 # 内存使用率阈值 self.adaptive_fps = True # 是否启用自适应帧率 self.min_fps = 10 # 最小帧率 self.max_fps = 30 # 最大帧率 self.current_fps = self.screen_fps # 当前动态帧率 self.performance_check_interval = 30 # 性能检查间隔(帧数) self.frame_skip_count = 0 # 跳帧计数 self.last_performance_check = 0 # 上次性能检查时间 # 日志 self.logger = logging.getLogger(__name__) self.logger.info("录制管理器初始化完成") def _check_system_performance(self): """ 检查系统性能指标 Returns: Dict: 包含CPU和内存使用率的字典 """ try: cpu_percent = psutil.cpu_percent(interval=0.1) memory_info = psutil.virtual_memory() memory_percent = memory_info.percent return { 'cpu_percent': cpu_percent, 'memory_percent': memory_percent, 'available_memory_mb': memory_info.available / (1024 * 1024) } except Exception as e: self.logger.warning(f"性能检查失败: {e}") return {'cpu_percent': 0, 'memory_percent': 0, 'available_memory_mb': 0} def _adjust_recording_performance(self, performance_data): """ 根据系统性能调整录制参数 Args: performance_data: 性能数据字典 """ if not self.adaptive_fps: return cpu_percent = performance_data.get('cpu_percent', 0) memory_percent = performance_data.get('memory_percent', 0) # 根据CPU使用率调整帧率 if cpu_percent > self.cpu_threshold: # CPU使用率过高,降低帧率 self.current_fps = max(self.min_fps, self.current_fps - 2) self.frame_skip_count += 1 self.logger.warning(f"CPU使用率过高({cpu_percent:.1f}%),降低帧率至{self.current_fps}fps") elif cpu_percent < self.cpu_threshold - 20 and self.current_fps < self.max_fps: # CPU使用率较低,可以适当提高帧率 self.current_fps = min(self.max_fps, self.current_fps + 1) self.logger.info(f"CPU使用率正常({cpu_percent:.1f}%),提高帧率至{self.current_fps}fps") # 内存使用率过高时强制垃圾回收 if memory_percent > self.memory_threshold: gc.collect() self.logger.warning(f"内存使用率过高({memory_percent:.1f}%),执行垃圾回收") def start_recording(self, session_id: str, patient_id: str, screen_location: List[int], camera_location: List[int], femtobolt_location: List[int], recording_types: List[str] = None) -> Dict[str, Any]: """ 启动同步录制 Args: session_id: 检测会话ID patient_id: 患者ID screen_location: 屏幕录制区域 [x, y, w, h] camera_location: 相机录制区域 [x, y, w, h] femtobolt_location: FemtoBolt录制区域 [x, y, w, h] recording_types: 录制类型列表 ['screen', 'feet', 'femtobolt'],默认全部录制 Returns: Dict: 录制启动状态和信息 """ result = { 'success': False, 'session_id': session_id, 'patient_id': patient_id, 'recording_start_time': None, 'video_paths': { 'feet_video': None, 'screen_video': None, 'femtobolt_video': None }, 'message': '' } try: # 检查是否已在录制 if self.sync_recording: result['message'] = f'已在录制中,当前会话ID: {self.current_session_id}' return result # 设置默认录制类型 recording_types = ['screen', 'feet', 'femtobolt'] # recording_types = ['screen'] # 验证录制区域参数(仅对启用的录制类型进行验证) if 'screen' in recording_types: if not screen_location or not isinstance(screen_location, list) or len(screen_location) != 4: result['success'] = False result['message'] = '屏幕录制区域参数无效或缺失,必须是包含4个元素的数组[x, y, w, h]' return result if 'feet' in recording_types: if not camera_location or not isinstance(camera_location, list) or len(camera_location) != 4: result['success'] = False result['message'] = '相机录制区域参数无效或缺失,必须是包含4个元素的数组[x, y, w, h]' return result if 'femtobolt' in recording_types: if not femtobolt_location or not isinstance(femtobolt_location, list) or len(femtobolt_location) != 4: result['success'] = False result['message'] = 'FemtoBolt录制区域参数无效或缺失,必须是包含4个元素的数组[x, y, w, h]' return result # 设置录制参数 self.current_session_id = session_id self.current_patient_id = patient_id self.screen_region = tuple(screen_location) # [x, y, w, h] -> (x, y, w, h) self.camera_region = tuple(camera_location) # [x, y, w, h] -> (x, y, w, h) self.femtobolt_region = tuple(femtobolt_location) # [x, y, w, h] -> (x, y, w, h) self.recording_start_time = datetime.now() data_base_path = os.path.join('data', 'patients', patient_id, session_id) # 创建主存储目录 if getattr(sys, 'frozen', False): # 打包后的exe文件路径 exe_dir = os.path.dirname(sys.executable) base_path = os.path.join(exe_dir, 'data', 'patients', patient_id, session_id) else: base_path = os.path.join('data', 'patients', patient_id, session_id) try: # 设置目录权限 self._set_directory_permissions(base_path) os.makedirs(base_path, exist_ok=True) except Exception as dir_error: self.logger.error(f'创建录制目录失败: {base_path}, 错误: {dir_error}') result['success'] = False result['message'] = f'创建录制目录失败: {dir_error}' return result # 定义视频文件路径 feet_video_path = os.path.join(base_path, 'feet.mp4') screen_video_path = os.path.join(base_path, 'screen.mp4') femtobolt_video_path = os.path.join(base_path, 'femtobolt.mp4') result['video_paths']['feet_video'] = feet_video_path result['video_paths']['screen_video'] = screen_video_path result['video_paths']['femtobolt_video'] = femtobolt_video_path # 准备数据库更新信息,返回给调用方统一处理 result['database_updates'] = { 'session_id': session_id, 'status': 'recording', 'video_paths': { 'normal_video_path': os.path.join(base_path, 'feet.mp4'), 'screen_video_path': os.path.join(base_path, 'screen.mp4'), 'femtobolt_video_path': os.path.join(base_path, 'femtobolt.mp4') } } self.logger.debug(f'数据库更新信息已准备 - 会话ID: {session_id}') # 视频编码参数 - 使用更兼容的编解码器 # 尝试多种编解码器以确保兼容性 try: fourcc = cv2.VideoWriter_fourcc(*'mp4v') # 更兼容的编解码器 except: try: fourcc = cv2.VideoWriter_fourcc(*'XVID') # 备选编解码器 except: fourcc = cv2.VideoWriter_fourcc(*'MJPG') # 最后备选 fps = 25 #正常帧率 # 根据录制类型选择性地初始化视频写入器 self.screen_video_writer = None self.femtobolt_video_writer = None self.feet_video_writer = None if 'screen' in recording_types: self.screen_video_writer = cv2.VideoWriter( screen_video_path, fourcc, fps, (self.screen_region[2], self.screen_region[3]) ) if 'femtobolt' in recording_types: self.femtobolt_video_writer = cv2.VideoWriter( femtobolt_video_path, fourcc, fps, (self.femtobolt_region[2], self.femtobolt_region[3]) ) if 'feet' in recording_types: self.feet_video_writer = cv2.VideoWriter( feet_video_path, fourcc, fps, (self.camera_region[2], self.camera_region[3]) ) # 检查视频写入器状态(仅检查启用的录制类型) # 检查足部视频写入器 if 'feet' in recording_types: if self.feet_video_writer and self.feet_video_writer.isOpened(): self.logger.info(f'足部视频写入器初始化成功: {feet_video_path}') else: self.logger.error(f'足部视频写入器初始化失败: {feet_video_path}') else: self.logger.info('足部录制功能已禁用') # 检查屏幕视频写入器 if 'screen' in recording_types: if self.screen_video_writer and self.screen_video_writer.isOpened(): self.logger.info(f'屏幕视频写入器初始化成功: {screen_video_path}') else: self.logger.error(f'屏幕视频写入器初始化失败: {screen_video_path}') else: self.logger.info('屏幕录制功能已禁用') # 检查FemtoBolt视频写入器 if 'femtobolt' in recording_types: if self.femtobolt_video_writer and self.femtobolt_video_writer.isOpened(): self.logger.info(f'FemtoBolt视频写入器初始化成功: {femtobolt_video_path}') else: self.logger.error(f'FemtoBolt视频写入器初始化失败: {femtobolt_video_path}') else: self.logger.info('FemtoBolt录制功能已禁用') # 重置停止事件 self.recording_stop_event.clear() self.sync_recording = True # 根据录制类型启动对应的录制线程 if 'feet' in recording_types and self.feet_video_writer and self.feet_video_writer.isOpened(): self.feet_recording_thread = threading.Thread( target=self._generic_recording_thread, args=('camera', self.camera_region, feet_video_path, self.feet_video_writer), daemon=True, name='FeetRecordingThread' ) self.feet_recording_thread.start() self.logger.info(f'足部录制线程已启动 - 区域: {self.camera_region}, 输出文件: {feet_video_path}') if 'screen' in recording_types and self.screen_video_writer and self.screen_video_writer.isOpened(): self.screen_recording_thread = threading.Thread( target=self._generic_recording_thread, args=('screen', self.screen_region, screen_video_path, self.screen_video_writer), daemon=True, name='ScreenRecordingThread' ) self.screen_recording_thread.start() self.logger.info(f'屏幕录制线程已启动 - 区域: {self.screen_region}, 输出文件: {screen_video_path}') if 'femtobolt' in recording_types and self.femtobolt_video_writer and self.femtobolt_video_writer.isOpened(): self.femtobolt_recording_thread = threading.Thread( target=self._generic_recording_thread, args=('femtobolt', self.femtobolt_region, femtobolt_video_path, self.femtobolt_video_writer), daemon=True, name='FemtoBoltRecordingThread' ) self.femtobolt_recording_thread.start() self.logger.info(f'FemtoBolt录制线程已启动 - 区域: {self.femtobolt_region}, 输出文件: {femtobolt_video_path}') result['success'] = True result['recording_start_time'] = self.recording_start_time.isoformat() result['message'] = '同步录制已启动' self.logger.info(f'同步录制已启动 - 会话ID: {session_id}, 患者ID: {patient_id}') except Exception as e: self.logger.error(f'启动同步录制失败: {e}') result['message'] = f'启动录制失败: {str(e)}' # 清理已创建的写入器 self._cleanup_video_writers() return result def stop_recording(self, session_id: str = None) -> Dict[str, Any]: """ 停止录制 Args: session_id: 会话ID,用于验证是否为当前录制会话 Returns: Dict: 停止录制的结果 """ result = { 'success': False, 'session_id': self.current_session_id, 'message': '' } try: # 验证会话ID if session_id and session_id != self.current_session_id: result['message'] = f'会话ID不匹配: 期望 {self.current_session_id}, 收到 {session_id}' return result if not self.sync_recording: result['message'] = '当前没有进行录制' return result # 设置停止标志 self.sync_recording = False self.recording_stop_event.set() # 等待录制线程结束 if hasattr(self, 'feet_recording_thread') and self.feet_recording_thread and self.feet_recording_thread.is_alive(): self.feet_recording_thread.join(timeout=5.0) if hasattr(self, 'screen_recording_thread') and self.screen_recording_thread and self.screen_recording_thread.is_alive(): self.screen_recording_thread.join(timeout=5.0) if hasattr(self, 'femtobolt_recording_thread') and self.femtobolt_recording_thread and self.femtobolt_recording_thread.is_alive(): self.femtobolt_recording_thread.join(timeout=5.0) # 清理视频写入器 self._cleanup_video_writers() # 准备数据库更新信息,返回给调用方统一处理 if self.current_session_id: result['database_updates'] = { 'session_id': self.current_session_id, 'status': 'completed' } self.logger.info(f'数据库更新信息已准备 - 会话ID: {self.current_session_id}') result['success'] = True result['message'] = '录制已停止' self.logger.info(f'录制已停止 - 会话ID: {self.current_session_id}') # 重置会话信息 self.current_session_id = None self.current_patient_id = None self.recording_start_time = None except Exception as e: self.logger.error(f'停止录制失败: {e}') result['message'] = f'停止录制失败: {str(e)}' return result def _generic_recording_thread(self, recording_type, region, output_file_name, video_writer): """ 通用录制线程,支持屏幕、相机和FemtoBolt录制 Args: recording_type: 录制类型 ('screen', 'camera', 'femtobolt') region: 录制区域 (x, y, width, height) output_file_name: 输出文件名 video_writer: 视频写入器对象 """ try: self.logger.info(f'{recording_type}录制线程启动 - 区域: {region}, 输出文件: {output_file_name}') frame_count = 0 # 使用当前动态帧率,支持自适应帧率调整 target_fps = self.current_fps frame_interval = 1.0 / target_fps last_frame_time = time.time() if not video_writer or not video_writer.isOpened(): self.logger.error(f'{recording_type}视频写入器初始化失败: {output_file_name}') return # 验证并解包region参数 if not region or len(region) != 4: self.logger.error(f'{recording_type}录制区域参数无效: {region}') return x, y, w, h = region self.logger.info(f'{recording_type}录制区域解包成功: x={x}, y={y}, w={w}, h={h}') while self.sync_recording and not self.recording_stop_event.is_set(): try: current_time = time.time() # 定期检查系统性能并调整录制参数 if frame_count % self.performance_check_interval == 0 and frame_count > 0: performance_data = self._check_system_performance() self._adjust_recording_performance(performance_data) # 更新帧率间隔 target_fps = self.current_fps frame_interval = 1.0 / target_fps self.logger.debug(f'{recording_type}性能检查完成,当前帧率: {target_fps}fps') # 控制帧率 if current_time - last_frame_time < frame_interval: time.sleep(0.001) continue frame = None # 获取帧数据 - 从屏幕截图生成 screenshot = pyautogui.screenshot(region=(x, y, w, h)) frame = cv2.cvtColor(np.array(screenshot), cv2.COLOR_RGB2BGR) frame = cv2.resize(frame, (w, h)) # 写入视频帧 if frame is not None: video_writer.write(frame) frame_count += 1 else: # 如果没有获取到帧,短暂等待 time.sleep(0.01) last_frame_time = current_time except Exception as e: self.logger.error(f'{recording_type}录制线程错误: {e}') time.sleep(0.1) self.logger.info(f'{recording_type}录制线程结束,总帧数: {frame_count}') except Exception as e: self.logger.error(f'{recording_type}录制线程异常: {e}') finally: # 清理资源 if video_writer: try: video_writer.release() self.logger.info(f'{recording_type}视频写入器已释放') except Exception as e: self.logger.error(f'释放{recording_type}视频写入器失败: {e}') def _cleanup_video_writers(self): """清理视频写入器""" try: if hasattr(self, 'feet_video_writer') and self.feet_video_writer: self.feet_video_writer.release() self.feet_video_writer = None self.logger.debug("足部视频写入器已清理") if hasattr(self, 'screen_video_writer') and self.screen_video_writer: self.screen_video_writer.release() self.screen_video_writer = None self.logger.debug("屏幕视频写入器已清理") if hasattr(self, 'femtobolt_video_writer') and self.femtobolt_video_writer: self.femtobolt_video_writer.release() self.femtobolt_video_writer = None self.logger.debug("FemtoBolt视频写入器已清理") except Exception as e: self.logger.error(f"清理视频写入器失败: {e}") def _set_directory_permissions(self, path): """设置目录权限""" try: import subprocess import platform if platform.system() == 'Windows': try: # 为Users用户组授予完全控制权限 subprocess.run([ 'icacls', path, '/grant', 'Users:(OI)(CI)F' ], check=True, capture_output=True, text=True) # 为Everyone用户组授予完全控制权限 subprocess.run([ 'icacls', path, '/grant', 'Everyone:(OI)(CI)F' ], check=True, capture_output=True, text=True) self.logger.info(f"已设置Windows目录权限(Users和Everyone完全控制): {path}") except subprocess.CalledProcessError as icacls_error: self.logger.warning(f"Windows权限设置失败: {icacls_error}") else: self.logger.info(f"已设置目录权限为777: {path}") except Exception as perm_error: self.logger.warning(f"设置目录权限失败: {perm_error},但目录创建成功") def set_screen_region(self, region): """设置屏幕录制区域""" if self.sync_recording: self.logger.warning("录制进行中,无法更改区域设置") return False self.screen_region = region if self.screen_region: x, y, width, height = self.screen_region # 确保区域在屏幕范围内 x = max(0, min(x, self.screen_size[0] - 1)) y = max(0, min(y, self.screen_size[1] - 1)) width = min(width, self.screen_size[0] - x) height = min(height, self.screen_size[1] - y) self.screen_region = (x, y, width, height) self.logger.info(f"录制区域已设置: {self.screen_region}") else: self.logger.info("录制模式已设置: 全屏录制") return True def set_recording_regions(self, screen_region=None, camera_region=None, femtobolt_region=None): """ 设置三个录制区域 Args: screen_region: 屏幕录制区域 (x, y, width, height) camera_region: 相机录制区域 (x, y, width, height) femtobolt_region: FemtoBolt录制区域 (x, y, width, height) """ if self.sync_recording: self.logger.warning("录制进行中,无法更改区域设置") return False self.screen_region = screen_region self.camera_region = camera_region self.femtobolt_region = femtobolt_region self.logger.info(f'录制区域已设置:') self.logger.info(f' 屏幕区域: {screen_region}') self.logger.info(f' 相机区域: {camera_region}') self.logger.info(f' FemtoBolt区域: {femtobolt_region}') return True def get_status(self): """获取录制状态""" return { 'recording': self.sync_recording, 'session_id': self.current_session_id, 'patient_id': self.current_patient_id, 'recording_start_time': self.recording_start_time.isoformat() if self.recording_start_time else None, 'screen_size': self.screen_size, 'screen_region': self.screen_region, 'screen_fps': self.screen_fps, 'feet_writer_active': self.feet_video_writer is not None and self.feet_video_writer.isOpened() if self.feet_video_writer else False, 'screen_writer_active': self.screen_video_writer is not None and self.screen_video_writer.isOpened() if self.screen_video_writer else False } def collect_detection_data(self, session_id: str, patient_id: str, detection_data: Dict[str, Any]) -> Dict[str, Any]: """ 保存前端传入的检测数据和图片 Args: session_id: 检测会话ID patient_id: 患者ID detection_data: 前端传入的检测数据,包含base64格式的图片数据 Returns: Dict: 包含所有采集数据的字典,符合detection_data表结构 """ # 生成采集时间戳 timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f')[:-3] # 精确到毫秒 if getattr(sys, 'frozen', False): # 打包后的exe文件路径 exe_dir = os.path.dirname(sys.executable) data_dir = Path(os.path.join(exe_dir, 'data', 'patients', patient_id, session_id, timestamp)) else: data_dir = Path(f'data/patients/{patient_id}/{session_id}/{timestamp}') # 创建数据存储目录 data_dir.mkdir(parents=True, exist_ok=True) # 设置目录权限为777(完全权限) try: import stat os.chmod(str(data_dir), stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) # 777权限 self.logger.debug(f"已设置目录权限为777: {data_dir}") except Exception as perm_error: self.logger.warning(f"设置目录权限失败: {perm_error},但目录创建成功") # 初始化数据字典 data = { 'session_id': session_id, 'head_pose': detection_data.get('head_pose'), 'body_pose': None, 'body_image': None, 'foot_data': detection_data.get('foot_data'), 'foot_data_image': None, 'foot_image': None, 'screen_image': None, 'timestamp': timestamp } try: # 保存图片数据 image_fields = [ ('body_image', 'body'), ('foot_image', 'foot'), ('foot_data_image', 'foot_data'), ('screen_image', 'screen') ] for field, prefix in image_fields: base64_data = detection_data.get(field) if base64_data: try: # 移除base64头部信息 if ';base64,' in base64_data: base64_data = base64_data.split(';base64,')[1] # 解码base64数据 image_data = base64.b64decode(base64_data) # 生成图片文件名 filename = f'{prefix}_{timestamp}.jpg' file_path = data_dir / filename # 保存图片 with open(file_path, 'wb') as f: f.write(image_data) # 更新数据字典中的图片路径 data[field] = str(os.path.join('data', 'patients', patient_id, session_id, timestamp, filename)) self.logger.debug(f'{field}保存成功: {filename}') except Exception as e: self.logger.error(f'保存{field}失败: {e}') self.logger.debug(f'数据保存完成: {session_id}, 时间戳: {timestamp}') except Exception as e: self.logger.error(f'数据保存失败: {e}') return data def _collect_body_pose_data(self) -> Optional[Dict[str, Any]]: """ 从FemtoBolt深度相机采集身体姿态数据 Returns: Dict: 身体姿态数据字典 """ try: if self.femtobolt_manager and hasattr(self.femtobolt_manager, 'get_pose_data'): pose_data = self.femtobolt_manager.get_pose_data() return pose_data else: self.logger.warning('FemtoBolt管理器未连接或不支持姿态数据采集') return None except Exception as e: self.logger.error(f'采集身体姿态数据失败: {e}') return None def _capture_body_image(self, data_dir) -> Optional[str]: """ 从FemtoBolt深度相机采集身体截图 Args: data_dir: 数据存储目录 Returns: str: 身体截图文件的相对路径 """ try: if self.femtobolt_manager and hasattr(self.femtobolt_manager, 'get_latest_frame'): frame = self.femtobolt_manager.get_latest_frame() if frame is not None: timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f')[:-3] filename = f'body_{timestamp}.jpg' file_path = data_dir / filename # 保存图像 cv2.imwrite(str(file_path), frame) # 返回相对路径 return str(filename) else: self.logger.warning('FemtoBolt相机未获取到有效帧') return None else: self.logger.warning('FemtoBolt管理器未连接或不支持图像采集') return None except Exception as e: self.logger.error(f'采集身体截图失败: {e}') return None def _collect_foot_pressure_data(self) -> Optional[Dict[str, Any]]: """ 从压力传感器采集足部压力数据 Returns: Dict: 足部压力数据字典 """ try: if self.pressure_manager and hasattr(self.pressure_manager, 'get_pressure_data'): pressure_data = self.pressure_manager.get_pressure_data() return pressure_data else: self.logger.warning('压力传感器管理器未连接或不支持压力数据采集') return None except Exception as e: self.logger.error(f'采集足部压力数据失败: {e}') return None def _generate_foot_pressure_image(self, data_dir) -> Optional[str]: """ 生成足底压力数据图 Args: data_dir: 数据存储目录 Returns: str: 足底压力数据图文件的相对路径 """ try: if self.pressure_manager and hasattr(self.pressure_manager, 'generate_pressure_heatmap'): timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f')[:-3] filename = f'foot_pressure_{timestamp}.jpg' file_path = data_dir / filename # 生成压力热力图 success = self.pressure_manager.generate_pressure_heatmap(str(file_path)) if success and file_path.exists(): # 返回相对路径 return str(file_path.relative_to(Path.cwd())) else: self.logger.warning('足底压力数据图生成失败') return None else: self.logger.warning('压力传感器管理器未连接或不支持压力图生成') return None except Exception as e: self.logger.error(f'生成足底压力数据图失败: {e}') return None def _capture_screen_image(self, data_dir) -> Optional[str]: """ 采集屏幕截图 Args: data_dir: 数据存储目录路径 Returns: str: 截图文件的相对路径,失败返回None """ try: # 截取屏幕 if self.screen_size: width, height = self.screen_size screenshot = pyautogui.screenshot(region=(0, 0, width, height)) else: screenshot = pyautogui.screenshot() # 保存截图 from pathlib import Path image_path = Path(data_dir) / 'screen_image.png' screenshot.save(str(image_path)) # # 返回相对路径 # abs_image_path = image_path.resolve() # abs_cwd = Path.cwd().resolve() # relative_path = abs_image_path.relative_to(abs_cwd) return str('screen_image.png') except Exception as e: self.logger.error(f'屏幕截图失败: {e}') return None def _capture_foot_image(self, data_dir) -> Optional[str]: """ 采集足部视频截图 Args: data_dir: 数据存储目录路径 Returns: str: 截图文件的相对路径,失败返回None """ try: if not self.camera_manager or not self.camera_manager.is_connected: self.logger.warning('相机设备未连接,无法采集足部截图') return None # 从相机管理器获取最新帧 frame, frame_timestamp = self.camera_manager._get_latest_frame_from_cache('camera') if frame is None: self.logger.warning('无法从相机获取帧数据') return None # 调整帧尺寸 resized_frame = cv2.resize(frame, self.MAX_FRAME_SIZE) # 保存截图 from pathlib import Path image_path = Path(data_dir) / 'foot_image.png' cv2.imwrite(str(image_path), resized_frame) # 返回相对路径 # abs_image_path = image_path.resolve() # abs_cwd = Path.cwd().resolve() # relative_path = abs_image_path.relative_to(abs_cwd) return str(image_path) except Exception as e: self.logger.error(f'足部截图失败: {e}') return None # 保持向后兼容的ScreenRecorder类 class ScreenRecorder: def __init__(self, output_dir="recordings", fps=20, quality=80, region=None): """向后兼容的屏幕录制器""" self.recording_manager = RecordingManager() self.recording_manager.screen_fps = fps self.recording_manager.set_screen_region(region) self.output_dir = output_dir # 创建输出目录 if not os.path.exists(output_dir): os.makedirs(output_dir) def start_recording(self, filename=None): """开始录制""" if filename is None: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"screen_record_{timestamp}" # 使用文件名作为会话ID session_id = filename patient_id = "default" return self.recording_manager.start_recording(session_id, patient_id) def stop_recording(self): """停止录制""" return self.recording_manager.stop_recording() def get_status(self): """获取状态""" return self.recording_manager.get_status()