深度相机距离,opencv渲染、新增蓝牙IMU

This commit is contained in:
zhaozilong12 2025-09-10 18:11:54 +08:00
parent 7daeb5692f
commit 3fe509d109
5 changed files with 1017 additions and 120 deletions

View File

@ -30,7 +30,8 @@ depth_range_max = 1500
[DEVICES] [DEVICES]
imu_device_type = real imu_device_type = real
imu_port = COM8 imu_port = COM14
imu_mac_address = ef:3c:1a:0a:fe:02
imu_baudrate = 9600 imu_baudrate = 9600
pressure_device_type = real pressure_device_type = real
pressure_use_mock = False pressure_use_mock = False

View File

@ -98,6 +98,13 @@ class FemtoBoltManager(BaseDevice):
# 性能监控 # 性能监控
self.fps_counter = 0 self.fps_counter = 0
# 图像渲染缓存
self.background = None
self.output_buffer = None
self._depth_filtered = None # 用于复用深度图过滤结果
self._blur_buffer = None # 用于复用高斯模糊结果
self._current_gamma = None
self.fps_start_time = time.time() self.fps_start_time = time.time()
self.actual_fps = 0 self.actual_fps = 0
self.dropped_frames = 0 self.dropped_frames = 0
@ -122,6 +129,7 @@ class FemtoBoltManager(BaseDevice):
# 预生成网格背景(避免每帧创建) # 预生成网格背景(避免每帧创建)
self._grid_bg = None self._grid_bg = None
self._grid_size = (480, 640) # 默认尺寸 self._grid_size = (480, 640) # 默认尺寸
self.background = None # 用于缓存等高线渲染的背景
# 自定义彩虹色 colormap参考testfemtobolt.py # 自定义彩虹色 colormap参考testfemtobolt.py
colors = ['fuchsia', 'red', 'yellow', 'lime', 'cyan', 'blue', colors = ['fuchsia', 'red', 'yellow', 'lime', 'cyan', 'blue',
@ -147,108 +155,70 @@ class FemtoBoltManager(BaseDevice):
self._current_gamma = self.gamma_value self._current_gamma = self.gamma_value
def _generate_contour_image_opencv(self, depth): def _generate_contour_image_opencv(self, depth):
"""优化的等高线图像生成(增强梯度变化清晰度)""" """改进版 OpenCV 等高线渲染,梯度平滑、局部对比增强"""
try: try:
# 深度数据过滤(与原始函数完全一致) # 初始化 depth_filtered 缓冲区
depth_filtered = depth.copy() if self._depth_filtered is None or self._depth_filtered.shape != depth.shape:
depth_filtered[depth_filtered > 1100] = 0 self._depth_filtered = np.zeros_like(depth, dtype=np.uint16)
depth_filtered[depth_filtered < 500] = 0
# 创建输出图像 np.copyto(self._depth_filtered, depth) # 直接覆盖,不生成新数组
depth_filtered = self._depth_filtered
depth_filtered[depth_filtered > self.depth_range_max] = 0
depth_filtered[depth_filtered < self.depth_range_min] = 0
height, width = depth_filtered.shape height, width = depth_filtered.shape
# 背景图与原始函数一致灰色背景alpha=0.3效果) # 背景缓存
background_gray = int(0.5 * 255 * 0.3 + 255 * (1 - 0.3)) # 模拟灰色背景alpha混合 if self.background is None or self.background.shape[:2] != (height, width):
output = np.ones((height, width, 3), dtype=np.uint8) * background_gray background_gray = int(0.5 * 255 * 0.3 + 255 * (1 - 0.3))
self.background = np.ones((height, width, 3), dtype=np.uint8) * background_gray
grid_spacing = max(height // 20, width // 20, 10)
for x in range(0, width, grid_spacing):
cv2.line(self.background, (x, 0), (x, height-1), (255, 255, 255), 1)
for y in range(0, height, grid_spacing):
cv2.line(self.background, (0, y), (width-1, y), (255, 255, 255), 1)
# 绘制白色网格线与原始函数grid效果一致 # 初始化输出缓存和模糊缓存
grid_spacing = max(height // 20, width // 20, 10) # 自适应网格间距 self.output_buffer = np.empty_like(self.background)
for x in range(0, width, grid_spacing): self._blur_buffer = np.empty_like(self.background)
cv2.line(output, (x, 0), (x, height-1), (255, 255, 255), 1)
for y in range(0, height, grid_spacing):
cv2.line(output, (0, y), (width-1, y), (255, 255, 255), 1)
# 使用masked数据与原始函数np.ma.masked_equal逻辑一致 # 复用输出缓存,避免 copy()
np.copyto(self.output_buffer, self.background)
output = self.output_buffer
valid_mask = depth_filtered > 0 valid_mask = depth_filtered > 0
if np.any(valid_mask): if np.any(valid_mask):
# 将深度值映射到500-1100范围与原始函数vmin=500, vmax=1100一致 # 连续归一化深度值
depth_for_contour = depth_filtered.copy().astype(np.float32) norm_depth = np.zeros_like(depth_filtered, dtype=np.float32)
depth_for_contour[~valid_mask] = np.nan # 无效区域设为NaN norm_depth[valid_mask] = (depth_filtered[valid_mask] - self.depth_range_min) / (self.depth_range_max - self.depth_range_min)
norm_depth = np.clip(norm_depth, 0, 1) ** 0.8 # Gamma增强
# 增加等高线层级数量以获得更细腻的梯度变化从100增加到200 # 使用 colormap 映射
levels = np.linspace(500, 1100, 201) # 200个等高线层级提升梯度细腻度 cmap_colors = (self.custom_cmap(norm_depth)[..., :3] * 255).astype(np.uint8)
output[valid_mask] = cmap_colors[valid_mask]
# 创建等高线边界增强图像 # Sobel 边界检测 + cv2.magnitude 替换 np.hypot
contour_edges = np.zeros((height, width), dtype=np.uint8) depth_uint8 = (norm_depth * 255).astype(np.uint8)
gx = cv2.Sobel(depth_uint8, cv2.CV_32F, 1, 0, ksize=3)
gy = cv2.Sobel(depth_uint8, cv2.CV_32F, 0, 1, ksize=3)
grad_mag = cv2.magnitude(gx, gy)
grad_mag = grad_mag.astype(np.uint8)
# 为每个像素分配等高线层级 # 自适应局部对比度增强(向量化)
for i in range(len(levels) - 1): edge_mask = grad_mag > 30
level_min = levels[i] output[edge_mask] = np.clip(output[edge_mask].astype(np.float32) * 1.5, 0, 255).astype(np.uint8)
level_max = levels[i + 1]
# 创建当前层级的掩码 # 高斯平滑,复用 dst 缓冲区
level_mask = (depth_filtered >= level_min) & (depth_filtered < level_max) cv2.GaussianBlur(output, (3, 3), 0.3, dst=self._blur_buffer)
if np.any(level_mask): # 注意:这里不进行裁剪,而是返回完整图像
# 增强颜色映射算法 - 使用非线性映射增强对比度 # 推迟裁剪到显示阶段,与 testfemtobolt.py 保持一致
color_val = (level_min - 500) / (1100 - 500) # 原代码在这里进行了裁剪:
color_val = np.clip(color_val, 0, 1) # target_width = height // 2
# if width > target_width:
# 应用Gamma校正增强对比度gamma=0.8增强中间色调) # left = (width - target_width) // 2
color_val_enhanced = np.power(color_val, 0.8) # right = left + target_width
# output = output[:, left:right]
# 应用自定义colormap return self._blur_buffer
color = self.custom_cmap(color_val_enhanced)[:3]
color_bgr = (np.array(color) * 255).astype(np.uint8)
# 赋值颜色BGR格式
output[level_mask, 0] = color_bgr[2] # B
output[level_mask, 1] = color_bgr[1] # G
output[level_mask, 2] = color_bgr[0] # R
# 检测等高线边界每10个层级检测一次主要等高线
if i % 10 == 0:
# 使用形态学操作检测边界
kernel = np.ones((3, 3), np.uint8)
dilated = cv2.dilate(level_mask.astype(np.uint8), kernel, iterations=1)
eroded = cv2.erode(level_mask.astype(np.uint8), kernel, iterations=1)
edge = dilated - eroded
contour_edges = cv2.bitwise_or(contour_edges, edge)
# 增强等高线边界
if np.any(contour_edges):
# 对等高线边界进行轻微扩展
kernel = np.ones((2, 2), np.uint8)
contour_edges = cv2.dilate(contour_edges, kernel, iterations=1)
# 在等高线边界处增强对比度
edge_mask = contour_edges > 0
if np.any(edge_mask):
# 增强边界处的颜色对比度
for c in range(3):
channel = output[:, :, c].astype(np.float32)
# 对边界像素应用对比度增强
channel[edge_mask] = np.clip(channel[edge_mask] * 1.2, 0, 255)
output[:, :, c] = channel.astype(np.uint8)
# 减少过度平滑处理以保持清晰度
# 仅应用轻微的降噪处理,保持梯度边界清晰
output = cv2.bilateralFilter(output, 3, 20, 20) # 减少滤波强度
# 裁剪宽度(与原始函数保持一致)
target_width = height // 2
if width > target_width:
left = (width - target_width) // 2
right = left + target_width
output = output[:, left:right]
# 最终锐化处理增强细节
# 使用USM锐化增强等高线细节
gaussian = cv2.GaussianBlur(output, (0, 0), 1.0)
output = cv2.addWeighted(output, 1.5, gaussian, -0.5, 0)
output = np.clip(output, 0, 255).astype(np.uint8)
return output
except Exception as e: except Exception as e:
self.logger.error(f"优化等高线生成失败: {e}") self.logger.error(f"优化等高线生成失败: {e}")
@ -274,8 +244,8 @@ class FemtoBoltManager(BaseDevice):
self.ax.clear() self.ax.clear()
# 深度数据过滤与display_x.py完全一致 # 深度数据过滤与display_x.py完全一致
depth[depth > 1100] = 0 depth[depth > self.depth_range_max] = 0
depth[depth < 500] = 0 depth[depth < self.depth_range_min] = 0
# 背景图与display_x.py完全一致 # 背景图与display_x.py完全一致
background = np.ones_like(depth) * 0.5 # 设定灰色背景 background = np.ones_like(depth) * 0.5 # 设定灰色背景
@ -298,7 +268,7 @@ class FemtoBoltManager(BaseDevice):
# 绘制等高线图并设置原点在上方与display_x.py完全一致 # 绘制等高线图并设置原点在上方与display_x.py完全一致
import time import time
start_time = time.perf_counter() start_time = time.perf_counter()
self.ax.contourf(depth, levels=100, cmap=self.custom_cmap, vmin=500, vmax=1100, origin='upper', zorder=2) self.ax.contourf(depth, levels=100, cmap=self.custom_cmap, vmin=self.depth_range_min, vmax=self.depth_range_max, origin='upper', zorder=2)
contourf_time = time.perf_counter() - start_time contourf_time = time.perf_counter() - start_time
# self.logger.info(f"contourf绘制耗时: {contourf_time*1000:.2f}ms") # self.logger.info(f"contourf绘制耗时: {contourf_time*1000:.2f}ms")
@ -458,9 +428,9 @@ class FemtoBoltManager(BaseDevice):
# 配置FemtoBolt设备参数 # 配置FemtoBolt设备参数
self.femtobolt_config = self.pykinect.default_configuration self.femtobolt_config = self.pykinect.default_configuration
self.femtobolt_config.depth_mode = self.pykinect.K4A_DEPTH_MODE_NFOV_2X2BINNED self.femtobolt_config.depth_mode = self.pykinect.K4A_DEPTH_MODE_NFOV_UNBINNED
self.femtobolt_config.color_format = self.pykinect.K4A_IMAGE_FORMAT_COLOR_BGRA32 self.femtobolt_config.color_format = self.pykinect.K4A_IMAGE_FORMAT_COLOR_BGRA32
self.femtobolt_config.color_resolution = self.pykinect.K4A_COLOR_RESOLUTION_720P self.femtobolt_config.color_resolution = self.pykinect.K4A_COLOR_RESOLUTION_1080P
self.femtobolt_config.camera_fps = self.pykinect.K4A_FRAMES_PER_SECOND_15 self.femtobolt_config.camera_fps = self.pykinect.K4A_FRAMES_PER_SECOND_15
self.femtobolt_config.synchronized_images_only = False self.femtobolt_config.synchronized_images_only = False
@ -657,8 +627,17 @@ class FemtoBoltManager(BaseDevice):
# 如果等高线生成失败,跳过这一帧 # 如果等高线生成失败,跳过这一帧
continue continue
# 裁剪处理(推迟到显示阶段)
h, w = depth_colored_final.shape[:2]
target_width = h // 2
display_image = depth_colored_final
if w > target_width:
left = (w - target_width) // 2
right = left + target_width
display_image = depth_colored_final[:, left:right]
# 推送SocketIO # 推送SocketIO
success, buffer = cv2.imencode('.jpg', depth_colored_final, self._encode_param) success, buffer = cv2.imencode('.jpg', display_image, self._encode_param)
if success and self._socketio: if success and self._socketio:
jpg_as_text = base64.b64encode(memoryview(buffer).tobytes()).decode('utf-8') jpg_as_text = base64.b64encode(memoryview(buffer).tobytes()).decode('utf-8')
self._socketio.emit('femtobolt_frame', { self._socketio.emit('femtobolt_frame', {

View File

@ -15,6 +15,7 @@ import logging
from collections import deque from collections import deque
import struct import struct
from datetime import datetime from datetime import datetime
import asyncio
try: try:
from .base_device import BaseDevice from .base_device import BaseDevice
@ -66,7 +67,6 @@ class RealIMUDevice:
self.calibration_data = calibration self.calibration_data = calibration
if 'head_pose_offset' in calibration: if 'head_pose_offset' in calibration:
self.head_pose_offset = calibration['head_pose_offset'] self.head_pose_offset = calibration['head_pose_offset']
logger.debug(f'应用IMU校准数据: {self.head_pose_offset}')
def apply_calibration(self, raw_data: Dict[str, Any]) -> Dict[str, Any]: def apply_calibration(self, raw_data: Dict[str, Any]) -> Dict[str, Any]:
"""应用校准:将当前姿态减去初始偏移,得到相对于初始姿态的变化量""" """应用校准:将当前姿态减去初始偏移,得到相对于初始姿态的变化量"""
if not raw_data or 'head_pose' not in raw_data: if not raw_data or 'head_pose' not in raw_data:
@ -246,11 +246,257 @@ class MockIMUDevice:
}, },
'timestamp': datetime.now().isoformat() 'timestamp': datetime.now().isoformat()
} }
# 应用校准并返回 # 应用校准并返回
return self.apply_calibration(raw_data) if apply_calibration else raw_data return self.apply_calibration(raw_data) if apply_calibration else raw_data
class BleIMUDevice:
"""蓝牙IMU设备基于bleak实现解析逻辑参考tests/testblueimu.py"""
def __init__(self, mac_address: str):
self.mac_address = mac_address
self.loop = None
self.loop_thread = None
self.client = None
self.running = False
self._lock = threading.Lock()
self.disconnected_event = None
self.calibration_data = None
self.head_pose_offset = {'rotation': 0, 'tilt': 0, 'pitch': 0}
self.last_data = {
'roll': 0.0,
'pitch': 0.0,
'yaw': 0.0,
'temperature': 25.0
}
self._connected = False
# GATT特征参考测试脚本中的handle/short uuid
self._notify_char = 0x0007
self._write_char = 0x0005
def set_calibration(self, calibration: Dict[str, Any]):
self.calibration_data = calibration
if 'head_pose_offset' in calibration:
self.head_pose_offset = calibration['head_pose_offset']
def apply_calibration(self, raw_data: Dict[str, Any]) -> Dict[str, Any]:
if not raw_data or 'head_pose' not in raw_data:
return raw_data
calibrated_data = raw_data.copy()
head_pose = raw_data['head_pose'].copy()
# 与串口IMU保持一致对 rotation 做归一化到 [-180, 180)
angle = head_pose['rotation'] - self.head_pose_offset['rotation']
head_pose['rotation'] = ((angle + 180) % 360) - 180
head_pose['tilt'] = head_pose['tilt'] - self.head_pose_offset['tilt']
head_pose['pitch'] = head_pose['pitch'] - self.head_pose_offset['pitch']
calibrated_data['head_pose'] = head_pose
return calibrated_data
def start(self):
if self.running:
return
self.running = True
self.loop_thread = threading.Thread(target=self._run_loop, daemon=True)
self.loop_thread.start()
def stop(self):
self.running = False
try:
if self.loop:
asyncio.run_coroutine_threadsafe(self._disconnect(), self.loop)
except Exception:
pass
def read_data(self, apply_calibration: bool = True) -> Dict[str, Any]:
with self._lock:
raw = {
'head_pose': {
'rotation': self.last_data['yaw'], # rotation 对应航向角
'tilt': self.last_data['pitch'], # tilt 对应横滚
'pitch': self.last_data['roll'] # pitch 对应俯仰
},
'temperature': self.last_data.get('temperature', 25.0),
'timestamp': datetime.now().isoformat()
}
return self.apply_calibration(raw) if apply_calibration else raw
def _run_loop(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
try:
self.loop.run_until_complete(self._connect_and_listen())
except Exception as e:
logger.error(f'BLE IMU事件循环异常: {e}', exc_info=True)
finally:
try:
if not self.loop.is_closed():
self.loop.stop()
self.loop.close()
except Exception:
pass
async def _disconnect(self):
try:
if self.client and self.client.is_connected:
try:
await self.client.stop_notify(self._notify_char)
except Exception:
pass
await self.client.disconnect()
except Exception:
pass
async def _connect_and_listen(self):
try:
from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic
except Exception as e:
logger.error(f"未安装bleak或导入失败: {e}")
self.running = False
return
while self.running:
try:
logger.info(f"扫描并连接蓝牙IMU: {self.mac_address} ...")
device = await BleakScanner.find_device_by_address(self.mac_address, cb=dict(use_bdaddr=False))
if device is None:
logger.warning(f"未找到设备: {self.mac_address}")
await asyncio.sleep(2.0)
continue
self.disconnected_event = asyncio.Event()
def _on_disconnected(_client):
logger.info("BLE IMU已断开连接")
self._connected = False
try:
self.disconnected_event.set()
except Exception:
pass
self.client = BleakClient(device, disconnected_callback=_on_disconnected)
logger.info("正在连接BLE IMU...")
await self.client.connect()
self._connected = True
logger.info("BLE IMU连接成功")
# 订阅通知
await self.client.start_notify(self._notify_char, self._notification_handler)
# 发送初始化指令(参考测试脚本)
try:
await self.client.write_gatt_char(self._write_char, bytes([0x29])) # 保持连接
await asyncio.sleep(0.2)
await self.client.write_gatt_char(self._write_char, bytes([0x46])) # 高速模式
await asyncio.sleep(0.2)
isCompassOn = 0
barometerFilter = 2
Cmd_ReportTag = 0x0FFF
params = bytearray([0x00 for _ in range(11)])
params[0] = 0x12
params[1] = 5
params[2] = 255
params[3] = 0
params[4] = ((barometerFilter & 3) << 1) | (isCompassOn & 1)
params[5] = 60 # 发送帧率
params[6] = 1
params[7] = 3
params[8] = 5
params[9] = Cmd_ReportTag & 0xff
params[10] = (Cmd_ReportTag >> 8) & 0xff
await self.client.write_gatt_char(self._write_char, params)
await asyncio.sleep(0.2)
await self.client.write_gatt_char(self._write_char, bytes([0x19])) # 开始上报
except Exception as e:
logger.warning(f"BLE IMU写入初始化指令失败: {e}")
# 保持连接直到停止或断开
while self.running and self.client and self.client.is_connected:
await asyncio.sleep(1.0)
# 退出前尝试停止通知
try:
await self.client.stop_notify(self._notify_char)
except Exception:
pass
try:
await self.client.disconnect()
except Exception:
pass
self._connected = False
except Exception as e:
logger.error(f"BLE IMU连接/监听失败: {e}", exc_info=True)
self._connected = False
await asyncio.sleep(2.0)
def _notification_handler(self, characteristic, data: bytearray):
"""通知回调解析IMU数据更新欧拉角"""
try:
buf = data
if len(buf) < 3:
return
if buf[0] != 0x11:
return
# 比例系数
scaleAngle = 0.0054931640625 # 180/32768
scaleTemperature = 0.01
ctl = (buf[2] << 8) | buf[1]
L = 7 # 数据偏移起点
tmp_temperature = None
# 跳过前面不关心的标志位,关注角度与温度
if (ctl & 0x0001) != 0:
L += 6 # aX aY aZ
if (ctl & 0x0002) != 0:
L += 6 # AX AY AZ
if (ctl & 0x0004) != 0:
L += 6 # GX GY GZ
if (ctl & 0x0008) != 0:
L += 6 # CX CY CZ
if (ctl & 0x0010) != 0:
# 温度
tmpX = np.short((np.short(buf[L+1]) << 8) | buf[L]) * scaleTemperature
L += 2
# 气压与高度各3字节
# 气压
tmpU32 = np.uint32(((np.uint32(buf[L+2]) << 16) | (np.uint32(buf[L+1]) << 8) | np.uint32(buf[L])))
if (tmpU32 & 0x800000) == 0x800000:
tmpU32 = (tmpU32 | 0xff000000)
tmpY = np.int32(tmpU32) * 0.0002384185791
L += 3
# 高度
tmpU32 = np.uint32(((np.uint32(buf[L+2]) << 16) | (np.uint32(buf[L+1]) << 8) | np.uint32(buf[L])))
if (tmpU32 & 0x800000) == 0x800000:
tmpU32 = (tmpU32 | 0xff000000)
tmpZ = np.int32(tmpU32) * 0.0010728836
L += 3
tmp_temperature = float(tmpX)
if (ctl & 0x0020) != 0:
L += 8 # 四元数 wxyz
if (ctl & 0x0040) != 0:
angleX = float(np.short((np.short(buf[L+1]) << 8) | buf[L]) * scaleAngle); L += 2
angleY = float(np.short((np.short(buf[L+1]) << 8) | buf[L]) * scaleAngle); L += 2
angleZ = float(np.short((np.short(buf[L+1]) << 8) | buf[L]) * scaleAngle); L += 2
with self._lock:
# 映射roll=X, pitch=Y, yaw=Z
self.last_data['roll'] = angleX*-1
self.last_data['pitch'] = angleY*-1
self.last_data['yaw'] = angleZ*-1
if tmp_temperature is not None:
self.last_data['temperature'] = tmp_temperature
except Exception:
# 解析失败忽略
pass
@property
def connected(self) -> bool:
return self._connected
class IMUManager(BaseDevice): class IMUManager(BaseDevice):
"""IMU传感器管理器""" """IMU传感器管理器"""
@ -274,8 +520,9 @@ class IMUManager(BaseDevice):
# 设备配置 # 设备配置
self.port = config.get('port', 'COM7') self.port = config.get('port', 'COM7')
self.baudrate = config.get('baudrate', 9600) self.baudrate = config.get('baudrate', 9600)
self.device_type = config.get('device_type', 'mock') # 'real' 或 'mock' self.device_type = config.get('device_type', 'mock') # 'real' | 'mock' | 'ble'
self.use_mock = config.get('use_mock', False) # 保持向后兼容 self.use_mock = config.get('use_mock', False) # 保持向后兼容
self.mac_address = config.get('mac_address', '')
# IMU设备实例 # IMU设备实例
self.imu_device = None self.imu_device = None
@ -295,7 +542,7 @@ class IMUManager(BaseDevice):
self.data_buffer = deque(maxlen=100) self.data_buffer = deque(maxlen=100)
self.last_valid_data = None self.last_valid_data = None
self.logger.info(f"IMU管理器初始化完成 - 端口: {self.port}, 设备类型: {self.device_type}") self.logger.info(f"IMU管理器初始化完成 - 端口: {self.port}, 设备类型: {self.device_type}, MAC: {self.mac_address}")
def initialize(self) -> bool: def initialize(self) -> bool:
""" """
@ -308,13 +555,19 @@ class IMUManager(BaseDevice):
self.logger.info(f"正在初始化IMU设备...") self.logger.info(f"正在初始化IMU设备...")
# 使用构造函数中已加载的配置,避免并发读取配置文件 # 使用构造函数中已加载的配置,避免并发读取配置文件
self.logger.info(f"使用已加载配置: port={self.port}, baudrate={self.baudrate}, device_type={self.device_type}") self.logger.info(f"使用已加载配置: port={self.port}, baudrate={self.baudrate}, device_type={self.device_type}, mac={self.mac_address}")
# 根据配置选择真实设备或模拟设备 # 根据配置选择设备类型
# 优先使用device_type配置如果没有则使用use_mock配置向后兼容 if self.device_type == 'ble':
use_real_device = (self.device_type == 'real') or (not self.use_mock) if not self.mac_address:
self.logger.error("IMU BLE设备未配置MAC地址")
if use_real_device: self.is_connected = False
return False
self.logger.info(f"使用蓝牙IMU设备 - MAC: {self.mac_address}")
self.imu_device = BleIMUDevice(self.mac_address)
self.imu_device.start()
self.is_connected = True
elif self.device_type == 'real' or (self.device_type != 'mock' and not self.use_mock):
self.logger.info(f"使用真实IMU设备 - 端口: {self.port}, 波特率: {self.baudrate}") self.logger.info(f"使用真实IMU设备 - 端口: {self.port}, 波特率: {self.baudrate}")
self.imu_device = RealIMUDevice(self.port, self.baudrate) self.imu_device = RealIMUDevice(self.port, self.baudrate)
@ -324,15 +577,17 @@ class IMUManager(BaseDevice):
self.is_connected = False self.is_connected = False
self.imu_device = None self.imu_device = None
return False return False
self.is_connected = True
else: else:
self.logger.info("使用模拟IMU设备") self.logger.info("使用模拟IMU设备")
self.imu_device = MockIMUDevice() self.imu_device = MockIMUDevice()
self.is_connected = True
self.is_connected = True
self._device_info.update({ self._device_info.update({
'port': self.port, 'port': self.port,
'baudrate': self.baudrate, 'baudrate': self.baudrate,
'use_mock': self.use_mock 'use_mock': self.use_mock,
'mac_address': self.mac_address,
}) })
self.logger.info("IMU初始化成功") self.logger.info("IMU初始化成功")
@ -462,6 +717,13 @@ class IMUManager(BaseDevice):
if self.imu_thread and self.imu_thread.is_alive(): if self.imu_thread and self.imu_thread.is_alive():
self.imu_thread.join(timeout=3.0) self.imu_thread.join(timeout=3.0)
# 停止BLE后台任务如果是BLE设备
try:
if isinstance(self.imu_device, BleIMUDevice):
self.imu_device.stop()
except Exception:
pass
self.logger.info("IMU数据流已停止") self.logger.info("IMU数据流已停止")
return True return True
@ -524,7 +786,8 @@ class IMUManager(BaseDevice):
'buffer_size': len(self.data_buffer), 'buffer_size': len(self.data_buffer),
'has_data': self.last_valid_data is not None, 'has_data': self.last_valid_data is not None,
'head_pose_offset': self.head_pose_offset, 'head_pose_offset': self.head_pose_offset,
'device_type': 'mock' if self.use_mock else 'real' 'device_type': self.device_type,
'mac_address': self.mac_address
}) })
return status return status
@ -609,6 +872,7 @@ class IMUManager(BaseDevice):
self.baudrate = config.get('baudrate', 9600) self.baudrate = config.get('baudrate', 9600)
self.device_type = config.get('device_type', 'mock') self.device_type = config.get('device_type', 'mock')
self.use_mock = config.get('use_mock', False) self.use_mock = config.get('use_mock', False)
self.mac_address = config.get('mac_address', '')
# 更新数据缓存队列大小 # 更新数据缓存队列大小
buffer_size = config.get('buffer_size', 100) buffer_size = config.get('buffer_size', 100)
@ -621,7 +885,7 @@ class IMUManager(BaseDevice):
for data in current_data[-buffer_size:]: for data in current_data[-buffer_size:]:
self.data_buffer.append(data) self.data_buffer.append(data)
self.logger.info(f"IMU配置重新加载成功 - 端口: {self.port}, 波特率: {self.baudrate}, 设备类型: {self.device_type}") self.logger.info(f"IMU配置重新加载成功 - 端口: {self.port}, 波特率: {self.baudrate}, 设备类型: {self.device_type}, MAC: {self.mac_address}")
return True return True
except Exception as e: except Exception as e:

View File

@ -0,0 +1,649 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
IMU传感器管理器
负责IMU传感器的连接校准和头部姿态数据采集
"""
import serial
import threading
import time
import json
import numpy as np
from typing import Optional, Dict, Any, List, Tuple
import logging
from collections import deque
import struct
from datetime import datetime
try:
from .base_device import BaseDevice
from .utils.socket_manager import SocketManager
from .utils.config_manager import ConfigManager
except ImportError:
from base_device import BaseDevice
from utils.socket_manager import SocketManager
from utils.config_manager import ConfigManager
# 设置日志
logger = logging.getLogger(__name__)
class RealIMUDevice:
"""真实IMU设备通过串口读取姿态数据"""
def __init__(self, port, baudrate):
self.port = port
self.baudrate = baudrate
self.ser = None
self.buffer = bytearray()
self.calibration_data = None
self.head_pose_offset = {'rotation': 0, 'tilt': 0, 'pitch': 0}
self.last_data = {
'roll': 0.0,
'pitch': 0.0,
'yaw': 0.0,
'temperature': 25.0
}
logger.debug(f'RealIMUDevice 初始化: port={self.port}, baudrate={self.baudrate}')
self._connect()
def _connect(self):
try:
logger.debug(f'尝试打开串口: {self.port} @ {self.baudrate}')
self.ser = serial.Serial(self.port, self.baudrate, timeout=1)
if hasattr(self.ser, 'reset_input_buffer'):
try:
self.ser.reset_input_buffer()
logger.debug('已清空串口输入缓冲区')
except Exception as e:
logger.debug(f'重置串口输入缓冲区失败: {e}')
logger.info(f'IMU设备连接成功: {self.port} @ {self.baudrate}bps')
except Exception as e:
# logger.error(f'IMU设备连接失败: {e}', exc_info=True)
self.ser = None
def set_calibration(self, calibration: Dict[str, Any]):
self.calibration_data = calibration
if 'head_pose_offset' in calibration:
self.head_pose_offset = calibration['head_pose_offset']
logger.debug(f'应用IMU校准数据: {self.head_pose_offset}')
def apply_calibration(self, raw_data: Dict[str, Any]) -> Dict[str, Any]:
"""应用校准:将当前姿态减去初始偏移,得到相对于初始姿态的变化量"""
if not raw_data or 'head_pose' not in raw_data:
return raw_data
# 应用校准偏移
calibrated_data = raw_data.copy()
head_pose = raw_data['head_pose'].copy()
angle=head_pose['rotation'] - self.head_pose_offset['rotation']
# 减去基准值(零点偏移)
head_pose['rotation'] = ((angle + 180) % 360) - 180
head_pose['tilt'] = head_pose['tilt'] - self.head_pose_offset['tilt']
head_pose['pitch'] = head_pose['pitch'] - self.head_pose_offset['pitch']
calibrated_data['head_pose'] = head_pose
return calibrated_data
@staticmethod
def _checksum(data: bytes) -> int:
return sum(data[:-1]) & 0xFF
def _parse_packet(self, data: bytes) -> Optional[Dict[str, float]]:
if len(data) != 11:
logger.debug(f'无效数据包长度: {len(data)}')
return None
if data[0] != 0x55:
logger.debug(f'错误的包头: 0x{data[0]:02X}')
return None
if self._checksum(data) != data[-1]:
logger.debug(f'校验和错误: 期望{self._checksum(data):02X}, 实际{data[-1]:02X}')
return None
packet_type = data[1]
vals = [int.from_bytes(data[i:i+2], 'little', signed=True) for i in range(2, 10, 2)]
if packet_type == 0x53: # 姿态角单位0.01°
pitchl, rxl, yawl, temp = vals # 注意这里 vals 已经是有符号整数
# 使用第一段代码里的比例系数
k_angle = 180.0
roll = -round(rxl / 32768.0 * k_angle,2)
pitch = -round(pitchl / 32768.0 * k_angle,2)
yaw = -round(yawl / 32768.0 * k_angle,2)
temp = temp / 100.0
self.last_data = {
'roll': roll,
'pitch': pitch,
'yaw': yaw,
'temperature': temp
}
# print(f'解析姿态角包: roll={roll}, pitch={pitch}, yaw={yaw}, temp={temp}')
return self.last_data
else:
# logger.debug(f'忽略的数据包类型: 0x{packet_type:02X}')
return None
def read_data(self, apply_calibration: bool = True) -> Dict[str, Any]:
if not self.ser or not getattr(self.ser, 'is_open', False):
# logger.warning('IMU串口未连接尝试重新连接...')
self._connect()
return {
'head_pose': {
'rotation': self.last_data['yaw'],
'tilt': self.last_data['roll'],
'pitch': self.last_data['pitch']
},
'temperature': self.last_data['temperature'],
'timestamp': datetime.now().isoformat()
}
try:
bytes_waiting = self.ser.in_waiting
if bytes_waiting:
# logger.debug(f'串口缓冲区待读字节: {bytes_waiting}')
chunk = self.ser.read(bytes_waiting)
# logger.debug(f'读取到字节: {len(chunk)}')
self.buffer.extend(chunk)
while len(self.buffer) >= 11:
if self.buffer[0] != 0x55:
dropped = self.buffer.pop(0)
logger.debug(f'丢弃无效字节: 0x{dropped:02X}')
continue
packet = bytes(self.buffer[:11])
parsed = self._parse_packet(packet)
del self.buffer[:11]
if parsed is not None:
raw = {
'head_pose': {
'rotation': parsed['yaw'], # rotation = roll
'tilt': parsed['roll'], # tilt = yaw
'pitch': parsed['pitch'] # pitch = pitch
},
'temperature': parsed['temperature'],
'timestamp': datetime.now().isoformat()
}
# logger.debug(f'映射后的头部姿态: {raw}')
return self.apply_calibration(raw) if apply_calibration else raw
raw = {
'head_pose': {
'rotation': self.last_data['yaw'],
'tilt': self.last_data['roll'],
'pitch': self.last_data['pitch']
},
'temperature': self.last_data['temperature'],
'timestamp': datetime.now().isoformat()
}
return self.apply_calibration(raw) if apply_calibration else raw
except Exception as e:
logger.error(f'IMU数据读取异常: {e}', exc_info=True)
raw = {
'head_pose': {
'rotation': self.last_data['yaw'],
'tilt': self.last_data['roll'],
'pitch': self.last_data['pitch']
},
'temperature': self.last_data['temperature'],
'timestamp': datetime.now().isoformat()
}
return self.apply_calibration(raw) if apply_calibration else raw
def __del__(self):
try:
if self.ser and getattr(self.ser, 'is_open', False):
self.ser.close()
logger.info('IMU设备串口已关闭')
except Exception:
pass
class MockIMUDevice:
"""模拟IMU设备"""
def __init__(self):
self.noise_level = 0.1
self.calibration_data = None # 校准数据
self.head_pose_offset = {'rotation': 0, 'tilt': 0, 'pitch': 0} # 头部姿态零点偏移
def set_calibration(self, calibration: Dict[str, Any]):
"""设置校准数据"""
self.calibration_data = calibration
if 'head_pose_offset' in calibration:
self.head_pose_offset = calibration['head_pose_offset']
def apply_calibration(self, raw_data: Dict[str, Any]) -> Dict[str, Any]:
"""应用校准:将当前姿态减去初始偏移,得到相对姿态"""
if not raw_data or 'head_pose' not in raw_data:
return raw_data
calibrated_data = raw_data.copy()
head_pose = raw_data['head_pose'].copy()
head_pose['rotation'] = head_pose['rotation'] - self.head_pose_offset['rotation']
head_pose['tilt'] = head_pose['tilt'] - self.head_pose_offset['tilt']
head_pose['pitch'] = head_pose['pitch'] - self.head_pose_offset['pitch']
calibrated_data['head_pose'] = head_pose
return calibrated_data
def read_data(self, apply_calibration: bool = True) -> Dict[str, Any]:
"""读取IMU数据"""
# 生成头部姿态角度数据,角度范围(-90°, +90°)
# 使用正弦波模拟自然的头部运动,添加随机噪声
import time
current_time = time.time()
# 旋转角(左旋为负,右旋为正)
rotation_angle = 30 * np.sin(current_time * 0.5) + np.random.normal(0, self.noise_level * 5)
rotation_angle = np.clip(rotation_angle, -90, 90)
# 倾斜角(左倾为负,右倾为正)
tilt_angle = 20 * np.sin(current_time * 0.3 + np.pi/4) + np.random.normal(0, self.noise_level * 5)
tilt_angle = np.clip(tilt_angle, -90, 90)
# 俯仰角(俯角为负,仰角为正)
pitch_angle = 15 * np.sin(current_time * 0.7 + np.pi/2) + np.random.normal(0, self.noise_level * 5)
pitch_angle = np.clip(pitch_angle, -90, 90)
# 生成原始数据
raw_data = {
'head_pose': {
'rotation': rotation_angle, # 旋转角:左旋(-), 右旋(+)
'tilt': tilt_angle, # 倾斜角:左倾(-), 右倾(+)
'pitch': pitch_angle # 俯仰角:俯角(-), 仰角(+)
},
'timestamp': datetime.now().isoformat()
}
# 应用校准并返回
return self.apply_calibration(raw_data) if apply_calibration else raw_data
class IMUManager(BaseDevice):
"""IMU传感器管理器"""
def __init__(self, socketio, config_manager: Optional[ConfigManager] = None):
"""
初始化IMU管理器
Args:
socketio: SocketIO实例
config_manager: 配置管理器实例
"""
# 配置管理
self.config_manager = config_manager or ConfigManager()
config = self.config_manager.get_device_config('imu')
super().__init__("imu", config)
# 保存socketio实例
self._socketio = socketio
# 设备配置
self.port = config.get('port', 'COM7')
self.baudrate = config.get('baudrate', 9600)
self.device_type = config.get('device_type', 'mock') # 'real' 或 'mock'
self.use_mock = config.get('use_mock', False) # 保持向后兼容
# IMU设备实例
self.imu_device = None
# 推流相关
self.imu_streaming = False
self.imu_thread = None
# 统计信息
self.data_count = 0
self.error_count = 0
# 校准相关
self.is_calibrated = False
self.head_pose_offset = {'rotation': 0, 'tilt': 0, 'pitch': 0}
# 数据缓存
self.data_buffer = deque(maxlen=100)
self.last_valid_data = None
self.logger.info(f"IMU管理器初始化完成 - 端口: {self.port}, 设备类型: {self.device_type}")
def initialize(self) -> bool:
"""
初始化IMU设备
Returns:
bool: 初始化是否成功
"""
try:
self.logger.info(f"正在初始化IMU设备...")
# 使用构造函数中已加载的配置,避免并发读取配置文件
self.logger.info(f"使用已加载配置: port={self.port}, baudrate={self.baudrate}, device_type={self.device_type}")
# 根据配置选择真实设备或模拟设备
# 优先使用device_type配置如果没有则使用use_mock配置向后兼容
use_real_device = (self.device_type == 'real') or (not self.use_mock)
if use_real_device:
self.logger.info(f"使用真实IMU设备 - 端口: {self.port}, 波特率: {self.baudrate}")
self.imu_device = RealIMUDevice(self.port, self.baudrate)
# 检查真实设备是否连接成功
if self.imu_device.ser is None:
self.logger.error(f"IMU设备连接失败: 无法打开串口 {self.port}")
self.is_connected = False
self.imu_device = None
return False
else:
self.logger.info("使用模拟IMU设备")
self.imu_device = MockIMUDevice()
self.is_connected = True
self._device_info.update({
'port': self.port,
'baudrate': self.baudrate,
'use_mock': self.use_mock
})
self.logger.info("IMU初始化成功")
return True
except Exception as e:
self.logger.error(f"IMU初始化失败: {e}")
self.is_connected = False
self.imu_device = None
return False
def _quick_calibrate_imu(self) -> Dict[str, Any]:
"""
快速IMU零点校准以当前姿态为基准
Returns:
Dict[str, Any]: 校准结果
"""
try:
if not self.imu_device:
return {'status': 'error', 'error': 'IMU设备未初始化'}
self.logger.info('开始IMU快速零点校准...')
# 直接读取一次原始数据作为校准偏移量
raw_data = self.imu_device.read_data(apply_calibration=False)
if not raw_data or 'head_pose' not in raw_data:
return {'status': 'error', 'error': '无法读取IMU原始数据'}
# 使用当前姿态作为零点偏移
self.head_pose_offset = {
'rotation': raw_data['head_pose']['rotation'],
'tilt': raw_data['head_pose']['tilt'],
'pitch': raw_data['head_pose']['pitch']
}
# 应用校准到设备
calibration_data = {'head_pose_offset': self.head_pose_offset}
self.imu_device.set_calibration(calibration_data)
self.logger.info(f'IMU快速校准完成: {self.head_pose_offset}')
return {
'status': 'success',
'head_pose_offset': self.head_pose_offset
}
except Exception as e:
self.logger.error(f'IMU快速校准失败: {e}')
return {'status': 'error', 'error': str(e)}
def calibrate(self) -> bool:
"""
校准IMU传感器
Returns:
bool: 校准是否成功
"""
try:
if not self.is_connected or not self.imu_device:
if not self.initialize():
self.logger.error("IMU设备未连接")
return False
# 使用快速校准方法
result = self._quick_calibrate_imu()
if result['status'] == 'success':
self.is_calibrated = True
self.logger.info("IMU校准成功")
return True
else:
self.logger.error(f"IMU校准失败: {result.get('error', '未知错误')}")
return False
except Exception as e:
self.logger.error(f"IMU校准失败: {e}")
return False
def start_streaming(self) -> bool:
"""
开始IMU数据流
Args:
socketio: SocketIO实例用于数据推送
Returns:
bool: 启动是否成功
"""
try:
if not self.is_connected or not self.imu_device:
if not self.initialize():
return False
if self.imu_streaming:
self.logger.warning("IMU数据流已在运行")
return True
# 启动前进行快速校准
if not self.is_calibrated:
self.logger.info("启动前进行快速零点校准...")
self._quick_calibrate_imu()
self.imu_streaming = True
self.imu_thread = threading.Thread(target=self._imu_streaming_thread, daemon=True)
self.imu_thread.start()
self.logger.info("IMU数据流启动成功")
return True
except Exception as e:
self.logger.error(f"IMU数据流启动失败: {e}")
self.imu_streaming = False
return False
def stop_streaming(self) -> bool:
"""
停止IMU数据流
Returns:
bool: 停止是否成功
"""
try:
self.imu_streaming = False
if self.imu_thread and self.imu_thread.is_alive():
self.imu_thread.join(timeout=3.0)
self.logger.info("IMU数据流已停止")
return True
except Exception as e:
self.logger.error(f"停止IMU数据流失败: {e}")
return False
def _imu_streaming_thread(self):
"""
IMU数据流工作线程
"""
self.logger.info("IMU数据流工作线程启动")
while self.imu_streaming:
try:
if self.imu_device:
# 读取IMU数据
data = self.imu_device.read_data(apply_calibration=True)
if data:
# 缓存数据
# self.data_buffer.append(data)
# self.last_valid_data = data
# 发送数据到前端
if self._socketio:
self._socketio.emit('imu_data', data, namespace='/devices')
# 更新统计
self.data_count += 1
else:
self.error_count += 1
time.sleep(0.02) # 50Hz采样率
except Exception as e:
self.logger.error(f"IMU数据流处理异常: {e}")
self.error_count += 1
time.sleep(0.1)
self.logger.info("IMU数据流工作线程结束")
def get_status(self) -> Dict[str, Any]:
"""
获取设备状态
Returns:
Dict[str, Any]: 设备状态信息
"""
status = super().get_status()
status.update({
'port': self.port,
'baudrate': self.baudrate,
'is_streaming': self.imu_streaming,
'is_calibrated': self.is_calibrated,
'data_count': self.data_count,
'error_count': self.error_count,
'buffer_size': len(self.data_buffer),
'has_data': self.last_valid_data is not None,
'head_pose_offset': self.head_pose_offset,
'device_type': 'mock' if self.use_mock else 'real'
})
return status
def get_latest_data(self) -> Optional[Dict[str, float]]:
"""
获取最新的IMU数据
Returns:
Optional[Dict[str, float]]: 最新数据无数据返回None
"""
return self.last_valid_data.copy() if self.last_valid_data else None
def collect_head_pose_data(self, duration: int = 10) -> List[Dict[str, Any]]:
"""
收集头部姿态数据
Args:
duration: 收集时长
Returns:
List[Dict[str, Any]]: 收集到的数据列表
"""
collected_data = []
if not self.is_connected or not self.imu_device:
self.logger.error("IMU设备未连接")
return collected_data
self.logger.info(f"开始收集头部姿态数据,时长: {duration}")
start_time = time.time()
while time.time() - start_time < duration:
try:
data = self.imu_device.read_data(apply_calibration=True)
if data:
# 添加时间戳
data['timestamp'] = time.time()
collected_data.append(data)
time.sleep(0.02) # 50Hz采样率
except Exception as e:
self.logger.error(f"数据收集异常: {e}")
break
self.logger.info(f"头部姿态数据收集完成,共收集 {len(collected_data)} 个样本")
return collected_data
def disconnect(self):
"""
断开IMU设备连接
"""
try:
self.stop_streaming()
if self.imu_device:
self.imu_device = None
self.is_connected = False
self.logger.info("IMU设备已断开连接")
except Exception as e:
self.logger.error(f"断开IMU设备连接失败: {e}")
def reload_config(self) -> bool:
"""
重新加载设备配置
Returns:
bool: 重新加载是否成功
"""
try:
self.logger.info("正在重新加载IMU配置...")
# 获取最新配置
config = self.config_manager.get_device_config('imu')
# 更新配置属性
self.port = config.get('port', 'COM7')
self.baudrate = config.get('baudrate', 9600)
self.device_type = config.get('device_type', 'mock')
self.use_mock = config.get('use_mock', False)
# 更新数据缓存队列大小
buffer_size = config.get('buffer_size', 100)
if buffer_size != self.data_buffer.maxlen:
# 保存当前数据
current_data = list(self.data_buffer)
# 创建新缓冲区
self.data_buffer = deque(maxlen=buffer_size)
# 恢复数据(保留最新的数据)
for data in current_data[-buffer_size:]:
self.data_buffer.append(data)
self.logger.info(f"IMU配置重新加载成功 - 端口: {self.port}, 波特率: {self.baudrate}, 设备类型: {self.device_type}")
return True
except Exception as e:
self.logger.error(f"重新加载IMU配置失败: {e}")
return False
def cleanup(self):
"""
清理资源
"""
try:
self.disconnect()
# 清理缓冲区
self.data_buffer.clear()
# 重置状态
self.is_calibrated = False
self.last_valid_data = None
self.head_pose_offset = {'rotation': 0, 'tilt': 0, 'pitch': 0}
super().cleanup()
self.logger.info("IMU资源清理完成")
except Exception as e:
self.logger.error(f"清理IMU资源失败: {e}")

View File

@ -206,6 +206,7 @@ class ConfigManager:
'baudrate': self.config.getint('DEVICES', 'imu_baudrate', fallback=9600), 'baudrate': self.config.getint('DEVICES', 'imu_baudrate', fallback=9600),
'timeout': self.config.getfloat('DEVICES', 'imu_timeout', fallback=1.0), 'timeout': self.config.getfloat('DEVICES', 'imu_timeout', fallback=1.0),
'calibration_samples': self.config.getint('DEVICES', 'imu_calibration_samples', fallback=100), 'calibration_samples': self.config.getint('DEVICES', 'imu_calibration_samples', fallback=100),
'mac_address': self.config.get('DEVICES', 'imu_mac_address', fallback=''),
} }
def _get_pressure_config(self) -> Dict[str, Any]: def _get_pressure_config(self) -> Dict[str, Any]:
@ -352,9 +353,10 @@ class ConfigManager:
Args: Args:
config_data: IMU配置数据 config_data: IMU配置数据
{ {
'device_type': 'real' | 'mock', 'device_type': 'real' | 'mock' | 'ble',
'port': 'COM6', 'port': 'COM6',
'baudrate': 9600 'baudrate': 9600,
'mac_address': 'ef:3c:1a:0a:fe:02'
} }
Returns: Returns:
@ -368,6 +370,8 @@ class ConfigManager:
self.set_config_value('DEVICES', 'imu_port', config_data['port']) self.set_config_value('DEVICES', 'imu_port', config_data['port'])
if 'baudrate' in config_data: if 'baudrate' in config_data:
self.set_config_value('DEVICES', 'imu_baudrate', str(config_data['baudrate'])) self.set_config_value('DEVICES', 'imu_baudrate', str(config_data['baudrate']))
if 'mac_address' in config_data:
self.set_config_value('DEVICES', 'imu_mac_address', config_data['mac_address'])
# 保存配置 # 保存配置
self.save_config() self.save_config()