#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Screen recording manager.

Only the FFmpeg-based recording path is kept. The module also saves
detection images (base64 payloads and screenshots) for a session.
"""

import os
import sys
import logging
import subprocess
import signal
import base64
import stat
from pathlib import Path
from datetime import datetime
from typing import Dict, Any, List, Optional, Tuple

try:
    import pyautogui
except ImportError:
    pyautogui = None

try:
    from .utils.config_manager import ConfigManager
except ImportError:
    from utils.config_manager import ConfigManager


class RecordingManager:
    """Start/stop an FFmpeg (gdigrab) screen recording and save detection images."""

    # Key under which the single screen-recording process and its metadata
    # are stored in ``_ffmpeg_processes`` / ``_ffmpeg_meta``.
    _SCREEN_KEY = 'screen'
    # Seconds to wait after asking ffmpeg to quit via its interactive 'q' key.
    _QUIT_TIMEOUT = 2.0
    # Seconds to wait after each escalation step (signal / terminate / kill).
    _SIGNAL_TIMEOUT = 3.0
    # Screen size used when pyautogui cannot report the real one.
    _FALLBACK_SCREEN_SIZE = (1920, 1080)

    def __init__(self, config_manager: Optional[ConfigManager] = None):
        """
        Initialise the recording manager.

        Args:
            config_manager: Configuration manager instance. A new
                ``ConfigManager()`` is created when omitted.
        """
        self.logger = logging.getLogger(__name__)
        self.config_manager = config_manager or ConfigManager()

        # FFmpeg process bookkeeping.
        self._ffmpeg_processes = {}
        self._ffmpeg_meta = {}

        # Default parameters.
        self.screen_fps = 25
        self.screen_size = self._get_screen_size()

    def _get_screen_size(self):
        """
        Return the screen size reported by pyautogui.

        Falls back to 1920x1080 when pyautogui is not installed or when
        querying the size fails, so that construction never crashes because
        of a missing display.
        """
        if pyautogui is None:
            return self._FALLBACK_SCREEN_SIZE
        try:
            return pyautogui.size()
        except Exception as e:
            self.logger.warning(f'获取屏幕尺寸失败: {e}')
            return self._FALLBACK_SCREEN_SIZE

    def _get_primary_screen_bounds(self) -> Dict[str, int]:
        """
        Return the primary screen rectangle as ``{'x', 'y', 'width', 'height'}``.

        Uses ``user32.GetSystemMetrics`` when ``ctypes.windll`` is available;
        otherwise falls back to ``self.screen_size``. The origin is always
        (0, 0).
        """
        try:
            import ctypes
            user32 = ctypes.windll.user32
            # Index 0 / 1 are SM_CXSCREEN / SM_CYSCREEN.
            w = user32.GetSystemMetrics(0)
            h = user32.GetSystemMetrics(1)
            return {'x': 0, 'y': 0, 'width': int(w), 'height': int(h)}
        except Exception:
            sw, sh = self.screen_size
            return {'x': 0, 'y': 0, 'width': int(sw), 'height': int(sh)}

    @staticmethod
    def _floor_even(value: int) -> int:
        """Round *value* down to an even number; values below 2 are returned as-is."""
        if value > 1 and value % 2:
            return value - 1
        return value

    @classmethod
    def _clamp_capture_region(cls, screen_location: List[int],
                              bounds: Dict[str, int]) -> Tuple[int, int, int, int]:
        """
        Clamp ``[x, y, w, h]`` into *bounds*.

        Returns:
            ``(offset_x, offset_y, width, height)`` where the offsets are
            relative to the bounds origin. Width and height are rounded down
            to even numbers because the recording is always encoded as
            yuv420p, which encoders such as libx264 reject for odd sizes.
        """
        x, y, w, h = screen_location
        left, top = bounds['x'], bounds['y']
        right, bottom = left + bounds['width'], top + bounds['height']

        x_clamped = max(left, min(int(x), right - 1))
        y_clamped = max(top, min(int(y), bottom - 1))
        w_clamped = max(1, min(int(w), int(right - x_clamped)))
        h_clamped = max(1, min(int(h), int(bottom - y_clamped)))

        return (
            x_clamped - left,
            y_clamped - top,
            cls._floor_even(w_clamped),
            cls._floor_even(h_clamped),
        )

    def _recording_option(self, key: str):
        """Read *key* from SCREEN_RECORDING, falling back to RECORDING; None if unset."""
        return (
            self.config_manager.get_config_value('SCREEN_RECORDING', key, fallback=None)
            or self.config_manager.get_config_value('RECORDING', key, fallback=None)
        )

    def _screen_int_option(self, key: str, default: int) -> int:
        """Read an integer option from SCREEN_RECORDING, using *default* when empty."""
        raw = self.config_manager.get_config_value(
            'SCREEN_RECORDING', key, fallback=str(default))
        return int(raw or str(default))

    def _resolve_ffmpeg_path(self) -> Optional[str]:
        """
        Locate the ffmpeg executable.

        The configured ``ffmpeg_path`` is used when it points at an existing
        file; otherwise ``ffmpeg/bin/ffmpeg.exe`` next to the executable
        (frozen build) or next to this source file is tried.

        Returns:
            The path as a string, or None when no executable was found.
        """
        configured = self._recording_option('ffmpeg_path')
        if configured and os.path.isfile(str(configured)):
            return str(configured)

        if getattr(sys, 'frozen', False):
            base_dir = os.path.dirname(sys.executable)
        else:
            base_dir = os.path.dirname(os.path.abspath(__file__))
        bundled = os.path.join(base_dir, 'ffmpeg', 'bin', 'ffmpeg.exe')
        return bundled if os.path.isfile(bundled) else None

    def _build_ffmpeg_command(self, ffmpeg_path: str, target_fps, off_x: int, off_y: int,
                              width: int, height: int, output_path: str) -> List[str]:
        """Assemble the gdigrab capture command line from the configuration."""
        draw_mouse = self._screen_int_option('ffmpeg_draw_mouse', 0)
        cmd = [
            str(ffmpeg_path),
            '-y',
            '-f', 'gdigrab',
            '-framerate', str(target_fps),
            '-draw_mouse', str(draw_mouse),
            '-offset_x', str(off_x),
            '-offset_y', str(off_y),
            '-video_size', f'{width}x{height}',
            '-i', 'desktop',
        ]

        codec = self._recording_option('ffmpeg_codec') or 'libx264'
        preset = (self._recording_option('ffmpeg_preset')
                  or ('p1' if codec == 'h264_nvenc' else 'ultrafast'))
        threads = self._screen_int_option('ffmpeg_threads', 2)
        bframes = self._screen_int_option('ffmpeg_bframes', 0)
        # Default GOP: one keyframe every two seconds.
        gop = self._screen_int_option('ffmpeg_gop', max(1, int(target_fps * 2)))

        # Common low-CPU encoder options.
        cmd += ['-c:v', codec]
        cmd += ['-preset', str(preset)]
        cmd += ['-bf', str(bframes)]
        cmd += ['-g', str(gop)]
        cmd += ['-pix_fmt', 'yuv420p']
        cmd += ['-threads', str(threads)]
        cmd += ['-r', str(target_fps)]
        cmd += [output_path]
        return cmd

    @staticmethod
    def _wait_for_exit(proc, timeout: float) -> bool:
        """Wait up to *timeout* seconds; return True when the process has exited."""
        try:
            proc.wait(timeout=timeout)
        except (subprocess.TimeoutExpired, OSError):
            return proc.poll() is not None
        return True

    def _shutdown_ffmpeg(self, proc) -> None:
        """
        Stop an ffmpeg process, escalating only while it is still alive.

        Order: interactive 'q' on stdin, then CTRL_BREAK_EVENT (SIGTERM where
        that constant does not exist), then ``terminate()``, then ``kill()``.
        Each step is given time to take effect so ffmpeg can finish writing
        the output file before anything harsher is attempted.
        """
        try:
            if proc.poll() is None and proc.stdin:
                try:
                    proc.communicate(input=b'q', timeout=self._QUIT_TIMEOUT)
                except subprocess.TimeoutExpired:
                    pass  # still running; escalate below
                except Exception as e:
                    self.logger.debug(f'向ffmpeg发送退出指令失败: {e}')
            if proc.poll() is not None:
                return

            try:
                proc.send_signal(getattr(signal, 'CTRL_BREAK_EVENT', signal.SIGTERM))
            except Exception as e:
                self.logger.debug(f'向ffmpeg发送中断信号失败: {e}')
            if self._wait_for_exit(proc, self._SIGNAL_TIMEOUT):
                return

            try:
                proc.terminate()
            except Exception as e:
                self.logger.debug(f'终止ffmpeg进程失败: {e}')
            if self._wait_for_exit(proc, self._SIGNAL_TIMEOUT):
                return

            try:
                proc.kill()
            except Exception as e:
                self.logger.warning(f'强制结束ffmpeg进程失败: {e}')
            self._wait_for_exit(proc, self._SIGNAL_TIMEOUT)
        finally:
            # Release the stdin pipe if communicate() never got to close it.
            stdin = proc.stdin
            if stdin is not None and not stdin.closed:
                try:
                    stdin.close()
                except Exception:
                    pass

    def start_recording_ffmpeg(self, session_id: str, patient_id: str,
                               screen_location: List[int],
                               fps: Optional[int] = None) -> Dict[str, Any]:
        """
        Start recording a screen region with ffmpeg (gdigrab).

        The region is clamped to the primary screen. Output goes to
        ``<FILEPATH.path>/<patient_id>/<session_id>/video_<timestamp>/screen.mp4``.
        A recording that is still running from an earlier call is stopped
        first so its process is not orphaned.

        Args:
            session_id: Detection session ID.
            patient_id: Patient ID.
            screen_location: Capture region as ``[x, y, width, height]``.
            fps: Frame rate; ``self.screen_fps`` is used when omitted or 0.

        Returns:
            Dict with ``success`` and ``message``. On success it also holds
            ``database_updates`` with the session ID, status ``recording``
            and the video path relative to the storage root. Failures are
            reported through ``message``; no exception is raised.
        """
        result: Dict[str, Any] = {'success': False, 'message': ''}
        try:
            off_x, off_y, width, height = self._clamp_capture_region(
                screen_location, self._get_primary_screen_bounds())

            file_dir = self.config_manager.get_config_value('FILEPATH', 'path')
            timestamp = datetime.now().strftime('%H%M%S%f')[:-3]
            base_path = os.path.join(file_dir, patient_id, session_id, f'video_{timestamp}')
            os.makedirs(base_path, exist_ok=True)
            screen_video_path = os.path.join(base_path, 'screen.mp4')

            target_fps = fps or self.screen_fps

            ffmpeg_path = self._resolve_ffmpeg_path()
            if not ffmpeg_path:
                result['message'] = '未配置有效的ffmpeg_path,请在配置中设置 SCREEN_RECORDING.ffmpeg_path 或 RECORDING.ffmpeg_path'
                return result

            cmd = self._build_ffmpeg_command(
                ffmpeg_path, target_fps, off_x, off_y, width, height, screen_video_path)

            # Only one recording is tracked; stop a leftover one rather than
            # losing the handle to a process that keeps running.
            stale = self._ffmpeg_processes.pop(self._SCREEN_KEY, None)
            self._ffmpeg_meta.pop(self._SCREEN_KEY, None)
            if stale is not None and stale.poll() is None:
                self.logger.warning('检测到未停止的ffmpeg录制,启动新录制前先将其停止')
                self._shutdown_ffmpeg(stale)

            proc = subprocess.Popen(
                cmd,
                stdin=subprocess.PIPE,  # lets stop_recording_ffmpeg send 'q'
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                # Own process group so CTRL_BREAK_EVENT can target ffmpeg only.
                creationflags=getattr(subprocess, 'CREATE_NEW_PROCESS_GROUP', 0)
            )
            self._ffmpeg_processes[self._SCREEN_KEY] = proc
            self._ffmpeg_meta[self._SCREEN_KEY] = {
                'base_path': base_path,
                'patient_id': patient_id,
                'session_id': session_id,
                'video_path': screen_video_path
            }

            result['success'] = True
            result['message'] = 'ffmpeg录制已启动'
            result['database_updates'] = {
                'session_id': session_id,
                'status': 'recording',
                'video_paths': {
                    'screen_video_path': os.path.relpath(screen_video_path, file_dir)
                }
            }
            return result
        except Exception as e:
            result['message'] = f'ffmpeg启动失败: {e}'
            return result

    def stop_recording_ffmpeg(self, session_id: Optional[str] = None) -> Dict[str, Any]:
        """
        Stop the current ffmpeg recording, if any.

        Args:
            session_id: Accepted for interface compatibility; not used. The
                session reported back is the one stored when recording
                started.

        Returns:
            Dict with ``success`` and ``message``. When a recording had been
            started, ``database_updates`` carries its session ID and status
            ``recorded``. The stored process and metadata are cleared, so a
            second call reports success without ``database_updates``.
        """
        result: Dict[str, Any] = {'success': False, 'message': ''}
        try:
            proc = self._ffmpeg_processes.pop(self._SCREEN_KEY, None)
            meta = self._ffmpeg_meta.pop(self._SCREEN_KEY, None)
            if proc is not None:
                self._shutdown_ffmpeg(proc)

            result['success'] = True
            result['message'] = 'ffmpeg录制已停止'
            if meta:
                result['database_updates'] = {
                    'session_id': meta.get('session_id'),
                    'status': 'recorded'
                }
            return result
        except Exception as e:
            result['message'] = f'ffmpeg停止失败: {e}'
            return result

    def save_detection_images(self, session_id: str, patient_id: str,
                              detection_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Save the detection images sent by the front end into the session directory.

        Files are written to
        ``<FILEPATH.path>/<patient_id>/<session_id>/image_<timestamp>/``.
        Failures while saving individual images are logged and do not abort
        the call.

        Args:
            session_id: Detection session ID.
            patient_id: Patient ID.
            detection_data: Data from the front end, including base64-encoded
                images (``body_image``, ``foot1_image``, ``foot2_image``) and
                the screenshot regions ``screen_location`` /
                ``foot_data_image``.

        Returns:
            Dict with all collected data. Image fields hold paths relative to
            the storage root, or None when nothing was saved.
        """
        # Capture timestamp, millisecond precision.
        timestamp = datetime.now().strftime('%H%M%S%f')[:-3]
        storage_root = self.config_manager.get_config_value('FILEPATH', 'path')
        relative_dir = os.path.join(patient_id, session_id, f"image_{timestamp}")
        data_dir = Path(os.path.join(storage_root, relative_dir))

        # Create the storage directory.
        data_dir.mkdir(parents=True, exist_ok=True)

        # Open the directory up to everyone (mode 777); failure is not fatal.
        try:
            os.chmod(str(data_dir), stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
            self.logger.debug(f"已设置目录权限为777: {data_dir}")
        except Exception as perm_error:
            self.logger.warning(f"设置目录权限失败: {perm_error},但目录创建成功")

        # Result record.
        data = {
            'session_id': session_id,
            'head_pose': detection_data.get('head_pose'),
            'screen_location': detection_data.get('screen_location'),
            'body_pose': None,
            'body_image': None,
            'foot_data': detection_data.get('foot_data'),
            'foot_data_image': detection_data.get('foot_data_image'),
            'foot1_image': None,
            'foot2_image': None,
            'screen_image': None,
            'timestamp': timestamp
        }

        try:
            # (field in detection_data, file name prefix)
            image_fields = [
                ('body_image', 'body'),
                ('foot1_image', 'foot1'),
                ('foot2_image', 'foot2')
            ]

            for field, prefix in image_fields:
                base64_data = detection_data.get(field)
                if not base64_data:
                    continue
                try:
                    # Strip a data-URL header such as "data:image/jpeg;base64,".
                    if ';base64,' in base64_data:
                        base64_data = base64_data.split(';base64,')[1]

                    image_bytes = base64.b64decode(base64_data)

                    filename = f'{prefix}_{timestamp}.jpg'
                    image_path = data_dir / filename
                    with open(image_path, 'wb') as f:
                        f.write(image_bytes)

                    # Store the path relative to the storage root.
                    data[field] = str(os.path.join(relative_dir, filename))
                    self.logger.debug(f'{field}保存成功: {filename}')
                except Exception as e:
                    self.logger.error(f'保存{field}失败: {e}')

            # Screenshot of the region given by screen_location.
            screen_image = self._capture_screen_image(
                data_dir, data.get('screen_location'), 'screen', timestamp=timestamp)
            if screen_image:
                data['screen_image'] = str(os.path.join(relative_dir, screen_image))

            # Foot-pressure screenshot of the region given by foot_data_image.
            # On failure the field keeps the region it was initialised with.
            foot_data_image = self._capture_screen_image(
                data_dir, data.get('foot_data_image'), 'foot_data', timestamp=timestamp)
            if foot_data_image:
                data['foot_data_image'] = str(os.path.join(relative_dir, foot_data_image))

            self.logger.debug(f'数据保存完成: {session_id}, 时间戳: {timestamp}')
        except Exception as e:
            self.logger.error(f'数据保存失败: {e}')

        return data

    def _capture_screen_image(self, data_dir, screen_location, type, timestamp) -> Optional[str]:
        """
        Take a screenshot and save it as a JPEG inside *data_dir*.

        Args:
            data_dir: Directory the screenshot is written to.
            screen_location: Region as ``(x, y, width, height)``; a falsy
                value captures the full screen.
            type: File name prefix (e.g. ``'screen'`` or ``'foot_data'``).
            timestamp: Timestamp string used in the file name.

        Returns:
            The screenshot file name (``<type>_<timestamp>.jpg``), or None on
            failure.
        """
        try:
            if pyautogui is None:
                raise RuntimeError('pyautogui不可用')

            if screen_location:
                # Capture the requested region only.
                x, y, width, height = screen_location
                screenshot = pyautogui.screenshot(region=(x, y, width, height))
            else:
                # Full-screen capture.
                screenshot = pyautogui.screenshot()

            # JPEG has no alpha channel; convert e.g. RGBA captures first.
            if getattr(screenshot, 'mode', 'RGB') != 'RGB':
                screenshot = screenshot.convert('RGB')

            screen_filename = f'{type}_{timestamp}.jpg'
            image_path = Path(data_dir) / screen_filename
            screenshot.save(str(image_path), quality=95, optimize=True)

            return screen_filename
        except Exception as e:
            self.logger.error(f'屏幕截图失败: {e}')
            return None