#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
IMU sensor manager.

Handles connecting to the IMU sensor, calibrating it, and collecting
head-pose data.
"""

import json
import logging
import struct
import threading
import time
from collections import deque
from datetime import datetime
from typing import Optional, Dict, Any, List, Tuple

import numpy as np
import serial

try:
    from .base_device import BaseDevice
    from .utils.socket_manager import SocketManager
    from .utils.config_manager import ConfigManager
except ImportError:
    # Relative imports failed; fall back to the same modules as top-level imports.
    from base_device import BaseDevice
    from utils.socket_manager import SocketManager
    from utils.config_manager import ConfigManager

# Module-level logger
logger = logging.getLogger(__name__)


class RealIMUDevice:
    """Real IMU device that reads attitude data from a serial port.

    The byte stream is parsed as fixed-size 11-byte packets: a 0x55 header
    byte, a packet-type byte, four little-endian signed 16-bit values and a
    trailing checksum byte (low 8 bits of the sum of the first ten bytes).
    Only packets of type 0x53 (attitude angles) are decoded; every other
    packet type is consumed and ignored.
    """

    PACKET_SIZE = 11           # total bytes per packet, header and checksum included
    PACKET_HEADER = 0x55       # first byte of every packet
    ANGLE_PACKET_TYPE = 0x53   # packet type carrying the attitude angles

    def __init__(self, port, baudrate):
        """Store the connection settings and try to open the port.

        Args:
            port: Serial port name passed to ``serial.Serial``.
            baudrate: Baud rate passed to ``serial.Serial``.

        A failed connection does not raise; ``self.ser`` stays ``None``.
        """
        self.port = port
        self.baudrate = baudrate
        self.ser = None
        self.buffer = bytearray()
        self.calibration_data = None
        self.head_pose_offset = {'rotation': 0, 'tilt': 0, 'pitch': 0}
        self.last_data = {
            'roll': 0.0,
            'pitch': 0.0,
            'yaw': 0.0,
            'temperature': 25.0
        }
        # Consecutive failed connection attempts; used to keep retry logging quiet.
        self._connect_failures = 0
        logger.debug(f'RealIMUDevice 初始化: port={self.port}, baudrate={self.baudrate}')
        self._connect()

    def _connect(self):
        """Open the serial port; on failure leave ``self.ser`` as ``None``."""
        try:
            logger.debug(f'尝试打开串口: {self.port} @ {self.baudrate}')
            self.ser = serial.Serial(self.port, self.baudrate, timeout=1)
            if hasattr(self.ser, 'reset_input_buffer'):
                try:
                    self.ser.reset_input_buffer()
                    logger.debug('已清空串口输入缓冲区')
                except Exception as e:
                    logger.debug(f'重置串口输入缓冲区失败: {e}')
            self._connect_failures = 0
            logger.info(f'IMU设备连接成功: {self.port} @ {self.baudrate}bps')
        except Exception as e:
            # read_data() retries the connection on every call, so only the
            # first failure is reported at warning level; repeats go to debug.
            failures = getattr(self, '_connect_failures', 0)
            report = logger.debug if failures else logger.warning
            report(f'IMU设备连接失败: {e}')
            self._connect_failures = failures + 1
            self.ser = None

    def set_calibration(self, calibration: Dict[str, Any]):
        """Store calibration data and adopt its ``head_pose_offset`` if present.

        Args:
            calibration: Calibration dict. When it has a ``head_pose_offset``
                entry, that entry becomes the zero-point offset; axes missing
                from it default to 0.
        """
        self.calibration_data = calibration
        if 'head_pose_offset' in calibration:
            # Copy and fill in missing axes so apply_calibration() never hits
            # a KeyError and later caller-side mutation cannot leak in.
            offset = {'rotation': 0, 'tilt': 0, 'pitch': 0}
            offset.update(calibration['head_pose_offset'])
            self.head_pose_offset = offset
            logger.debug(f'应用IMU校准数据: {self.head_pose_offset}')

    def apply_calibration(self, raw_data: Dict[str, Any]) -> Dict[str, Any]:
        """Subtract the zero-point offset from a reading's head pose.

        Args:
            raw_data: Reading with a ``head_pose`` dict holding ``rotation``,
                ``tilt`` and ``pitch``.

        Returns:
            A shallow copy of ``raw_data`` with a new, offset-corrected
            ``head_pose``. ``raw_data`` itself is returned unchanged when it
            is empty or has no ``head_pose`` key.
        """
        if not raw_data or 'head_pose' not in raw_data:
            return raw_data

        calibrated_data = raw_data.copy()
        head_pose = raw_data['head_pose'].copy()
        for axis in ('rotation', 'tilt', 'pitch'):
            head_pose[axis] = head_pose[axis] - self.head_pose_offset.get(axis, 0)
        calibrated_data['head_pose'] = head_pose
        return calibrated_data

    @staticmethod
    def _checksum(data: bytes) -> int:
        """Return the low 8 bits of the sum of every byte except the last."""
        return sum(data[:-1]) & 0xFF

    def _parse_packet(self, data: bytes) -> Optional[Dict[str, float]]:
        """Decode one packet and update ``self.last_data`` for angle packets.

        Args:
            data: Exactly one packet (11 bytes).

        Returns:
            ``self.last_data`` (keys ``roll``, ``pitch``, ``yaw``,
            ``temperature``) for a valid angle packet; ``None`` when the
            length, header or checksum is wrong, or the type is not 0x53.
        """
        if len(data) != 11:
            logger.debug(f'无效数据包长度: {len(data)}')
            return None
        if data[0] != 0x55:
            logger.debug(f'错误的包头: 0x{data[0]:02X}')
            return None
        if self._checksum(data) != data[-1]:
            logger.debug(f'校验和错误: 期望{self._checksum(data):02X}, 实际{data[-1]:02X}')
            return None
        if data[1] != self.ANGLE_PACKET_TYPE:
            return None

        # Four little-endian signed 16-bit values at byte offsets 2, 4, 6, 8.
        vals = [int.from_bytes(data[i:i + 2], 'little', signed=True) for i in range(2, 10, 2)]
        pitchl, rxl, yawl, temp = vals
        # Raw values are scaled by 180/32768 to degrees and the sign is inverted.
        k_angle = 180.0
        self.last_data = {
            'roll': -round(rxl / 32768.0 * k_angle, 2),
            'pitch': -round(pitchl / 32768.0 * k_angle, 2),
            'yaw': -round(yawl / 32768.0 * k_angle, 2),
            'temperature': temp / 100.0
        }
        return self.last_data

    def _drain_buffer(self) -> None:
        """Parse every complete packet currently held in ``self.buffer``.

        Bytes before the next header are discarded. A candidate packet whose
        checksum fails costs only its header byte, so a real packet that
        starts inside those 11 bytes is still found. An incomplete trailing
        packet stays in the buffer for the next call.
        """
        buf = self.buffer
        size = self.PACKET_SIZE
        while len(buf) >= size:
            start = buf.find(self.PACKET_HEADER)
            if start != 0:
                # start == -1 means there is no header byte anywhere in the buffer.
                skipped = len(buf) if start < 0 else start
                logger.debug(f'丢弃无效字节: {bytes(buf[:skipped]).hex()}')
                del buf[:skipped]
                continue
            packet = bytes(buf[:size])
            expected = self._checksum(packet)
            if expected != packet[-1]:
                logger.debug(f'校验和错误: 期望{expected:02X}, 实际{packet[-1]:02X}')
                del buf[0]
                continue
            self._parse_packet(packet)
            del buf[:size]

    def _build_reading(self, apply_calibration: bool) -> Dict[str, Any]:
        """Build the public reading dict from ``self.last_data``."""
        reading = {
            'head_pose': {
                'rotation': self.last_data['yaw'],   # rotation <- yaw
                'tilt': self.last_data['roll'],      # tilt <- roll
                'pitch': self.last_data['pitch']     # pitch <- pitch
            },
            'temperature': self.last_data['temperature'],
            'timestamp': datetime.now().isoformat()
        }
        return self.apply_calibration(reading) if apply_calibration else reading

    def read_data(self, apply_calibration: bool = True) -> Dict[str, Any]:
        """Return the most recent head-pose reading.

        All bytes waiting on the port are read and every complete packet is
        parsed, so the result reflects the newest angle packet received. If
        no new angle packet arrived, the port is closed, or reading fails,
        the last known values are returned instead.

        Args:
            apply_calibration: Subtract the zero-point offset when True.

        Returns:
            Dict with ``head_pose`` (``rotation``, ``tilt``, ``pitch``),
            ``temperature`` and an ISO-format ``timestamp``.
        """
        if not self.ser or not getattr(self.ser, 'is_open', False):
            # Try to reconnect for the next call; answer from the last values now.
            self._connect()
            return self._build_reading(apply_calibration)

        try:
            bytes_waiting = self.ser.in_waiting
            if bytes_waiting:
                self.buffer.extend(self.ser.read(bytes_waiting))
            self._drain_buffer()
        except Exception as e:
            logger.error(f'IMU数据读取异常: {e}', exc_info=True)
        return self._build_reading(apply_calibration)

    def close(self) -> None:
        """Close the serial port if it is open and forget the handle."""
        ser = getattr(self, 'ser', None)
        self.ser = None
        if ser is not None and getattr(ser, 'is_open', False):
            ser.close()
            logger.info('IMU设备串口已关闭')

    def __del__(self):
        # Best effort only: errors during garbage collection or interpreter
        # shutdown are deliberately ignored.
        try:
            self.close()
        except Exception:
            pass


class MockIMUDevice:
    """Simulated IMU device that produces synthetic head-pose readings."""

    def __init__(self):
        """Start with a fixed noise level and a zero calibration offset."""
        self.noise_level = 0.1
        self.calibration_data = None  # last calibration dict passed to set_calibration()
        self.head_pose_offset = {'rotation': 0, 'tilt': 0, 'pitch': 0}  # zero-point offset

    def set_calibration(self, calibration: Dict[str, Any]):
        """Store calibration data and adopt its ``head_pose_offset`` if present.

        Args:
            calibration: Calibration dict. When it has a ``head_pose_offset``
                entry, that entry becomes the zero-point offset; axes missing
                from it default to 0.
        """
        self.calibration_data = calibration
        if 'head_pose_offset' in calibration:
            # Copy and fill in missing axes so apply_calibration() never hits
            # a KeyError on a partial offset dict.
            offset = {'rotation': 0, 'tilt': 0, 'pitch': 0}
            offset.update(calibration['head_pose_offset'])
            self.head_pose_offset = offset

    def apply_calibration(self, raw_data: Dict[str, Any]) -> Dict[str, Any]:
        """Subtract the zero-point offset from a reading's head pose.

        Args:
            raw_data: Reading with a ``head_pose`` dict holding ``rotation``,
                ``tilt`` and ``pitch``.

        Returns:
            A shallow copy of ``raw_data`` with a new, offset-corrected
            ``head_pose``. ``raw_data`` itself is returned unchanged when it
            is empty or has no ``head_pose`` key.
        """
        if not raw_data or 'head_pose' not in raw_data:
            return raw_data

        calibrated_data = raw_data.copy()
        head_pose = raw_data['head_pose'].copy()
        for axis in ('rotation', 'tilt', 'pitch'):
            head_pose[axis] = head_pose[axis] - self.head_pose_offset.get(axis, 0)
        calibrated_data['head_pose'] = head_pose
        return calibrated_data

    def read_data(self, apply_calibration: bool = True) -> Dict[str, Any]:
        """Generate one synthetic reading.

        Each angle is a sine wave of the current wall-clock time plus Gaussian
        noise, clipped to [-90, 90] and returned as a built-in ``float``.

        Args:
            apply_calibration: Subtract the zero-point offset when True.

        Returns:
            Dict with ``head_pose`` (``rotation``, ``tilt``, ``pitch``), a
            constant ``temperature`` and an ISO-format ``timestamp``.
        """
        current_time = time.time()
        noise_sigma = self.noise_level * 5

        def simulate(amplitude: float, frequency: float, phase: float) -> float:
            angle = amplitude * np.sin(current_time * frequency + phase)
            angle += np.random.normal(0, noise_sigma)
            # float() keeps numpy scalar types out of the emitted payload.
            return float(np.clip(angle, -90, 90))

        raw_data = {
            'head_pose': {
                'rotation': simulate(30, 0.5, 0.0),        # left turn (-), right turn (+)
                'tilt': simulate(20, 0.3, np.pi / 4),      # left tilt (-), right tilt (+)
                'pitch': simulate(15, 0.7, np.pi / 2)      # looking down (-), looking up (+)
            },
            # Same key and initial value RealIMUDevice reports, so consumers
            # can read 'temperature' from either device type.
            'temperature': 25.0,
            'timestamp': datetime.now().isoformat()
        }
        return self.apply_calibration(raw_data) if apply_calibration else raw_data


class IMUManager(BaseDevice):
    """IMU sensor manager.

    Owns one IMU device (real or simulated), calibrates its zero point and
    streams head-pose readings to the front end from a background thread.
    """

    def __init__(self, socketio, config_manager: Optional[ConfigManager] = None):
        """
        Initialise the IMU manager.

        Args:
            socketio: SocketIO instance used to emit readings.
            config_manager: Configuration manager; a new ``ConfigManager`` is
                created when omitted.
        """
        # Configuration
        self.config_manager = config_manager or ConfigManager()
        config = self.config_manager.get_device_config('imu')

        super().__init__("imu", config)

        # SocketIO instance used by the streaming thread
        self._socketio = socketio

        # Device settings
        self.port = config.get('port', 'COM7')
        self.baudrate = config.get('baudrate', 9600)

        # Device selection: an explicit 'device_type' ('real' or 'mock') wins;
        # the legacy 'use_mock' flag is consulted only when it is absent or
        # not one of those two values.
        configured_type = config.get('device_type')
        configured_use_mock = config.get('use_mock')
        normalized_type = str(configured_type).strip().lower() if configured_type is not None else None
        if normalized_type in ('real', 'mock'):
            self.device_type = normalized_type
            if configured_use_mock is not None and bool(configured_use_mock) != (normalized_type == 'mock'):
                self.logger.warning(
                    f"IMU配置冲突: device_type={configured_type}, use_mock={configured_use_mock}, 以device_type为准"
                )
        else:
            self.device_type = 'mock' if configured_use_mock else 'real'
        # Kept for backward compatibility; always mirrors the resolved device type.
        self.use_mock = self.device_type == 'mock'

        # IMU device instance
        self.imu_device = None
        # Serialises device reads: the streaming thread, calibration and
        # collect_head_pose_data() all read the same device buffer.
        self._device_lock = threading.Lock()

        # Streaming state
        self.imu_streaming = False
        self.imu_thread = None

        # Statistics
        self.data_count = 0
        self.error_count = 0

        # Calibration state
        self.is_calibrated = False
        self.head_pose_offset = {'rotation': 0, 'tilt': 0, 'pitch': 0}

        # Data cache
        self.data_buffer = deque(maxlen=100)
        self.last_valid_data = None

        self.logger.info(f"IMU管理器初始化完成 - 端口: {self.port}, 设备类型: {self.device_type}")

    def _read_device(self, apply_calibration: bool) -> Optional[Dict[str, Any]]:
        """Read one sample under the device lock; None when no device is set."""
        with self._device_lock:
            device = self.imu_device
            if device is None:
                return None
            return device.read_data(apply_calibration=apply_calibration)

    def _release_device(self) -> None:
        """Detach the current device and close its serial port if it has one."""
        with self._device_lock:
            device, self.imu_device = self.imu_device, None
        if device is None:
            return
        try:
            closer = getattr(device, 'close', None)
            if callable(closer):
                closer()
                return
            # Devices without close(): close the serial handle directly.
            ser = getattr(device, 'ser', None)
            if ser is not None and getattr(ser, 'is_open', False):
                ser.close()
        except Exception as e:
            self.logger.warning(f"关闭IMU设备失败: {e}")

    def initialize(self) -> bool:
        """
        Create the IMU device selected by the configuration.

        Returns:
            bool: True when the device is ready; False when the serial port
            could not be opened or an exception occurred.
        """
        try:
            self.logger.info("正在初始化IMU设备...")

            # Release any previous device first so its serial port is free
            # before a new one is opened.
            self._release_device()

            if self.device_type == 'real':
                self.logger.info(f"使用真实IMU设备 - 端口: {self.port}, 波特率: {self.baudrate}")
                device = RealIMUDevice(self.port, self.baudrate)

                # RealIMUDevice leaves ser as None when the port cannot be opened.
                if device.ser is None:
                    self.logger.error(f"IMU设备连接失败: 无法打开串口 {self.port}")
                    self.is_connected = False
                    self.imu_device = None
                    return False
            else:
                self.logger.info("使用模拟IMU设备")
                device = MockIMUDevice()

            # A new device starts with a zero offset; carry over an existing calibration.
            if self.is_calibrated:
                device.set_calibration({'head_pose_offset': self.head_pose_offset})

            self.imu_device = device
            self.is_connected = True
            self._device_info.update({
                'port': self.port,
                'baudrate': self.baudrate,
                'use_mock': self.use_mock,
                'device_type': self.device_type
            })

            self.logger.info("IMU初始化成功")
            return True

        except Exception as e:
            self.logger.error(f"IMU初始化失败: {e}")
            self.is_connected = False
            self.imu_device = None
            return False

    def _quick_calibrate_imu(self) -> Dict[str, Any]:
        """
        Quick zero-point calibration using the current pose as the reference.

        Takes 50 uncalibrated samples 20 ms apart and uses their mean as the
        offset. Fails when fewer than 70% of the samples were obtained.

        Returns:
            Dict[str, Any]: ``{'status': 'success', 'head_pose_offset': ...,
            'samples_used': ...}`` or ``{'status': 'error', 'error': ...}``.
        """
        try:
            if not self.imu_device:
                return {'status': 'error', 'error': 'IMU设备未初始化'}

            self.logger.info('开始IMU快速零点校准...')

            # Collect calibration samples
            calibration_samples = []
            sample_count = 50  # small sample count keeps calibration at about one second

            for _ in range(sample_count):
                try:
                    # Raw data: the existing offset must not be applied here
                    raw_data = self._read_device(apply_calibration=False)
                    if raw_data and 'head_pose' in raw_data:
                        calibration_samples.append(raw_data['head_pose'])
                    time.sleep(0.02)  # 20 ms between samples
                except Exception as e:
                    self.logger.warning(f'校准样本采集失败: {e}')
                    continue

            if len(calibration_samples) < sample_count * 0.7:
                return {
                    'status': 'error',
                    'error': f'校准样本不足: {len(calibration_samples)}/{sample_count}'
                }

            # The mean of the samples becomes the zero-point offset
            count = len(calibration_samples)
            self.head_pose_offset = {
                axis: sum(sample[axis] for sample in calibration_samples) / count
                for axis in ('rotation', 'tilt', 'pitch')
            }

            # Push the offset to the device (it may have been detached meanwhile)
            device = self.imu_device
            if device is None:
                return {'status': 'error', 'error': 'IMU设备未初始化'}
            device.set_calibration({'head_pose_offset': self.head_pose_offset})

            self.logger.info(f'IMU快速校准完成: {self.head_pose_offset}')
            return {
                'status': 'success',
                'head_pose_offset': self.head_pose_offset,
                'samples_used': count
            }

        except Exception as e:
            self.logger.error(f'IMU快速校准失败: {e}')
            return {'status': 'error', 'error': str(e)}

    def calibrate(self) -> bool:
        """
        Calibrate the IMU sensor, initialising the device first if needed.

        Returns:
            bool: True when calibration succeeded.
        """
        try:
            if not self.is_connected or not self.imu_device:
                if not self.initialize():
                    self.logger.error("IMU设备未连接")
                    return False

            result = self._quick_calibrate_imu()

            if result['status'] == 'success':
                self.is_calibrated = True
                self.logger.info("IMU校准成功")
                return True

            self.logger.error(f"IMU校准失败: {result.get('error', '未知错误')}")
            return False

        except Exception as e:
            self.logger.error(f"IMU校准失败: {e}")
            return False

    def start_streaming(self) -> bool:
        """
        Start the IMU data stream.

        Initialises the device if necessary, runs a quick zero-point
        calibration when ``calibrate()`` has not succeeded yet, and starts
        the background streaming thread.

        Returns:
            bool: True when the stream is running (including when it already
            was); False when initialisation or start-up failed.
        """
        try:
            if not self.is_connected or not self.imu_device:
                if not self.initialize():
                    return False

            if self.imu_streaming:
                self.logger.warning("IMU数据流已在运行")
                return True

            # Quick calibration before streaming. Its result does not change
            # is_calibrated, so this zeroing repeats on every start until
            # calibrate() succeeds; a failure is reported but does not block
            # streaming.
            if not self.is_calibrated:
                self.logger.info("启动前进行快速零点校准...")
                result = self._quick_calibrate_imu()
                if result.get('status') != 'success':
                    self.logger.warning(f"启动前快速校准失败: {result.get('error', '未知错误')}")

            self.imu_streaming = True
            self.imu_thread = threading.Thread(target=self._imu_streaming_thread, daemon=True)
            self.imu_thread.start()

            self.logger.info("IMU数据流启动成功")
            return True

        except Exception as e:
            self.logger.error(f"IMU数据流启动失败: {e}")
            self.imu_streaming = False
            return False

    def stop_streaming(self) -> bool:
        """
        Stop the IMU data stream and wait up to 3 seconds for the thread.

        Returns:
            bool: True unless an exception occurred while stopping.
        """
        try:
            self.imu_streaming = False

            if self.imu_thread and self.imu_thread.is_alive():
                self.imu_thread.join(timeout=3.0)

            self.logger.info("IMU数据流已停止")
            return True

        except Exception as e:
            self.logger.error(f"停止IMU数据流失败: {e}")
            return False

    def _imu_streaming_thread(self):
        """
        Worker loop: read, cache and emit readings until streaming is stopped.
        """
        self.logger.info("IMU数据流工作线程启动")

        while self.imu_streaming:
            try:
                if self.imu_device:
                    data = self._read_device(apply_calibration=True)

                    if data:
                        # Cache the reading
                        self.data_buffer.append(data)
                        self.last_valid_data = data

                        # Send the reading to the front end
                        if self._socketio:
                            self._socketio.emit('imu_data', data, namespace='/devices')

                        self.data_count += 1
                    else:
                        self.error_count += 1

                time.sleep(0.02)  # 20 ms between reads (~50 Hz)

            except Exception as e:
                self.logger.error(f"IMU数据流处理异常: {e}")
                self.error_count += 1
                time.sleep(0.1)

        self.logger.info("IMU数据流工作线程结束")

    def get_status(self) -> Dict[str, Any]:
        """
        Return the base device status extended with IMU-specific fields.

        Returns:
            Dict[str, Any]: Device status information.
        """
        status = super().get_status()
        status.update({
            'port': self.port,
            'baudrate': self.baudrate,
            'is_streaming': self.imu_streaming,
            'is_calibrated': self.is_calibrated,
            'data_count': self.data_count,
            'error_count': self.error_count,
            'buffer_size': len(self.data_buffer),
            'has_data': self.last_valid_data is not None,
            'head_pose_offset': self.head_pose_offset,
            # Resolved type of the device initialize() actually creates
            'device_type': self.device_type
        })
        return status

    def get_latest_data(self) -> Optional[Dict[str, Any]]:
        """
        Return a copy of the most recent reading.

        Returns:
            Optional[Dict[str, Any]]: The latest reading, or None when no
            reading has been cached yet.
        """
        latest = self.last_valid_data
        if not latest:
            return None
        snapshot = dict(latest)
        # Copy the nested pose too, so callers cannot alter the cached reading.
        pose = snapshot.get('head_pose')
        if isinstance(pose, dict):
            snapshot['head_pose'] = dict(pose)
        return snapshot

    def collect_head_pose_data(self, duration: int = 10) -> List[Dict[str, Any]]:
        """
        Collect calibrated head-pose readings for a fixed time.

        Args:
            duration: Collection time in seconds.

        Returns:
            List[Dict[str, Any]]: Collected readings; each ``timestamp`` is
            replaced by ``time.time()`` at collection. Empty when the device
            is not connected.
        """
        collected_data = []

        if not self.is_connected or not self.imu_device:
            self.logger.error("IMU设备未连接")
            return collected_data

        self.logger.info(f"开始收集头部姿态数据,时长: {duration}秒")

        start_time = time.time()
        while time.time() - start_time < duration:
            try:
                data = self._read_device(apply_calibration=True)
                if data:
                    # Overwrite the device timestamp with the collection time
                    data['timestamp'] = time.time()
                    collected_data.append(data)

                time.sleep(0.02)  # 20 ms between reads (~50 Hz)

            except Exception as e:
                self.logger.error(f"数据收集异常: {e}")
                break

        self.logger.info(f"头部姿态数据收集完成,共收集 {len(collected_data)} 个样本")
        return collected_data

    def disconnect(self):
        """
        Stop streaming, close the device and mark the manager disconnected.
        """
        try:
            self.stop_streaming()

            # Close the serial port explicitly instead of waiting for garbage collection.
            self._release_device()

            self.is_connected = False
            self.logger.info("IMU设备已断开连接")

        except Exception as e:
            self.logger.error(f"断开IMU设备连接失败: {e}")

    def cleanup(self):
        """
        Disconnect and reset cached data and calibration state.
        """
        try:
            self.disconnect()

            # Clear the data cache
            self.data_buffer.clear()

            # Reset state
            self.is_calibrated = False
            self.last_valid_data = None
            self.head_pose_offset = {'rotation': 0, 'tilt': 0, 'pitch': 0}

            super().cleanup()
            self.logger.info("IMU资源清理完成")

        except Exception as e:
            self.logger.error(f"清理IMU资源失败: {e}")