接入头部姿态IMU

This commit is contained in:
zhaozilong12 2025-08-12 14:33:20 +08:00
parent 625e372c11
commit a93a7fa712
13 changed files with 6737 additions and 214 deletions

File diff suppressed because it is too large Load Diff

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -23,6 +23,9 @@ from typing import Dict, List, Optional, Any, Tuple
from concurrent.futures import ThreadPoolExecutor
import logging
# 添加串口通信支持
import serial
# matplotlib相关导入用于深度图渲染
try:
from matplotlib.colors import LinearSegmentedColormap
@ -75,8 +78,12 @@ class DeviceManager:
# 推流状态和线程
self.camera_streaming = False
self.femtobolt_streaming = False
self.imu_streaming = False
self.pressure_streaming = False
self.camera_streaming_thread = None
self.femtobolt_streaming_thread = None
self.imu_thread = None
self.pressure_thread = None
self.streaming_stop_event = threading.Event()
# 全局帧缓存机制
@ -271,15 +278,40 @@ class DeviceManager:
def _init_imu(self):
"""初始化IMU传感器"""
logger.info('开始初始化IMU传感器...')
try:
# 这里应该连接实际的IMU设备
# 目前使用模拟数据
self.imu_device = MockIMUDevice()
# 从config.ini读取串口配置
config = configparser.ConfigParser()
# 优先读取根目录config.ini否则读取backend/config.ini
root_config_path = os.path.join(os.path.dirname(__file__), '..', 'config.ini')
app_root_config_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'config.ini')
logger.debug(f'尝试读取配置文件: {root_config_path}, {app_root_config_path}')
read_files = config.read([app_root_config_path, root_config_path], encoding='utf-8')
logger.debug(f'成功读取的配置文件: {read_files}')
if not read_files:
logger.warning('未能读取到config.ini将使用默认串口配置COM7@9600')
imu_port = config.get('DEVICES', 'imu_port', fallback='COM7')
imu_baudrate = config.getint('DEVICES', 'baudrate', fallback=9600)
logger.info(f'从配置文件读取IMU串口配置: {imu_port}@{imu_baudrate}')
# 初始化真实IMU设备
logger.debug('创建RealIMUDevice实例...')
self.imu_device = RealIMUDevice(port=imu_port, baudrate=imu_baudrate)
# 测试读取数据
logger.debug('测试IMU设备数据读取...')
test_data = self.imu_device.read_data()
logger.debug(f'IMU设备测试数据: {test_data}')
self.device_status['imu'] = True
logger.info('IMU传感器初始化成功模拟')
logger.info(f'IMU传感器初始化成功真实设备): {imu_port}@{imu_baudrate}')
except Exception as e:
logger.error(f'IMU传感器初始化失败: {e}')
logger.error(f'IMU传感器初始化失败: {e}', exc_info=True)
self.imu_device = None
self.device_status['imu'] = False
def _init_pressure_sensor(self):
"""初始化压力传感器"""
@ -385,33 +417,22 @@ class DeviceManager:
samples = []
for _ in range(100):
data = self.imu_device.read_data()
samples.append(data)
if data and 'head_pose' in data:
samples.append(data)
time.sleep(0.01)
# 计算零点偏移
accel_offset = {
'x': np.mean([s['accel']['x'] for s in samples]),
'y': np.mean([s['accel']['y'] for s in samples]),
'z': np.mean([s['accel']['z'] for s in samples]) - 9.8 # 重力补偿
}
gyro_offset = {
'x': np.mean([s['gyro']['x'] for s in samples]),
'y': np.mean([s['gyro']['y'] for s in samples]),
'z': np.mean([s['gyro']['z'] for s in samples])
}
if not samples:
return {'status': 'failed', 'error': '无法获取IMU数据进行校准'}
# 计算头部姿态零点偏移(正立状态为标准零位)
head_pose_offset = {
'rotation': np.mean([s['head_pose']['rotation'] for s in samples if 'head_pose' in s]),
'tilt': np.mean([s['head_pose']['tilt'] for s in samples if 'head_pose' in s]),
'pitch': np.mean([s['head_pose']['pitch'] for s in samples if 'head_pose' in s])
'rotation': np.mean([s['head_pose']['rotation'] for s in samples]),
'tilt': np.mean([s['head_pose']['tilt'] for s in samples]),
'pitch': np.mean([s['head_pose']['pitch'] for s in samples])
}
calibration = {
'status': 'success',
'accel_offset': accel_offset,
'gyro_offset': gyro_offset,
'head_pose_offset': head_pose_offset, # 头部姿态零点偏移
'timestamp': datetime.now().isoformat()
}
@ -947,8 +968,8 @@ class DeviceManager:
if capture is not None:
ret, depth_image = capture.get_depth_image()
height2, width2 = depth_image.shape[:2]
logger.debug(f'FemtoBolt原始帧宽: {width2}')
logger.debug(f'FemtoBolt原始帧高: {height2}')
# logger.debug(f'FemtoBolt原始帧宽: {width2}')
# logger.debug(f'FemtoBolt原始帧高: {height2}')
if ret and depth_image is not None:
# 读取config.ini中的深度范围配置
@ -963,6 +984,14 @@ class DeviceManager:
depth_range_max = None
# 使用matplotlib渲染深度图参考display_x.py
if MATPLOTLIB_AVAILABLE and depth_range_min is not None and depth_range_max is not None:
# 在子线程中切换到非交互后端避免GUI警告
import matplotlib
try:
if matplotlib.get_backend().lower() != 'agg':
matplotlib.use('Agg', force=True)
logger.debug('切换matplotlib后端为Agg以适配子线程渲染')
except Exception:
pass
depth_image[depth_image > depth_range_max] = 0
depth_image[depth_image < depth_range_min] = 0
background = np.ones_like(depth_image) * 0.5
@ -972,21 +1001,19 @@ class DeviceManager:
'fuchsia', 'red', 'yellow', 'lime', 'cyan', 'blue',
'fuchsia', 'red', 'yellow', 'lime', 'cyan', 'blue']
mcmap = LinearSegmentedColormap.from_list("custom_cmap", colors)
# plt.figure(figsize=(7, 7))
plt.figure(figsize=(width2/100, height2/100), dpi=100)
plt.imshow(background, origin='lower', cmap='gray', alpha=0.3)
plt.grid(True, which='both', axis='both', color='white', linestyle='-', linewidth=1, zorder=0)
plt.contourf(depth_masked, levels=200, cmap=mcmap, vmin=depth_range_min, vmax=depth_range_max, origin='upper', zorder=2)
# plt.axis('off')
plt.tight_layout(pad=0)
plt.draw()
plt_canvas = plt.gca().figure.canvas
plt_canvas.draw()
img = np.frombuffer(plt_canvas.tostring_rgb(), dtype=np.uint8)
img = img.reshape(plt_canvas.get_width_height()[::-1] + (3,))
plt.clf()
depth_colored = img
fig = plt.figure(figsize=(width2/100, height2/100), dpi=100)
ax = fig.add_subplot(111)
ax.imshow(background, origin='lower', cmap='gray', alpha=0.3)
ax.grid(True, which='both', axis='both', color='white', linestyle='-', linewidth=1, zorder=0)
ax.contourf(depth_masked, levels=200, cmap=mcmap, vmin=depth_range_min, vmax=depth_range_max, origin='upper', zorder=2)
fig.tight_layout(pad=0)
try:
fig.canvas.draw()
img = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
img = img.reshape(fig.canvas.get_width_height()[::-1] + (3,))
depth_colored = img
finally:
plt.close(fig)
else:
# 如果没有matplotlib则使用原有OpenCV伪彩色映射
depth_normalized = np.clip(depth_image, depth_range_min, depth_range_max)
@ -996,16 +1023,16 @@ class DeviceManager:
depth_colored[mask_outside] = [0, 0, 0]
height, width = depth_colored.shape[:2]
logger.debug(f'FemtoBolt帧宽: {width}')
logger.debug(f'FemtoBolt帧高: {height}')
# logger.debug(f'FemtoBolt帧宽: {width}')
# logger.debug(f'FemtoBolt帧高: {height}')
target_width = height // 2
if width > target_width:
left = (width - target_width) // 2
right = left + target_width
depth_colored = depth_colored[:, left:right]
height1, width1 = depth_colored.shape[:2]
logger.debug(f'FemtoBolt帧裁剪完以后得宽: {width1}')
logger.debug(f'FemtoBolt帧裁剪完以后得宽: {height1}')
# logger.debug(f'FemtoBolt帧裁剪完以后得宽: {width1}')
# logger.debug(f'FemtoBolt帧裁剪完以后得宽: {height1}')
# 保存处理好的身体帧到全局缓存
self._save_frame_to_cache(depth_colored.copy(), 'femtobolt')
@ -1041,40 +1068,34 @@ class DeviceManager:
logger.info('IMU头部姿态数据推流线程已启动')
try:
loop_count = 0
while self.imu_streaming and self.socketio:
try:
loop_count += 1
# 从IMU设备读取数据
imu_data = self.imu_device.read_data()
if imu_data and 'head_pose' in imu_data:
# 直接使用设备提供的头部姿态数据
# 直接使用设备提供的头部姿态数据,减少数据包装
head_pose = imu_data['head_pose']
# 构建完整的头部姿态数据
head_pose_data = {
'rotation': head_pose['rotation'], # 旋转角:左旋(-), 右旋(+)
'tilt': head_pose['tilt'], # 倾斜角:左倾(-), 右倾(+)
'pitch': head_pose['pitch'], # 俯仰角:俯角(-), 仰角(+)
'temperature': imu_data.get('temperature', 25),
'timestamp': imu_data['timestamp']
}
# 通过WebSocket发送头部姿态数据
# 优化:直接发送最精简的数据格式,避免重复时间戳
self.socketio.emit('imu_data', {
'head_pose': head_pose_data,
'timestamp': datetime.now().isoformat()
'rotation': head_pose.get('rotation'), # 旋转角:左旋(-), 右旋(+)
'tilt': head_pose.get('tilt'), # 倾斜角:左倾(-), 右倾(+)
'pitch': head_pose.get('pitch'), # 俯仰角:俯角(-), 仰角(+)
})
# 控制数据发送频率10Hz
time.sleep(0.1)
# 优化提高数据发送频率到30Hz降低延时
time.sleep(0.033)
except Exception as e:
logger.error(f'IMU数据推流异常: {e}')
time.sleep(0.1)
# 减少异常日志的详细程度
logger.warning(f'IMU数据推流异常: {e}')
time.sleep(0.033)
except Exception as e:
logger.error(f'IMU推流线程异常: {e}')
logger.error(f'IMU推流线程异常: {e}', exc_info=True)
finally:
logger.info('IMU头部姿态数据推流线程已结束')
@ -1090,7 +1111,7 @@ class DeviceManager:
if pressure_data and 'foot_pressure' in pressure_data:
foot_pressure = pressure_data['foot_pressure']
logger.error(f"压力传感器数据{foot_pressure}")
# logger.error(f"压力传感器数据{foot_pressure}")
# 获取各区域压力值
left_front = foot_pressure['left_front']
left_rear = foot_pressure['left_rear']
@ -1856,6 +1877,161 @@ class DeviceManager:
except Exception as e:
logger.error(f'清理过期帧失败: {e}')
class RealIMUDevice:
"""真实IMU设备通过串口读取姿态数据"""
def __init__(self, port: str = 'COM7', baudrate: int = 9600):
self.port = port
self.baudrate = baudrate
self.ser = None
self.buffer = bytearray()
self.calibration_data = None
self.head_pose_offset = {'rotation': 0, 'tilt': 0, 'pitch': 0}
self.last_data = {
'roll': 0.0,
'pitch': 0.0,
'yaw': 0.0,
'temperature': 25.0
}
logger.debug(f'RealIMUDevice 初始化: port={self.port}, baudrate={self.baudrate}')
self._connect()
def _connect(self):
try:
logger.debug(f'尝试打开串口: {self.port} @ {self.baudrate}')
self.ser = serial.Serial(self.port, self.baudrate, timeout=1)
if hasattr(self.ser, 'reset_input_buffer'):
try:
self.ser.reset_input_buffer()
logger.debug('已清空串口输入缓冲区')
except Exception as e:
logger.debug(f'重置串口输入缓冲区失败: {e}')
logger.info(f'IMU设备连接成功: {self.port} @ {self.baudrate}bps')
except Exception as e:
logger.error(f'IMU设备连接失败: {e}', exc_info=True)
self.ser = None
def set_calibration(self, calibration: Dict[str, Any]):
self.calibration_data = calibration
if 'head_pose_offset' in calibration:
self.head_pose_offset = calibration['head_pose_offset']
logger.debug(f'应用IMU校准数据: {self.head_pose_offset}')
def apply_calibration(self, raw_data: Dict[str, Any]) -> Dict[str, Any]:
if not self.calibration_data:
return raw_data
if 'head_pose' in raw_data:
before = raw_data['head_pose'].copy()
raw_data['head_pose']['rotation'] -= self.head_pose_offset.get('rotation', 0)
raw_data['head_pose']['tilt'] -= self.head_pose_offset.get('tilt', 0)
raw_data['head_pose']['pitch'] -= self.head_pose_offset.get('pitch', 0)
logger.debug(f"校准前: {before}, 校准后: {raw_data['head_pose']}")
return raw_data
@staticmethod
def _checksum(data: bytes) -> int:
return sum(data[:-1]) & 0xFF
def _parse_packet(self, data: bytes) -> Optional[Dict[str, float]]:
if len(data) != 11:
logger.debug(f'无效数据包长度: {len(data)}')
return None
if data[0] != 0x55:
logger.debug(f'错误的包头: 0x{data[0]:02X}')
return None
if self._checksum(data) != data[-1]:
logger.debug(f'校验和错误: 期望{self._checksum(data):02X}, 实际{data[-1]:02X}')
return None
packet_type = data[1]
vals = [int.from_bytes(data[i:i+2], 'little', signed=True) for i in range(2, 10, 2)]
if packet_type == 0x53: # 姿态角单位0.01°
roll, pitch, yaw, temp = vals
roll /= 100.0
pitch /= 100.0
yaw /= 100.0
temp = temp / 100.0
self.last_data = {
'roll': roll,
'pitch': pitch,
'yaw': yaw,
'temperature': temp
}
# logger.debug(f'解析姿态角包: roll={roll}, pitch={pitch}, yaw={yaw}, temp={temp}')
return self.last_data
else:
# logger.debug(f'忽略的数据包类型: 0x{packet_type:02X}')
return None
def read_data(self) -> Dict[str, Any]:
if not self.ser or not getattr(self.ser, 'is_open', False):
logger.warning('IMU串口未连接尝试重新连接...')
self._connect()
return {
'head_pose': {
'rotation': self.last_data['roll'],
'tilt': self.last_data['yaw'],
'pitch': self.last_data['pitch']
},
'temperature': self.last_data['temperature'],
'timestamp': datetime.now().isoformat()
}
try:
bytes_waiting = self.ser.in_waiting
if bytes_waiting:
# logger.debug(f'串口缓冲区待读字节: {bytes_waiting}')
chunk = self.ser.read(bytes_waiting)
# logger.debug(f'读取到字节: {len(chunk)}')
self.buffer.extend(chunk)
while len(self.buffer) >= 11:
if self.buffer[0] != 0x55:
dropped = self.buffer.pop(0)
logger.debug(f'丢弃无效字节: 0x{dropped:02X}')
continue
packet = bytes(self.buffer[:11])
parsed = self._parse_packet(packet)
del self.buffer[:11]
if parsed is not None:
raw = {
'head_pose': {
'rotation': parsed['roll'], # rotation = roll
'tilt': parsed['yaw'], # tilt = yaw
'pitch': parsed['pitch'] # pitch = pitch
},
'temperature': parsed['temperature'],
'timestamp': datetime.now().isoformat()
}
# logger.debug(f'映射后的头部姿态: {raw}')
return self.apply_calibration(raw)
raw = {
'head_pose': {
'rotation': self.last_data['roll'],
'tilt': self.last_data['yaw'],
'pitch': self.last_data['pitch']
},
'temperature': self.last_data['temperature'],
'timestamp': datetime.now().isoformat()
}
return self.apply_calibration(raw)
except Exception as e:
logger.error(f'IMU数据读取异常: {e}', exc_info=True)
raw = {
'head_pose': {
'rotation': self.last_data['roll'],
'tilt': self.last_data['yaw'],
'pitch': self.last_data['pitch']
},
'temperature': self.last_data['temperature'],
'timestamp': datetime.now().isoformat()
}
return self.apply_calibration(raw)
def __del__(self):
try:
if self.ser and getattr(self.ser, 'is_open', False):
self.ser.close()
logger.info('IMU设备串口已关闭')
except Exception:
pass
class MockIMUDevice:
"""模拟IMU设备"""

