IMU calibration angle normalization update

zhaozilong12 2025-08-21 17:35:09 +08:00
parent 1deb2425b8
commit 8a2319e108
7 changed files with 161 additions and 115 deletions

.gitignore
View File

@@ -21420,3 +21420,5 @@ frontend/src/renderer/src/services/api.js
frontend/src/renderer/src/services/api.js
frontend/src/renderer/src/services/api.js
frontend/src/renderer/src/services/api.js
frontend/src/renderer/src/services/api.js
backend/devices/utils/config.ini

View File

@@ -67,7 +67,6 @@ class RealIMUDevice:
if 'head_pose_offset' in calibration:
self.head_pose_offset = calibration['head_pose_offset']
logger.debug(f'应用IMU校准数据: {self.head_pose_offset}')
def apply_calibration(self, raw_data: Dict[str, Any]) -> Dict[str, Any]:
"""应用校准:将当前姿态减去初始偏移,得到相对于初始姿态的变化量"""
if not raw_data or 'head_pose' not in raw_data:
@@ -76,15 +75,14 @@ class RealIMUDevice:
# 应用校准偏移
calibrated_data = raw_data.copy()
head_pose = raw_data['head_pose'].copy()
angle=head_pose['rotation'] - self.head_pose_offset['rotation']
# 减去基准值(零点偏移)
head_pose['rotation'] = head_pose['rotation'] - self.head_pose_offset['rotation']
head_pose['rotation'] = ((angle + 180) % 360) - 180
head_pose['tilt'] = head_pose['tilt'] - self.head_pose_offset['tilt']
head_pose['pitch'] = head_pose['pitch'] - self.head_pose_offset['pitch']
calibrated_data['head_pose'] = head_pose
return calibrated_data
@staticmethod
def _checksum(data: bytes) -> int:
return sum(data[:-1]) & 0xFF
@@ -115,7 +113,7 @@ class RealIMUDevice:
'yaw': yaw,
'temperature': temp
}
# logger.debug(f'解析姿态角包: roll={roll}, pitch={pitch}, yaw={yaw}, temp={temp}')
# print(f'解析姿态角包: roll={roll}, pitch={pitch}, yaw={yaw}, temp={temp}')
return self.last_data
else:
# logger.debug(f'忽略的数据包类型: 0x{packet_type:02X}')
@@ -356,37 +354,16 @@ class IMUManager(BaseDevice):
self.logger.info('开始IMU快速零点校准...')
# 收集校准样本
calibration_samples = []
sample_count = 50 # 减少样本数量以加快校准速度
# 直接读取一次原始数据作为校准偏移量
raw_data = self.imu_device.read_data(apply_calibration=False)
if not raw_data or 'head_pose' not in raw_data:
return {'status': 'error', 'error': '无法读取IMU原始数据'}
for i in range(sample_count):
try:
# 读取原始数据(不应用校准)
raw_data = self.imu_device.read_data(apply_calibration=False)
if raw_data and 'head_pose' in raw_data:
calibration_samples.append(raw_data['head_pose'])
time.sleep(0.02) # 20ms间隔
except Exception as e:
self.logger.warning(f'校准样本采集失败: {e}')
continue
if len(calibration_samples) < sample_count * 0.7:
return {
'status': 'error',
'error': f'校准样本不足: {len(calibration_samples)}/{sample_count}'
}
# 计算平均值作为零点偏移
rotation_sum = sum(sample['rotation'] for sample in calibration_samples)
tilt_sum = sum(sample['tilt'] for sample in calibration_samples)
pitch_sum = sum(sample['pitch'] for sample in calibration_samples)
count = len(calibration_samples)
# 使用当前姿态作为零点偏移
self.head_pose_offset = {
'rotation': rotation_sum / count,
'tilt': tilt_sum / count,
'pitch': pitch_sum / count
'rotation': raw_data['head_pose']['rotation'],
'tilt': raw_data['head_pose']['tilt'],
'pitch': raw_data['head_pose']['pitch']
}
# 应用校准到设备
@@ -396,8 +373,7 @@ class IMUManager(BaseDevice):
self.logger.info(f'IMU快速校准完成: {self.head_pose_offset}')
return {
'status': 'success',
'head_pose_offset': self.head_pose_offset,
'samples_used': count
'head_pose_offset': self.head_pose_offset
}
except Exception as e:
@@ -504,8 +480,8 @@ class IMUManager(BaseDevice):
if data:
# 缓存数据
self.data_buffer.append(data)
self.last_valid_data = data
# self.data_buffer.append(data)
# self.last_valid_data = data
# 发送数据到前端
if self._socketio:
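
Note on the change above: after subtracting the zero-point offset, the rotation angle is wrapped into the [-180, 180) range so that readings on opposite sides of the 0°/360° boundary do not show up as a huge relative jump; tilt and pitch stay plain differences. A minimal standalone sketch of the same wrap-around rule, using a hypothetical normalize_angle helper that is not part of the repository:

def normalize_angle(angle: float) -> float:
    """Wrap an angle in degrees into the [-180, 180) range."""
    return ((angle + 180) % 360) - 180

# Offset captured at calibration time, raw reading just past the wrap point.
head_pose_offset = {'rotation': 350.0, 'tilt': 1.5, 'pitch': -2.0}
raw_pose = {'rotation': 10.0, 'tilt': 2.0, 'pitch': -1.0}

calibrated = {
    'rotation': normalize_angle(raw_pose['rotation'] - head_pose_offset['rotation']),
    'tilt': raw_pose['tilt'] - head_pose_offset['tilt'],
    'pitch': raw_pose['pitch'] - head_pose_offset['pitch'],
}
print(calibrated)  # rotation is 20.0 instead of -340.0

The commit also drops the 50-sample averaging loop in favour of a single raw read, so the current pose is taken directly as the zero-point offset.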

View File

@@ -29,7 +29,7 @@ depth_range_max = 1700
[DEVICES]
imu_device_type = real
imu_port = COM3
imu_port = COM8
imu_baudrate = 9600
pressure_device_type = real
pressure_use_mock = False
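
The IMU serial port moves from COM3 to COM8 in the [DEVICES] section. A minimal sketch of reading these keys with the standard-library configparser; the path and fallback values simply mirror the snippet above, and the actual loading code in the repository may differ:

from configparser import ConfigParser

config = ConfigParser()
config.read('backend/devices/utils/config.ini', encoding='utf-8')

imu_port = config.get('DEVICES', 'imu_port', fallback='COM8')
imu_baudrate = config.getint('DEVICES', 'imu_baudrate', fallback=9600)
print(imu_port, imu_baudrate)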

View File

@@ -947,7 +947,7 @@ class AppServer:
self.logger.error(f'校准设备失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
@self.app.route('/api/devices/imu/calibrate', methods=['POST'])
@self.app.route('/api/devices/calibrate/imu', methods=['POST'])
def calibrate_imu():
"""校准IMU"""
try:
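
The IMU calibration route moves from /api/devices/imu/calibrate to /api/devices/calibrate/imu. A client-side sketch of calling the new endpoint; the host, port, and response shape are assumptions, not taken from the diff:

import requests

# Assumed local development address; adjust to the actual server binding.
resp = requests.post('http://127.0.0.1:5000/api/devices/calibrate/imu', timeout=5)
print(resp.status_code, resp.json())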

View File

@@ -1,79 +1,53 @@
import ctypes
import time
import numpy as np
from PIL import Image
import colorsys
# === DLL 加载 ===
dll = ctypes.WinDLL(r"D:\BodyBalanceEvaluation\backend\dll\smitsense\SMiTSenseUsbWrapper.dll")
def get_unique_colors(image_path):
img = Image.open(image_path).convert("RGB")
unique_colors = list(set(img.getdata()))
return unique_colors
# === DLL 函数声明 ===
dll.SMiTSenseUsb_Init.argtypes = [ctypes.c_int]
dll.SMiTSenseUsb_Init.restype = ctypes.c_int
def get_representative_colors(colors, n=12):
# 按亮度排序并均匀抽取
def brightness(rgb):
r, g, b = rgb
return 0.2126*r + 0.7152*g + 0.0722*b
colors.sort(key=brightness)
total = len(colors)
if total <= n:
return colors
step = total / n
return [colors[int(i*step)] for i in range(n)]
dll.SMiTSenseUsb_ScanDevices.argtypes = [ctypes.POINTER(ctypes.c_int)]
dll.SMiTSenseUsb_ScanDevices.restype = ctypes.c_int
def sort_colors_by_hue(colors):
# 转为 HSV并按色相排序
def rgb_to_hue(rgb):
r, g, b = [x/255.0 for x in rgb]
h, s, v = colorsys.rgb_to_hsv(r, g, b)
return h
return sorted(colors, key=rgb_to_hue)
dll.SMiTSenseUsb_OpenAndStart.argtypes = [
ctypes.c_int,
ctypes.POINTER(ctypes.c_uint16),
ctypes.POINTER(ctypes.c_uint16)
]
dll.SMiTSenseUsb_OpenAndStart.restype = ctypes.c_int
def show_color_preview(colors, width=600, height_per_color=50):
n = len(colors)
height = n * height_per_color
img = Image.new("RGB", (width, height))
for i, color in enumerate(colors):
for y in range(i*height_per_color, (i+1)*height_per_color):
for x in range(width):
img.putpixel((x, y), color)
img.show()
dll.SMiTSenseUsb_GetLatestFrame.argtypes = [
ctypes.POINTER(ctypes.c_uint16),
ctypes.c_int
]
dll.SMiTSenseUsb_GetLatestFrame.restype = ctypes.c_int
dll.SMiTSenseUsb_StopAndClose.argtypes = []
dll.SMiTSenseUsb_StopAndClose.restype = ctypes.c_int
# === 初始化设备 ===
ret = dll.SMiTSenseUsb_Init(0)
if ret != 0:
raise RuntimeError(f"Init failed: {ret}")
count = ctypes.c_int()
ret = dll.SMiTSenseUsb_ScanDevices(ctypes.byref(count))
if ret != 0 or count.value == 0:
raise RuntimeError("No devices found")
# 打开设备
rows = ctypes.c_uint16()
cols = ctypes.c_uint16()
ret = dll.SMiTSenseUsb_OpenAndStart(0, ctypes.byref(rows), ctypes.byref(cols))
if ret != 0:
raise RuntimeError("OpenAndStart failed")
rows_val, cols_val = rows.value, cols.value
frame_size = rows_val * cols_val
buf_type = ctypes.c_uint16 * frame_size
buf = buf_type()
# 创建一个 NumPy 数组视图,复用内存
data_array = np.ctypeslib.as_array(buf).reshape((rows_val, cols_val))
print(f"设备已打开: {rows_val}x{cols_val}")
try:
while True:
ret = dll.SMiTSenseUsb_GetLatestFrame(buf, frame_size)
time.sleep(1)
# while True:
# ret = dll.SMiTSenseUsb_GetLatestFrame(buf, frame_size)
# if ret == 0:
# # data_array 已经复用缓冲区内存,每次直接访问即可
# # 例如打印最大值和前5行前5列的数据
# print("最大压力值:", data_array.max())
# print("前5x5数据:\n", data_array[:5, :5])
# else:
# print("读取数据帧失败")
# time.sleep(1) # 每秒读取一帧
except KeyboardInterrupt:
print("退出中...")
finally:
dll.SMiTSenseUsb_StopAndClose()
print("设备已关闭")
if __name__ == "__main__":
image_path = r"D:\项目资料\技术文档资料\中康项目资料\11.png"
colors = get_unique_colors(image_path)
rep_colors = get_representative_colors(colors, 12)
sorted_colors = sort_colors_by_hue(rep_colors)
print("12 个代表性颜色(按彩虹顺序):")
for i, color in enumerate(sorted_colors, 1):
print(f"{i}: {color}")
show_color_preview(sorted_colors)
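
The rewritten script allocates one ctypes buffer, wraps it once with np.ctypeslib.as_array, and lets every SMiTSenseUsb_GetLatestFrame call fill the same memory, so the NumPy view stays current without copying. A DLL-free sketch of that buffer-reuse pattern, with a stand-in fill function in place of the real DLL call:

import ctypes
import numpy as np

rows, cols = 32, 32
buf = (ctypes.c_uint16 * (rows * cols))()
frame = np.ctypeslib.as_array(buf).reshape((rows, cols))  # shared memory, no copy

def fake_fill(buffer, value):
    # Stand-in for the DLL call: writes into the shared ctypes buffer.
    for i in range(len(buffer)):
        buffer[i] = value

fake_fill(buf, 7)
print(frame.max())   # 7  -- the NumPy view sees the new data immediately
fake_fill(buf, 42)
print(frame.max())   # 42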

View File

@@ -59,7 +59,7 @@ def parse_packet(data):
# else:
# return f"未知包类型: {packet_type:#04x}"
def read_imu(port='COM6', baudrate=9600):
def read_imu(port='COM8', baudrate=9600):
ser = serial.Serial(port, baudrate, timeout=1)
buffer = bytearray()
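
The test reader now defaults to the same COM8 port as the config change above. For reference, the checksum rule used elsewhere in this commit is "sum of all bytes except the last, masked to one byte"; a small sketch with a hypothetical 4-byte frame (the frame layout is illustrative, not taken from the diff):

def checksum_ok(packet: bytes) -> bool:
    # Last byte must equal the low 8 bits of the sum of the preceding bytes.
    return (sum(packet[:-1]) & 0xFF) == packet[-1]

frame = bytes([0x55, 0x53, 0x10])
frame += bytes([sum(frame) & 0xFF])  # append the checksum byte
print(checksum_ok(frame))  # True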

View File

@@ -0,0 +1,94 @@
import os
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap
import pykinect_azure as pykinect
class FemtoBoltContourViewer:
def __init__(self, depth_min=900, depth_max=1100):
self.depth_min = depth_min
self.depth_max = depth_max
# 自定义离散彩虹色,层次明显
colors = [
'darkblue', 'blue', 'cyan', 'lime', 'yellow',
'orange', 'red', 'darkred'
]
self.cmap = ListedColormap(colors)
self.device_handle = None
self.pykinect = None
self.config = None
def _load_sdk(self):
"""加载并初始化 FemtoBolt SDK"""
base_dir = os.path.dirname(os.path.abspath(__file__))
dll_path = os.path.join(base_dir, "..", "dll", "femtobolt", "bin", "k4a.dll")
self.pykinect = pykinect
self.pykinect.initialize_libraries(track_body=False, module_k4a_path=dll_path)
def _configure_device(self):
"""配置 FemtoBolt 深度相机"""
self.config = self.pykinect.default_configuration
self.config.depth_mode = self.pykinect.K4A_DEPTH_MODE_NFOV_UNBINNED
self.config.camera_fps = self.pykinect.K4A_FRAMES_PER_SECOND_15
self.config.synchronized_images_only = False
self.config.color_resolution = 0
self.device_handle = self.pykinect.start_device(config=self.config)
def run(self):
self._load_sdk()
self._configure_device()
plt.ion() # 打开交互模式
fig, ax = plt.subplots(figsize=(7, 7))
print("FemtoBolt 深度相机启动成功,关闭窗口或 Ctrl+C 退出")
# 设置离散等高线层次
levels = np.linspace(self.depth_min, self.depth_max, len(self.cmap.colors) + 1)
try:
while plt.fignum_exists(fig.number): # 窗口存在才继续
capture = self.device_handle.update()
if capture is None:
continue
ret, depth_image = capture.get_depth_image()
if not ret or depth_image is None:
continue
depth = depth_image.astype(np.uint16)
# 限制深度范围
depth[depth > self.depth_max] = 0
depth[depth < self.depth_min] = 0
depth = depth[0:350, 0:350]
# 屏蔽无效值
depth_masked = np.ma.masked_equal(depth, 0)
# 背景灰色
background = np.ones_like(depth) * 0.5
# 绘制
ax.clear()
ax.imshow(background, origin='lower', cmap='gray', alpha=0.3)
ax.grid(True, which='both', axis='both', color='white',
linestyle='--', linewidth=1, zorder=0)
ax.contourf(depth_masked, levels=levels, cmap=self.cmap,
vmin=self.depth_min, vmax=self.depth_max,
origin='upper', zorder=2)
plt.pause(0.05)
except KeyboardInterrupt:
print("检测到退出信号,结束程序")
finally:
if self.device_handle:
self.device_handle.stop()
self.device_handle.close()
plt.close(fig)
if __name__ == "__main__":
viewer = FemtoBoltContourViewer(depth_min=900, depth_max=1100)
viewer.run()
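
Two details drive the rendering above: depths outside [depth_min, depth_max] are zeroed and then masked, so contourf only draws the valid region, and the number of levels is one more than the number of colors, giving one band per ListedColormap entry. A camera-free sketch of that drawing path on synthetic data:

import numpy as np
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap

depth_min, depth_max = 900, 1100
colors = ['darkblue', 'blue', 'cyan', 'lime', 'yellow', 'orange', 'red', 'darkred']
cmap = ListedColormap(colors)
levels = np.linspace(depth_min, depth_max, len(colors) + 1)  # 8 bands

# Synthetic "depth" image with some values outside the valid range.
y, x = np.mgrid[0:200, 0:200]
depth = 1000 + 150 * np.sin(x / 30.0) * np.cos(y / 30.0)
depth[(depth < depth_min) | (depth > depth_max)] = 0
depth_masked = np.ma.masked_equal(depth, 0)

fig, ax = plt.subplots(figsize=(5, 5))
ax.contourf(depth_masked, levels=levels, cmap=cmap,
            vmin=depth_min, vmax=depth_max, origin='upper')
plt.show()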