This commit is contained in:
limengnan 2025-08-21 18:33:22 +08:00
commit a5e9ee749c
9 changed files with 239 additions and 193 deletions

2
.gitignore vendored
View File

@ -21420,3 +21420,5 @@ frontend/src/renderer/src/services/api.js
frontend/src/renderer/src/services/api.js frontend/src/renderer/src/services/api.js
frontend/src/renderer/src/services/api.js frontend/src/renderer/src/services/api.js
frontend/src/renderer/src/services/api.js frontend/src/renderer/src/services/api.js
frontend/src/renderer/src/services/api.js
backend/devices/utils/config.ini

View File

@ -12,7 +12,7 @@ import base64
import numpy as np import numpy as np
from typing import Optional, Dict, Any, Tuple from typing import Optional, Dict, Any, Tuple
import logging import logging
from collections import deque import queue
import gc import gc
try: try:
@ -60,7 +60,7 @@ class CameraManager(BaseDevice):
# 流控制 # 流控制
self.streaming_thread = None self.streaming_thread = None
# 减小缓存长度保留最近2帧即可避免累计占用 # 减小缓存长度保留最近2帧即可避免累计占用
self.frame_cache = deque(maxlen=int(config.get('frame_cache_len', 2))) self.frame_cache = queue.Queue(maxsize=int(config.get('frame_cache_len', 2)))
self.last_frame = None self.last_frame = None
self.frame_count = 0 self.frame_count = 0
self.dropped_frames = 0 self.dropped_frames = 0
@ -82,11 +82,8 @@ class CameraManager(BaseDevice):
'dropped_frames': 0 'dropped_frames': 0
} }
# 全局帧缓存(用于录制) # 全局帧队列(用于录制)
self.global_frame_cache = {} self.frame_queue = queue.Queue(maxsize=10) # 最大长度10自动丢弃旧帧
self.frame_cache_lock = threading.Lock()
self.max_cache_size = 10
self.cache_timeout = 5.0 # 5秒超时
# OpenCV优化开关 # OpenCV优化开关
try: try:
@ -334,15 +331,29 @@ class CameraManager(BaseDevice):
# 重置丢帧计数 # 重置丢帧计数
self.dropped_frames = 0 self.dropped_frames = 0
# 保存原始帧到全局缓存(用于录制) # 保存原始帧到队列(用于录制)
self._save_frame_to_cache(frame, 'camera') try:
self.frame_queue.put_nowait({
'frame': frame.copy(),
'timestamp': time.time()
})
except queue.Full:
# 队列满时丢弃最旧的帧,添加新帧
try:
self.frame_queue.get_nowait() # 移除最旧的帧
self.frame_queue.put_nowait({
'frame': frame.copy(),
'timestamp': time.time()
})
except queue.Empty:
pass # 队列为空,忽略
# 处理帧(降采样以优化传输负载) # 处理帧(降采样以优化传输负载)
processed_frame = self._process_frame(frame) processed_frame = self._process_frame(frame)
# 缓存帧(不复制,减少内存占用) # # 缓存帧(不复制,减少内存占用)
self.last_frame = processed_frame # self.last_frame = processed_frame
self.frame_cache.append(processed_frame) # self.frame_cache.append(processed_frame)
# 发送帧数据 # 发送帧数据
self._send_frame_data(processed_frame) self._send_frame_data(processed_frame)
@ -530,15 +541,7 @@ class CameraManager(BaseDevice):
self.logger.error(f"捕获图像异常: {e}") self.logger.error(f"捕获图像异常: {e}")
return None return None
def get_latest_frame(self) -> Optional[np.ndarray]:
"""
获取最新帧
Returns:
Optional[np.ndarray]: 最新帧无帧返回None
"""
# 对外提供拷贝,内部保持原对象,避免重复持有
return self.last_frame.copy() if self.last_frame is not None else None
def disconnect(self): def disconnect(self):
""" """
@ -574,12 +577,20 @@ class CameraManager(BaseDevice):
pass pass
self.cap = None self.cap = None
self.frame_cache.clear() # 清理帧缓存
while not self.frame_cache.empty():
try:
self.frame_cache.get_nowait()
except queue.Empty:
break
self.last_frame = None self.last_frame = None
# 清理全局帧缓存 # 清理帧队列
with self.frame_cache_lock: while not self.frame_queue.empty():
self.global_frame_cache.clear() try:
self.frame_queue.get_nowait()
except queue.Empty:
break
super().cleanup() super().cleanup()
self.logger.info("相机资源清理完成") self.logger.info("相机资源清理完成")
@ -618,48 +629,34 @@ class CameraManager(BaseDevice):
self.logger.error(f'保存帧到缓存失败: {e}') self.logger.error(f'保存帧到缓存失败: {e}')
def _get_latest_frame_from_cache(self, frame_type='camera'): def _get_latest_frame_from_cache(self, frame_type='camera'):
"""缓存获取最新帧""" """队列获取最新帧"""
try: try:
with self.frame_cache_lock: if self.frame_queue.empty():
if frame_type not in self.global_frame_cache: self.logger.debug('帧队列为空')
self.logger.debug(f'缓存中不存在帧类型: {frame_type}, 可用类型: {list(self.global_frame_cache.keys())}') return None, None
return None, None
if not self.global_frame_cache[frame_type]:
self.logger.debug(f'帧类型 {frame_type} 的缓存为空')
return None, None
# 清理过期帧
self._cleanup_expired_frames()
if not self.global_frame_cache[frame_type]:
self.logger.debug(f'清理过期帧后,帧类型 {frame_type} 的缓存为空')
return None, None
# 获取最新帧
latest_timestamp = max(self.global_frame_cache[frame_type].keys())
frame_data = self.global_frame_cache[frame_type][latest_timestamp]
return frame_data['frame'].copy(), frame_data['timestamp']
except Exception as e:
self.logger.error(f'从缓存获取帧失败: {e}')
return None, None
def _cleanup_expired_frames(self):
"""清理过期的缓存帧"""
try:
current_time = time.time()
for frame_type in list(self.global_frame_cache.keys()): # 获取队列中的所有帧,保留最新的一个
expired_keys = [] frames = []
for timestamp in self.global_frame_cache[frame_type].keys(): while not self.frame_queue.empty():
if current_time - timestamp > self.cache_timeout: try:
expired_keys.append(timestamp) frames.append(self.frame_queue.get_nowait())
except queue.Empty:
break
if not frames:
return None, None
# 获取最新帧(最后一个)
latest_frame = frames[-1]
# 将最新帧重新放回队列
try:
self.frame_queue.put_nowait(latest_frame)
except queue.Full:
pass # 队列满时忽略
return latest_frame['frame'].copy(), latest_frame['timestamp']
# 删除过期帧
for key in expired_keys:
del self.global_frame_cache[frame_type][key]
except Exception as e: except Exception as e:
self.logger.error(f'清理过期帧失败: {e}') self.logger.error(f'从队列获取帧失败: {e}')
return None, None

