BodyBalanceEvaluation/backend/devices/screen_recorder.py

851 lines
35 KiB
Python
Raw Normal View History

2025-08-20 08:54:36 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
2025-08-20 10:30:51 +08:00
综合录制管理器
支持屏幕录制和足部视频录制
2025-08-20 08:54:36 +08:00
"""
import cv2
import numpy as np
import pyautogui
import threading
import time
from datetime import datetime
import os
2025-08-20 10:30:51 +08:00
import logging
2025-08-20 16:04:38 +08:00
import json
import base64
from pathlib import Path
2025-08-20 10:30:51 +08:00
from typing import Optional, Dict, Any
2025-08-20 17:16:37 +08:00
import sys
2025-08-20 08:54:36 +08:00
2025-08-20 10:30:51 +08:00
try:
from .camera_manager import CameraManager
2025-08-20 16:04:38 +08:00
from .femtobolt_manager import FemtoBoltManager
from .pressure_manager import PressureManager
2025-08-20 10:30:51 +08:00
except ImportError:
from camera_manager import CameraManager
2025-08-20 16:04:38 +08:00
from femtobolt_manager import FemtoBoltManager
from pressure_manager import PressureManager
2025-08-20 10:30:51 +08:00
class RecordingManager:
2025-08-20 16:04:38 +08:00
def __init__(self, camera_manager: Optional[CameraManager] = None, db_manager=None,
femtobolt_manager: Optional[FemtoBoltManager] = None,
pressure_manager: Optional[PressureManager] = None):
2025-08-20 08:54:36 +08:00
"""
2025-08-20 10:30:51 +08:00
初始化录制管理器
2025-08-20 08:54:36 +08:00
Args:
2025-08-20 10:30:51 +08:00
camera_manager: 相机管理器实例
db_manager: 数据库管理器实例
2025-08-20 16:04:38 +08:00
femtobolt_manager: FemtoBolt深度相机管理器实例
pressure_manager: 压力传感器管理器实例
2025-08-20 08:54:36 +08:00
"""
2025-08-20 10:30:51 +08:00
self.camera_manager = camera_manager
self.db_manager = db_manager
2025-08-20 16:04:38 +08:00
self.femtobolt_manager = femtobolt_manager
self.pressure_manager = pressure_manager
2025-08-20 08:54:36 +08:00
2025-08-20 10:30:51 +08:00
# 录制状态
self.sync_recording = False
self.recording_stop_event = threading.Event()
2025-08-20 08:54:36 +08:00
2025-08-20 10:30:51 +08:00
# 会话信息
self.current_session_id = None
self.current_patient_id = None
self.recording_start_time = None
2025-08-20 08:54:36 +08:00
2025-08-20 10:30:51 +08:00
# 视频写入器
self.feet_video_writer = None
self.screen_video_writer = None
2025-08-20 08:54:36 +08:00
2025-08-20 10:30:51 +08:00
# 录制线程
self.feet_recording_thread = None
self.screen_recording_thread = None
2025-08-20 08:54:36 +08:00
2025-08-20 10:30:51 +08:00
# 屏幕录制参数
self.screen_fps = 20
self.screen_region = None
self.screen_size = pyautogui.size()
2025-08-20 08:54:36 +08:00
2025-08-20 10:30:51 +08:00
# 视频参数
self.MAX_FRAME_SIZE = (1280, 720) # 最大帧尺寸
2025-08-20 08:54:36 +08:00
2025-08-20 10:30:51 +08:00
# 日志
self.logger = logging.getLogger(__name__)
2025-08-20 08:54:36 +08:00
2025-08-20 10:30:51 +08:00
self.logger.info("录制管理器初始化完成")
2025-08-20 08:54:36 +08:00
2025-08-20 10:30:51 +08:00
def start_recording(self, session_id: str, patient_id: str) -> Dict[str, Any]:
2025-08-20 08:54:36 +08:00
"""
2025-08-20 10:30:51 +08:00
启动同步录制
Args:
session_id: 检测会话ID
patient_id: 患者ID
Returns:
Dict: 录制启动状态和信息
2025-08-20 08:54:36 +08:00
"""
2025-08-20 10:30:51 +08:00
result = {
'success': False,
'session_id': session_id,
'patient_id': patient_id,
'recording_start_time': None,
'video_paths': {
'feet_video': None,
'screen_video': None
},
'message': ''
}
try:
# 检查是否已在录制
if self.sync_recording:
result['message'] = f'已在录制中当前会话ID: {self.current_session_id}'
return result
# 设置录制参数
self.current_session_id = session_id
self.current_patient_id = patient_id
self.recording_start_time = datetime.now()
# 创建存储目录
2025-08-20 17:16:37 +08:00
if getattr(sys, 'frozen', False):
# 打包后的exe文件路径
exe_dir = os.path.dirname(sys.executable)
base_path = os.path.join(exe_dir, 'data', 'patients', patient_id, session_id)
else:
base_path = os.path.join('data', 'patients', patient_id, session_id)
2025-08-20 10:30:51 +08:00
try:
os.makedirs(base_path, exist_ok=True)
self.logger.info(f'录制目录创建成功: {base_path}')
2025-08-20 08:54:36 +08:00
2025-08-20 10:30:51 +08:00
# 设置目录权限
self._set_directory_permissions(base_path)
2025-08-20 08:54:36 +08:00
2025-08-20 10:30:51 +08:00
except Exception as dir_error:
self.logger.error(f'创建录制目录失败: {base_path}, 错误: {dir_error}')
result['success'] = False
result['message'] = f'创建录制目录失败: {dir_error}'
return result
# 定义视频文件路径
feet_video_path = os.path.join(base_path, 'feet.mp4')
screen_video_path = os.path.join(base_path, 'screen.mp4')
result['video_paths']['feet_video'] = feet_video_path
result['video_paths']['screen_video'] = screen_video_path
# 更新数据库中的视频路径
if self.db_manager:
try:
# 更新会话状态为录制中
if not self.db_manager.update_session_status(session_id, 'recording'):
self.logger.error(f'更新会话状态为录制中失败 - 会话ID: {session_id}')
# 更新视频文件路径
self.db_manager.update_session_normal_video_path(session_id, feet_video_path)
self.db_manager.update_session_screen_video_path(session_id, screen_video_path)
self.logger.debug(f'数据库视频路径更新成功 - 会话ID: {session_id}')
except Exception as db_error:
self.logger.error(f'更新数据库视频路径失败: {db_error}')
# 视频编码参数
2025-08-20 16:04:38 +08:00
fourcc = cv2.VideoWriter_fourcc(*'avc1')
2025-08-20 10:30:51 +08:00
fps = 30
# 初始化足部视频写入器
if self.camera_manager and self.camera_manager.is_connected:
target_width, target_height = self.MAX_FRAME_SIZE
self.feet_video_writer = cv2.VideoWriter(
feet_video_path, fourcc, fps, (target_width, target_height)
)
2025-08-20 08:54:36 +08:00
2025-08-20 10:30:51 +08:00
if self.feet_video_writer.isOpened():
self.logger.info(f'脚部视频写入器初始化成功: {feet_video_path}')
else:
self.logger.error(f'脚部视频写入器初始化失败: {feet_video_path}')
else:
self.logger.warning('相机设备未启用,跳过脚部视频写入器初始化')
2025-08-20 08:54:36 +08:00
2025-08-20 10:30:51 +08:00
# 初始化屏幕录制写入器
2025-08-20 16:04:38 +08:00
# record_size = self.screen_region[2:4] if self.screen_region else self.screen_size
# print('屏幕写入器的宽高..............',record_size)
2025-08-20 10:30:51 +08:00
self.screen_video_writer = cv2.VideoWriter(
2025-08-20 16:04:38 +08:00
screen_video_path, fourcc, fps, (self.screen_size[0],self.screen_size[1])
2025-08-20 10:30:51 +08:00
)
if self.screen_video_writer.isOpened():
self.logger.info(f'屏幕视频写入器初始化成功: {screen_video_path}')
else:
self.logger.error(f'屏幕视频写入器初始化失败: {screen_video_path}')
# 重置停止事件
self.recording_stop_event.clear()
self.sync_recording = True
# 启动录制线程
if self.feet_video_writer:
self.feet_recording_thread = threading.Thread(
target=self._feet_recording_thread,
daemon=True,
name='FeetRecordingThread'
)
self.feet_recording_thread.start()
if self.screen_video_writer:
self.screen_recording_thread = threading.Thread(
target=self._screen_recording_thread,
daemon=True,
name='ScreenRecordingThread'
)
self.screen_recording_thread.start()
result['success'] = True
result['recording_start_time'] = self.recording_start_time.isoformat()
result['message'] = '同步录制已启动'
self.logger.info(f'同步录制已启动 - 会话ID: {session_id}, 患者ID: {patient_id}')
except Exception as e:
self.logger.error(f'启动同步录制失败: {e}')
result['message'] = f'启动录制失败: {str(e)}'
# 清理已创建的写入器
self._cleanup_video_writers()
2025-08-20 08:54:36 +08:00
2025-08-20 10:30:51 +08:00
return result
2025-08-20 08:54:36 +08:00
2025-08-20 10:30:51 +08:00
def stop_recording(self, session_id: str = None) -> Dict[str, Any]:
2025-08-20 08:54:36 +08:00
"""
停止录制
2025-08-20 10:30:51 +08:00
Args:
session_id: 会话ID用于验证是否为当前录制会话
Returns:
Dict: 停止录制的结果
2025-08-20 08:54:36 +08:00
"""
2025-08-20 10:30:51 +08:00
result = {
'success': False,
'session_id': self.current_session_id,
'message': ''
}
2025-08-20 08:54:36 +08:00
2025-08-20 10:30:51 +08:00
try:
# 验证会话ID
if session_id and session_id != self.current_session_id:
result['message'] = f'会话ID不匹配: 期望 {self.current_session_id}, 收到 {session_id}'
return result
if not self.sync_recording:
result['message'] = '当前没有进行录制'
return result
# 设置停止标志
self.sync_recording = False
self.recording_stop_event.set()
# 等待录制线程结束
if self.feet_recording_thread and self.feet_recording_thread.is_alive():
self.feet_recording_thread.join(timeout=5.0)
if self.screen_recording_thread and self.screen_recording_thread.is_alive():
self.screen_recording_thread.join(timeout=5.0)
# 清理视频写入器
self._cleanup_video_writers()
# 更新数据库状态
if self.db_manager and self.current_session_id:
try:
self.db_manager.update_session_status(self.current_session_id, 'completed')
self.logger.info(f'会话状态已更新为完成 - 会话ID: {self.current_session_id}')
except Exception as db_error:
self.logger.error(f'更新数据库状态失败: {db_error}')
result['success'] = True
result['message'] = '录制已停止'
self.logger.info(f'录制已停止 - 会话ID: {self.current_session_id}')
# 重置会话信息
self.current_session_id = None
self.current_patient_id = None
self.recording_start_time = None
except Exception as e:
self.logger.error(f'停止录制失败: {e}')
result['message'] = f'停止录制失败: {str(e)}'
2025-08-20 08:54:36 +08:00
2025-08-20 10:30:51 +08:00
return result
def _feet_recording_thread(self):
"""足部视频录制线程"""
consecutive_failures = 0
max_consecutive_failures = 10
recording_frame_count = 0
2025-08-20 08:54:36 +08:00
2025-08-20 10:30:51 +08:00
self.logger.info(f"足部录制线程已启动 - 会话ID: {self.current_session_id}")
self.logger.info(f"视频写入器状态: {self.feet_video_writer.isOpened() if self.feet_video_writer else 'None'}")
2025-08-20 08:54:36 +08:00
2025-08-20 10:30:51 +08:00
try:
# 使用与屏幕录制相同的帧率控制
target_fps = 30 # 目标帧率
frame_interval = 1.0 / target_fps
last_frame_time = time.time()
while self.sync_recording and not self.recording_stop_event.is_set():
current_time = time.time()
# 检查是否到了下一帧的时间
if current_time - last_frame_time >= frame_interval:
if self.feet_video_writer:
# 从相机管理器的全局缓存获取最新帧
frame, frame_timestamp = self.camera_manager._get_latest_frame_from_cache('camera')
if frame is not None:
self.logger.debug(f"成功获取帧 - 尺寸: {frame.shape}, 数据类型: {frame.dtype}, 时间戳: {frame_timestamp}")
# 检查视频写入器状态
if not self.feet_video_writer.isOpened():
self.logger.error(f"脚部视频写入器已关闭,无法写入帧 - 会话ID: {self.current_session_id}")
break
try:
# 调整帧尺寸到目标大小
resized_frame = cv2.resize(frame, self.MAX_FRAME_SIZE)
# 写入录制文件
write_success = self.feet_video_writer.write(resized_frame)
if write_success is False:
self.logger.error(f"视频帧写入返回False - 可能写入失败")
consecutive_failures += 1
else:
consecutive_failures = 0
recording_frame_count += 1
except Exception as write_error:
self.logger.error(f"写入脚部视频帧异常: {write_error}")
consecutive_failures += 1
if consecutive_failures >= 10:
self.logger.error("连续写入失败次数过多,停止录制")
break
else:
# 如果没有获取到帧,写入上一帧或黑色帧来保持帧率
consecutive_failures += 1
if consecutive_failures <= 3:
self.logger.warning(f"录制线程无法从缓存获取帧 (连续失败{consecutive_failures}次)")
elif consecutive_failures == max_consecutive_failures:
self.logger.error(f"录制线程连续失败{max_consecutive_failures}次,可能缓存无数据或推流已停止")
last_frame_time = current_time
else:
self.logger.error("足部视频写入器未初始化")
break
# 短暂休眠避免CPU占用过高
time.sleep(0.01)
# 检查连续失败情况
if consecutive_failures >= max_consecutive_failures:
self.logger.error(f"连续失败次数达到上限({max_consecutive_failures}),停止录制")
break
except Exception as e:
self.logger.error(f'足部录制线程异常: {e}')
finally:
self.logger.info(f"足部录制线程已结束 - 会话ID: {self.current_session_id}, 总录制帧数: {recording_frame_count}")
# 确保视频写入器被正确关闭
if self.feet_video_writer:
self.feet_video_writer.release()
self.feet_video_writer = None
self.logger.debug("足部视频写入器已释放")
2025-08-20 08:54:36 +08:00
2025-08-20 10:30:51 +08:00
def _screen_recording_thread(self):
"""屏幕录制线程"""
self.logger.info(f"屏幕录制线程已启动 - 会话ID: {self.current_session_id}")
recording_frame_count = 0
2025-08-20 08:54:36 +08:00
2025-08-20 10:30:51 +08:00
try:
# 使用与足部录制相同的帧率控制
target_fps = 30 # 目标帧率
frame_interval = 1.0 / target_fps
last_frame_time = time.time()
while self.sync_recording and not self.recording_stop_event.is_set():
current_time = time.time()
# 检查是否到了下一帧的时间
if current_time - last_frame_time >= frame_interval:
try:
2025-08-20 16:04:38 +08:00
# 截取屏幕self.screen_size
if self.screen_size:
# print('获取截图的时候屏幕写入器的宽高..............',self.screen_region)
width, height = self.screen_size
screenshot = pyautogui.screenshot(region=(0, 0, width, height))
2025-08-20 10:30:51 +08:00
else:
2025-08-20 16:04:38 +08:00
# print('screen_region方法没找到。。。。。。。。。。。。。。。。。')
2025-08-20 10:30:51 +08:00
screenshot = pyautogui.screenshot()
# 转换为numpy数组
frame = np.array(screenshot)
# 转换颜色格式 (RGB -> BGR)
frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
# 写入视频文件
if self.screen_video_writer and self.screen_video_writer.isOpened():
self.screen_video_writer.write(frame)
recording_frame_count += 1
last_frame_time = current_time
except Exception as e:
self.logger.error(f"屏幕录制异常: {e}")
# 短暂休眠避免CPU占用过高
time.sleep(0.01)
except Exception as e:
self.logger.error(f'屏幕录制线程异常: {e}')
finally:
self.logger.info(f"屏幕录制线程已结束 - 会话ID: {self.current_session_id}, 总录制帧数: {recording_frame_count}")
# 确保视频写入器被正确关闭
if self.screen_video_writer:
self.screen_video_writer.release()
self.screen_video_writer = None
self.logger.debug("屏幕视频写入器已释放")
def _cleanup_video_writers(self):
"""清理视频写入器"""
try:
if self.feet_video_writer:
self.feet_video_writer.release()
self.feet_video_writer = None
self.logger.debug("足部视频写入器已清理")
if self.screen_video_writer:
self.screen_video_writer.release()
self.screen_video_writer = None
self.logger.debug("屏幕视频写入器已清理")
except Exception as e:
self.logger.error(f"清理视频写入器失败: {e}")
def _set_directory_permissions(self, path):
"""设置目录权限"""
try:
import subprocess
import platform
if platform.system() == 'Windows':
try:
# 为Users用户组授予完全控制权限
subprocess.run([
'icacls', path, '/grant', 'Users:(OI)(CI)F'
], check=True, capture_output=True, text=True)
# 为Everyone用户组授予完全控制权限
subprocess.run([
'icacls', path, '/grant', 'Everyone:(OI)(CI)F'
], check=True, capture_output=True, text=True)
self.logger.info(f"已设置Windows目录权限Users和Everyone完全控制: {path}")
except subprocess.CalledProcessError as icacls_error:
self.logger.warning(f"Windows权限设置失败: {icacls_error}")
else:
self.logger.info(f"已设置目录权限为777: {path}")
except Exception as perm_error:
self.logger.warning(f"设置目录权限失败: {perm_error},但目录创建成功")
def set_screen_region(self, region):
"""设置屏幕录制区域"""
if self.sync_recording:
self.logger.warning("录制进行中,无法更改区域设置")
2025-08-20 08:54:36 +08:00
return False
2025-08-20 10:30:51 +08:00
self.screen_region = region
2025-08-20 08:54:36 +08:00
2025-08-20 10:30:51 +08:00
if self.screen_region:
x, y, width, height = self.screen_region
2025-08-20 08:54:36 +08:00
# 确保区域在屏幕范围内
x = max(0, min(x, self.screen_size[0] - 1))
y = max(0, min(y, self.screen_size[1] - 1))
width = min(width, self.screen_size[0] - x)
height = min(height, self.screen_size[1] - y)
2025-08-20 10:30:51 +08:00
self.screen_region = (x, y, width, height)
self.logger.info(f"录制区域已设置: {self.screen_region}")
2025-08-20 08:54:36 +08:00
else:
2025-08-20 10:30:51 +08:00
self.logger.info("录制模式已设置: 全屏录制")
2025-08-20 08:54:36 +08:00
return True
def get_status(self):
2025-08-20 10:30:51 +08:00
"""获取录制状态"""
2025-08-20 08:54:36 +08:00
return {
2025-08-20 10:30:51 +08:00
'recording': self.sync_recording,
'session_id': self.current_session_id,
'patient_id': self.current_patient_id,
'recording_start_time': self.recording_start_time.isoformat() if self.recording_start_time else None,
2025-08-20 08:54:36 +08:00
'screen_size': self.screen_size,
2025-08-20 10:30:51 +08:00
'screen_region': self.screen_region,
'screen_fps': self.screen_fps,
'feet_writer_active': self.feet_video_writer is not None and self.feet_video_writer.isOpened() if self.feet_video_writer else False,
'screen_writer_active': self.screen_video_writer is not None and self.screen_video_writer.isOpened() if self.screen_video_writer else False
2025-08-20 08:54:36 +08:00
}
2025-08-20 16:04:38 +08:00
def capture_images(self, session_id: str, patient_id: str, data_dir) -> Dict[str, str]:
"""
采集屏幕截图和足部视频截图
Args:
session_id: 检测会话ID
patient_id: 患者ID
data_dir: 数据存储目录路径
Returns:
Dict: 包含截图文件路径的字典
"""
result = {
'screen_image': None,
'foot_image': None
}
try:
# 1. 采集屏幕截图
screen_image_path = self._capture_screen_image(data_dir)
if screen_image_path:
result['screen_image'] = str(screen_image_path)
self.logger.debug(f'屏幕截图保存成功: {screen_image_path}')
# 2. 采集足部视频截图
if self.camera_manager and self.camera_manager.is_connected:
foot_image_path = self._capture_foot_image(data_dir)
if foot_image_path:
result['foot_image'] = str(foot_image_path)
self.logger.debug(f'足部截图保存成功: {foot_image_path}')
else:
self.logger.warning('相机设备未连接,跳过足部截图')
except Exception as e:
self.logger.error(f'截图采集失败: {e}')
return result
def collect_detection_data(self, session_id: str, patient_id: str) -> Dict[str, Any]:
"""
采集所有设备数据并保存到指定目录结构
Args:
session_id: 检测会话ID
patient_id: 患者ID
Returns:
Dict: 包含所有采集数据的字典符合detection_data表结构
"""
# 生成采集时间戳
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f')[:-3] # 精确到毫秒
2025-08-20 17:16:37 +08:00
data_dir=""
if getattr(sys, 'frozen', False):
# 打包后的exe文件路径
exe_dir = os.path.dirname(sys.executable)
data_dir=(os.path.join(exe_dir, 'data/patients/{patient_id}/{session_id}/{timestamp}'))
else:
data_dir = Path(f'data/patients/{patient_id}/{session_id}/{timestamp}')
2025-08-20 16:04:38 +08:00
# 创建数据存储目录
data_dir.mkdir(parents=True, exist_ok=True)
# 设置目录权限为777完全权限
try:
import stat
os.chmod(str(data_dir), stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) # 777权限
self.logger.debug(f"已设置目录权限为777: {data_dir}")
except Exception as perm_error:
self.logger.warning(f"设置目录权限失败: {perm_error},但目录创建成功")
# 初始化数据字典
data = {
'session_id': session_id,
'head_pose': None,
'body_pose': None,
'body_image': None,
'foot_data': None,
'foot_image': None,
'foot_data_image': None,
'screen_image': None,
'timestamp': timestamp
}
try:
# 1. 采集头部姿态数据从IMU设备获取
# 注意这里需要从外部传入IMU设备或者在初始化时添加IMU管理器
# if self.imu_manager and self.imu_manager.is_connected:
# head_pose_data = self._collect_head_pose_data()
# if head_pose_data:
# data['head_pose'] = json.dumps(head_pose_data)
# self.logger.debug(f'头部姿态数据采集成功: {session_id}')
# 2. 采集身体姿态数据从FemtoBolt深度相机获取
if self.femtobolt_manager and self.femtobolt_manager.is_connected:
body_pose_data = self._collect_body_pose_data()
if body_pose_data:
data['body_pose'] = json.dumps(body_pose_data)
self.logger.debug(f'身体姿态数据采集成功: {session_id}')
# 3. 采集身体视频截图从FemtoBolt深度相机获取
if self.femtobolt_manager and self.femtobolt_manager.is_connected:
try:
body_image_path = self._capture_body_image(data_dir)
if body_image_path:
data['body_image'] = str(body_image_path)
self.logger.debug(f'身体截图保存成功: {body_image_path}')
except Exception as e:
self.logger.error(f'采集身体截图异常: {e}')
# 4. 采集足部压力数据(从压力传感器获取)
if self.pressure_manager and hasattr(self.pressure_manager, 'is_connected') and self.pressure_manager.is_connected:
foot_data = self._collect_foot_pressure_data()
if foot_data:
data['foot_data'] = json.dumps(foot_data)
self.logger.debug(f'足部压力数据采集成功: {session_id}')
# 5. 采集足部监测视频截图(从摄像头获取)
if self.camera_manager and self.camera_manager.is_connected:
foot_image_path = self._capture_foot_image(data_dir)
if foot_image_path:
data['foot_image'] = str(foot_image_path)
self.logger.debug(f'足部截图保存成功: {foot_image_path}')
# 6. 生成足底压力数据图(从压力传感器数据生成)
if self.pressure_manager and hasattr(self.pressure_manager, 'is_connected') and self.pressure_manager.is_connected:
foot_data_image_path = self._generate_foot_pressure_image(data_dir)
if foot_data_image_path:
data['foot_data_image'] = str(foot_data_image_path)
self.logger.debug(f'足底压力数据图生成成功: {foot_data_image_path}')
# 7. 采集屏幕截图
screen_image_path = self._capture_screen_image(data_dir)
if screen_image_path:
data['screen_image'] = str(screen_image_path)
self.logger.debug(f'屏幕截图保存成功: {screen_image_path}')
self.logger.debug(f'数据采集完成: {session_id}, 时间戳: {timestamp}')
except Exception as e:
self.logger.error(f'数据采集失败: {e}')
return data
def _collect_body_pose_data(self) -> Optional[Dict[str, Any]]:
"""
从FemtoBolt深度相机采集身体姿态数据
Returns:
Dict: 身体姿态数据字典
"""
try:
if self.femtobolt_manager and hasattr(self.femtobolt_manager, 'get_pose_data'):
pose_data = self.femtobolt_manager.get_pose_data()
return pose_data
else:
self.logger.warning('FemtoBolt管理器未连接或不支持姿态数据采集')
return None
except Exception as e:
self.logger.error(f'采集身体姿态数据失败: {e}')
return None
def _capture_body_image(self, data_dir) -> Optional[str]:
"""
从FemtoBolt深度相机采集身体截图
Args:
data_dir: 数据存储目录
Returns:
str: 身体截图文件的相对路径
"""
try:
if self.femtobolt_manager and hasattr(self.femtobolt_manager, 'get_latest_frame'):
frame = self.femtobolt_manager.get_latest_frame()
if frame is not None:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f')[:-3]
filename = f'body_{timestamp}.jpg'
file_path = data_dir / filename
# 保存图像
cv2.imwrite(str(file_path), frame)
# 返回相对路径
return str(file_path.relative_to(Path.cwd()))
else:
self.logger.warning('FemtoBolt相机未获取到有效帧')
return None
else:
self.logger.warning('FemtoBolt管理器未连接或不支持图像采集')
return None
except Exception as e:
self.logger.error(f'采集身体截图失败: {e}')
return None
def _collect_foot_pressure_data(self) -> Optional[Dict[str, Any]]:
"""
从压力传感器采集足部压力数据
Returns:
Dict: 足部压力数据字典
"""
try:
if self.pressure_manager and hasattr(self.pressure_manager, 'get_pressure_data'):
pressure_data = self.pressure_manager.get_pressure_data()
return pressure_data
else:
self.logger.warning('压力传感器管理器未连接或不支持压力数据采集')
return None
except Exception as e:
self.logger.error(f'采集足部压力数据失败: {e}')
return None
def _generate_foot_pressure_image(self, data_dir) -> Optional[str]:
"""
生成足底压力数据图
Args:
data_dir: 数据存储目录
Returns:
str: 足底压力数据图文件的相对路径
"""
try:
if self.pressure_manager and hasattr(self.pressure_manager, 'generate_pressure_heatmap'):
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f')[:-3]
filename = f'foot_pressure_{timestamp}.jpg'
file_path = data_dir / filename
# 生成压力热力图
success = self.pressure_manager.generate_pressure_heatmap(str(file_path))
if success and file_path.exists():
# 返回相对路径
return str(file_path.relative_to(Path.cwd()))
else:
self.logger.warning('足底压力数据图生成失败')
return None
else:
self.logger.warning('压力传感器管理器未连接或不支持压力图生成')
return None
except Exception as e:
self.logger.error(f'生成足底压力数据图失败: {e}')
return None
def _capture_screen_image(self, data_dir) -> Optional[str]:
"""
采集屏幕截图
Args:
data_dir: 数据存储目录路径
Returns:
str: 截图文件的相对路径失败返回None
"""
try:
# 截取屏幕
if self.screen_size:
width, height = self.screen_size
screenshot = pyautogui.screenshot(region=(0, 0, width, height))
else:
screenshot = pyautogui.screenshot()
# 保存截图
from pathlib import Path
image_path = Path(data_dir) / 'screen_image.png'
screenshot.save(str(image_path))
# 返回相对路径
abs_image_path = image_path.resolve()
abs_cwd = Path.cwd().resolve()
relative_path = abs_image_path.relative_to(abs_cwd)
return str(relative_path)
except Exception as e:
self.logger.error(f'屏幕截图失败: {e}')
return None
def _capture_foot_image(self, data_dir) -> Optional[str]:
"""
采集足部视频截图
Args:
data_dir: 数据存储目录路径
Returns:
str: 截图文件的相对路径失败返回None
"""
try:
if not self.camera_manager or not self.camera_manager.is_connected:
self.logger.warning('相机设备未连接,无法采集足部截图')
return None
# 从相机管理器获取最新帧
frame, frame_timestamp = self.camera_manager._get_latest_frame_from_cache('camera')
if frame is None:
self.logger.warning('无法从相机获取帧数据')
return None
# 调整帧尺寸
resized_frame = cv2.resize(frame, self.MAX_FRAME_SIZE)
# 保存截图
from pathlib import Path
image_path = Path(data_dir) / 'foot_image.png'
cv2.imwrite(str(image_path), resized_frame)
# 返回相对路径
abs_image_path = image_path.resolve()
abs_cwd = Path.cwd().resolve()
relative_path = abs_image_path.relative_to(abs_cwd)
return str(relative_path)
except Exception as e:
self.logger.error(f'足部截图失败: {e}')
return None
2025-08-20 08:54:36 +08:00
2025-08-20 10:30:51 +08:00
# 保持向后兼容的ScreenRecorder类
class ScreenRecorder:
def __init__(self, output_dir="recordings", fps=20, quality=80, region=None):
"""向后兼容的屏幕录制器"""
self.recording_manager = RecordingManager()
self.recording_manager.screen_fps = fps
self.recording_manager.set_screen_region(region)
self.output_dir = output_dir
# 创建输出目录
if not os.path.exists(output_dir):
os.makedirs(output_dir)
2025-08-20 08:54:36 +08:00
2025-08-20 10:30:51 +08:00
def start_recording(self, filename=None):
"""开始录制"""
if filename is None:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"screen_record_{timestamp}"
2025-08-20 08:54:36 +08:00
2025-08-20 10:30:51 +08:00
# 使用文件名作为会话ID
session_id = filename
patient_id = "default"
return self.recording_manager.start_recording(session_id, patient_id)
def stop_recording(self):
"""停止录制"""
return self.recording_manager.stop_recording()
def get_status(self):
"""获取状态"""
return self.recording_manager.get_status()