BodyBalanceEvaluation/backend/devices/screen_recorder.py

1406 lines
63 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
综合录制管理器
支持屏幕录制和足部视频录制
"""
import cv2
import numpy as np
import pyautogui
import threading
import time
from datetime import datetime
import os
import logging
import json
import base64
from pathlib import Path
from typing import Optional, Dict, Any, List
import sys
import subprocess
import signal
import queue
# 移除psutil导入不再需要性能监控
import gc
try:
from .camera_manager import CameraManager
from .femtobolt_manager import FemtoBoltManager
from .pressure_manager import PressureManager
from .utils.config_manager import ConfigManager
except ImportError:
from camera_manager import CameraManager
from femtobolt_manager import FemtoBoltManager
from pressure_manager import PressureManager
from utils.config_manager import ConfigManager
class RecordingManager:
def __init__(self, camera_manager: Optional[CameraManager] = None, db_manager=None,
femtobolt_manager: Optional[FemtoBoltManager] = None,
pressure_manager: Optional[PressureManager] = None,
config_manager: Optional[ConfigManager] = None):
"""
初始化录制管理器
Args:
camera_manager: 相机管理器实例
db_manager: 数据库管理器实例
femtobolt_manager: FemtoBolt深度相机管理器实例
pressure_manager: 压力传感器管理器实例
config_manager: 配置管理器实例
"""
self.camera_manager = camera_manager
self.db_manager = db_manager
self.femtobolt_manager = femtobolt_manager
self.pressure_manager = pressure_manager
# 配置管理
self.config_manager = config_manager or ConfigManager()
# 录制状态
self.sync_recording = False
self.is_recording = False
self.recording_stop_event = threading.Event()
# 会话信息
self.current_session_id = None
self.current_patient_id = None
self.recording_start_time = None
# 视频写入器
self.feet_video_writer = None
self.screen_video_writer = None
self.femtobolt_video_writer = None
# 录制线程
self.feet_recording_thread = None
self.screen_recording_thread = None
self.femtobolt_recording_thread = None
self.camera1_recording_thread = None
self.camera2_recording_thread = None
# 共享屏幕采集资源
self._shared_screen_thread = None
self._screen_capture_stop_event = threading.Event()
self._screen_frame_lock = threading.Lock()
self._latest_screen_frame = None
self._latest_screen_time = 0.0
self._screen_frame_event = threading.Event()
self._ffmpeg_processes = {}
self._ffmpeg_meta = {}
self._threaded_queues = {}
self._threaded_threads = {}
self._threaded_stop_events = {}
# 独立的录制参数配置
self.screen_fps = 25 # 屏幕录制帧率
self.camera1_fps = 20 # 相机1录制帧率
self.camera2_fps = 20 # 相机2录制帧率
self.femtobolt_fps = 15 # FemtoBolt录制帧率
# 录制区域
self.screen_region = None
self.camera1_region = None
self.camera2_region = None
self.femtobolt_region = None
# 屏幕尺寸
self.screen_size = pyautogui.size()
# 输出目录
self.screen_output_dir = None
self.camera_output_dir = None
self.femtobolt_output_dir = None
# 视频参数
self.MAX_FRAME_SIZE = (1280, 720) # 最大帧尺寸
# 独立的帧率控制参数
self.screen_current_fps = self.screen_fps
self.camera1_current_fps = self.camera1_fps
self.camera2_current_fps = self.camera2_fps
self.femtobolt_current_fps = self.femtobolt_fps
# 区域大小阈值配置 - 根据实际录制场景优化
self.SMALL_REGION_THRESHOLD = 400 * 300 # 小区域阈值 (120,000像素)
self.MEDIUM_REGION_THRESHOLD = 800 * 600 # 中等区域阈值 (480,000像素)
self.LARGE_REGION_THRESHOLD = 1600 * 900 # 大区域阈值 (1,440,000像素)
# 基于区域大小的帧率配置 - 大幅降低帧率以减小文件大小
self.fps_config = {
'small': {'screen': 12, 'camera': 25, 'femtobolt': 20}, # 小区域:低帧率
'medium': {'screen': 10, 'camera': 22, 'femtobolt': 18}, # 中等区域:更低帧率
'large': {'screen': 8, 'camera': 18, 'femtobolt': 15}, # 大区域:很低帧率
'xlarge': {'screen': 6, 'camera': 15, 'femtobolt': 12} # 超大区域:极低帧率
}
# 移除CPU监控和性能优化参数使用固定帧率控制
# 录制同步控制
self.recording_sync_barrier = None # 同步屏障
self.recording_threads = {} # 录制线程字典
self.recording_start_sync = threading.Event() # 录制开始同步事件
self.global_recording_start_time = None # 全局录制开始时间
# 日志
self.logger = logging.getLogger(__name__)
self.logger.info("录制管理器初始化完成")
# 移除系统性能检查方法
def _calculate_region_size_category(self, region):
"""
根据录制区域大小计算区域类别
Args:
region: 录制区域 (x, y, width, height)
Returns:
str: 区域大小类别 ('small', 'medium', 'large', 'xlarge')
"""
if not region or len(region) != 4:
return 'medium' # 默认中等大小
_, _, width, height = region
area = width * height
if area <= self.SMALL_REGION_THRESHOLD:
return 'small'
elif area <= self.MEDIUM_REGION_THRESHOLD:
return 'medium'
elif area <= self.LARGE_REGION_THRESHOLD:
return 'large'
else:
return 'xlarge'
def _set_adaptive_fps_by_region(self, recording_type, region):
"""
根据录制区域大小设置自适应帧率
Args:
recording_type: 录制类型 ('screen', 'camera1', 'camera2', 'femtobolt')
region: 录制区域 (x, y, width, height)
"""
size_category = self._calculate_region_size_category(region)
lookup_key = 'camera' if recording_type in ['camera1', 'camera2'] else recording_type
target_fps = self.fps_config[size_category][lookup_key]
# 计算区域面积用于日志
_, _, width, height = region
area = width * height
if recording_type == 'screen':
self.screen_current_fps = target_fps
elif recording_type == 'camera1':
self.camera1_current_fps = target_fps
elif recording_type == 'camera2':
self.camera2_current_fps = target_fps
elif recording_type == 'femtobolt':
self.femtobolt_current_fps = target_fps
self.logger.info(f"{recording_type}录制区域解包成功: x={region[0]}, y={region[1]}, w={width}, h={height}")
self.logger.info(f"{recording_type}录制区域分析: 面积={area:,}像素, 类别={size_category}, 优化帧率={target_fps}fps")
# 如果是大区域,提示将启用性能优化
if size_category in ['large', 'xlarge']:
self.logger.info(f"{recording_type}大区域检测: 将启用降采样和压缩优化以提升性能")
# 移除动态性能调整方法,使用固定帧率控制
def _optimize_frame_for_large_region(self, frame, region, recording_type):
"""
为大区域录制优化帧数据
Args:
frame: 原始帧数据
region: 录制区域
recording_type: 录制类型
Returns:
优化后的帧数据
"""
if frame is None:
return None
# 屏幕录制优先保证清晰度:不做降采样,避免字体和图标模糊
if recording_type == 'screen':
return frame
size_category = self._calculate_region_size_category(region)
_, _, width, height = region
if size_category == 'xlarge':
scale = 0.5
elif size_category == 'large':
scale = 0.6
elif size_category == 'medium':
scale = 0.75
else:
scale = 0.85
if scale < 1.0:
new_width = max(1, int(width * scale))
new_height = max(1, int(height * scale))
downsampled = cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_AREA)
self.logger.debug(f"{recording_type}区域降采样({size_category}): {width}x{height} -> {new_width}x{new_height}")
frame = cv2.resize(downsampled, (width, height), interpolation=cv2.INTER_LINEAR)
return frame
def _start_shared_screen_capture(self, fps: int):
"""启动共享屏幕采集线程,单次采集整屏并供各录制线程区域裁剪使用"""
# 如果线程已在运行,直接返回
if self._shared_screen_thread and self._shared_screen_thread.is_alive():
return
self._screen_capture_stop_event.clear()
def _capture_loop():
interval = 1.0 / max(1, fps)
while not self._screen_capture_stop_event.is_set() and self.sync_recording:
start_t = time.time()
try:
screenshot = pyautogui.screenshot() # 全屏一次采集
full_frame = cv2.cvtColor(np.array(screenshot), cv2.COLOR_RGB2BGR)
with self._screen_frame_lock:
self._latest_screen_frame = full_frame
self._latest_screen_time = start_t
# 通知有新帧
self._screen_frame_event.set()
except Exception as e:
self.logger.error(f'共享屏幕采集错误: {e}')
time.sleep(0.01)
# 精确控制帧率
elapsed = time.time() - start_t
sleep_t = interval - elapsed
if sleep_t > 0:
time.sleep(sleep_t)
self._shared_screen_thread = threading.Thread(target=_capture_loop, daemon=True, name='SharedScreenCaptureThread')
self._shared_screen_thread.start()
def _stop_shared_screen_capture(self):
"""停止共享屏幕采集线程并清理资源"""
self._screen_capture_stop_event.set()
if self._shared_screen_thread and self._shared_screen_thread.is_alive():
self._shared_screen_thread.join(timeout=2.0)
self._shared_screen_thread = None
with self._screen_frame_lock:
self._latest_screen_frame = None
self._latest_screen_time = 0.0
self._screen_frame_event.clear()
def _get_latest_screen_frame(self):
"""线程安全获取最新整屏帧"""
with self._screen_frame_lock:
return None if self._latest_screen_frame is None else self._latest_screen_frame.copy()
def _get_primary_screen_bounds(self) -> Dict[str, int]:
try:
import ctypes
user32 = ctypes.windll.user32
w = user32.GetSystemMetrics(0)
h = user32.GetSystemMetrics(1)
return {'x': 0, 'y': 0, 'width': int(w), 'height': int(h)}
except Exception:
sw, sh = self.screen_size
return {'x': 0, 'y': 0, 'width': int(sw), 'height': int(sh)}
def _get_virtual_desktop_bounds(self) -> Dict[str, int]:
try:
import ctypes
user32 = ctypes.windll.user32
x = user32.GetSystemMetrics(76) # SM_XVIRTUALSCREEN
y = user32.GetSystemMetrics(77) # SM_YVIRTUALSCREEN
w = user32.GetSystemMetrics(78) # SM_CXVIRTUALSCREEN
h = user32.GetSystemMetrics(79) # SM_CYVIRTUALSCREEN
return {'x': int(x), 'y': int(y), 'width': int(w), 'height': int(h)}
except Exception:
# 回退:使用主屏尺寸,从(0,0)开始
sw, sh = self.screen_size
return {'x': 0, 'y': 0, 'width': int(sw), 'height': int(sh)}
def start_recording_ffmpeg(self, session_id: str, patient_id: str, screen_location: List[int], fps: int = None) -> Dict[str, Any]:
result = {'success': False, 'message': ''}
try:
x, y, w, h = screen_location
bounds = self._get_primary_screen_bounds()
x_clamped = max(bounds['x'], min(int(x), bounds['x'] + bounds['width'] - 1))
y_clamped = max(bounds['y'], min(int(y), bounds['y'] + bounds['height'] - 1))
max_w = (bounds['x'] + bounds['width']) - x_clamped
max_h = (bounds['y'] + bounds['height']) - y_clamped
w_clamped = max(1, min(int(w), int(max_w)))
h_clamped = max(1, min(int(h), int(max_h)))
off_x = x_clamped - bounds['x']
off_y = y_clamped - bounds['y']
file_dir = self.config_manager.get_config_value('FILEPATH', 'path')
timestamp = datetime.now().strftime('%H%M%S%f')[:-3]
base_path = os.path.join(file_dir, patient_id, session_id, f'video_{timestamp}')
os.makedirs(base_path, exist_ok=True)
screen_video_path = os.path.join(base_path, 'screen.mp4')
target_fps = fps or self.screen_fps
ffmpeg_path = None
if self.config_manager:
ffmpeg_path = (
self.config_manager.get_config_value('SCREEN_RECORDING', 'ffmpeg_path', fallback=None) or
self.config_manager.get_config_value('RECORDING', 'ffmpeg_path', fallback=None)
)
if not ffmpeg_path or not os.path.isfile(str(ffmpeg_path)):
base_dir = os.path.dirname(sys.executable) if getattr(sys, 'frozen', False) else os.path.dirname(os.path.abspath(__file__))
alt_path = os.path.join(base_dir, 'ffmpeg', 'bin', 'ffmpeg.exe')
if os.path.isfile(alt_path):
ffmpeg_path = alt_path
else:
result['message'] = '未配置有效的ffmpeg_path请在配置中设置 SCREEN_RECORDING.ffmpeg_path 或 RECORDING.ffmpeg_path'
return result
cmd = [
str(ffmpeg_path),
'-y',
'-f', 'gdigrab',
'-framerate', str(target_fps),
'-draw_mouse', str(int((self.config_manager.get_config_value('SCREEN_RECORDING', 'ffmpeg_draw_mouse', fallback='0') or '0'))),
'-offset_x', str(off_x),
'-offset_y', str(off_y),
'-video_size', f'{w_clamped}x{h_clamped}',
'-i', 'desktop',
]
codec = (self.config_manager.get_config_value('SCREEN_RECORDING', 'ffmpeg_codec', fallback=None) or
self.config_manager.get_config_value('RECORDING', 'ffmpeg_codec', fallback=None) or
'libx264')
preset = (self.config_manager.get_config_value('SCREEN_RECORDING', 'ffmpeg_preset', fallback=None) or
self.config_manager.get_config_value('RECORDING', 'ffmpeg_preset', fallback=None) or
('p1' if codec == 'h264_nvenc' else 'ultrafast'))
threads = int((self.config_manager.get_config_value('SCREEN_RECORDING', 'ffmpeg_threads', fallback='2') or '2'))
bframes = int((self.config_manager.get_config_value('SCREEN_RECORDING', 'ffmpeg_bframes', fallback='0') or '0'))
gop = int((self.config_manager.get_config_value('SCREEN_RECORDING', 'ffmpeg_gop', fallback=str(max(1, int(target_fps*2)))) or str(max(1, int(target_fps*2)))))
cmd += ['-c:v', codec]
# 统一的低CPU选项
cmd += ['-preset', str(preset)]
cmd += ['-bf', str(bframes)]
cmd += ['-g', str(gop)]
cmd += ['-pix_fmt', 'yuv420p']
cmd += ['-threads', str(threads)]
cmd += ['-r', str(target_fps)]
cmd += [screen_video_path]
proc = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
creationflags=getattr(subprocess, 'CREATE_NEW_PROCESS_GROUP', 0)
)
self._ffmpeg_processes['screen'] = proc
self._ffmpeg_meta['screen'] = {'base_path': base_path, 'patient_id': patient_id, 'session_id': session_id, 'video_path': screen_video_path}
result['success'] = True
result['message'] = 'ffmpeg录制已启动'
result['database_updates'] = {
'session_id': session_id,
'status': 'recording',
'video_paths': {
'screen_video_path': os.path.relpath(screen_video_path, file_dir)
}
}
return result
except Exception as e:
result['message'] = f'ffmpeg启动失败: {e}'
return result
def stop_recording_ffmpeg(self, session_id: str = None) -> Dict[str, Any]:
result = {'success': False, 'message': ''}
try:
proc = self._ffmpeg_processes.get('screen')
meta = self._ffmpeg_meta.get('screen')
if proc:
try:
if proc.stdin and proc.poll() is None:
try:
proc.communicate(input=b'q', timeout=2.0)
except Exception:
pass
try:
proc.send_signal(getattr(signal, 'CTRL_BREAK_EVENT', signal.SIGTERM))
except Exception:
pass
try:
proc.terminate()
except Exception:
pass
try:
proc.wait(timeout=3.0)
except Exception:
try:
proc.kill()
except Exception:
pass
finally:
self._ffmpeg_processes.pop('screen', None)
result['success'] = True
result['message'] = 'ffmpeg录制已停止'
if meta:
result['database_updates'] = {
'session_id': meta.get('session_id'),
'status': 'recorded'
}
return result
except Exception as e:
result['message'] = f'ffmpeg停止失败: {e}'
return result
def start_recording_threaded(self, session_id: str, patient_id: str, screen_location: List[int], fps: int = None) -> Dict[str, Any]:
result = {'success': False, 'message': ''}
try:
x, y, w, h = screen_location
file_dir = self.config_manager.get_config_value('FILEPATH', 'path')
timestamp = datetime.now().strftime('%H%M%S%f')[:-3]
base_path = os.path.join(file_dir, patient_id, session_id, f'video_{timestamp}')
os.makedirs(base_path, exist_ok=True)
screen_video_path = os.path.join(base_path, 'screen.mp4')
target_fps = fps or self.screen_fps
try:
fourcc = cv2.VideoWriter_fourcc(*'avc1')
except Exception:
try:
fourcc = cv2.VideoWriter_fourcc(*'H264')
except Exception:
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
vw = cv2.VideoWriter(screen_video_path, fourcc, target_fps, (w, h))
if not vw or not vw.isOpened():
result['message'] = 'VideoWriter初始化失败'
return result
q = queue.Queue(maxsize=target_fps * 2)
stop_event = threading.Event()
self._threaded_queues['screen'] = q
self._threaded_stop_events['screen'] = stop_event
def _capture():
self._start_shared_screen_capture(target_fps)
while not stop_event.is_set():
full = self._get_latest_screen_frame()
if full is None:
time.sleep(0.001)
continue
H, W = full.shape[:2]
x0 = max(0, min(x, W - 1))
y0 = max(0, min(y, H - 1))
x1 = max(0, min(x0 + w, W))
y1 = max(0, min(y0 + h, H))
if x1 > x0 and y1 > y0:
crop = full[y0:y1, x0:x1]
if crop.shape[1] != w or crop.shape[0] != h:
frame = cv2.resize(crop, (w, h), interpolation=cv2.INTER_AREA)
else:
frame = crop
try:
q.put_nowait(frame)
except Exception:
try:
_ = q.get_nowait()
except Exception:
pass
try:
q.put_nowait(frame)
except Exception:
pass
else:
time.sleep(0.002)
def _writer():
last = time.time()
interval = 1.0 / max(1, target_fps)
while not stop_event.is_set():
try:
frame = q.get(timeout=0.05)
except Exception:
frame = None
now = time.time()
if frame is not None:
vw.write(frame)
else:
if now - last < interval:
time.sleep(interval - (now - last))
last = now
try:
vw.release()
except Exception:
pass
t1 = threading.Thread(target=_capture, daemon=True, name='ThreadedScreenCapture')
t2 = threading.Thread(target=_writer, daemon=True, name='ThreadedScreenWriter')
self._threaded_threads['screen'] = (t1, t2)
t1.start(); t2.start()
result['success'] = True
result['message'] = '线程录制已启动'
result['database_updates'] = {
'session_id': session_id,
'status': 'recording',
'video_paths': {
'screen_video_path': os.path.relpath(screen_video_path, file_dir)
}
}
return result
except Exception as e:
result['message'] = f'线程录制启动失败: {e}'
return result
def stop_recording_threaded(self, session_id: str = None) -> Dict[str, Any]:
result = {'success': False, 'message': ''}
try:
stop_event = self._threaded_stop_events.get('screen')
threads = self._threaded_threads.get('screen')
if stop_event:
stop_event.set()
if threads:
for t in threads:
try:
t.join(timeout=2.0)
except Exception:
pass
self._threaded_threads.pop('screen', None)
self._stop_shared_screen_capture()
result['success'] = True
result['message'] = '线程录制已停止'
return result
except Exception as e:
result['message'] = f'线程录制停止失败: {e}'
return result
def start_recording(self, session_id: str, patient_id: str, screen_location: List[int], camera1_location: List[int], camera2_location: List[int], femtobolt_location: List[int], recording_types: List[str] = None) -> Dict[str, Any]:
"""
启动同步录制
Args:
session_id: 检测会话ID
patient_id: 患者ID
screen_location: 屏幕录制区域 [x, y, w, h]
camera1_location: 相机1录制区域 [x, y, w, h]
camera2_location: 相机2录制区域 [x, y, w, h]
femtobolt_location: FemtoBolt录制区域 [x, y, w, h]
recording_types: 录制类型列表 ['screen', 'camera', 'feet', 'femtobolt'],默认全部录制
Returns:
Dict: 录制启动状态和信息
"""
result = {
'success': False,
'session_id': session_id,
'patient_id': patient_id,
'recording_start_time': None,
'video_paths': {
'camera1_video': None,
'camera2_video': None,
'feet_video': None,
'screen_video': None,
'femtobolt_video': None
},
'message': ''
}
try:
# 检查是否已在录制
if self.sync_recording:
result['message'] = f'已在录制中当前会话ID: {self.current_session_id}'
return result
# 设置默认录制类型
recording_types = ['screen']
# 验证录制区域参数(仅对启用的录制类型进行验证)
if 'screen' in recording_types:
if not screen_location or not isinstance(screen_location, list) or len(screen_location) != 4:
result['success'] = False
result['message'] = '屏幕录制区域参数无效或缺失必须是包含4个元素的数组[x, y, w, h]'
return result
if 'camera1' in recording_types:
if not camera1_location or not isinstance(camera1_location, list) or len(camera1_location) != 4:
result['success'] = False
result['message'] = '相机1录制区域参数无效或缺失必须是包含4个元素的数组[x, y, w, h]'
return result
if 'camera2' in recording_types:
if not camera2_location or not isinstance(camera2_location, list) or len(camera2_location) != 4:
result['success'] = False
result['message'] = '相机2录制区域参数无效或缺失必须是包含4个元素的数组[x, y, w, h]'
return result
if 'femtobolt' in recording_types:
if not femtobolt_location or not isinstance(femtobolt_location, list) or len(femtobolt_location) != 4:
result['success'] = False
result['message'] = 'FemtoBolt录制区域参数无效或缺失必须是包含4个元素的数组[x, y, w, h]'
return result
# 设置录制参数
self.current_session_id = session_id
# self.logger.info(f'检测sessionID................: {self.current_session_id}')
self.current_patient_id = patient_id
self.screen_region = tuple(screen_location) # [x, y, w, h] -> (x, y, w, h)
self.camera1_region = tuple(camera1_location) # [x, y, w, h] -> (x, y, w, h)
self.camera2_region = tuple(camera2_location) # [x, y, w, h] -> (x, y, w, h)
self.femtobolt_region = tuple(femtobolt_location) # [x, y, w, h] -> (x, y, w, h)
strategy = None
if self.config_manager:
strategy = (
self.config_manager.get_config_value('SCREEN_RECORDING', 'strategy', fallback=None) or
self.config_manager.get_config_value('RECORDING', 'screen_strategy', fallback=None)
)
if strategy:
strategy = str(strategy).lower()
if strategy in ['ffmpeg', 'threaded']:
self.sync_recording = True
if strategy == 'ffmpeg':
return self.start_recording_ffmpeg(session_id, patient_id, screen_location, fps=self.screen_fps)
else:
return self.start_recording_threaded(session_id, patient_id, screen_location, fps=self.screen_fps)
# 根据录制区域大小设置自适应帧率
if 'screen' in recording_types:
self._set_adaptive_fps_by_region('screen', self.screen_region)
if 'camera1' in recording_types:
self._set_adaptive_fps_by_region('camera1', self.camera1_region)
if 'camera2' in recording_types:
self._set_adaptive_fps_by_region('camera2', self.camera2_region)
if 'femtobolt' in recording_types:
self._set_adaptive_fps_by_region('femtobolt', self.femtobolt_region)
# 设置录制同步
active_recording_count = len([t for t in recording_types if t in ['screen', 'camera1', 'camera2', 'femtobolt']])
self.recording_sync_barrier = threading.Barrier(active_recording_count)
self.recording_start_sync.clear()
self.global_recording_start_time = None
self.recording_start_time = datetime.now()
# 创建主存储目录
timestamp = datetime.now().strftime('%H%M%S%f')[:-3] # 精确到毫秒
file_dir = self.config_manager.get_config_value('FILEPATH', 'path')
base_path = os.path.join(file_dir, patient_id, session_id,f'video_{timestamp}')
db_base_path = os.path.join(patient_id, session_id,f'video_{timestamp}')
try:
# 设置目录权限
self._set_directory_permissions(base_path)
os.makedirs(base_path, exist_ok=True)
except Exception as dir_error:
self.logger.error(f'创建录制目录失败: {base_path}, 错误: {dir_error}')
result['success'] = False
result['message'] = f'创建录制目录失败: {dir_error}'
return result
camera1_video_path = os.path.join(base_path, 'camera1.mp4')
camera2_video_path = os.path.join(base_path, 'camera2.mp4')
screen_video_path = os.path.join(base_path, 'screen.mp4')
femtobolt_video_path = os.path.join(base_path, 'femtobolt.mp4')
# 准备数据库更新信息,返回给调用方统一处理
result['database_updates'] = {
'session_id': session_id,
'status': 'recording',
'video_paths': {
'camera1_video_path': None,
'camera2_video_path': None,
'screen_video_path': os.path.join(db_base_path, 'screen.mp4'),
'femtobolt_video_path': None
}
}
self.logger.debug(f'数据库更新信息已准备 - 会话ID: {session_id}')
# 视频编码参数 - 使用浏览器兼容的H.264编解码器
# 优先使用H.264编码器以确保浏览器兼容性
try:
fourcc = cv2.VideoWriter_fourcc(*'avc1') # H.264编码器,浏览器兼容性最好
except:
try:
fourcc = cv2.VideoWriter_fourcc(*'H264') # 备选H.264编码器
except:
try:
fourcc = cv2.VideoWriter_fourcc(*'mp4v') # 备选编解码器
except:
fourcc = cv2.VideoWriter_fourcc(*'MJPG') # 最后备选
# 根据录制类型选择性地初始化视频写入器,使用各自的自适应帧率
self.screen_video_writer = None
self.femtobolt_video_writer = None
self.camera1_video_writer = None
self.camera2_video_writer = None
if 'camera1' in recording_types:
self.camera1_video_writer = cv2.VideoWriter(
camera1_video_path, fourcc, self.camera1_current_fps, (self.camera1_region[2], self.camera1_region[3])
)
self.logger.info(f'相机1视频写入器使用帧率: {self.camera1_current_fps}fps')
if 'camera2' in recording_types:
self.camera2_video_writer = cv2.VideoWriter(
camera2_video_path, fourcc, self.camera2_current_fps, (self.camera2_region[2], self.camera2_region[3])
)
self.logger.info(f'相机2视频写入器使用帧率: {self.camera2_current_fps}fps')
if 'screen' in recording_types:
self.screen_video_writer = cv2.VideoWriter(
screen_video_path, fourcc, self.screen_current_fps, (self.screen_region[2], self.screen_region[3])
)
self.logger.info(f'屏幕视频写入器使用帧率: {self.screen_current_fps}fps')
if 'femtobolt' in recording_types:
self.femtobolt_video_writer = cv2.VideoWriter(
femtobolt_video_path, fourcc, self.femtobolt_current_fps, (self.femtobolt_region[2], self.femtobolt_region[3])
)
self.logger.info(f'FemtoBolt视频写入器使用帧率: {self.femtobolt_current_fps}fps')
# 检查相机1视频写入器
if 'camera1' in recording_types:
if self.camera1_video_writer and self.camera1_video_writer.isOpened():
self.logger.info(f'相机1视频写入器初始化成功: {camera1_video_path}')
else:
self.logger.error(f'相机1视频写入器初始化失败: {camera1_video_path}')
else:
self.logger.info('相机1录制功能已禁用')
# 检查相机2视频写入器
if 'camera2' in recording_types:
if self.camera2_video_writer and self.camera2_video_writer.isOpened():
self.logger.info(f'相机2视频写入器初始化成功: {camera2_video_path}')
else:
self.logger.error(f'相机2视频写入器初始化失败: {camera2_video_path}')
else:
self.logger.info('足部录制功能已禁用')
# 检查屏幕视频写入器
if 'screen' in recording_types:
if self.screen_video_writer and self.screen_video_writer.isOpened():
self.logger.info(f'屏幕视频写入器初始化成功: {screen_video_path}')
else:
self.logger.error(f'屏幕视频写入器初始化失败: {screen_video_path}')
else:
self.logger.info('屏幕录制功能已禁用')
# 检查FemtoBolt视频写入器
if 'femtobolt' in recording_types:
if self.femtobolt_video_writer and self.femtobolt_video_writer.isOpened():
self.logger.info(f'FemtoBolt视频写入器初始化成功: {femtobolt_video_path}')
else:
self.logger.error(f'FemtoBolt视频写入器初始化失败: {femtobolt_video_path}')
else:
self.logger.info('FemtoBolt录制功能已禁用')
# 重置停止事件
self.recording_stop_event.clear()
self.sync_recording = True
# 启动共享屏幕采集(取所需类型的最大帧率)
max_needed_fps = 0
for t in recording_types:
if t == 'screen':
max_needed_fps = max(max_needed_fps, self.screen_current_fps)
elif t == 'camera1':
max_needed_fps = max(max_needed_fps, self.camera1_current_fps)
elif t == 'camera2':
max_needed_fps = max(max_needed_fps, self.camera2_current_fps)
elif t == 'femtobolt':
max_needed_fps = max(max_needed_fps, self.femtobolt_current_fps)
if max_needed_fps > 0:
self._start_shared_screen_capture(max_needed_fps)
# 根据录制类型启动对应的录制线程
if 'camera1' in recording_types and self.camera1_video_writer and self.camera1_video_writer.isOpened():
self.camera1_recording_thread = threading.Thread(
target=self._generic_recording_thread,
args=('camera1', self.camera1_region, camera1_video_path, self.camera1_video_writer),
daemon=True,
name='Camera1RecordingThread'
)
self.camera1_recording_thread.start()
# self.logger.info(f'相机1录制线程已启动 - 区域: {self.camera1_region}, 输出文件: {camera1_video_path}')
if 'camera2' in recording_types and self.camera2_video_writer and self.camera2_video_writer.isOpened():
self.camera2_recording_thread = threading.Thread(
target=self._generic_recording_thread,
args=('camera2', self.camera2_region, camera2_video_path, self.camera2_video_writer),
daemon=True,
name='Camera2RecordingThread'
)
self.camera2_recording_thread.start()
# self.logger.info(f'相机2录制线程已启动 - 区域: {self.camera2_region}, 输出文件: {camera2_video_path}')
if 'screen' in recording_types and self.screen_video_writer and self.screen_video_writer.isOpened():
self.screen_recording_thread = threading.Thread(
target=self._generic_recording_thread,
args=('screen', self.screen_region, screen_video_path, self.screen_video_writer),
daemon=True,
name='ScreenRecordingThread'
)
self.screen_recording_thread.start()
# self.logger.info(f'屏幕录制线程已启动 - 区域: {self.screen_region}, 输出文件: {screen_video_path}')
if 'femtobolt' in recording_types and self.femtobolt_video_writer and self.femtobolt_video_writer.isOpened():
self.femtobolt_recording_thread = threading.Thread(
target=self._generic_recording_thread,
args=('femtobolt', self.femtobolt_region, femtobolt_video_path, self.femtobolt_video_writer),
daemon=True,
name='FemtoBoltRecordingThread'
)
self.femtobolt_recording_thread.start()
# self.logger.info(f'FemtoBolt录制线程已启动 - 区域: {self.femtobolt_region}, 输出文件: {femtobolt_video_path}')
result['success'] = True
result['recording_start_time'] = self.recording_start_time.isoformat()
result['message'] = '同步录制已启动'
self.logger.info(f'同步录制已启动 - 会话ID: {session_id}, 患者ID: {patient_id}')
except Exception as e:
self.logger.error(f'启动同步录制失败: {e}')
result['message'] = f'启动录制失败: {str(e)}'
# 清理已创建的写入器
self._cleanup_video_writers()
return result
def stop_recording(self, session_id: str = None) -> Dict[str, Any]:
"""
停止录制
Args:
session_id: 会话ID用于验证是否为当前录制会话
Returns:
Dict: 停止录制的结果
"""
result = {
'success': False,
'session_id': self.current_session_id,
'message': ''
}
try:
# 验证会话ID
if session_id and session_id != self.current_session_id:
result['message'] = f'会话ID不匹配: 期望 {self.current_session_id}, 收到 {session_id}'
return result
if not self.sync_recording:
result['message'] = '当前没有进行录制'
return result
strategy = None
if self.config_manager:
strategy = (
self.config_manager.get_config_value('SCREEN_RECORDING', 'strategy', fallback=None) or
self.config_manager.get_config_value('RECORDING', 'screen_strategy', fallback=None)
)
if strategy:
strategy = str(strategy).lower()
if strategy in ['ffmpeg', 'threaded']:
self.sync_recording = False
self.recording_stop_event.set()
if strategy == 'ffmpeg':
res = self.stop_recording_ffmpeg(session_id)
else:
res = self.stop_recording_threaded(session_id)
self.current_session_id = None
self.current_patient_id = None
self.recording_start_time = None
return res
# 记录停止时间,确保所有录制线程同时结束
recording_stop_time = time.time()
self.logger.info(f'开始停止录制,停止时间: {recording_stop_time}')
# 设置停止标志
self.sync_recording = False
self.recording_stop_event.set()
# 收集活跃的录制线程
active_threads = []
if hasattr(self, 'camera1_recording_thread') and self.camera1_recording_thread and self.camera1_recording_thread.is_alive():
active_threads.append(('camera1', self.camera1_recording_thread))
if hasattr(self, 'camera2_recording_thread') and self.camera2_recording_thread and self.camera2_recording_thread.is_alive():
active_threads.append(('camera2', self.camera2_recording_thread))
if hasattr(self, 'screen_recording_thread') and self.screen_recording_thread and self.screen_recording_thread.is_alive():
active_threads.append(('screen', self.screen_recording_thread))
if hasattr(self, 'femtobolt_recording_thread') and self.femtobolt_recording_thread and self.femtobolt_recording_thread.is_alive():
active_threads.append(('femtobolt', self.femtobolt_recording_thread))
# 同时等待所有录制线程结束
self.logger.info(f'等待 {len(active_threads)} 个录制线程结束')
for thread_name, thread in active_threads:
thread.join(timeout=5.0)
if thread.is_alive():
self.logger.warning(f'{thread_name}录制线程未能在超时时间内结束')
else:
self.logger.info(f'{thread_name}录制线程已结束')
# 计算实际录制时长并记录详细信息
if self.global_recording_start_time:
actual_recording_duration = recording_stop_time - self.global_recording_start_time
self.logger.info(f'录制时长统计:')
self.logger.info(f' 全局开始时间: {self.global_recording_start_time}')
self.logger.info(f' 全局结束时间: {recording_stop_time}')
self.logger.info(f' 实际录制时长: {actual_recording_duration:.3f}')
# 记录各录制类型的预期帧数
for thread_name, thread in active_threads:
if thread_name == 'screen':
expected_frames = int(actual_recording_duration * self.screen_current_fps)
self.logger.info(f' 屏幕录制预期帧数: {expected_frames}帧 (帧率{self.screen_current_fps}fps)')
elif thread_name == 'camera1':
expected_frames = int(actual_recording_duration * self.camera1_current_fps)
self.logger.info(f' 相机1录制预期帧数: {expected_frames}帧 (帧率{self.camera1_current_fps}fps)')
elif thread_name == 'camera2':
expected_frames = int(actual_recording_duration * self.camera2_current_fps)
self.logger.info(f' 相机2录制预期帧数: {expected_frames}帧 (帧率{self.camera2_current_fps}fps)')
elif thread_name == 'femtobolt':
expected_frames = int(actual_recording_duration * self.femtobolt_current_fps)
self.logger.info(f' FemtoBolt录制预期帧数: {expected_frames}帧 (帧率{self.femtobolt_current_fps}fps)')
# 清理视频写入器
self._cleanup_video_writers()
# 停止共享屏幕采集
self._stop_shared_screen_capture()
# 准备数据库更新信息,返回给调用方统一处理
if self.current_session_id:
result['database_updates'] = {
'session_id': self.current_session_id,
'status': 'recorded'
}
self.logger.info(f'数据库更新信息已准备 - 会话ID: {self.current_session_id}')
result['success'] = True
result['message'] = '录制已停止'
self.logger.info(f'录制已停止 - 会话ID: {self.current_session_id}')
# 重置会话信息
self.current_session_id = None
self.current_patient_id = None
self.recording_start_time = None
except Exception as e:
self.logger.error(f'停止录制失败: {e}')
result['message'] = f'停止录制失败: {str(e)}'
return result
def _generic_recording_thread(self, recording_type, region, output_file_name, video_writer):
"""
通用录制线程支持屏幕、相机和FemtoBolt录制
Args:
recording_type: 录制类型 ('screen', 'camera1', 'camera2', 'femtobolt')
region: 录制区域 (x, y, width, height)
output_file_name: 输出文件名
video_writer: 视频写入器对象
"""
try:
self.logger.info(f'{recording_type}录制线程启动 - 区域: {region}, 输出文件: {output_file_name}')
frame_count = 0
# 根据录制类型获取对应的自适应帧率
if recording_type == 'screen':
target_fps = self.screen_current_fps
elif recording_type == 'camera1':
target_fps = self.camera1_current_fps
elif recording_type == 'camera2':
target_fps = self.camera2_current_fps
elif recording_type == 'femtobolt':
target_fps = self.femtobolt_current_fps
else:
target_fps = 25 # 默认帧率
frame_interval = 1.0 / target_fps
last_frame_time = time.time()
self.logger.info(f'{recording_type}录制线程使用帧率: {target_fps}fps')
# 等待所有录制线程准备就绪
if self.recording_sync_barrier:
self.recording_sync_barrier.wait()
# 第一个到达的线程设置全局开始时间
if self.global_recording_start_time is None:
self.global_recording_start_time = time.time()
self.recording_start_sync.set()
else:
self.recording_start_sync.wait()
# 所有线程从相同时间点开始录制
recording_start_time = self.global_recording_start_time
if not video_writer or not video_writer.isOpened():
self.logger.error(f'{recording_type}视频写入器初始化失败: {output_file_name}')
return
# 验证并解包region参数
if not region or len(region) != 4:
self.logger.error(f'{recording_type}录制区域参数无效: {region}')
return
x, y, w, h = region
self.logger.info(f'{recording_type}录制区域解包成功: x={x}, y={y}, w={w}, h={h}')
while self.sync_recording and not self.recording_stop_event.is_set():
try:
current_time = time.time()
# 严格的帧率控制 - 确保按照设定的fps精确录制
elapsed_time = current_time - last_frame_time
if elapsed_time < frame_interval:
sleep_time = frame_interval - elapsed_time
time.sleep(sleep_time)
current_time = time.time() # 重新获取时间
frame = None
# 获取帧数据 - 从共享整屏帧进行区域裁剪
full_frame = self._get_latest_screen_frame()
if full_frame is None:
# 若共享帧尚未就绪,退化为单次全屏采集
try:
screenshot = pyautogui.screenshot()
full_frame = cv2.cvtColor(np.array(screenshot), cv2.COLOR_RGB2BGR)
except Exception as e:
self.logger.error(f'{recording_type}备用屏幕采集失败: {e}')
full_frame = None
if full_frame is not None:
H, W = full_frame.shape[:2]
x0 = max(0, min(x, W-1))
y0 = max(0, min(y, H-1))
x1 = max(0, min(x0 + w, W))
y1 = max(0, min(y0 + h, H))
if x1 > x0 and y1 > y0:
crop = full_frame[y0:y1, x0:x1]
if (x1 - x0) != w or (y1 - y0) != h:
frame = cv2.resize(crop, (w, h), interpolation=cv2.INTER_AREA)
else:
frame = crop
else:
frame = None
else:
frame = None
# 对所有区域录制进行优化以减小文件大小
frame = self._optimize_frame_for_large_region(frame, region, recording_type)
# 写入视频帧
if frame is not None:
video_writer.write(frame)
frame_count += 1
else:
self.logger.warning(f'{recording_type}获取帧失败,跳过此帧')
last_frame_time = current_time
except Exception as e:
self.logger.error(f'{recording_type}录制线程错误: {e}')
time.sleep(0.1)
# 计算录制统计信息
if self.global_recording_start_time:
total_recording_time = time.time() - self.global_recording_start_time
expected_frames = int(total_recording_time * target_fps)
if abs(frame_count - expected_frames) > target_fps * 0.1: # 如果帧数差异超过0.1秒的帧数
self.logger.warning(f'{recording_type}帧数异常: 实际{frame_count}帧 vs 预期{expected_frames}帧,差异{frame_count - expected_frames}')
else:
self.logger.info(f'{recording_type}录制线程结束,总帧数: {frame_count}')
except Exception as e:
self.logger.error(f'{recording_type}录制线程异常: {e}')
finally:
# 清理资源
if video_writer:
try:
video_writer.release()
self.logger.info(f'{recording_type}视频写入器已释放')
except Exception as e:
self.logger.error(f'释放{recording_type}视频写入器失败: {e}')
def _cleanup_video_writers(self):
"""清理视频写入器"""
try:
if hasattr(self, 'feet_video_writer') and self.feet_video_writer:
self.feet_video_writer.release()
self.feet_video_writer = None
self.logger.debug("足部视频写入器已清理")
if hasattr(self, 'screen_video_writer') and self.screen_video_writer:
self.screen_video_writer.release()
self.screen_video_writer = None
self.logger.debug("屏幕视频写入器已清理")
if hasattr(self, 'femtobolt_video_writer') and self.femtobolt_video_writer:
self.femtobolt_video_writer.release()
self.femtobolt_video_writer = None
self.logger.debug("FemtoBolt视频写入器已清理")
except Exception as e:
self.logger.error(f"清理视频写入器失败: {e}")
def _set_directory_permissions(self, path):
"""设置目录权限"""
try:
import subprocess
import platform
if platform.system() == 'Windows':
try:
# 为Users用户组授予完全控制权限
subprocess.run([
'icacls', path, '/grant', 'Users:(OI)(CI)F'
], check=True, capture_output=True, text=True)
# 为Everyone用户组授予完全控制权限
subprocess.run([
'icacls', path, '/grant', 'Everyone:(OI)(CI)F'
], check=True, capture_output=True, text=True)
self.logger.info(f"已设置Windows目录权限Users和Everyone完全控制: {path}")
except subprocess.CalledProcessError as icacls_error:
self.logger.warning(f"Windows权限设置失败: {icacls_error}")
else:
self.logger.info(f"已设置目录权限为777: {path}")
except Exception as perm_error:
self.logger.warning(f"设置目录权限失败: {perm_error},但目录创建成功")
def set_screen_region(self, region):
"""设置屏幕录制区域"""
if self.sync_recording:
self.logger.warning("录制进行中,无法更改区域设置")
return False
self.screen_region = region
if self.screen_region:
x, y, width, height = self.screen_region
# 确保区域在屏幕范围内
x = max(0, min(x, self.screen_size[0] - 1))
y = max(0, min(y, self.screen_size[1] - 1))
width = min(width, self.screen_size[0] - x)
height = min(height, self.screen_size[1] - y)
self.screen_region = (x, y, width, height)
self.logger.info(f"录制区域已设置: {self.screen_region}")
else:
self.logger.info("录制模式已设置: 全屏录制")
return True
def set_recording_regions(self, screen_region=None, camera_region=None, femtobolt_region=None):
"""
设置三个录制区域
Args:
screen_region: 屏幕录制区域 (x, y, width, height)
camera_region: 相机录制区域 (x, y, width, height)
femtobolt_region: FemtoBolt录制区域 (x, y, width, height)
"""
if self.sync_recording:
self.logger.warning("录制进行中,无法更改区域设置")
return False
self.screen_region = screen_region
# 兼容旧参数如果传入单一camera_region则同时设置为camera1与camera2区域
if camera_region is not None:
self.camera1_region = camera_region
self.camera2_region = camera_region
self.femtobolt_region = femtobolt_region
self.logger.info(f'录制区域已设置:')
self.logger.info(f' 屏幕区域: {screen_region}')
self.logger.info(f' 相机1区域: {self.camera1_region}')
self.logger.info(f' 相机2区域: {self.camera2_region}')
self.logger.info(f' FemtoBolt区域: {femtobolt_region}')
return True
def get_status(self):
"""获取录制状态"""
return {
'recording': self.sync_recording,
'session_id': self.current_session_id,
'patient_id': self.current_patient_id,
'recording_start_time': self.recording_start_time.isoformat() if self.recording_start_time else None,
'screen_size': self.screen_size,
'screen_region': self.screen_region,
'screen_fps': self.screen_fps,
'camera1_region': self.camera1_region,
'camera2_region': self.camera2_region,
'camera1_fps': self.camera1_fps,
'camera2_fps': self.camera2_fps,
'feet_writer_active': self.feet_video_writer is not None and self.feet_video_writer.isOpened() if self.feet_video_writer else False,
'screen_writer_active': self.screen_video_writer is not None and self.screen_video_writer.isOpened() if self.screen_video_writer else False
}
def save_detection_images(self, session_id: str, patient_id: str, detection_data: Dict[str, Any]) -> Dict[str, Any]:
"""
保存前端传入的检测图片到指定目录
Args:
session_id: 检测会话ID
patient_id: 患者ID
detection_data: 前端传入的检测数据包含base64格式的图片数据
Returns:
Dict: 包含所有采集数据的字典符合detection_data表结构
"""
# 生成采集时间戳
timestamp = datetime.now().strftime('%H%M%S%f')[:-3] # 精确到毫秒
file_path = self.config_manager.get_config_value('FILEPATH', 'path')
data_dir = Path(os.path.join(file_path,patient_id, session_id, f"image_{timestamp}"))
# 创建数据存储目录
data_dir.mkdir(parents=True, exist_ok=True)
# 设置目录权限为777完全权限
try:
import stat
os.chmod(str(data_dir), stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) # 777权限
self.logger.debug(f"已设置目录权限为777: {data_dir}")
except Exception as perm_error:
self.logger.warning(f"设置目录权限失败: {perm_error},但目录创建成功")
# 初始化数据字典
data = {
'session_id': session_id,
'head_pose': detection_data.get('head_pose'),
'screen_location': detection_data.get('screen_location'),
'body_pose': None,
'body_image': None,
'foot_data': detection_data.get('foot_data'),
'foot_data_image': None,
'foot1_image': None,
'foot2_image': None,
'screen_image': None,
'timestamp': timestamp
}
try:
# 保存图片数据
image_fields = [
('body_image', 'body'),
('foot1_image', 'foot1'),
('foot2_image', 'foot2'),
('foot_data_image', 'foot_data')
]
for field, prefix in image_fields:
base64_data = detection_data.get(field)
if base64_data:
try:
# 移除base64头部信息
if ';base64,' in base64_data:
base64_data = base64_data.split(';base64,')[1]
# 解码base64数据
image_data = base64.b64decode(base64_data)
# 生成图片文件名
filename = f'{prefix}_{timestamp}.jpg'
file_path = data_dir / filename
# 保存图片
with open(file_path, 'wb') as f:
f.write(image_data)
# 更新数据字典中的图片路径
data[field] = str(os.path.join(patient_id, session_id, f"image_{timestamp}", filename))
self.logger.debug(f'{field}保存成功: {filename}')
except Exception as e:
self.logger.error(f'保存{field}失败: {e}')
# 屏幕截图
screen_image = self._capture_screen_image(data_dir, data.get('screen_location'), timestamp=timestamp)
if screen_image:
data['screen_image'] = str(os.path.join( patient_id, session_id, f"image_{timestamp}", screen_image))
self.logger.debug(f'数据保存完成: {session_id}, 时间戳: {timestamp}')
except Exception as e:
self.logger.error(f'数据保存失败: {e}')
return data
def _capture_screen_image(self, data_dir, screen_location, timestamp) -> Optional[str]:
"""
采集屏幕截图根据screen_region 进行截图
Args:
data_dir: 数据存储目录路径
Returns:
str: 截图文件的相对路径失败返回None
"""
try:
# 截取屏幕
if screen_location:
# 使用指定区域截图
x, y, width, height = screen_location
screenshot = pyautogui.screenshot(region=(x, y, width, height))
else:
# 全屏截图
screenshot = pyautogui.screenshot()
# 保存截图
from pathlib import Path
screen_filename = f'screen_{timestamp}.jpg'
image_path = Path(data_dir) / screen_filename
screenshot.save(str(image_path), quality=95, optimize=True)
return screen_filename
except Exception as e:
self.logger.error(f'屏幕截图失败: {e}')
return None
# 保持向后兼容的ScreenRecorder类
class ScreenRecorder:
def __init__(self, output_dir="recordings", fps=20, quality=80, region=None):
"""向后兼容的屏幕录制器"""
self.recording_manager = RecordingManager()
self.recording_manager.screen_fps = fps
self.recording_manager.set_screen_region(region)
self.output_dir = output_dir
# 创建输出目录
if not os.path.exists(output_dir):
os.makedirs(output_dir)
def start_recording(self, filename=None):
"""开始录制"""
if filename is None:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"screen_record_{timestamp}"
# 使用文件名作为会话ID
session_id = filename
patient_id = "default"
return self.recording_manager.start_recording(session_id, patient_id)
def stop_recording(self):
"""停止录制"""
return self.recording_manager.stop_recording()
def get_status(self):
"""获取状态"""
return self.recording_manager.get_status()