BodyBalanceEvaluation/backend/device_manager.py

479 lines
16 KiB
Python
Raw Normal View History

2025-07-28 11:59:56 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
设备管理模块
负责摄像头IMU传感器和压力传感器的连接和数据采集
"""
import cv2
import numpy as np
import time
import threading
import json
from datetime import datetime
from typing import Dict, List, Optional, Any
import logging
logger = logging.getLogger(__name__)
class DeviceManager:
"""设备管理器"""
def __init__(self):
self.camera = None
self.imu_device = None
self.pressure_device = None
self.device_status = {
'camera': False,
'imu': False,
'pressure': False
}
self.calibration_data = {}
self.data_lock = threading.Lock()
self.latest_data = {}
# 初始化设备
self._init_devices()
def _init_devices(self):
"""初始化所有设备"""
try:
self._init_camera()
self._init_imu()
self._init_pressure_sensor()
logger.info('设备初始化完成')
except Exception as e:
logger.error(f'设备初始化失败: {e}')
def _init_camera(self):
"""初始化摄像头"""
try:
# 尝试连接默认摄像头
self.camera = cv2.VideoCapture(0)
if self.camera.isOpened():
# 设置摄像头参数
self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
self.camera.set(cv2.CAP_PROP_FPS, 30)
self.device_status['camera'] = True
logger.info('摄像头初始化成功')
else:
logger.warning('摄像头连接失败')
self.camera = None
except Exception as e:
logger.error(f'摄像头初始化异常: {e}')
self.camera = None
def _init_imu(self):
"""初始化IMU传感器"""
try:
# 这里应该连接实际的IMU设备
# 目前使用模拟数据
self.imu_device = MockIMUDevice()
self.device_status['imu'] = True
logger.info('IMU传感器初始化成功模拟')
except Exception as e:
logger.error(f'IMU传感器初始化失败: {e}')
self.imu_device = None
def _init_pressure_sensor(self):
"""初始化压力传感器"""
try:
# 这里应该连接实际的压力传感器
# 目前使用模拟数据
self.pressure_device = MockPressureDevice()
self.device_status['pressure'] = True
logger.info('压力传感器初始化成功(模拟)')
except Exception as e:
logger.error(f'压力传感器初始化失败: {e}')
self.pressure_device = None
def get_device_status(self) -> Dict[str, bool]:
"""获取设备状态"""
return self.device_status.copy()
def get_connected_devices(self) -> List[str]:
"""获取已连接的设备列表"""
return [device for device, status in self.device_status.items() if status]
def refresh_devices(self):
"""刷新设备连接"""
logger.info('刷新设备连接...')
# 重新初始化所有设备
if self.camera:
self.camera.release()
self._init_devices()
def calibrate_devices(self) -> Dict[str, Any]:
"""校准设备"""
calibration_result = {}
try:
# 摄像头校准
if self.device_status['camera']:
camera_calibration = self._calibrate_camera()
calibration_result['camera'] = camera_calibration
# IMU校准
if self.device_status['imu']:
imu_calibration = self._calibrate_imu()
calibration_result['imu'] = imu_calibration
# 压力传感器校准
if self.device_status['pressure']:
pressure_calibration = self._calibrate_pressure()
calibration_result['pressure'] = pressure_calibration
self.calibration_data = calibration_result
logger.info('设备校准完成')
except Exception as e:
logger.error(f'设备校准失败: {e}')
raise
return calibration_result
def _calibrate_camera(self) -> Dict[str, Any]:
"""校准摄像头"""
if not self.camera or not self.camera.isOpened():
return {'status': 'failed', 'error': '摄像头未连接'}
try:
# 获取几帧图像进行校准
frames = []
for _ in range(10):
ret, frame = self.camera.read()
if ret:
frames.append(frame)
time.sleep(0.1)
if not frames:
return {'status': 'failed', 'error': '无法获取图像'}
# 计算平均亮度和对比度
avg_brightness = np.mean([np.mean(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)) for frame in frames])
calibration = {
'status': 'success',
'brightness': float(avg_brightness),
'resolution': (int(self.camera.get(cv2.CAP_PROP_FRAME_WIDTH)),
int(self.camera.get(cv2.CAP_PROP_FRAME_HEIGHT))),
'fps': float(self.camera.get(cv2.CAP_PROP_FPS)),
'timestamp': datetime.now().isoformat()
}
return calibration
except Exception as e:
return {'status': 'failed', 'error': str(e)}
def _calibrate_imu(self) -> Dict[str, Any]:
"""校准IMU传感器"""
if not self.imu_device:
return {'status': 'failed', 'error': 'IMU设备未连接'}
try:
# 收集静态数据进行零点校准
samples = []
for _ in range(100):
data = self.imu_device.read_data()
samples.append(data)
time.sleep(0.01)
# 计算零点偏移
accel_offset = {
'x': np.mean([s['accel']['x'] for s in samples]),
'y': np.mean([s['accel']['y'] for s in samples]),
'z': np.mean([s['accel']['z'] for s in samples]) - 9.8 # 重力补偿
}
gyro_offset = {
'x': np.mean([s['gyro']['x'] for s in samples]),
'y': np.mean([s['gyro']['y'] for s in samples]),
'z': np.mean([s['gyro']['z'] for s in samples])
}
calibration = {
'status': 'success',
'accel_offset': accel_offset,
'gyro_offset': gyro_offset,
'timestamp': datetime.now().isoformat()
}
return calibration
except Exception as e:
return {'status': 'failed', 'error': str(e)}
def _calibrate_pressure(self) -> Dict[str, Any]:
"""校准压力传感器"""
if not self.pressure_device:
return {'status': 'failed', 'error': '压力传感器未连接'}
try:
# 收集零压力数据
samples = []
for _ in range(50):
data = self.pressure_device.read_data()
samples.append(data)
time.sleep(0.02)
# 计算零点偏移
zero_offset = {
'left_foot': np.mean([s['left_foot'] for s in samples]),
'right_foot': np.mean([s['right_foot'] for s in samples])
}
calibration = {
'status': 'success',
'zero_offset': zero_offset,
'timestamp': datetime.now().isoformat()
}
return calibration
except Exception as e:
return {'status': 'failed', 'error': str(e)}
def collect_data(self) -> Dict[str, Any]:
"""采集所有设备数据"""
data = {}
timestamp = datetime.now().isoformat()
try:
# 采集摄像头数据
if self.device_status['camera']:
camera_data = self._collect_camera_data()
if camera_data:
data['camera'] = camera_data
# 采集IMU数据
if self.device_status['imu']:
imu_data = self._collect_imu_data()
if imu_data:
data['imu'] = imu_data
# 采集压力传感器数据
if self.device_status['pressure']:
pressure_data = self._collect_pressure_data()
if pressure_data:
data['pressure'] = pressure_data
# 添加时间戳
data['timestamp'] = timestamp
# 更新最新数据
with self.data_lock:
self.latest_data = data.copy()
except Exception as e:
logger.error(f'数据采集失败: {e}')
return data
def _collect_camera_data(self) -> Optional[Dict[str, Any]]:
"""采集摄像头数据"""
if not self.camera or not self.camera.isOpened():
return None
try:
ret, frame = self.camera.read()
if not ret:
return None
# 进行姿态检测(这里应该集成实际的姿态检测算法)
pose_data = self._detect_pose(frame)
return {
'frame_available': True,
'frame_size': frame.shape,
'pose_data': pose_data,
'timestamp': datetime.now().isoformat()
}
except Exception as e:
logger.error(f'摄像头数据采集失败: {e}')
return None
def _detect_pose(self, frame: np.ndarray) -> Dict[str, Any]:
"""姿态检测(模拟实现)"""
# 这里应该集成实际的姿态检测算法如MediaPipe或OpenPose
# 目前返回模拟数据
return {
'center_of_gravity': {
'x': np.random.normal(0, 5),
'y': np.random.normal(0, 5)
},
'body_angle': {
'pitch': np.random.normal(0, 2),
'roll': np.random.normal(0, 2),
'yaw': np.random.normal(0, 1)
},
'confidence': np.random.uniform(0.8, 1.0)
}
def _collect_imu_data(self) -> Optional[Dict[str, Any]]:
"""采集IMU数据"""
if not self.imu_device:
return None
try:
raw_data = self.imu_device.read_data()
# 应用校准偏移
if 'imu' in self.calibration_data:
calibration = self.calibration_data['imu']
if calibration.get('status') == 'success':
accel_offset = calibration.get('accel_offset', {})
gyro_offset = calibration.get('gyro_offset', {})
# 校正加速度数据
raw_data['accel']['x'] -= accel_offset.get('x', 0)
raw_data['accel']['y'] -= accel_offset.get('y', 0)
raw_data['accel']['z'] -= accel_offset.get('z', 0)
# 校正陀螺仪数据
raw_data['gyro']['x'] -= gyro_offset.get('x', 0)
raw_data['gyro']['y'] -= gyro_offset.get('y', 0)
raw_data['gyro']['z'] -= gyro_offset.get('z', 0)
return raw_data
except Exception as e:
logger.error(f'IMU数据采集失败: {e}')
return None
def _collect_pressure_data(self) -> Optional[Dict[str, Any]]:
"""采集压力传感器数据"""
if not self.pressure_device:
return None
try:
raw_data = self.pressure_device.read_data()
# 应用校准偏移
if 'pressure' in self.calibration_data:
calibration = self.calibration_data['pressure']
if calibration.get('status') == 'success':
zero_offset = calibration.get('zero_offset', {})
raw_data['left_foot'] -= zero_offset.get('left_foot', 0)
raw_data['right_foot'] -= zero_offset.get('right_foot', 0)
# 计算重心位置
total_pressure = raw_data['left_foot'] + raw_data['right_foot']
if total_pressure > 0:
center_of_pressure = {
'x': (raw_data['right_foot'] - raw_data['left_foot']) / total_pressure * 100,
'y': 0 # 简化模型
}
else:
center_of_pressure = {'x': 0, 'y': 0}
raw_data['center_of_pressure'] = center_of_pressure
raw_data['total_pressure'] = total_pressure
return raw_data
except Exception as e:
logger.error(f'压力传感器数据采集失败: {e}')
return None
def get_latest_data(self) -> Dict[str, Any]:
"""获取最新采集的数据"""
with self.data_lock:
return self.latest_data.copy()
def start_video_recording(self, output_path: str) -> bool:
"""开始视频录制"""
if not self.camera or not self.camera.isOpened():
return False
try:
# 获取摄像头参数
width = int(self.camera.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(self.camera.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(self.camera.get(cv2.CAP_PROP_FPS))
# 创建视频写入器
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
self.video_writer = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
if self.video_writer.isOpened():
logger.info(f'开始视频录制: {output_path}')
return True
else:
logger.error('视频写入器创建失败')
return False
except Exception as e:
logger.error(f'开始视频录制失败: {e}')
return False
def stop_video_recording(self):
"""停止视频录制"""
if hasattr(self, 'video_writer') and self.video_writer:
self.video_writer.release()
self.video_writer = None
logger.info('视频录制已停止')
def cleanup(self):
"""清理资源"""
try:
if self.camera:
self.camera.release()
if hasattr(self, 'video_writer') and self.video_writer:
self.video_writer.release()
logger.info('设备资源已清理')
except Exception as e:
logger.error(f'清理设备资源失败: {e}')
class MockIMUDevice:
"""模拟IMU设备"""
def __init__(self):
self.noise_level = 0.1
def read_data(self) -> Dict[str, Any]:
"""读取IMU数据"""
return {
'accel': {
'x': np.random.normal(0, self.noise_level),
'y': np.random.normal(0, self.noise_level),
'z': np.random.normal(9.8, self.noise_level) # 重力加速度
},
'gyro': {
'x': np.random.normal(0, self.noise_level),
'y': np.random.normal(0, self.noise_level),
'z': np.random.normal(0, self.noise_level)
},
'temperature': np.random.normal(25, 2),
'timestamp': datetime.now().isoformat()
}
class MockPressureDevice:
"""模拟压力传感器设备"""
def __init__(self):
self.base_pressure = 500 # 基础压力值
self.noise_level = 10
def read_data(self) -> Dict[str, Any]:
"""读取压力数据"""
# 模拟轻微的左右脚压力差异
left_pressure = self.base_pressure + np.random.normal(0, self.noise_level)
right_pressure = self.base_pressure + np.random.normal(0, self.noise_level)
return {
'left_foot': max(0, left_pressure),
'right_foot': max(0, right_pressure),
'timestamp': datetime.now().isoformat()
}