录像功能优化提交

This commit is contained in:
zhaozilong12 2025-08-20 16:04:38 +08:00
parent f07af4cd61
commit 03127c843a
4 changed files with 426 additions and 18 deletions

3
.gitignore vendored
View File

@ -47,6 +47,7 @@ build/
frontend/src/renderer/dist/
frontend/src/renderer/dist-electron/
backend/data/patients/
frontend/src/renderer/src/services/
# 临时文件
*.tmp
@ -21415,3 +21416,5 @@ frontend/src/renderer/dist-electron/win-unpacked/resources/backend/BodyBalanceBa
frontend/src/renderer/dist-electron/win-unpacked/resources/backend/BodyBalanceBackend/dll/smitsense/SMiTSenseUsbWrapper.dll
frontend/src/renderer/dist-electron/win-unpacked/resources/backend/BodyBalanceBackend/dll/smitsense/Wrapper.dll
backend/data/patients/202508060001/20250820102556/feet.mp4
frontend/src/renderer/src/services/api.js
frontend/src/renderer/src/services/api.js

View File

@ -163,7 +163,7 @@ def init_app():
db_manager.init_database()
# 初始化设备管理器(不自动初始化设备)
device_manager = DeviceManager(db_manager)
device_manager = DeviceManager(db_manager, recording_manager)
# 初始化相机管理器
global camera_manager, recording_manager
@ -902,7 +902,6 @@ def collect_detection_data(session_id):
# 获取请求数据
data = flask_request.get_json() or {}
patient_id = data.get('patient_id')
screen_image_base64 = data.get('imageData')
# 如果没有提供patient_id从会话信息中获取
if not patient_id:
@ -920,11 +919,10 @@ def collect_detection_data(session_id):
'error': '无法获取患者ID'
}), 400
# 调用设备管理器采集数据
collected_data = device_manager.collect_data(
# 调用录制管理器采集数据
collected_data = recording_manager.collect_detection_data(
session_id=session_id,
patient_id=patient_id,
screen_image_base64=screen_image_base64
patient_id=patient_id
)
# 将采集的数据保存到数据库

View File

@ -13,24 +13,37 @@ import time
from datetime import datetime
import os
import logging
import json
import base64
from pathlib import Path
from typing import Optional, Dict, Any
try:
from .camera_manager import CameraManager
from .femtobolt_manager import FemtoBoltManager
from .pressure_manager import PressureManager
except ImportError:
from camera_manager import CameraManager
from femtobolt_manager import FemtoBoltManager
from pressure_manager import PressureManager
class RecordingManager:
def __init__(self, camera_manager: Optional[CameraManager] = None, db_manager=None):
def __init__(self, camera_manager: Optional[CameraManager] = None, db_manager=None,
femtobolt_manager: Optional[FemtoBoltManager] = None,
pressure_manager: Optional[PressureManager] = None):
"""
初始化录制管理器
Args:
camera_manager: 相机管理器实例
db_manager: 数据库管理器实例
femtobolt_manager: FemtoBolt深度相机管理器实例
pressure_manager: 压力传感器管理器实例
"""
self.camera_manager = camera_manager
self.db_manager = db_manager
self.femtobolt_manager = femtobolt_manager
self.pressure_manager = pressure_manager
# 录制状态
self.sync_recording = False
@ -134,7 +147,7 @@ class RecordingManager:
self.logger.error(f'更新数据库视频路径失败: {db_error}')
# 视频编码参数
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
fourcc = cv2.VideoWriter_fourcc(*'avc1')
fps = 30
# 初始化足部视频写入器
@ -152,9 +165,10 @@ class RecordingManager:
self.logger.warning('相机设备未启用,跳过脚部视频写入器初始化')
# 初始化屏幕录制写入器
record_size = self.screen_region[2:4] if self.screen_region else self.screen_size
# record_size = self.screen_region[2:4] if self.screen_region else self.screen_size
# print('屏幕写入器的宽高..............',record_size)
self.screen_video_writer = cv2.VideoWriter(
screen_video_path, fourcc, self.screen_fps, record_size
screen_video_path, fourcc, fps, (self.screen_size[0],self.screen_size[1])
)
if self.screen_video_writer.isOpened():
@ -361,11 +375,13 @@ class RecordingManager:
# 检查是否到了下一帧的时间
if current_time - last_frame_time >= frame_interval:
try:
# 截取屏幕
if self.screen_region:
x, y, width, height = self.screen_region
screenshot = pyautogui.screenshot(region=(x, y, width, height))
# 截取屏幕self.screen_size
if self.screen_size:
# print('获取截图的时候屏幕写入器的宽高..............',self.screen_region)
width, height = self.screen_size
screenshot = pyautogui.screenshot(region=(0, 0, width, height))
else:
# print('screen_region方法没找到。。。。。。。。。。。。。。。。。')
screenshot = pyautogui.screenshot()
# 转换为numpy数组
@ -475,6 +491,319 @@ class RecordingManager:
'feet_writer_active': self.feet_video_writer is not None and self.feet_video_writer.isOpened() if self.feet_video_writer else False,
'screen_writer_active': self.screen_video_writer is not None and self.screen_video_writer.isOpened() if self.screen_video_writer else False
}
def capture_images(self, session_id: str, patient_id: str, data_dir) -> Dict[str, str]:
"""
采集屏幕截图和足部视频截图
Args:
session_id: 检测会话ID
patient_id: 患者ID
data_dir: 数据存储目录路径
Returns:
Dict: 包含截图文件路径的字典
"""
result = {
'screen_image': None,
'foot_image': None
}
try:
# 1. 采集屏幕截图
screen_image_path = self._capture_screen_image(data_dir)
if screen_image_path:
result['screen_image'] = str(screen_image_path)
self.logger.debug(f'屏幕截图保存成功: {screen_image_path}')
# 2. 采集足部视频截图
if self.camera_manager and self.camera_manager.is_connected:
foot_image_path = self._capture_foot_image(data_dir)
if foot_image_path:
result['foot_image'] = str(foot_image_path)
self.logger.debug(f'足部截图保存成功: {foot_image_path}')
else:
self.logger.warning('相机设备未连接,跳过足部截图')
except Exception as e:
self.logger.error(f'截图采集失败: {e}')
return result
def collect_detection_data(self, session_id: str, patient_id: str) -> Dict[str, Any]:
"""
采集所有设备数据并保存到指定目录结构
Args:
session_id: 检测会话ID
patient_id: 患者ID
Returns:
Dict: 包含所有采集数据的字典符合detection_data表结构
"""
# 生成采集时间戳
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f')[:-3] # 精确到毫秒
# 创建数据存储目录
data_dir = Path(f'data/patients/{patient_id}/{session_id}/{timestamp}')
data_dir.mkdir(parents=True, exist_ok=True)
# 设置目录权限为777完全权限
try:
import stat
os.chmod(str(data_dir), stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) # 777权限
self.logger.debug(f"已设置目录权限为777: {data_dir}")
except Exception as perm_error:
self.logger.warning(f"设置目录权限失败: {perm_error},但目录创建成功")
# 初始化数据字典
data = {
'session_id': session_id,
'head_pose': None,
'body_pose': None,
'body_image': None,
'foot_data': None,
'foot_image': None,
'foot_data_image': None,
'screen_image': None,
'timestamp': timestamp
}
try:
# 1. 采集头部姿态数据从IMU设备获取
# 注意这里需要从外部传入IMU设备或者在初始化时添加IMU管理器
# if self.imu_manager and self.imu_manager.is_connected:
# head_pose_data = self._collect_head_pose_data()
# if head_pose_data:
# data['head_pose'] = json.dumps(head_pose_data)
# self.logger.debug(f'头部姿态数据采集成功: {session_id}')
# 2. 采集身体姿态数据从FemtoBolt深度相机获取
if self.femtobolt_manager and self.femtobolt_manager.is_connected:
body_pose_data = self._collect_body_pose_data()
if body_pose_data:
data['body_pose'] = json.dumps(body_pose_data)
self.logger.debug(f'身体姿态数据采集成功: {session_id}')
# 3. 采集身体视频截图从FemtoBolt深度相机获取
if self.femtobolt_manager and self.femtobolt_manager.is_connected:
try:
body_image_path = self._capture_body_image(data_dir)
if body_image_path:
data['body_image'] = str(body_image_path)
self.logger.debug(f'身体截图保存成功: {body_image_path}')
except Exception as e:
self.logger.error(f'采集身体截图异常: {e}')
# 4. 采集足部压力数据(从压力传感器获取)
if self.pressure_manager and hasattr(self.pressure_manager, 'is_connected') and self.pressure_manager.is_connected:
foot_data = self._collect_foot_pressure_data()
if foot_data:
data['foot_data'] = json.dumps(foot_data)
self.logger.debug(f'足部压力数据采集成功: {session_id}')
# 5. 采集足部监测视频截图(从摄像头获取)
if self.camera_manager and self.camera_manager.is_connected:
foot_image_path = self._capture_foot_image(data_dir)
if foot_image_path:
data['foot_image'] = str(foot_image_path)
self.logger.debug(f'足部截图保存成功: {foot_image_path}')
# 6. 生成足底压力数据图(从压力传感器数据生成)
if self.pressure_manager and hasattr(self.pressure_manager, 'is_connected') and self.pressure_manager.is_connected:
foot_data_image_path = self._generate_foot_pressure_image(data_dir)
if foot_data_image_path:
data['foot_data_image'] = str(foot_data_image_path)
self.logger.debug(f'足底压力数据图生成成功: {foot_data_image_path}')
# 7. 采集屏幕截图
screen_image_path = self._capture_screen_image(data_dir)
if screen_image_path:
data['screen_image'] = str(screen_image_path)
self.logger.debug(f'屏幕截图保存成功: {screen_image_path}')
self.logger.debug(f'数据采集完成: {session_id}, 时间戳: {timestamp}')
except Exception as e:
self.logger.error(f'数据采集失败: {e}')
return data
def _collect_body_pose_data(self) -> Optional[Dict[str, Any]]:
"""
从FemtoBolt深度相机采集身体姿态数据
Returns:
Dict: 身体姿态数据字典
"""
try:
if self.femtobolt_manager and hasattr(self.femtobolt_manager, 'get_pose_data'):
pose_data = self.femtobolt_manager.get_pose_data()
return pose_data
else:
self.logger.warning('FemtoBolt管理器未连接或不支持姿态数据采集')
return None
except Exception as e:
self.logger.error(f'采集身体姿态数据失败: {e}')
return None
def _capture_body_image(self, data_dir) -> Optional[str]:
"""
从FemtoBolt深度相机采集身体截图
Args:
data_dir: 数据存储目录
Returns:
str: 身体截图文件的相对路径
"""
try:
if self.femtobolt_manager and hasattr(self.femtobolt_manager, 'get_latest_frame'):
frame = self.femtobolt_manager.get_latest_frame()
if frame is not None:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f')[:-3]
filename = f'body_{timestamp}.jpg'
file_path = data_dir / filename
# 保存图像
cv2.imwrite(str(file_path), frame)
# 返回相对路径
return str(file_path.relative_to(Path.cwd()))
else:
self.logger.warning('FemtoBolt相机未获取到有效帧')
return None
else:
self.logger.warning('FemtoBolt管理器未连接或不支持图像采集')
return None
except Exception as e:
self.logger.error(f'采集身体截图失败: {e}')
return None
def _collect_foot_pressure_data(self) -> Optional[Dict[str, Any]]:
"""
从压力传感器采集足部压力数据
Returns:
Dict: 足部压力数据字典
"""
try:
if self.pressure_manager and hasattr(self.pressure_manager, 'get_pressure_data'):
pressure_data = self.pressure_manager.get_pressure_data()
return pressure_data
else:
self.logger.warning('压力传感器管理器未连接或不支持压力数据采集')
return None
except Exception as e:
self.logger.error(f'采集足部压力数据失败: {e}')
return None
def _generate_foot_pressure_image(self, data_dir) -> Optional[str]:
"""
生成足底压力数据图
Args:
data_dir: 数据存储目录
Returns:
str: 足底压力数据图文件的相对路径
"""
try:
if self.pressure_manager and hasattr(self.pressure_manager, 'generate_pressure_heatmap'):
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S_%f')[:-3]
filename = f'foot_pressure_{timestamp}.jpg'
file_path = data_dir / filename
# 生成压力热力图
success = self.pressure_manager.generate_pressure_heatmap(str(file_path))
if success and file_path.exists():
# 返回相对路径
return str(file_path.relative_to(Path.cwd()))
else:
self.logger.warning('足底压力数据图生成失败')
return None
else:
self.logger.warning('压力传感器管理器未连接或不支持压力图生成')
return None
except Exception as e:
self.logger.error(f'生成足底压力数据图失败: {e}')
return None
def _capture_screen_image(self, data_dir) -> Optional[str]:
"""
采集屏幕截图
Args:
data_dir: 数据存储目录路径
Returns:
str: 截图文件的相对路径失败返回None
"""
try:
# 截取屏幕
if self.screen_size:
width, height = self.screen_size
screenshot = pyautogui.screenshot(region=(0, 0, width, height))
else:
screenshot = pyautogui.screenshot()
# 保存截图
from pathlib import Path
image_path = Path(data_dir) / 'screen_image.png'
screenshot.save(str(image_path))
# 返回相对路径
abs_image_path = image_path.resolve()
abs_cwd = Path.cwd().resolve()
relative_path = abs_image_path.relative_to(abs_cwd)
return str(relative_path)
except Exception as e:
self.logger.error(f'屏幕截图失败: {e}')
return None
def _capture_foot_image(self, data_dir) -> Optional[str]:
"""
采集足部视频截图
Args:
data_dir: 数据存储目录路径
Returns:
str: 截图文件的相对路径失败返回None
"""
try:
if not self.camera_manager or not self.camera_manager.is_connected:
self.logger.warning('相机设备未连接,无法采集足部截图')
return None
# 从相机管理器获取最新帧
frame, frame_timestamp = self.camera_manager._get_latest_frame_from_cache('camera')
if frame is None:
self.logger.warning('无法从相机获取帧数据')
return None
# 调整帧尺寸
resized_frame = cv2.resize(frame, self.MAX_FRAME_SIZE)
# 保存截图
from pathlib import Path
image_path = Path(data_dir) / 'foot_image.png'
cv2.imwrite(str(image_path), resized_frame)
# 返回相对路径
abs_image_path = image_path.resolve()
abs_cwd = Path.cwd().resolve()
relative_path = abs_image_path.relative_to(abs_cwd)
return str(relative_path)
except Exception as e:
self.logger.error(f'足部截图失败: {e}')
return None
# 保持向后兼容的ScreenRecorder类