57
backend/lib_fpms_usb.h Normal file
View File

@ -0,0 +1,57 @@
#pragma once
#define __DLL_EXPORTS__
#ifdef __DLL_EXPORTS__
#define DLLAPI __declspec(dllexport)
#else
#define DLLAPI __declspec(dllimport)
#endif
#include <windows.h>
#include <cstdint>
#include <vector>
using namespace std;
typedef void* SM_HANDLE;
typedef struct _FPMS_DEVICE
{
uint16_t mn;
std::string sn;
uint16_t fwVersion;
uint8_t protoVer;
uint16_t pid;
uint16_t vid;
uint16_t rows;
uint16_t cols;
} FPMS_DEVICE_T;
extern "C"
{
DLLAPI
int WINAPI fpms_usb_init(int debugFlag);
DLLAPI
int WINAPI fpms_usb_get_device_list(std::vector<FPMS_DEVICE_T>& gDevList);
DLLAPI
int WINAPI fpms_usb_open(FPMS_DEVICE_T dev, SM_HANDLE& gHandle);
DLLAPI
int WINAPI fpms_usb_read_frame(SM_HANDLE gHandle, uint16_t* frame);
DLLAPI
int WINAPI fpms_usb_config_sensitivity(SM_HANDLE gHandle, uint8_t bWriteFlash, const uint8_t level);
DLLAPI
int WINAPI fpms_usb_get_sensitivity(SM_HANDLE gHandle, uint8_t& level);
DLLAPI
int WINAPI fpms_usb_close(SM_HANDLE gHandle);
}

