#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 设备管理模块 负责摄像头、IMU传感器和压力传感器的连接和数据采集 """ import cv2 import numpy as np import time import threading import json from datetime import datetime from typing import Dict, List, Optional, Any import logging logger = logging.getLogger(__name__) class DeviceManager: """设备管理器""" def __init__(self): self.camera = None self.imu_device = None self.pressure_device = None self.device_status = { 'camera': False, 'imu': False, 'pressure': False } self.calibration_data = {} self.data_lock = threading.Lock() self.latest_data = {} # 初始化设备 self._init_devices() def _init_devices(self): """初始化所有设备""" try: self._init_camera() self._init_imu() self._init_pressure_sensor() logger.info('设备初始化完成') except Exception as e: logger.error(f'设备初始化失败: {e}') def _init_camera(self): """初始化摄像头""" try: # 尝试连接默认摄像头 self.camera = cv2.VideoCapture(0) if self.camera.isOpened(): # 设置摄像头参数 self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, 1280) self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, 720) self.camera.set(cv2.CAP_PROP_FPS, 30) self.device_status['camera'] = True logger.info('摄像头初始化成功') else: logger.warning('摄像头连接失败') self.camera = None except Exception as e: logger.error(f'摄像头初始化异常: {e}') self.camera = None def _init_imu(self): """初始化IMU传感器""" try: # 这里应该连接实际的IMU设备 # 目前使用模拟数据 self.imu_device = MockIMUDevice() self.device_status['imu'] = True logger.info('IMU传感器初始化成功(模拟)') except Exception as e: logger.error(f'IMU传感器初始化失败: {e}') self.imu_device = None def _init_pressure_sensor(self): """初始化压力传感器""" try: # 这里应该连接实际的压力传感器 # 目前使用模拟数据 self.pressure_device = MockPressureDevice() self.device_status['pressure'] = True logger.info('压力传感器初始化成功(模拟)') except Exception as e: logger.error(f'压力传感器初始化失败: {e}') self.pressure_device = None def get_device_status(self) -> Dict[str, bool]: """获取设备状态""" return self.device_status.copy() def get_connected_devices(self) -> List[str]: """获取已连接的设备列表""" return [device for device, status in self.device_status.items() if status] def refresh_devices(self): """刷新设备连接""" logger.info('刷新设备连接...') # 重新初始化所有设备 if self.camera: self.camera.release() self._init_devices() def calibrate_devices(self) -> Dict[str, Any]: """校准设备""" calibration_result = {} try: # 摄像头校准 if self.device_status['camera']: camera_calibration = self._calibrate_camera() calibration_result['camera'] = camera_calibration # IMU校准 if self.device_status['imu']: imu_calibration = self._calibrate_imu() calibration_result['imu'] = imu_calibration # 压力传感器校准 if self.device_status['pressure']: pressure_calibration = self._calibrate_pressure() calibration_result['pressure'] = pressure_calibration self.calibration_data = calibration_result logger.info('设备校准完成') except Exception as e: logger.error(f'设备校准失败: {e}') raise return calibration_result def _calibrate_camera(self) -> Dict[str, Any]: """校准摄像头""" if not self.camera or not self.camera.isOpened(): return {'status': 'failed', 'error': '摄像头未连接'} try: # 获取几帧图像进行校准 frames = [] for _ in range(10): ret, frame = self.camera.read() if ret: frames.append(frame) time.sleep(0.1) if not frames: return {'status': 'failed', 'error': '无法获取图像'} # 计算平均亮度和对比度 avg_brightness = np.mean([np.mean(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)) for frame in frames]) calibration = { 'status': 'success', 'brightness': float(avg_brightness), 'resolution': (int(self.camera.get(cv2.CAP_PROP_FRAME_WIDTH)), int(self.camera.get(cv2.CAP_PROP_FRAME_HEIGHT))), 'fps': float(self.camera.get(cv2.CAP_PROP_FPS)), 'timestamp': datetime.now().isoformat() } return calibration except Exception as e: return {'status': 'failed', 'error': str(e)} def _calibrate_imu(self) -> Dict[str, Any]: """校准IMU传感器""" if not self.imu_device: return {'status': 'failed', 'error': 'IMU设备未连接'} try: # 收集静态数据进行零点校准 samples = [] for _ in range(100): data = self.imu_device.read_data() samples.append(data) time.sleep(0.01) # 计算零点偏移 accel_offset = { 'x': np.mean([s['accel']['x'] for s in samples]), 'y': np.mean([s['accel']['y'] for s in samples]), 'z': np.mean([s['accel']['z'] for s in samples]) - 9.8 # 重力补偿 } gyro_offset = { 'x': np.mean([s['gyro']['x'] for s in samples]), 'y': np.mean([s['gyro']['y'] for s in samples]), 'z': np.mean([s['gyro']['z'] for s in samples]) } calibration = { 'status': 'success', 'accel_offset': accel_offset, 'gyro_offset': gyro_offset, 'timestamp': datetime.now().isoformat() } return calibration except Exception as e: return {'status': 'failed', 'error': str(e)} def _calibrate_pressure(self) -> Dict[str, Any]: """校准压力传感器""" if not self.pressure_device: return {'status': 'failed', 'error': '压力传感器未连接'} try: # 收集零压力数据 samples = [] for _ in range(50): data = self.pressure_device.read_data() samples.append(data) time.sleep(0.02) # 计算零点偏移 zero_offset = { 'left_foot': np.mean([s['left_foot'] for s in samples]), 'right_foot': np.mean([s['right_foot'] for s in samples]) } calibration = { 'status': 'success', 'zero_offset': zero_offset, 'timestamp': datetime.now().isoformat() } return calibration except Exception as e: return {'status': 'failed', 'error': str(e)} def collect_data(self) -> Dict[str, Any]: """采集所有设备数据""" data = {} timestamp = datetime.now().isoformat() try: # 采集摄像头数据 if self.device_status['camera']: camera_data = self._collect_camera_data() if camera_data: data['camera'] = camera_data # 采集IMU数据 if self.device_status['imu']: imu_data = self._collect_imu_data() if imu_data: data['imu'] = imu_data # 采集压力传感器数据 if self.device_status['pressure']: pressure_data = self._collect_pressure_data() if pressure_data: data['pressure'] = pressure_data # 添加时间戳 data['timestamp'] = timestamp # 更新最新数据 with self.data_lock: self.latest_data = data.copy() except Exception as e: logger.error(f'数据采集失败: {e}') return data def _collect_camera_data(self) -> Optional[Dict[str, Any]]: """采集摄像头数据""" if not self.camera or not self.camera.isOpened(): return None try: ret, frame = self.camera.read() if not ret: return None # 进行姿态检测(这里应该集成实际的姿态检测算法) pose_data = self._detect_pose(frame) return { 'frame_available': True, 'frame_size': frame.shape, 'pose_data': pose_data, 'timestamp': datetime.now().isoformat() } except Exception as e: logger.error(f'摄像头数据采集失败: {e}') return None def _detect_pose(self, frame: np.ndarray) -> Dict[str, Any]: """姿态检测(模拟实现)""" # 这里应该集成实际的姿态检测算法,如MediaPipe或OpenPose # 目前返回模拟数据 return { 'center_of_gravity': { 'x': np.random.normal(0, 5), 'y': np.random.normal(0, 5) }, 'body_angle': { 'pitch': np.random.normal(0, 2), 'roll': np.random.normal(0, 2), 'yaw': np.random.normal(0, 1) }, 'confidence': np.random.uniform(0.8, 1.0) } def _collect_imu_data(self) -> Optional[Dict[str, Any]]: """采集IMU数据""" if not self.imu_device: return None try: raw_data = self.imu_device.read_data() # 应用校准偏移 if 'imu' in self.calibration_data: calibration = self.calibration_data['imu'] if calibration.get('status') == 'success': accel_offset = calibration.get('accel_offset', {}) gyro_offset = calibration.get('gyro_offset', {}) # 校正加速度数据 raw_data['accel']['x'] -= accel_offset.get('x', 0) raw_data['accel']['y'] -= accel_offset.get('y', 0) raw_data['accel']['z'] -= accel_offset.get('z', 0) # 校正陀螺仪数据 raw_data['gyro']['x'] -= gyro_offset.get('x', 0) raw_data['gyro']['y'] -= gyro_offset.get('y', 0) raw_data['gyro']['z'] -= gyro_offset.get('z', 0) return raw_data except Exception as e: logger.error(f'IMU数据采集失败: {e}') return None def _collect_pressure_data(self) -> Optional[Dict[str, Any]]: """采集压力传感器数据""" if not self.pressure_device: return None try: raw_data = self.pressure_device.read_data() # 应用校准偏移 if 'pressure' in self.calibration_data: calibration = self.calibration_data['pressure'] if calibration.get('status') == 'success': zero_offset = calibration.get('zero_offset', {}) raw_data['left_foot'] -= zero_offset.get('left_foot', 0) raw_data['right_foot'] -= zero_offset.get('right_foot', 0) # 计算重心位置 total_pressure = raw_data['left_foot'] + raw_data['right_foot'] if total_pressure > 0: center_of_pressure = { 'x': (raw_data['right_foot'] - raw_data['left_foot']) / total_pressure * 100, 'y': 0 # 简化模型 } else: center_of_pressure = {'x': 0, 'y': 0} raw_data['center_of_pressure'] = center_of_pressure raw_data['total_pressure'] = total_pressure return raw_data except Exception as e: logger.error(f'压力传感器数据采集失败: {e}') return None def get_latest_data(self) -> Dict[str, Any]: """获取最新采集的数据""" with self.data_lock: return self.latest_data.copy() def start_video_recording(self, output_path: str) -> bool: """开始视频录制""" if not self.camera or not self.camera.isOpened(): return False try: # 获取摄像头参数 width = int(self.camera.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(self.camera.get(cv2.CAP_PROP_FRAME_HEIGHT)) fps = int(self.camera.get(cv2.CAP_PROP_FPS)) # 创建视频写入器 fourcc = cv2.VideoWriter_fourcc(*'mp4v') self.video_writer = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) if self.video_writer.isOpened(): logger.info(f'开始视频录制: {output_path}') return True else: logger.error('视频写入器创建失败') return False except Exception as e: logger.error(f'开始视频录制失败: {e}') return False def stop_video_recording(self): """停止视频录制""" if hasattr(self, 'video_writer') and self.video_writer: self.video_writer.release() self.video_writer = None logger.info('视频录制已停止') def cleanup(self): """清理资源""" try: if self.camera: self.camera.release() if hasattr(self, 'video_writer') and self.video_writer: self.video_writer.release() logger.info('设备资源已清理') except Exception as e: logger.error(f'清理设备资源失败: {e}') class MockIMUDevice: """模拟IMU设备""" def __init__(self): self.noise_level = 0.1 def read_data(self) -> Dict[str, Any]: """读取IMU数据""" return { 'accel': { 'x': np.random.normal(0, self.noise_level), 'y': np.random.normal(0, self.noise_level), 'z': np.random.normal(9.8, self.noise_level) # 重力加速度 }, 'gyro': { 'x': np.random.normal(0, self.noise_level), 'y': np.random.normal(0, self.noise_level), 'z': np.random.normal(0, self.noise_level) }, 'temperature': np.random.normal(25, 2), 'timestamp': datetime.now().isoformat() } class MockPressureDevice: """模拟压力传感器设备""" def __init__(self): self.base_pressure = 500 # 基础压力值 self.noise_level = 10 def read_data(self) -> Dict[str, Any]: """读取压力数据""" # 模拟轻微的左右脚压力差异 left_pressure = self.base_pressure + np.random.normal(0, self.noise_level) right_pressure = self.base_pressure + np.random.normal(0, self.noise_level) return { 'left_foot': max(0, left_pressure), 'right_foot': max(0, right_pressure), 'timestamp': datetime.now().isoformat() }