View File

@ -267,6 +267,85 @@ class AppServer:
'version': '1.0.0'
})
# ==================== 静态文件服务 ====================
@self.app.route('/data/<path:filename>', methods=['GET'])
def serve_static_files(filename):
"""提供静态文件服务代理backend/data/目录"""
try:
# 获取data目录的绝对路径
if getattr(sys, 'frozen', False):
# 打包环境
data_dir = os.path.join(os.path.dirname(sys.executable), 'data')
else:
# 开发环境
data_dir = os.path.join(os.path.dirname(__file__), 'data')
# 安全检查:防止路径遍历攻击
safe_path = os.path.normpath(filename)
if '..' in safe_path or safe_path.startswith('/'):
return jsonify({'error': '非法路径'}), 400
file_path = os.path.join(data_dir, safe_path)
# 检查文件是否存在
if not os.path.exists(file_path):
return jsonify({'error': '文件不存在'}), 404
# 检查是否在允许的目录内
if not os.path.commonpath([data_dir, file_path]) == data_dir:
return jsonify({'error': '访问被拒绝'}), 403
# 返回文件
from flask import send_file
return send_file(file_path)
except Exception as e:
self.logger.error(f'静态文件服务错误: {e}')
return jsonify({'error': '服务器内部错误'}), 500
@self.app.route('/data/', methods=['GET'])
@self.app.route('/data', methods=['GET'])
def list_data_directory():
"""列出data目录下的文件和文件夹"""
try:
# 获取data目录的绝对路径
if getattr(sys, 'frozen', False):
# 打包环境
data_dir = os.path.join(os.path.dirname(sys.executable), 'data')
else:
# 开发环境
data_dir = os.path.join(os.path.dirname(__file__), 'data')
if not os.path.exists(data_dir):
return jsonify({'error': 'data目录不存在'}), 404
# 获取目录内容
items = []
for item in os.listdir(data_dir):
item_path = os.path.join(data_dir, item)
is_dir = os.path.isdir(item_path)
size = os.path.getsize(item_path) if not is_dir else None
modified = datetime.fromtimestamp(os.path.getmtime(item_path)).isoformat()
items.append({
'name': item,
'type': 'directory' if is_dir else 'file',
'size': size,
'modified': modified,
'url': f'/data/{item}' if not is_dir else None
})
return jsonify({
'success': True,
'path': '/data/',
'items': sorted(items, key=lambda x: (x['type'] == 'file', x['name']))
})
except Exception as e:
self.logger.error(f'目录列表错误: {e}')
return jsonify({'error': '服务器内部错误'}), 500
@self.app.route('/test-socketio')
def test_socketio():
"""测试SocketIO连接"""
@ -1102,11 +1181,10 @@ class AppServer:
'error': '无法获取患者ID'
}), 400
# 调用设备管理器采集数据
collected_data = self.device_coordinator.collect_data(
# 调用录制管理器采集数据
collected_data = self.recording_manager.collect_detection_data(
session_id=session_id,
patient_id=patient_id,
screen_image_base64=screen_image_base64
patient_id=patient_id
)
# 将采集的数据保存到数据库