View File

@ -1,108 +1,126 @@
# import cv2
import cv2
# class CameraViewer:
# def __init__(self, device_index=0):
# self.device_index = device_index
# self.window_name = "Camera Viewer"
class CameraViewer:
def __init__(self, device_index=0):
self.device_index = device_index
self.window_name = "Camera Viewer"
# def start_stream(self):
# cap = cv2.VideoCapture(self.device_index)
# if not cap.isOpened():
# print(f"无法打开摄像头设备 {self.device_index}")
# return
def start_stream(self):
cap = cv2.VideoCapture(self.device_index)
if not cap.isOpened():
print(f"无法打开摄像头设备 {self.device_index}")
return
# cv2.namedWindow(self.window_name, cv2.WINDOW_NORMAL)
cv2.namedWindow(self.window_name, cv2.WINDOW_NORMAL)
# while True:
# ret, frame = cap.read()
# if not ret:
# print("无法获取视频帧")
# break
while True:
ret, frame = cap.read()
if not ret:
print("无法获取视频帧")
break
# cv2.imshow(self.window_name, frame)
cv2.imshow(self.window_name, frame)
# if cv2.waitKey(1) & 0xFF == ord('q'):
# break
if cv2.waitKey(1) & 0xFF == ord('q'):
break
# cap.release()
# cv2.destroyAllWindows()
cap.release()
cv2.destroyAllWindows()
if __name__ == "__main__":
# 修改这里的数字可以切换不同摄像头设备
viewer = CameraViewer(device_index=3)
viewer.start_stream()
# import ctypes
# from ctypes import c_int, c_uint16, c_uint8, c_char, c_char_p, Structure, POINTER, byref
# # 设备结构体对应wrapper中FPMS_DEVICE_C
# class FPMS_DEVICE_C(Structure):
# _pack_ = 1
# _fields_ = [
# ("mn", c_uint16),
# ("sn", c_char * 64),
# ("fwVersion", c_uint16),
# ("protoVer", c_uint8),
# ("pid", c_uint16),
# ("vid", c_uint16),
# ("rows", c_uint16),
# ("cols", c_uint16),
# ]
# # 加载DLL
# dll_path = r"D:\BodyBalanceEvaluation\backend\SMiTSenseUsbWrapper.dll"
# dll = ctypes.windll.LoadLibrary(dll_path)
# # 函数原型声明
# # int fpms_usb_init_c(int debugFlag);
# dll.fpms_usb_init_c.argtypes = [c_int]
# dll.fpms_usb_init_c.restype = c_int
# dll.fpms_usb_get_device_list_c.argtypes = [POINTER(FPMS_DEVICE_C), c_int]
# dll.fpms_usb_get_device_list_c.restype = c_int
# dll.fpms_usb_open_c.argtypes = [POINTER(FPMS_DEVICE_C), POINTER(ctypes.c_void_p)]
# dll.fpms_usb_open_c.restype = c_int
# # int fpms_usb_read_frame_c(void* handle, uint16_t* frame);
# dll.fpms_usb_read_frame_c.argtypes = [ctypes.c_void_p, POINTER(c_uint16)]
# dll.fpms_usb_read_frame_c.restype = c_int
# # int fpms_usb_close_c(void* handle);
# dll.fpms_usb_close_c.argtypes = [ctypes.c_void_p]
# dll.fpms_usb_close_c.restype = c_int
# # 其他函数如果需要可以类似声明
# def main():
# # 初始化
# ret = dll.fpms_usb_init_c(0)
# print(f"fpms_usb_init_c 返回值: {ret}")
# if ret != 0:
# print("初始化失败")
# return
# MAX_DEVICES = 8
# devices = (FPMS_DEVICE_C * MAX_DEVICES)() # 创建数组
# count = dll.fpms_usb_get_device_list_c(devices, MAX_DEVICES)
# print(f"设备数量: {count}")
# if count <= 0:
# print("未找到设备或错误")
# return
# for i in range(count):
# dev = devices[i]
# print(f"设备{i}: mn={dev.mn}, sn={dev.sn.decode(errors='ignore').rstrip(chr(0))}, fwVersion={dev.fwVersion}")
# # 打开第一个设备
# handle = ctypes.c_void_p()
# ret = dll.fpms_usb_open_c(byref(devices[0]), byref(handle))
# print(f"fpms_usb_open_c 返回值: {ret}")
# if ret != 0:
# print("打开设备失败")
# return
# # 假设帧大小是 rows * cols
# rows = devices[0].rows
# cols = devices[0].cols
# frame_size = rows * cols
# frame_buffer = (c_uint16 * frame_size)()
# ret = dll.fpms_usb_read_frame_c(handle, frame_buffer)
# print(f"fpms_usb_read_frame_c 返回值: {ret}")
# if ret == 0:
# # 打印前10个数据看看
# print("帧数据前10个点:", list(frame_buffer[:10]))
# else:
# print("读取帧失败")
# # 关闭设备
# ret = dll.fpms_usb_close_c(handle)
# print(f"fpms_usb_close_c 返回值: {ret}")
# if __name__ == "__main__":
# # 修改这里的数字可以切换不同摄像头设备
# viewer = CameraViewer(device_index=1)
# viewer.start_stream()
# import os
# import pefile
# def list_dll_exports(dll_path):
# """解析 DLL 并返回导出的函数列表"""
# try:
# pe = pefile.PE(dll_path)
# exports = []
# if hasattr(pe, 'DIRECTORY_ENTRY_EXPORT'):
# for exp in pe.DIRECTORY_ENTRY_EXPORT.symbols:
# if exp.name:
# exports.append(exp.name.decode('utf-8'))
# else:
# exports.append(f"Ordinal_{exp.ordinal}")
# return exports
# except Exception as e:
# print(f"[错误] 无法解析 {dll_path}: {e}")
# return []
# def scan_directory_for_dll_functions(directory):
# """扫描目录下所有 DLL 文件并解析导出函数"""
# results = {}
# for root, _, files in os.walk(directory):
# for file in files:
# if file.lower().endswith(".dll"):
# dll_path = os.path.join(root, file)
# print(f"\n正在解析: {dll_path}")
# funcs = list_dll_exports(dll_path)
# results[dll_path] = funcs
# for func in funcs:
# print(f" {func}")
# return results
# if __name__ == "__main__":
# folder_path = r"D:\BodyBalanceEvaluation\backend\tests" # 这里改成你的 DLL 文件目录
# scan_directory_for_dll_functions(folder_path)
import ctypes
dll_path = r"D:\BodyBalanceEvaluation\backend\tests\SMiTSenseUsb-F3.0.dll"
mydll = ctypes.WinDLL(dll_path)
class FPMS_DEVICE_C(ctypes.Structure):
_fields_ = [
("mn", ctypes.c_uint8),
("sn", ctypes.c_char * 32),
("swVersion", ctypes.c_char * 32),
("rows", ctypes.c_uint16),
("cols", ctypes.c_uint16)
]
# 声明
mydll.fpms_usb_init.argtypes = [ctypes.c_int]
mydll.fpms_usb_init.restype = ctypes.c_int
mydll.fpms_usb_get_device_list.argtypes = [
ctypes.POINTER(FPMS_DEVICE_C),
ctypes.POINTER(ctypes.c_int)
]
mydll.fpms_usb_get_device_list.restype = ctypes.c_int
# 初始化
print("init:", mydll.fpms_usb_init(0))
# 获取设备列表
device_count = ctypes.c_int()
devices = (FPMS_DEVICE_C * 10)()
res = mydll.fpms_usb_get_device_list(devices, ctypes.byref(device_count))
print("get_device_list 返回值:", res, "设备数量:", device_count.value)
# 打印设备信息
for i in range(device_count.value):
dev = devices[i]
print(f"[设备 {i}] mn={dev.mn}, sn={dev.sn.decode(errors='ignore')}, "
f"swVersion={dev.swVersion.decode(errors='ignore')}, rows={dev.rows}, cols={dev.cols}")
# main()

