前端提交

This commit is contained in:
zhaozilong12 2025-08-13 09:43:11 +08:00
parent a93a7fa712
commit 0abe79ba1c
6 changed files with 10047 additions and 47 deletions

File diff suppressed because it is too large Load Diff

View File

@ -408,41 +408,60 @@ class DeviceManager:
return {'status': 'failed', 'error': str(e)} return {'status': 'failed', 'error': str(e)}
def _calibrate_imu(self) -> Dict[str, Any]: def _calibrate_imu(self) -> Dict[str, Any]:
"""校准IMU传感器""" """标准校准:采样较多帧,计算稳定零点偏移"""
if not self.imu_device: if not self.imu_device:
return {'status': 'failed', 'error': 'IMU设备未连接'} return {'status': 'failed', 'error': 'IMU设备未连接'}
try: try:
# 收集静态数据进行零点校准
samples = [] samples = []
for _ in range(100): for _ in range(100):
data = self.imu_device.read_data() data = self.imu_device.read_data()
if data and 'head_pose' in data: if data and 'head_pose' in data:
samples.append(data) samples.append(data['head_pose'])
time.sleep(0.01) time.sleep(0.01)
if not samples: if not samples:
return {'status': 'failed', 'error': '无法获取IMU数据进行校准'} return {'status': 'failed', 'error': '无法获取IMU数据进行校准'}
# 计算头部姿态零点偏移(正立状态为标准零位)
head_pose_offset = { head_pose_offset = {
'rotation': np.mean([s['head_pose']['rotation'] for s in samples]), 'rotation': float(np.mean([s['rotation'] for s in samples])),
'tilt': np.mean([s['head_pose']['tilt'] for s in samples]), 'tilt': float(np.mean([s['tilt'] for s in samples])),
'pitch': np.mean([s['head_pose']['pitch'] for s in samples]) 'pitch': float(np.mean([s['pitch'] for s in samples]))
} }
calibration = { calibration = {
'status': 'success', 'status': 'success',
'head_pose_offset': head_pose_offset, # 头部姿态零点偏移 'head_pose_offset': head_pose_offset,
'timestamp': datetime.now().isoformat()
}
if hasattr(self.imu_device, 'set_calibration'):
self.imu_device.set_calibration(calibration)
return calibration
except Exception as e:
return {'status': 'failed', 'error': str(e)}
def _quick_calibrate_imu(self) -> Dict[str, Any]:
"""快速校准:采样少量帧,以当前姿态为零点(用于每次推流启动)"""
if not self.imu_device:
return {'status': 'failed', 'error': 'IMU设备未连接'}
try:
samples = []
for _ in range(10): # 少量采样,加快启动
data = self.imu_device.read_data()
if data and 'head_pose' in data:
samples.append(data['head_pose'])
time.sleep(0.01)
if not samples:
return {'status': 'failed', 'error': '无法获取IMU数据进行快速校准'}
head_pose_offset = {
'rotation': float(np.median([s['rotation'] for s in samples])),
'tilt': float(np.median([s['tilt'] for s in samples])),
'pitch': float(np.median([s['pitch'] for s in samples]))
}
calibration = {
'status': 'success',
'head_pose_offset': head_pose_offset,
'timestamp': datetime.now().isoformat() 'timestamp': datetime.now().isoformat()
} }
# 保存校准数据到设备实例
if hasattr(self.imu_device, 'set_calibration'): if hasattr(self.imu_device, 'set_calibration'):
self.imu_device.set_calibration(calibration) self.imu_device.set_calibration(calibration)
return calibration return calibration
except Exception as e: except Exception as e:
return {'status': 'failed', 'error': str(e)} return {'status': 'failed', 'error': str(e)}
@ -661,6 +680,15 @@ class DeviceManager:
logger.error('IMU设备未初始化') logger.error('IMU设备未初始化')
return False return False
# 注释掉自动零点校准功能,直接发送原始数据
# # 在启动推流前进行快速零点校准(自动以当前姿态为基准)
# logger.info('正在进行IMU零点校准...')
# calibration_result = self._quick_calibrate_imu()
# if calibration_result.get('status') == 'success':
# logger.info(f'IMU零点校准完成: {calibration_result["head_pose_offset"]}')
# else:
# logger.warning(f'IMU零点校准失败将使用默认零偏移: {calibration_result.get("error", "未知错误")}')
self.imu_streaming = True self.imu_streaming = True
self.imu_thread = threading.Thread(target=self._imu_streaming_thread, daemon=True) self.imu_thread = threading.Thread(target=self._imu_streaming_thread, daemon=True)
self.imu_thread.start() self.imu_thread.start()
@ -1078,7 +1106,7 @@ class DeviceManager:
if imu_data and 'head_pose' in imu_data: if imu_data and 'head_pose' in imu_data:
# 直接使用设备提供的头部姿态数据,减少数据包装 # 直接使用设备提供的头部姿态数据,减少数据包装
head_pose = imu_data['head_pose'] head_pose = imu_data['head_pose']
logger.warning(f'推送数据{head_pose}')
# 优化:直接发送最精简的数据格式,避免重复时间戳 # 优化:直接发送最精简的数据格式,避免重复时间戳
self.socketio.emit('imu_data', { self.socketio.emit('imu_data', {
'rotation': head_pose.get('rotation'), # 旋转角:左旋(-), 右旋(+) 'rotation': head_pose.get('rotation'), # 旋转角:左旋(-), 右旋(+)
@ -1917,14 +1945,8 @@ class RealIMUDevice:
logger.debug(f'应用IMU校准数据: {self.head_pose_offset}') logger.debug(f'应用IMU校准数据: {self.head_pose_offset}')
def apply_calibration(self, raw_data: Dict[str, Any]) -> Dict[str, Any]: def apply_calibration(self, raw_data: Dict[str, Any]) -> Dict[str, Any]:
if not self.calibration_data: """应用校准:将当前姿态减去初始偏移,得到相对于初始姿态的变化量"""
return raw_data # 禁用校准应用:直接返回原始数据
if 'head_pose' in raw_data:
before = raw_data['head_pose'].copy()
raw_data['head_pose']['rotation'] -= self.head_pose_offset.get('rotation', 0)
raw_data['head_pose']['tilt'] -= self.head_pose_offset.get('tilt', 0)
raw_data['head_pose']['pitch'] -= self.head_pose_offset.get('pitch', 0)
logger.debug(f"校准前: {before}, 校准后: {raw_data['head_pose']}")
return raw_data return raw_data
@staticmethod @staticmethod
@ -1944,10 +1966,12 @@ class RealIMUDevice:
packet_type = data[1] packet_type = data[1]
vals = [int.from_bytes(data[i:i+2], 'little', signed=True) for i in range(2, 10, 2)] vals = [int.from_bytes(data[i:i+2], 'little', signed=True) for i in range(2, 10, 2)]
if packet_type == 0x53: # 姿态角单位0.01° if packet_type == 0x53: # 姿态角单位0.01°
roll, pitch, yaw, temp = vals rxl, pitchl, yawl, temp = vals # 注意这里 vals 已经是有符号整数
roll /= 100.0 # 使用第一段代码里的比例系数
pitch /= 100.0 k_angle = 180.0
yaw /= 100.0 roll = rxl / 32768.0 * k_angle
pitch = pitchl / 32768.0 * k_angle
yaw = yawl / 32768.0 * k_angle
temp = temp / 100.0 temp = temp / 100.0
self.last_data = { self.last_data = {
'roll': roll, 'roll': roll,
@ -2047,16 +2071,8 @@ class MockIMUDevice:
self.head_pose_offset = calibration['head_pose_offset'] self.head_pose_offset = calibration['head_pose_offset']
def apply_calibration(self, raw_data: Dict[str, Any]) -> Dict[str, Any]: def apply_calibration(self, raw_data: Dict[str, Any]) -> Dict[str, Any]:
"""应用校准数据""" """应用校准:将当前姿态减去初始偏移,得到相对姿态"""
if not self.calibration_data: # 禁用校准应用:直接返回原始数据
return raw_data
# 应用头部姿态零点校准
if 'head_pose' in raw_data:
raw_data['head_pose']['rotation'] -= self.head_pose_offset['rotation']
raw_data['head_pose']['tilt'] -= self.head_pose_offset['tilt']
raw_data['head_pose']['pitch'] -= self.head_pose_offset['pitch']
return raw_data return raw_data
def read_data(self) -> Dict[str, Any]: def read_data(self) -> Dict[str, Any]:

View File

@ -0,0 +1,89 @@
import serial
import time
def checksum(data):
return sum(data[:-1]) & 0xFF
def parse_packet(data):
if len(data) != 11:
return None
if data[0] != 0x55:
return None
if checksum(data) != data[-1]:
print("校验失败")
return None
packet_type = data[1]
# 后 8 字节拆成 4 个 16 位有符号整数(小端序)
vals = [int.from_bytes(data[i:i+2], 'little', signed=True) for i in range(2, 10, 2)]
# if packet_type == 0x51: # 加速度 (单位 0.001g)
# ax, ay, az, temp = vals
# ax /= 1000
# ay /= 1000
# az /= 1000
# temp /= 100
# return f"加速度 (g): x={ax:.3f}, y={ay:.3f}, z={az:.3f}, 温度={temp:.2f}℃"
# elif packet_type == 0x52: # 角速度 (单位 0.01°/s)
# wx, wy, wz, temp = vals
# wx /= 100
# wy /= 100
# wz /= 100
# temp /= 100
# return f"角速度 (°/s): x={wx:.2f}, y={wy:.2f}, z={wz:.2f}, 温度={temp:.2f}℃"
if packet_type == 0x53: # 姿态角
rxl, pitchl, yawl, temp = vals # 注意这里 vals 已经是有符号整数
# 使用第一段代码里的比例系数
k_angle = 180.0
roll = rxl / 32768.0 * k_angle
pitch = pitchl / 32768.0 * k_angle
yaw = yawl / 32768.0 * k_angle
temp /= 100 # 温度依然是 0.01°C
return f"姿态角 (°): roll={roll:.2f}, pitch={pitch:.2f}, yaw={yaw:.2f}, 温度={temp:.2f}"
# elif packet_type == 0x54: # 磁力计 (单位 uT)
# mx, my, mz, temp = vals
# temp /= 100
# return f"磁力计 (uT): x={mx}, y={my}, z={mz}, 温度={temp:.2f}℃"
# elif packet_type == 0x56: # 气压 (单位 Pa)
# p1, p2, p3, temp = vals
# pressure = ((p1 & 0xFFFF) | ((p2 & 0xFFFF) << 16)) / 100
# temp /= 100
# return f"气压 (Pa): pressure={pressure:.2f}, 温度={temp:.2f}℃"
# else:
# return f"未知包类型: {packet_type:#04x}"
def read_imu(port='COM6', baudrate=9600):
ser = serial.Serial(port, baudrate, timeout=1)
buffer = bytearray()
try:
while True:
bytes_waiting = ser.in_waiting
if bytes_waiting:
data = ser.read(bytes_waiting)
buffer.extend(data)
while len(buffer) >= 11:
if buffer[0] != 0x55:
buffer.pop(0)
continue
packet = buffer[:11]
result = parse_packet(packet)
if result:
print(result)
buffer = buffer[11:]
time.sleep(0.01)
except KeyboardInterrupt:
print("程序终止")
finally:
ser.close()
if __name__ == "__main__":
read_imu(port='COM8', baudrate=9600)

View File

@ -33,15 +33,15 @@ def parse_packet(data):
# return f"角速度 (°/s): x={wx:.2f}, y={wy:.2f}, z={wz:.2f}, 温度={temp:.2f}℃" # return f"角速度 (°/s): x={wx:.2f}, y={wy:.2f}, z={wz:.2f}, 温度={temp:.2f}℃"
elif packet_type == 0x53: # 姿态角单位0.01° elif packet_type == 0x53: # 姿态角单位0.01°
roll, pitch, yaw, temp = vals roll, pitch, yaw, temp = vals
roll /= 100 # roll /= 100
pitch /= 100 # pitch /= 100
yaw /= 100 # yaw /= 100
temp = temp / 100 # temp = temp / 100
return f"姿态角 (°): roll={roll:.2f}, pitch={pitch:.2f}, yaw={yaw:.2f}, 温度={temp:.2f}" return f"姿态角 (°): roll={roll:.2f}, pitch={pitch:.2f}, yaw={yaw:.2f}, 温度={temp:.2f}"
elif packet_type == 0x54: # 磁力计单位uT elif packet_type == 0x54: # 磁力计单位uT
mx, my, mz, temp = vals mx, my, mz, temp = vals
temp = temp / 100 temp = temp / 100
# return f"磁力计 (uT): x={mx}, y={my}, z={mz}, 温度={temp:.2f}℃" #return f"磁力计 (uT): x={mx}, y={my}, z={mz}, 温度={temp:.2f}℃"
elif packet_type == 0x56: # 气压单位Pa elif packet_type == 0x56: # 气压单位Pa
p1, p2, p3, temp = vals p1, p2, p3, temp = vals
pressure = ((p1 & 0xFFFF) | ((p2 & 0xFFFF) << 16)) / 100 # 简单合成大部分IMU气压是3字节这里简单示范 pressure = ((p1 & 0xFFFF) | ((p2 & 0xFFFF) << 16)) / 100 # 简单合成大部分IMU气压是3字节这里简单示范

View File

@ -1020,17 +1020,17 @@ function handleIMUData(data) {
if (rotationCharts) { if (rotationCharts) {
rotationCharts.setOption({ rotationCharts.setOption({
series: [{ data: [{ value: rVal }] }] series: [{ data: [{ value: rVal }] }]
}, true, true) })
} }
if (pitchCharts) { if (pitchCharts) {
pitchCharts.setOption({ pitchCharts.setOption({
series: [{ data: [{ value: pVal }] }] series: [{ data: [{ value: pVal }] }]
}, true, true) })
} }
if (tiltCharts) { if (tiltCharts) {
tiltCharts.setOption({ tiltCharts.setOption({
series: [{ data: [{ value: tVal }] }] series: [{ data: [{ value: tVal }] }]
}, true, true) })
} }
// 使 // 使