#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 综合录制管理器 支持屏幕录制和足部视频录制 """ import cv2 import numpy as np import pyautogui import threading import time from datetime import datetime import os import logging from typing import Optional, Dict, Any try: from .camera_manager import CameraManager except ImportError: from camera_manager import CameraManager class RecordingManager: def __init__(self, camera_manager: Optional[CameraManager] = None, db_manager=None): """ 初始化录制管理器 Args: camera_manager: 相机管理器实例 db_manager: 数据库管理器实例 """ self.camera_manager = camera_manager self.db_manager = db_manager # 录制状态 self.sync_recording = False self.recording_stop_event = threading.Event() # 会话信息 self.current_session_id = None self.current_patient_id = None self.recording_start_time = None # 视频写入器 self.feet_video_writer = None self.screen_video_writer = None # 录制线程 self.feet_recording_thread = None self.screen_recording_thread = None # 屏幕录制参数 self.screen_fps = 20 self.screen_region = None self.screen_size = pyautogui.size() # 视频参数 self.MAX_FRAME_SIZE = (1280, 720) # 最大帧尺寸 # 日志 self.logger = logging.getLogger(__name__) self.logger.info("录制管理器初始化完成") def start_recording(self, session_id: str, patient_id: str) -> Dict[str, Any]: """ 启动同步录制 Args: session_id: 检测会话ID patient_id: 患者ID Returns: Dict: 录制启动状态和信息 """ result = { 'success': False, 'session_id': session_id, 'patient_id': patient_id, 'recording_start_time': None, 'video_paths': { 'feet_video': None, 'screen_video': None }, 'message': '' } try: # 检查是否已在录制 if self.sync_recording: result['message'] = f'已在录制中,当前会话ID: {self.current_session_id}' return result # 设置录制参数 self.current_session_id = session_id self.current_patient_id = patient_id self.recording_start_time = datetime.now() # 创建存储目录 base_path = os.path.join('data', 'patients', patient_id, session_id) try: os.makedirs(base_path, exist_ok=True) self.logger.info(f'录制目录创建成功: {base_path}') # 设置目录权限 self._set_directory_permissions(base_path) except Exception as dir_error: self.logger.error(f'创建录制目录失败: {base_path}, 错误: {dir_error}') result['success'] = False result['message'] = f'创建录制目录失败: {dir_error}' return result # 定义视频文件路径 feet_video_path = os.path.join(base_path, 'feet.mp4') screen_video_path = os.path.join(base_path, 'screen.mp4') result['video_paths']['feet_video'] = feet_video_path result['video_paths']['screen_video'] = screen_video_path # 更新数据库中的视频路径 if self.db_manager: try: # 更新会话状态为录制中 if not self.db_manager.update_session_status(session_id, 'recording'): self.logger.error(f'更新会话状态为录制中失败 - 会话ID: {session_id}') # 更新视频文件路径 self.db_manager.update_session_normal_video_path(session_id, feet_video_path) self.db_manager.update_session_screen_video_path(session_id, screen_video_path) self.logger.debug(f'数据库视频路径更新成功 - 会话ID: {session_id}') except Exception as db_error: self.logger.error(f'更新数据库视频路径失败: {db_error}') # 视频编码参数 fourcc = cv2.VideoWriter_fourcc(*'mp4v') fps = 30 # 初始化足部视频写入器 if self.camera_manager and self.camera_manager.is_connected: target_width, target_height = self.MAX_FRAME_SIZE self.feet_video_writer = cv2.VideoWriter( feet_video_path, fourcc, fps, (target_width, target_height) ) if self.feet_video_writer.isOpened(): self.logger.info(f'脚部视频写入器初始化成功: {feet_video_path}') else: self.logger.error(f'脚部视频写入器初始化失败: {feet_video_path}') else: self.logger.warning('相机设备未启用,跳过脚部视频写入器初始化') # 初始化屏幕录制写入器 record_size = self.screen_region[2:4] if self.screen_region else self.screen_size self.screen_video_writer = cv2.VideoWriter( screen_video_path, fourcc, self.screen_fps, record_size ) if self.screen_video_writer.isOpened(): self.logger.info(f'屏幕视频写入器初始化成功: {screen_video_path}') else: self.logger.error(f'屏幕视频写入器初始化失败: {screen_video_path}') # 重置停止事件 self.recording_stop_event.clear() self.sync_recording = True # 启动录制线程 if self.feet_video_writer: self.feet_recording_thread = threading.Thread( target=self._feet_recording_thread, daemon=True, name='FeetRecordingThread' ) self.feet_recording_thread.start() if self.screen_video_writer: self.screen_recording_thread = threading.Thread( target=self._screen_recording_thread, daemon=True, name='ScreenRecordingThread' ) self.screen_recording_thread.start() result['success'] = True result['recording_start_time'] = self.recording_start_time.isoformat() result['message'] = '同步录制已启动' self.logger.info(f'同步录制已启动 - 会话ID: {session_id}, 患者ID: {patient_id}') except Exception as e: self.logger.error(f'启动同步录制失败: {e}') result['message'] = f'启动录制失败: {str(e)}' # 清理已创建的写入器 self._cleanup_video_writers() return result def stop_recording(self, session_id: str = None) -> Dict[str, Any]: """ 停止录制 Args: session_id: 会话ID,用于验证是否为当前录制会话 Returns: Dict: 停止录制的结果 """ result = { 'success': False, 'session_id': self.current_session_id, 'message': '' } try: # 验证会话ID if session_id and session_id != self.current_session_id: result['message'] = f'会话ID不匹配: 期望 {self.current_session_id}, 收到 {session_id}' return result if not self.sync_recording: result['message'] = '当前没有进行录制' return result # 设置停止标志 self.sync_recording = False self.recording_stop_event.set() # 等待录制线程结束 if self.feet_recording_thread and self.feet_recording_thread.is_alive(): self.feet_recording_thread.join(timeout=5.0) if self.screen_recording_thread and self.screen_recording_thread.is_alive(): self.screen_recording_thread.join(timeout=5.0) # 清理视频写入器 self._cleanup_video_writers() # 更新数据库状态 if self.db_manager and self.current_session_id: try: self.db_manager.update_session_status(self.current_session_id, 'completed') self.logger.info(f'会话状态已更新为完成 - 会话ID: {self.current_session_id}') except Exception as db_error: self.logger.error(f'更新数据库状态失败: {db_error}') result['success'] = True result['message'] = '录制已停止' self.logger.info(f'录制已停止 - 会话ID: {self.current_session_id}') # 重置会话信息 self.current_session_id = None self.current_patient_id = None self.recording_start_time = None except Exception as e: self.logger.error(f'停止录制失败: {e}') result['message'] = f'停止录制失败: {str(e)}' return result def _feet_recording_thread(self): """足部视频录制线程""" consecutive_failures = 0 max_consecutive_failures = 10 recording_frame_count = 0 self.logger.info(f"足部录制线程已启动 - 会话ID: {self.current_session_id}") self.logger.info(f"视频写入器状态: {self.feet_video_writer.isOpened() if self.feet_video_writer else 'None'}") try: # 使用与屏幕录制相同的帧率控制 target_fps = 30 # 目标帧率 frame_interval = 1.0 / target_fps last_frame_time = time.time() while self.sync_recording and not self.recording_stop_event.is_set(): current_time = time.time() # 检查是否到了下一帧的时间 if current_time - last_frame_time >= frame_interval: if self.feet_video_writer: # 从相机管理器的全局缓存获取最新帧 frame, frame_timestamp = self.camera_manager._get_latest_frame_from_cache('camera') if frame is not None: self.logger.debug(f"成功获取帧 - 尺寸: {frame.shape}, 数据类型: {frame.dtype}, 时间戳: {frame_timestamp}") # 检查视频写入器状态 if not self.feet_video_writer.isOpened(): self.logger.error(f"脚部视频写入器已关闭,无法写入帧 - 会话ID: {self.current_session_id}") break try: # 调整帧尺寸到目标大小 resized_frame = cv2.resize(frame, self.MAX_FRAME_SIZE) # 写入录制文件 write_success = self.feet_video_writer.write(resized_frame) if write_success is False: self.logger.error(f"视频帧写入返回False - 可能写入失败") consecutive_failures += 1 else: consecutive_failures = 0 recording_frame_count += 1 except Exception as write_error: self.logger.error(f"写入脚部视频帧异常: {write_error}") consecutive_failures += 1 if consecutive_failures >= 10: self.logger.error("连续写入失败次数过多,停止录制") break else: # 如果没有获取到帧,写入上一帧或黑色帧来保持帧率 consecutive_failures += 1 if consecutive_failures <= 3: self.logger.warning(f"录制线程无法从缓存获取帧 (连续失败{consecutive_failures}次)") elif consecutive_failures == max_consecutive_failures: self.logger.error(f"录制线程连续失败{max_consecutive_failures}次,可能缓存无数据或推流已停止") last_frame_time = current_time else: self.logger.error("足部视频写入器未初始化") break # 短暂休眠避免CPU占用过高 time.sleep(0.01) # 检查连续失败情况 if consecutive_failures >= max_consecutive_failures: self.logger.error(f"连续失败次数达到上限({max_consecutive_failures}),停止录制") break except Exception as e: self.logger.error(f'足部录制线程异常: {e}') finally: self.logger.info(f"足部录制线程已结束 - 会话ID: {self.current_session_id}, 总录制帧数: {recording_frame_count}") # 确保视频写入器被正确关闭 if self.feet_video_writer: self.feet_video_writer.release() self.feet_video_writer = None self.logger.debug("足部视频写入器已释放") def _screen_recording_thread(self): """屏幕录制线程""" self.logger.info(f"屏幕录制线程已启动 - 会话ID: {self.current_session_id}") recording_frame_count = 0 try: # 使用与足部录制相同的帧率控制 target_fps = 30 # 目标帧率 frame_interval = 1.0 / target_fps last_frame_time = time.time() while self.sync_recording and not self.recording_stop_event.is_set(): current_time = time.time() # 检查是否到了下一帧的时间 if current_time - last_frame_time >= frame_interval: try: # 截取屏幕 if self.screen_region: x, y, width, height = self.screen_region screenshot = pyautogui.screenshot(region=(x, y, width, height)) else: screenshot = pyautogui.screenshot() # 转换为numpy数组 frame = np.array(screenshot) # 转换颜色格式 (RGB -> BGR) frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) # 写入视频文件 if self.screen_video_writer and self.screen_video_writer.isOpened(): self.screen_video_writer.write(frame) recording_frame_count += 1 last_frame_time = current_time except Exception as e: self.logger.error(f"屏幕录制异常: {e}") # 短暂休眠避免CPU占用过高 time.sleep(0.01) except Exception as e: self.logger.error(f'屏幕录制线程异常: {e}') finally: self.logger.info(f"屏幕录制线程已结束 - 会话ID: {self.current_session_id}, 总录制帧数: {recording_frame_count}") # 确保视频写入器被正确关闭 if self.screen_video_writer: self.screen_video_writer.release() self.screen_video_writer = None self.logger.debug("屏幕视频写入器已释放") def _cleanup_video_writers(self): """清理视频写入器""" try: if self.feet_video_writer: self.feet_video_writer.release() self.feet_video_writer = None self.logger.debug("足部视频写入器已清理") if self.screen_video_writer: self.screen_video_writer.release() self.screen_video_writer = None self.logger.debug("屏幕视频写入器已清理") except Exception as e: self.logger.error(f"清理视频写入器失败: {e}") def _set_directory_permissions(self, path): """设置目录权限""" try: import subprocess import platform if platform.system() == 'Windows': try: # 为Users用户组授予完全控制权限 subprocess.run([ 'icacls', path, '/grant', 'Users:(OI)(CI)F' ], check=True, capture_output=True, text=True) # 为Everyone用户组授予完全控制权限 subprocess.run([ 'icacls', path, '/grant', 'Everyone:(OI)(CI)F' ], check=True, capture_output=True, text=True) self.logger.info(f"已设置Windows目录权限(Users和Everyone完全控制): {path}") except subprocess.CalledProcessError as icacls_error: self.logger.warning(f"Windows权限设置失败: {icacls_error}") else: self.logger.info(f"已设置目录权限为777: {path}") except Exception as perm_error: self.logger.warning(f"设置目录权限失败: {perm_error},但目录创建成功") def set_screen_region(self, region): """设置屏幕录制区域""" if self.sync_recording: self.logger.warning("录制进行中,无法更改区域设置") return False self.screen_region = region if self.screen_region: x, y, width, height = self.screen_region # 确保区域在屏幕范围内 x = max(0, min(x, self.screen_size[0] - 1)) y = max(0, min(y, self.screen_size[1] - 1)) width = min(width, self.screen_size[0] - x) height = min(height, self.screen_size[1] - y) self.screen_region = (x, y, width, height) self.logger.info(f"录制区域已设置: {self.screen_region}") else: self.logger.info("录制模式已设置: 全屏录制") return True def get_status(self): """获取录制状态""" return { 'recording': self.sync_recording, 'session_id': self.current_session_id, 'patient_id': self.current_patient_id, 'recording_start_time': self.recording_start_time.isoformat() if self.recording_start_time else None, 'screen_size': self.screen_size, 'screen_region': self.screen_region, 'screen_fps': self.screen_fps, 'feet_writer_active': self.feet_video_writer is not None and self.feet_video_writer.isOpened() if self.feet_video_writer else False, 'screen_writer_active': self.screen_video_writer is not None and self.screen_video_writer.isOpened() if self.screen_video_writer else False } # 保持向后兼容的ScreenRecorder类 class ScreenRecorder: def __init__(self, output_dir="recordings", fps=20, quality=80, region=None): """向后兼容的屏幕录制器""" self.recording_manager = RecordingManager() self.recording_manager.screen_fps = fps self.recording_manager.set_screen_region(region) self.output_dir = output_dir # 创建输出目录 if not os.path.exists(output_dir): os.makedirs(output_dir) def start_recording(self, filename=None): """开始录制""" if filename is None: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"screen_record_{timestamp}" # 使用文件名作为会话ID session_id = filename patient_id = "default" return self.recording_manager.start_recording(session_id, patient_id) def stop_recording(self): """停止录制""" return self.recording_manager.stop_recording() def get_status(self): """获取状态""" return self.recording_manager.get_status()