View File

@ -0,0 +1,81 @@
import serial
import time
def checksum(data):
return sum(data[:-1]) & 0xFF
def parse_packet(data):
if len(data) != 11:
return None
if data[0] != 0x55:
return None
if checksum(data) != data[-1]:
print("校验失败")
return None
packet_type = data[1]
# 将后8字节分成4个16位有符号整数小端序
vals = [int.from_bytes(data[i:i+2], 'little', signed=True) for i in range(2, 10, 2)]
if packet_type == 0x51: # 加速度单位0.001g
ax, ay, az, temp = vals
ax /= 1000
ay /= 1000
az /= 1000
temp = temp / 100 # 温度单位摄氏度
# return f"加速度 (g): x={ax:.3f}, y={ay:.3f}, z={az:.3f}, 温度={temp:.2f}℃"
elif packet_type == 0x52: # 角速度单位0.01°/s
wx, wy, wz, temp = vals
wx /= 100
wy /= 100
wz /= 100
temp = temp / 100
# return f"角速度 (°/s): x={wx:.2f}, y={wy:.2f}, z={wz:.2f}, 温度={temp:.2f}℃"
elif packet_type == 0x53: # 姿态角单位0.01°
roll, pitch, yaw, temp = vals
roll /= 100
pitch /= 100
yaw /= 100
temp = temp / 100
return f"姿态角 (°): roll={roll:.2f}, pitch={pitch:.2f}, yaw={yaw:.2f}, 温度={temp:.2f}"
elif packet_type == 0x54: # 磁力计单位uT
mx, my, mz, temp = vals
temp = temp / 100
# return f"磁力计 (uT): x={mx}, y={my}, z={mz}, 温度={temp:.2f}℃"
elif packet_type == 0x56: # 气压单位Pa
p1, p2, p3, temp = vals
pressure = ((p1 & 0xFFFF) | ((p2 & 0xFFFF) << 16)) / 100 # 简单合成大部分IMU气压是3字节这里简单示范
temp = temp / 100
# return f"气压 (Pa): pressure={pressure:.2f}, 温度={temp:.2f}℃"
else:
return f"未知包类型: {packet_type}"
def read_imu(port='COM6', baudrate=9600):
ser = serial.Serial(port, baudrate, timeout=1)
buffer = bytearray()
try:
while True:
bytes_waiting = ser.in_waiting
if bytes_waiting:
data = ser.read(bytes_waiting)
buffer.extend(data)
# 解析buffer中所有完整包
while len(buffer) >= 11:
if buffer[0] != 0x55:
buffer.pop(0)
continue
packet = buffer[:11]
result = parse_packet(packet)
if result:
print(result)
buffer = buffer[11:]
time.sleep(0.01)
except KeyboardInterrupt:
print("程序终止")
finally:
ser.close()
if __name__ == "__main__":
read_imu(port='COM8', baudrate=9600)

