From 3fe509d109fd64a30a17abea3c795bcabf579aa4 Mon Sep 17 00:00:00 2001 From: zhaozilong12 <405241463@qq.com> Date: Wed, 10 Sep 2025 18:11:54 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=B1=E5=BA=A6=E7=9B=B8=E6=9C=BA=E8=B7=9D?= =?UTF-8?q?=E7=A6=BB=EF=BC=8Copencv=E6=B8=B2=E6=9F=93=E3=80=81=E6=96=B0?= =?UTF-8?q?=E5=A2=9E=E8=93=9D=E7=89=99IMU?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/config.ini | 3 +- backend/devices/femtobolt_manager.py | 185 +++---- backend/devices/imu_manager.py | 292 ++++++++++- backend/devices/imu_manager_usb_bak.py | 649 ++++++++++++++++++++++++ backend/devices/utils/config_manager.py | 8 +- 5 files changed, 1017 insertions(+), 120 deletions(-) create mode 100644 backend/devices/imu_manager_usb_bak.py diff --git a/backend/config.ini b/backend/config.ini index c23d90d6..e7de00df 100644 --- a/backend/config.ini +++ b/backend/config.ini @@ -30,7 +30,8 @@ depth_range_max = 1500 [DEVICES] imu_device_type = real -imu_port = COM8 +imu_port = COM14 +imu_mac_address = ef:3c:1a:0a:fe:02 imu_baudrate = 9600 pressure_device_type = real pressure_use_mock = False diff --git a/backend/devices/femtobolt_manager.py b/backend/devices/femtobolt_manager.py index e1e815a3..4ff854ca 100644 --- a/backend/devices/femtobolt_manager.py +++ b/backend/devices/femtobolt_manager.py @@ -98,6 +98,13 @@ class FemtoBoltManager(BaseDevice): # 性能监控 self.fps_counter = 0 + + # 图像渲染缓存 + self.background = None + self.output_buffer = None + self._depth_filtered = None # 用于复用深度图过滤结果 + self._blur_buffer = None # 用于复用高斯模糊结果 + self._current_gamma = None self.fps_start_time = time.time() self.actual_fps = 0 self.dropped_frames = 0 @@ -122,6 +129,7 @@ class FemtoBoltManager(BaseDevice): # 预生成网格背景(避免每帧创建) self._grid_bg = None self._grid_size = (480, 640) # 默认尺寸 + self.background = None # 用于缓存等高线渲染的背景 # 自定义彩虹色 colormap(参考testfemtobolt.py) colors = ['fuchsia', 'red', 'yellow', 'lime', 'cyan', 'blue', @@ -147,108 +155,70 @@ class FemtoBoltManager(BaseDevice): self._current_gamma = self.gamma_value def _generate_contour_image_opencv(self, depth): - """优化的等高线图像生成(增强梯度变化清晰度)""" + """改进版 OpenCV 等高线渲染,梯度平滑、局部对比增强""" try: - # 深度数据过滤(与原始函数完全一致) - depth_filtered = depth.copy() - depth_filtered[depth_filtered > 1100] = 0 - depth_filtered[depth_filtered < 500] = 0 - - # 创建输出图像 + # 初始化 depth_filtered 缓冲区 + if self._depth_filtered is None or self._depth_filtered.shape != depth.shape: + self._depth_filtered = np.zeros_like(depth, dtype=np.uint16) + + np.copyto(self._depth_filtered, depth) # 直接覆盖,不生成新数组 + depth_filtered = self._depth_filtered + depth_filtered[depth_filtered > self.depth_range_max] = 0 + depth_filtered[depth_filtered < self.depth_range_min] = 0 height, width = depth_filtered.shape - - # 背景图(与原始函数一致:灰色背景,alpha=0.3效果) - background_gray = int(0.5 * 255 * 0.3 + 255 * (1 - 0.3)) # 模拟灰色背景alpha混合 - output = np.ones((height, width, 3), dtype=np.uint8) * background_gray - - # 绘制白色网格线(与原始函数grid效果一致) - grid_spacing = max(height // 20, width // 20, 10) # 自适应网格间距 - for x in range(0, width, grid_spacing): - cv2.line(output, (x, 0), (x, height-1), (255, 255, 255), 1) - for y in range(0, height, grid_spacing): - cv2.line(output, (0, y), (width-1, y), (255, 255, 255), 1) - - # 使用masked数据(与原始函数np.ma.masked_equal逻辑一致) + + # 背景缓存 + if self.background is None or self.background.shape[:2] != (height, width): + background_gray = int(0.5 * 255 * 0.3 + 255 * (1 - 0.3)) + self.background = np.ones((height, width, 3), dtype=np.uint8) * background_gray + grid_spacing = max(height // 20, width // 20, 10) + for x in range(0, width, grid_spacing): + cv2.line(self.background, (x, 0), (x, height-1), (255, 255, 255), 1) + for y in range(0, height, grid_spacing): + cv2.line(self.background, (0, y), (width-1, y), (255, 255, 255), 1) + + # 初始化输出缓存和模糊缓存 + self.output_buffer = np.empty_like(self.background) + self._blur_buffer = np.empty_like(self.background) + + # 复用输出缓存,避免 copy() + np.copyto(self.output_buffer, self.background) + output = self.output_buffer valid_mask = depth_filtered > 0 + if np.any(valid_mask): - # 将深度值映射到500-1100范围(与原始函数vmin=500, vmax=1100一致) - depth_for_contour = depth_filtered.copy().astype(np.float32) - depth_for_contour[~valid_mask] = np.nan # 无效区域设为NaN - - # 增加等高线层级数量以获得更细腻的梯度变化(从100增加到200) - levels = np.linspace(500, 1100, 201) # 200个等高线层级,提升梯度细腻度 - - # 创建等高线边界增强图像 - contour_edges = np.zeros((height, width), dtype=np.uint8) - - # 为每个像素分配等高线层级 - for i in range(len(levels) - 1): - level_min = levels[i] - level_max = levels[i + 1] - - # 创建当前层级的掩码 - level_mask = (depth_filtered >= level_min) & (depth_filtered < level_max) - - if np.any(level_mask): - # 增强颜色映射算法 - 使用非线性映射增强对比度 - color_val = (level_min - 500) / (1100 - 500) - color_val = np.clip(color_val, 0, 1) - - # 应用Gamma校正增强对比度(gamma=0.8增强中间色调) - color_val_enhanced = np.power(color_val, 0.8) - - # 应用自定义colormap - color = self.custom_cmap(color_val_enhanced)[:3] - color_bgr = (np.array(color) * 255).astype(np.uint8) - - # 赋值颜色(BGR格式) - output[level_mask, 0] = color_bgr[2] # B - output[level_mask, 1] = color_bgr[1] # G - output[level_mask, 2] = color_bgr[0] # R - - # 检测等高线边界(每10个层级检测一次主要等高线) - if i % 10 == 0: - # 使用形态学操作检测边界 - kernel = np.ones((3, 3), np.uint8) - dilated = cv2.dilate(level_mask.astype(np.uint8), kernel, iterations=1) - eroded = cv2.erode(level_mask.astype(np.uint8), kernel, iterations=1) - edge = dilated - eroded - contour_edges = cv2.bitwise_or(contour_edges, edge) - - # 增强等高线边界 - if np.any(contour_edges): - # 对等高线边界进行轻微扩展 - kernel = np.ones((2, 2), np.uint8) - contour_edges = cv2.dilate(contour_edges, kernel, iterations=1) - - # 在等高线边界处增强对比度 - edge_mask = contour_edges > 0 - if np.any(edge_mask): - # 增强边界处的颜色对比度 - for c in range(3): - channel = output[:, :, c].astype(np.float32) - # 对边界像素应用对比度增强 - channel[edge_mask] = np.clip(channel[edge_mask] * 1.2, 0, 255) - output[:, :, c] = channel.astype(np.uint8) + # 连续归一化深度值 + norm_depth = np.zeros_like(depth_filtered, dtype=np.float32) + norm_depth[valid_mask] = (depth_filtered[valid_mask] - self.depth_range_min) / (self.depth_range_max - self.depth_range_min) + norm_depth = np.clip(norm_depth, 0, 1) ** 0.8 # Gamma增强 + + # 使用 colormap 映射 + cmap_colors = (self.custom_cmap(norm_depth)[..., :3] * 255).astype(np.uint8) + output[valid_mask] = cmap_colors[valid_mask] + + # Sobel 边界检测 + cv2.magnitude 替换 np.hypot + depth_uint8 = (norm_depth * 255).astype(np.uint8) + gx = cv2.Sobel(depth_uint8, cv2.CV_32F, 1, 0, ksize=3) + gy = cv2.Sobel(depth_uint8, cv2.CV_32F, 0, 1, ksize=3) + grad_mag = cv2.magnitude(gx, gy) + grad_mag = grad_mag.astype(np.uint8) + + # 自适应局部对比度增强(向量化) + edge_mask = grad_mag > 30 + output[edge_mask] = np.clip(output[edge_mask].astype(np.float32) * 1.5, 0, 255).astype(np.uint8) + + # 高斯平滑,复用 dst 缓冲区 + cv2.GaussianBlur(output, (3, 3), 0.3, dst=self._blur_buffer) - # 减少过度平滑处理以保持清晰度 - # 仅应用轻微的降噪处理,保持梯度边界清晰 - output = cv2.bilateralFilter(output, 3, 20, 20) # 减少滤波强度 - - # 裁剪宽度(与原始函数保持一致) - target_width = height // 2 - if width > target_width: - left = (width - target_width) // 2 - right = left + target_width - output = output[:, left:right] - - # 最终锐化处理增强细节 - # 使用USM锐化增强等高线细节 - gaussian = cv2.GaussianBlur(output, (0, 0), 1.0) - output = cv2.addWeighted(output, 1.5, gaussian, -0.5, 0) - output = np.clip(output, 0, 255).astype(np.uint8) - - return output + # 注意:这里不进行裁剪,而是返回完整图像 + # 推迟裁剪到显示阶段,与 testfemtobolt.py 保持一致 + # 原代码在这里进行了裁剪: + # target_width = height // 2 + # if width > target_width: + # left = (width - target_width) // 2 + # right = left + target_width + # output = output[:, left:right] + return self._blur_buffer except Exception as e: self.logger.error(f"优化等高线生成失败: {e}") @@ -274,8 +244,8 @@ class FemtoBoltManager(BaseDevice): self.ax.clear() # 深度数据过滤(与display_x.py完全一致) - depth[depth > 1100] = 0 - depth[depth < 500] = 0 + depth[depth > self.depth_range_max] = 0 + depth[depth < self.depth_range_min] = 0 # 背景图(与display_x.py完全一致) background = np.ones_like(depth) * 0.5 # 设定灰色背景 @@ -298,7 +268,7 @@ class FemtoBoltManager(BaseDevice): # 绘制等高线图并设置原点在上方(与display_x.py完全一致) import time start_time = time.perf_counter() - self.ax.contourf(depth, levels=100, cmap=self.custom_cmap, vmin=500, vmax=1100, origin='upper', zorder=2) + self.ax.contourf(depth, levels=100, cmap=self.custom_cmap, vmin=self.depth_range_min, vmax=self.depth_range_max, origin='upper', zorder=2) contourf_time = time.perf_counter() - start_time # self.logger.info(f"contourf绘制耗时: {contourf_time*1000:.2f}ms") @@ -458,9 +428,9 @@ class FemtoBoltManager(BaseDevice): # 配置FemtoBolt设备参数 self.femtobolt_config = self.pykinect.default_configuration - self.femtobolt_config.depth_mode = self.pykinect.K4A_DEPTH_MODE_NFOV_2X2BINNED + self.femtobolt_config.depth_mode = self.pykinect.K4A_DEPTH_MODE_NFOV_UNBINNED self.femtobolt_config.color_format = self.pykinect.K4A_IMAGE_FORMAT_COLOR_BGRA32 - self.femtobolt_config.color_resolution = self.pykinect.K4A_COLOR_RESOLUTION_720P + self.femtobolt_config.color_resolution = self.pykinect.K4A_COLOR_RESOLUTION_1080P self.femtobolt_config.camera_fps = self.pykinect.K4A_FRAMES_PER_SECOND_15 self.femtobolt_config.synchronized_images_only = False @@ -657,8 +627,17 @@ class FemtoBoltManager(BaseDevice): # 如果等高线生成失败,跳过这一帧 continue + # 裁剪处理(推迟到显示阶段) + h, w = depth_colored_final.shape[:2] + target_width = h // 2 + display_image = depth_colored_final + if w > target_width: + left = (w - target_width) // 2 + right = left + target_width + display_image = depth_colored_final[:, left:right] + # 推送SocketIO - success, buffer = cv2.imencode('.jpg', depth_colored_final, self._encode_param) + success, buffer = cv2.imencode('.jpg', display_image, self._encode_param) if success and self._socketio: jpg_as_text = base64.b64encode(memoryview(buffer).tobytes()).decode('utf-8') self._socketio.emit('femtobolt_frame', { diff --git a/backend/devices/imu_manager.py b/backend/devices/imu_manager.py index b597bb57..042a063d 100644 --- a/backend/devices/imu_manager.py +++ b/backend/devices/imu_manager.py @@ -15,6 +15,7 @@ import logging from collections import deque import struct from datetime import datetime +import asyncio try: from .base_device import BaseDevice @@ -66,7 +67,6 @@ class RealIMUDevice: self.calibration_data = calibration if 'head_pose_offset' in calibration: self.head_pose_offset = calibration['head_pose_offset'] - logger.debug(f'应用IMU校准数据: {self.head_pose_offset}') def apply_calibration(self, raw_data: Dict[str, Any]) -> Dict[str, Any]: """应用校准:将当前姿态减去初始偏移,得到相对于初始姿态的变化量""" if not raw_data or 'head_pose' not in raw_data: @@ -246,11 +246,257 @@ class MockIMUDevice: }, 'timestamp': datetime.now().isoformat() } - # 应用校准并返回 return self.apply_calibration(raw_data) if apply_calibration else raw_data +class BleIMUDevice: + """蓝牙IMU设备,基于bleak实现,解析逻辑参考tests/testblueimu.py""" + def __init__(self, mac_address: str): + self.mac_address = mac_address + self.loop = None + self.loop_thread = None + self.client = None + self.running = False + self._lock = threading.Lock() + self.disconnected_event = None + self.calibration_data = None + self.head_pose_offset = {'rotation': 0, 'tilt': 0, 'pitch': 0} + self.last_data = { + 'roll': 0.0, + 'pitch': 0.0, + 'yaw': 0.0, + 'temperature': 25.0 + } + self._connected = False + # GATT特征(参考测试脚本中的handle/short uuid) + self._notify_char = 0x0007 + self._write_char = 0x0005 + + def set_calibration(self, calibration: Dict[str, Any]): + self.calibration_data = calibration + if 'head_pose_offset' in calibration: + self.head_pose_offset = calibration['head_pose_offset'] + + def apply_calibration(self, raw_data: Dict[str, Any]) -> Dict[str, Any]: + if not raw_data or 'head_pose' not in raw_data: + return raw_data + calibrated_data = raw_data.copy() + head_pose = raw_data['head_pose'].copy() + # 与串口IMU保持一致:对 rotation 做归一化到 [-180, 180) + angle = head_pose['rotation'] - self.head_pose_offset['rotation'] + head_pose['rotation'] = ((angle + 180) % 360) - 180 + head_pose['tilt'] = head_pose['tilt'] - self.head_pose_offset['tilt'] + head_pose['pitch'] = head_pose['pitch'] - self.head_pose_offset['pitch'] + calibrated_data['head_pose'] = head_pose + return calibrated_data + + def start(self): + if self.running: + return + self.running = True + self.loop_thread = threading.Thread(target=self._run_loop, daemon=True) + self.loop_thread.start() + + def stop(self): + self.running = False + try: + if self.loop: + asyncio.run_coroutine_threadsafe(self._disconnect(), self.loop) + except Exception: + pass + + def read_data(self, apply_calibration: bool = True) -> Dict[str, Any]: + with self._lock: + raw = { + 'head_pose': { + 'rotation': self.last_data['yaw'], # rotation 对应航向角 + 'tilt': self.last_data['pitch'], # tilt 对应横滚 + 'pitch': self.last_data['roll'] # pitch 对应俯仰 + }, + 'temperature': self.last_data.get('temperature', 25.0), + 'timestamp': datetime.now().isoformat() + } + return self.apply_calibration(raw) if apply_calibration else raw + + def _run_loop(self): + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + try: + self.loop.run_until_complete(self._connect_and_listen()) + except Exception as e: + logger.error(f'BLE IMU事件循环异常: {e}', exc_info=True) + finally: + try: + if not self.loop.is_closed(): + self.loop.stop() + self.loop.close() + except Exception: + pass + + async def _disconnect(self): + try: + if self.client and self.client.is_connected: + try: + await self.client.stop_notify(self._notify_char) + except Exception: + pass + await self.client.disconnect() + except Exception: + pass + + async def _connect_and_listen(self): + try: + from bleak import BleakClient, BleakScanner + from bleak.backends.characteristic import BleakGATTCharacteristic + except Exception as e: + logger.error(f"未安装bleak或导入失败: {e}") + self.running = False + return + + while self.running: + try: + logger.info(f"扫描并连接蓝牙IMU: {self.mac_address} ...") + device = await BleakScanner.find_device_by_address(self.mac_address, cb=dict(use_bdaddr=False)) + if device is None: + logger.warning(f"未找到设备: {self.mac_address}") + await asyncio.sleep(2.0) + continue + + self.disconnected_event = asyncio.Event() + + def _on_disconnected(_client): + logger.info("BLE IMU已断开连接") + self._connected = False + try: + self.disconnected_event.set() + except Exception: + pass + + self.client = BleakClient(device, disconnected_callback=_on_disconnected) + logger.info("正在连接BLE IMU...") + await self.client.connect() + self._connected = True + logger.info("BLE IMU连接成功") + + # 订阅通知 + await self.client.start_notify(self._notify_char, self._notification_handler) + + # 发送初始化指令(参考测试脚本) + try: + await self.client.write_gatt_char(self._write_char, bytes([0x29])) # 保持连接 + await asyncio.sleep(0.2) + await self.client.write_gatt_char(self._write_char, bytes([0x46])) # 高速模式 + await asyncio.sleep(0.2) + + isCompassOn = 0 + barometerFilter = 2 + Cmd_ReportTag = 0x0FFF + params = bytearray([0x00 for _ in range(11)]) + params[0] = 0x12 + params[1] = 5 + params[2] = 255 + params[3] = 0 + params[4] = ((barometerFilter & 3) << 1) | (isCompassOn & 1) + params[5] = 60 # 发送帧率 + params[6] = 1 + params[7] = 3 + params[8] = 5 + params[9] = Cmd_ReportTag & 0xff + params[10] = (Cmd_ReportTag >> 8) & 0xff + await self.client.write_gatt_char(self._write_char, params) + await asyncio.sleep(0.2) + + await self.client.write_gatt_char(self._write_char, bytes([0x19])) # 开始上报 + except Exception as e: + logger.warning(f"BLE IMU写入初始化指令失败: {e}") + + # 保持连接直到停止或断开 + while self.running and self.client and self.client.is_connected: + await asyncio.sleep(1.0) + + # 退出前尝试停止通知 + try: + await self.client.stop_notify(self._notify_char) + except Exception: + pass + try: + await self.client.disconnect() + except Exception: + pass + self._connected = False + + except Exception as e: + logger.error(f"BLE IMU连接/监听失败: {e}", exc_info=True) + self._connected = False + await asyncio.sleep(2.0) + + def _notification_handler(self, characteristic, data: bytearray): + """通知回调:解析IMU数据,更新欧拉角""" + try: + buf = data + if len(buf) < 3: + return + if buf[0] != 0x11: + return + + # 比例系数 + scaleAngle = 0.0054931640625 # 180/32768 + scaleTemperature = 0.01 + + ctl = (buf[2] << 8) | buf[1] + L = 7 # 数据偏移起点 + tmp_temperature = None + + # 跳过前面不关心的标志位,关注角度与温度 + if (ctl & 0x0001) != 0: + L += 6 # aX aY aZ + if (ctl & 0x0002) != 0: + L += 6 # AX AY AZ + if (ctl & 0x0004) != 0: + L += 6 # GX GY GZ + if (ctl & 0x0008) != 0: + L += 6 # CX CY CZ + if (ctl & 0x0010) != 0: + # 温度 + tmpX = np.short((np.short(buf[L+1]) << 8) | buf[L]) * scaleTemperature + L += 2 + # 气压与高度各3字节 + # 气压 + tmpU32 = np.uint32(((np.uint32(buf[L+2]) << 16) | (np.uint32(buf[L+1]) << 8) | np.uint32(buf[L]))) + if (tmpU32 & 0x800000) == 0x800000: + tmpU32 = (tmpU32 | 0xff000000) + tmpY = np.int32(tmpU32) * 0.0002384185791 + L += 3 + # 高度 + tmpU32 = np.uint32(((np.uint32(buf[L+2]) << 16) | (np.uint32(buf[L+1]) << 8) | np.uint32(buf[L]))) + if (tmpU32 & 0x800000) == 0x800000: + tmpU32 = (tmpU32 | 0xff000000) + tmpZ = np.int32(tmpU32) * 0.0010728836 + L += 3 + tmp_temperature = float(tmpX) + if (ctl & 0x0020) != 0: + L += 8 # 四元数 wxyz + if (ctl & 0x0040) != 0: + angleX = float(np.short((np.short(buf[L+1]) << 8) | buf[L]) * scaleAngle); L += 2 + angleY = float(np.short((np.short(buf[L+1]) << 8) | buf[L]) * scaleAngle); L += 2 + angleZ = float(np.short((np.short(buf[L+1]) << 8) | buf[L]) * scaleAngle); L += 2 + with self._lock: + # 映射:roll=X, pitch=Y, yaw=Z + self.last_data['roll'] = angleX*-1 + self.last_data['pitch'] = angleY*-1 + self.last_data['yaw'] = angleZ*-1 + if tmp_temperature is not None: + self.last_data['temperature'] = tmp_temperature + except Exception: + # 解析失败忽略 + pass + + @property + def connected(self) -> bool: + return self._connected + + class IMUManager(BaseDevice): """IMU传感器管理器""" @@ -274,8 +520,9 @@ class IMUManager(BaseDevice): # 设备配置 self.port = config.get('port', 'COM7') self.baudrate = config.get('baudrate', 9600) - self.device_type = config.get('device_type', 'mock') # 'real' 或 'mock' + self.device_type = config.get('device_type', 'mock') # 'real' | 'mock' | 'ble' self.use_mock = config.get('use_mock', False) # 保持向后兼容 + self.mac_address = config.get('mac_address', '') # IMU设备实例 self.imu_device = None @@ -295,7 +542,7 @@ class IMUManager(BaseDevice): self.data_buffer = deque(maxlen=100) self.last_valid_data = None - self.logger.info(f"IMU管理器初始化完成 - 端口: {self.port}, 设备类型: {self.device_type}") + self.logger.info(f"IMU管理器初始化完成 - 端口: {self.port}, 设备类型: {self.device_type}, MAC: {self.mac_address}") def initialize(self) -> bool: """ @@ -308,13 +555,19 @@ class IMUManager(BaseDevice): self.logger.info(f"正在初始化IMU设备...") # 使用构造函数中已加载的配置,避免并发读取配置文件 - self.logger.info(f"使用已加载配置: port={self.port}, baudrate={self.baudrate}, device_type={self.device_type}") + self.logger.info(f"使用已加载配置: port={self.port}, baudrate={self.baudrate}, device_type={self.device_type}, mac={self.mac_address}") - # 根据配置选择真实设备或模拟设备 - # 优先使用device_type配置,如果没有则使用use_mock配置(向后兼容) - use_real_device = (self.device_type == 'real') or (not self.use_mock) - - if use_real_device: + # 根据配置选择设备类型 + if self.device_type == 'ble': + if not self.mac_address: + self.logger.error("IMU BLE设备未配置MAC地址") + self.is_connected = False + return False + self.logger.info(f"使用蓝牙IMU设备 - MAC: {self.mac_address}") + self.imu_device = BleIMUDevice(self.mac_address) + self.imu_device.start() + self.is_connected = True + elif self.device_type == 'real' or (self.device_type != 'mock' and not self.use_mock): self.logger.info(f"使用真实IMU设备 - 端口: {self.port}, 波特率: {self.baudrate}") self.imu_device = RealIMUDevice(self.port, self.baudrate) @@ -324,15 +577,17 @@ class IMUManager(BaseDevice): self.is_connected = False self.imu_device = None return False + self.is_connected = True else: self.logger.info("使用模拟IMU设备") self.imu_device = MockIMUDevice() + self.is_connected = True - self.is_connected = True self._device_info.update({ 'port': self.port, 'baudrate': self.baudrate, - 'use_mock': self.use_mock + 'use_mock': self.use_mock, + 'mac_address': self.mac_address, }) self.logger.info("IMU初始化成功") @@ -462,6 +717,13 @@ class IMUManager(BaseDevice): if self.imu_thread and self.imu_thread.is_alive(): self.imu_thread.join(timeout=3.0) + # 停止BLE后台任务(如果是BLE设备) + try: + if isinstance(self.imu_device, BleIMUDevice): + self.imu_device.stop() + except Exception: + pass + self.logger.info("IMU数据流已停止") return True @@ -524,7 +786,8 @@ class IMUManager(BaseDevice): 'buffer_size': len(self.data_buffer), 'has_data': self.last_valid_data is not None, 'head_pose_offset': self.head_pose_offset, - 'device_type': 'mock' if self.use_mock else 'real' + 'device_type': self.device_type, + 'mac_address': self.mac_address }) return status @@ -609,6 +872,7 @@ class IMUManager(BaseDevice): self.baudrate = config.get('baudrate', 9600) self.device_type = config.get('device_type', 'mock') self.use_mock = config.get('use_mock', False) + self.mac_address = config.get('mac_address', '') # 更新数据缓存队列大小 buffer_size = config.get('buffer_size', 100) @@ -621,7 +885,7 @@ class IMUManager(BaseDevice): for data in current_data[-buffer_size:]: self.data_buffer.append(data) - self.logger.info(f"IMU配置重新加载成功 - 端口: {self.port}, 波特率: {self.baudrate}, 设备类型: {self.device_type}") + self.logger.info(f"IMU配置重新加载成功 - 端口: {self.port}, 波特率: {self.baudrate}, 设备类型: {self.device_type}, MAC: {self.mac_address}") return True except Exception as e: diff --git a/backend/devices/imu_manager_usb_bak.py b/backend/devices/imu_manager_usb_bak.py new file mode 100644 index 00000000..01c5ae50 --- /dev/null +++ b/backend/devices/imu_manager_usb_bak.py @@ -0,0 +1,649 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +IMU传感器管理器 +负责IMU传感器的连接、校准和头部姿态数据采集 +""" + +import serial +import threading +import time +import json +import numpy as np +from typing import Optional, Dict, Any, List, Tuple +import logging +from collections import deque +import struct +from datetime import datetime + +try: + from .base_device import BaseDevice + from .utils.socket_manager import SocketManager + from .utils.config_manager import ConfigManager +except ImportError: + from base_device import BaseDevice + from utils.socket_manager import SocketManager + from utils.config_manager import ConfigManager + +# 设置日志 +logger = logging.getLogger(__name__) + + +class RealIMUDevice: + """真实IMU设备,通过串口读取姿态数据""" + def __init__(self, port, baudrate): + self.port = port + self.baudrate = baudrate + self.ser = None + self.buffer = bytearray() + self.calibration_data = None + self.head_pose_offset = {'rotation': 0, 'tilt': 0, 'pitch': 0} + self.last_data = { + 'roll': 0.0, + 'pitch': 0.0, + 'yaw': 0.0, + 'temperature': 25.0 + } + logger.debug(f'RealIMUDevice 初始化: port={self.port}, baudrate={self.baudrate}') + self._connect() + + def _connect(self): + try: + logger.debug(f'尝试打开串口: {self.port} @ {self.baudrate}') + self.ser = serial.Serial(self.port, self.baudrate, timeout=1) + if hasattr(self.ser, 'reset_input_buffer'): + try: + self.ser.reset_input_buffer() + logger.debug('已清空串口输入缓冲区') + except Exception as e: + logger.debug(f'重置串口输入缓冲区失败: {e}') + logger.info(f'IMU设备连接成功: {self.port} @ {self.baudrate}bps') + except Exception as e: + # logger.error(f'IMU设备连接失败: {e}', exc_info=True) + self.ser = None + + def set_calibration(self, calibration: Dict[str, Any]): + self.calibration_data = calibration + if 'head_pose_offset' in calibration: + self.head_pose_offset = calibration['head_pose_offset'] + logger.debug(f'应用IMU校准数据: {self.head_pose_offset}') + def apply_calibration(self, raw_data: Dict[str, Any]) -> Dict[str, Any]: + """应用校准:将当前姿态减去初始偏移,得到相对于初始姿态的变化量""" + if not raw_data or 'head_pose' not in raw_data: + return raw_data + + # 应用校准偏移 + calibrated_data = raw_data.copy() + head_pose = raw_data['head_pose'].copy() + angle=head_pose['rotation'] - self.head_pose_offset['rotation'] + # 减去基准值(零点偏移) + head_pose['rotation'] = ((angle + 180) % 360) - 180 + head_pose['tilt'] = head_pose['tilt'] - self.head_pose_offset['tilt'] + head_pose['pitch'] = head_pose['pitch'] - self.head_pose_offset['pitch'] + + calibrated_data['head_pose'] = head_pose + return calibrated_data + @staticmethod + def _checksum(data: bytes) -> int: + return sum(data[:-1]) & 0xFF + + def _parse_packet(self, data: bytes) -> Optional[Dict[str, float]]: + if len(data) != 11: + logger.debug(f'无效数据包长度: {len(data)}') + return None + if data[0] != 0x55: + logger.debug(f'错误的包头: 0x{data[0]:02X}') + return None + if self._checksum(data) != data[-1]: + logger.debug(f'校验和错误: 期望{self._checksum(data):02X}, 实际{data[-1]:02X}') + return None + packet_type = data[1] + vals = [int.from_bytes(data[i:i+2], 'little', signed=True) for i in range(2, 10, 2)] + if packet_type == 0x53: # 姿态角,单位0.01° + pitchl, rxl, yawl, temp = vals # 注意这里 vals 已经是有符号整数 + # 使用第一段代码里的比例系数 + k_angle = 180.0 + roll = -round(rxl / 32768.0 * k_angle,2) + pitch = -round(pitchl / 32768.0 * k_angle,2) + yaw = -round(yawl / 32768.0 * k_angle,2) + temp = temp / 100.0 + self.last_data = { + 'roll': roll, + 'pitch': pitch, + 'yaw': yaw, + 'temperature': temp + } + # print(f'解析姿态角包: roll={roll}, pitch={pitch}, yaw={yaw}, temp={temp}') + return self.last_data + else: + # logger.debug(f'忽略的数据包类型: 0x{packet_type:02X}') + return None + + def read_data(self, apply_calibration: bool = True) -> Dict[str, Any]: + if not self.ser or not getattr(self.ser, 'is_open', False): + # logger.warning('IMU串口未连接,尝试重新连接...') + self._connect() + return { + 'head_pose': { + 'rotation': self.last_data['yaw'], + 'tilt': self.last_data['roll'], + 'pitch': self.last_data['pitch'] + }, + 'temperature': self.last_data['temperature'], + 'timestamp': datetime.now().isoformat() + } + try: + bytes_waiting = self.ser.in_waiting + if bytes_waiting: + # logger.debug(f'串口缓冲区待读字节: {bytes_waiting}') + chunk = self.ser.read(bytes_waiting) + # logger.debug(f'读取到字节: {len(chunk)}') + self.buffer.extend(chunk) + while len(self.buffer) >= 11: + if self.buffer[0] != 0x55: + dropped = self.buffer.pop(0) + logger.debug(f'丢弃无效字节: 0x{dropped:02X}') + continue + packet = bytes(self.buffer[:11]) + parsed = self._parse_packet(packet) + del self.buffer[:11] + if parsed is not None: + raw = { + 'head_pose': { + 'rotation': parsed['yaw'], # rotation = roll + 'tilt': parsed['roll'], # tilt = yaw + 'pitch': parsed['pitch'] # pitch = pitch + }, + 'temperature': parsed['temperature'], + 'timestamp': datetime.now().isoformat() + } + # logger.debug(f'映射后的头部姿态: {raw}') + return self.apply_calibration(raw) if apply_calibration else raw + raw = { + 'head_pose': { + 'rotation': self.last_data['yaw'], + 'tilt': self.last_data['roll'], + 'pitch': self.last_data['pitch'] + }, + 'temperature': self.last_data['temperature'], + 'timestamp': datetime.now().isoformat() + } + return self.apply_calibration(raw) if apply_calibration else raw + except Exception as e: + logger.error(f'IMU数据读取异常: {e}', exc_info=True) + raw = { + 'head_pose': { + 'rotation': self.last_data['yaw'], + 'tilt': self.last_data['roll'], + 'pitch': self.last_data['pitch'] + }, + 'temperature': self.last_data['temperature'], + 'timestamp': datetime.now().isoformat() + } + return self.apply_calibration(raw) if apply_calibration else raw + + def __del__(self): + try: + if self.ser and getattr(self.ser, 'is_open', False): + self.ser.close() + logger.info('IMU设备串口已关闭') + except Exception: + pass + + +class MockIMUDevice: + """模拟IMU设备""" + + def __init__(self): + self.noise_level = 0.1 + self.calibration_data = None # 校准数据 + self.head_pose_offset = {'rotation': 0, 'tilt': 0, 'pitch': 0} # 头部姿态零点偏移 + + def set_calibration(self, calibration: Dict[str, Any]): + """设置校准数据""" + self.calibration_data = calibration + if 'head_pose_offset' in calibration: + self.head_pose_offset = calibration['head_pose_offset'] + + def apply_calibration(self, raw_data: Dict[str, Any]) -> Dict[str, Any]: + """应用校准:将当前姿态减去初始偏移,得到相对姿态""" + if not raw_data or 'head_pose' not in raw_data: + return raw_data + + calibrated_data = raw_data.copy() + head_pose = raw_data['head_pose'].copy() + head_pose['rotation'] = head_pose['rotation'] - self.head_pose_offset['rotation'] + head_pose['tilt'] = head_pose['tilt'] - self.head_pose_offset['tilt'] + head_pose['pitch'] = head_pose['pitch'] - self.head_pose_offset['pitch'] + calibrated_data['head_pose'] = head_pose + return calibrated_data + + def read_data(self, apply_calibration: bool = True) -> Dict[str, Any]: + """读取IMU数据""" + # 生成头部姿态角度数据,角度范围(-90°, +90°) + # 使用正弦波模拟自然的头部运动,添加随机噪声 + import time + current_time = time.time() + + # 旋转角(左旋为负,右旋为正) + rotation_angle = 30 * np.sin(current_time * 0.5) + np.random.normal(0, self.noise_level * 5) + rotation_angle = np.clip(rotation_angle, -90, 90) + + # 倾斜角(左倾为负,右倾为正) + tilt_angle = 20 * np.sin(current_time * 0.3 + np.pi/4) + np.random.normal(0, self.noise_level * 5) + tilt_angle = np.clip(tilt_angle, -90, 90) + + # 俯仰角(俯角为负,仰角为正) + pitch_angle = 15 * np.sin(current_time * 0.7 + np.pi/2) + np.random.normal(0, self.noise_level * 5) + pitch_angle = np.clip(pitch_angle, -90, 90) + + # 生成原始数据 + raw_data = { + 'head_pose': { + 'rotation': rotation_angle, # 旋转角:左旋(-), 右旋(+) + 'tilt': tilt_angle, # 倾斜角:左倾(-), 右倾(+) + 'pitch': pitch_angle # 俯仰角:俯角(-), 仰角(+) + }, + 'timestamp': datetime.now().isoformat() + } + # 应用校准并返回 + return self.apply_calibration(raw_data) if apply_calibration else raw_data + + +class IMUManager(BaseDevice): + """IMU传感器管理器""" + + def __init__(self, socketio, config_manager: Optional[ConfigManager] = None): + """ + 初始化IMU管理器 + + Args: + socketio: SocketIO实例 + config_manager: 配置管理器实例 + """ + # 配置管理 + self.config_manager = config_manager or ConfigManager() + config = self.config_manager.get_device_config('imu') + + super().__init__("imu", config) + + # 保存socketio实例 + self._socketio = socketio + + # 设备配置 + self.port = config.get('port', 'COM7') + self.baudrate = config.get('baudrate', 9600) + self.device_type = config.get('device_type', 'mock') # 'real' 或 'mock' + self.use_mock = config.get('use_mock', False) # 保持向后兼容 + # IMU设备实例 + self.imu_device = None + + # 推流相关 + self.imu_streaming = False + self.imu_thread = None + + # 统计信息 + self.data_count = 0 + self.error_count = 0 + + # 校准相关 + self.is_calibrated = False + self.head_pose_offset = {'rotation': 0, 'tilt': 0, 'pitch': 0} + + # 数据缓存 + self.data_buffer = deque(maxlen=100) + self.last_valid_data = None + + self.logger.info(f"IMU管理器初始化完成 - 端口: {self.port}, 设备类型: {self.device_type}") + + def initialize(self) -> bool: + """ + 初始化IMU设备 + + Returns: + bool: 初始化是否成功 + """ + try: + self.logger.info(f"正在初始化IMU设备...") + + # 使用构造函数中已加载的配置,避免并发读取配置文件 + self.logger.info(f"使用已加载配置: port={self.port}, baudrate={self.baudrate}, device_type={self.device_type}") + + # 根据配置选择真实设备或模拟设备 + # 优先使用device_type配置,如果没有则使用use_mock配置(向后兼容) + use_real_device = (self.device_type == 'real') or (not self.use_mock) + + if use_real_device: + self.logger.info(f"使用真实IMU设备 - 端口: {self.port}, 波特率: {self.baudrate}") + self.imu_device = RealIMUDevice(self.port, self.baudrate) + + # 检查真实设备是否连接成功 + if self.imu_device.ser is None: + self.logger.error(f"IMU设备连接失败: 无法打开串口 {self.port}") + self.is_connected = False + self.imu_device = None + return False + else: + self.logger.info("使用模拟IMU设备") + self.imu_device = MockIMUDevice() + + self.is_connected = True + self._device_info.update({ + 'port': self.port, + 'baudrate': self.baudrate, + 'use_mock': self.use_mock + }) + + self.logger.info("IMU初始化成功") + return True + + except Exception as e: + self.logger.error(f"IMU初始化失败: {e}") + self.is_connected = False + self.imu_device = None + return False + + def _quick_calibrate_imu(self) -> Dict[str, Any]: + """ + 快速IMU零点校准(以当前姿态为基准) + + Returns: + Dict[str, Any]: 校准结果 + """ + try: + if not self.imu_device: + return {'status': 'error', 'error': 'IMU设备未初始化'} + + self.logger.info('开始IMU快速零点校准...') + + # 直接读取一次原始数据作为校准偏移量 + raw_data = self.imu_device.read_data(apply_calibration=False) + if not raw_data or 'head_pose' not in raw_data: + return {'status': 'error', 'error': '无法读取IMU原始数据'} + + # 使用当前姿态作为零点偏移 + self.head_pose_offset = { + 'rotation': raw_data['head_pose']['rotation'], + 'tilt': raw_data['head_pose']['tilt'], + 'pitch': raw_data['head_pose']['pitch'] + } + + # 应用校准到设备 + calibration_data = {'head_pose_offset': self.head_pose_offset} + self.imu_device.set_calibration(calibration_data) + + self.logger.info(f'IMU快速校准完成: {self.head_pose_offset}') + return { + 'status': 'success', + 'head_pose_offset': self.head_pose_offset + } + + except Exception as e: + self.logger.error(f'IMU快速校准失败: {e}') + return {'status': 'error', 'error': str(e)} + + def calibrate(self) -> bool: + """ + 校准IMU传感器 + + Returns: + bool: 校准是否成功 + """ + try: + if not self.is_connected or not self.imu_device: + if not self.initialize(): + self.logger.error("IMU设备未连接") + return False + + # 使用快速校准方法 + result = self._quick_calibrate_imu() + + if result['status'] == 'success': + self.is_calibrated = True + self.logger.info("IMU校准成功") + return True + else: + self.logger.error(f"IMU校准失败: {result.get('error', '未知错误')}") + return False + + except Exception as e: + self.logger.error(f"IMU校准失败: {e}") + return False + + + + def start_streaming(self) -> bool: + """ + 开始IMU数据流 + + Args: + socketio: SocketIO实例,用于数据推送 + + Returns: + bool: 启动是否成功 + """ + try: + if not self.is_connected or not self.imu_device: + if not self.initialize(): + return False + + if self.imu_streaming: + self.logger.warning("IMU数据流已在运行") + return True + + # 启动前进行快速校准 + if not self.is_calibrated: + self.logger.info("启动前进行快速零点校准...") + self._quick_calibrate_imu() + + self.imu_streaming = True + self.imu_thread = threading.Thread(target=self._imu_streaming_thread, daemon=True) + self.imu_thread.start() + + self.logger.info("IMU数据流启动成功") + return True + + except Exception as e: + self.logger.error(f"IMU数据流启动失败: {e}") + self.imu_streaming = False + return False + + def stop_streaming(self) -> bool: + """ + 停止IMU数据流 + + Returns: + bool: 停止是否成功 + """ + try: + self.imu_streaming = False + + if self.imu_thread and self.imu_thread.is_alive(): + self.imu_thread.join(timeout=3.0) + + self.logger.info("IMU数据流已停止") + return True + + except Exception as e: + self.logger.error(f"停止IMU数据流失败: {e}") + return False + + def _imu_streaming_thread(self): + """ + IMU数据流工作线程 + """ + self.logger.info("IMU数据流工作线程启动") + + while self.imu_streaming: + try: + if self.imu_device: + # 读取IMU数据 + data = self.imu_device.read_data(apply_calibration=True) + + if data: + # 缓存数据 + # self.data_buffer.append(data) + # self.last_valid_data = data + + # 发送数据到前端 + if self._socketio: + self._socketio.emit('imu_data', data, namespace='/devices') + + # 更新统计 + self.data_count += 1 + else: + self.error_count += 1 + + time.sleep(0.02) # 50Hz采样率 + + except Exception as e: + self.logger.error(f"IMU数据流处理异常: {e}") + self.error_count += 1 + time.sleep(0.1) + + self.logger.info("IMU数据流工作线程结束") + + + + def get_status(self) -> Dict[str, Any]: + """ + 获取设备状态 + + Returns: + Dict[str, Any]: 设备状态信息 + """ + status = super().get_status() + status.update({ + 'port': self.port, + 'baudrate': self.baudrate, + 'is_streaming': self.imu_streaming, + 'is_calibrated': self.is_calibrated, + 'data_count': self.data_count, + 'error_count': self.error_count, + 'buffer_size': len(self.data_buffer), + 'has_data': self.last_valid_data is not None, + 'head_pose_offset': self.head_pose_offset, + 'device_type': 'mock' if self.use_mock else 'real' + }) + return status + + def get_latest_data(self) -> Optional[Dict[str, float]]: + """ + 获取最新的IMU数据 + + Returns: + Optional[Dict[str, float]]: 最新数据,无数据返回None + """ + return self.last_valid_data.copy() if self.last_valid_data else None + + def collect_head_pose_data(self, duration: int = 10) -> List[Dict[str, Any]]: + """ + 收集头部姿态数据 + + Args: + duration: 收集时长(秒) + + Returns: + List[Dict[str, Any]]: 收集到的数据列表 + """ + collected_data = [] + + if not self.is_connected or not self.imu_device: + self.logger.error("IMU设备未连接") + return collected_data + + self.logger.info(f"开始收集头部姿态数据,时长: {duration}秒") + + start_time = time.time() + while time.time() - start_time < duration: + try: + data = self.imu_device.read_data(apply_calibration=True) + if data: + # 添加时间戳 + data['timestamp'] = time.time() + collected_data.append(data) + + time.sleep(0.02) # 50Hz采样率 + + except Exception as e: + self.logger.error(f"数据收集异常: {e}") + break + + self.logger.info(f"头部姿态数据收集完成,共收集 {len(collected_data)} 个样本") + return collected_data + + def disconnect(self): + """ + 断开IMU设备连接 + """ + try: + self.stop_streaming() + + if self.imu_device: + self.imu_device = None + + self.is_connected = False + self.logger.info("IMU设备已断开连接") + + except Exception as e: + self.logger.error(f"断开IMU设备连接失败: {e}") + + def reload_config(self) -> bool: + """ + 重新加载设备配置 + + Returns: + bool: 重新加载是否成功 + """ + try: + self.logger.info("正在重新加载IMU配置...") + + + + # 获取最新配置 + config = self.config_manager.get_device_config('imu') + + # 更新配置属性 + self.port = config.get('port', 'COM7') + self.baudrate = config.get('baudrate', 9600) + self.device_type = config.get('device_type', 'mock') + self.use_mock = config.get('use_mock', False) + + # 更新数据缓存队列大小 + buffer_size = config.get('buffer_size', 100) + if buffer_size != self.data_buffer.maxlen: + # 保存当前数据 + current_data = list(self.data_buffer) + # 创建新缓冲区 + self.data_buffer = deque(maxlen=buffer_size) + # 恢复数据(保留最新的数据) + for data in current_data[-buffer_size:]: + self.data_buffer.append(data) + + self.logger.info(f"IMU配置重新加载成功 - 端口: {self.port}, 波特率: {self.baudrate}, 设备类型: {self.device_type}") + return True + + except Exception as e: + self.logger.error(f"重新加载IMU配置失败: {e}") + return False + + def cleanup(self): + """ + 清理资源 + """ + try: + self.disconnect() + + # 清理缓冲区 + self.data_buffer.clear() + + # 重置状态 + self.is_calibrated = False + self.last_valid_data = None + self.head_pose_offset = {'rotation': 0, 'tilt': 0, 'pitch': 0} + + super().cleanup() + self.logger.info("IMU资源清理完成") + + except Exception as e: + self.logger.error(f"清理IMU资源失败: {e}") \ No newline at end of file diff --git a/backend/devices/utils/config_manager.py b/backend/devices/utils/config_manager.py index 9760e75a..8b5c9f19 100644 --- a/backend/devices/utils/config_manager.py +++ b/backend/devices/utils/config_manager.py @@ -206,6 +206,7 @@ class ConfigManager: 'baudrate': self.config.getint('DEVICES', 'imu_baudrate', fallback=9600), 'timeout': self.config.getfloat('DEVICES', 'imu_timeout', fallback=1.0), 'calibration_samples': self.config.getint('DEVICES', 'imu_calibration_samples', fallback=100), + 'mac_address': self.config.get('DEVICES', 'imu_mac_address', fallback=''), } def _get_pressure_config(self) -> Dict[str, Any]: @@ -352,9 +353,10 @@ class ConfigManager: Args: config_data: IMU配置数据 { - 'device_type': 'real' | 'mock', + 'device_type': 'real' | 'mock' | 'ble', 'port': 'COM6', - 'baudrate': 9600 + 'baudrate': 9600, + 'mac_address': 'ef:3c:1a:0a:fe:02' } Returns: @@ -368,6 +370,8 @@ class ConfigManager: self.set_config_value('DEVICES', 'imu_port', config_data['port']) if 'baudrate' in config_data: self.set_config_value('DEVICES', 'imu_baudrate', str(config_data['baudrate'])) + if 'mac_address' in config_data: + self.set_config_value('DEVICES', 'imu_mac_address', config_data['mac_address']) # 保存配置 self.save_config()