IMU姿态数据提交

This commit is contained in:
zhaozilong12 2025-08-13 13:58:15 +08:00
parent 08718766d4
commit 904ef5bd29
6 changed files with 38266 additions and 114 deletions

File diff suppressed because it is too large Load Diff

View File

@ -680,14 +680,13 @@ class DeviceManager:
logger.error('IMU设备未初始化')
return False
# 注释掉自动零点校准功能,直接发送原始数据
# # 在启动推流前进行快速零点校准(自动以当前姿态为基准)
# logger.info('正在进行IMU零点校准...')
# calibration_result = self._quick_calibrate_imu()
# if calibration_result.get('status') == 'success':
# logger.info(f'IMU零点校准完成: {calibration_result["head_pose_offset"]}')
# else:
# logger.warning(f'IMU零点校准失败将使用默认零偏移: {calibration_result.get("error", "未知错误")}')
# 在启动推流前进行快速零点校准(自动以当前姿态为基准)
logger.info('正在进行IMU零点校准...')
calibration_result = self._quick_calibrate_imu()
if calibration_result.get('status') == 'success':
logger.info(f'IMU零点校准完成: {calibration_result["head_pose_offset"]}')
else:
logger.warning(f'IMU零点校准失败将使用默认零偏移: {calibration_result.get("error", "未知错误")}')
self.imu_streaming = True
self.imu_thread = threading.Thread(target=self._imu_streaming_thread, daemon=True)
@ -1106,12 +1105,27 @@ class DeviceManager:
if imu_data and 'head_pose' in imu_data:
# 直接使用设备提供的头部姿态数据,减少数据包装
head_pose = imu_data['head_pose']
logger.warning(f'推送数据{head_pose}')
# logger.warning(f'推送数据{head_pose}')
# 优化:直接发送最精简的数据格式,避免重复时间戳
rotation = head_pose.get('rotation')
tilt = head_pose.get('tilt')
pitch = head_pose.get('pitch')
try:
rotation = round(float(rotation), 2) if rotation is not None else rotation
except Exception:
pass
try:
tilt = round(float(tilt), 2) if tilt is not None else tilt
except Exception:
pass
try:
pitch = round(float(pitch), 2) if pitch is not None else pitch
except Exception:
pass
self.socketio.emit('imu_data', {
'rotation': head_pose.get('rotation'), # 旋转角:左旋(-), 右旋(+)
'tilt': head_pose.get('tilt'), # 倾斜角:左倾(-), 右倾(+)
'pitch': head_pose.get('pitch'), # 俯仰角:俯角(-), 仰角(+)
'rotation': rotation, # 旋转角:左旋(-), 右旋(+)
'tilt': tilt, # 倾斜角:左倾(-), 右倾(+)
'pitch': pitch, # 俯仰角:俯角(-), 仰角(+)
})
# 优化提高数据发送频率到30Hz降低延时
@ -1900,8 +1914,8 @@ class DeviceManager:
for key in expired_keys:
del self.frame_cache[frame_type][key]
if expired_keys:
logger.debug(f'清理了 {len(expired_keys)} 个过期帧: {frame_type}')
# if expired_keys:
# logger.debug(f'清理了 {len(expired_keys)} 个过期帧: {frame_type}')
except Exception as e:
logger.error(f'清理过期帧失败: {e}')
@ -1946,8 +1960,20 @@ class RealIMUDevice:
def apply_calibration(self, raw_data: Dict[str, Any]) -> Dict[str, Any]:
"""应用校准:将当前姿态减去初始偏移,得到相对于初始姿态的变化量"""
# 禁用校准应用:直接返回原始数据
return raw_data
if not raw_data or 'head_pose' not in raw_data:
return raw_data
# 应用校准偏移
calibrated_data = raw_data.copy()
head_pose = raw_data['head_pose'].copy()
# 减去基准值(零点偏移)
head_pose['rotation'] = head_pose['rotation'] - self.head_pose_offset['rotation']
head_pose['tilt'] = head_pose['tilt'] - self.head_pose_offset['tilt']
head_pose['pitch'] = head_pose['pitch'] - self.head_pose_offset['pitch']
calibrated_data['head_pose'] = head_pose
return calibrated_data
@staticmethod
def _checksum(data: bytes) -> int:
@ -1966,12 +1992,12 @@ class RealIMUDevice:
packet_type = data[1]
vals = [int.from_bytes(data[i:i+2], 'little', signed=True) for i in range(2, 10, 2)]
if packet_type == 0x53: # 姿态角单位0.01°
rxl, pitchl, yawl, temp = vals # 注意这里 vals 已经是有符号整数
pitchl, rxl, yawl, temp = vals # 注意这里 vals 已经是有符号整数
# 使用第一段代码里的比例系数
k_angle = 180.0
roll = rxl / 32768.0 * k_angle
pitch = pitchl / 32768.0 * k_angle
yaw = yawl / 32768.0 * k_angle
roll = -round(rxl / 32768.0 * k_angle,2)
pitch = -round(pitchl / 32768.0 * k_angle,2)
yaw = -round(yawl / 32768.0 * k_angle,2)
temp = temp / 100.0
self.last_data = {
'roll': roll,
@ -1991,8 +2017,8 @@ class RealIMUDevice:
self._connect()
return {
'head_pose': {
'rotation': self.last_data['roll'],
'tilt': self.last_data['yaw'],
'rotation': self.last_data['yaw'],
'tilt': self.last_data['roll'],
'pitch': self.last_data['pitch']
},
'temperature': self.last_data['temperature'],
@ -2016,8 +2042,8 @@ class RealIMUDevice:
if parsed is not None:
raw = {
'head_pose': {
'rotation': parsed['roll'], # rotation = roll
'tilt': parsed['yaw'], # tilt = yaw
'rotation': parsed['yaw'], # rotation = roll
'tilt': parsed['roll'], # tilt = yaw
'pitch': parsed['pitch'] # pitch = pitch
},
'temperature': parsed['temperature'],
@ -2027,8 +2053,8 @@ class RealIMUDevice:
return self.apply_calibration(raw)
raw = {
'head_pose': {
'rotation': self.last_data['roll'],
'tilt': self.last_data['yaw'],
'rotation': self.last_data['yaw'],
'tilt': self.last_data['roll'],
'pitch': self.last_data['pitch']
},
'temperature': self.last_data['temperature'],
@ -2039,8 +2065,8 @@ class RealIMUDevice:
logger.error(f'IMU数据读取异常: {e}', exc_info=True)
raw = {
'head_pose': {
'rotation': self.last_data['roll'],
'tilt': self.last_data['yaw'],
'rotation': self.last_data['yaw'],
'tilt': self.last_data['roll'],
'pitch': self.last_data['pitch']
},
'temperature': self.last_data['temperature'],
@ -2072,8 +2098,16 @@ class MockIMUDevice:
def apply_calibration(self, raw_data: Dict[str, Any]) -> Dict[str, Any]:
"""应用校准:将当前姿态减去初始偏移,得到相对姿态"""
# 禁用校准应用:直接返回原始数据
return raw_data
if not raw_data or 'head_pose' not in raw_data:
return raw_data
calibrated_data = raw_data.copy()
head_pose = raw_data['head_pose'].copy()
head_pose['rotation'] = head_pose['rotation'] - self.head_pose_offset['rotation']
head_pose['tilt'] = head_pose['tilt'] - self.head_pose_offset['tilt']
head_pose['pitch'] = head_pose['pitch'] - self.head_pose_offset['pitch']
calibrated_data['head_pose'] = head_pose
return calibrated_data
def read_data(self) -> Dict[str, Any]:
"""读取IMU数据"""
@ -2504,8 +2538,8 @@ class VideoStreamManager:
if self.device_manager:
self.device_manager._save_frame_to_cache(cropped_frame, 'camera')
# 每1000帧记录一次缓存保存状态
if frame_count % 1000 == 0:
logger.debug(f"视频推流已保存第 {frame_count} 帧到全局缓存")
# if frame_count % 1000 == 0:
# logger.debug(f"视频推流已保存第 {frame_count} 帧到全局缓存")
else:
logger.warning("VideoStreamManager未关联DeviceManager无法保存帧到缓存")

View File

@ -1,81 +0,0 @@
import serial
import time
def checksum(data):
return sum(data[:-1]) & 0xFF
def parse_packet(data):
if len(data) != 11:
return None
if data[0] != 0x55:
return None
if checksum(data) != data[-1]:
print("校验失败")
return None
packet_type = data[1]
# 将后8字节分成4个16位有符号整数小端序
vals = [int.from_bytes(data[i:i+2], 'little', signed=True) for i in range(2, 10, 2)]
if packet_type == 0x51: # 加速度单位0.001g
ax, ay, az, temp = vals
ax /= 1000
ay /= 1000
az /= 1000
temp = temp / 100 # 温度单位摄氏度
# return f"加速度 (g): x={ax:.3f}, y={ay:.3f}, z={az:.3f}, 温度={temp:.2f}℃"
elif packet_type == 0x52: # 角速度单位0.01°/s
wx, wy, wz, temp = vals
wx /= 100
wy /= 100
wz /= 100
temp = temp / 100
# return f"角速度 (°/s): x={wx:.2f}, y={wy:.2f}, z={wz:.2f}, 温度={temp:.2f}℃"
elif packet_type == 0x53: # 姿态角单位0.01°
roll, pitch, yaw, temp = vals
# roll /= 100
# pitch /= 100
# yaw /= 100
# temp = temp / 100
return f"姿态角 (°): roll={roll:.2f}, pitch={pitch:.2f}, yaw={yaw:.2f}, 温度={temp:.2f}"
elif packet_type == 0x54: # 磁力计单位uT
mx, my, mz, temp = vals
temp = temp / 100
#return f"磁力计 (uT): x={mx}, y={my}, z={mz}, 温度={temp:.2f}℃"
elif packet_type == 0x56: # 气压单位Pa
p1, p2, p3, temp = vals
pressure = ((p1 & 0xFFFF) | ((p2 & 0xFFFF) << 16)) / 100 # 简单合成大部分IMU气压是3字节这里简单示范
temp = temp / 100
# return f"气压 (Pa): pressure={pressure:.2f}, 温度={temp:.2f}℃"
else:
return f"未知包类型: {packet_type}"
def read_imu(port='COM6', baudrate=9600):
ser = serial.Serial(port, baudrate, timeout=1)
buffer = bytearray()
try:
while True:
bytes_waiting = ser.in_waiting
if bytes_waiting:
data = ser.read(bytes_waiting)
buffer.extend(data)
# 解析buffer中所有完整包
while len(buffer) >= 11:
if buffer[0] != 0x55:
buffer.pop(0)
continue
packet = buffer[:11]
result = parse_packet(packet)
if result:
print(result)
buffer = buffer[11:]
time.sleep(0.01)
except KeyboardInterrupt:
print("程序终止")
finally:
ser.close()
if __name__ == "__main__":
read_imu(port='COM8', baudrate=9600)

View File

@ -19,7 +19,7 @@ camera_index = 0
camera_width = 640
camera_height = 480
camera_fps = 30
imu_port = COM8
imu_port = COM9
imu_baudrate = 9600
pressure_port = COM4