足底压力仪优化提交

This commit is contained in:
zhaozilong12 2025-08-18 18:30:49 +08:00
parent 4e89ab6107
commit c53521eebf
17 changed files with 591 additions and 688 deletions

View File

@ -53,10 +53,14 @@ class CameraManager(BaseDevice):
self.fps = config.get('fps', 30)
self.buffer_size = config.get('buffer_size', 1)
self.fourcc = config.get('fourcc', 'MJPG')
# 额外可调的降采样宽度(不改变外部配置语义,仅内部优化传输)
self._tx_max_width = int(config.get('tx_max_width', 640))
# 流控制
self.streaming_thread = None
self.frame_cache = deque(maxlen=10)
# 减小缓存长度保留最近2帧即可避免累计占用
self.frame_cache = deque(maxlen=int(config.get('frame_cache_len', 2)))
self.last_frame = None
self.frame_count = 0
self.dropped_frames = 0
@ -77,6 +81,12 @@ class CameraManager(BaseDevice):
'actual_fps': 0,
'dropped_frames': 0
}
# OpenCV优化开关
try:
cv2.setUseOptimized(True)
except Exception:
pass
self.logger.info(f"相机管理器初始化完成 - 设备索引: {self.device_index}")
@ -90,7 +100,7 @@ class CameraManager(BaseDevice):
try:
self.logger.info(f"正在初始化相机设备 {self.device_index}...")
# 尝试多个后端
# 尝试多个后端Windows下优先MSMF/DShow
backends = [cv2.CAP_MSMF, cv2.CAP_DSHOW, cv2.CAP_ANY]
for backend in backends:
@ -127,7 +137,10 @@ class CameraManager(BaseDevice):
self.logger.error(f"相机初始化失败: {e}")
self.is_connected = False
if self.cap:
self.cap.release()
try:
self.cap.release()
except Exception:
pass
self.cap = None
return False
@ -151,8 +164,11 @@ class CameraManager(BaseDevice):
# 设置帧率
self.cap.set(cv2.CAP_PROP_FPS, self.fps)
# 设置缓冲区大小
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, self.buffer_size)
# 设置缓冲区大小(部分后端不生效)
try:
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, self.buffer_size)
except Exception:
pass
# 获取实际设置的值
actual_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
@ -199,7 +215,7 @@ class CameraManager(BaseDevice):
# 读取几帧来稳定相机
for i in range(5):
ret, frame = self.cap.read()
ret, _ = self.cap.read()
if not ret:
self.logger.warning(f"校准时读取第{i+1}帧失败")
@ -253,7 +269,9 @@ class CameraManager(BaseDevice):
self.is_streaming = False
if self.streaming_thread and self.streaming_thread.is_alive():
# 等待线程退出
self.streaming_thread.join(timeout=3.0)
self.streaming_thread = None
self.logger.info("相机流已停止")
return True
@ -269,8 +287,13 @@ class CameraManager(BaseDevice):
self.logger.info("相机流工作线程启动")
reconnect_attempts = 0
# 基于目标FPS的简单节拍器防止无上限地读取/编码/发送导致对象堆积
frame_interval = 1.0 / max(self.fps, 1)
next_tick = time.time()
while self.is_streaming:
loop_start = time.time()
try:
if not self.cap or not self.cap.isOpened():
if reconnect_attempts < self.max_reconnect_attempts:
@ -292,33 +315,51 @@ class CameraManager(BaseDevice):
self.dropped_frames += 1
if self.dropped_frames > 10:
self.logger.warning(f"连续丢帧过多: {self.dropped_frames}")
# 仅在异常情况下触发一次GC避免高频强制GC
try:
gc.collect()
except Exception:
pass
self.dropped_frames = 0
time.sleep(0.01)
# 防止空转占满CPU
time.sleep(0.005)
continue
# 重置丢帧计数
self.dropped_frames = 0
# 处理帧
# 处理帧(降采样以优化传输负载)
processed_frame = self._process_frame(frame)
# 缓存帧
self.last_frame = processed_frame.copy()
self.frame_cache.append(processed_frame)
# 缓存帧(不复制,减少内存占用)
# self.last_frame = processed_frame
# self.frame_cache.append(processed_frame)
# 发送帧数据
self._send_frame_data(processed_frame)
# 更新统计
self._update_statistics()
# 内存管理
if self.frame_count % 30 == 0:
gc.collect()
# 主动释放局部引用帮助GC更快识别可回收对象
del frame
# 注意processed_frame 被 last_frame 和 frame_cache 引用,不可删除其对象本身
# 限速保证不超过目标FPS减小发送端积压
now = time.time()
# 下一个tick基于固定间隔前移避免误差累积
next_tick += frame_interval
sleep_time = next_tick - now
if sleep_time > 0:
time.sleep(sleep_time)
else:
# 如果处理耗时超过间隔,纠正节拍器,避免持续为负
next_tick = now
except Exception as e:
self.logger.error(f"相机流处理异常: {e}")
time.sleep(0.1)
# 小退避,避免异常情况下空转
time.sleep(0.02)
self.logger.info("相机流工作线程结束")
@ -333,15 +374,14 @@ class CameraManager(BaseDevice):
np.ndarray: 处理后的帧
"""
try:
# 调整大小以优化传输
if frame.shape[1] > 640:
scale_factor = 640 / frame.shape[1]
new_width = 640
new_height = int(frame.shape[0] * scale_factor)
frame = cv2.resize(frame, (new_width, new_height))
# 调整大小以优化传输(使用 INTER_AREA 质量好且更省内存/CPU
h, w = frame.shape[:2]
if w > self._tx_max_width:
scale = self._tx_max_width / float(w)
new_w = self._tx_max_width
new_h = int(h * scale)
frame = cv2.resize(frame, (new_w, new_h), interpolation=cv2.INTER_AREA)
return frame
except Exception as e:
self.logger.error(f"处理帧失败: {e}")
return frame
@ -353,13 +393,21 @@ class CameraManager(BaseDevice):
Args:
frame: 视频帧
"""
# 将临时对象局部化,并在 finally 中删除引用,加速回收
buffer = None
frame_bytes = None
frame_data = None
try:
# 编码为JPEG
encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 80]
_, buffer = cv2.imencode('.jpg', frame, encode_param)
ok, buffer = cv2.imencode('.jpg', frame, encode_param)
if not ok or buffer is None:
self.logger.warning("帧JPEG编码失败")
return
# 转换为base64
frame_data = base64.b64encode(buffer).decode('utf-8')
# 转换为bytes再做base64减少中间numpy对象的长生命周期
frame_bytes = buffer.tobytes()
frame_data = base64.b64encode(frame_bytes).decode('utf-8')
# 发送数据
data = {
@ -374,6 +422,11 @@ class CameraManager(BaseDevice):
except Exception as e:
self.logger.error(f"发送帧数据失败: {e}")
finally:
# 显式删除临时大对象的引用,避免在高吞吐下堆积
del buffer
del frame_bytes
del frame_data
def _update_statistics(self):
"""
@ -405,7 +458,11 @@ class CameraManager(BaseDevice):
"""
try:
if self.cap:
self.cap.release()
try:
self.cap.release()
except Exception:
pass
self.cap = None
time.sleep(1.0) # 等待设备释放
@ -471,6 +528,7 @@ class CameraManager(BaseDevice):
Returns:
Optional[np.ndarray]: 最新帧无帧返回None
"""
# 对外提供拷贝,内部保持原对象,避免重复持有
return self.last_frame.copy() if self.last_frame is not None else None
def disconnect(self):
@ -481,7 +539,10 @@ class CameraManager(BaseDevice):
self.stop_streaming()
if self.cap:
self.cap.release()
try:
self.cap.release()
except Exception:
pass
self.cap = None
self.is_connected = False
@ -498,7 +559,10 @@ class CameraManager(BaseDevice):
self.stop_streaming()
if self.cap:
self.cap.release()
try:
self.cap.release()
except Exception:
pass
self.cap = None
self.frame_cache.clear()
@ -508,4 +572,4 @@ class CameraManager(BaseDevice):
self.logger.info("相机资源清理完成")
except Exception as e:
self.logger.error(f"清理相机资源失败: {e}")
self.logger.error(f"清理相机资源失败: {e}")

View File

@ -98,8 +98,31 @@ class FemtoBoltManager(BaseDevice):
self.max_reconnect_attempts = 3
self.reconnect_delay = 3.0
# 发送频率控制(内存优化)
self.send_fps = self.config.get('send_fps', 20) # 默认20FPS发送
self._min_send_interval = 1.0 / self.send_fps if self.send_fps > 0 else 0.05
self._last_send_time = 0
# 编码参数缓存(避免每帧创建数组)
self._encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), int(self.config.get('jpeg_quality', 80))]
# 预计算伽马LUT避免每帧计算
self._gamma_lut = None
self._current_gamma = None
self._update_gamma_lut()
# 预生成网格背景(避免每帧创建)
self._grid_bg = None
self._grid_size = (480, 640) # 默认尺寸
self.logger.info("FemtoBolt管理器初始化完成")
def _update_gamma_lut(self):
"""更新伽马校正查找表"""
if self._current_gamma != self.gamma_value:
self._gamma_lut = np.array([((i / 255.0) ** self.gamma_value) * 255 for i in range(256)]).astype("uint8")
self._current_gamma = self.gamma_value
def initialize(self) -> bool:
"""
初始化FemtoBolt设备
@ -391,7 +414,7 @@ class FemtoBoltManager(BaseDevice):
capture.release()
else:
self.logger.warning(f"时无法获取第{i+1}")
self.logger.warning(f"时无法获取第{i+1}")
time.sleep(0.1)
@ -464,162 +487,160 @@ class FemtoBoltManager(BaseDevice):
try:
while self.is_streaming:
# 发送频率限制
now = time.time()
if now - self._last_send_time < self._min_send_interval:
time.sleep(0.001)
continue
if self.device_handle and self._socketio:
try:
capture = self.device_handle.update()
if capture is not None:
ret, depth_image = capture.get_depth_image()
if ret and depth_image is not None:
# 使用与device_manager.py相同的处理逻辑
depth_image = depth_image.copy()
# === 生成灰色背景 + 白色网格 ===
rows, cols = depth_image.shape[:2]
background = np.ones((rows, cols, 3), dtype=np.uint8) * 128
cell_size = 50
grid_color = (255, 255, 255)
grid_bg = np.zeros_like(background)
for x in range(0, cols, cell_size):
cv2.line(grid_bg, (x, 0), (x, rows), grid_color, 1)
for y in range(0, rows, cell_size):
cv2.line(grid_bg, (0, y), (cols, y), grid_color, 1)
mask_grid = (grid_bg.sum(axis=2) > 0)
background[mask_grid] = grid_bg[mask_grid]
# === 处理深度图满足区间的部分 ===
depth_clipped = depth_image.copy()
depth_clipped[depth_clipped < self.depth_range_min] = 0
depth_clipped[depth_clipped > self.depth_range_max] = 0
depth_normalized = np.clip(depth_clipped, self.depth_range_min, self.depth_range_max)
depth_normalized = ((depth_normalized - self.depth_range_min) / (self.depth_range_max - self.depth_range_min) * 255).astype(np.uint8)
# 对比度和伽马校正
alpha, beta, gamma = 1.5, 0, 0.8
depth_normalized = cv2.convertScaleAbs(depth_normalized, alpha=alpha, beta=beta)
lut = np.array([((i / 255.0) ** gamma) * 255 for i in range(256)]).astype("uint8")
depth_normalized = cv2.LUT(depth_normalized, lut)
# 伪彩色
depth_colored = cv2.applyColorMap(depth_normalized, cv2.COLORMAP_JET)
# 将有效深度覆盖到灰色背景上
mask_valid = (depth_clipped > 0)
for c in range(3):
background[:, :, c][mask_valid] = depth_colored[:, :, c][mask_valid]
depth_colored_final = background
# 裁剪宽度
height, width = depth_colored_final.shape[:2]
target_width = height // 2
if width > target_width:
left = (width - target_width) // 2
right = left + target_width
depth_colored_final = depth_colored_final[:, left:right]
# 缓存图像
self.last_depth_frame = depth_colored_final.copy()
self.depth_frame_cache.append(depth_colored_final.copy())
# 推送SocketIO
success, buffer = cv2.imencode('.jpg', depth_colored_final, [int(cv2.IMWRITE_JPEG_QUALITY), 80])
if success and self._socketio:
jpg_as_text = base64.b64encode(buffer).decode('utf-8')
# 发送到femtobolt命名空间使用前端期望的事件名和数据格式
self._socketio.emit('femtobolt_frame', {
'depth_image': jpg_as_text,
'frame_count': frame_count,
'timestamp': time.time(),
'fps': self.actual_fps,
'device_id': self.device_id,
'depth_range': {
'min': self.depth_range_min,
'max': self.depth_range_max
}
}, namespace='/devices')
frame_count += 1
try:
ret, depth_image = capture.get_depth_image()
if ret and depth_image is not None:
# 确保二维数据
if depth_image.ndim == 3 and depth_image.shape[2] == 1:
depth_image = depth_image[:, :, 0]
# 更新统计
self._update_statistics()
else:
time.sleep(0.01)
rows, cols = depth_image.shape[:2]
# 生成或复用网格背景
if (self._grid_bg is None) or (self._grid_size != (rows, cols)):
bg = np.ones((rows, cols, 3), dtype=np.uint8) * 128
cell_size = 50
grid_color = (255, 255, 255)
grid = np.zeros_like(bg)
for x in range(0, cols, cell_size):
cv2.line(grid, (x, 0), (x, rows), grid_color, 1)
for y in range(0, rows, cell_size):
cv2.line(grid, (0, y), (cols, y), grid_color, 1)
mask_grid = (grid.sum(axis=2) > 0)
bg[mask_grid] = grid[mask_grid]
self._grid_bg = bg
self._grid_size = (rows, cols)
# 深度范围过滤 + 归一化
depth_clipped = np.where(
(depth_image >= self.depth_range_min) & (depth_image <= self.depth_range_max),
depth_image, 0
)
if np.max(depth_clipped) > 0:
depth_normalized = ((depth_clipped - self.depth_range_min) / (self.depth_range_max - self.depth_range_min) * 255).astype(np.uint8)
else:
depth_normalized = np.zeros((rows, cols), dtype=np.uint8)
# 对比度与伽马
depth_normalized = cv2.convertScaleAbs(depth_normalized, alpha=1.5, beta=0)
if self._gamma_lut is None or self._current_gamma != self.gamma_value:
self._update_gamma_lut()
depth_gamma = cv2.LUT(depth_normalized, self._gamma_lut)
# 伪彩色
depth_colored = cv2.applyColorMap(depth_gamma, cv2.COLORMAP_JET)
# 合成到背景避免copy使用背景副本
background = self._grid_bg.copy()
mask_valid = (depth_clipped > 0)
for c in range(3):
channel = background[:, :, c]
channel[mask_valid] = depth_colored[:, :, c][mask_valid]
depth_colored_final = background
# 裁剪宽度
height, width = depth_colored_final.shape[:2]
target_width = height // 2
if width > target_width:
left = (width - target_width) // 2
right = left + target_width
depth_colored_final = depth_colored_final[:, left:right]
# 推送SocketIO
success, buffer = cv2.imencode('.jpg', depth_colored_final, self._encode_param)
if success and self._socketio:
jpg_as_text = base64.b64encode(memoryview(buffer).tobytes()).decode('utf-8')
self._socketio.emit('femtobolt_frame', {
'depth_image': jpg_as_text,
'frame_count': frame_count,
'timestamp': now,
'fps': self.actual_fps,
'device_id': self.device_id,
'depth_range': {
'min': self.depth_range_min,
'max': self.depth_range_max
}
}, namespace='/devices')
frame_count += 1
self._last_send_time = now
# 更新统计
self._update_statistics()
else:
time.sleep(0.005)
except Exception as e:
# 捕获处理过程中出现异常,记录并继续
self.logger.error(f"FemtoBolt捕获处理错误: {e}")
finally:
# 无论处理成功与否都应释放capture以回收内存:contentReference[oaicite:3]{index=3}
try:
if hasattr(capture, 'release'):
capture.release()
except Exception:
pass
else:
time.sleep(0.01)
time.sleep(0.005)
except Exception as e:
self.logger.error(f'FemtoBolt帧推送失败: {e}')
time.sleep(0.1)
time.sleep(0.05)
time.sleep(1/30) # 30 FPS
# 降低空转CPU
time.sleep(0.001)
except Exception as e:
self.logger.error(f"FemtoBolt流处理异常: {e}")
finally:
self.is_streaming = False
self.logger.info("FemtoBolt流工作线程结束")
def _process_depth_image(self, depth_image) -> np.ndarray:
"""
处理深度图像
Args:
depth_image: 原始深度图像
Returns:
np.ndarray: 处理后的深度图像
"""
try:
# 确保输入是numpy数组
if not isinstance(depth_image, np.ndarray):
self.logger.error(f"输入的深度图像不是numpy数组: {type(depth_image)}")
return np.zeros((480, 640, 3), dtype=np.uint8)
# 深度范围过滤
mask = (depth_image >= self.depth_range_min) & (depth_image <= self.depth_range_max)
filtered_depth = np.where(mask, depth_image, 0)
# 归一化到0-255
if np.max(filtered_depth) > 0:
normalized = ((filtered_depth - self.depth_range_min) /
(self.depth_range_max - self.depth_range_min) * 255).astype(np.uint8)
normalized = ((filtered_depth - self.depth_range_min) / (self.depth_range_max - self.depth_range_min) * 255).astype(np.uint8)
else:
normalized = np.zeros_like(filtered_depth, dtype=np.uint8)
# 对比度增强
enhanced = cv2.convertScaleAbs(normalized, alpha=self.contrast_factor, beta=0)
# 伽马校正
gamma_corrected = np.power(enhanced / 255.0, self.gamma_value) * 255
gamma_corrected = gamma_corrected.astype(np.uint8)
if self._gamma_lut is None or self._current_gamma != self.gamma_value:
self._update_gamma_lut()
gamma_corrected = cv2.LUT(enhanced, self._gamma_lut)
# 伪彩色映射
if self.use_pseudo_color:
colored = cv2.applyColorMap(gamma_corrected, cv2.COLORMAP_JET)
else:
colored = cv2.cvtColor(gamma_corrected, cv2.COLOR_GRAY2BGR)
return colored
return colored
except Exception as e:
self.logger.error(f"处理深度图像失败: {e}")
return np.zeros((480, 640, 3), dtype=np.uint8)
def _send_depth_data(self, depth_image: np.ndarray, color_image: Optional[np.ndarray] = None):
"""
发送深度数据
Args:
depth_image: 深度图像
color_image: 彩色图像可选
"""
try:
# 压缩深度图像
encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 85]
_, depth_buffer = cv2.imencode('.jpg', depth_image, encode_param)
depth_data = base64.b64encode(depth_buffer).decode('utf-8')
_, depth_buffer = cv2.imencode('.jpg', depth_image, self._encode_param)
depth_data = base64.b64encode(memoryview(depth_buffer).tobytes()).decode('utf-8')
# 准备发送数据
send_data = {
'timestamp': time.time(),
'frame_count': self.frame_count,
@ -633,18 +654,15 @@ class FemtoBoltManager(BaseDevice):
'last_update': time.strftime('%H:%M:%S')
}
# 添加彩色图像(如果有)
if color_image is not None:
_, color_buffer = cv2.imencode('.jpg', color_image, encode_param)
color_data = base64.b64encode(color_buffer).decode('utf-8')
_, color_buffer = cv2.imencode('.jpg', color_image, self._encode_param)
color_data = base64.b64encode(memoryview(color_buffer).tobytes()).decode('utf-8')
send_data['color_image'] = color_data
# 发送到SocketIO
self._socketio.emit('femtobolt_frame', send_data, namespace='/devices')
self._socketio.emit('femtobolt_frame', send_data, namespace='/devices')
except Exception as e:
self.logger.error(f"发送深度数据失败: {e}")
def _update_statistics(self):
"""
更新性能统计
@ -848,4 +866,4 @@ class FemtoBoltManager(BaseDevice):
self.logger.info("FemtoBolt资源清理完成")
except Exception as e:
self.logger.error(f"清理FemtoBolt资源失败: {e}")
self.logger.error(f"清理FemtoBolt资源失败: {e}")

View File

@ -45,6 +45,20 @@ except ImportError:
logger.warning("matplotlib不可用将使用简化的压力图像生成")
# 定义 C 结构体
class FPMS_DEVICE_INFO(ctypes.Structure):
_fields_ = [
("mn", ctypes.c_uint16),
("sn", ctypes.c_char * 64),
("fwVersion", ctypes.c_uint16),
("protoVer", ctypes.c_uint8),
("pid", ctypes.c_uint16),
("vid", ctypes.c_uint16),
("rows", ctypes.c_uint16),
("cols", ctypes.c_uint16),
]
class RealPressureDevice:
"""真实SMiTSense压力传感器设备"""
@ -63,11 +77,11 @@ class RealPressureDevice:
self.frame_size = 0
self.buf = None
# 设置DLL路径 - 使用正确的DLL文件名
# 设置DLL路径 - 使用Wrapper.dll
if dll_path is None:
# 尝试多个可能的DLL文件名
dll_candidates = [
os.path.join(os.path.dirname(__file__), '..', 'dll', 'smitsense', 'SMiTSenseUsbWrapper.dll'),
os.path.join(os.path.dirname(__file__), '..', 'dll', 'smitsense', 'Wrapper.dll'),
os.path.join(os.path.dirname(__file__), '..', 'dll', 'smitsense', 'SMiTSenseUsb-F3.0.dll')
]
dll_path = None
@ -97,31 +111,24 @@ class RealPressureDevice:
raise FileNotFoundError(f"DLL文件未找到: {self.dll_path}")
# 加载DLL
self.dll = ctypes.WinDLL(self.dll_path)
self.dll = ctypes.CDLL(self.dll_path)
logger.info(f"成功加载DLL: {self.dll_path}")
# 设置函数签名基于testsmit.py的工作代码
self.dll.SMiTSenseUsb_Init.argtypes = [ctypes.c_int]
self.dll.SMiTSenseUsb_Init.restype = ctypes.c_int
# 设置函数签名基于test22new.py的工作代码
self.dll.fpms_usb_init_wrap.argtypes = [ctypes.c_int]
self.dll.fpms_usb_init_wrap.restype = ctypes.c_int
self.dll.SMiTSenseUsb_ScanDevices.argtypes = [ctypes.POINTER(ctypes.c_int)]
self.dll.SMiTSenseUsb_ScanDevices.restype = ctypes.c_int
self.dll.fpms_usb_get_device_list_wrap.argtypes = [ctypes.POINTER(FPMS_DEVICE_INFO), ctypes.c_int, ctypes.POINTER(ctypes.c_int)]
self.dll.fpms_usb_get_device_list_wrap.restype = ctypes.c_int
self.dll.SMiTSenseUsb_OpenAndStart.argtypes = [
ctypes.c_int,
ctypes.POINTER(ctypes.c_uint16),
ctypes.POINTER(ctypes.c_uint16)
]
self.dll.SMiTSenseUsb_OpenAndStart.restype = ctypes.c_int
self.dll.fpms_usb_open_wrap.argtypes = [ctypes.c_int, ctypes.POINTER(ctypes.c_uint64)]
self.dll.fpms_usb_open_wrap.restype = ctypes.c_int
self.dll.SMiTSenseUsb_GetLatestFrame.argtypes = [
ctypes.POINTER(ctypes.c_uint16),
ctypes.c_int
]
self.dll.SMiTSenseUsb_GetLatestFrame.restype = ctypes.c_int
self.dll.fpms_usb_read_frame_wrap.argtypes = [ctypes.c_uint64, ctypes.POINTER(ctypes.c_uint16), ctypes.c_size_t]
self.dll.fpms_usb_read_frame_wrap.restype = ctypes.c_int
self.dll.SMiTSenseUsb_StopAndClose.argtypes = []
self.dll.SMiTSenseUsb_StopAndClose.restype = ctypes.c_int
self.dll.fpms_usb_close_wrap.argtypes = [ctypes.c_uint64]
self.dll.fpms_usb_close_wrap.restype = ctypes.c_int
logger.info("DLL函数签名设置完成")
@ -133,27 +140,30 @@ class RealPressureDevice:
"""初始化设备连接"""
try:
# 初始化USB连接
ret = self.dll.SMiTSenseUsb_Init(0)
if ret != 0:
raise RuntimeError(f"USB初始化失败: {ret}")
if self.dll.fpms_usb_init_wrap(0) != 0:
raise RuntimeError("初始化失败")
# 扫描设备
# 获取设备列表
count = ctypes.c_int()
ret = self.dll.SMiTSenseUsb_ScanDevices(ctypes.byref(count))
if ret != 0 or count.value == 0:
raise RuntimeError(f"设备扫描失败或未找到设备: {ret}, count: {count.value}")
devs = (FPMS_DEVICE_INFO * 10)()
r = self.dll.fpms_usb_get_device_list_wrap(devs, 10, ctypes.byref(count))
if r != 0 or count.value == 0:
raise RuntimeError(f"未检测到设备: {r}, count: {count.value}")
logger.info(f"发现 {count.value} 个SMiTSense设备")
logger.info(f"检测到设备数量: {count.value}")
dev = devs[0]
self.rows, self.cols = dev.rows, dev.cols
logger.info(f"使用设备 SN={dev.sn.decode(errors='ignore')} {self.rows}x{self.cols}")
# 打开并启动第一个设备
rows = ctypes.c_uint16()
cols = ctypes.c_uint16()
ret = self.dll.SMiTSenseUsb_OpenAndStart(0, ctypes.byref(rows), ctypes.byref(cols))
if ret != 0:
raise RuntimeError(f"设备启动失败: {ret}")
# 打开设备
self.device_handle = ctypes.c_uint64()
r = self.dll.fpms_usb_open_wrap(0, ctypes.byref(self.device_handle))
if r != 0:
raise RuntimeError("设备打开失败")
self.rows = rows.value
self.cols = cols.value
logger.info(f"设备已打开, 句柄 = {self.device_handle.value}")
# 准备数据缓冲区
self.frame_size = self.rows * self.cols
self.buf_type = ctypes.c_uint16 * self.frame_size
self.buf = self.buf_type()
@ -173,9 +183,9 @@ class RealPressureDevice:
return self._get_empty_data()
# 读取原始压力数据
ret = self.dll.SMiTSenseUsb_GetLatestFrame(self.buf, self.frame_size)
if ret != 0:
logger.warning(f"读取数据帧失败: {ret}")
r = self.dll.fpms_usb_read_frame_wrap(self.device_handle.value, self.buf, self.frame_size)
if r != 0:
logger.warning(f"读取帧失败, code= {r}")
return self._get_empty_data()
# 转换为numpy数组
@ -248,10 +258,10 @@ class RealPressureDevice:
right_total_pct = float((right_total_abs / total_abs * 100) if total_abs > 0 else 0)
# 前后占比(相对于各自单足总压)
left_front_pct = float((left_front / left_total_abs * 100) if left_total_abs > 0 else 0)
left_rear_pct = float((left_rear / left_total_abs * 100) if left_total_abs > 0 else 0)
right_front_pct = float((right_front / right_total_abs * 100) if right_total_abs > 0 else 0)
right_rear_pct = float((right_rear / right_total_abs * 100) if right_total_abs > 0 else 0)
left_front_pct = float((left_front / total_abs * 100) if total_abs > 0 else 0)
left_rear_pct = float((left_rear / total_abs * 100) if total_abs > 0 else 0)
right_front_pct = float((right_front / total_abs * 100) if total_abs > 0 else 0)
right_rear_pct = float((right_rear / total_abs * 100) if total_abs > 0 else 0)
return {
'left_front': round(left_front_pct),
@ -284,7 +294,7 @@ class RealPressureDevice:
return ""
def _generate_heatmap_image(self, raw_data) -> str:
"""生成基于原始数据的热力图OpenCV实现固定范围映射效果与matplotlib一致"""
"""生成基于原始数据的热力图OpenCV实现自适应归一化,黑色背景"""
try:
import cv2
import numpy as np
@ -292,12 +302,20 @@ class RealPressureDevice:
from io import BytesIO
from PIL import Image
# 固定映射范围(与 matplotlib vmin/vmax 一致)
vmin, vmax = 0, 1000
norm_data = np.clip((raw_data - vmin) / (vmax - vmin) * 255, 0, 255).astype(np.uint8)
# 自适应归一化基于test22new.py的方法2
vmin = 10 # 最小阈值,低于此值显示为黑色
dmin, dmax = np.min(raw_data), np.max(raw_data)
norm_data = np.clip((raw_data - dmin) / max(dmax - dmin, 1) * 255, 0, 255).astype(np.uint8)
# 应用 jet 颜色映射
heatmap = cv2.applyColorMap(norm_data, cv2.COLORMAP_JET)
# 将低于阈值的区域设置为黑色
heatmap[raw_data <= vmin] = (0, 0, 0)
# 放大图像以便更好地显示细节
rows, cols = raw_data.shape
heatmap = cv2.resize(heatmap, (cols*4, rows*4), interpolation=cv2.INTER_NEAREST)
# OpenCV 生成的是 BGR转成 RGB
heatmap_rgb = cv2.cvtColor(heatmap, cv2.COLOR_BGR2RGB)
@ -363,6 +381,10 @@ class RealPressureDevice:
ax.text(2, 0.5, '左足', ha='center', va='center', fontsize=12, weight='bold')
ax.text(8, 0.5, '右足', ha='center', va='center', fontsize=12, weight='bold')
# 设置图形背景为黑色
fig.patch.set_facecolor('black')
ax.set_facecolor('black')
# 保存为base64
buffer = BytesIO()
plt.savefig(buffer, format='png', bbox_inches='tight', dpi=100, facecolor='black')
@ -394,8 +416,8 @@ class RealPressureDevice:
def close(self):
"""显式关闭压力传感器连接"""
try:
if self.is_connected and self.dll:
self.dll.SMiTSenseUsb_StopAndClose()
if self.is_connected and self.dll and self.device_handle:
self.dll.fpms_usb_close_wrap(self.device_handle.value)
self.is_connected = False
logger.info('SMiTSense压力传感器连接已关闭')
except Exception as e:

View File

@ -15,7 +15,7 @@ backup_interval = 24
max_backups = 7
[CAMERA]
device_index = 0
device_index = 3
width = 1280
height = 720
fps = 30
@ -24,8 +24,8 @@ fps = 30
color_resolution = 1080P
depth_mode = NFOV_UNBINNED
fps = 30
depth_range_min = 1200
depth_range_max = 1500
depth_range_min = 1400
depth_range_max = 1700
[DEVICES]
imu_device_type = real

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

BIN
backend/tests/Wrapper.dll Normal file

Binary file not shown.

View File

@ -0,0 +1,57 @@
#pragma once
#define __DLL_EXPORTS__
#ifdef __DLL_EXPORTS__
#define DLLAPI __declspec(dllexport)
#else
#define DLLAPI __declspec(dllimport)
#endif
#include <windows.h>
#include <cstdint>
#include <vector>
using namespace std;
typedef void* SM_HANDLE;
typedef struct _FPMS_DEVICE
{
uint16_t mn;
std::string sn;
uint16_t fwVersion;
uint8_t protoVer;
uint16_t pid;
uint16_t vid;
uint16_t rows;
uint16_t cols;
} FPMS_DEVICE_T;
extern "C"
{
DLLAPI
int WINAPI fpms_usb_init(int debugFlag);
DLLAPI
int WINAPI fpms_usb_get_device_list(std::vector<FPMS_DEVICE_T>& gDevList);
DLLAPI
int WINAPI fpms_usb_open(FPMS_DEVICE_T dev, SM_HANDLE& gHandle);
DLLAPI
int WINAPI fpms_usb_read_frame(SM_HANDLE gHandle, uint16_t* frame);
DLLAPI
int WINAPI fpms_usb_config_sensitivity(SM_HANDLE gHandle, uint8_t bWriteFlash, const uint8_t level);
DLLAPI
int WINAPI fpms_usb_get_sensitivity(SM_HANDLE gHandle, uint8_t& level);
DLLAPI
int WINAPI fpms_usb_close(SM_HANDLE gHandle);
}

79
backend/tests/test111.py Normal file
View File

@ -0,0 +1,79 @@
import ctypes
import time
import numpy as np
# === DLL 加载 ===
dll = ctypes.WinDLL(r"D:\BodyBalanceEvaluation\backend\dll\smitsense\SMiTSenseUsbWrapper.dll")
# === DLL 函数声明 ===
dll.SMiTSenseUsb_Init.argtypes = [ctypes.c_int]
dll.SMiTSenseUsb_Init.restype = ctypes.c_int
dll.SMiTSenseUsb_ScanDevices.argtypes = [ctypes.POINTER(ctypes.c_int)]
dll.SMiTSenseUsb_ScanDevices.restype = ctypes.c_int
dll.SMiTSenseUsb_OpenAndStart.argtypes = [
ctypes.c_int,
ctypes.POINTER(ctypes.c_uint16),
ctypes.POINTER(ctypes.c_uint16)
]
dll.SMiTSenseUsb_OpenAndStart.restype = ctypes.c_int
dll.SMiTSenseUsb_GetLatestFrame.argtypes = [
ctypes.POINTER(ctypes.c_uint16),
ctypes.c_int
]
dll.SMiTSenseUsb_GetLatestFrame.restype = ctypes.c_int
dll.SMiTSenseUsb_StopAndClose.argtypes = []
dll.SMiTSenseUsb_StopAndClose.restype = ctypes.c_int
# === 初始化设备 ===
ret = dll.SMiTSenseUsb_Init(0)
if ret != 0:
raise RuntimeError(f"Init failed: {ret}")
count = ctypes.c_int()
ret = dll.SMiTSenseUsb_ScanDevices(ctypes.byref(count))
if ret != 0 or count.value == 0:
raise RuntimeError("No devices found")
# 打开设备
rows = ctypes.c_uint16()
cols = ctypes.c_uint16()
ret = dll.SMiTSenseUsb_OpenAndStart(0, ctypes.byref(rows), ctypes.byref(cols))
if ret != 0:
raise RuntimeError("OpenAndStart failed")
rows_val, cols_val = rows.value, cols.value
frame_size = rows_val * cols_val
buf_type = ctypes.c_uint16 * frame_size
buf = buf_type()
# 创建一个 NumPy 数组视图,复用内存
data_array = np.ctypeslib.as_array(buf).reshape((rows_val, cols_val))
print(f"设备已打开: {rows_val}x{cols_val}")
try:
while True:
ret = dll.SMiTSenseUsb_GetLatestFrame(buf, frame_size)
time.sleep(1)
# while True:
# ret = dll.SMiTSenseUsb_GetLatestFrame(buf, frame_size)
# if ret == 0:
# # data_array 已经复用缓冲区内存,每次直接访问即可
# # 例如打印最大值和前5行前5列的数据
# print("最大压力值:", data_array.max())
# print("前5x5数据:\n", data_array[:5, :5])
# else:
# print("读取数据帧失败")
# time.sleep(1) # 每秒读取一帧
except KeyboardInterrupt:
print("退出中...")
finally:
dll.SMiTSenseUsb_StopAndClose()
print("设备已关闭")

142
backend/tests/test22new.py Normal file
View File

@ -0,0 +1,142 @@
import ctypes
import numpy as np
import cv2
import sys
import time
# ------------------- DLL 加载 -------------------
dll = ctypes.CDLL(r"D:\BodyBalanceEvaluation\backend\tests\Wrapper.dll")
# 定义 C 结构体
class FPMS_DEVICE_INFO(ctypes.Structure):
_fields_ = [
("mn", ctypes.c_uint16),
("sn", ctypes.c_char * 64),
("fwVersion", ctypes.c_uint16),
("protoVer", ctypes.c_uint8),
("pid", ctypes.c_uint16),
("vid", ctypes.c_uint16),
("rows", ctypes.c_uint16),
("cols", ctypes.c_uint16),
]
# 函数声明
dll.fpms_usb_init_wrap.argtypes = [ctypes.c_int]
dll.fpms_usb_init_wrap.restype = ctypes.c_int
dll.fpms_usb_get_device_list_wrap.argtypes = [ctypes.POINTER(FPMS_DEVICE_INFO), ctypes.c_int, ctypes.POINTER(ctypes.c_int)]
dll.fpms_usb_get_device_list_wrap.restype = ctypes.c_int
dll.fpms_usb_open_wrap.argtypes = [ctypes.c_int, ctypes.POINTER(ctypes.c_uint64)]
dll.fpms_usb_open_wrap.restype = ctypes.c_int
dll.fpms_usb_read_frame_wrap.argtypes = [ctypes.c_uint64, ctypes.POINTER(ctypes.c_uint16), ctypes.c_size_t]
dll.fpms_usb_read_frame_wrap.restype = ctypes.c_int
dll.fpms_usb_close_wrap.argtypes = [ctypes.c_uint64]
dll.fpms_usb_close_wrap.restype = ctypes.c_int
# ------------------- 初始化 -------------------
if dll.fpms_usb_init_wrap(0) != 0:
print("初始化失败")
sys.exit(1)
# 获取设备列表
count = ctypes.c_int()
devs = (FPMS_DEVICE_INFO * 10)()
r = dll.fpms_usb_get_device_list_wrap(devs, 10, ctypes.byref(count))
if r != 0 or count.value == 0:
print("未检测到设备")
sys.exit(1)
print("检测到设备数量:", count.value)
dev = devs[0]
rows, cols = dev.rows, dev.cols
print(f"使用设备 SN={dev.sn.decode(errors='ignore')} {rows}x{cols}")
# 打开设备
handle = ctypes.c_uint64()
r = dll.fpms_usb_open_wrap(0, ctypes.byref(handle))
if r != 0:
print("设备打开失败")
sys.exit(1)
print("设备已打开, 句柄 =", handle.value)
# ------------------- 循环读取帧数据 -------------------
buf_len = rows * cols
FrameArray = (ctypes.c_uint16 * buf_len)()
vmin, vmax = 100, 1000 # 根据实际传感器数据范围调整
try:
while True:
r = dll.fpms_usb_read_frame_wrap(handle.value, FrameArray, buf_len)
if r != 0:
print("读取帧失败, code=", r)
time.sleep(0.05)
continue
# 转 numpy
data = np.frombuffer(FrameArray, dtype=np.uint16).reshape((rows, cols))
# ------------------- 方法1: 固定线性映射 -------------------
norm1 = np.clip((data - vmin) / (vmax - vmin) * 255, 0, 255).astype(np.uint8)
heatmap1 = cv2.applyColorMap(norm1, cv2.COLORMAP_JET)
heatmap1[data <= vmin] = (0,0,0)
heatmap1 = cv2.resize(heatmap1, (cols*4, rows*4), interpolation=cv2.INTER_NEAREST)
cv2.imshow("方法1: 固定线性", heatmap1)
# ------------------- 方法2: 自适应归一化 -------------------
dmin, dmax = np.min(data), np.max(data)
norm2 = np.clip((data - dmin) / max(dmax - dmin, 1) * 255, 0, 255).astype(np.uint8)
heatmap2 = cv2.applyColorMap(norm2, cv2.COLORMAP_JET)
heatmap2[data <= vmin] = (0,0,0)
heatmap2 = cv2.resize(heatmap2, (cols*4, rows*4), interpolation=cv2.INTER_NEAREST)
cv2.imshow("方法2: 自适应归一化", heatmap2)
# ------------------- 方法3: 对数映射 -------------------
log_data = np.log1p(data - vmin)
log_max = np.max(log_data)
norm3 = np.clip(log_data / max(log_max, 1) * 255, 0, 255).astype(np.uint8)
heatmap3 = cv2.applyColorMap(norm3, cv2.COLORMAP_JET)
heatmap3[data <= vmin] = (0,0,0)
heatmap3 = cv2.resize(heatmap3, (cols*4, rows*4), interpolation=cv2.INTER_NEAREST)
cv2.imshow("方法3: 对数映射", heatmap3)
# ------------------- 方法4: 平方根映射 -------------------
sqrt_data = np.sqrt(np.clip(data - vmin, 0, None))
sqrt_max = np.max(sqrt_data)
norm4 = np.clip(sqrt_data / max(sqrt_max, 1) * 255, 0, 255).astype(np.uint8)
heatmap4 = cv2.applyColorMap(norm4, cv2.COLORMAP_JET)
heatmap4[data <= vmin] = (0,0,0)
heatmap4 = cv2.resize(heatmap4, (cols*4, rows*4), interpolation=cv2.INTER_NEAREST)
cv2.imshow("方法4: 平方根映射", heatmap4)
# ------------------- 方法5: 分段颜色映射 -------------------
heatmap5 = np.zeros((rows, cols, 3), dtype=np.uint8)
# 定义压力区间和对应颜色 (B,G,R)
bins = [vmin, 250, 400, 600, 800, vmax]
colors = [
(0,0,0),
(0,0,255), # 蓝
(0,255,0), # 绿
(0,255,255), # 黄
(255,0,0), # 红
]
for i in range(len(bins)-1):
mask = (data > bins[i]) & (data <= bins[i+1])
heatmap5[mask] = colors[i]
heatmap5 = cv2.resize(heatmap5, (cols*4, rows*4), interpolation=cv2.INTER_NEAREST)
cv2.imshow("方法5: 分段映射", heatmap5)
# ------------------- 退出 -------------------
if cv2.waitKey(1) & 0xFF == 27: # ESC 退出
break
except KeyboardInterrupt:
pass
# ------------------- 清理 -------------------
dll.fpms_usb_close_wrap(handle.value)
cv2.destroyAllWindows()
print("已退出")

View File

@ -1,479 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
SMiTSense足部压力传感器DLL测试程序
测试SMiTSenseUsb-F3.0.dll的Python接口调用
"""
import ctypes
import os
import sys
import time
from ctypes import Structure, c_int, c_float, c_char_p, c_void_p, c_uint32, POINTER, byref
import logging
# 设置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class SMiTSensePressureSensor:
"""
SMiTSense足部压力传感器Python接口类
"""
def __init__(self, dll_path=None):
"""
初始化SMiTSense压力传感器
Args:
dll_path: DLL文件路径如果为None则使用默认路径
"""
self.dll = None
self.device_handle = None
self.is_connected = False
# 设置DLL路径
if dll_path is None:
dll_path = os.path.join(os.path.dirname(__file__), 'SMiTSenseUsb-F3.0.dll')
self.dll_path = dll_path
self._load_dll()
def _load_dll(self):
"""
加载SMiTSense DLL并设置函数签名
"""
try:
if not os.path.exists(self.dll_path):
raise FileNotFoundError(f"DLL文件未找到: {self.dll_path}")
# 加载DLL
self.dll = ctypes.WinDLL(self.dll_path)
logger.info(f"成功加载DLL: {self.dll_path}")
# 设置函数签名
self._setup_function_signatures()
except Exception as e:
logger.error(f"加载DLL失败: {e}")
raise
def _setup_function_signatures(self):
"""
设置DLL函数的参数类型和返回类型
"""
try:
# fpms_usb_init - 初始化USB连接
self.dll.fpms_usb_init.argtypes = []
self.dll.fpms_usb_init.restype = c_int
# fpms_usb_get_device_list - 获取设备列表
self.dll.fpms_usb_get_device_list.argtypes = [POINTER(c_int)]
self.dll.fpms_usb_get_device_list.restype = c_int
# fpms_usb_open - 打开设备
self.dll.fpms_usb_open.argtypes = [c_int]
self.dll.fpms_usb_open.restype = c_void_p
# fpms_usb_close - 关闭设备
self.dll.fpms_usb_close.argtypes = [c_void_p]
self.dll.fpms_usb_close.restype = c_int
# fpms_usb_read_frame - 读取压力数据帧
self.dll.fpms_usb_read_frame.argtypes = [c_void_p, POINTER(c_float), c_int]
self.dll.fpms_usb_read_frame.restype = c_int
# fpms_usb_get_sensitivity - 获取灵敏度
self.dll.fpms_usb_get_sensitivity.argtypes = [c_void_p, POINTER(c_float)]
self.dll.fpms_usb_get_sensitivity.restype = c_int
# fpms_usb_config_sensitivity - 配置灵敏度
self.dll.fpms_usb_config_sensitivity.argtypes = [c_void_p, c_float]
self.dll.fpms_usb_config_sensitivity.restype = c_int
logger.info("DLL函数签名设置完成")
except AttributeError as e:
logger.error(f"设置函数签名失败,可能是函数名不匹配: {e}")
raise
def initialize(self):
"""
初始化USB连接
Returns:
bool: 初始化是否成功
"""
try:
result = self.dll.fpms_usb_init()
if result == 0:
logger.info("USB连接初始化成功")
return True
else:
logger.error(f"USB连接初始化失败错误码: {result}")
return False
except Exception as e:
logger.error(f"初始化异常: {e}")
return False
def get_device_list(self):
"""
获取可用设备列表
Returns:
list: 设备ID列表
"""
try:
device_count = c_int()
result = self.dll.fpms_usb_get_device_list(byref(device_count))
if result == 0:
count = device_count.value
logger.info(f"发现 {count} 个SMiTSense设备")
return list(range(count))
else:
logger.error(f"获取设备列表失败,错误码: {result}")
return []
except Exception as e:
logger.error(f"获取设备列表异常: {e}")
return []
def connect(self, device_id=0):
"""
连接到指定设备
Args:
device_id: 设备ID默认为0
Returns:
bool: 连接是否成功
"""
try:
self.device_handle = self.dll.fpms_usb_open(device_id)
if self.device_handle:
self.is_connected = True
logger.info(f"成功连接到设备 {device_id}")
return True
else:
logger.error(f"连接设备 {device_id} 失败")
return False
except Exception as e:
logger.error(f"连接设备异常: {e}")
return False
def disconnect(self):
"""
断开设备连接
Returns:
bool: 断开是否成功
"""
try:
if self.device_handle and self.is_connected:
result = self.dll.fpms_usb_close(self.device_handle)
if result == 0:
self.is_connected = False
self.device_handle = None
logger.info("设备连接已断开")
return True
else:
logger.error(f"断开连接失败,错误码: {result}")
return False
else:
logger.warning("设备未连接,无需断开")
return True
except Exception as e:
logger.error(f"断开连接异常: {e}")
return False
def read_pressure_data(self, sensor_count=16):
"""
读取压力传感器数据
Args:
sensor_count: 传感器数量默认16个
Returns:
list: 压力值列表如果失败返回None
"""
try:
if not self.is_connected or not self.device_handle:
logger.error("设备未连接")
return None
# 创建压力数据缓冲区
pressure_data = (c_float * sensor_count)()
result = self.dll.fpms_usb_read_frame(
self.device_handle,
pressure_data,
sensor_count
)
if result == 0:
# 转换为Python列表
data_list = [pressure_data[i] for i in range(sensor_count)]
logger.debug(f"读取压力数据成功: {data_list}")
return data_list
else:
logger.error(f"读取压力数据失败,错误码: {result}")
return None
except Exception as e:
logger.error(f"读取压力数据异常: {e}")
return None
def get_sensitivity(self):
"""
获取传感器灵敏度
Returns:
float: 灵敏度值如果失败返回None
"""
try:
if not self.is_connected or not self.device_handle:
logger.error("设备未连接")
return None
sensitivity = c_float()
result = self.dll.fpms_usb_get_sensitivity(self.device_handle, byref(sensitivity))
if result == 0:
value = sensitivity.value
logger.info(f"当前灵敏度: {value}")
return value
else:
logger.error(f"获取灵敏度失败,错误码: {result}")
return None
except Exception as e:
logger.error(f"获取灵敏度异常: {e}")
return None
def set_sensitivity(self, sensitivity_value):
"""
设置传感器灵敏度
Args:
sensitivity_value: 灵敏度值
Returns:
bool: 设置是否成功
"""
try:
if not self.is_connected or not self.device_handle:
logger.error("设备未连接")
return False
result = self.dll.fpms_usb_config_sensitivity(
self.device_handle,
c_float(sensitivity_value)
)
if result == 0:
logger.info(f"灵敏度设置成功: {sensitivity_value}")
return True
else:
logger.error(f"设置灵敏度失败,错误码: {result}")
return False
except Exception as e:
logger.error(f"设置灵敏度异常: {e}")
return False
def get_foot_pressure_zones(self, pressure_data):
"""
将原始压力数据转换为足部区域压力
Args:
pressure_data: 原始压力数据列表
Returns:
dict: 足部各区域压力数据
"""
if not pressure_data or len(pressure_data) < 16:
return None
try:
# 假设16个传感器的布局可根据实际传感器布局调整
# 左脚前部: 传感器 0-3
# 左脚后部: 传感器 4-7
# 右脚前部: 传感器 8-11
# 右脚后部: 传感器 12-15
left_front = sum(pressure_data[0:4])
left_rear = sum(pressure_data[4:8])
right_front = sum(pressure_data[8:12])
right_rear = sum(pressure_data[12:16])
left_total = left_front + left_rear
right_total = right_front + right_rear
total_pressure = left_total + right_total
return {
'left_front': left_front,
'left_rear': left_rear,
'right_front': right_front,
'right_rear': right_rear,
'left_total': left_total,
'right_total': right_total,
'total_pressure': total_pressure,
'raw_data': pressure_data
}
except Exception as e:
logger.error(f"处理足部压力区域数据异常: {e}")
return None
def test_smitsense_sensor():
"""
测试SMiTSense压力传感器的主要功能
"""
logger.info("开始SMiTSense压力传感器测试")
# 创建传感器实例
sensor = SMiTSensePressureSensor()
try:
# 1. 初始化
logger.info("1. 初始化USB连接...")
if not sensor.initialize():
logger.error("初始化失败,测试终止")
return False
# 2. 获取设备列表
logger.info("2. 获取设备列表...")
devices = sensor.get_device_list()
if not devices:
logger.warning("未发现SMiTSense设备")
return False
logger.info(f"发现设备: {devices}")
# 3. 连接第一个设备
logger.info("3. 连接设备...")
if not sensor.connect(devices[0]):
logger.error("连接设备失败")
return False
# 4. 获取当前灵敏度
logger.info("4. 获取传感器灵敏度...")
sensitivity = sensor.get_sensitivity()
if sensitivity is not None:
logger.info(f"当前灵敏度: {sensitivity}")
# 5. 设置新的灵敏度(可选)
logger.info("5. 设置传感器灵敏度...")
if sensor.set_sensitivity(1.0):
logger.info("灵敏度设置成功")
# 6. 读取压力数据
logger.info("6. 开始读取压力数据...")
for i in range(10): # 读取10次数据
pressure_data = sensor.read_pressure_data()
if pressure_data:
# 转换为足部区域数据
foot_zones = sensor.get_foot_pressure_zones(pressure_data)
if foot_zones:
logger.info(f"{i+1}次读取:")
logger.info(f" 左脚前部: {foot_zones['left_front']:.2f}")
logger.info(f" 左脚后部: {foot_zones['left_rear']:.2f}")
logger.info(f" 右脚前部: {foot_zones['right_front']:.2f}")
logger.info(f" 右脚后部: {foot_zones['right_rear']:.2f}")
logger.info(f" 总压力: {foot_zones['total_pressure']:.2f}")
else:
logger.warning(f"{i+1}次读取: 数据处理失败")
else:
logger.warning(f"{i+1}次读取: 无数据")
time.sleep(0.1) # 等待100ms
logger.info("压力数据读取测试完成")
return True
except Exception as e:
logger.error(f"测试过程中发生异常: {e}")
return False
finally:
# 7. 断开连接
logger.info("7. 断开设备连接...")
sensor.disconnect()
logger.info("SMiTSense压力传感器测试结束")
def test_dll_functions():
"""
测试DLL函数的基本调用不需要实际硬件
"""
logger.info("开始DLL函数基本调用测试")
try:
sensor = SMiTSensePressureSensor()
logger.info("DLL加载成功")
# 测试初始化函数
logger.info("测试初始化函数...")
result = sensor.initialize()
logger.info(f"初始化结果: {result}")
# 测试获取设备列表
logger.info("测试获取设备列表...")
devices = sensor.get_device_list()
logger.info(f"设备列表: {devices}")
logger.info("DLL函数基本调用测试完成")
return True
except Exception as e:
logger.error(f"DLL函数测试失败: {e}")
return False
if __name__ == "__main__":
print("SMiTSense足部压力传感器DLL测试程序")
print("=" * 50)
# 检查DLL文件是否存在
dll_path = os.path.join(os.path.dirname(__file__), 'SMiTSenseUsb-F3.0.dll')
if not os.path.exists(dll_path):
print(f"错误: DLL文件不存在 - {dll_path}")
print("请确保SMiTSenseUsb-F3.0.dll文件在当前目录中")
sys.exit(1)
print(f"DLL文件路径: {dll_path}")
print()
# 运行测试
try:
# 首先测试DLL基本功能
print("1. DLL基本功能测试")
print("-" * 30)
if test_dll_functions():
print("✅ DLL基本功能测试通过")
else:
print("❌ DLL基本功能测试失败")
print()
# 然后测试完整的传感器功能(需要硬件)
print("2. 完整传感器功能测试(需要硬件连接)")
print("-" * 30)
if test_smitsense_sensor():
print("✅ 传感器功能测试通过")
else:
print("❌ 传感器功能测试失败(可能是硬件未连接)")
except KeyboardInterrupt:
print("\n用户中断测试")
except Exception as e:
print(f"\n测试过程中发生未处理的异常: {e}")
import traceback
traceback.print_exc()
print("\n测试程序结束")

View File

@ -4,7 +4,7 @@ import numpy as np
import cv2
# === DLL 加载 ===
dll = ctypes.WinDLL(r"D:\Trae_space\BodyBalanceEvaluation\backend\tests\SMiTSenseUsbWrapper.dll")
dll = ctypes.WinDLL(r"D:\BodyBalanceEvaluation\backend\dll\smitsense\SMiTSenseUsbWrapper.dll")
dll.SMiTSenseUsb_Init.argtypes = [ctypes.c_int]
dll.SMiTSenseUsb_Init.restype = ctypes.c_int
@ -76,7 +76,7 @@ try:
break
else:
print("读取数据帧失败")
time.sleep(0.05) # 20 FPS
time.sleep(1) # 20 FPS
except KeyboardInterrupt:
pass
finally: