BodyBalanceEvaluation/backend/devices/screen_recorder.py

318 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
屏幕录制管理器
仅保留FFmpeg录制功能
"""
import os
import sys
import logging
import subprocess
import signal
import base64
from pathlib import Path
from datetime import datetime
from typing import Dict, Any, List, Optional
try:
import pyautogui
except ImportError:
pyautogui = None
try:
from .utils.config_manager import ConfigManager
except ImportError:
from utils.config_manager import ConfigManager
class RecordingManager:
    """Screen recording manager; only the FFmpeg recording path is kept.

    The manager drives a single external FFmpeg process (``gdigrab`` desktop
    capture) and stores detection snapshots (base64 images sent by the
    front end plus screenshots) under the configured ``FILEPATH.path`` root.
    """

    # Key under which the single screen-recording process and its metadata
    # are tracked in ``_ffmpeg_processes`` / ``_ffmpeg_meta``.
    _SCREEN_KEY = 'screen'
    # Screen size assumed when it cannot be queried.
    _FALLBACK_SCREEN_SIZE = (1920, 1080)
    # Seconds to wait after asking FFmpeg to quit through stdin ('q').
    _QUIT_TIMEOUT = 2.0
    # Seconds to wait after each later escalation step (signal, kill).
    _EXIT_TIMEOUT = 3.0

    def __init__(self, config_manager: Optional["ConfigManager"] = None):
        """Initialise the recording manager.

        Args:
            config_manager: Configuration manager instance. A new
                ``ConfigManager`` is created when omitted.
        """
        self.logger = logging.getLogger(__name__)
        self.config_manager = config_manager or ConfigManager()
        # FFmpeg process bookkeeping.
        self._ffmpeg_processes = {}
        self._ffmpeg_meta = {}
        # Default parameters.
        self.screen_fps = 25
        self.screen_size = self._get_screen_size()

    def _get_screen_size(self):
        """Return the screen size from pyautogui, or 1920x1080 as a fallback."""
        try:
            import pyautogui
            return pyautogui.size()
        except Exception:
            # Importing pyautogui or querying the display can fail for reasons
            # other than a missing package; none of them should prevent the
            # manager from being constructed.
            return self._FALLBACK_SCREEN_SIZE

    def _get_primary_screen_bounds(self) -> Dict[str, int]:
        """Return the primary screen rectangle as x/y/width/height.

        Uses the Win32 ``GetSystemMetrics`` call when available and falls
        back to ``self.screen_size`` otherwise (e.g. on non-Windows hosts).
        """
        try:
            import ctypes
            user32 = ctypes.windll.user32
            w = user32.GetSystemMetrics(0)  # SM_CXSCREEN
            h = user32.GetSystemMetrics(1)  # SM_CYSCREEN
            return {'x': 0, 'y': 0, 'width': int(w), 'height': int(h)}
        except Exception:
            sw, sh = self.screen_size
            return {'x': 0, 'y': 0, 'width': int(sw), 'height': int(sh)}

    @staticmethod
    def _clamp_region(screen_location, bounds):
        """Clamp ``[x, y, w, h]`` to ``bounds``.

        Returns:
            ``(offset_x, offset_y, width, height)`` where the offsets are
            relative to the top-left corner of ``bounds``. Width and height
            are rounded down to an even number when larger than 1, because
            encoding with ``-pix_fmt yuv420p`` (libx264) rejects odd sizes.
        """
        x, y, w, h = screen_location
        left, top = bounds['x'], bounds['y']
        right = left + bounds['width']
        bottom = top + bounds['height']
        x_clamped = max(left, min(int(x), right - 1))
        y_clamped = max(top, min(int(y), bottom - 1))
        w_clamped = max(1, min(int(w), int(right - x_clamped)))
        h_clamped = max(1, min(int(h), int(bottom - y_clamped)))
        if w_clamped > 1:
            w_clamped -= w_clamped % 2
        if h_clamped > 1:
            h_clamped -= h_clamped % 2
        return x_clamped - left, y_clamped - top, w_clamped, h_clamped

    def _config_option(self, key, default=None, sections=('SCREEN_RECORDING',)):
        """Return the first truthy value of ``key`` in ``sections``, else ``default``."""
        for section in sections:
            value = self.config_manager.get_config_value(section, key, fallback=None)
            if value:
                return value
        return default

    def _resolve_ffmpeg_path(self) -> Optional[str]:
        """Return a usable ffmpeg executable path, or None when none exists.

        The configured ``ffmpeg_path`` (SCREEN_RECORDING, then RECORDING) is
        preferred; otherwise ``ffmpeg/bin/ffmpeg.exe`` next to the executable
        (frozen build) or next to this file is tried.
        """
        configured = self._config_option(
            'ffmpeg_path', None, sections=('SCREEN_RECORDING', 'RECORDING'))
        if configured and os.path.isfile(str(configured)):
            return str(configured)
        if getattr(sys, 'frozen', False):
            base_dir = os.path.dirname(sys.executable)
        else:
            base_dir = os.path.dirname(os.path.abspath(__file__))
        bundled = os.path.join(base_dir, 'ffmpeg', 'bin', 'ffmpeg.exe')
        return bundled if os.path.isfile(bundled) else None

    def _build_ffmpeg_command(self, ffmpeg_path, region, target_fps, output_path) -> List[str]:
        """Build the FFmpeg argument list for a gdigrab desktop capture."""
        off_x, off_y, width, height = region
        both = ('SCREEN_RECORDING', 'RECORDING')
        draw_mouse = int(self._config_option('ffmpeg_draw_mouse', '0'))
        codec = self._config_option('ffmpeg_codec', 'libx264', sections=both)
        preset = self._config_option(
            'ffmpeg_preset',
            'p1' if codec == 'h264_nvenc' else 'ultrafast',
            sections=both)
        threads = int(self._config_option('ffmpeg_threads', '2'))
        bframes = int(self._config_option('ffmpeg_bframes', '0'))
        # Default GOP length is two seconds worth of frames.
        gop = int(self._config_option('ffmpeg_gop', str(max(1, int(target_fps * 2)))))
        return [
            str(ffmpeg_path),
            '-y',
            '-f', 'gdigrab',
            '-framerate', str(target_fps),
            '-draw_mouse', str(draw_mouse),
            '-offset_x', str(off_x),
            '-offset_y', str(off_y),
            '-video_size', f'{width}x{height}',
            '-i', 'desktop',
            '-c:v', codec,
            # Shared low-CPU options.
            '-preset', str(preset),
            '-bf', str(bframes),
            '-g', str(gop),
            '-pix_fmt', 'yuv420p',
            '-threads', str(threads),
            '-r', str(target_fps),
            output_path,
        ]

    def start_recording_ffmpeg(self, session_id: str, patient_id: str, screen_location: List[int], fps: Optional[int] = None) -> Dict[str, Any]:
        """Start recording a screen region with FFmpeg.

        Args:
            session_id: Detection session id (used in the output path).
            patient_id: Patient id (used in the output path).
            screen_location: ``[x, y, width, height]`` of the region; it is
                clamped to the primary screen.
            fps: Frame rate; ``self.screen_fps`` is used when falsy.

        Returns:
            Dict with ``success`` and ``message``; on success also
            ``database_updates`` holding the session id, status and the
            video path relative to the configured storage root.
            Errors are reported through the dict, never raised.
        """
        result = {'success': False, 'message': ''}
        try:
            region = self._clamp_region(screen_location, self._get_primary_screen_bounds())
            target_fps = fps or self.screen_fps

            # Validate the executable before touching the file system so a
            # misconfiguration does not leave empty output directories.
            ffmpeg_path = self._resolve_ffmpeg_path()
            if not ffmpeg_path:
                result['message'] = '未配置有效的ffmpeg_path请在配置中设置 SCREEN_RECORDING.ffmpeg_path 或 RECORDING.ffmpeg_path'
                return result

            file_dir = self.config_manager.get_config_value('FILEPATH', 'path')
            timestamp = datetime.now().strftime('%H%M%S%f')[:-3]
            base_path = os.path.join(file_dir, patient_id, session_id, f'video_{timestamp}')
            screen_video_path = os.path.join(base_path, 'screen.mp4')
            cmd = self._build_ffmpeg_command(ffmpeg_path, region, target_fps, screen_video_path)

            # Only one recording is tracked; stop a still-running one instead
            # of orphaning its process when the slot is overwritten.
            previous = self._ffmpeg_processes.get(self._SCREEN_KEY)
            if previous is not None and previous.poll() is None:
                self.logger.warning('检测到未停止的ffmpeg录制进程,已先行停止')
                self.stop_recording_ffmpeg()

            os.makedirs(base_path, exist_ok=True)
            proc = subprocess.Popen(
                cmd,
                stdin=subprocess.PIPE,  # kept open so 'q' can be sent on stop
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                creationflags=getattr(subprocess, 'CREATE_NEW_PROCESS_GROUP', 0)
            )
            self._ffmpeg_processes[self._SCREEN_KEY] = proc
            self._ffmpeg_meta[self._SCREEN_KEY] = {
                'base_path': base_path,
                'patient_id': patient_id,
                'session_id': session_id,
                'video_path': screen_video_path,
            }
            result['success'] = True
            result['message'] = 'ffmpeg录制已启动'
            result['database_updates'] = {
                'session_id': session_id,
                'status': 'recording',
                'video_paths': {
                    'screen_video_path': os.path.relpath(screen_video_path, file_dir)
                }
            }
            return result
        except Exception as e:
            result['message'] = f'ffmpeg启动失败: {e}'
            self.logger.error(result['message'])
            return result

    def _shutdown_process(self, proc) -> None:
        """Stop ``proc`` gracefully, escalating only while it is still alive.

        Order: 'q' on stdin, then CTRL_BREAK (SIGTERM where unavailable),
        then a hard kill. A wait follows every step so FFmpeg gets the
        chance to finalize the output file before being forced down.
        """
        if proc.poll() is not None:
            return
        if proc.stdin:
            try:
                proc.communicate(input=b'q', timeout=self._QUIT_TIMEOUT)
            except Exception as e:
                self.logger.debug(f'等待ffmpeg响应退出指令失败: {e}')
        if proc.poll() is not None:
            return
        try:
            proc.send_signal(getattr(signal, 'CTRL_BREAK_EVENT', signal.SIGTERM))
        except Exception as e:
            self.logger.debug(f'向ffmpeg发送中断信号失败: {e}')
        try:
            proc.wait(timeout=self._EXIT_TIMEOUT)
            return
        except Exception:
            pass
        self.logger.warning('ffmpeg未在限定时间内退出,强制结束进程')
        try:
            proc.kill()
            proc.wait(timeout=self._EXIT_TIMEOUT)
        except Exception as e:
            self.logger.debug(f'强制结束ffmpeg进程失败: {e}')

    def stop_recording_ffmpeg(self, session_id: Optional[str] = None) -> Dict[str, Any]:
        """Stop the tracked FFmpeg recording, if any.

        Args:
            session_id: Accepted for interface compatibility; the session id
                reported back is the one stored when recording started.

        Returns:
            Dict with ``success`` and ``message``; ``database_updates`` is
            included when metadata of a started recording was present.
            Errors are reported through the dict, never raised.
        """
        result = {'success': False, 'message': ''}
        try:
            # Drop both entries up front so a finished recording is reported
            # exactly once and never leaks into a later stop call.
            proc = self._ffmpeg_processes.pop(self._SCREEN_KEY, None)
            meta = self._ffmpeg_meta.pop(self._SCREEN_KEY, None)
            if proc is not None:
                self._shutdown_process(proc)
            result['success'] = True
            result['message'] = 'ffmpeg录制已停止'
            if meta:
                result['database_updates'] = {
                    'session_id': meta.get('session_id'),
                    'status': 'recorded'
                }
            return result
        except Exception as e:
            result['message'] = f'ffmpeg停止失败: {e}'
            self.logger.error(result['message'])
            return result

    def save_detection_images(self, session_id: str, patient_id: str, detection_data: Dict[str, Any]) -> Dict[str, Any]:
        """Save detection images sent by the front end into the data directory.

        Args:
            session_id: Detection session id.
            patient_id: Patient id.
            detection_data: Detection payload from the front end; image
                fields hold base64 data (optionally with a data-URL header).

        Returns:
            Dict with the collected values. Image fields hold paths relative
            to the storage root, or None when nothing was saved.
            NOTE(review): presumably this mirrors the detection_data table
            layout — confirm against the schema.
        """
        # Acquisition timestamp, millisecond precision.
        timestamp = datetime.now().strftime('%H%M%S%f')[:-3]
        storage_root = self.config_manager.get_config_value('FILEPATH', 'path')
        relative_dir = os.path.join(patient_id, session_id, f"image_{timestamp}")
        data_dir = Path(os.path.join(storage_root, relative_dir))
        data_dir.mkdir(parents=True, exist_ok=True)

        # Best effort: make the directory accessible to every user (0o777).
        # NOTE(review): world-writable is kept from the original behavior;
        # confirm it is still required.
        try:
            import stat
            os.chmod(str(data_dir), stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
            self.logger.debug(f"已设置目录权限为777: {data_dir}")
        except Exception as perm_error:
            self.logger.warning(f"设置目录权限失败: {perm_error},但目录创建成功")

        data = {
            'session_id': session_id,
            'head_pose': detection_data.get('head_pose'),
            'screen_location': detection_data.get('screen_location'),
            'body_pose': None,
            'body_image': None,
            'foot_data': detection_data.get('foot_data'),
            'foot_data_image': detection_data.get('foot_data_image'),
            'foot1_image': None,
            'foot2_image': None,
            'screen_image': None,
            'timestamp': timestamp
        }
        try:
            image_fields = [
                ('body_image', 'body'),
                ('foot1_image', 'foot1'),
                ('foot2_image', 'foot2')
            ]
            for field, prefix in image_fields:
                base64_data = detection_data.get(field)
                if not base64_data:
                    continue
                try:
                    # Strip the data-URL header when present.
                    if ';base64,' in base64_data:
                        base64_data = base64_data.split(';base64,')[1]
                    image_data = base64.b64decode(base64_data)
                    filename = f'{prefix}_{timestamp}.jpg'
                    with open(data_dir / filename, 'wb') as f:
                        f.write(image_data)
                    data[field] = str(os.path.join(relative_dir, filename))
                    self.logger.debug(f'{field}保存成功: {filename}')
                except Exception as e:
                    # One bad image must not prevent saving the others.
                    self.logger.error(f'保存{field}失败: {e}')

            # Screenshot of the region given by screen_location.
            screen_image = self._capture_screen_image(
                data_dir, data.get('screen_location'), 'screen', timestamp=timestamp)
            if screen_image:
                data['screen_image'] = str(os.path.join(relative_dir, screen_image))

            # Screenshot of the foot-pressure area; the incoming
            # 'foot_data_image' value is used as the capture region and is
            # replaced by the saved file path only when the capture succeeds.
            foot_data_image = self._capture_screen_image(
                data_dir, data.get('foot_data_image'), 'foot_data', timestamp=timestamp)
            if foot_data_image:
                data['foot_data_image'] = str(os.path.join(relative_dir, foot_data_image))
            self.logger.debug(f'数据保存完成: {session_id}, 时间戳: {timestamp}')
        except Exception as e:
            self.logger.error(f'数据保存失败: {e}')
        return data

    def _capture_screen_image(self, data_dir, screen_location, type, timestamp) -> Optional[str]:
        """Capture a screenshot and save it as a JPEG inside ``data_dir``.

        Args:
            data_dir: Directory the image is written to.
            screen_location: ``[x, y, width, height]`` region; the whole
                screen is captured when falsy.
            type: File name prefix (name kept for interface compatibility
                although it shadows the builtin).
            timestamp: Timestamp string appended to the file name.

        Returns:
            The screenshot file name, or None on failure.
        """
        try:
            if pyautogui is None:
                self.logger.error('屏幕截图失败: pyautogui 未安装')
                return None
            if screen_location:
                x, y, width, height = screen_location
                # Coordinates from the front end may arrive as floats.
                screenshot = pyautogui.screenshot(
                    region=(int(x), int(y), int(width), int(height)))
            else:
                screenshot = pyautogui.screenshot()
            # JPEG cannot store alpha or palette images.
            if getattr(screenshot, 'mode', 'RGB') not in ('RGB', 'L', 'CMYK'):
                screenshot = screenshot.convert('RGB')
            screen_filename = f'{type}_{timestamp}.jpg'
            image_path = Path(data_dir) / screen_filename
            screenshot.save(str(image_path), quality=95, optimize=True)
            return screen_filename
        except Exception as e:
            self.logger.error(f'屏幕截图失败: {e}')
            return None