View File

@ -67,7 +67,6 @@ class RealIMUDevice:
if 'head_pose_offset' in calibration: if 'head_pose_offset' in calibration:
self.head_pose_offset = calibration['head_pose_offset'] self.head_pose_offset = calibration['head_pose_offset']
logger.debug(f'应用IMU校准数据: {self.head_pose_offset}') logger.debug(f'应用IMU校准数据: {self.head_pose_offset}')
def apply_calibration(self, raw_data: Dict[str, Any]) -> Dict[str, Any]: def apply_calibration(self, raw_data: Dict[str, Any]) -> Dict[str, Any]:
"""应用校准:将当前姿态减去初始偏移,得到相对于初始姿态的变化量""" """应用校准:将当前姿态减去初始偏移,得到相对于初始姿态的变化量"""
if not raw_data or 'head_pose' not in raw_data: if not raw_data or 'head_pose' not in raw_data:
@ -76,15 +75,14 @@ class RealIMUDevice:
# 应用校准偏移 # 应用校准偏移
calibrated_data = raw_data.copy() calibrated_data = raw_data.copy()
head_pose = raw_data['head_pose'].copy() head_pose = raw_data['head_pose'].copy()
angle=head_pose['rotation'] - self.head_pose_offset['rotation']
# 减去基准值(零点偏移) # 减去基准值(零点偏移)
head_pose['rotation'] = head_pose['rotation'] - self.head_pose_offset['rotation'] head_pose['rotation'] = ((angle + 180) % 360) - 180
head_pose['tilt'] = head_pose['tilt'] - self.head_pose_offset['tilt'] head_pose['tilt'] = head_pose['tilt'] - self.head_pose_offset['tilt']
head_pose['pitch'] = head_pose['pitch'] - self.head_pose_offset['pitch'] head_pose['pitch'] = head_pose['pitch'] - self.head_pose_offset['pitch']
calibrated_data['head_pose'] = head_pose calibrated_data['head_pose'] = head_pose
return calibrated_data return calibrated_data
@staticmethod @staticmethod
def _checksum(data: bytes) -> int: def _checksum(data: bytes) -> int:
return sum(data[:-1]) & 0xFF return sum(data[:-1]) & 0xFF
@ -115,7 +113,7 @@ class RealIMUDevice:
'yaw': yaw, 'yaw': yaw,
'temperature': temp 'temperature': temp
} }
# logger.debug(f'解析姿态角包: roll={roll}, pitch={pitch}, yaw={yaw}, temp={temp}') # print(f'解析姿态角包: roll={roll}, pitch={pitch}, yaw={yaw}, temp={temp}')
return self.last_data return self.last_data
else: else:
# logger.debug(f'忽略的数据包类型: 0x{packet_type:02X}') # logger.debug(f'忽略的数据包类型: 0x{packet_type:02X}')
@ -356,37 +354,16 @@ class IMUManager(BaseDevice):
self.logger.info('开始IMU快速零点校准...') self.logger.info('开始IMU快速零点校准...')
# 收集校准样本 # 直接读取一次原始数据作为校准偏移量
calibration_samples = [] raw_data = self.imu_device.read_data(apply_calibration=False)
sample_count = 50 # 减少样本数量以加快校准速度 if not raw_data or 'head_pose' not in raw_data:
return {'status': 'error', 'error': '无法读取IMU原始数据'}
for i in range(sample_count): # 使用当前姿态作为零点偏移
try:
# 读取原始数据(不应用校准)
raw_data = self.imu_device.read_data(apply_calibration=False)
if raw_data and 'head_pose' in raw_data:
calibration_samples.append(raw_data['head_pose'])
time.sleep(0.02) # 20ms间隔
except Exception as e:
self.logger.warning(f'校准样本采集失败: {e}')
continue
if len(calibration_samples) < sample_count * 0.7:
return {
'status': 'error',
'error': f'校准样本不足: {len(calibration_samples)}/{sample_count}'
}
# 计算平均值作为零点偏移
rotation_sum = sum(sample['rotation'] for sample in calibration_samples)
tilt_sum = sum(sample['tilt'] for sample in calibration_samples)
pitch_sum = sum(sample['pitch'] for sample in calibration_samples)
count = len(calibration_samples)
self.head_pose_offset = { self.head_pose_offset = {
'rotation': rotation_sum / count, 'rotation': raw_data['head_pose']['rotation'],
'tilt': tilt_sum / count, 'tilt': raw_data['head_pose']['tilt'],
'pitch': pitch_sum / count 'pitch': raw_data['head_pose']['pitch']
} }
# 应用校准到设备 # 应用校准到设备
@ -396,8 +373,7 @@ class IMUManager(BaseDevice):
self.logger.info(f'IMU快速校准完成: {self.head_pose_offset}') self.logger.info(f'IMU快速校准完成: {self.head_pose_offset}')
return { return {
'status': 'success', 'status': 'success',
'head_pose_offset': self.head_pose_offset, 'head_pose_offset': self.head_pose_offset
'samples_used': count
} }
except Exception as e: except Exception as e:
@ -504,8 +480,8 @@ class IMUManager(BaseDevice):
if data: if data:
# 缓存数据 # 缓存数据
self.data_buffer.append(data) # self.data_buffer.append(data)
self.last_valid_data = data # self.last_valid_data = data
# 发送数据到前端 # 发送数据到前端
if self._socketio: if self._socketio:

