diff --git a/backend/devices/camera_manager.py b/backend/devices/camera_manager.py index a658ba51..e6fded32 100644 --- a/backend/devices/camera_manager.py +++ b/backend/devices/camera_manager.py @@ -53,10 +53,14 @@ class CameraManager(BaseDevice): self.fps = config.get('fps', 30) self.buffer_size = config.get('buffer_size', 1) self.fourcc = config.get('fourcc', 'MJPG') + + # 额外可调的降采样宽度(不改变外部配置语义,仅内部优化传输) + self._tx_max_width = int(config.get('tx_max_width', 640)) # 流控制 self.streaming_thread = None - self.frame_cache = deque(maxlen=10) + # 减小缓存长度,保留最近2帧即可,避免累计占用 + self.frame_cache = deque(maxlen=int(config.get('frame_cache_len', 2))) self.last_frame = None self.frame_count = 0 self.dropped_frames = 0 @@ -77,6 +81,12 @@ class CameraManager(BaseDevice): 'actual_fps': 0, 'dropped_frames': 0 } + + # OpenCV优化开关 + try: + cv2.setUseOptimized(True) + except Exception: + pass self.logger.info(f"相机管理器初始化完成 - 设备索引: {self.device_index}") @@ -90,7 +100,7 @@ class CameraManager(BaseDevice): try: self.logger.info(f"正在初始化相机设备 {self.device_index}...") - # 尝试多个后端 + # 尝试多个后端(Windows下优先MSMF/DShow) backends = [cv2.CAP_MSMF, cv2.CAP_DSHOW, cv2.CAP_ANY] for backend in backends: @@ -127,7 +137,10 @@ class CameraManager(BaseDevice): self.logger.error(f"相机初始化失败: {e}") self.is_connected = False if self.cap: - self.cap.release() + try: + self.cap.release() + except Exception: + pass self.cap = None return False @@ -151,8 +164,11 @@ class CameraManager(BaseDevice): # 设置帧率 self.cap.set(cv2.CAP_PROP_FPS, self.fps) - # 设置缓冲区大小 - self.cap.set(cv2.CAP_PROP_BUFFERSIZE, self.buffer_size) + # 设置缓冲区大小(部分后端不生效) + try: + self.cap.set(cv2.CAP_PROP_BUFFERSIZE, self.buffer_size) + except Exception: + pass # 获取实际设置的值 actual_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) @@ -199,7 +215,7 @@ class CameraManager(BaseDevice): # 读取几帧来稳定相机 for i in range(5): - ret, frame = self.cap.read() + ret, _ = self.cap.read() if not ret: self.logger.warning(f"校准时读取第{i+1}帧失败") @@ -253,7 +269,9 @@ class CameraManager(BaseDevice): self.is_streaming = False if self.streaming_thread and self.streaming_thread.is_alive(): + # 等待线程退出 self.streaming_thread.join(timeout=3.0) + self.streaming_thread = None self.logger.info("相机流已停止") return True @@ -269,8 +287,13 @@ class CameraManager(BaseDevice): self.logger.info("相机流工作线程启动") reconnect_attempts = 0 - + + # 基于目标FPS的简单节拍器,防止无上限地读取/编码/发送导致对象堆积 + frame_interval = 1.0 / max(self.fps, 1) + next_tick = time.time() + while self.is_streaming: + loop_start = time.time() try: if not self.cap or not self.cap.isOpened(): if reconnect_attempts < self.max_reconnect_attempts: @@ -292,33 +315,51 @@ class CameraManager(BaseDevice): self.dropped_frames += 1 if self.dropped_frames > 10: self.logger.warning(f"连续丢帧过多: {self.dropped_frames}") + # 仅在异常情况下触发一次GC,避免高频强制GC + try: + gc.collect() + except Exception: + pass self.dropped_frames = 0 - time.sleep(0.01) + # 防止空转占满CPU + time.sleep(0.005) continue # 重置丢帧计数 self.dropped_frames = 0 - # 处理帧 + # 处理帧(降采样以优化传输负载) processed_frame = self._process_frame(frame) - - # 缓存帧 - self.last_frame = processed_frame.copy() - self.frame_cache.append(processed_frame) + + # 缓存帧(不复制,减少内存占用) + # self.last_frame = processed_frame + # self.frame_cache.append(processed_frame) # 发送帧数据 self._send_frame_data(processed_frame) # 更新统计 self._update_statistics() - - # 内存管理 - if self.frame_count % 30 == 0: - gc.collect() - + + # 主动释放局部引用,帮助GC更快识别可回收对象 + del frame + # 注意:processed_frame 被 last_frame 和 frame_cache 引用,不可删除其对象本身 + + # 限速:保证不超过目标FPS,减小发送端积压 + now = time.time() + # 下一个tick基于固定间隔前移,避免误差累积 + next_tick += frame_interval + sleep_time = next_tick - now + if sleep_time > 0: + time.sleep(sleep_time) + else: + # 如果处理耗时超过间隔,纠正节拍器,避免持续为负 + next_tick = now + except Exception as e: self.logger.error(f"相机流处理异常: {e}") - time.sleep(0.1) + # 小退避,避免异常情况下空转 + time.sleep(0.02) self.logger.info("相机流工作线程结束") @@ -333,15 +374,14 @@ class CameraManager(BaseDevice): np.ndarray: 处理后的帧 """ try: - # 调整大小以优化传输 - if frame.shape[1] > 640: - scale_factor = 640 / frame.shape[1] - new_width = 640 - new_height = int(frame.shape[0] * scale_factor) - frame = cv2.resize(frame, (new_width, new_height)) - + # 调整大小以优化传输(使用 INTER_AREA 质量好且更省内存/CPU) + h, w = frame.shape[:2] + if w > self._tx_max_width: + scale = self._tx_max_width / float(w) + new_w = self._tx_max_width + new_h = int(h * scale) + frame = cv2.resize(frame, (new_w, new_h), interpolation=cv2.INTER_AREA) return frame - except Exception as e: self.logger.error(f"处理帧失败: {e}") return frame @@ -353,13 +393,21 @@ class CameraManager(BaseDevice): Args: frame: 视频帧 """ + # 将临时对象局部化,并在 finally 中删除引用,加速回收 + buffer = None + frame_bytes = None + frame_data = None try: # 编码为JPEG encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 80] - _, buffer = cv2.imencode('.jpg', frame, encode_param) + ok, buffer = cv2.imencode('.jpg', frame, encode_param) + if not ok or buffer is None: + self.logger.warning("帧JPEG编码失败") + return - # 转换为base64 - frame_data = base64.b64encode(buffer).decode('utf-8') + # 转换为bytes再做base64,减少中间numpy对象的长生命周期 + frame_bytes = buffer.tobytes() + frame_data = base64.b64encode(frame_bytes).decode('utf-8') # 发送数据 data = { @@ -374,6 +422,11 @@ class CameraManager(BaseDevice): except Exception as e: self.logger.error(f"发送帧数据失败: {e}") + finally: + # 显式删除临时大对象的引用,避免在高吞吐下堆积 + del buffer + del frame_bytes + del frame_data def _update_statistics(self): """ @@ -405,7 +458,11 @@ class CameraManager(BaseDevice): """ try: if self.cap: - self.cap.release() + try: + self.cap.release() + except Exception: + pass + self.cap = None time.sleep(1.0) # 等待设备释放 @@ -471,6 +528,7 @@ class CameraManager(BaseDevice): Returns: Optional[np.ndarray]: 最新帧,无帧返回None """ + # 对外提供拷贝,内部保持原对象,避免重复持有 return self.last_frame.copy() if self.last_frame is not None else None def disconnect(self): @@ -481,7 +539,10 @@ class CameraManager(BaseDevice): self.stop_streaming() if self.cap: - self.cap.release() + try: + self.cap.release() + except Exception: + pass self.cap = None self.is_connected = False @@ -498,7 +559,10 @@ class CameraManager(BaseDevice): self.stop_streaming() if self.cap: - self.cap.release() + try: + self.cap.release() + except Exception: + pass self.cap = None self.frame_cache.clear() @@ -508,4 +572,4 @@ class CameraManager(BaseDevice): self.logger.info("相机资源清理完成") except Exception as e: - self.logger.error(f"清理相机资源失败: {e}") \ No newline at end of file + self.logger.error(f"清理相机资源失败: {e}") diff --git a/backend/devices/femtobolt_manager.py b/backend/devices/femtobolt_manager.py index 0da8c9ea..1b08aed7 100644 --- a/backend/devices/femtobolt_manager.py +++ b/backend/devices/femtobolt_manager.py @@ -98,8 +98,31 @@ class FemtoBoltManager(BaseDevice): self.max_reconnect_attempts = 3 self.reconnect_delay = 3.0 + # 发送频率控制(内存优化) + self.send_fps = self.config.get('send_fps', 20) # 默认20FPS发送 + self._min_send_interval = 1.0 / self.send_fps if self.send_fps > 0 else 0.05 + self._last_send_time = 0 + + # 编码参数缓存(避免每帧创建数组) + self._encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), int(self.config.get('jpeg_quality', 80))] + + # 预计算伽马LUT(避免每帧计算) + self._gamma_lut = None + self._current_gamma = None + self._update_gamma_lut() + + # 预生成网格背景(避免每帧创建) + self._grid_bg = None + self._grid_size = (480, 640) # 默认尺寸 + self.logger.info("FemtoBolt管理器初始化完成") + def _update_gamma_lut(self): + """更新伽马校正查找表""" + if self._current_gamma != self.gamma_value: + self._gamma_lut = np.array([((i / 255.0) ** self.gamma_value) * 255 for i in range(256)]).astype("uint8") + self._current_gamma = self.gamma_value + def initialize(self) -> bool: """ 初始化FemtoBolt设备 @@ -391,7 +414,7 @@ class FemtoBoltManager(BaseDevice): capture.release() else: - self.logger.warning(f"校准时无法获取第{i+1}帧") + self.logger.warning(f"校时时无法获取第{i+1}帧") time.sleep(0.1) @@ -464,162 +487,160 @@ class FemtoBoltManager(BaseDevice): try: while self.is_streaming: + # 发送频率限制 + now = time.time() + if now - self._last_send_time < self._min_send_interval: + time.sleep(0.001) + continue + if self.device_handle and self._socketio: try: capture = self.device_handle.update() if capture is not None: - ret, depth_image = capture.get_depth_image() - if ret and depth_image is not None: - - # 使用与device_manager.py相同的处理逻辑 - depth_image = depth_image.copy() - - # === 生成灰色背景 + 白色网格 === - rows, cols = depth_image.shape[:2] - background = np.ones((rows, cols, 3), dtype=np.uint8) * 128 - cell_size = 50 - grid_color = (255, 255, 255) - grid_bg = np.zeros_like(background) - for x in range(0, cols, cell_size): - cv2.line(grid_bg, (x, 0), (x, rows), grid_color, 1) - for y in range(0, rows, cell_size): - cv2.line(grid_bg, (0, y), (cols, y), grid_color, 1) - mask_grid = (grid_bg.sum(axis=2) > 0) - background[mask_grid] = grid_bg[mask_grid] - - # === 处理深度图满足区间的部分 === - depth_clipped = depth_image.copy() - depth_clipped[depth_clipped < self.depth_range_min] = 0 - depth_clipped[depth_clipped > self.depth_range_max] = 0 - depth_normalized = np.clip(depth_clipped, self.depth_range_min, self.depth_range_max) - depth_normalized = ((depth_normalized - self.depth_range_min) / (self.depth_range_max - self.depth_range_min) * 255).astype(np.uint8) - - # 对比度和伽马校正 - alpha, beta, gamma = 1.5, 0, 0.8 - depth_normalized = cv2.convertScaleAbs(depth_normalized, alpha=alpha, beta=beta) - lut = np.array([((i / 255.0) ** gamma) * 255 for i in range(256)]).astype("uint8") - depth_normalized = cv2.LUT(depth_normalized, lut) - - # 伪彩色 - depth_colored = cv2.applyColorMap(depth_normalized, cv2.COLORMAP_JET) - - # 将有效深度覆盖到灰色背景上 - mask_valid = (depth_clipped > 0) - for c in range(3): - background[:, :, c][mask_valid] = depth_colored[:, :, c][mask_valid] - - depth_colored_final = background - - # 裁剪宽度 - height, width = depth_colored_final.shape[:2] - target_width = height // 2 - if width > target_width: - left = (width - target_width) // 2 - right = left + target_width - depth_colored_final = depth_colored_final[:, left:right] - - # 缓存图像 - self.last_depth_frame = depth_colored_final.copy() - self.depth_frame_cache.append(depth_colored_final.copy()) - - # 推送SocketIO - success, buffer = cv2.imencode('.jpg', depth_colored_final, [int(cv2.IMWRITE_JPEG_QUALITY), 80]) - if success and self._socketio: - jpg_as_text = base64.b64encode(buffer).decode('utf-8') - # 发送到femtobolt命名空间,使用前端期望的事件名和数据格式 - self._socketio.emit('femtobolt_frame', { - 'depth_image': jpg_as_text, - 'frame_count': frame_count, - 'timestamp': time.time(), - 'fps': self.actual_fps, - 'device_id': self.device_id, - 'depth_range': { - 'min': self.depth_range_min, - 'max': self.depth_range_max - } - }, namespace='/devices') - frame_count += 1 + try: + ret, depth_image = capture.get_depth_image() + if ret and depth_image is not None: + # 确保二维数据 + if depth_image.ndim == 3 and depth_image.shape[2] == 1: + depth_image = depth_image[:, :, 0] - # 更新统计 - self._update_statistics() - else: - time.sleep(0.01) + rows, cols = depth_image.shape[:2] + # 生成或复用网格背景 + if (self._grid_bg is None) or (self._grid_size != (rows, cols)): + bg = np.ones((rows, cols, 3), dtype=np.uint8) * 128 + cell_size = 50 + grid_color = (255, 255, 255) + grid = np.zeros_like(bg) + for x in range(0, cols, cell_size): + cv2.line(grid, (x, 0), (x, rows), grid_color, 1) + for y in range(0, rows, cell_size): + cv2.line(grid, (0, y), (cols, y), grid_color, 1) + mask_grid = (grid.sum(axis=2) > 0) + bg[mask_grid] = grid[mask_grid] + self._grid_bg = bg + self._grid_size = (rows, cols) + + # 深度范围过滤 + 归一化 + depth_clipped = np.where( + (depth_image >= self.depth_range_min) & (depth_image <= self.depth_range_max), + depth_image, 0 + ) + if np.max(depth_clipped) > 0: + depth_normalized = ((depth_clipped - self.depth_range_min) / (self.depth_range_max - self.depth_range_min) * 255).astype(np.uint8) + else: + depth_normalized = np.zeros((rows, cols), dtype=np.uint8) + + # 对比度与伽马 + depth_normalized = cv2.convertScaleAbs(depth_normalized, alpha=1.5, beta=0) + if self._gamma_lut is None or self._current_gamma != self.gamma_value: + self._update_gamma_lut() + depth_gamma = cv2.LUT(depth_normalized, self._gamma_lut) + + # 伪彩色 + depth_colored = cv2.applyColorMap(depth_gamma, cv2.COLORMAP_JET) + + # 合成到背景(避免copy,使用背景副本) + background = self._grid_bg.copy() + mask_valid = (depth_clipped > 0) + for c in range(3): + channel = background[:, :, c] + channel[mask_valid] = depth_colored[:, :, c][mask_valid] + depth_colored_final = background + + # 裁剪宽度 + height, width = depth_colored_final.shape[:2] + target_width = height // 2 + if width > target_width: + left = (width - target_width) // 2 + right = left + target_width + depth_colored_final = depth_colored_final[:, left:right] + + # 推送SocketIO + success, buffer = cv2.imencode('.jpg', depth_colored_final, self._encode_param) + if success and self._socketio: + jpg_as_text = base64.b64encode(memoryview(buffer).tobytes()).decode('utf-8') + self._socketio.emit('femtobolt_frame', { + 'depth_image': jpg_as_text, + 'frame_count': frame_count, + 'timestamp': now, + 'fps': self.actual_fps, + 'device_id': self.device_id, + 'depth_range': { + 'min': self.depth_range_min, + 'max': self.depth_range_max + } + }, namespace='/devices') + frame_count += 1 + self._last_send_time = now + + # 更新统计 + self._update_statistics() + else: + time.sleep(0.005) + except Exception as e: + # 捕获处理过程中出现异常,记录并继续 + self.logger.error(f"FemtoBolt捕获处理错误: {e}") + finally: + # 无论处理成功与否,都应释放capture以回收内存:contentReference[oaicite:3]{index=3} + try: + if hasattr(capture, 'release'): + capture.release() + except Exception: + pass else: - time.sleep(0.01) + time.sleep(0.005) except Exception as e: self.logger.error(f'FemtoBolt帧推送失败: {e}') - time.sleep(0.1) + time.sleep(0.05) - time.sleep(1/30) # 30 FPS + # 降低空转CPU + time.sleep(0.001) except Exception as e: self.logger.error(f"FemtoBolt流处理异常: {e}") finally: self.is_streaming = False self.logger.info("FemtoBolt流工作线程结束") - + def _process_depth_image(self, depth_image) -> np.ndarray: """ 处理深度图像 - - Args: - depth_image: 原始深度图像 - - Returns: - np.ndarray: 处理后的深度图像 """ try: - # 确保输入是numpy数组 if not isinstance(depth_image, np.ndarray): self.logger.error(f"输入的深度图像不是numpy数组: {type(depth_image)}") return np.zeros((480, 640, 3), dtype=np.uint8) - # 深度范围过滤 mask = (depth_image >= self.depth_range_min) & (depth_image <= self.depth_range_max) filtered_depth = np.where(mask, depth_image, 0) - # 归一化到0-255 if np.max(filtered_depth) > 0: - normalized = ((filtered_depth - self.depth_range_min) / - (self.depth_range_max - self.depth_range_min) * 255).astype(np.uint8) + normalized = ((filtered_depth - self.depth_range_min) / (self.depth_range_max - self.depth_range_min) * 255).astype(np.uint8) else: normalized = np.zeros_like(filtered_depth, dtype=np.uint8) - - # 对比度增强 + enhanced = cv2.convertScaleAbs(normalized, alpha=self.contrast_factor, beta=0) - # 伽马校正 - gamma_corrected = np.power(enhanced / 255.0, self.gamma_value) * 255 - gamma_corrected = gamma_corrected.astype(np.uint8) + if self._gamma_lut is None or self._current_gamma != self.gamma_value: + self._update_gamma_lut() + gamma_corrected = cv2.LUT(enhanced, self._gamma_lut) - # 伪彩色映射 if self.use_pseudo_color: colored = cv2.applyColorMap(gamma_corrected, cv2.COLORMAP_JET) else: colored = cv2.cvtColor(gamma_corrected, cv2.COLOR_GRAY2BGR) - - return colored + return colored except Exception as e: self.logger.error(f"处理深度图像失败: {e}") return np.zeros((480, 640, 3), dtype=np.uint8) - + def _send_depth_data(self, depth_image: np.ndarray, color_image: Optional[np.ndarray] = None): - """ - 发送深度数据 - - Args: - depth_image: 深度图像 - color_image: 彩色图像(可选) - """ try: - # 压缩深度图像 - encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 85] - _, depth_buffer = cv2.imencode('.jpg', depth_image, encode_param) - depth_data = base64.b64encode(depth_buffer).decode('utf-8') + _, depth_buffer = cv2.imencode('.jpg', depth_image, self._encode_param) + depth_data = base64.b64encode(memoryview(depth_buffer).tobytes()).decode('utf-8') - # 准备发送数据 send_data = { 'timestamp': time.time(), 'frame_count': self.frame_count, @@ -633,18 +654,15 @@ class FemtoBoltManager(BaseDevice): 'last_update': time.strftime('%H:%M:%S') } - # 添加彩色图像(如果有) if color_image is not None: - _, color_buffer = cv2.imencode('.jpg', color_image, encode_param) - color_data = base64.b64encode(color_buffer).decode('utf-8') + _, color_buffer = cv2.imencode('.jpg', color_image, self._encode_param) + color_data = base64.b64encode(memoryview(color_buffer).tobytes()).decode('utf-8') send_data['color_image'] = color_data - - # 发送到SocketIO - self._socketio.emit('femtobolt_frame', send_data, namespace='/devices') + self._socketio.emit('femtobolt_frame', send_data, namespace='/devices') except Exception as e: self.logger.error(f"发送深度数据失败: {e}") - + def _update_statistics(self): """ 更新性能统计 @@ -848,4 +866,4 @@ class FemtoBoltManager(BaseDevice): self.logger.info("FemtoBolt资源清理完成") except Exception as e: - self.logger.error(f"清理FemtoBolt资源失败: {e}") \ No newline at end of file + self.logger.error(f"清理FemtoBolt资源失败: {e}") diff --git a/backend/devices/pressure_manager.py b/backend/devices/pressure_manager.py index 713fb3cc..12551bc2 100644 --- a/backend/devices/pressure_manager.py +++ b/backend/devices/pressure_manager.py @@ -45,6 +45,20 @@ except ImportError: logger.warning("matplotlib不可用,将使用简化的压力图像生成") +# 定义 C 结构体 +class FPMS_DEVICE_INFO(ctypes.Structure): + _fields_ = [ + ("mn", ctypes.c_uint16), + ("sn", ctypes.c_char * 64), + ("fwVersion", ctypes.c_uint16), + ("protoVer", ctypes.c_uint8), + ("pid", ctypes.c_uint16), + ("vid", ctypes.c_uint16), + ("rows", ctypes.c_uint16), + ("cols", ctypes.c_uint16), + ] + + class RealPressureDevice: """真实SMiTSense压力传感器设备""" @@ -63,11 +77,11 @@ class RealPressureDevice: self.frame_size = 0 self.buf = None - # 设置DLL路径 - 使用正确的DLL文件名 + # 设置DLL路径 - 使用Wrapper.dll if dll_path is None: # 尝试多个可能的DLL文件名 dll_candidates = [ - os.path.join(os.path.dirname(__file__), '..', 'dll', 'smitsense', 'SMiTSenseUsbWrapper.dll'), + os.path.join(os.path.dirname(__file__), '..', 'dll', 'smitsense', 'Wrapper.dll'), os.path.join(os.path.dirname(__file__), '..', 'dll', 'smitsense', 'SMiTSenseUsb-F3.0.dll') ] dll_path = None @@ -97,31 +111,24 @@ class RealPressureDevice: raise FileNotFoundError(f"DLL文件未找到: {self.dll_path}") # 加载DLL - self.dll = ctypes.WinDLL(self.dll_path) + self.dll = ctypes.CDLL(self.dll_path) logger.info(f"成功加载DLL: {self.dll_path}") - # 设置函数签名(基于testsmit.py的工作代码) - self.dll.SMiTSenseUsb_Init.argtypes = [ctypes.c_int] - self.dll.SMiTSenseUsb_Init.restype = ctypes.c_int + # 设置函数签名(基于test22new.py的工作代码) + self.dll.fpms_usb_init_wrap.argtypes = [ctypes.c_int] + self.dll.fpms_usb_init_wrap.restype = ctypes.c_int - self.dll.SMiTSenseUsb_ScanDevices.argtypes = [ctypes.POINTER(ctypes.c_int)] - self.dll.SMiTSenseUsb_ScanDevices.restype = ctypes.c_int + self.dll.fpms_usb_get_device_list_wrap.argtypes = [ctypes.POINTER(FPMS_DEVICE_INFO), ctypes.c_int, ctypes.POINTER(ctypes.c_int)] + self.dll.fpms_usb_get_device_list_wrap.restype = ctypes.c_int - self.dll.SMiTSenseUsb_OpenAndStart.argtypes = [ - ctypes.c_int, - ctypes.POINTER(ctypes.c_uint16), - ctypes.POINTER(ctypes.c_uint16) - ] - self.dll.SMiTSenseUsb_OpenAndStart.restype = ctypes.c_int + self.dll.fpms_usb_open_wrap.argtypes = [ctypes.c_int, ctypes.POINTER(ctypes.c_uint64)] + self.dll.fpms_usb_open_wrap.restype = ctypes.c_int - self.dll.SMiTSenseUsb_GetLatestFrame.argtypes = [ - ctypes.POINTER(ctypes.c_uint16), - ctypes.c_int - ] - self.dll.SMiTSenseUsb_GetLatestFrame.restype = ctypes.c_int + self.dll.fpms_usb_read_frame_wrap.argtypes = [ctypes.c_uint64, ctypes.POINTER(ctypes.c_uint16), ctypes.c_size_t] + self.dll.fpms_usb_read_frame_wrap.restype = ctypes.c_int - self.dll.SMiTSenseUsb_StopAndClose.argtypes = [] - self.dll.SMiTSenseUsb_StopAndClose.restype = ctypes.c_int + self.dll.fpms_usb_close_wrap.argtypes = [ctypes.c_uint64] + self.dll.fpms_usb_close_wrap.restype = ctypes.c_int logger.info("DLL函数签名设置完成") @@ -133,27 +140,30 @@ class RealPressureDevice: """初始化设备连接""" try: # 初始化USB连接 - ret = self.dll.SMiTSenseUsb_Init(0) - if ret != 0: - raise RuntimeError(f"USB初始化失败: {ret}") + if self.dll.fpms_usb_init_wrap(0) != 0: + raise RuntimeError("初始化失败") - # 扫描设备 + # 获取设备列表 count = ctypes.c_int() - ret = self.dll.SMiTSenseUsb_ScanDevices(ctypes.byref(count)) - if ret != 0 or count.value == 0: - raise RuntimeError(f"设备扫描失败或未找到设备: {ret}, count: {count.value}") + devs = (FPMS_DEVICE_INFO * 10)() + r = self.dll.fpms_usb_get_device_list_wrap(devs, 10, ctypes.byref(count)) + if r != 0 or count.value == 0: + raise RuntimeError(f"未检测到设备: {r}, count: {count.value}") - logger.info(f"发现 {count.value} 个SMiTSense设备") + logger.info(f"检测到设备数量: {count.value}") + dev = devs[0] + self.rows, self.cols = dev.rows, dev.cols + logger.info(f"使用设备 SN={dev.sn.decode(errors='ignore')} {self.rows}x{self.cols}") - # 打开并启动第一个设备 - rows = ctypes.c_uint16() - cols = ctypes.c_uint16() - ret = self.dll.SMiTSenseUsb_OpenAndStart(0, ctypes.byref(rows), ctypes.byref(cols)) - if ret != 0: - raise RuntimeError(f"设备启动失败: {ret}") + # 打开设备 + self.device_handle = ctypes.c_uint64() + r = self.dll.fpms_usb_open_wrap(0, ctypes.byref(self.device_handle)) + if r != 0: + raise RuntimeError("设备打开失败") - self.rows = rows.value - self.cols = cols.value + logger.info(f"设备已打开, 句柄 = {self.device_handle.value}") + + # 准备数据缓冲区 self.frame_size = self.rows * self.cols self.buf_type = ctypes.c_uint16 * self.frame_size self.buf = self.buf_type() @@ -173,9 +183,9 @@ class RealPressureDevice: return self._get_empty_data() # 读取原始压力数据 - ret = self.dll.SMiTSenseUsb_GetLatestFrame(self.buf, self.frame_size) - if ret != 0: - logger.warning(f"读取数据帧失败: {ret}") + r = self.dll.fpms_usb_read_frame_wrap(self.device_handle.value, self.buf, self.frame_size) + if r != 0: + logger.warning(f"读取帧失败, code= {r}") return self._get_empty_data() # 转换为numpy数组 @@ -248,10 +258,10 @@ class RealPressureDevice: right_total_pct = float((right_total_abs / total_abs * 100) if total_abs > 0 else 0) # 前后占比(相对于各自单足总压) - left_front_pct = float((left_front / left_total_abs * 100) if left_total_abs > 0 else 0) - left_rear_pct = float((left_rear / left_total_abs * 100) if left_total_abs > 0 else 0) - right_front_pct = float((right_front / right_total_abs * 100) if right_total_abs > 0 else 0) - right_rear_pct = float((right_rear / right_total_abs * 100) if right_total_abs > 0 else 0) + left_front_pct = float((left_front / total_abs * 100) if total_abs > 0 else 0) + left_rear_pct = float((left_rear / total_abs * 100) if total_abs > 0 else 0) + right_front_pct = float((right_front / total_abs * 100) if total_abs > 0 else 0) + right_rear_pct = float((right_rear / total_abs * 100) if total_abs > 0 else 0) return { 'left_front': round(left_front_pct), @@ -284,7 +294,7 @@ class RealPressureDevice: return "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mNkYPhfDwAChwGA60e6kgAAAABJRU5ErkJggg==" def _generate_heatmap_image(self, raw_data) -> str: - """生成基于原始数据的热力图(OpenCV实现,固定范围映射,效果与matplotlib一致)""" + """生成基于原始数据的热力图(OpenCV实现,自适应归一化,黑色背景)""" try: import cv2 import numpy as np @@ -292,12 +302,20 @@ class RealPressureDevice: from io import BytesIO from PIL import Image - # 固定映射范围(与 matplotlib vmin/vmax 一致) - vmin, vmax = 0, 1000 - norm_data = np.clip((raw_data - vmin) / (vmax - vmin) * 255, 0, 255).astype(np.uint8) + # 自适应归一化(基于test22new.py的方法2) + vmin = 10 # 最小阈值,低于此值显示为黑色 + dmin, dmax = np.min(raw_data), np.max(raw_data) + norm_data = np.clip((raw_data - dmin) / max(dmax - dmin, 1) * 255, 0, 255).astype(np.uint8) # 应用 jet 颜色映射 heatmap = cv2.applyColorMap(norm_data, cv2.COLORMAP_JET) + + # 将低于阈值的区域设置为黑色 + heatmap[raw_data <= vmin] = (0, 0, 0) + + # 放大图像以便更好地显示细节 + rows, cols = raw_data.shape + heatmap = cv2.resize(heatmap, (cols*4, rows*4), interpolation=cv2.INTER_NEAREST) # OpenCV 生成的是 BGR,转成 RGB heatmap_rgb = cv2.cvtColor(heatmap, cv2.COLOR_BGR2RGB) @@ -363,6 +381,10 @@ class RealPressureDevice: ax.text(2, 0.5, '左足', ha='center', va='center', fontsize=12, weight='bold') ax.text(8, 0.5, '右足', ha='center', va='center', fontsize=12, weight='bold') + # 设置图形背景为黑色 + fig.patch.set_facecolor('black') + ax.set_facecolor('black') + # 保存为base64 buffer = BytesIO() plt.savefig(buffer, format='png', bbox_inches='tight', dpi=100, facecolor='black') @@ -394,8 +416,8 @@ class RealPressureDevice: def close(self): """显式关闭压力传感器连接""" try: - if self.is_connected and self.dll: - self.dll.SMiTSenseUsb_StopAndClose() + if self.is_connected and self.dll and self.device_handle: + self.dll.fpms_usb_close_wrap(self.device_handle.value) self.is_connected = False logger.info('SMiTSense压力传感器连接已关闭') except Exception as e: diff --git a/backend/devices/utils/config.ini b/backend/devices/utils/config.ini index 07e1f81e..5cf905ba 100644 --- a/backend/devices/utils/config.ini +++ b/backend/devices/utils/config.ini @@ -15,7 +15,7 @@ backup_interval = 24 max_backups = 7 [CAMERA] -device_index = 0 +device_index = 3 width = 1280 height = 720 fps = 30 @@ -24,8 +24,8 @@ fps = 30 color_resolution = 1080P depth_mode = NFOV_UNBINNED fps = 30 -depth_range_min = 1200 -depth_range_max = 1500 +depth_range_min = 1400 +depth_range_max = 1700 [DEVICES] imu_device_type = real diff --git a/backend/dll/smitsense/SMiTSenseUsb-F3.0.dll b/backend/dll/smitsense/SMiTSenseUsb-F3.0.dll index c647f5db..fa22027a 100644 Binary files a/backend/dll/smitsense/SMiTSenseUsb-F3.0.dll and b/backend/dll/smitsense/SMiTSenseUsb-F3.0.dll differ diff --git a/backend/dll/smitsense/SMiTSenseUsb-F3.0d.dll b/backend/dll/smitsense/SMiTSenseUsb-F3.0d.dll new file mode 100644 index 00000000..6a913a5e Binary files /dev/null and b/backend/dll/smitsense/SMiTSenseUsb-F3.0d.dll differ diff --git a/backend/dll/smitsense/SMiTSenseUsbWrapper.dll b/backend/dll/smitsense/SMiTSenseUsbWrapper.dll index 04eb4e48..cfa259cc 100644 Binary files a/backend/dll/smitsense/SMiTSenseUsbWrapper.dll and b/backend/dll/smitsense/SMiTSenseUsbWrapper.dll differ diff --git a/backend/dll/smitsense/Wrapper.dll b/backend/dll/smitsense/Wrapper.dll new file mode 100644 index 00000000..ac61219d Binary files /dev/null and b/backend/dll/smitsense/Wrapper.dll differ diff --git a/backend/tests/SMiTSenseUsb-F3.0.dll b/backend/tests/SMiTSenseUsb-F3.0.dll index c647f5db..fa22027a 100644 Binary files a/backend/tests/SMiTSenseUsb-F3.0.dll and b/backend/tests/SMiTSenseUsb-F3.0.dll differ diff --git a/backend/tests/SMiTSenseUsb-F3.0d.dll b/backend/tests/SMiTSenseUsb-F3.0d.dll new file mode 100644 index 00000000..6a913a5e Binary files /dev/null and b/backend/tests/SMiTSenseUsb-F3.0d.dll differ diff --git a/backend/tests/SMiTSenseUsbWrapper.dll b/backend/tests/SMiTSenseUsbWrapper.dll index 04eb4e48..fdb1766f 100644 Binary files a/backend/tests/SMiTSenseUsbWrapper.dll and b/backend/tests/SMiTSenseUsbWrapper.dll differ diff --git a/backend/tests/Wrapper.dll b/backend/tests/Wrapper.dll new file mode 100644 index 00000000..ac61219d Binary files /dev/null and b/backend/tests/Wrapper.dll differ diff --git a/backend/tests/lib_fpms_usb.h b/backend/tests/lib_fpms_usb.h new file mode 100644 index 00000000..0b1fe237 --- /dev/null +++ b/backend/tests/lib_fpms_usb.h @@ -0,0 +1,57 @@ +#pragma once +#define __DLL_EXPORTS__ + +#ifdef __DLL_EXPORTS__ +#define DLLAPI __declspec(dllexport) +#else +#define DLLAPI __declspec(dllimport) +#endif + +#include +#include +#include +using namespace std; + +typedef void* SM_HANDLE; + +typedef struct _FPMS_DEVICE +{ + uint16_t mn; + std::string sn; + uint16_t fwVersion; + uint8_t protoVer; + uint16_t pid; + uint16_t vid; + uint16_t rows; + uint16_t cols; + +} FPMS_DEVICE_T; + +extern "C" +{ + DLLAPI + int WINAPI fpms_usb_init(int debugFlag); + + DLLAPI + int WINAPI fpms_usb_get_device_list(std::vector& gDevList); + + DLLAPI + int WINAPI fpms_usb_open(FPMS_DEVICE_T dev, SM_HANDLE& gHandle); + + DLLAPI + int WINAPI fpms_usb_read_frame(SM_HANDLE gHandle, uint16_t* frame); + + DLLAPI + int WINAPI fpms_usb_config_sensitivity(SM_HANDLE gHandle, uint8_t bWriteFlash, const uint8_t level); + + DLLAPI + int WINAPI fpms_usb_get_sensitivity(SM_HANDLE gHandle, uint8_t& level); + + DLLAPI + int WINAPI fpms_usb_close(SM_HANDLE gHandle); +} + + + + + diff --git a/backend/tests/test111.py b/backend/tests/test111.py new file mode 100644 index 00000000..4680a9b1 --- /dev/null +++ b/backend/tests/test111.py @@ -0,0 +1,79 @@ +import ctypes +import time +import numpy as np + +# === DLL 加载 === +dll = ctypes.WinDLL(r"D:\BodyBalanceEvaluation\backend\dll\smitsense\SMiTSenseUsbWrapper.dll") + +# === DLL 函数声明 === +dll.SMiTSenseUsb_Init.argtypes = [ctypes.c_int] +dll.SMiTSenseUsb_Init.restype = ctypes.c_int + +dll.SMiTSenseUsb_ScanDevices.argtypes = [ctypes.POINTER(ctypes.c_int)] +dll.SMiTSenseUsb_ScanDevices.restype = ctypes.c_int + +dll.SMiTSenseUsb_OpenAndStart.argtypes = [ + ctypes.c_int, + ctypes.POINTER(ctypes.c_uint16), + ctypes.POINTER(ctypes.c_uint16) +] +dll.SMiTSenseUsb_OpenAndStart.restype = ctypes.c_int + +dll.SMiTSenseUsb_GetLatestFrame.argtypes = [ + ctypes.POINTER(ctypes.c_uint16), + ctypes.c_int +] +dll.SMiTSenseUsb_GetLatestFrame.restype = ctypes.c_int + +dll.SMiTSenseUsb_StopAndClose.argtypes = [] +dll.SMiTSenseUsb_StopAndClose.restype = ctypes.c_int + +# === 初始化设备 === +ret = dll.SMiTSenseUsb_Init(0) +if ret != 0: + raise RuntimeError(f"Init failed: {ret}") + +count = ctypes.c_int() +ret = dll.SMiTSenseUsb_ScanDevices(ctypes.byref(count)) +if ret != 0 or count.value == 0: + raise RuntimeError("No devices found") + +# 打开设备 +rows = ctypes.c_uint16() +cols = ctypes.c_uint16() +ret = dll.SMiTSenseUsb_OpenAndStart(0, ctypes.byref(rows), ctypes.byref(cols)) +if ret != 0: + raise RuntimeError("OpenAndStart failed") + +rows_val, cols_val = rows.value, cols.value +frame_size = rows_val * cols_val +buf_type = ctypes.c_uint16 * frame_size +buf = buf_type() + +# 创建一个 NumPy 数组视图,复用内存 +data_array = np.ctypeslib.as_array(buf).reshape((rows_val, cols_val)) + +print(f"设备已打开: {rows_val}x{cols_val}") + +try: + while True: + ret = dll.SMiTSenseUsb_GetLatestFrame(buf, frame_size) + time.sleep(1) + # while True: + # ret = dll.SMiTSenseUsb_GetLatestFrame(buf, frame_size) + # if ret == 0: + # # data_array 已经复用缓冲区内存,每次直接访问即可 + # # 例如打印最大值和前5行前5列的数据 + # print("最大压力值:", data_array.max()) + # print("前5x5数据:\n", data_array[:5, :5]) + # else: + # print("读取数据帧失败") + + # time.sleep(1) # 每秒读取一帧 + +except KeyboardInterrupt: + print("退出中...") + +finally: + dll.SMiTSenseUsb_StopAndClose() + print("设备已关闭") diff --git a/backend/tests/test22new.py b/backend/tests/test22new.py new file mode 100644 index 00000000..d54ec10c --- /dev/null +++ b/backend/tests/test22new.py @@ -0,0 +1,142 @@ +import ctypes +import numpy as np +import cv2 +import sys +import time + +# ------------------- DLL 加载 ------------------- +dll = ctypes.CDLL(r"D:\BodyBalanceEvaluation\backend\tests\Wrapper.dll") + +# 定义 C 结构体 +class FPMS_DEVICE_INFO(ctypes.Structure): + _fields_ = [ + ("mn", ctypes.c_uint16), + ("sn", ctypes.c_char * 64), + ("fwVersion", ctypes.c_uint16), + ("protoVer", ctypes.c_uint8), + ("pid", ctypes.c_uint16), + ("vid", ctypes.c_uint16), + ("rows", ctypes.c_uint16), + ("cols", ctypes.c_uint16), + ] + +# 函数声明 +dll.fpms_usb_init_wrap.argtypes = [ctypes.c_int] +dll.fpms_usb_init_wrap.restype = ctypes.c_int + +dll.fpms_usb_get_device_list_wrap.argtypes = [ctypes.POINTER(FPMS_DEVICE_INFO), ctypes.c_int, ctypes.POINTER(ctypes.c_int)] +dll.fpms_usb_get_device_list_wrap.restype = ctypes.c_int + +dll.fpms_usb_open_wrap.argtypes = [ctypes.c_int, ctypes.POINTER(ctypes.c_uint64)] +dll.fpms_usb_open_wrap.restype = ctypes.c_int + +dll.fpms_usb_read_frame_wrap.argtypes = [ctypes.c_uint64, ctypes.POINTER(ctypes.c_uint16), ctypes.c_size_t] +dll.fpms_usb_read_frame_wrap.restype = ctypes.c_int + +dll.fpms_usb_close_wrap.argtypes = [ctypes.c_uint64] +dll.fpms_usb_close_wrap.restype = ctypes.c_int + +# ------------------- 初始化 ------------------- +if dll.fpms_usb_init_wrap(0) != 0: + print("初始化失败") + sys.exit(1) + +# 获取设备列表 +count = ctypes.c_int() +devs = (FPMS_DEVICE_INFO * 10)() +r = dll.fpms_usb_get_device_list_wrap(devs, 10, ctypes.byref(count)) +if r != 0 or count.value == 0: + print("未检测到设备") + sys.exit(1) + +print("检测到设备数量:", count.value) +dev = devs[0] +rows, cols = dev.rows, dev.cols +print(f"使用设备 SN={dev.sn.decode(errors='ignore')} {rows}x{cols}") + +# 打开设备 +handle = ctypes.c_uint64() +r = dll.fpms_usb_open_wrap(0, ctypes.byref(handle)) +if r != 0: + print("设备打开失败") + sys.exit(1) +print("设备已打开, 句柄 =", handle.value) + +# ------------------- 循环读取帧数据 ------------------- +buf_len = rows * cols +FrameArray = (ctypes.c_uint16 * buf_len)() + +vmin, vmax = 100, 1000 # 根据实际传感器数据范围调整 + +try: + while True: + r = dll.fpms_usb_read_frame_wrap(handle.value, FrameArray, buf_len) + if r != 0: + print("读取帧失败, code=", r) + time.sleep(0.05) + continue + + # 转 numpy + data = np.frombuffer(FrameArray, dtype=np.uint16).reshape((rows, cols)) + + # ------------------- 方法1: 固定线性映射 ------------------- + norm1 = np.clip((data - vmin) / (vmax - vmin) * 255, 0, 255).astype(np.uint8) + heatmap1 = cv2.applyColorMap(norm1, cv2.COLORMAP_JET) + heatmap1[data <= vmin] = (0,0,0) + heatmap1 = cv2.resize(heatmap1, (cols*4, rows*4), interpolation=cv2.INTER_NEAREST) + cv2.imshow("方法1: 固定线性", heatmap1) + + # ------------------- 方法2: 自适应归一化 ------------------- + dmin, dmax = np.min(data), np.max(data) + norm2 = np.clip((data - dmin) / max(dmax - dmin, 1) * 255, 0, 255).astype(np.uint8) + heatmap2 = cv2.applyColorMap(norm2, cv2.COLORMAP_JET) + heatmap2[data <= vmin] = (0,0,0) + heatmap2 = cv2.resize(heatmap2, (cols*4, rows*4), interpolation=cv2.INTER_NEAREST) + cv2.imshow("方法2: 自适应归一化", heatmap2) + + # ------------------- 方法3: 对数映射 ------------------- + log_data = np.log1p(data - vmin) + log_max = np.max(log_data) + norm3 = np.clip(log_data / max(log_max, 1) * 255, 0, 255).astype(np.uint8) + heatmap3 = cv2.applyColorMap(norm3, cv2.COLORMAP_JET) + heatmap3[data <= vmin] = (0,0,0) + heatmap3 = cv2.resize(heatmap3, (cols*4, rows*4), interpolation=cv2.INTER_NEAREST) + cv2.imshow("方法3: 对数映射", heatmap3) + + # ------------------- 方法4: 平方根映射 ------------------- + sqrt_data = np.sqrt(np.clip(data - vmin, 0, None)) + sqrt_max = np.max(sqrt_data) + norm4 = np.clip(sqrt_data / max(sqrt_max, 1) * 255, 0, 255).astype(np.uint8) + heatmap4 = cv2.applyColorMap(norm4, cv2.COLORMAP_JET) + heatmap4[data <= vmin] = (0,0,0) + heatmap4 = cv2.resize(heatmap4, (cols*4, rows*4), interpolation=cv2.INTER_NEAREST) + cv2.imshow("方法4: 平方根映射", heatmap4) + + # ------------------- 方法5: 分段颜色映射 ------------------- + heatmap5 = np.zeros((rows, cols, 3), dtype=np.uint8) + # 定义压力区间和对应颜色 (B,G,R) + bins = [vmin, 250, 400, 600, 800, vmax] + colors = [ + (0,0,0), + (0,0,255), # 蓝 + (0,255,0), # 绿 + (0,255,255), # 黄 + (255,0,0), # 红 + ] + for i in range(len(bins)-1): + mask = (data > bins[i]) & (data <= bins[i+1]) + heatmap5[mask] = colors[i] + heatmap5 = cv2.resize(heatmap5, (cols*4, rows*4), interpolation=cv2.INTER_NEAREST) + cv2.imshow("方法5: 分段映射", heatmap5) + + # ------------------- 退出 ------------------- + if cv2.waitKey(1) & 0xFF == 27: # ESC 退出 + break + +except KeyboardInterrupt: + pass + +# ------------------- 清理 ------------------- +dll.fpms_usb_close_wrap(handle.value) +cv2.destroyAllWindows() +print("已退出") diff --git a/backend/tests/test_smitsense_dll.py b/backend/tests/test_smitsense_dll.py deleted file mode 100644 index 9b2d0cf7..00000000 --- a/backend/tests/test_smitsense_dll.py +++ /dev/null @@ -1,479 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -SMiTSense足部压力传感器DLL测试程序 -测试SMiTSenseUsb-F3.0.dll的Python接口调用 -""" - -import ctypes -import os -import sys -import time -from ctypes import Structure, c_int, c_float, c_char_p, c_void_p, c_uint32, POINTER, byref -import logging - -# 设置日志 -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') -logger = logging.getLogger(__name__) - -class SMiTSensePressureSensor: - """ - SMiTSense足部压力传感器Python接口类 - """ - - def __init__(self, dll_path=None): - """ - 初始化SMiTSense压力传感器 - - Args: - dll_path: DLL文件路径,如果为None则使用默认路径 - """ - self.dll = None - self.device_handle = None - self.is_connected = False - - # 设置DLL路径 - if dll_path is None: - dll_path = os.path.join(os.path.dirname(__file__), 'SMiTSenseUsb-F3.0.dll') - - self.dll_path = dll_path - self._load_dll() - - def _load_dll(self): - """ - 加载SMiTSense DLL并设置函数签名 - """ - try: - if not os.path.exists(self.dll_path): - raise FileNotFoundError(f"DLL文件未找到: {self.dll_path}") - - # 加载DLL - self.dll = ctypes.WinDLL(self.dll_path) - logger.info(f"成功加载DLL: {self.dll_path}") - - # 设置函数签名 - self._setup_function_signatures() - - except Exception as e: - logger.error(f"加载DLL失败: {e}") - raise - - def _setup_function_signatures(self): - """ - 设置DLL函数的参数类型和返回类型 - """ - try: - # fpms_usb_init - 初始化USB连接 - self.dll.fpms_usb_init.argtypes = [] - self.dll.fpms_usb_init.restype = c_int - - # fpms_usb_get_device_list - 获取设备列表 - self.dll.fpms_usb_get_device_list.argtypes = [POINTER(c_int)] - self.dll.fpms_usb_get_device_list.restype = c_int - - # fpms_usb_open - 打开设备 - self.dll.fpms_usb_open.argtypes = [c_int] - self.dll.fpms_usb_open.restype = c_void_p - - # fpms_usb_close - 关闭设备 - self.dll.fpms_usb_close.argtypes = [c_void_p] - self.dll.fpms_usb_close.restype = c_int - - # fpms_usb_read_frame - 读取压力数据帧 - self.dll.fpms_usb_read_frame.argtypes = [c_void_p, POINTER(c_float), c_int] - self.dll.fpms_usb_read_frame.restype = c_int - - # fpms_usb_get_sensitivity - 获取灵敏度 - self.dll.fpms_usb_get_sensitivity.argtypes = [c_void_p, POINTER(c_float)] - self.dll.fpms_usb_get_sensitivity.restype = c_int - - # fpms_usb_config_sensitivity - 配置灵敏度 - self.dll.fpms_usb_config_sensitivity.argtypes = [c_void_p, c_float] - self.dll.fpms_usb_config_sensitivity.restype = c_int - - logger.info("DLL函数签名设置完成") - - except AttributeError as e: - logger.error(f"设置函数签名失败,可能是函数名不匹配: {e}") - raise - - def initialize(self): - """ - 初始化USB连接 - - Returns: - bool: 初始化是否成功 - """ - try: - result = self.dll.fpms_usb_init() - if result == 0: - logger.info("USB连接初始化成功") - return True - else: - logger.error(f"USB连接初始化失败,错误码: {result}") - return False - except Exception as e: - logger.error(f"初始化异常: {e}") - return False - - def get_device_list(self): - """ - 获取可用设备列表 - - Returns: - list: 设备ID列表 - """ - try: - device_count = c_int() - result = self.dll.fpms_usb_get_device_list(byref(device_count)) - - if result == 0: - count = device_count.value - logger.info(f"发现 {count} 个SMiTSense设备") - return list(range(count)) - else: - logger.error(f"获取设备列表失败,错误码: {result}") - return [] - except Exception as e: - logger.error(f"获取设备列表异常: {e}") - return [] - - def connect(self, device_id=0): - """ - 连接到指定设备 - - Args: - device_id: 设备ID,默认为0 - - Returns: - bool: 连接是否成功 - """ - try: - self.device_handle = self.dll.fpms_usb_open(device_id) - - if self.device_handle: - self.is_connected = True - logger.info(f"成功连接到设备 {device_id}") - return True - else: - logger.error(f"连接设备 {device_id} 失败") - return False - except Exception as e: - logger.error(f"连接设备异常: {e}") - return False - - def disconnect(self): - """ - 断开设备连接 - - Returns: - bool: 断开是否成功 - """ - try: - if self.device_handle and self.is_connected: - result = self.dll.fpms_usb_close(self.device_handle) - - if result == 0: - self.is_connected = False - self.device_handle = None - logger.info("设备连接已断开") - return True - else: - logger.error(f"断开连接失败,错误码: {result}") - return False - else: - logger.warning("设备未连接,无需断开") - return True - except Exception as e: - logger.error(f"断开连接异常: {e}") - return False - - def read_pressure_data(self, sensor_count=16): - """ - 读取压力传感器数据 - - Args: - sensor_count: 传感器数量,默认16个 - - Returns: - list: 压力值列表,如果失败返回None - """ - try: - if not self.is_connected or not self.device_handle: - logger.error("设备未连接") - return None - - # 创建压力数据缓冲区 - pressure_data = (c_float * sensor_count)() - - result = self.dll.fpms_usb_read_frame( - self.device_handle, - pressure_data, - sensor_count - ) - - if result == 0: - # 转换为Python列表 - data_list = [pressure_data[i] for i in range(sensor_count)] - logger.debug(f"读取压力数据成功: {data_list}") - return data_list - else: - logger.error(f"读取压力数据失败,错误码: {result}") - return None - - except Exception as e: - logger.error(f"读取压力数据异常: {e}") - return None - - def get_sensitivity(self): - """ - 获取传感器灵敏度 - - Returns: - float: 灵敏度值,如果失败返回None - """ - try: - if not self.is_connected or not self.device_handle: - logger.error("设备未连接") - return None - - sensitivity = c_float() - result = self.dll.fpms_usb_get_sensitivity(self.device_handle, byref(sensitivity)) - - if result == 0: - value = sensitivity.value - logger.info(f"当前灵敏度: {value}") - return value - else: - logger.error(f"获取灵敏度失败,错误码: {result}") - return None - - except Exception as e: - logger.error(f"获取灵敏度异常: {e}") - return None - - def set_sensitivity(self, sensitivity_value): - """ - 设置传感器灵敏度 - - Args: - sensitivity_value: 灵敏度值 - - Returns: - bool: 设置是否成功 - """ - try: - if not self.is_connected or not self.device_handle: - logger.error("设备未连接") - return False - - result = self.dll.fpms_usb_config_sensitivity( - self.device_handle, - c_float(sensitivity_value) - ) - - if result == 0: - logger.info(f"灵敏度设置成功: {sensitivity_value}") - return True - else: - logger.error(f"设置灵敏度失败,错误码: {result}") - return False - - except Exception as e: - logger.error(f"设置灵敏度异常: {e}") - return False - - def get_foot_pressure_zones(self, pressure_data): - """ - 将原始压力数据转换为足部区域压力 - - Args: - pressure_data: 原始压力数据列表 - - Returns: - dict: 足部各区域压力数据 - """ - if not pressure_data or len(pressure_data) < 16: - return None - - try: - # 假设16个传感器的布局(可根据实际传感器布局调整) - # 左脚前部: 传感器 0-3 - # 左脚后部: 传感器 4-7 - # 右脚前部: 传感器 8-11 - # 右脚后部: 传感器 12-15 - - left_front = sum(pressure_data[0:4]) - left_rear = sum(pressure_data[4:8]) - right_front = sum(pressure_data[8:12]) - right_rear = sum(pressure_data[12:16]) - - left_total = left_front + left_rear - right_total = right_front + right_rear - total_pressure = left_total + right_total - - return { - 'left_front': left_front, - 'left_rear': left_rear, - 'right_front': right_front, - 'right_rear': right_rear, - 'left_total': left_total, - 'right_total': right_total, - 'total_pressure': total_pressure, - 'raw_data': pressure_data - } - - except Exception as e: - logger.error(f"处理足部压力区域数据异常: {e}") - return None - - -def test_smitsense_sensor(): - """ - 测试SMiTSense压力传感器的主要功能 - """ - logger.info("开始SMiTSense压力传感器测试") - - # 创建传感器实例 - sensor = SMiTSensePressureSensor() - - try: - # 1. 初始化 - logger.info("1. 初始化USB连接...") - if not sensor.initialize(): - logger.error("初始化失败,测试终止") - return False - - # 2. 获取设备列表 - logger.info("2. 获取设备列表...") - devices = sensor.get_device_list() - if not devices: - logger.warning("未发现SMiTSense设备") - return False - - logger.info(f"发现设备: {devices}") - - # 3. 连接第一个设备 - logger.info("3. 连接设备...") - if not sensor.connect(devices[0]): - logger.error("连接设备失败") - return False - - # 4. 获取当前灵敏度 - logger.info("4. 获取传感器灵敏度...") - sensitivity = sensor.get_sensitivity() - if sensitivity is not None: - logger.info(f"当前灵敏度: {sensitivity}") - - # 5. 设置新的灵敏度(可选) - logger.info("5. 设置传感器灵敏度...") - if sensor.set_sensitivity(1.0): - logger.info("灵敏度设置成功") - - # 6. 读取压力数据 - logger.info("6. 开始读取压力数据...") - for i in range(10): # 读取10次数据 - pressure_data = sensor.read_pressure_data() - - if pressure_data: - # 转换为足部区域数据 - foot_zones = sensor.get_foot_pressure_zones(pressure_data) - - if foot_zones: - logger.info(f"第{i+1}次读取:") - logger.info(f" 左脚前部: {foot_zones['left_front']:.2f}") - logger.info(f" 左脚后部: {foot_zones['left_rear']:.2f}") - logger.info(f" 右脚前部: {foot_zones['right_front']:.2f}") - logger.info(f" 右脚后部: {foot_zones['right_rear']:.2f}") - logger.info(f" 总压力: {foot_zones['total_pressure']:.2f}") - else: - logger.warning(f"第{i+1}次读取: 数据处理失败") - else: - logger.warning(f"第{i+1}次读取: 无数据") - - time.sleep(0.1) # 等待100ms - - logger.info("压力数据读取测试完成") - return True - - except Exception as e: - logger.error(f"测试过程中发生异常: {e}") - return False - - finally: - # 7. 断开连接 - logger.info("7. 断开设备连接...") - sensor.disconnect() - logger.info("SMiTSense压力传感器测试结束") - - -def test_dll_functions(): - """ - 测试DLL函数的基本调用(不需要实际硬件) - """ - logger.info("开始DLL函数基本调用测试") - - try: - sensor = SMiTSensePressureSensor() - logger.info("DLL加载成功") - - # 测试初始化函数 - logger.info("测试初始化函数...") - result = sensor.initialize() - logger.info(f"初始化结果: {result}") - - # 测试获取设备列表 - logger.info("测试获取设备列表...") - devices = sensor.get_device_list() - logger.info(f"设备列表: {devices}") - - logger.info("DLL函数基本调用测试完成") - return True - - except Exception as e: - logger.error(f"DLL函数测试失败: {e}") - return False - - -if __name__ == "__main__": - print("SMiTSense足部压力传感器DLL测试程序") - print("=" * 50) - - # 检查DLL文件是否存在 - dll_path = os.path.join(os.path.dirname(__file__), 'SMiTSenseUsb-F3.0.dll') - if not os.path.exists(dll_path): - print(f"错误: DLL文件不存在 - {dll_path}") - print("请确保SMiTSenseUsb-F3.0.dll文件在当前目录中") - sys.exit(1) - - print(f"DLL文件路径: {dll_path}") - print() - - # 运行测试 - try: - # 首先测试DLL基本功能 - print("1. DLL基本功能测试") - print("-" * 30) - if test_dll_functions(): - print("✅ DLL基本功能测试通过") - else: - print("❌ DLL基本功能测试失败") - - print() - - # 然后测试完整的传感器功能(需要硬件) - print("2. 完整传感器功能测试(需要硬件连接)") - print("-" * 30) - if test_smitsense_sensor(): - print("✅ 传感器功能测试通过") - else: - print("❌ 传感器功能测试失败(可能是硬件未连接)") - - except KeyboardInterrupt: - print("\n用户中断测试") - except Exception as e: - print(f"\n测试过程中发生未处理的异常: {e}") - import traceback - traceback.print_exc() - - print("\n测试程序结束") \ No newline at end of file diff --git a/backend/tests/testsmit.py b/backend/tests/testsmit.py index 19d3960c..ec5508d4 100644 --- a/backend/tests/testsmit.py +++ b/backend/tests/testsmit.py @@ -4,7 +4,7 @@ import numpy as np import cv2 # === DLL 加载 === -dll = ctypes.WinDLL(r"D:\Trae_space\BodyBalanceEvaluation\backend\tests\SMiTSenseUsbWrapper.dll") +dll = ctypes.WinDLL(r"D:\BodyBalanceEvaluation\backend\dll\smitsense\SMiTSenseUsbWrapper.dll") dll.SMiTSenseUsb_Init.argtypes = [ctypes.c_int] dll.SMiTSenseUsb_Init.restype = ctypes.c_int @@ -76,7 +76,7 @@ try: break else: print("读取数据帧失败") - time.sleep(0.05) # 20 FPS + time.sleep(1) # 20 FPS except KeyboardInterrupt: pass finally: