318 lines
13 KiB
Python
318 lines
13 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
屏幕录制管理器
|
||
仅保留FFmpeg录制功能
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
import logging
|
||
import subprocess
|
||
import signal
|
||
import base64
|
||
from pathlib import Path
|
||
from datetime import datetime
|
||
from typing import Dict, Any, List, Optional
|
||
|
||
try:
|
||
import pyautogui
|
||
except ImportError:
|
||
pyautogui = None
|
||
|
||
try:
|
||
from .utils.config_manager import ConfigManager
|
||
except ImportError:
|
||
from utils.config_manager import ConfigManager
|
||
|
||
class RecordingManager:
|
||
def __init__(self, config_manager: Optional[ConfigManager] = None):
|
||
"""
|
||
初始化录制管理器
|
||
|
||
Args:
|
||
config_manager: 配置管理器实例
|
||
"""
|
||
self.logger = logging.getLogger(__name__)
|
||
self.config_manager = config_manager or ConfigManager()
|
||
|
||
# FFmpeg进程管理
|
||
self._ffmpeg_processes = {}
|
||
self._ffmpeg_meta = {}
|
||
|
||
# 默认参数
|
||
self.screen_fps = 25
|
||
self.screen_size = self._get_screen_size()
|
||
|
||
def _get_screen_size(self):
|
||
try:
|
||
import pyautogui
|
||
return pyautogui.size()
|
||
except ImportError:
|
||
return (1920, 1080)
|
||
|
||
def _get_primary_screen_bounds(self) -> Dict[str, int]:
|
||
try:
|
||
import ctypes
|
||
user32 = ctypes.windll.user32
|
||
w = user32.GetSystemMetrics(0)
|
||
h = user32.GetSystemMetrics(1)
|
||
return {'x': 0, 'y': 0, 'width': int(w), 'height': int(h)}
|
||
except Exception:
|
||
sw, sh = self.screen_size
|
||
return {'x': 0, 'y': 0, 'width': int(sw), 'height': int(sh)}
|
||
|
||
def start_recording_ffmpeg(self, session_id: str, patient_id: str, screen_location: List[int], fps: int = None) -> Dict[str, Any]:
|
||
result = {'success': False, 'message': ''}
|
||
try:
|
||
x, y, w, h = screen_location
|
||
bounds = self._get_primary_screen_bounds()
|
||
x_clamped = max(bounds['x'], min(int(x), bounds['x'] + bounds['width'] - 1))
|
||
y_clamped = max(bounds['y'], min(int(y), bounds['y'] + bounds['height'] - 1))
|
||
max_w = (bounds['x'] + bounds['width']) - x_clamped
|
||
max_h = (bounds['y'] + bounds['height']) - y_clamped
|
||
w_clamped = max(1, min(int(w), int(max_w)))
|
||
h_clamped = max(1, min(int(h), int(max_h)))
|
||
off_x = x_clamped - bounds['x']
|
||
off_y = y_clamped - bounds['y']
|
||
|
||
file_dir = self.config_manager.get_config_value('FILEPATH', 'path')
|
||
timestamp = datetime.now().strftime('%H%M%S%f')[:-3]
|
||
base_path = os.path.join(file_dir, patient_id, session_id, f'video_{timestamp}')
|
||
os.makedirs(base_path, exist_ok=True)
|
||
screen_video_path = os.path.join(base_path, 'screen.mp4')
|
||
target_fps = fps or self.screen_fps
|
||
ffmpeg_path = None
|
||
if self.config_manager:
|
||
ffmpeg_path = (
|
||
self.config_manager.get_config_value('SCREEN_RECORDING', 'ffmpeg_path', fallback=None) or
|
||
self.config_manager.get_config_value('RECORDING', 'ffmpeg_path', fallback=None)
|
||
)
|
||
if not ffmpeg_path or not os.path.isfile(str(ffmpeg_path)):
|
||
base_dir = os.path.dirname(sys.executable) if getattr(sys, 'frozen', False) else os.path.dirname(os.path.abspath(__file__))
|
||
alt_path = os.path.join(base_dir, 'ffmpeg', 'bin', 'ffmpeg.exe')
|
||
if os.path.isfile(alt_path):
|
||
ffmpeg_path = alt_path
|
||
else:
|
||
result['message'] = '未配置有效的ffmpeg_path,请在配置中设置 SCREEN_RECORDING.ffmpeg_path 或 RECORDING.ffmpeg_path'
|
||
return result
|
||
|
||
cmd = [
|
||
str(ffmpeg_path),
|
||
'-y',
|
||
'-f', 'gdigrab',
|
||
'-framerate', str(target_fps),
|
||
'-draw_mouse', str(int((self.config_manager.get_config_value('SCREEN_RECORDING', 'ffmpeg_draw_mouse', fallback='0') or '0'))),
|
||
'-offset_x', str(off_x),
|
||
'-offset_y', str(off_y),
|
||
'-video_size', f'{w_clamped}x{h_clamped}',
|
||
'-i', 'desktop',
|
||
]
|
||
|
||
codec = (self.config_manager.get_config_value('SCREEN_RECORDING', 'ffmpeg_codec', fallback=None) or
|
||
self.config_manager.get_config_value('RECORDING', 'ffmpeg_codec', fallback=None) or
|
||
'libx264')
|
||
preset = (self.config_manager.get_config_value('SCREEN_RECORDING', 'ffmpeg_preset', fallback=None) or
|
||
self.config_manager.get_config_value('RECORDING', 'ffmpeg_preset', fallback=None) or
|
||
('p1' if codec == 'h264_nvenc' else 'ultrafast'))
|
||
threads = int((self.config_manager.get_config_value('SCREEN_RECORDING', 'ffmpeg_threads', fallback='2') or '2'))
|
||
bframes = int((self.config_manager.get_config_value('SCREEN_RECORDING', 'ffmpeg_bframes', fallback='0') or '0'))
|
||
gop = int((self.config_manager.get_config_value('SCREEN_RECORDING', 'ffmpeg_gop', fallback=str(max(1, int(target_fps*2)))) or str(max(1, int(target_fps*2)))))
|
||
|
||
cmd += ['-c:v', codec]
|
||
# 统一的低CPU选项
|
||
cmd += ['-preset', str(preset)]
|
||
cmd += ['-bf', str(bframes)]
|
||
cmd += ['-g', str(gop)]
|
||
cmd += ['-pix_fmt', 'yuv420p']
|
||
cmd += ['-threads', str(threads)]
|
||
cmd += ['-r', str(target_fps)]
|
||
cmd += [screen_video_path]
|
||
proc = subprocess.Popen(
|
||
cmd,
|
||
stdin=subprocess.PIPE,
|
||
stdout=subprocess.DEVNULL,
|
||
stderr=subprocess.DEVNULL,
|
||
creationflags=getattr(subprocess, 'CREATE_NEW_PROCESS_GROUP', 0)
|
||
)
|
||
self._ffmpeg_processes['screen'] = proc
|
||
self._ffmpeg_meta['screen'] = {'base_path': base_path, 'patient_id': patient_id, 'session_id': session_id, 'video_path': screen_video_path}
|
||
result['success'] = True
|
||
result['message'] = 'ffmpeg录制已启动'
|
||
result['database_updates'] = {
|
||
'session_id': session_id,
|
||
'status': 'recording',
|
||
'video_paths': {
|
||
'screen_video_path': os.path.relpath(screen_video_path, file_dir)
|
||
}
|
||
}
|
||
return result
|
||
except Exception as e:
|
||
result['message'] = f'ffmpeg启动失败: {e}'
|
||
return result
|
||
|
||
def stop_recording_ffmpeg(self, session_id: str = None) -> Dict[str, Any]:
|
||
result = {'success': False, 'message': ''}
|
||
try:
|
||
proc = self._ffmpeg_processes.get('screen')
|
||
meta = self._ffmpeg_meta.get('screen')
|
||
if proc:
|
||
try:
|
||
if proc.stdin and proc.poll() is None:
|
||
try:
|
||
proc.communicate(input=b'q', timeout=2.0)
|
||
except Exception:
|
||
pass
|
||
try:
|
||
proc.send_signal(getattr(signal, 'CTRL_BREAK_EVENT', signal.SIGTERM))
|
||
except Exception:
|
||
pass
|
||
try:
|
||
proc.terminate()
|
||
except Exception:
|
||
pass
|
||
try:
|
||
proc.wait(timeout=3.0)
|
||
except Exception:
|
||
try:
|
||
proc.kill()
|
||
except Exception:
|
||
pass
|
||
finally:
|
||
self._ffmpeg_processes.pop('screen', None)
|
||
result['success'] = True
|
||
result['message'] = 'ffmpeg录制已停止'
|
||
if meta:
|
||
result['database_updates'] = {
|
||
'session_id': meta.get('session_id'),
|
||
'status': 'recorded'
|
||
}
|
||
return result
|
||
except Exception as e:
|
||
result['message'] = f'ffmpeg停止失败: {e}'
|
||
return result
|
||
|
||
def save_detection_images(self, session_id: str, patient_id: str, detection_data: Dict[str, Any]) -> Dict[str, Any]:
|
||
"""
|
||
保存前端传入的检测图片到指定目录
|
||
|
||
Args:
|
||
session_id: 检测会话ID
|
||
patient_id: 患者ID
|
||
detection_data: 前端传入的检测数据,包含base64格式的图片数据
|
||
|
||
Returns:
|
||
Dict: 包含所有采集数据的字典,符合detection_data表结构
|
||
"""
|
||
# 生成采集时间戳
|
||
timestamp = datetime.now().strftime('%H%M%S%f')[:-3] # 精确到毫秒
|
||
file_path = self.config_manager.get_config_value('FILEPATH', 'path')
|
||
data_dir = Path(os.path.join(file_path,patient_id, session_id, f"image_{timestamp}"))
|
||
# 创建数据存储目录
|
||
data_dir.mkdir(parents=True, exist_ok=True)
|
||
|
||
# 设置目录权限为777(完全权限)
|
||
try:
|
||
import stat
|
||
os.chmod(str(data_dir), stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) # 777权限
|
||
self.logger.debug(f"已设置目录权限为777: {data_dir}")
|
||
except Exception as perm_error:
|
||
self.logger.warning(f"设置目录权限失败: {perm_error},但目录创建成功")
|
||
|
||
# 初始化数据字典
|
||
data = {
|
||
'session_id': session_id,
|
||
'head_pose': detection_data.get('head_pose'),
|
||
'screen_location': detection_data.get('screen_location'),
|
||
'body_pose': None,
|
||
'body_image': None,
|
||
'foot_data': detection_data.get('foot_data'),
|
||
'foot_data_image': detection_data.get('foot_data_image'),
|
||
'foot1_image': None,
|
||
'foot2_image': None,
|
||
'screen_image': None,
|
||
'timestamp': timestamp
|
||
}
|
||
|
||
try:
|
||
# 保存图片数据
|
||
image_fields = [
|
||
('body_image', 'body'),
|
||
('foot1_image', 'foot1'),
|
||
('foot2_image', 'foot2')
|
||
]
|
||
|
||
for field, prefix in image_fields:
|
||
base64_data = detection_data.get(field)
|
||
if base64_data:
|
||
try:
|
||
# 移除base64头部信息
|
||
if ';base64,' in base64_data:
|
||
base64_data = base64_data.split(';base64,')[1]
|
||
|
||
# 解码base64数据
|
||
image_data = base64.b64decode(base64_data)
|
||
|
||
# 生成图片文件名
|
||
filename = f'{prefix}_{timestamp}.jpg'
|
||
file_path = data_dir / filename
|
||
|
||
# 保存图片
|
||
with open(file_path, 'wb') as f:
|
||
f.write(image_data)
|
||
|
||
# 更新数据字典中的图片路径
|
||
|
||
data[field] = str(os.path.join(patient_id, session_id, f"image_{timestamp}", filename))
|
||
self.logger.debug(f'{field}保存成功: {filename}')
|
||
|
||
except Exception as e:
|
||
self.logger.error(f'保存{field}失败: {e}')
|
||
# 完整屏幕截图--根据screen_location 进行截图
|
||
screen_image = self._capture_screen_image(data_dir, data.get('screen_location'),'screen', timestamp=timestamp)
|
||
if screen_image:
|
||
data['screen_image'] = str(os.path.join( patient_id, session_id, f"image_{timestamp}", screen_image))
|
||
# 足部压力屏幕截图——根据foot_data_image 进行截图
|
||
foot_data_image = self._capture_screen_image(data_dir, data.get('foot_data_image'),'foot_data', timestamp=timestamp)
|
||
if foot_data_image:
|
||
data['foot_data_image'] = str(os.path.join( patient_id, session_id, f"image_{timestamp}", foot_data_image))
|
||
|
||
self.logger.debug(f'数据保存完成: {session_id}, 时间戳: {timestamp}')
|
||
|
||
except Exception as e:
|
||
self.logger.error(f'数据保存失败: {e}')
|
||
|
||
return data
|
||
|
||
|
||
def _capture_screen_image(self, data_dir, screen_location,type, timestamp) -> Optional[str]:
|
||
"""
|
||
采集屏幕截图,根据screen_region 进行截图
|
||
|
||
Args:
|
||
data_dir: 数据存储目录路径
|
||
|
||
Returns:
|
||
str: 截图文件的相对路径,失败返回None
|
||
"""
|
||
try:
|
||
# 截取屏幕
|
||
if screen_location:
|
||
# 使用指定区域截图
|
||
x, y, width, height = screen_location
|
||
screenshot = pyautogui.screenshot(region=(x, y, width, height))
|
||
else:
|
||
# 全屏截图
|
||
screenshot = pyautogui.screenshot()
|
||
|
||
# 保存截图
|
||
from pathlib import Path
|
||
screen_filename = f'{type}_{timestamp}.jpg'
|
||
image_path = Path(data_dir) / screen_filename
|
||
screenshot.save(str(image_path), quality=95, optimize=True)
|
||
|
||
return screen_filename
|
||
|
||
except Exception as e:
|
||
self.logger.error(f'屏幕截图失败: {e}')
|
||
return None |