View File

@ -173,14 +173,17 @@ class RecordingManager:
# 初始化屏幕录制写入器 # 初始化屏幕录制写入器
# record_size = self.screen_region[2:4] if self.screen_region else self.screen_size # record_size = self.screen_region[2:4] if self.screen_region else self.screen_size
# print('屏幕写入器的宽高..............',record_size) # print('屏幕写入器的宽高..............',record_size)
self.screen_video_writer = cv2.VideoWriter( # self.screen_video_writer = cv2.VideoWriter(
screen_video_path, fourcc, fps, (self.screen_size[0],self.screen_size[1]) # screen_video_path, fourcc, fps, (self.screen_size[0],self.screen_size[1])
) # )
if self.screen_video_writer.isOpened(): # 检查屏幕视频写入器状态(仅在初始化时)
if self.screen_video_writer and self.screen_video_writer.isOpened():
self.logger.info(f'屏幕视频写入器初始化成功: {screen_video_path}') self.logger.info(f'屏幕视频写入器初始化成功: {screen_video_path}')
else: elif self.screen_video_writer:
self.logger.error(f'屏幕视频写入器初始化失败: {screen_video_path}') self.logger.error(f'屏幕视频写入器初始化失败: {screen_video_path}')
else:
self.logger.info('屏幕录制功能暂时禁用')
# 重置停止事件 # 重置停止事件
self.recording_stop_event.clear() self.recording_stop_event.clear()
@ -195,13 +198,13 @@ class RecordingManager:
) )
self.feet_recording_thread.start() self.feet_recording_thread.start()
if self.screen_video_writer: # if self.screen_video_writer:
self.screen_recording_thread = threading.Thread( # self.screen_recording_thread = threading.Thread(
target=self._screen_recording_thread, # target=self._screen_recording_thread,
daemon=True, # daemon=True,
name='ScreenRecordingThread' # name='ScreenRecordingThread'
) # )
self.screen_recording_thread.start() # self.screen_recording_thread.start()
result['success'] = True result['success'] = True
result['recording_start_time'] = self.recording_start_time.isoformat() result['recording_start_time'] = self.recording_start_time.isoformat()

View File

@ -29,7 +29,7 @@ depth_range_max = 1700
[DEVICES] [DEVICES]
imu_device_type = real imu_device_type = real
imu_port = COM3 imu_port = COM8
imu_baudrate = 9600 imu_baudrate = 9600
pressure_device_type = real pressure_device_type = real
pressure_use_mock = False pressure_use_mock = False

View File

@ -947,7 +947,7 @@ class AppServer:
self.logger.error(f'校准设备失败: {e}') self.logger.error(f'校准设备失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500 return jsonify({'success': False, 'error': str(e)}), 500
@self.app.route('/api/devices/imu/calibrate', methods=['POST']) @self.app.route('/api/devices/calibrate/imu', methods=['POST'])
def calibrate_imu(): def calibrate_imu():
"""校准IMU""" """校准IMU"""
try: try:

View File

@ -1,79 +1,53 @@
import ctypes from PIL import Image
import time import colorsys
import numpy as np
# === DLL 加载 === def get_unique_colors(image_path):
dll = ctypes.WinDLL(r"D:\BodyBalanceEvaluation\backend\dll\smitsense\SMiTSenseUsbWrapper.dll") img = Image.open(image_path).convert("RGB")
unique_colors = list(set(img.getdata()))
return unique_colors
# === DLL 函数声明 === def get_representative_colors(colors, n=12):
dll.SMiTSenseUsb_Init.argtypes = [ctypes.c_int] # 按亮度排序并均匀抽取
dll.SMiTSenseUsb_Init.restype = ctypes.c_int def brightness(rgb):
r, g, b = rgb
return 0.2126*r + 0.7152*g + 0.0722*b
colors.sort(key=brightness)
total = len(colors)
if total <= n:
return colors
step = total / n
return [colors[int(i*step)] for i in range(n)]
dll.SMiTSenseUsb_ScanDevices.argtypes = [ctypes.POINTER(ctypes.c_int)] def sort_colors_by_hue(colors):
dll.SMiTSenseUsb_ScanDevices.restype = ctypes.c_int # 转为 HSV并按色相排序
def rgb_to_hue(rgb):
r, g, b = [x/255.0 for x in rgb]
h, s, v = colorsys.rgb_to_hsv(r, g, b)
return h
return sorted(colors, key=rgb_to_hue)
dll.SMiTSenseUsb_OpenAndStart.argtypes = [ def show_color_preview(colors, width=600, height_per_color=50):
ctypes.c_int, n = len(colors)
ctypes.POINTER(ctypes.c_uint16), height = n * height_per_color
ctypes.POINTER(ctypes.c_uint16) img = Image.new("RGB", (width, height))
]
dll.SMiTSenseUsb_OpenAndStart.restype = ctypes.c_int for i, color in enumerate(colors):
for y in range(i*height_per_color, (i+1)*height_per_color):
for x in range(width):
img.putpixel((x, y), color)
img.show()
dll.SMiTSenseUsb_GetLatestFrame.argtypes = [ if __name__ == "__main__":
ctypes.POINTER(ctypes.c_uint16), image_path = r"D:\项目资料\技术文档资料\中康项目资料\11.png"
ctypes.c_int
] colors = get_unique_colors(image_path)
dll.SMiTSenseUsb_GetLatestFrame.restype = ctypes.c_int rep_colors = get_representative_colors(colors, 12)
sorted_colors = sort_colors_by_hue(rep_colors)
dll.SMiTSenseUsb_StopAndClose.argtypes = []
dll.SMiTSenseUsb_StopAndClose.restype = ctypes.c_int print("12 个代表性颜色(按彩虹顺序):")
for i, color in enumerate(sorted_colors, 1):
# === 初始化设备 === print(f"{i}: {color}")
ret = dll.SMiTSenseUsb_Init(0)
if ret != 0: show_color_preview(sorted_colors)
raise RuntimeError(f"Init failed: {ret}")
count = ctypes.c_int()
ret = dll.SMiTSenseUsb_ScanDevices(ctypes.byref(count))
if ret != 0 or count.value == 0:
raise RuntimeError("No devices found")
# 打开设备
rows = ctypes.c_uint16()
cols = ctypes.c_uint16()
ret = dll.SMiTSenseUsb_OpenAndStart(0, ctypes.byref(rows), ctypes.byref(cols))
if ret != 0:
raise RuntimeError("OpenAndStart failed")
rows_val, cols_val = rows.value, cols.value
frame_size = rows_val * cols_val
buf_type = ctypes.c_uint16 * frame_size
buf = buf_type()
# 创建一个 NumPy 数组视图,复用内存
data_array = np.ctypeslib.as_array(buf).reshape((rows_val, cols_val))
print(f"设备已打开: {rows_val}x{cols_val}")
try:
while True:
ret = dll.SMiTSenseUsb_GetLatestFrame(buf, frame_size)
time.sleep(1)
# while True:
# ret = dll.SMiTSenseUsb_GetLatestFrame(buf, frame_size)
# if ret == 0:
# # data_array 已经复用缓冲区内存,每次直接访问即可
# # 例如打印最大值和前5行前5列的数据
# print("最大压力值:", data_array.max())
# print("前5x5数据:\n", data_array[:5, :5])
# else:
# print("读取数据帧失败")
# time.sleep(1) # 每秒读取一帧
except KeyboardInterrupt:
print("退出中...")
finally:
dll.SMiTSenseUsb_StopAndClose()
print("设备已关闭")

View File

@ -59,7 +59,7 @@ def parse_packet(data):
# else: # else:
# return f"未知包类型: {packet_type:#04x}" # return f"未知包类型: {packet_type:#04x}"
def read_imu(port='COM6', baudrate=9600): def read_imu(port='COM8', baudrate=9600):
ser = serial.Serial(port, baudrate, timeout=1) ser = serial.Serial(port, baudrate, timeout=1)
buffer = bytearray() buffer = bytearray()

View File

@ -0,0 +1,94 @@
import os
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap
import pykinect_azure as pykinect
class FemtoBoltContourViewer:
    """Live contour viewer for a FemtoBolt depth camera (k4a-compatible SDK).

    Each depth frame is clipped to the [depth_min, depth_max] window
    (presumably millimetres — TODO confirm sensor units), cropped to a
    350x350 region, and rendered as discrete filled contours with
    matplotlib in interactive mode.
    """

    def __init__(self, depth_min: int = 900, depth_max: int = 1100):
        # Inclusive depth window; readings outside it are zeroed before plotting.
        self.depth_min = depth_min
        self.depth_max = depth_max
        # Custom discrete rainbow palette so the contour bands are clearly separated.
        colors = [
            'darkblue', 'blue', 'cyan', 'lime', 'yellow',
            'orange', 'red', 'darkred'
        ]
        self.cmap = ListedColormap(colors)
        self.device_handle = None  # set by _configure_device()
        self.pykinect = None       # set by _load_sdk()
        self.config = None         # set by _configure_device()

    def _load_sdk(self):
        """Load and initialize the FemtoBolt SDK from the bundled k4a.dll."""
        base_dir = os.path.dirname(os.path.abspath(__file__))
        # DLL is shipped relative to this file: ../dll/femtobolt/bin/k4a.dll
        dll_path = os.path.join(base_dir, "..", "dll", "femtobolt", "bin", "k4a.dll")
        self.pykinect = pykinect
        self.pykinect.initialize_libraries(track_body=False, module_k4a_path=dll_path)

    def _configure_device(self):
        """Configure and start the FemtoBolt depth camera (depth-only, 15 FPS)."""
        self.config = self.pykinect.default_configuration
        self.config.depth_mode = self.pykinect.K4A_DEPTH_MODE_NFOV_UNBINNED
        self.config.camera_fps = self.pykinect.K4A_FRAMES_PER_SECOND_15
        # Depth-only capture: no color stream, frames need not be synchronized.
        self.config.synchronized_images_only = False
        self.config.color_resolution = 0
        self.device_handle = self.pykinect.start_device(config=self.config)

    def run(self):
        """Open the device and render depth contours until the figure window
        is closed or the user presses Ctrl+C; always stops/closes the device."""
        self._load_sdk()
        self._configure_device()

        plt.ion()  # interactive mode so the figure refreshes inside the loop
        fig, ax = plt.subplots(figsize=(7, 7))
        print("FemtoBolt 深度相机启动成功,关闭窗口或 Ctrl+C 退出")

        # Discrete contour levels: one band per palette color.
        levels = np.linspace(self.depth_min, self.depth_max, len(self.cmap.colors) + 1)

        try:
            while plt.fignum_exists(fig.number):  # loop only while the window exists
                capture = self.device_handle.update()
                if capture is None:
                    continue
                ret, depth_image = capture.get_depth_image()
                if not ret or depth_image is None:
                    continue

                depth = depth_image.astype(np.uint16)
                # Zero out readings outside the configured depth window.
                depth[depth > self.depth_max] = 0
                depth[depth < self.depth_min] = 0
                # Crop to the top-left 350x350 region of interest.
                depth = depth[0:350, 0:350]
                # Mask zeros so invalid pixels are left unpainted by contourf.
                depth_masked = np.ma.masked_equal(depth, 0)

                # Flat gray backdrop behind the contours.
                background = np.ones_like(depth) * 0.5

                ax.clear()
                ax.imshow(background, origin='lower', cmap='gray', alpha=0.3)
                ax.grid(True, which='both', axis='both', color='white',
                        linestyle='--', linewidth=1, zorder=0)
                # NOTE(review): imshow uses origin='lower' but contourf uses
                # origin='upper' — the two layers are vertically flipped
                # relative to each other; confirm this is intentional.
                ax.contourf(depth_masked, levels=levels, cmap=self.cmap,
                            vmin=self.depth_min, vmax=self.depth_max,
                            origin='upper', zorder=2)
                plt.pause(0.05)
        except KeyboardInterrupt:
            print("检测到退出信号,结束程序")
        finally:
            # Release the hardware and close the figure even on error/interrupt.
            if self.device_handle:
                self.device_handle.stop()
                self.device_handle.close()
            plt.close(fig)
# Script entry point: launch the viewer with a 900-1100 depth window.
if __name__ == "__main__":
    FemtoBoltContourViewer(depth_min=900, depth_max=1100).run()