BodyBalanceEvaluation/backend/devices/imu_manager.py

865 lines
32 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
IMU传感器管理器
负责IMU传感器的连接校准和头部姿态数据采集
"""
import threading
import time
2025-12-02 08:53:04 +08:00
from typing import Optional, Dict, Any
import logging
from collections import deque
from datetime import datetime
import asyncio
try:
from .base_device import BaseDevice
from .utils.config_manager import ConfigManager
except ImportError:
from base_device import BaseDevice
from utils.config_manager import ConfigManager
# 设置日志
logger = logging.getLogger(__name__)
class BleIMUDevice:
2026-01-12 15:21:44 +08:00
"""蓝牙IMU设备WitMotion WT9011DCL-BT50基于 device_model.py 官方接口"""
def __init__(self, mac_address: str, ble_name: str = ""):
self.mac_address = mac_address
self.ble_name = ble_name
self.loop = None
self.loop_thread = None
self.running = False
self._lock = threading.Lock()
self.calibration_data = None
self.head_pose_offset = {'rotation': 0, 'tilt': 0, 'pitch': 0}
self.last_data = {
'roll': 0.0,
'pitch': 0.0,
'yaw': 0.0,
2026-01-12 15:21:44 +08:00
'temperature': 25.0,
}
self._connected = False
2026-01-12 15:21:44 +08:00
self._device_model = None
self._open_task = None
self._main_task = None
self._last_update_ts = None
2026-01-12 15:21:44 +08:00
try:
from . import device_model as wit_device_model
except Exception:
import device_model as wit_device_model
self._wit_device_model = wit_device_model
logger.info(f"BLE IMU实例创建: mac={self.mac_address}")
def set_calibration(self, calibration: Dict[str, Any]):
self.calibration_data = calibration
if 'head_pose_offset' in calibration:
self.head_pose_offset = calibration['head_pose_offset']
def apply_calibration(self, raw_data: Dict[str, Any]) -> Dict[str, Any]:
if not raw_data or 'head_pose' not in raw_data:
return raw_data
calibrated_data = raw_data.copy()
head_pose = raw_data['head_pose'].copy()
# 与串口IMU保持一致对 rotation 做归一化到 [-180, 180)
angle = head_pose['rotation'] - self.head_pose_offset['rotation']
head_pose['rotation'] = round(((angle + 180) % 360) - 180,1)
head_pose['tilt'] = round(head_pose['tilt'] - self.head_pose_offset['tilt'],1)
head_pose['pitch'] = round(head_pose['pitch'] - self.head_pose_offset['pitch'],1)
calibrated_data['head_pose'] = head_pose
return calibrated_data
def start(self):
if self.running:
return
self.running = True
self.loop_thread = threading.Thread(target=self._run_loop, daemon=True)
self.loop_thread.start()
def stop(self):
self.running = False
try:
if self.loop:
try:
if self._main_task is not None and not self._main_task.done():
self.loop.call_soon_threadsafe(self._main_task.cancel)
except Exception:
pass
try:
fut = asyncio.run_coroutine_threadsafe(self._disconnect(), self.loop)
fut.result(timeout=5.0)
except Exception:
pass
except Exception:
pass
try:
if self.loop_thread and self.loop_thread.is_alive():
self.loop_thread.join(timeout=6.0)
except Exception:
pass
def read_data(self, apply_calibration: bool = True) -> Dict[str, Any]:
with self._lock:
raw = {
'head_pose': {
'rotation': self.last_data['yaw'], # rotation 对应航向角
'tilt': self.last_data['pitch'], # tilt 对应横滚
'pitch': self.last_data['roll'] # pitch 对应俯仰
},
'temperature': self.last_data.get('temperature', 25.0),
'timestamp': datetime.now().isoformat()
}
return self.apply_calibration(raw) if apply_calibration else raw
def _run_loop(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
try:
self._main_task = self.loop.create_task(self._connect_and_listen())
self.loop.run_until_complete(self._main_task)
2026-01-12 15:21:44 +08:00
except asyncio.CancelledError:
pass
except Exception as e:
logger.error(f'BLE IMU事件循环异常: {e}', exc_info=True)
finally:
try:
try:
pending = asyncio.all_tasks(self.loop)
for t in pending:
t.cancel()
if pending:
self.loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
except Exception:
pass
if not self.loop.is_closed():
self.loop.stop()
except Exception:
pass
self._main_task = None
self.loop = None
2026-01-12 15:21:44 +08:00
def _on_device_update(self, dm):
try:
roll = dm.get("AngX")
pitch = dm.get("AngY")
yaw = dm.get("AngZ")
if roll is None or pitch is None or yaw is None:
return
with self._lock:
self.last_data['roll'] = float(roll)
self.last_data['pitch'] = float(pitch)
self.last_data['yaw'] = float(yaw)
self._last_update_ts = time.time()
self._connected = True
2026-01-12 15:21:44 +08:00
except Exception:
pass
async def _disconnect(self):
try:
2026-01-12 15:21:44 +08:00
if self._device_model is not None:
try:
2026-01-12 15:21:44 +08:00
self._device_model.closeDevice()
except Exception:
pass
try:
client = getattr(self._device_model, "client", None)
if client is not None and getattr(client, "is_connected", False):
await client.disconnect()
except Exception:
pass
2026-01-12 15:21:44 +08:00
if self._open_task is not None and not self._open_task.done():
self._open_task.cancel()
try:
await self._open_task
except asyncio.CancelledError:
pass
except Exception:
pass
finally:
self._open_task = None
self._device_model = None
self._last_update_ts = None
async def _connect_and_listen(self):
try:
2026-01-12 15:21:44 +08:00
from bleak import BleakScanner
except Exception as e:
logger.error(f"未安装bleak或导入失败: {e}")
self.running = False
return
async def find_device() -> Optional[Any]:
scan_timeout_s = 30.0
if self.ble_name:
find_by_name = getattr(BleakScanner, "find_device_by_name", None)
if callable(find_by_name):
try:
device = await find_by_name(self.ble_name, timeout=scan_timeout_s)
if device is not None:
if self.mac_address and (getattr(device, "address", "") or "").lower() != self.mac_address.lower():
return None
return device
except Exception:
pass
try:
found = await BleakScanner.discover(timeout=scan_timeout_s)
except Exception:
found = []
if self.ble_name:
for d in found:
if (getattr(d, "name", None) or "") != self.ble_name:
continue
if self.mac_address and (getattr(d, "address", "") or "").lower() != self.mac_address.lower():
continue
return d
if self.mac_address:
target = self.mac_address.lower()
for d in found:
addr = getattr(d, "address", "") or ""
if addr.lower() == target:
return d
candidates = [d for d in found if (getattr(d, "name", "") or "").startswith("WT")]
if len(candidates) == 1:
return candidates[0]
return None
while self.running:
try:
attempt_ts = time.perf_counter()
logger.info(f"BLE IMU开始扫描并连接: name={self.ble_name}, mac={self.mac_address}")
device = await find_device()
2026-01-12 15:21:44 +08:00
if device is None:
logger.info(f"BLE IMU扫描未发现设备 (耗时: {(time.perf_counter()-attempt_ts)*1000:.1f}ms)")
await asyncio.sleep(2.0)
continue
device_addr = getattr(device, "address", None)
device_name = getattr(device, "name", None)
logger.info(f"BLE IMU发现设备 (耗时: {(time.perf_counter()-attempt_ts)*1000:.1f}ms, address={device_addr}, name={device_name})")
self._connected = False
self._last_update_ts = None
2026-01-12 15:21:44 +08:00
self._device_model = self._wit_device_model.DeviceModel("WitMotionBle5.0", device, self._on_device_update)
self._open_task = asyncio.create_task(self._device_model.openDevice())
ready = False
ready_timeout_s = 20.0
deadline = time.time() + ready_timeout_s
while self.running and time.time() < deadline:
if self._open_task is not None and self._open_task.done():
try:
exc = self._open_task.exception()
except Exception:
exc = None
if exc:
logger.warning(f"BLE IMU打开失败: {type(exc).__name__}: {repr(exc)}")
2026-01-12 15:21:44 +08:00
break
if self.has_received_data:
ready = True
2026-01-12 15:21:44 +08:00
break
await asyncio.sleep(0.1)
if not ready:
logger.warning(f"BLE IMU未获取到姿态数据 (耗时: {(time.perf_counter()-attempt_ts)*1000:.1f}ms)")
2026-01-12 15:21:44 +08:00
await self._disconnect()
self._connected = False
await asyncio.sleep(2.0)
continue
logger.info(f"BLE IMU连接并开始产出数据 (耗时: {(time.perf_counter()-attempt_ts)*1000:.1f}ms)")
2026-01-12 15:21:44 +08:00
while self.running and self._open_task is not None and not self._open_task.done():
await asyncio.sleep(1.0)
2026-01-12 15:21:44 +08:00
await self._disconnect()
self._connected = False
2026-01-12 15:21:44 +08:00
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"BLE IMU连接/监听失败: {e}", exc_info=True)
self._connected = False
await asyncio.sleep(2.0)
@property
def connected(self) -> bool:
return self._connected
@property
def has_received_data(self) -> bool:
return self._last_update_ts is not None
@property
def last_update_time(self):
return self._last_update_ts
2025-12-02 08:53:04 +08:00
class MockIMUDevice:
def __init__(self):
self.running = False
self.thread = None
self._lock = threading.Lock()
self._connected = False
self.calibration_data = None
self.head_pose_offset = {'rotation': 0, 'tilt': 0, 'pitch': 0}
self.last_data = {
'roll': 0.0,
'pitch': 0.0,
'yaw': 0.0,
'temperature': 25.0
}
self._phase = 0.0
self._last_update_ts = None
2025-12-02 08:53:04 +08:00
def set_calibration(self, calibration: Dict[str, Any]):
self.calibration_data = calibration
if 'head_pose_offset' in calibration:
self.head_pose_offset = calibration['head_pose_offset']
def apply_calibration(self, raw_data: Dict[str, Any]) -> Dict[str, Any]:
if not raw_data or 'head_pose' not in raw_data:
return raw_data
calibrated_data = raw_data.copy()
head_pose = raw_data['head_pose'].copy()
angle = head_pose['rotation'] - self.head_pose_offset['rotation']
head_pose['rotation'] = round(((angle + 180) % 360) - 180, 1)
head_pose['tilt'] = round(head_pose['tilt'] - self.head_pose_offset['tilt'], 1)
head_pose['pitch'] = round(head_pose['pitch'] - self.head_pose_offset['pitch'], 1)
calibrated_data['head_pose'] = head_pose
return calibrated_data
def start(self):
if self.running:
return
self.running = True
self._connected = True
self.thread = threading.Thread(target=self._run_loop, daemon=True)
self.thread.start()
def stop(self):
self.running = False
try:
if self.thread and self.thread.is_alive():
self.thread.join(timeout=2.0)
except Exception:
pass
self._connected = False
def read_data(self, apply_calibration: bool = True) -> Dict[str, Any]:
with self._lock:
raw = {
'head_pose': {
'rotation': self.last_data['yaw'],
'tilt': self.last_data['pitch'],
'pitch': self.last_data['roll']
},
'temperature': self.last_data.get('temperature', 25.0),
'timestamp': datetime.now().isoformat()
}
return self.apply_calibration(raw) if apply_calibration else raw
def _run_loop(self):
# 模拟IMU设备的后台数据生成线程约60Hz输出平滑变化的姿态与温度
import math
try:
while self.running:
# 相位累加,用于驱动正弦/余弦波形
self._phase += 0.05
# 航向角yaw左右旋转幅度约±30°
yaw = math.sin(self._phase * 0.6) * 30.0
# yaw = 0
# 俯仰角pitch上下点头幅度约±10°
pitch = math.sin(self._phase) * 5.0
# pitch = 0
# 横滚角roll左右侧倾幅度约±8°
roll = math.cos(self._phase * 0.8) * 5.0
# roll = 0
# 写入最新数据使用1位或2位小数以模拟设备精度
with self._lock:
self.last_data['yaw'] = round(yaw, 1)
self.last_data['pitch'] = round(pitch, 1)
self.last_data['roll'] = round(roll, 1)
# 温度模拟以25℃为基准叠加±0.5℃的轻微波动
self.last_data['temperature'] = round(25.0 + math.sin(self._phase * 0.2) * 0.5, 2)
self._last_update_ts = time.time()
2025-12-02 08:53:04 +08:00
# 控制输出频率为约60Hz
time.sleep(1.0 / 30.0)
except Exception:
# 忽略模拟线程异常,避免影响主流程
pass
@property
def connected(self) -> bool:
return self._connected
@property
def has_received_data(self) -> bool:
return self._last_update_ts is not None
@property
def last_update_time(self):
return self._last_update_ts
class IMUManager(BaseDevice):
"""IMU传感器管理器"""
def __init__(self, socketio, config_manager: Optional[ConfigManager] = None):
"""
初始化IMU管理器
Args:
socketio: SocketIO实例
config_manager: 配置管理器实例
"""
# 配置管理
self.config_manager = config_manager or ConfigManager()
config = self.config_manager.get_device_config('imu')
super().__init__("imu", config)
# 保存socketio实例
self._socketio = socketio
# 设备配置
2026-01-12 15:21:44 +08:00
self.use_mock = bool(config.get('use_mock', False))
self.mac_address = config.get('mac_address', '')
self.ble_name = config.get('ble_name', '')
# IMU设备实例
self.imu_device = None
self._init_abort = threading.Event()
# 推流相关
self.imu_thread = None
# 统计信息
self.data_count = 0
self.error_count = 0
# 校准相关
self.is_calibrated = False
self.head_pose_offset = {'rotation': 0, 'tilt': 0, 'pitch': 0}
# 数据缓存
self.data_buffer = deque(maxlen=100)
self.last_valid_data = None
self.logger.info(f"IMU管理器初始化完成 - use_mock: {self.use_mock}, BLE_NAME: {self.ble_name}, MAC: {self.mac_address}")
def initialize(self) -> bool:
"""
初始化IMU设备
Returns:
bool: 初始化是否成功
"""
self._initializing = True
self._init_abort.clear()
try:
start_ts = time.perf_counter()
self.logger.info(f"正在初始化IMU设备...")
# 使用构造函数中已加载的配置,避免并发读取配置文件
self.logger.info(f"使用已加载配置: use_mock={self.use_mock}, name={self.ble_name}, mac={self.mac_address}")
# 根据配置选择设备类型
2026-01-12 15:21:44 +08:00
if not self.use_mock:
if not self.mac_address and not self.ble_name:
self.logger.error("IMU BLE设备未配置蓝牙名称或MAC地址")
self.is_connected = False
return False
self.logger.info(f"使用蓝牙IMU设备 - NAME: {self.ble_name}, MAC: {self.mac_address}")
try:
if self.imu_device and hasattr(self.imu_device, 'stop'):
self.imu_device.stop()
except Exception:
pass
self.imu_device = BleIMUDevice(self.mac_address, self.ble_name)
self.imu_device.start()
connect_timeout_s = float(self.config.get('connect_timeout', 40.0))
deadline = time.time() + max(0.1, connect_timeout_s)
while time.time() < deadline:
if self._init_abort.is_set():
self.logger.warning("IMU初始化被中断")
return False
if bool(getattr(self.imu_device, 'connected', False)):
break
time.sleep(0.1)
connected = bool(getattr(self.imu_device, 'connected', False))
self._start_connection_monitor()
self.set_connected(connected)
if not connected:
self.logger.error(f"IMU蓝牙连接超时等待 {connect_timeout_s:.1f}s")
try:
self.imu_device.stop()
except Exception:
pass
self.imu_device = None
self.set_connected(False)
return False
else:
self.logger.info("使用模拟IMU设备")
self.imu_device = MockIMUDevice()
2025-12-02 08:53:04 +08:00
self.imu_device.start()
self._start_connection_monitor()
self.set_connected(True)
self._device_info.update({
'mac_address': self.mac_address,
})
self.logger.info(f"IMU初始化完成耗时: {(time.perf_counter() - start_ts)*1000:.1f}ms当前连接状态: {self.is_connected}")
return True
except Exception as e:
self.logger.error(f"IMU初始化失败: {e}")
self.is_connected = False
self.imu_device = None
return False
finally:
self._initializing = False
def _quick_calibrate_imu(self) -> Dict[str, Any]:
"""
快速IMU零点校准以当前姿态为基准
Returns:
Dict[str, Any]: 校准结果
"""
try:
if not self.imu_device:
return {'status': 'error', 'error': 'IMU设备未初始化'}
self.logger.info('开始IMU快速零点校准...')
# 直接读取一次原始数据作为校准偏移量
raw_data = self.imu_device.read_data(apply_calibration=False)
if not raw_data or 'head_pose' not in raw_data:
return {'status': 'error', 'error': '无法读取IMU原始数据'}
# 使用当前姿态作为零点偏移
self.head_pose_offset = {
'rotation': raw_data['head_pose']['rotation'],
'tilt': raw_data['head_pose']['tilt'],
'pitch': raw_data['head_pose']['pitch']
}
# 应用校准到设备
calibration_data = {'head_pose_offset': self.head_pose_offset}
self.imu_device.set_calibration(calibration_data)
self.logger.info(f'IMU快速校准完成: {self.head_pose_offset}')
return {
'status': 'success',
'head_pose_offset': self.head_pose_offset
}
except Exception as e:
self.logger.error(f'IMU快速校准失败: {e}')
return {'status': 'error', 'error': str(e)}
def calibrate(self) -> bool:
"""
校准IMU传感器
Returns:
bool: 校准是否成功
"""
try:
if not self.is_connected or not self.imu_device:
if not self.initialize():
self.logger.error("IMU设备未连接")
return False
# 使用快速校准方法
result = self._quick_calibrate_imu()
if result['status'] == 'success':
self.is_calibrated = True
self.logger.info("IMU校准成功")
return True
else:
self.logger.error(f"IMU校准失败: {result.get('error', '未知错误')}")
return False
except Exception as e:
self.logger.error(f"IMU校准失败: {e}")
return False
def start_streaming(self) -> bool:
"""
开始IMU数据流
Args:
socketio: SocketIO实例用于数据推送
Returns:
bool: 启动是否成功
"""
try:
if self.is_streaming:
self.logger.warning("IMU数据流已在运行")
return True
if not self.is_connected or not self.imu_device:
if not self.initialize():
return False
if not self.is_connected:
self.logger.error("IMU设备未连接")
return False
first_data_timeout_s = float(self.config.get('first_data_timeout', 30.0))
deadline = time.time() + max(0.1, first_data_timeout_s)
while time.time() < deadline:
if not self.imu_device:
break
if hasattr(self.imu_device, 'connected') and not bool(getattr(self.imu_device, 'connected')):
break
if bool(getattr(self.imu_device, 'has_received_data', False)):
break
time.sleep(0.05)
if not self.imu_device or not bool(getattr(self.imu_device, 'has_received_data', False)):
self.logger.error(f"IMU未获取到有效数据等待 {first_data_timeout_s:.1f}s")
return False
# 启动前进行快速校准
if not self.is_calibrated:
self.logger.info("启动前进行快速零点校准...")
self._quick_calibrate_imu()
self.is_streaming = True
self.update_heartbeat()
self.imu_thread = threading.Thread(target=self._imu_streaming_thread, daemon=True)
self.imu_thread.start()
self.logger.info("IMU数据流启动成功")
return True
except Exception as e:
self.logger.error(f"IMU数据流启动失败: {e}")
self.is_streaming = False
return False
def stop_streaming(self) -> bool:
"""
停止IMU数据流
Returns:
bool: 停止是否成功
"""
try:
self.is_streaming = False
if self.imu_thread and self.imu_thread.is_alive():
self.imu_thread.join(timeout=3.0)
# 停止BLE后台任务如果是BLE设备
try:
if isinstance(self.imu_device, BleIMUDevice):
self.imu_device.stop()
except Exception:
pass
self.logger.info("IMU数据流已停止")
return True
except Exception as e:
self.logger.error(f"停止IMU数据流失败: {e}")
return False
def _imu_streaming_thread(self):
"""
IMU数据流工作线程
"""
self.logger.info("IMU数据流工作线程启动")
while self.is_streaming:
try:
if self.imu_device:
# 读取IMU数据
data = self.imu_device.read_data(apply_calibration=True)
if data and isinstance(data, dict) and data.get('head_pose') is not None:
# 发送数据到前端
if self._socketio:
2025-08-17 16:42:05 +08:00
self._socketio.emit('imu_data', data, namespace='/devices')
# 更新统计
self.data_count += 1
self.update_heartbeat()
self.last_valid_data = data
try:
self.data_buffer.append(data)
except Exception:
pass
else:
self.error_count += 1
time.sleep(0.02) # 50Hz采样率
except Exception as e:
self.logger.error(f"IMU数据流处理异常: {e}")
self.error_count += 1
time.sleep(0.1)
self.logger.info("IMU数据流工作线程结束")
def get_status(self) -> Dict[str, Any]:
"""
获取设备状态
Returns:
Dict[str, Any]: 设备状态信息
"""
return {
'device_type': 'mock' if self.use_mock else 'ble',
'mac_address': self.mac_address,
'is_connected': self.is_connected,
'is_streaming': self.is_streaming,
'is_calibrated': self.is_calibrated,
'data_count': self.data_count,
'error_count': self.error_count,
'buffer_size': len(self.data_buffer),
'has_data': self.last_valid_data is not None,
'head_pose_offset': self.head_pose_offset,
'device_info': self.get_device_info()
}
def get_latest_data(self) -> Optional[Dict[str, float]]:
"""
获取最新的IMU数据
Returns:
Optional[Dict[str, float]]: 最新数据无数据返回None
"""
return self.last_valid_data.copy() if self.last_valid_data else None
def disconnect(self):
"""
断开IMU设备连接
"""
try:
self._init_abort.set()
self.stop_streaming()
if self.imu_device:
try:
if hasattr(self.imu_device, 'stop'):
self.imu_device.stop()
except Exception:
pass
self.imu_device = None
self.is_connected = False
self.logger.info("IMU设备已断开连接")
except Exception as e:
self.logger.error(f"断开IMU设备连接失败: {e}")
def reload_config(self) -> bool:
"""
重新加载设备配置
Returns:
bool: 重新加载是否成功
"""
try:
self.logger.info("正在重新加载IMU配置...")
# 获取最新配置
config = self.config_manager.get_device_config('imu')
# 更新配置属性
2026-01-12 15:21:44 +08:00
self.use_mock = bool(config.get('use_mock', False))
self.mac_address = config.get('mac_address', '')
# 更新数据缓存队列大小
buffer_size = config.get('buffer_size', 100)
if buffer_size != self.data_buffer.maxlen:
# 保存当前数据
current_data = list(self.data_buffer)
# 创建新缓冲区
self.data_buffer = deque(maxlen=buffer_size)
# 恢复数据(保留最新的数据)
for data in current_data[-buffer_size:]:
self.data_buffer.append(data)
2026-01-12 15:21:44 +08:00
self.logger.info(f"IMU配置重新加载成功 - use_mock: {self.use_mock}, MAC: {self.mac_address}")
return True
except Exception as e:
self.logger.error(f"重新加载IMU配置失败: {e}")
return False
2025-09-10 09:13:21 +08:00
def check_hardware_connection(self) -> bool:
"""
检查IMU硬件连接状态
"""
try:
if not self.imu_device:
return False
2026-01-12 15:21:44 +08:00
if hasattr(self.imu_device, 'connected'):
connected = bool(getattr(self.imu_device, 'connected'))
if connected:
last_ts = getattr(self.imu_device, 'last_update_time', None)
if last_ts:
try:
if time.time() - float(last_ts) <= float(self._connection_timeout):
self.update_heartbeat()
except Exception:
self.update_heartbeat()
else:
self.update_heartbeat()
return connected
2026-01-12 15:21:44 +08:00
if hasattr(self.imu_device, 'ser') and getattr(self.imu_device, 'ser', None):
if not self.imu_device.ser.is_open:
2025-09-10 09:13:21 +08:00
return False
2026-01-12 15:21:44 +08:00
try:
original_timeout = self.imu_device.ser.timeout
self.imu_device.ser.timeout = 0.1
self.imu_device.ser.read(1)
self.imu_device.ser.timeout = original_timeout
self.update_heartbeat()
2026-01-12 15:21:44 +08:00
return True
except Exception:
return False
2025-09-10 09:13:21 +08:00
return True
except Exception as e:
self.logger.debug(f"检查IMU硬件连接时出错: {e}")
return False
def cleanup(self):
"""
清理资源
"""
try:
self._init_abort.set()
2025-09-10 09:13:21 +08:00
# 停止连接监控
self._cleanup_monitoring()
self.disconnect()
# 清理缓冲区
self.data_buffer.clear()
# 重置状态
self.is_calibrated = False
self.last_valid_data = None
self.head_pose_offset = {'rotation': 0, 'tilt': 0, 'pitch': 0}
super().cleanup()
self.logger.info("IMU资源清理完成")
except Exception as e:
2025-12-02 08:53:04 +08:00
self.logger.error(f"清理IMU资源失败: {e}")