View File

@ -19,9 +19,11 @@ camera_index = 0
camera_width = 640
camera_height = 480
camera_fps = 30
imu_port = COM3
imu_port = COM8
imu_baudrate = 9600
pressure_port = COM4
[DETECTION]
default_duration = 60
sampling_rate = 30

View File

@ -42,6 +42,12 @@ def setup_debug_logging():
logging.getLogger('socketio').setLevel(logging.DEBUG)
logging.getLogger('engineio').setLevel(logging.DEBUG)
# 禁用第三方库的详细日志
logging.getLogger('PIL').setLevel(logging.WARNING)
logging.getLogger('PIL.PngImagePlugin').setLevel(logging.WARNING)
logging.getLogger('matplotlib').setLevel(logging.WARNING)
logging.getLogger('matplotlib.font_manager').setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
logger.info('调试日志已启用')
return logger

View File

@ -974,66 +974,67 @@ const headPoseMaxValues = ref({
const headPoseHistory = ref([])
const headPoseData = ref({})
// IMUDOM
let lastIMUUpdateTs = 0
const IMU_MIN_INTERVAL_MS = 33 // 30Hz
let lastIMUValues = { rotation: null, tilt: null, pitch: null }
const IMU_CHANGE_EPS = 0.1 // 0.1°
//
const isTrackingMaxValues = ref(false)
// IMU姿
function handleIMUData(data) {
try {
if (data && data.head_pose) {
const headPose = data.head_pose
if (!data) return
// 姿
// console.log('🎯 IMU姿:', {
// rotation: headPose.rotation, // (-), (+)
// tilt: headPose.tilt, // (-), (+)
// pitch: headPose.pitch // (-), (+)
// })
if (rotationCharts) {
rotationCharts.setOption({
series: [{
data: [{
value: headPose.rotation.toFixed(1)
}]
}]
})
}
if (pitchCharts) {
pitchCharts.setOption({
series: [{
data: [{
value: headPose.pitch.toFixed(1)
}]
}]
})
}
if (tiltCharts) {
tiltCharts.setOption({
series: [{
data: [{
value: headPose.tilt.toFixed(1)
}]
}]
})
}
//
// console.log(`📐 姿 - : ${headPose.rotation.toFixed(1)}°, : ${headPose.tilt.toFixed(1)}°, : ${headPose.pitch.toFixed(1)}°`)
//
// 1) { rotation, tilt, pitch }
// 2) { head_pose: { rotation, tilt, pitch } }
const rotation = (data.rotation ?? (data.head_pose && data.head_pose.rotation))
const tilt = (data.tilt ?? (data.head_pose && data.head_pose.tilt))
const pitch = (data.pitch ?? (data.head_pose && data.head_pose.pitch))
//
// if (isTrackingMaxValues.value) {
updateHeadPoseMaxValues(headPose)
// }
//
//
//
// updateHeadPoseChart({
// rotation: headPose.rotation,
// tilt: headPose.tilt,
// pitch: headPose.pitch
// })
if (rotation === undefined || tilt === undefined || pitch === undefined) {
return
}
const now = (typeof performance !== 'undefined' && performance.now) ? performance.now() : Date.now()
// DOM
const tooSoon = (now - lastIMUUpdateTs) < IMU_MIN_INTERVAL_MS
const notSignificant = (
lastIMUValues.rotation !== null && Math.abs(rotation - lastIMUValues.rotation) < IMU_CHANGE_EPS &&
lastIMUValues.tilt !== null && Math.abs(tilt - lastIMUValues.tilt) < IMU_CHANGE_EPS &&
lastIMUValues.pitch !== null && Math.abs(pitch - lastIMUValues.pitch) < IMU_CHANGE_EPS
)
if (tooSoon && notSignificant) return
lastIMUUpdateTs = now
lastIMUValues = { rotation, tilt, pitch }
const rVal = Math.round(rotation * 10) / 10
const pVal = Math.round(pitch * 10) / 10
const tVal = Math.round(tilt * 10) / 10
if (rotationCharts) {
rotationCharts.setOption({
series: [{ data: [{ value: rVal }] }]
}, true, true)
}
if (pitchCharts) {
pitchCharts.setOption({
series: [{ data: [{ value: pVal }] }]
}, true, true)
}
if (tiltCharts) {
tiltCharts.setOption({
series: [{ data: [{ value: tVal }] }]
}, true, true)
}
// 使
updateHeadPoseMaxValues({ rotation, tilt, pitch })
} catch (error) {
console.error('❌ 处理IMU数据失败:', error)
}