#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ IMU传感器管理器 负责IMU传感器的连接、校准和头部姿态数据采集 """ import serial import threading import time import numpy as np from typing import Optional, Dict, Any import logging from collections import deque from datetime import datetime import asyncio try: from .base_device import BaseDevice from .utils.config_manager import ConfigManager except ImportError: from base_device import BaseDevice from utils.config_manager import ConfigManager # 设置日志 logger = logging.getLogger(__name__) class BleIMUDevice: """蓝牙IMU设备,基于bleak实现,解析逻辑参考tests/testblueimu.py""" def __init__(self, mac_address: str): self.mac_address = mac_address self.loop = None self.loop_thread = None self.client = None self.running = False self._lock = threading.Lock() self.disconnected_event = None self.calibration_data = None self.head_pose_offset = {'rotation': 0, 'tilt': 0, 'pitch': 0} self.last_data = { 'roll': 0.0, 'pitch': 0.0, 'yaw': 0.0, 'temperature': 25.0 } self._connected = False # GATT特征(参考测试脚本中的handle/short uuid) self._notify_char = 0x0007 self._write_char = 0x0005 def set_calibration(self, calibration: Dict[str, Any]): self.calibration_data = calibration if 'head_pose_offset' in calibration: self.head_pose_offset = calibration['head_pose_offset'] def apply_calibration(self, raw_data: Dict[str, Any]) -> Dict[str, Any]: if not raw_data or 'head_pose' not in raw_data: return raw_data calibrated_data = raw_data.copy() head_pose = raw_data['head_pose'].copy() # 与串口IMU保持一致:对 rotation 做归一化到 [-180, 180) angle = head_pose['rotation'] - self.head_pose_offset['rotation'] head_pose['rotation'] = round(((angle + 180) % 360) - 180,1) head_pose['tilt'] = round(head_pose['tilt'] - self.head_pose_offset['tilt'],1) head_pose['pitch'] = round(head_pose['pitch'] - self.head_pose_offset['pitch'],1) calibrated_data['head_pose'] = head_pose return calibrated_data def start(self): if self.running: return self.running = True self.loop_thread = threading.Thread(target=self._run_loop, daemon=True) self.loop_thread.start() def stop(self): self.running = False try: if self.loop: asyncio.run_coroutine_threadsafe(self._disconnect(), self.loop) except Exception: pass def read_data(self, apply_calibration: bool = True) -> Dict[str, Any]: with self._lock: raw = { 'head_pose': { 'rotation': self.last_data['yaw'], # rotation 对应航向角 'tilt': self.last_data['pitch'], # tilt 对应横滚 'pitch': self.last_data['roll'] # pitch 对应俯仰 }, 'temperature': self.last_data.get('temperature', 25.0), 'timestamp': datetime.now().isoformat() } return self.apply_calibration(raw) if apply_calibration else raw def _run_loop(self): self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) try: self.loop.run_until_complete(self._connect_and_listen()) except Exception as e: logger.error(f'BLE IMU事件循环异常: {e}', exc_info=True) finally: try: if not self.loop.is_closed(): self.loop.stop() self.loop.close() except Exception: pass async def _disconnect(self): try: if self.client and self.client.is_connected: try: await self.client.stop_notify(self._notify_char) except Exception: pass await self.client.disconnect() except Exception: pass async def _connect_and_listen(self): try: from bleak import BleakClient, BleakScanner from bleak.backends.characteristic import BleakGATTCharacteristic except Exception as e: logger.error(f"未安装bleak或导入失败: {e}") self.running = False return while self.running: try: # logger.info(f"扫描并连接蓝牙IMU: {self.mac_address} ...") device = await BleakScanner.find_device_by_address(self.mac_address, cb=dict(use_bdaddr=False)) if device is None: # logger.warning(f"未找到设备: {self.mac_address}") await asyncio.sleep(2.0) continue self.disconnected_event = asyncio.Event() def _on_disconnected(_client): logger.info("BLE IMU已断开连接") self._connected = False try: self.disconnected_event.set() except Exception: pass self.client = BleakClient(device, disconnected_callback=_on_disconnected) logger.info("正在连接BLE IMU...") await self.client.connect() self._connected = True logger.info("BLE IMU连接成功") # 订阅通知 await self.client.start_notify(self._notify_char, self._notification_handler) # 发送初始化指令(参考测试脚本) try: await self.client.write_gatt_char(self._write_char, bytes([0x29])) # 保持连接 await asyncio.sleep(0.2) await self.client.write_gatt_char(self._write_char, bytes([0x46])) # 高速模式 await asyncio.sleep(0.2) isCompassOn = 0 barometerFilter = 2 Cmd_ReportTag = 0x0FFF params = bytearray([0x00 for _ in range(11)]) params[0] = 0x12 params[1] = 5 params[2] = 255 params[3] = 0 params[4] = ((barometerFilter & 3) << 1) | (isCompassOn & 1) params[5] = 60 # 发送帧率 params[6] = 1 params[7] = 3 params[8] = 5 params[9] = Cmd_ReportTag & 0xff params[10] = (Cmd_ReportTag >> 8) & 0xff await self.client.write_gatt_char(self._write_char, params) await asyncio.sleep(0.2) await self.client.write_gatt_char(self._write_char, bytes([0x19])) # 开始上报 except Exception as e: logger.warning(f"BLE IMU写入初始化指令失败: {e}") # 保持连接直到停止或断开 while self.running and self.client and self.client.is_connected: await asyncio.sleep(1.0) # 退出前尝试停止通知 try: await self.client.stop_notify(self._notify_char) except Exception: pass try: await self.client.disconnect() except Exception: pass self._connected = False except Exception as e: logger.error(f"BLE IMU连接/监听失败: {e}", exc_info=True) self._connected = False await asyncio.sleep(2.0) def _notification_handler(self, characteristic, data: bytearray): """通知回调:解析IMU数据,更新欧拉角""" try: buf = data if len(buf) < 3: return if buf[0] != 0x11: return # 比例系数 scaleAngle = 0.0054931640625 # 180/32768 scaleTemperature = 0.01 ctl = (buf[2] << 8) | buf[1] L = 7 # 数据偏移起点 tmp_temperature = None # 跳过前面不关心的标志位,关注角度与温度 if (ctl & 0x0001) != 0: L += 6 # aX aY aZ if (ctl & 0x0002) != 0: L += 6 # AX AY AZ if (ctl & 0x0004) != 0: L += 6 # GX GY GZ if (ctl & 0x0008) != 0: L += 6 # CX CY CZ if (ctl & 0x0010) != 0: # 温度 tmpX = np.short((np.short(buf[L+1]) << 8) | buf[L]) * scaleTemperature L += 2 # 气压与高度各3字节 # 气压 tmpU32 = np.uint32(((np.uint32(buf[L+2]) << 16) | (np.uint32(buf[L+1]) << 8) | np.uint32(buf[L]))) if (tmpU32 & 0x800000) == 0x800000: tmpU32 = (tmpU32 | 0xff000000) tmpY = np.int32(tmpU32) * 0.0002384185791 L += 3 # 高度 tmpU32 = np.uint32(((np.uint32(buf[L+2]) << 16) | (np.uint32(buf[L+1]) << 8) | np.uint32(buf[L]))) if (tmpU32 & 0x800000) == 0x800000: tmpU32 = (tmpU32 | 0xff000000) tmpZ = np.int32(tmpU32) * 0.0010728836 L += 3 tmp_temperature = float(tmpX) if (ctl & 0x0020) != 0: L += 8 # 四元数 wxyz if (ctl & 0x0040) != 0: angleX = float(np.short((np.short(buf[L+1]) << 8) | buf[L]) * scaleAngle); L += 2 angleY = float(np.short((np.short(buf[L+1]) << 8) | buf[L]) * scaleAngle); L += 2 angleZ = float(np.short((np.short(buf[L+1]) << 8) | buf[L]) * scaleAngle); L += 2 with self._lock: # 映射:roll=X, pitch=Y, yaw=Z self.last_data['roll'] = round(angleX*-1, 1) self.last_data['pitch'] = round(angleY*-1, 1) self.last_data['yaw'] = round(angleZ*-1, 1) if tmp_temperature is not None: self.last_data['temperature'] = tmp_temperature except Exception: # 解析失败忽略 pass @property def connected(self) -> bool: return self._connected class MockIMUDevice: def __init__(self): self.running = False self.thread = None self._lock = threading.Lock() self._connected = False self.calibration_data = None self.head_pose_offset = {'rotation': 0, 'tilt': 0, 'pitch': 0} self.last_data = { 'roll': 0.0, 'pitch': 0.0, 'yaw': 0.0, 'temperature': 25.0 } self._phase = 0.0 def set_calibration(self, calibration: Dict[str, Any]): self.calibration_data = calibration if 'head_pose_offset' in calibration: self.head_pose_offset = calibration['head_pose_offset'] def apply_calibration(self, raw_data: Dict[str, Any]) -> Dict[str, Any]: if not raw_data or 'head_pose' not in raw_data: return raw_data calibrated_data = raw_data.copy() head_pose = raw_data['head_pose'].copy() angle = head_pose['rotation'] - self.head_pose_offset['rotation'] head_pose['rotation'] = round(((angle + 180) % 360) - 180, 1) head_pose['tilt'] = round(head_pose['tilt'] - self.head_pose_offset['tilt'], 1) head_pose['pitch'] = round(head_pose['pitch'] - self.head_pose_offset['pitch'], 1) calibrated_data['head_pose'] = head_pose return calibrated_data def start(self): if self.running: return self.running = True self._connected = True self.thread = threading.Thread(target=self._run_loop, daemon=True) self.thread.start() def stop(self): self.running = False try: if self.thread and self.thread.is_alive(): self.thread.join(timeout=2.0) except Exception: pass self._connected = False def read_data(self, apply_calibration: bool = True) -> Dict[str, Any]: with self._lock: raw = { 'head_pose': { 'rotation': self.last_data['yaw'], 'tilt': self.last_data['pitch'], 'pitch': self.last_data['roll'] }, 'temperature': self.last_data.get('temperature', 25.0), 'timestamp': datetime.now().isoformat() } return self.apply_calibration(raw) if apply_calibration else raw def _run_loop(self): # 模拟IMU设备的后台数据生成线程(约60Hz),输出平滑变化的姿态与温度 import math try: while self.running: # 相位累加,用于驱动正弦/余弦波形 self._phase += 0.05 # 航向角(yaw,左右旋转),幅度约±30° yaw = math.sin(self._phase * 0.6) * 30.0 # yaw = 0 # 俯仰角(pitch,上下点头),幅度约±10° pitch = math.sin(self._phase) * 5.0 # pitch = 0 # 横滚角(roll,左右侧倾),幅度约±8° roll = math.cos(self._phase * 0.8) * 5.0 # roll = 0 # 写入最新数据,使用1位或2位小数以模拟设备精度 with self._lock: self.last_data['yaw'] = round(yaw, 1) self.last_data['pitch'] = round(pitch, 1) self.last_data['roll'] = round(roll, 1) # 温度模拟:以25℃为基准,叠加±0.5℃的轻微波动 self.last_data['temperature'] = round(25.0 + math.sin(self._phase * 0.2) * 0.5, 2) # 控制输出频率为约60Hz time.sleep(1.0 / 30.0) except Exception: # 忽略模拟线程异常,避免影响主流程 pass @property def connected(self) -> bool: return self._connected class IMUManager(BaseDevice): """IMU传感器管理器""" def __init__(self, socketio, config_manager: Optional[ConfigManager] = None): """ 初始化IMU管理器 Args: socketio: SocketIO实例 config_manager: 配置管理器实例 """ # 配置管理 self.config_manager = config_manager or ConfigManager() config = self.config_manager.get_device_config('imu') super().__init__("imu", config) # 保存socketio实例 self._socketio = socketio # 设备配置 self.port = config.get('port', 'COM7') self.baudrate = config.get('baudrate', 9600) self.device_type = config.get('device_type', 'ble') # 'real' | 'mock' | 'ble' self.mac_address = config.get('mac_address', '') # IMU设备实例 self.imu_device = None # 推流相关 self.imu_streaming = False self.imu_thread = None # 统计信息 self.data_count = 0 self.error_count = 0 # 校准相关 self.is_calibrated = False self.head_pose_offset = {'rotation': 0, 'tilt': 0, 'pitch': 0} # 数据缓存 self.data_buffer = deque(maxlen=100) self.last_valid_data = None self.logger.info(f"IMU管理器初始化完成 - 端口: {self.port}, 设备类型: {self.device_type}, MAC: {self.mac_address}") def initialize(self) -> bool: """ 初始化IMU设备 Returns: bool: 初始化是否成功 """ try: self.logger.info(f"正在初始化IMU设备...") # 使用构造函数中已加载的配置,避免并发读取配置文件 self.logger.info(f"使用已加载配置: port={self.port}, baudrate={self.baudrate}, device_type={self.device_type}, mac={self.mac_address}") # 根据配置选择设备类型 if self.device_type == 'ble': if not self.mac_address: self.logger.error("IMU BLE设备未配置MAC地址") self.is_connected = False return False self.logger.info(f"使用蓝牙IMU设备 - MAC: {self.mac_address}") self.imu_device = BleIMUDevice(self.mac_address) self.imu_device.start() # 使用set_connected方法来正确启动连接监控线程 self.set_connected(True) else: self.logger.info("使用模拟IMU设备") self.imu_device = MockIMUDevice() self.imu_device.start() # 使用set_connected方法来正确启动连接监控线程 self.set_connected(True) self._device_info.update({ 'port': self.port, 'baudrate': self.baudrate, 'mac_address': self.mac_address, }) self.logger.info("IMU初始化成功") return True except Exception as e: self.logger.error(f"IMU初始化失败: {e}") self.is_connected = False self.imu_device = None return False def _quick_calibrate_imu(self) -> Dict[str, Any]: """ 快速IMU零点校准(以当前姿态为基准) Returns: Dict[str, Any]: 校准结果 """ try: if not self.imu_device: return {'status': 'error', 'error': 'IMU设备未初始化'} self.logger.info('开始IMU快速零点校准...') # 直接读取一次原始数据作为校准偏移量 raw_data = self.imu_device.read_data(apply_calibration=False) if not raw_data or 'head_pose' not in raw_data: return {'status': 'error', 'error': '无法读取IMU原始数据'} # 使用当前姿态作为零点偏移 self.head_pose_offset = { 'rotation': raw_data['head_pose']['rotation'], 'tilt': raw_data['head_pose']['tilt'], 'pitch': raw_data['head_pose']['pitch'] } # 应用校准到设备 calibration_data = {'head_pose_offset': self.head_pose_offset} self.imu_device.set_calibration(calibration_data) self.logger.info(f'IMU快速校准完成: {self.head_pose_offset}') return { 'status': 'success', 'head_pose_offset': self.head_pose_offset } except Exception as e: self.logger.error(f'IMU快速校准失败: {e}') return {'status': 'error', 'error': str(e)} def calibrate(self) -> bool: """ 校准IMU传感器 Returns: bool: 校准是否成功 """ try: if not self.is_connected or not self.imu_device: if not self.initialize(): self.logger.error("IMU设备未连接") return False # 使用快速校准方法 result = self._quick_calibrate_imu() if result['status'] == 'success': self.is_calibrated = True self.logger.info("IMU校准成功") return True else: self.logger.error(f"IMU校准失败: {result.get('error', '未知错误')}") return False except Exception as e: self.logger.error(f"IMU校准失败: {e}") return False def start_streaming(self) -> bool: """ 开始IMU数据流 Args: socketio: SocketIO实例,用于数据推送 Returns: bool: 启动是否成功 """ try: if not self.is_connected or not self.imu_device: if not self.initialize(): return False if self.imu_streaming: self.logger.warning("IMU数据流已在运行") return True # 启动前进行快速校准 if not self.is_calibrated: self.logger.info("启动前进行快速零点校准...") self._quick_calibrate_imu() self.imu_streaming = True self.imu_thread = threading.Thread(target=self._imu_streaming_thread, daemon=True) self.imu_thread.start() self.logger.info("IMU数据流启动成功") return True except Exception as e: self.logger.error(f"IMU数据流启动失败: {e}") self.imu_streaming = False return False def stop_streaming(self) -> bool: """ 停止IMU数据流 Returns: bool: 停止是否成功 """ try: self.imu_streaming = False if self.imu_thread and self.imu_thread.is_alive(): self.imu_thread.join(timeout=3.0) # 停止BLE后台任务(如果是BLE设备) try: if isinstance(self.imu_device, BleIMUDevice): self.imu_device.stop() except Exception: pass self.logger.info("IMU数据流已停止") return True except Exception as e: self.logger.error(f"停止IMU数据流失败: {e}") return False def _imu_streaming_thread(self): """ IMU数据流工作线程 """ self.logger.info("IMU数据流工作线程启动") while self.imu_streaming: try: if self.imu_device: # 读取IMU数据 data = self.imu_device.read_data(apply_calibration=True) if data: # 发送数据到前端 if self._socketio: self._socketio.emit('imu_data', data, namespace='/devices') # 更新统计 self.data_count += 1 else: self.error_count += 1 time.sleep(0.02) # 50Hz采样率 except Exception as e: self.logger.error(f"IMU数据流处理异常: {e}") self.error_count += 1 time.sleep(0.1) self.logger.info("IMU数据流工作线程结束") def get_status(self) -> Dict[str, Any]: """ 获取设备状态 Returns: Dict[str, Any]: 设备状态信息 """ status = super().get_status() status.update({ 'port': self.port, 'baudrate': self.baudrate, 'is_streaming': self.imu_streaming, 'is_calibrated': self.is_calibrated, 'data_count': self.data_count, 'error_count': self.error_count, 'buffer_size': len(self.data_buffer), 'has_data': self.last_valid_data is not None, 'head_pose_offset': self.head_pose_offset, 'device_type': self.device_type, 'mac_address': self.mac_address }) return status def get_latest_data(self) -> Optional[Dict[str, float]]: """ 获取最新的IMU数据 Returns: Optional[Dict[str, float]]: 最新数据,无数据返回None """ return self.last_valid_data.copy() if self.last_valid_data else None def disconnect(self): """ 断开IMU设备连接 """ try: self.stop_streaming() if self.imu_device: self.imu_device = None self.is_connected = False self.logger.info("IMU设备已断开连接") except Exception as e: self.logger.error(f"断开IMU设备连接失败: {e}") def reload_config(self) -> bool: """ 重新加载设备配置 Returns: bool: 重新加载是否成功 """ try: self.logger.info("正在重新加载IMU配置...") # 获取最新配置 config = self.config_manager.get_device_config('imu') # 更新配置属性 self.port = config.get('port', 'COM7') self.baudrate = config.get('baudrate', 9600) self.device_type = config.get('device_type', 'mock') self.mac_address = config.get('mac_address', '') # 更新数据缓存队列大小 buffer_size = config.get('buffer_size', 100) if buffer_size != self.data_buffer.maxlen: # 保存当前数据 current_data = list(self.data_buffer) # 创建新缓冲区 self.data_buffer = deque(maxlen=buffer_size) # 恢复数据(保留最新的数据) for data in current_data[-buffer_size:]: self.data_buffer.append(data) self.logger.info(f"IMU配置重新加载成功 - 端口: {self.port}, 波特率: {self.baudrate}, 设备类型: {self.device_type}, MAC: {self.mac_address}") return True except Exception as e: self.logger.error(f"重新加载IMU配置失败: {e}") return False def check_hardware_connection(self) -> bool: """ 检查IMU硬件连接状态 """ try: if not self.imu_device: return False # 检查设备类型并分别处理 if isinstance(self.imu_device, RealIMUDevice): # 对于真实串口设备,检查串口连接状态 if hasattr(self.imu_device, 'ser') and self.imu_device.ser: # 检查串口是否仍然打开 if not self.imu_device.ser.is_open: return False # 尝试读取数据来验证连接 try: # 保存当前超时设置 original_timeout = self.imu_device.ser.timeout self.imu_device.ser.timeout = 0.1 # 设置短超时 # 尝试读取少量数据 test_data = self.imu_device.ser.read(1) # 恢复原始超时设置 self.imu_device.ser.timeout = original_timeout return True # 如果没有异常,认为连接正常 except Exception: return False else: return False elif isinstance(self.imu_device, BleIMUDevice): # 对于蓝牙设备,检查连接状态 return self.imu_device.connected # 对于模拟设备或其他类型,总是返回True return True except Exception as e: self.logger.debug(f"检查IMU硬件连接时出错: {e}") return False def cleanup(self): """ 清理资源 """ try: # 停止连接监控 self._cleanup_monitoring() self.disconnect() # 清理缓冲区 self.data_buffer.clear() # 重置状态 self.is_calibrated = False self.last_valid_data = None self.head_pose_offset = {'rotation': 0, 'tilt': 0, 'pitch': 0} super().cleanup() self.logger.info("IMU资源清理完成") except Exception as e: self.logger.error(f"清理IMU资源失败: {e}")