#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ IMU传感器管理器 负责IMU传感器的连接、校准和头部姿态数据采集 """ import threading import time from typing import Optional, Dict, Any import logging from collections import deque from datetime import datetime import asyncio try: from .base_device import BaseDevice from .utils.config_manager import ConfigManager except ImportError: from base_device import BaseDevice from utils.config_manager import ConfigManager # 设置日志 logger = logging.getLogger(__name__) class BleIMUDevice: """蓝牙IMU设备(WitMotion WT9011DCL-BT50),基于 device_model.py 官方接口""" def __init__(self, mac_address: str): self.mac_address = mac_address self.loop = None self.loop_thread = None self.running = False self._lock = threading.Lock() self.calibration_data = None self.head_pose_offset = {'rotation': 0, 'tilt': 0, 'pitch': 0} self.last_data = { 'roll': 0.0, 'pitch': 0.0, 'yaw': 0.0, 'temperature': 25.0, } self._connected = False self._device_model = None self._open_task = None try: from . import device_model as wit_device_model except Exception: import device_model as wit_device_model self._wit_device_model = wit_device_model def set_calibration(self, calibration: Dict[str, Any]): self.calibration_data = calibration if 'head_pose_offset' in calibration: self.head_pose_offset = calibration['head_pose_offset'] def apply_calibration(self, raw_data: Dict[str, Any]) -> Dict[str, Any]: if not raw_data or 'head_pose' not in raw_data: return raw_data calibrated_data = raw_data.copy() head_pose = raw_data['head_pose'].copy() # 与串口IMU保持一致:对 rotation 做归一化到 [-180, 180) angle = head_pose['rotation'] - self.head_pose_offset['rotation'] head_pose['rotation'] = round(((angle + 180) % 360) - 180,1) head_pose['tilt'] = round(head_pose['tilt'] - self.head_pose_offset['tilt'],1) head_pose['pitch'] = round(head_pose['pitch'] - self.head_pose_offset['pitch'],1) calibrated_data['head_pose'] = head_pose return calibrated_data def start(self): if self.running: return self.running = True self.loop_thread = threading.Thread(target=self._run_loop, daemon=True) self.loop_thread.start() def stop(self): self.running = False try: if self.loop: asyncio.run_coroutine_threadsafe(self._disconnect(), self.loop) except Exception: pass def read_data(self, apply_calibration: bool = True) -> Dict[str, Any]: with self._lock: raw = { 'head_pose': { 'rotation': self.last_data['yaw'], # rotation 对应航向角 'tilt': self.last_data['pitch'], # tilt 对应横滚 'pitch': self.last_data['roll'] # pitch 对应俯仰 }, 'temperature': self.last_data.get('temperature', 25.0), 'timestamp': datetime.now().isoformat() } return self.apply_calibration(raw) if apply_calibration else raw def _run_loop(self): self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) try: self.loop.run_until_complete(self._connect_and_listen()) except asyncio.CancelledError: pass except Exception as e: logger.error(f'BLE IMU事件循环异常: {e}', exc_info=True) finally: try: if not self.loop.is_closed(): self.loop.stop() self.loop.close() except Exception: pass def _on_device_update(self, dm): try: roll = dm.get("AngX") pitch = dm.get("AngY") yaw = dm.get("AngZ") if roll is None or pitch is None or yaw is None: return with self._lock: self.last_data['roll'] = float(roll) self.last_data['pitch'] = float(pitch) self.last_data['yaw'] = float(yaw) except Exception: pass async def _disconnect(self): try: if self._device_model is not None: try: self._device_model.closeDevice() except Exception: pass if self._open_task is not None and not self._open_task.done(): self._open_task.cancel() try: await self._open_task except asyncio.CancelledError: pass except Exception: pass finally: self._open_task = None self._device_model = None async def _connect_and_listen(self): try: from bleak import BleakScanner except Exception as e: logger.error(f"未安装bleak或导入失败: {e}") self.running = False return while self.running: try: try: device = await BleakScanner.find_device_by_address(self.mac_address, timeout=20.0) except TypeError: device = await BleakScanner.find_device_by_address(self.mac_address, cb=dict(use_bdaddr=False)) if device is None: await asyncio.sleep(2.0) continue self._device_model = self._wit_device_model.DeviceModel("WitMotionBle5.0", device, self._on_device_update) self._open_task = asyncio.create_task(self._device_model.openDevice()) connected = False for _ in range(50): if not self.running: break if self._open_task.done(): break client = getattr(self._device_model, "client", None) if client is not None and getattr(client, "is_connected", False): connected = True break await asyncio.sleep(0.2) self._connected = connected if not connected: await self._disconnect() self._connected = False await asyncio.sleep(2.0) continue while self.running and self._open_task is not None and not self._open_task.done(): await asyncio.sleep(1.0) await self._disconnect() self._connected = False except asyncio.CancelledError: break except Exception as e: logger.error(f"BLE IMU连接/监听失败: {e}", exc_info=True) self._connected = False await asyncio.sleep(2.0) @property def connected(self) -> bool: return self._connected class MockIMUDevice: def __init__(self): self.running = False self.thread = None self._lock = threading.Lock() self._connected = False self.calibration_data = None self.head_pose_offset = {'rotation': 0, 'tilt': 0, 'pitch': 0} self.last_data = { 'roll': 0.0, 'pitch': 0.0, 'yaw': 0.0, 'temperature': 25.0 } self._phase = 0.0 def set_calibration(self, calibration: Dict[str, Any]): self.calibration_data = calibration if 'head_pose_offset' in calibration: self.head_pose_offset = calibration['head_pose_offset'] def apply_calibration(self, raw_data: Dict[str, Any]) -> Dict[str, Any]: if not raw_data or 'head_pose' not in raw_data: return raw_data calibrated_data = raw_data.copy() head_pose = raw_data['head_pose'].copy() angle = head_pose['rotation'] - self.head_pose_offset['rotation'] head_pose['rotation'] = round(((angle + 180) % 360) - 180, 1) head_pose['tilt'] = round(head_pose['tilt'] - self.head_pose_offset['tilt'], 1) head_pose['pitch'] = round(head_pose['pitch'] - self.head_pose_offset['pitch'], 1) calibrated_data['head_pose'] = head_pose return calibrated_data def start(self): if self.running: return self.running = True self._connected = True self.thread = threading.Thread(target=self._run_loop, daemon=True) self.thread.start() def stop(self): self.running = False try: if self.thread and self.thread.is_alive(): self.thread.join(timeout=2.0) except Exception: pass self._connected = False def read_data(self, apply_calibration: bool = True) -> Dict[str, Any]: with self._lock: raw = { 'head_pose': { 'rotation': self.last_data['yaw'], 'tilt': self.last_data['pitch'], 'pitch': self.last_data['roll'] }, 'temperature': self.last_data.get('temperature', 25.0), 'timestamp': datetime.now().isoformat() } return self.apply_calibration(raw) if apply_calibration else raw def _run_loop(self): # 模拟IMU设备的后台数据生成线程(约60Hz),输出平滑变化的姿态与温度 import math try: while self.running: # 相位累加,用于驱动正弦/余弦波形 self._phase += 0.05 # 航向角(yaw,左右旋转),幅度约±30° yaw = math.sin(self._phase * 0.6) * 30.0 # yaw = 0 # 俯仰角(pitch,上下点头),幅度约±10° pitch = math.sin(self._phase) * 5.0 # pitch = 0 # 横滚角(roll,左右侧倾),幅度约±8° roll = math.cos(self._phase * 0.8) * 5.0 # roll = 0 # 写入最新数据,使用1位或2位小数以模拟设备精度 with self._lock: self.last_data['yaw'] = round(yaw, 1) self.last_data['pitch'] = round(pitch, 1) self.last_data['roll'] = round(roll, 1) # 温度模拟:以25℃为基准,叠加±0.5℃的轻微波动 self.last_data['temperature'] = round(25.0 + math.sin(self._phase * 0.2) * 0.5, 2) # 控制输出频率为约60Hz time.sleep(1.0 / 30.0) except Exception: # 忽略模拟线程异常,避免影响主流程 pass @property def connected(self) -> bool: return self._connected class IMUManager(BaseDevice): """IMU传感器管理器""" def __init__(self, socketio, config_manager: Optional[ConfigManager] = None): """ 初始化IMU管理器 Args: socketio: SocketIO实例 config_manager: 配置管理器实例 """ # 配置管理 self.config_manager = config_manager or ConfigManager() config = self.config_manager.get_device_config('imu') super().__init__("imu", config) # 保存socketio实例 self._socketio = socketio # 设备配置 self.use_mock = bool(config.get('use_mock', False)) self.mac_address = config.get('mac_address', '') # IMU设备实例 self.imu_device = None # 推流相关 self.imu_streaming = False self.imu_thread = None # 统计信息 self.data_count = 0 self.error_count = 0 # 校准相关 self.is_calibrated = False self.head_pose_offset = {'rotation': 0, 'tilt': 0, 'pitch': 0} # 数据缓存 self.data_buffer = deque(maxlen=100) self.last_valid_data = None self.logger.info(f"IMU管理器初始化完成 - use_mock: {self.use_mock}, MAC: {self.mac_address}") def initialize(self) -> bool: """ 初始化IMU设备 Returns: bool: 初始化是否成功 """ try: self.logger.info(f"正在初始化IMU设备...") # 使用构造函数中已加载的配置,避免并发读取配置文件 self.logger.info(f"使用已加载配置: use_mock={self.use_mock}, mac={self.mac_address}") # 根据配置选择设备类型 if not self.use_mock: if not self.mac_address: self.logger.error("IMU BLE设备未配置MAC地址") self.is_connected = False return False self.logger.info(f"使用蓝牙IMU设备 - MAC: {self.mac_address}") self.imu_device = BleIMUDevice(self.mac_address) self.imu_device.start() # 使用set_connected方法来正确启动连接监控线程 self.set_connected(True) else: self.logger.info("使用模拟IMU设备") self.imu_device = MockIMUDevice() self.imu_device.start() # 使用set_connected方法来正确启动连接监控线程 self.set_connected(True) self._device_info.update({ 'mac_address': self.mac_address, }) self.logger.info("IMU初始化成功") return True except Exception as e: self.logger.error(f"IMU初始化失败: {e}") self.is_connected = False self.imu_device = None return False def _quick_calibrate_imu(self) -> Dict[str, Any]: """ 快速IMU零点校准(以当前姿态为基准) Returns: Dict[str, Any]: 校准结果 """ try: if not self.imu_device: return {'status': 'error', 'error': 'IMU设备未初始化'} self.logger.info('开始IMU快速零点校准...') # 直接读取一次原始数据作为校准偏移量 raw_data = self.imu_device.read_data(apply_calibration=False) if not raw_data or 'head_pose' not in raw_data: return {'status': 'error', 'error': '无法读取IMU原始数据'} # 使用当前姿态作为零点偏移 self.head_pose_offset = { 'rotation': raw_data['head_pose']['rotation'], 'tilt': raw_data['head_pose']['tilt'], 'pitch': raw_data['head_pose']['pitch'] } # 应用校准到设备 calibration_data = {'head_pose_offset': self.head_pose_offset} self.imu_device.set_calibration(calibration_data) self.logger.info(f'IMU快速校准完成: {self.head_pose_offset}') return { 'status': 'success', 'head_pose_offset': self.head_pose_offset } except Exception as e: self.logger.error(f'IMU快速校准失败: {e}') return {'status': 'error', 'error': str(e)} def calibrate(self) -> bool: """ 校准IMU传感器 Returns: bool: 校准是否成功 """ try: if not self.is_connected or not self.imu_device: if not self.initialize(): self.logger.error("IMU设备未连接") return False # 使用快速校准方法 result = self._quick_calibrate_imu() if result['status'] == 'success': self.is_calibrated = True self.logger.info("IMU校准成功") return True else: self.logger.error(f"IMU校准失败: {result.get('error', '未知错误')}") return False except Exception as e: self.logger.error(f"IMU校准失败: {e}") return False def start_streaming(self) -> bool: """ 开始IMU数据流 Args: socketio: SocketIO实例,用于数据推送 Returns: bool: 启动是否成功 """ try: if not self.is_connected or not self.imu_device: if not self.initialize(): return False if self.imu_streaming: self.logger.warning("IMU数据流已在运行") return True # 启动前进行快速校准 if not self.is_calibrated: self.logger.info("启动前进行快速零点校准...") self._quick_calibrate_imu() self.imu_streaming = True self.imu_thread = threading.Thread(target=self._imu_streaming_thread, daemon=True) self.imu_thread.start() self.logger.info("IMU数据流启动成功") return True except Exception as e: self.logger.error(f"IMU数据流启动失败: {e}") self.imu_streaming = False return False def stop_streaming(self) -> bool: """ 停止IMU数据流 Returns: bool: 停止是否成功 """ try: self.imu_streaming = False if self.imu_thread and self.imu_thread.is_alive(): self.imu_thread.join(timeout=3.0) # 停止BLE后台任务(如果是BLE设备) try: if isinstance(self.imu_device, BleIMUDevice): self.imu_device.stop() except Exception: pass self.logger.info("IMU数据流已停止") return True except Exception as e: self.logger.error(f"停止IMU数据流失败: {e}") return False def _imu_streaming_thread(self): """ IMU数据流工作线程 """ self.logger.info("IMU数据流工作线程启动") while self.imu_streaming: try: if self.imu_device: # 读取IMU数据 data = self.imu_device.read_data(apply_calibration=True) if data: # 发送数据到前端 if self._socketio: self._socketio.emit('imu_data', data, namespace='/devices') # 更新统计 self.data_count += 1 else: self.error_count += 1 time.sleep(0.02) # 50Hz采样率 except Exception as e: self.logger.error(f"IMU数据流处理异常: {e}") self.error_count += 1 time.sleep(0.1) self.logger.info("IMU数据流工作线程结束") def get_status(self) -> Dict[str, Any]: """ 获取设备状态 Returns: Dict[str, Any]: 设备状态信息 """ status = super().get_status() status.update({ 'is_streaming': self.imu_streaming, 'is_calibrated': self.is_calibrated, 'data_count': self.data_count, 'error_count': self.error_count, 'buffer_size': len(self.data_buffer), 'has_data': self.last_valid_data is not None, 'head_pose_offset': self.head_pose_offset, 'device_type': 'mock' if self.use_mock else 'ble', 'mac_address': self.mac_address }) return status def get_latest_data(self) -> Optional[Dict[str, float]]: """ 获取最新的IMU数据 Returns: Optional[Dict[str, float]]: 最新数据,无数据返回None """ return self.last_valid_data.copy() if self.last_valid_data else None def disconnect(self): """ 断开IMU设备连接 """ try: self.stop_streaming() if self.imu_device: self.imu_device = None self.is_connected = False self.logger.info("IMU设备已断开连接") except Exception as e: self.logger.error(f"断开IMU设备连接失败: {e}") def reload_config(self) -> bool: """ 重新加载设备配置 Returns: bool: 重新加载是否成功 """ try: self.logger.info("正在重新加载IMU配置...") # 获取最新配置 config = self.config_manager.get_device_config('imu') # 更新配置属性 self.use_mock = bool(config.get('use_mock', False)) self.mac_address = config.get('mac_address', '') # 更新数据缓存队列大小 buffer_size = config.get('buffer_size', 100) if buffer_size != self.data_buffer.maxlen: # 保存当前数据 current_data = list(self.data_buffer) # 创建新缓冲区 self.data_buffer = deque(maxlen=buffer_size) # 恢复数据(保留最新的数据) for data in current_data[-buffer_size:]: self.data_buffer.append(data) self.logger.info(f"IMU配置重新加载成功 - use_mock: {self.use_mock}, MAC: {self.mac_address}") return True except Exception as e: self.logger.error(f"重新加载IMU配置失败: {e}") return False def check_hardware_connection(self) -> bool: """ 检查IMU硬件连接状态 """ try: if not self.imu_device: return False if hasattr(self.imu_device, 'connected'): return bool(getattr(self.imu_device, 'connected')) if hasattr(self.imu_device, 'ser') and getattr(self.imu_device, 'ser', None): if not self.imu_device.ser.is_open: return False try: original_timeout = self.imu_device.ser.timeout self.imu_device.ser.timeout = 0.1 self.imu_device.ser.read(1) self.imu_device.ser.timeout = original_timeout return True except Exception: return False return True except Exception as e: self.logger.debug(f"检查IMU硬件连接时出错: {e}") return False def cleanup(self): """ 清理资源 """ try: # 停止连接监控 self._cleanup_monitoring() self.disconnect() # 清理缓冲区 self.data_buffer.clear() # 重置状态 self.is_calibrated = False self.last_valid_data = None self.head_pose_offset = {'rotation': 0, 'tilt': 0, 'pitch': 0} super().cleanup() self.logger.info("IMU资源清理完成") except Exception as e: self.logger.error(f"清理IMU资源失败: {e}")