BodyBalanceEvaluation/backend/devices/screen_recorder.py

801 lines
33 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
综合录制管理器
支持屏幕录制和足部视频录制
"""
import cv2
import numpy as np
import pyautogui
import threading
import time
from datetime import datetime
import os
import logging
import json
import base64
from pathlib import Path
from typing import Optional, Dict, Any
import sys
try:
from .camera_manager import CameraManager
from .femtobolt_manager import FemtoBoltManager
from .pressure_manager import PressureManager
except ImportError:
from camera_manager import CameraManager
from femtobolt_manager import FemtoBoltManager
from pressure_manager import PressureManager
class RecordingManager:
def __init__(self, camera_manager: Optional[CameraManager] = None, db_manager=None,
femtobolt_manager: Optional[FemtoBoltManager] = None,
pressure_manager: Optional[PressureManager] = None):
"""
初始化录制管理器
Args:
camera_manager: 相机管理器实例
db_manager: 数据库管理器实例
femtobolt_manager: FemtoBolt深度相机管理器实例
pressure_manager: 压力传感器管理器实例
"""
self.camera_manager = camera_manager
self.db_manager = db_manager
self.femtobolt_manager = femtobolt_manager
self.pressure_manager = pressure_manager
# 录制状态
self.sync_recording = False
self.recording_stop_event = threading.Event()
# 会话信息
self.current_session_id = None
self.current_patient_id = None
self.recording_start_time = None
# 视频写入器
self.feet_video_writer = None
self.screen_video_writer = None
# 录制线程
self.feet_recording_thread = None
self.screen_recording_thread = None
# 屏幕录制参数
self.screen_fps = 20
self.screen_region = None
self.screen_size = pyautogui.size()
# 视频参数
self.MAX_FRAME_SIZE = (1280, 720) # 最大帧尺寸
# 日志
self.logger = logging.getLogger(__name__)
self.logger.info("录制管理器初始化完成")
def start_recording(self, session_id: str, patient_id: str) -> Dict[str, Any]:
"""
启动同步录制
Args:
session_id: 检测会话ID
patient_id: 患者ID
Returns:
Dict: 录制启动状态和信息
"""
result = {
'success': False,
'session_id': session_id,
'patient_id': patient_id,
'recording_start_time': None,
'video_paths': {
'feet_video': None,
'screen_video': None
},
'message': ''
}
try:
# 检查是否已在录制
if self.sync_recording:
result['message'] = f'已在录制中当前会话ID: {self.current_session_id}'
return result
# 设置录制参数
self.current_session_id = session_id
self.current_patient_id = patient_id
self.recording_start_time = datetime.now()
data_base_path = os.path.join('data', 'patients', patient_id, session_id)
# 创建存储目录
if getattr(sys, 'frozen', False):
# 打包后的exe文件路径
exe_dir = os.path.dirname(sys.executable)
base_path = os.path.join(exe_dir, 'data', 'patients', patient_id, session_id)
else:
base_path = os.path.join('data', 'patients', patient_id, session_id)
try:
os.makedirs(base_path, exist_ok=True)
self.logger.info(f'录制目录创建成功: {base_path}')
# 设置目录权限
self._set_directory_permissions(base_path)
except Exception as dir_error:
self.logger.error(f'创建录制目录失败: {base_path}, 错误: {dir_error}')
result['success'] = False
result['message'] = f'创建录制目录失败: {dir_error}'
return result
# 定义视频文件路径
feet_video_path = os.path.join(base_path, 'feet.mp4')
screen_video_path = os.path.join(base_path, 'screen.mp4')
result['video_paths']['feet_video'] = feet_video_path
result['video_paths']['screen_video'] = screen_video_path
# 更新数据库中的视频路径
if self.db_manager:
try:
# 更新会话状态为录制中
if not self.db_manager.update_session_status(session_id, 'recording'):
self.logger.error(f'更新会话状态为录制中失败 - 会话ID: {session_id}')
# 更新视频文件路径
self.db_manager.update_session_normal_video_path(session_id, os.path.join(data_base_path, 'feet.mp4'))
self.db_manager.update_session_screen_video_path(session_id, os.path.join(data_base_path, 'screen.mp4'))
self.logger.debug(f'数据库视频路径更新成功 - 会话ID: {session_id}')
except Exception as db_error:
self.logger.error(f'更新数据库视频路径失败: {db_error}')
# 视频编码参数
fourcc = cv2.VideoWriter_fourcc(*'avc1')
fps = 30
# 初始化足部视频写入器
if self.camera_manager and self.camera_manager.is_connected:
target_width, target_height = self.MAX_FRAME_SIZE
self.feet_video_writer = cv2.VideoWriter(
feet_video_path, fourcc, fps, (target_width, target_height)
)
if self.feet_video_writer.isOpened():
self.logger.info(f'脚部视频写入器初始化成功: {feet_video_path}')
else:
self.logger.error(f'脚部视频写入器初始化失败: {feet_video_path}')
else:
self.logger.warning('相机设备未启用,跳过脚部视频写入器初始化')
# 初始化屏幕录制写入器
# record_size = self.screen_region[2:4] if self.screen_region else self.screen_size
# print('屏幕写入器的宽高..............',record_size)
# self.screen_video_writer = cv2.VideoWriter(
# screen_video_path, fourcc, fps, (self.screen_size[0],self.screen_size[1])
# )
# 检查屏幕视频写入器状态(仅在初始化时)
if self.screen_video_writer and self.screen_video_writer.isOpened():
self.logger.info(f'屏幕视频写入器初始化成功: {screen_video_path}')
elif self.screen_video_writer:
self.logger.error(f'屏幕视频写入器初始化失败: {screen_video_path}')
else:
self.logger.info('屏幕录制功能暂时禁用')
# 重置停止事件
self.recording_stop_event.clear()
self.sync_recording = True
# 启动录制线程
if self.feet_video_writer:
self.feet_recording_thread = threading.Thread(
target=self._feet_recording_thread,
daemon=True,
name='FeetRecordingThread'
)
self.feet_recording_thread.start()
# if self.screen_video_writer:
# self.screen_recording_thread = threading.Thread(
# target=self._screen_recording_thread,
# daemon=True,
# name='ScreenRecordingThread'
# )
# self.screen_recording_thread.start()
result['success'] = True
result['recording_start_time'] = self.recording_start_time.isoformat()
result['message'] = '同步录制已启动'
self.logger.info(f'同步录制已启动 - 会话ID: {session_id}, 患者ID: {patient_id}')
except Exception as e:
self.logger.error(f'启动同步录制失败: {e}')
result['message'] = f'启动录制失败: {str(e)}'
# 清理已创建的写入器
self._cleanup_video_writers()
return result
def stop_recording(self, session_id: str = None) -> Dict[str, Any]:
"""
停止录制
Args:
session_id: 会话ID用于验证是否为当前录制会话
Returns:
Dict: 停止录制的结果
"""
result = {
'success': False,
'session_id': self.current_session_id,
'message': ''
}
try:
# 验证会话ID
if session_id and session_id != self.current_session_id:
result['message'] = f'会话ID不匹配: 期望 {self.current_session_id}, 收到 {session_id}'
return result
if not self.sync_recording:
result['message'] = '当前没有进行录制'
return result
# 设置停止标志
self.sync_recording = False
self.recording_stop_event.set()
# 等待录制线程结束
if self.feet_recording_thread and self.feet_recording_thread.is_alive():
self.feet_recording_thread.join(timeout=5.0)
if self.screen_recording_thread and self.screen_recording_thread.is_alive():
self.screen_recording_thread.join(timeout=5.0)
# 清理视频写入器
self._cleanup_video_writers()
# 更新数据库状态
if self.db_manager and self.current_session_id:
try:
self.db_manager.update_session_status(self.current_session_id, 'completed')
self.logger.info(f'会话状态已更新为完成 - 会话ID: {self.current_session_id}')
except Exception as db_error:
self.logger.error(f'更新数据库状态失败: {db_error}')
result['success'] = True
result['message'] = '录制已停止'
self.logger.info(f'录制已停止 - 会话ID: {self.current_session_id}')
# 重置会话信息
self.current_session_id = None
self.current_patient_id = None
self.recording_start_time = None
except Exception as e:
self.logger.error(f'停止录制失败: {e}')
result['message'] = f'停止录制失败: {str(e)}'
return result
def _feet_recording_thread(self):
"""足部视频录制线程"""
consecutive_failures = 0
max_consecutive_failures = 10
recording_frame_count = 0
self.logger.info(f"足部录制线程已启动 - 会话ID: {self.current_session_id}")
self.logger.info(f"视频写入器状态: {self.feet_video_writer.isOpened() if self.feet_video_writer else 'None'}")
try:
# 使用与屏幕录制相同的帧率控制
target_fps = 30 # 目标帧率
frame_interval = 1.0 / target_fps
last_frame_time = time.time()
while self.sync_recording and not self.recording_stop_event.is_set():
current_time = time.time()
# 检查是否到了下一帧的时间
if current_time - last_frame_time >= frame_interval:
if self.feet_video_writer:
# 从相机管理器的全局缓存获取最新帧
frame, frame_timestamp = self.camera_manager._get_latest_frame_from_cache('camera')
if frame is not None:
self.logger.debug(f"成功获取帧 - 尺寸: {frame.shape}, 数据类型: {frame.dtype}, 时间戳: {frame_timestamp}")
# 检查视频写入器状态
if not self.feet_video_writer.isOpened():
self.logger.error(f"脚部视频写入器已关闭,无法写入帧 - 会话ID: {self.current_session_id}")
break
try:
# 调整帧尺寸到目标大小
resized_frame = cv2.resize(frame, self.MAX_FRAME_SIZE)
# 写入录制文件
write_success = self.feet_video_writer.write(resized_frame)
if write_success is False:
self.logger.error(f"视频帧写入返回False - 可能写入失败")
consecutive_failures += 1
else:
consecutive_failures = 0
recording_frame_count += 1
except Exception as write_error:
self.logger.error(f"写入脚部视频帧异常: {write_error}")
consecutive_failures += 1
if consecutive_failures >= 10:
self.logger.error("连续写入失败次数过多,停止录制")
break
else:
# 如果没有获取到帧,写入上一帧或黑色帧来保持帧率
consecutive_failures += 1
if consecutive_failures <= 3:
self.logger.warning(f"录制线程无法从缓存获取帧 (连续失败{consecutive_failures}次)")
elif consecutive_failures == max_consecutive_failures:
self.logger.error(f"录制线程连续失败{max_consecutive_failures}次,可能缓存无数据或推流已停止")
last_frame_time = current_time
else:
self.logger.error("足部视频写入器未初始化")
break
# 短暂休眠避免CPU占用过高
time.sleep(0.01)
# 检查连续失败情况
if consecutive_failures >= max_consecutive_failures:
self.logger.error(f"连续失败次数达到上限({max_consecutive_failures}),停止录制")
break
except Exception as e:
self.logger.error(f'足部录制线程异常: {e}')
finally:
self.logger.info(f"足部录制线程已结束 - 会话ID: {self.current_session_id}, 总录制帧数: {recording_frame_count}")
# 确保视频写入器被正确关闭
if self.feet_video_writer:
self.feet_video_writer.release()
self.feet_video_writer = None
self.logger.debug("足部视频写入器已释放")
def _screen_recording_thread(self):
"""屏幕录制线程"""
self.logger.info(f"屏幕录制线程已启动 - 会话ID: {self.current_session_id}")
recording_frame_count = 0
try:
# 使用与足部录制相同的帧率控制
target_fps = 30 # 目标帧率
frame_interval = 1.0 / target_fps
last_frame_time = time.time()
while self.sync_recording and not self.recording_stop_event.is_set():
current_time = time.time()
# 检查是否到了下一帧的时间
if current_time - last_frame_time >= frame_interval:
try:
# 截取屏幕self.screen_size
if self.screen_size:
# print('获取截图的时候屏幕写入器的宽高..............',self.screen_region)
width, height = self.screen_size
screenshot = pyautogui.screenshot(region=(0, 0, width, height))
else:
# print('screen_region方法没找到。。。。。。。。。。。。。。。。。')
screenshot = pyautogui.screenshot()
# 转换为numpy数组
frame = np.array(screenshot)
# 转换颜色格式 (RGB -> BGR)
frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
# 写入视频文件
if self.screen_video_writer and self.screen_video_writer.isOpened():
self.screen_video_writer.write(frame)
recording_frame_count += 1
last_frame_time = current_time
except Exception as e:
self.logger.error(f"屏幕录制异常: {e}")
# 短暂休眠避免CPU占用过高
time.sleep(0.01)
except Exception as e:
self.logger.error(f'屏幕录制线程异常: {e}')
finally:
self.logger.info(f"屏幕录制线程已结束 - 会话ID: {self.current_session_id}, 总录制帧数: {recording_frame_count}")
# 确保视频写入器被正确关闭
if self.screen_video_writer:
self.screen_video_writer.release()
self.screen_video_writer = None
self.logger.debug("屏幕视频写入器已释放")
def _cleanup_video_writers(self):
"""清理视频写入器"""
try:
if self.feet_video_writer:
self.feet_video_writer.release()
self.feet_video_writer = None
self.logger.debug("足部视频写入器已清理")
if self.screen_video_writer:
self.screen_video_writer.release()
self.screen_video_writer = None
self.logger.debug("屏幕视频写入器已清理")
except Exception as e:
self.logger.error(f"清理视频写入器失败: {e}")
def _set_directory_permissions(self, path):
"""设置目录权限"""
try:
import subprocess
import platform
if platform.system() == 'Windows':
try:
# 为Users用户组授予完全控制权限
subprocess.run([
'icacls', path, '/grant', 'Users:(OI)(CI)F'
], check=True, capture_output=True, text=True)
# 为Everyone用户组授予完全控制权限
subprocess.run([
'icacls', path, '/grant', 'Everyone:(OI)(CI)F'
], check=True, capture_output=True, text=True)
self.logger.info(f"已设置Windows目录权限Users和Everyone完全控制: {path}")
except subprocess.CalledProcessError as icacls_error:
self.logger.warning(f"Windows权限设置失败: {icacls_error}")
else:
self.logger.info(f"已设置目录权限为777: {path}")
except Exception as perm_error:
self.logger.warning(f"设置目录权限失败: {perm_error},但目录创建成功")
def set_screen_region(self, region):
"""设置屏幕录制区域"""
if self.sync_recording:
self.logger.warning("录制进行中,无法更改区域设置")
return False
self.screen_region = region
if self.screen_region:
x, y, width, height = self.screen_region
# 确保区域在屏幕范围内
x = max(0, min(x, self.screen_size[0] - 1))
y = max(0, min(y, self.screen_size[1] - 1))
width = min(width, self.screen_size[0] - x)
height = min(height, self.screen_size[1] - y)
self.screen_region = (x, y, width, height)
self.logger.info(f"录制区域已设置: {self.screen_region}")
else:
self.logger.info("录制模式已设置: 全屏录制")
return True
def get_status(self):
"""获取录制状态"""
return {
'recording': self.sync_recording,
'session_id': self.current_session_id,
'patient_id': self.current_patient_id,
'recording_start_time': self.recording_start_time.isoformat() if self.recording_start_time else None,
'screen_size': self.screen_size,
'screen_region': self.screen_region,
'screen_fps': self.screen_fps,
'feet_writer_active': self.feet_video_writer is not None and self.feet_video_writer.isOpened() if self.feet_video_writer else False,
'screen_writer_active': self.screen_video_writer is not None and self.screen_video_writer.isOpened() if self.screen_video_writer else False
}
def collect_detection_data(self, session_id: str, patient_id: str, detection_data: Dict[str, Any]) -> Dict[str, Any]:
"""
保存前端传入的检测数据和图片
Args:
session_id: 检测会话ID
patient_id: 患者ID
detection_data: 前端传入的检测数据包含base64格式的图片数据
Returns:
Dict: 包含所有采集数据的字典符合detection_data表结构
"""
# 生成采集时间戳
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f')[:-3] # 精确到毫秒
if getattr(sys, 'frozen', False):
# 打包后的exe文件路径
exe_dir = os.path.dirname(sys.executable)
data_dir = Path(os.path.join(exe_dir, 'data', 'patients', patient_id, session_id, timestamp))
else:
data_dir = Path(f'data/patients/{patient_id}/{session_id}/{timestamp}')
# 创建数据存储目录
data_dir.mkdir(parents=True, exist_ok=True)
# 设置目录权限为777完全权限
try:
import stat
os.chmod(str(data_dir), stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) # 777权限
self.logger.debug(f"已设置目录权限为777: {data_dir}")
except Exception as perm_error:
self.logger.warning(f"设置目录权限失败: {perm_error},但目录创建成功")
# 初始化数据字典
data = {
'session_id': session_id,
'head_pose': detection_data.get('head_pose'),
'body_pose': None,
'body_image': None,
'foot_data': detection_data.get('foot_data'),
'foot_data_image': None,
'foot_image': None,
'screen_image': None,
'timestamp': timestamp
}
try:
# 保存图片数据
image_fields = [
('body_image', 'body'),
('foot_image', 'foot'),
('foot_data_image', 'foot_data'),
('screen_image', 'screen')
]
for field, prefix in image_fields:
base64_data = detection_data.get(field)
if base64_data:
try:
# 移除base64头部信息
if ';base64,' in base64_data:
base64_data = base64_data.split(';base64,')[1]
# 解码base64数据
image_data = base64.b64decode(base64_data)
# 生成图片文件名
filename = f'{prefix}_{timestamp}.jpg'
file_path = data_dir / filename
# 保存图片
with open(file_path, 'wb') as f:
f.write(image_data)
# 更新数据字典中的图片路径
data[field] = str(os.path.join('data', 'patients', patient_id, session_id, timestamp, filename))
self.logger.debug(f'{field}保存成功: {filename}')
except Exception as e:
self.logger.error(f'保存{field}失败: {e}')
self.logger.debug(f'数据保存完成: {session_id}, 时间戳: {timestamp}')
except Exception as e:
self.logger.error(f'数据保存失败: {e}')
return data
def _collect_body_pose_data(self) -> Optional[Dict[str, Any]]:
"""
从FemtoBolt深度相机采集身体姿态数据
Returns:
Dict: 身体姿态数据字典
"""
try:
if self.femtobolt_manager and hasattr(self.femtobolt_manager, 'get_pose_data'):
pose_data = self.femtobolt_manager.get_pose_data()
return pose_data
else:
self.logger.warning('FemtoBolt管理器未连接或不支持姿态数据采集')
return None
except Exception as e:
self.logger.error(f'采集身体姿态数据失败: {e}')
return None
def _capture_body_image(self, data_dir) -> Optional[str]:
"""
从FemtoBolt深度相机采集身体截图
Args:
data_dir: 数据存储目录
Returns:
str: 身体截图文件的相对路径
"""
try:
if self.femtobolt_manager and hasattr(self.femtobolt_manager, 'get_latest_frame'):
frame = self.femtobolt_manager.get_latest_frame()
if frame is not None:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f')[:-3]
filename = f'body_{timestamp}.jpg'
file_path = data_dir / filename
# 保存图像
cv2.imwrite(str(file_path), frame)
# 返回相对路径
return str(filename)
else:
self.logger.warning('FemtoBolt相机未获取到有效帧')
return None
else:
self.logger.warning('FemtoBolt管理器未连接或不支持图像采集')
return None
except Exception as e:
self.logger.error(f'采集身体截图失败: {e}')
return None
def _collect_foot_pressure_data(self) -> Optional[Dict[str, Any]]:
"""
从压力传感器采集足部压力数据
Returns:
Dict: 足部压力数据字典
"""
try:
if self.pressure_manager and hasattr(self.pressure_manager, 'get_pressure_data'):
pressure_data = self.pressure_manager.get_pressure_data()
return pressure_data
else:
self.logger.warning('压力传感器管理器未连接或不支持压力数据采集')
return None
except Exception as e:
self.logger.error(f'采集足部压力数据失败: {e}')
return None
def _generate_foot_pressure_image(self, data_dir) -> Optional[str]:
"""
生成足底压力数据图
Args:
data_dir: 数据存储目录
Returns:
str: 足底压力数据图文件的相对路径
"""
try:
if self.pressure_manager and hasattr(self.pressure_manager, 'generate_pressure_heatmap'):
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f')[:-3]
filename = f'foot_pressure_{timestamp}.jpg'
file_path = data_dir / filename
# 生成压力热力图
success = self.pressure_manager.generate_pressure_heatmap(str(file_path))
if success and file_path.exists():
# 返回相对路径
return str(file_path.relative_to(Path.cwd()))
else:
self.logger.warning('足底压力数据图生成失败')
return None
else:
self.logger.warning('压力传感器管理器未连接或不支持压力图生成')
return None
except Exception as e:
self.logger.error(f'生成足底压力数据图失败: {e}')
return None
def _capture_screen_image(self, data_dir) -> Optional[str]:
"""
采集屏幕截图
Args:
data_dir: 数据存储目录路径
Returns:
str: 截图文件的相对路径失败返回None
"""
try:
# 截取屏幕
if self.screen_size:
width, height = self.screen_size
screenshot = pyautogui.screenshot(region=(0, 0, width, height))
else:
screenshot = pyautogui.screenshot()
# 保存截图
from pathlib import Path
image_path = Path(data_dir) / 'screen_image.png'
screenshot.save(str(image_path))
# # 返回相对路径
# abs_image_path = image_path.resolve()
# abs_cwd = Path.cwd().resolve()
# relative_path = abs_image_path.relative_to(abs_cwd)
return str('screen_image.png')
except Exception as e:
self.logger.error(f'屏幕截图失败: {e}')
return None
def _capture_foot_image(self, data_dir) -> Optional[str]:
"""
采集足部视频截图
Args:
data_dir: 数据存储目录路径
Returns:
str: 截图文件的相对路径失败返回None
"""
try:
if not self.camera_manager or not self.camera_manager.is_connected:
self.logger.warning('相机设备未连接,无法采集足部截图')
return None
# 从相机管理器获取最新帧
frame, frame_timestamp = self.camera_manager._get_latest_frame_from_cache('camera')
if frame is None:
self.logger.warning('无法从相机获取帧数据')
return None
# 调整帧尺寸
resized_frame = cv2.resize(frame, self.MAX_FRAME_SIZE)
# 保存截图
from pathlib import Path
image_path = Path(data_dir) / 'foot_image.png'
cv2.imwrite(str(image_path), resized_frame)
# 返回相对路径
# abs_image_path = image_path.resolve()
# abs_cwd = Path.cwd().resolve()
# relative_path = abs_image_path.relative_to(abs_cwd)
return str(image_path)
except Exception as e:
self.logger.error(f'足部截图失败: {e}')
return None
# 保持向后兼容的ScreenRecorder类
class ScreenRecorder:
def __init__(self, output_dir="recordings", fps=20, quality=80, region=None):
"""向后兼容的屏幕录制器"""
self.recording_manager = RecordingManager()
self.recording_manager.screen_fps = fps
self.recording_manager.set_screen_region(region)
self.output_dir = output_dir
# 创建输出目录
if not os.path.exists(output_dir):
os.makedirs(output_dir)
def start_recording(self, filename=None):
"""开始录制"""
if filename is None:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"screen_record_{timestamp}"
# 使用文件名作为会话ID
session_id = filename
patient_id = "default"
return self.recording_manager.start_recording(session_id, patient_id)
def stop_recording(self):
"""停止录制"""
return self.recording_manager.stop_recording()
def get_status(self):
"""获取状态"""
return self.recording_manager.get_status()