#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ IMU传感器管理器 负责IMU传感器的连接、校准和头部姿态数据采集 """ import serial import threading import time import json import numpy as np from typing import Optional, Dict, Any, List, Tuple import logging from collections import deque import struct from datetime import datetime try: from .base_device import BaseDevice from .utils.socket_manager import SocketManager from .utils.config_manager import ConfigManager except ImportError: from base_device import BaseDevice from utils.socket_manager import SocketManager from utils.config_manager import ConfigManager # 设置日志 logger = logging.getLogger(__name__) class RealIMUDevice: """真实IMU设备,通过串口读取姿态数据""" def __init__(self, port, baudrate): self.port = port self.baudrate = baudrate self.ser = None self.buffer = bytearray() self.calibration_data = None self.head_pose_offset = {'rotation': 0, 'tilt': 0, 'pitch': 0} self.last_data = { 'roll': 0.0, 'pitch': 0.0, 'yaw': 0.0, 'temperature': 25.0 } logger.debug(f'RealIMUDevice 初始化: port={self.port}, baudrate={self.baudrate}') self._connect() def _connect(self): try: logger.debug(f'尝试打开串口: {self.port} @ {self.baudrate}') self.ser = serial.Serial(self.port, self.baudrate, timeout=1) if hasattr(self.ser, 'reset_input_buffer'): try: self.ser.reset_input_buffer() logger.debug('已清空串口输入缓冲区') except Exception as e: logger.debug(f'重置串口输入缓冲区失败: {e}') logger.info(f'IMU设备连接成功: {self.port} @ {self.baudrate}bps') except Exception as e: # logger.error(f'IMU设备连接失败: {e}', exc_info=True) self.ser = None def set_calibration(self, calibration: Dict[str, Any]): self.calibration_data = calibration if 'head_pose_offset' in calibration: self.head_pose_offset = calibration['head_pose_offset'] logger.debug(f'应用IMU校准数据: {self.head_pose_offset}') def apply_calibration(self, raw_data: Dict[str, Any]) -> Dict[str, Any]: """应用校准:将当前姿态减去初始偏移,得到相对于初始姿态的变化量""" if not raw_data or 'head_pose' not in raw_data: return raw_data # 应用校准偏移 calibrated_data = raw_data.copy() head_pose = raw_data['head_pose'].copy() angle=head_pose['rotation'] - self.head_pose_offset['rotation'] # 减去基准值(零点偏移) head_pose['rotation'] = ((angle + 180) % 360) - 180 head_pose['tilt'] = head_pose['tilt'] - self.head_pose_offset['tilt'] head_pose['pitch'] = head_pose['pitch'] - self.head_pose_offset['pitch'] calibrated_data['head_pose'] = head_pose return calibrated_data @staticmethod def _checksum(data: bytes) -> int: return sum(data[:-1]) & 0xFF def _parse_packet(self, data: bytes) -> Optional[Dict[str, float]]: if len(data) != 11: logger.debug(f'无效数据包长度: {len(data)}') return None if data[0] != 0x55: logger.debug(f'错误的包头: 0x{data[0]:02X}') return None if self._checksum(data) != data[-1]: logger.debug(f'校验和错误: 期望{self._checksum(data):02X}, 实际{data[-1]:02X}') return None packet_type = data[1] vals = [int.from_bytes(data[i:i+2], 'little', signed=True) for i in range(2, 10, 2)] if packet_type == 0x53: # 姿态角,单位0.01° pitchl, rxl, yawl, temp = vals # 注意这里 vals 已经是有符号整数 # 使用第一段代码里的比例系数 k_angle = 180.0 roll = -round(rxl / 32768.0 * k_angle,2) pitch = -round(pitchl / 32768.0 * k_angle,2) yaw = -round(yawl / 32768.0 * k_angle,2) temp = temp / 100.0 self.last_data = { 'roll': roll, 'pitch': pitch, 'yaw': yaw, 'temperature': temp } # print(f'解析姿态角包: roll={roll}, pitch={pitch}, yaw={yaw}, temp={temp}') return self.last_data else: # logger.debug(f'忽略的数据包类型: 0x{packet_type:02X}') return None def read_data(self, apply_calibration: bool = True) -> Dict[str, Any]: if not self.ser or not getattr(self.ser, 'is_open', False): # logger.warning('IMU串口未连接,尝试重新连接...') self._connect() return { 'head_pose': { 'rotation': self.last_data['yaw'], 'tilt': self.last_data['roll'], 'pitch': self.last_data['pitch'] }, 'temperature': self.last_data['temperature'], 'timestamp': datetime.now().isoformat() } try: bytes_waiting = self.ser.in_waiting if bytes_waiting: # logger.debug(f'串口缓冲区待读字节: {bytes_waiting}') chunk = self.ser.read(bytes_waiting) # logger.debug(f'读取到字节: {len(chunk)}') self.buffer.extend(chunk) while len(self.buffer) >= 11: if self.buffer[0] != 0x55: dropped = self.buffer.pop(0) logger.debug(f'丢弃无效字节: 0x{dropped:02X}') continue packet = bytes(self.buffer[:11]) parsed = self._parse_packet(packet) del self.buffer[:11] if parsed is not None: raw = { 'head_pose': { 'rotation': parsed['yaw'], # rotation = roll 'tilt': parsed['roll'], # tilt = yaw 'pitch': parsed['pitch'] # pitch = pitch }, 'temperature': parsed['temperature'], 'timestamp': datetime.now().isoformat() } # logger.debug(f'映射后的头部姿态: {raw}') return self.apply_calibration(raw) if apply_calibration else raw raw = { 'head_pose': { 'rotation': self.last_data['yaw'], 'tilt': self.last_data['roll'], 'pitch': self.last_data['pitch'] }, 'temperature': self.last_data['temperature'], 'timestamp': datetime.now().isoformat() } return self.apply_calibration(raw) if apply_calibration else raw except Exception as e: logger.error(f'IMU数据读取异常: {e}', exc_info=True) raw = { 'head_pose': { 'rotation': self.last_data['yaw'], 'tilt': self.last_data['roll'], 'pitch': self.last_data['pitch'] }, 'temperature': self.last_data['temperature'], 'timestamp': datetime.now().isoformat() } return self.apply_calibration(raw) if apply_calibration else raw def __del__(self): try: if self.ser and getattr(self.ser, 'is_open', False): self.ser.close() logger.info('IMU设备串口已关闭') except Exception: pass class MockIMUDevice: """模拟IMU设备""" def __init__(self): self.noise_level = 0.1 self.calibration_data = None # 校准数据 self.head_pose_offset = {'rotation': 0, 'tilt': 0, 'pitch': 0} # 头部姿态零点偏移 def set_calibration(self, calibration: Dict[str, Any]): """设置校准数据""" self.calibration_data = calibration if 'head_pose_offset' in calibration: self.head_pose_offset = calibration['head_pose_offset'] def apply_calibration(self, raw_data: Dict[str, Any]) -> Dict[str, Any]: """应用校准:将当前姿态减去初始偏移,得到相对姿态""" if not raw_data or 'head_pose' not in raw_data: return raw_data calibrated_data = raw_data.copy() head_pose = raw_data['head_pose'].copy() head_pose['rotation'] = head_pose['rotation'] - self.head_pose_offset['rotation'] head_pose['tilt'] = head_pose['tilt'] - self.head_pose_offset['tilt'] head_pose['pitch'] = head_pose['pitch'] - self.head_pose_offset['pitch'] calibrated_data['head_pose'] = head_pose return calibrated_data def read_data(self, apply_calibration: bool = True) -> Dict[str, Any]: """读取IMU数据""" # 生成头部姿态角度数据,角度范围(-90°, +90°) # 使用正弦波模拟自然的头部运动,添加随机噪声 import time current_time = time.time() # 旋转角(左旋为负,右旋为正) rotation_angle = 30 * np.sin(current_time * 0.5) + np.random.normal(0, self.noise_level * 5) rotation_angle = np.clip(rotation_angle, -90, 90) # 倾斜角(左倾为负,右倾为正) tilt_angle = 20 * np.sin(current_time * 0.3 + np.pi/4) + np.random.normal(0, self.noise_level * 5) tilt_angle = np.clip(tilt_angle, -90, 90) # 俯仰角(俯角为负,仰角为正) pitch_angle = 15 * np.sin(current_time * 0.7 + np.pi/2) + np.random.normal(0, self.noise_level * 5) pitch_angle = np.clip(pitch_angle, -90, 90) # 生成原始数据 raw_data = { 'head_pose': { 'rotation': rotation_angle, # 旋转角:左旋(-), 右旋(+) 'tilt': tilt_angle, # 倾斜角:左倾(-), 右倾(+) 'pitch': pitch_angle # 俯仰角:俯角(-), 仰角(+) }, 'timestamp': datetime.now().isoformat() } # 应用校准并返回 return self.apply_calibration(raw_data) if apply_calibration else raw_data class IMUManager(BaseDevice): """IMU传感器管理器""" def __init__(self, socketio, config_manager: Optional[ConfigManager] = None): """ 初始化IMU管理器 Args: socketio: SocketIO实例 config_manager: 配置管理器实例 """ # 配置管理 self.config_manager = config_manager or ConfigManager() config = self.config_manager.get_device_config('imu') super().__init__("imu", config) # 保存socketio实例 self._socketio = socketio # 设备配置 self.port = config.get('port', 'COM7') self.baudrate = config.get('baudrate', 9600) self.device_type = config.get('device_type', 'mock') # 'real' 或 'mock' self.use_mock = config.get('use_mock', False) # 保持向后兼容 # IMU设备实例 self.imu_device = None # 推流相关 self.imu_streaming = False self.imu_thread = None # 统计信息 self.data_count = 0 self.error_count = 0 # 校准相关 self.is_calibrated = False self.head_pose_offset = {'rotation': 0, 'tilt': 0, 'pitch': 0} # 数据缓存 self.data_buffer = deque(maxlen=100) self.last_valid_data = None self.logger.info(f"IMU管理器初始化完成 - 端口: {self.port}, 设备类型: {self.device_type}") def initialize(self) -> bool: """ 初始化IMU设备 Returns: bool: 初始化是否成功 """ try: self.logger.info(f"正在初始化IMU设备...") # 根据配置选择真实设备或模拟设备 # 优先使用device_type配置,如果没有则使用use_mock配置(向后兼容) use_real_device = (self.device_type == 'real') or (not self.use_mock) if use_real_device: self.logger.info(f"使用真实IMU设备 - 端口: {self.port}, 波特率: {self.baudrate}") self.imu_device = RealIMUDevice(self.port, self.baudrate) # 检查真实设备是否连接成功 if self.imu_device.ser is None: self.logger.error(f"IMU设备连接失败: 无法打开串口 {self.port}") self.is_connected = False self.imu_device = None return False else: self.logger.info("使用模拟IMU设备") self.imu_device = MockIMUDevice() self.is_connected = True self._device_info.update({ 'port': self.port, 'baudrate': self.baudrate, 'use_mock': self.use_mock }) self.logger.info("IMU初始化成功") return True except Exception as e: self.logger.error(f"IMU初始化失败: {e}") self.is_connected = False self.imu_device = None return False def _quick_calibrate_imu(self) -> Dict[str, Any]: """ 快速IMU零点校准(以当前姿态为基准) Returns: Dict[str, Any]: 校准结果 """ try: if not self.imu_device: return {'status': 'error', 'error': 'IMU设备未初始化'} self.logger.info('开始IMU快速零点校准...') # 直接读取一次原始数据作为校准偏移量 raw_data = self.imu_device.read_data(apply_calibration=False) if not raw_data or 'head_pose' not in raw_data: return {'status': 'error', 'error': '无法读取IMU原始数据'} # 使用当前姿态作为零点偏移 self.head_pose_offset = { 'rotation': raw_data['head_pose']['rotation'], 'tilt': raw_data['head_pose']['tilt'], 'pitch': raw_data['head_pose']['pitch'] } # 应用校准到设备 calibration_data = {'head_pose_offset': self.head_pose_offset} self.imu_device.set_calibration(calibration_data) self.logger.info(f'IMU快速校准完成: {self.head_pose_offset}') return { 'status': 'success', 'head_pose_offset': self.head_pose_offset } except Exception as e: self.logger.error(f'IMU快速校准失败: {e}') return {'status': 'error', 'error': str(e)} def calibrate(self) -> bool: """ 校准IMU传感器 Returns: bool: 校准是否成功 """ try: if not self.is_connected or not self.imu_device: if not self.initialize(): self.logger.error("IMU设备未连接") return False # 使用快速校准方法 result = self._quick_calibrate_imu() if result['status'] == 'success': self.is_calibrated = True self.logger.info("IMU校准成功") return True else: self.logger.error(f"IMU校准失败: {result.get('error', '未知错误')}") return False except Exception as e: self.logger.error(f"IMU校准失败: {e}") return False def start_streaming(self) -> bool: """ 开始IMU数据流 Args: socketio: SocketIO实例,用于数据推送 Returns: bool: 启动是否成功 """ try: if not self.is_connected or not self.imu_device: if not self.initialize(): return False if self.imu_streaming: self.logger.warning("IMU数据流已在运行") return True # 启动前进行快速校准 if not self.is_calibrated: self.logger.info("启动前进行快速零点校准...") self._quick_calibrate_imu() self.imu_streaming = True self.imu_thread = threading.Thread(target=self._imu_streaming_thread, daemon=True) self.imu_thread.start() self.logger.info("IMU数据流启动成功") return True except Exception as e: self.logger.error(f"IMU数据流启动失败: {e}") self.imu_streaming = False return False def stop_streaming(self) -> bool: """ 停止IMU数据流 Returns: bool: 停止是否成功 """ try: self.imu_streaming = False if self.imu_thread and self.imu_thread.is_alive(): self.imu_thread.join(timeout=3.0) self.logger.info("IMU数据流已停止") return True except Exception as e: self.logger.error(f"停止IMU数据流失败: {e}") return False def _imu_streaming_thread(self): """ IMU数据流工作线程 """ self.logger.info("IMU数据流工作线程启动") while self.imu_streaming: try: if self.imu_device: # 读取IMU数据 data = self.imu_device.read_data(apply_calibration=True) if data: # 缓存数据 # self.data_buffer.append(data) # self.last_valid_data = data # 发送数据到前端 if self._socketio: self._socketio.emit('imu_data', data, namespace='/devices') # 更新统计 self.data_count += 1 else: self.error_count += 1 time.sleep(0.02) # 50Hz采样率 except Exception as e: self.logger.error(f"IMU数据流处理异常: {e}") self.error_count += 1 time.sleep(0.1) self.logger.info("IMU数据流工作线程结束") def get_status(self) -> Dict[str, Any]: """ 获取设备状态 Returns: Dict[str, Any]: 设备状态信息 """ status = super().get_status() status.update({ 'port': self.port, 'baudrate': self.baudrate, 'is_streaming': self.imu_streaming, 'is_calibrated': self.is_calibrated, 'data_count': self.data_count, 'error_count': self.error_count, 'buffer_size': len(self.data_buffer), 'has_data': self.last_valid_data is not None, 'head_pose_offset': self.head_pose_offset, 'device_type': 'mock' if self.use_mock else 'real' }) return status def get_latest_data(self) -> Optional[Dict[str, float]]: """ 获取最新的IMU数据 Returns: Optional[Dict[str, float]]: 最新数据,无数据返回None """ return self.last_valid_data.copy() if self.last_valid_data else None def collect_head_pose_data(self, duration: int = 10) -> List[Dict[str, Any]]: """ 收集头部姿态数据 Args: duration: 收集时长(秒) Returns: List[Dict[str, Any]]: 收集到的数据列表 """ collected_data = [] if not self.is_connected or not self.imu_device: self.logger.error("IMU设备未连接") return collected_data self.logger.info(f"开始收集头部姿态数据,时长: {duration}秒") start_time = time.time() while time.time() - start_time < duration: try: data = self.imu_device.read_data(apply_calibration=True) if data: # 添加时间戳 data['timestamp'] = time.time() collected_data.append(data) time.sleep(0.02) # 50Hz采样率 except Exception as e: self.logger.error(f"数据收集异常: {e}") break self.logger.info(f"头部姿态数据收集完成,共收集 {len(collected_data)} 个样本") return collected_data def disconnect(self): """ 断开IMU设备连接 """ try: self.stop_streaming() if self.imu_device: self.imu_device = None self.is_connected = False self.logger.info("IMU设备已断开连接") except Exception as e: self.logger.error(f"断开IMU设备连接失败: {e}") def cleanup(self): """ 清理资源 """ try: self.disconnect() # 清理缓冲区 self.data_buffer.clear() # 重置状态 self.is_calibrated = False self.last_valid_data = None self.head_pose_offset = {'rotation': 0, 'tilt': 0, 'pitch': 0} super().cleanup() self.logger.info("IMU资源清理完成") except Exception as e: self.logger.error(f"清理IMU资源失败: {e}")