解决合并冲突,删除__pycache__文件

This commit is contained in:
root 2025-08-13 14:24:01 +08:00
commit 08d52e2c98
11 changed files with 54863 additions and 189 deletions

File diff suppressed because it is too large Load Diff

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -23,6 +23,9 @@ from typing import Dict, List, Optional, Any, Tuple
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
import logging import logging
# 添加串口通信支持
import serial
# matplotlib相关导入用于深度图渲染 # matplotlib相关导入用于深度图渲染
try: try:
from matplotlib.colors import LinearSegmentedColormap from matplotlib.colors import LinearSegmentedColormap
@ -75,8 +78,12 @@ class DeviceManager:
# 推流状态和线程 # 推流状态和线程
self.camera_streaming = False self.camera_streaming = False
self.femtobolt_streaming = False self.femtobolt_streaming = False
self.imu_streaming = False
self.pressure_streaming = False
self.camera_streaming_thread = None self.camera_streaming_thread = None
self.femtobolt_streaming_thread = None self.femtobolt_streaming_thread = None
self.imu_thread = None
self.pressure_thread = None
self.streaming_stop_event = threading.Event() self.streaming_stop_event = threading.Event()
# 全局帧缓存机制 # 全局帧缓存机制
@ -271,15 +278,40 @@ class DeviceManager:
def _init_imu(self): def _init_imu(self):
"""初始化IMU传感器""" """初始化IMU传感器"""
logger.info('开始初始化IMU传感器...')
try: try:
# 这里应该连接实际的IMU设备 # 从config.ini读取串口配置
# 目前使用模拟数据 config = configparser.ConfigParser()
self.imu_device = MockIMUDevice() # 优先读取根目录config.ini否则读取backend/config.ini
root_config_path = os.path.join(os.path.dirname(__file__), '..', 'config.ini')
app_root_config_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'config.ini')
logger.debug(f'尝试读取配置文件: {root_config_path}, {app_root_config_path}')
read_files = config.read([app_root_config_path, root_config_path], encoding='utf-8')
logger.debug(f'成功读取的配置文件: {read_files}')
if not read_files:
logger.warning('未能读取到config.ini将使用默认串口配置COM7@9600')
imu_port = config.get('DEVICES', 'imu_port', fallback='COM7')
imu_baudrate = config.getint('DEVICES', 'baudrate', fallback=9600)
logger.info(f'从配置文件读取IMU串口配置: {imu_port}@{imu_baudrate}')
# 初始化真实IMU设备
logger.debug('创建RealIMUDevice实例...')
self.imu_device = RealIMUDevice(port=imu_port, baudrate=imu_baudrate)
# 测试读取数据
logger.debug('测试IMU设备数据读取...')
test_data = self.imu_device.read_data()
logger.debug(f'IMU设备测试数据: {test_data}')
self.device_status['imu'] = True self.device_status['imu'] = True
logger.info('IMU传感器初始化成功模拟') logger.info(f'IMU传感器初始化成功真实设备): {imu_port}@{imu_baudrate}')
except Exception as e: except Exception as e:
logger.error(f'IMU传感器初始化失败: {e}') logger.error(f'IMU传感器初始化失败: {e}', exc_info=True)
self.imu_device = None self.imu_device = None
self.device_status['imu'] = False
def _init_pressure_sensor(self): def _init_pressure_sensor(self):
"""初始化压力传感器""" """初始化压力传感器"""
@ -376,52 +408,60 @@ class DeviceManager:
return {'status': 'failed', 'error': str(e)} return {'status': 'failed', 'error': str(e)}
def _calibrate_imu(self) -> Dict[str, Any]: def _calibrate_imu(self) -> Dict[str, Any]:
"""校准IMU传感器""" """标准校准:采样较多帧,计算稳定零点偏移"""
if not self.imu_device: if not self.imu_device:
return {'status': 'failed', 'error': 'IMU设备未连接'} return {'status': 'failed', 'error': 'IMU设备未连接'}
try: try:
# 收集静态数据进行零点校准
samples = [] samples = []
for _ in range(100): for _ in range(100):
data = self.imu_device.read_data() data = self.imu_device.read_data()
samples.append(data) if data and 'head_pose' in data:
samples.append(data['head_pose'])
time.sleep(0.01) time.sleep(0.01)
if not samples:
# 计算零点偏移 return {'status': 'failed', 'error': '无法获取IMU数据进行校准'}
accel_offset = {
'x': np.mean([s['accel']['x'] for s in samples]),
'y': np.mean([s['accel']['y'] for s in samples]),
'z': np.mean([s['accel']['z'] for s in samples]) - 9.8 # 重力补偿
}
gyro_offset = {
'x': np.mean([s['gyro']['x'] for s in samples]),
'y': np.mean([s['gyro']['y'] for s in samples]),
'z': np.mean([s['gyro']['z'] for s in samples])
}
# 计算头部姿态零点偏移(正立状态为标准零位)
head_pose_offset = { head_pose_offset = {
'rotation': np.mean([s['head_pose']['rotation'] for s in samples if 'head_pose' in s]), 'rotation': float(np.mean([s['rotation'] for s in samples])),
'tilt': np.mean([s['head_pose']['tilt'] for s in samples if 'head_pose' in s]), 'tilt': float(np.mean([s['tilt'] for s in samples])),
'pitch': np.mean([s['head_pose']['pitch'] for s in samples if 'head_pose' in s]) 'pitch': float(np.mean([s['pitch'] for s in samples]))
} }
calibration = { calibration = {
'status': 'success', 'status': 'success',
'accel_offset': accel_offset, 'head_pose_offset': head_pose_offset,
'gyro_offset': gyro_offset,
'head_pose_offset': head_pose_offset, # 头部姿态零点偏移
'timestamp': datetime.now().isoformat() 'timestamp': datetime.now().isoformat()
} }
# 保存校准数据到设备实例
if hasattr(self.imu_device, 'set_calibration'): if hasattr(self.imu_device, 'set_calibration'):
self.imu_device.set_calibration(calibration) self.imu_device.set_calibration(calibration)
return calibration return calibration
except Exception as e:
return {'status': 'failed', 'error': str(e)}
def _quick_calibrate_imu(self) -> Dict[str, Any]:
"""快速校准:采样少量帧,以当前姿态为零点(用于每次推流启动)"""
if not self.imu_device:
return {'status': 'failed', 'error': 'IMU设备未连接'}
try:
samples = []
for _ in range(10): # 少量采样,加快启动
data = self.imu_device.read_data()
if data and 'head_pose' in data:
samples.append(data['head_pose'])
time.sleep(0.01)
if not samples:
return {'status': 'failed', 'error': '无法获取IMU数据进行快速校准'}
head_pose_offset = {
'rotation': float(np.median([s['rotation'] for s in samples])),
'tilt': float(np.median([s['tilt'] for s in samples])),
'pitch': float(np.median([s['pitch'] for s in samples]))
}
calibration = {
'status': 'success',
'head_pose_offset': head_pose_offset,
'timestamp': datetime.now().isoformat()
}
if hasattr(self.imu_device, 'set_calibration'):
self.imu_device.set_calibration(calibration)
return calibration
except Exception as e: except Exception as e:
return {'status': 'failed', 'error': str(e)} return {'status': 'failed', 'error': str(e)}
@ -640,6 +680,14 @@ class DeviceManager:
logger.error('IMU设备未初始化') logger.error('IMU设备未初始化')
return False return False
# 在启动推流前进行快速零点校准(自动以当前姿态为基准)
logger.info('正在进行IMU零点校准...')
calibration_result = self._quick_calibrate_imu()
if calibration_result.get('status') == 'success':
logger.info(f'IMU零点校准完成: {calibration_result["head_pose_offset"]}')
else:
logger.warning(f'IMU零点校准失败将使用默认零偏移: {calibration_result.get("error", "未知错误")}')
self.imu_streaming = True self.imu_streaming = True
self.imu_thread = threading.Thread(target=self._imu_streaming_thread, daemon=True) self.imu_thread = threading.Thread(target=self._imu_streaming_thread, daemon=True)
self.imu_thread.start() self.imu_thread.start()
@ -947,8 +995,8 @@ class DeviceManager:
if capture is not None: if capture is not None:
ret, depth_image = capture.get_depth_image() ret, depth_image = capture.get_depth_image()
height2, width2 = depth_image.shape[:2] height2, width2 = depth_image.shape[:2]
logger.debug(f'FemtoBolt原始帧宽: {width2}') # logger.debug(f'FemtoBolt原始帧宽: {width2}')
logger.debug(f'FemtoBolt原始帧高: {height2}') # logger.debug(f'FemtoBolt原始帧高: {height2}')
if ret and depth_image is not None: if ret and depth_image is not None:
# 读取config.ini中的深度范围配置 # 读取config.ini中的深度范围配置
@ -963,6 +1011,14 @@ class DeviceManager:
depth_range_max = None depth_range_max = None
# 使用matplotlib渲染深度图参考display_x.py # 使用matplotlib渲染深度图参考display_x.py
if MATPLOTLIB_AVAILABLE and depth_range_min is not None and depth_range_max is not None: if MATPLOTLIB_AVAILABLE and depth_range_min is not None and depth_range_max is not None:
# 在子线程中切换到非交互后端避免GUI警告
import matplotlib
try:
if matplotlib.get_backend().lower() != 'agg':
matplotlib.use('Agg', force=True)
logger.debug('切换matplotlib后端为Agg以适配子线程渲染')
except Exception:
pass
depth_image[depth_image > depth_range_max] = 0 depth_image[depth_image > depth_range_max] = 0
depth_image[depth_image < depth_range_min] = 0 depth_image[depth_image < depth_range_min] = 0
background = np.ones_like(depth_image) * 0.5 background = np.ones_like(depth_image) * 0.5
@ -972,21 +1028,19 @@ class DeviceManager:
'fuchsia', 'red', 'yellow', 'lime', 'cyan', 'blue', 'fuchsia', 'red', 'yellow', 'lime', 'cyan', 'blue',
'fuchsia', 'red', 'yellow', 'lime', 'cyan', 'blue'] 'fuchsia', 'red', 'yellow', 'lime', 'cyan', 'blue']
mcmap = LinearSegmentedColormap.from_list("custom_cmap", colors) mcmap = LinearSegmentedColormap.from_list("custom_cmap", colors)
# plt.figure(figsize=(7, 7)) fig = plt.figure(figsize=(width2/100, height2/100), dpi=100)
plt.figure(figsize=(width2/100, height2/100), dpi=100) ax = fig.add_subplot(111)
plt.imshow(background, origin='lower', cmap='gray', alpha=0.3) ax.imshow(background, origin='lower', cmap='gray', alpha=0.3)
plt.grid(True, which='both', axis='both', color='white', linestyle='-', linewidth=1, zorder=0) ax.grid(True, which='both', axis='both', color='white', linestyle='-', linewidth=1, zorder=0)
plt.contourf(depth_masked, levels=200, cmap=mcmap, vmin=depth_range_min, vmax=depth_range_max, origin='upper', zorder=2) ax.contourf(depth_masked, levels=200, cmap=mcmap, vmin=depth_range_min, vmax=depth_range_max, origin='upper', zorder=2)
# plt.axis('off') fig.tight_layout(pad=0)
plt.tight_layout(pad=0) try:
plt.draw() fig.canvas.draw()
img = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
plt_canvas = plt.gca().figure.canvas img = img.reshape(fig.canvas.get_width_height()[::-1] + (3,))
plt_canvas.draw() depth_colored = img
img = np.frombuffer(plt_canvas.tostring_rgb(), dtype=np.uint8) finally:
img = img.reshape(plt_canvas.get_width_height()[::-1] + (3,)) plt.close(fig)
plt.clf()
depth_colored = img
else: else:
# 如果没有matplotlib则使用原有OpenCV伪彩色映射 # 如果没有matplotlib则使用原有OpenCV伪彩色映射
depth_normalized = np.clip(depth_image, depth_range_min, depth_range_max) depth_normalized = np.clip(depth_image, depth_range_min, depth_range_max)
@ -996,16 +1050,16 @@ class DeviceManager:
depth_colored[mask_outside] = [0, 0, 0] depth_colored[mask_outside] = [0, 0, 0]
height, width = depth_colored.shape[:2] height, width = depth_colored.shape[:2]
logger.debug(f'FemtoBolt帧宽: {width}') # logger.debug(f'FemtoBolt帧宽: {width}')
logger.debug(f'FemtoBolt帧高: {height}') # logger.debug(f'FemtoBolt帧高: {height}')
target_width = height // 2 target_width = height // 2
if width > target_width: if width > target_width:
left = (width - target_width) // 2 left = (width - target_width) // 2
right = left + target_width right = left + target_width
depth_colored = depth_colored[:, left:right] depth_colored = depth_colored[:, left:right]
height1, width1 = depth_colored.shape[:2] height1, width1 = depth_colored.shape[:2]
logger.debug(f'FemtoBolt帧裁剪完以后得宽: {width1}') # logger.debug(f'FemtoBolt帧裁剪完以后得宽: {width1}')
logger.debug(f'FemtoBolt帧裁剪完以后得宽: {height1}') # logger.debug(f'FemtoBolt帧裁剪完以后得宽: {height1}')
# 保存处理好的身体帧到全局缓存 # 保存处理好的身体帧到全局缓存
self._save_frame_to_cache(depth_colored.copy(), 'femtobolt') self._save_frame_to_cache(depth_colored.copy(), 'femtobolt')
@ -1041,40 +1095,49 @@ class DeviceManager:
logger.info('IMU头部姿态数据推流线程已启动') logger.info('IMU头部姿态数据推流线程已启动')
try: try:
loop_count = 0
while self.imu_streaming and self.socketio: while self.imu_streaming and self.socketio:
try: try:
loop_count += 1
# 从IMU设备读取数据 # 从IMU设备读取数据
imu_data = self.imu_device.read_data() imu_data = self.imu_device.read_data()
if imu_data and 'head_pose' in imu_data: if imu_data and 'head_pose' in imu_data:
# 直接使用设备提供的头部姿态数据 # 直接使用设备提供的头部姿态数据,减少数据包装
head_pose = imu_data['head_pose'] head_pose = imu_data['head_pose']
# logger.warning(f'推送数据{head_pose}')
# 构建完整的头部姿态数据 # 优化:直接发送最精简的数据格式,避免重复时间戳
head_pose_data = { rotation = head_pose.get('rotation')
'rotation': head_pose['rotation'], # 旋转角:左旋(-), 右旋(+) tilt = head_pose.get('tilt')
'tilt': head_pose['tilt'], # 倾斜角:左倾(-), 右倾(+) pitch = head_pose.get('pitch')
'pitch': head_pose['pitch'], # 俯仰角:俯角(-), 仰角(+) try:
rotation = round(float(rotation), 2) if rotation is not None else rotation
'temperature': imu_data.get('temperature', 25), except Exception:
'timestamp': imu_data['timestamp'] pass
} try:
tilt = round(float(tilt), 2) if tilt is not None else tilt
# 通过WebSocket发送头部姿态数据 except Exception:
pass
try:
pitch = round(float(pitch), 2) if pitch is not None else pitch
except Exception:
pass
self.socketio.emit('imu_data', { self.socketio.emit('imu_data', {
'head_pose': head_pose_data, 'rotation': rotation, # 旋转角:左旋(-), 右旋(+)
'timestamp': datetime.now().isoformat() 'tilt': tilt, # 倾斜角:左倾(-), 右倾(+)
'pitch': pitch, # 俯仰角:俯角(-), 仰角(+)
}) })
# 控制数据发送频率10Hz # 优化提高数据发送频率到30Hz降低延时
time.sleep(0.1) time.sleep(0.033)
except Exception as e: except Exception as e:
logger.error(f'IMU数据推流异常: {e}') # 减少异常日志的详细程度
time.sleep(0.1) logger.warning(f'IMU数据推流异常: {e}')
time.sleep(0.033)
except Exception as e: except Exception as e:
logger.error(f'IMU推流线程异常: {e}') logger.error(f'IMU推流线程异常: {e}', exc_info=True)
finally: finally:
logger.info('IMU头部姿态数据推流线程已结束') logger.info('IMU头部姿态数据推流线程已结束')
@ -1090,7 +1153,7 @@ class DeviceManager:
if pressure_data and 'foot_pressure' in pressure_data: if pressure_data and 'foot_pressure' in pressure_data:
foot_pressure = pressure_data['foot_pressure'] foot_pressure = pressure_data['foot_pressure']
logger.error(f"压力传感器数据{foot_pressure}") # logger.error(f"压力传感器数据{foot_pressure}")
# 获取各区域压力值 # 获取各区域压力值
left_front = foot_pressure['left_front'] left_front = foot_pressure['left_front']
left_rear = foot_pressure['left_rear'] left_rear = foot_pressure['left_rear']
@ -1851,11 +1914,174 @@ class DeviceManager:
for key in expired_keys: for key in expired_keys:
del self.frame_cache[frame_type][key] del self.frame_cache[frame_type][key]
if expired_keys: # if expired_keys:
logger.debug(f'清理了 {len(expired_keys)} 个过期帧: {frame_type}') # logger.debug(f'清理了 {len(expired_keys)} 个过期帧: {frame_type}')
except Exception as e: except Exception as e:
logger.error(f'清理过期帧失败: {e}') logger.error(f'清理过期帧失败: {e}')
class RealIMUDevice:
"""真实IMU设备通过串口读取姿态数据"""
def __init__(self, port: str = 'COM7', baudrate: int = 9600):
self.port = port
self.baudrate = baudrate
self.ser = None
self.buffer = bytearray()
self.calibration_data = None
self.head_pose_offset = {'rotation': 0, 'tilt': 0, 'pitch': 0}
self.last_data = {
'roll': 0.0,
'pitch': 0.0,
'yaw': 0.0,
'temperature': 25.0
}
logger.debug(f'RealIMUDevice 初始化: port={self.port}, baudrate={self.baudrate}')
self._connect()
def _connect(self):
try:
logger.debug(f'尝试打开串口: {self.port} @ {self.baudrate}')
self.ser = serial.Serial(self.port, self.baudrate, timeout=1)
if hasattr(self.ser, 'reset_input_buffer'):
try:
self.ser.reset_input_buffer()
logger.debug('已清空串口输入缓冲区')
except Exception as e:
logger.debug(f'重置串口输入缓冲区失败: {e}')
logger.info(f'IMU设备连接成功: {self.port} @ {self.baudrate}bps')
except Exception as e:
logger.error(f'IMU设备连接失败: {e}', exc_info=True)
self.ser = None
def set_calibration(self, calibration: Dict[str, Any]):
self.calibration_data = calibration
if 'head_pose_offset' in calibration:
self.head_pose_offset = calibration['head_pose_offset']
logger.debug(f'应用IMU校准数据: {self.head_pose_offset}')
def apply_calibration(self, raw_data: Dict[str, Any]) -> Dict[str, Any]:
"""应用校准:将当前姿态减去初始偏移,得到相对于初始姿态的变化量"""
if not raw_data or 'head_pose' not in raw_data:
return raw_data
# 应用校准偏移
calibrated_data = raw_data.copy()
head_pose = raw_data['head_pose'].copy()
# 减去基准值(零点偏移)
head_pose['rotation'] = head_pose['rotation'] - self.head_pose_offset['rotation']
head_pose['tilt'] = head_pose['tilt'] - self.head_pose_offset['tilt']
head_pose['pitch'] = head_pose['pitch'] - self.head_pose_offset['pitch']
calibrated_data['head_pose'] = head_pose
return calibrated_data
@staticmethod
def _checksum(data: bytes) -> int:
return sum(data[:-1]) & 0xFF
def _parse_packet(self, data: bytes) -> Optional[Dict[str, float]]:
if len(data) != 11:
logger.debug(f'无效数据包长度: {len(data)}')
return None
if data[0] != 0x55:
logger.debug(f'错误的包头: 0x{data[0]:02X}')
return None
if self._checksum(data) != data[-1]:
logger.debug(f'校验和错误: 期望{self._checksum(data):02X}, 实际{data[-1]:02X}')
return None
packet_type = data[1]
vals = [int.from_bytes(data[i:i+2], 'little', signed=True) for i in range(2, 10, 2)]
if packet_type == 0x53: # 姿态角单位0.01°
pitchl, rxl, yawl, temp = vals # 注意这里 vals 已经是有符号整数
# 使用第一段代码里的比例系数
k_angle = 180.0
roll = -round(rxl / 32768.0 * k_angle,2)
pitch = -round(pitchl / 32768.0 * k_angle,2)
yaw = -round(yawl / 32768.0 * k_angle,2)
temp = temp / 100.0
self.last_data = {
'roll': roll,
'pitch': pitch,
'yaw': yaw,
'temperature': temp
}
# logger.debug(f'解析姿态角包: roll={roll}, pitch={pitch}, yaw={yaw}, temp={temp}')
return self.last_data
else:
# logger.debug(f'忽略的数据包类型: 0x{packet_type:02X}')
return None
def read_data(self) -> Dict[str, Any]:
if not self.ser or not getattr(self.ser, 'is_open', False):
logger.warning('IMU串口未连接尝试重新连接...')
self._connect()
return {
'head_pose': {
'rotation': self.last_data['yaw'],
'tilt': self.last_data['roll'],
'pitch': self.last_data['pitch']
},
'temperature': self.last_data['temperature'],
'timestamp': datetime.now().isoformat()
}
try:
bytes_waiting = self.ser.in_waiting
if bytes_waiting:
# logger.debug(f'串口缓冲区待读字节: {bytes_waiting}')
chunk = self.ser.read(bytes_waiting)
# logger.debug(f'读取到字节: {len(chunk)}')
self.buffer.extend(chunk)
while len(self.buffer) >= 11:
if self.buffer[0] != 0x55:
dropped = self.buffer.pop(0)
logger.debug(f'丢弃无效字节: 0x{dropped:02X}')
continue
packet = bytes(self.buffer[:11])
parsed = self._parse_packet(packet)
del self.buffer[:11]
if parsed is not None:
raw = {
'head_pose': {
'rotation': parsed['yaw'], # rotation = roll
'tilt': parsed['roll'], # tilt = yaw
'pitch': parsed['pitch'] # pitch = pitch
},
'temperature': parsed['temperature'],
'timestamp': datetime.now().isoformat()
}
# logger.debug(f'映射后的头部姿态: {raw}')
return self.apply_calibration(raw)
raw = {
'head_pose': {
'rotation': self.last_data['yaw'],
'tilt': self.last_data['roll'],
'pitch': self.last_data['pitch']
},
'temperature': self.last_data['temperature'],
'timestamp': datetime.now().isoformat()
}
return self.apply_calibration(raw)
except Exception as e:
logger.error(f'IMU数据读取异常: {e}', exc_info=True)
raw = {
'head_pose': {
'rotation': self.last_data['yaw'],
'tilt': self.last_data['roll'],
'pitch': self.last_data['pitch']
},
'temperature': self.last_data['temperature'],
'timestamp': datetime.now().isoformat()
}
return self.apply_calibration(raw)
def __del__(self):
try:
if self.ser and getattr(self.ser, 'is_open', False):
self.ser.close()
logger.info('IMU设备串口已关闭')
except Exception:
pass
class MockIMUDevice: class MockIMUDevice:
"""模拟IMU设备""" """模拟IMU设备"""
@ -1871,17 +2097,17 @@ class MockIMUDevice:
self.head_pose_offset = calibration['head_pose_offset'] self.head_pose_offset = calibration['head_pose_offset']
def apply_calibration(self, raw_data: Dict[str, Any]) -> Dict[str, Any]: def apply_calibration(self, raw_data: Dict[str, Any]) -> Dict[str, Any]:
"""应用校准数据""" """应用校准:将当前姿态减去初始偏移,得到相对姿态"""
if not self.calibration_data: if not raw_data or 'head_pose' not in raw_data:
return raw_data return raw_data
# 应用头部姿态零点校准 calibrated_data = raw_data.copy()
if 'head_pose' in raw_data: head_pose = raw_data['head_pose'].copy()
raw_data['head_pose']['rotation'] -= self.head_pose_offset['rotation'] head_pose['rotation'] = head_pose['rotation'] - self.head_pose_offset['rotation']
raw_data['head_pose']['tilt'] -= self.head_pose_offset['tilt'] head_pose['tilt'] = head_pose['tilt'] - self.head_pose_offset['tilt']
raw_data['head_pose']['pitch'] -= self.head_pose_offset['pitch'] head_pose['pitch'] = head_pose['pitch'] - self.head_pose_offset['pitch']
calibrated_data['head_pose'] = head_pose
return raw_data return calibrated_data
def read_data(self) -> Dict[str, Any]: def read_data(self) -> Dict[str, Any]:
"""读取IMU数据""" """读取IMU数据"""
@ -2312,8 +2538,8 @@ class VideoStreamManager:
if self.device_manager: if self.device_manager:
self.device_manager._save_frame_to_cache(cropped_frame, 'camera') self.device_manager._save_frame_to_cache(cropped_frame, 'camera')
# 每1000帧记录一次缓存保存状态 # 每1000帧记录一次缓存保存状态
if frame_count % 1000 == 0: # if frame_count % 1000 == 0:
logger.debug(f"视频推流已保存第 {frame_count} 帧到全局缓存") # logger.debug(f"视频推流已保存第 {frame_count} 帧到全局缓存")
else: else:
logger.warning("VideoStreamManager未关联DeviceManager无法保存帧到缓存") logger.warning("VideoStreamManager未关联DeviceManager无法保存帧到缓存")

57
backend/lib_fpms_usb.h Normal file
View File

@ -0,0 +1,57 @@
#pragma once
#define __DLL_EXPORTS__
#ifdef __DLL_EXPORTS__
#define DLLAPI __declspec(dllexport)
#else
#define DLLAPI __declspec(dllimport)
#endif
#include <windows.h>
#include <cstdint>
#include <vector>
using namespace std;
typedef void* SM_HANDLE;
typedef struct _FPMS_DEVICE
{
uint16_t mn;
std::string sn;
uint16_t fwVersion;
uint8_t protoVer;
uint16_t pid;
uint16_t vid;
uint16_t rows;
uint16_t cols;
} FPMS_DEVICE_T;
extern "C"
{
DLLAPI
int WINAPI fpms_usb_init(int debugFlag);
DLLAPI
int WINAPI fpms_usb_get_device_list(std::vector<FPMS_DEVICE_T>& gDevList);
DLLAPI
int WINAPI fpms_usb_open(FPMS_DEVICE_T dev, SM_HANDLE& gHandle);
DLLAPI
int WINAPI fpms_usb_read_frame(SM_HANDLE gHandle, uint16_t* frame);
DLLAPI
int WINAPI fpms_usb_config_sensitivity(SM_HANDLE gHandle, uint8_t bWriteFlash, const uint8_t level);
DLLAPI
int WINAPI fpms_usb_get_sensitivity(SM_HANDLE gHandle, uint8_t& level);
DLLAPI
int WINAPI fpms_usb_close(SM_HANDLE gHandle);
}

View File

@ -1,108 +1,126 @@
# import cv2 import cv2
# class CameraViewer: class CameraViewer:
# def __init__(self, device_index=0): def __init__(self, device_index=0):
# self.device_index = device_index self.device_index = device_index
# self.window_name = "Camera Viewer" self.window_name = "Camera Viewer"
# def start_stream(self): def start_stream(self):
# cap = cv2.VideoCapture(self.device_index) cap = cv2.VideoCapture(self.device_index)
# if not cap.isOpened(): if not cap.isOpened():
# print(f"无法打开摄像头设备 {self.device_index}") print(f"无法打开摄像头设备 {self.device_index}")
# return return
# cv2.namedWindow(self.window_name, cv2.WINDOW_NORMAL) cv2.namedWindow(self.window_name, cv2.WINDOW_NORMAL)
# while True: while True:
# ret, frame = cap.read() ret, frame = cap.read()
# if not ret: if not ret:
# print("无法获取视频帧") print("无法获取视频帧")
# break break
# cv2.imshow(self.window_name, frame) cv2.imshow(self.window_name, frame)
# if cv2.waitKey(1) & 0xFF == ord('q'): if cv2.waitKey(1) & 0xFF == ord('q'):
# break break
# cap.release() cap.release()
# cv2.destroyAllWindows() cv2.destroyAllWindows()
if __name__ == "__main__":
# 修改这里的数字可以切换不同摄像头设备
viewer = CameraViewer(device_index=3)
viewer.start_stream()
# import ctypes
# from ctypes import c_int, c_uint16, c_uint8, c_char, c_char_p, Structure, POINTER, byref
# # 设备结构体对应wrapper中FPMS_DEVICE_C
# class FPMS_DEVICE_C(Structure):
# _pack_ = 1
# _fields_ = [
# ("mn", c_uint16),
# ("sn", c_char * 64),
# ("fwVersion", c_uint16),
# ("protoVer", c_uint8),
# ("pid", c_uint16),
# ("vid", c_uint16),
# ("rows", c_uint16),
# ("cols", c_uint16),
# ]
# # 加载DLL
# dll_path = r"D:\BodyBalanceEvaluation\backend\SMiTSenseUsbWrapper.dll"
# dll = ctypes.windll.LoadLibrary(dll_path)
# # 函数原型声明
# # int fpms_usb_init_c(int debugFlag);
# dll.fpms_usb_init_c.argtypes = [c_int]
# dll.fpms_usb_init_c.restype = c_int
# dll.fpms_usb_get_device_list_c.argtypes = [POINTER(FPMS_DEVICE_C), c_int]
# dll.fpms_usb_get_device_list_c.restype = c_int
# dll.fpms_usb_open_c.argtypes = [POINTER(FPMS_DEVICE_C), POINTER(ctypes.c_void_p)]
# dll.fpms_usb_open_c.restype = c_int
# # int fpms_usb_read_frame_c(void* handle, uint16_t* frame);
# dll.fpms_usb_read_frame_c.argtypes = [ctypes.c_void_p, POINTER(c_uint16)]
# dll.fpms_usb_read_frame_c.restype = c_int
# # int fpms_usb_close_c(void* handle);
# dll.fpms_usb_close_c.argtypes = [ctypes.c_void_p]
# dll.fpms_usb_close_c.restype = c_int
# # 其他函数如果需要可以类似声明
# def main():
# # 初始化
# ret = dll.fpms_usb_init_c(0)
# print(f"fpms_usb_init_c 返回值: {ret}")
# if ret != 0:
# print("初始化失败")
# return
# MAX_DEVICES = 8
# devices = (FPMS_DEVICE_C * MAX_DEVICES)() # 创建数组
# count = dll.fpms_usb_get_device_list_c(devices, MAX_DEVICES)
# print(f"设备数量: {count}")
# if count <= 0:
# print("未找到设备或错误")
# return
# for i in range(count):
# dev = devices[i]
# print(f"设备{i}: mn={dev.mn}, sn={dev.sn.decode(errors='ignore').rstrip(chr(0))}, fwVersion={dev.fwVersion}")
# # 打开第一个设备
# handle = ctypes.c_void_p()
# ret = dll.fpms_usb_open_c(byref(devices[0]), byref(handle))
# print(f"fpms_usb_open_c 返回值: {ret}")
# if ret != 0:
# print("打开设备失败")
# return
# # 假设帧大小是 rows * cols
# rows = devices[0].rows
# cols = devices[0].cols
# frame_size = rows * cols
# frame_buffer = (c_uint16 * frame_size)()
# ret = dll.fpms_usb_read_frame_c(handle, frame_buffer)
# print(f"fpms_usb_read_frame_c 返回值: {ret}")
# if ret == 0:
# # 打印前10个数据看看
# print("帧数据前10个点:", list(frame_buffer[:10]))
# else:
# print("读取帧失败")
# # 关闭设备
# ret = dll.fpms_usb_close_c(handle)
# print(f"fpms_usb_close_c 返回值: {ret}")
# if __name__ == "__main__": # if __name__ == "__main__":
# # 修改这里的数字可以切换不同摄像头设备 # main()
# viewer = CameraViewer(device_index=1)
# viewer.start_stream()
# import os
# import pefile
# def list_dll_exports(dll_path):
# """解析 DLL 并返回导出的函数列表"""
# try:
# pe = pefile.PE(dll_path)
# exports = []
# if hasattr(pe, 'DIRECTORY_ENTRY_EXPORT'):
# for exp in pe.DIRECTORY_ENTRY_EXPORT.symbols:
# if exp.name:
# exports.append(exp.name.decode('utf-8'))
# else:
# exports.append(f"Ordinal_{exp.ordinal}")
# return exports
# except Exception as e:
# print(f"[错误] 无法解析 {dll_path}: {e}")
# return []
# def scan_directory_for_dll_functions(directory):
# """扫描目录下所有 DLL 文件并解析导出函数"""
# results = {}
# for root, _, files in os.walk(directory):
# for file in files:
# if file.lower().endswith(".dll"):
# dll_path = os.path.join(root, file)
# print(f"\n正在解析: {dll_path}")
# funcs = list_dll_exports(dll_path)
# results[dll_path] = funcs
# for func in funcs:
# print(f" {func}")
# return results
# if __name__ == "__main__":
# folder_path = r"D:\BodyBalanceEvaluation\backend\tests" # 这里改成你的 DLL 文件目录
# scan_directory_for_dll_functions(folder_path)
import ctypes
dll_path = r"D:\BodyBalanceEvaluation\backend\tests\SMiTSenseUsb-F3.0.dll"
mydll = ctypes.WinDLL(dll_path)
class FPMS_DEVICE_C(ctypes.Structure):
_fields_ = [
("mn", ctypes.c_uint8),
("sn", ctypes.c_char * 32),
("swVersion", ctypes.c_char * 32),
("rows", ctypes.c_uint16),
("cols", ctypes.c_uint16)
]
# 声明
mydll.fpms_usb_init.argtypes = [ctypes.c_int]
mydll.fpms_usb_init.restype = ctypes.c_int
mydll.fpms_usb_get_device_list.argtypes = [
ctypes.POINTER(FPMS_DEVICE_C),
ctypes.POINTER(ctypes.c_int)
]
mydll.fpms_usb_get_device_list.restype = ctypes.c_int
# 初始化
print("init:", mydll.fpms_usb_init(0))
# 获取设备列表
device_count = ctypes.c_int()
devices = (FPMS_DEVICE_C * 10)()
res = mydll.fpms_usb_get_device_list(devices, ctypes.byref(device_count))
print("get_device_list 返回值:", res, "设备数量:", device_count.value)
# 打印设备信息
for i in range(device_count.value):
dev = devices[i]
print(f"[设备 {i}] mn={dev.mn}, sn={dev.sn.decode(errors='ignore')}, "
f"swVersion={dev.swVersion.decode(errors='ignore')}, rows={dev.rows}, cols={dev.cols}")

View File

@ -0,0 +1,89 @@
import serial
import time
def checksum(data):
return sum(data[:-1]) & 0xFF
def parse_packet(data):
if len(data) != 11:
return None
if data[0] != 0x55:
return None
if checksum(data) != data[-1]:
print("校验失败")
return None
packet_type = data[1]
# 后 8 字节拆成 4 个 16 位有符号整数(小端序)
vals = [int.from_bytes(data[i:i+2], 'little', signed=True) for i in range(2, 10, 2)]
# if packet_type == 0x51: # 加速度 (单位 0.001g)
# ax, ay, az, temp = vals
# ax /= 1000
# ay /= 1000
# az /= 1000
# temp /= 100
# return f"加速度 (g): x={ax:.3f}, y={ay:.3f}, z={az:.3f}, 温度={temp:.2f}℃"
# elif packet_type == 0x52: # 角速度 (单位 0.01°/s)
# wx, wy, wz, temp = vals
# wx /= 100
# wy /= 100
# wz /= 100
# temp /= 100
# return f"角速度 (°/s): x={wx:.2f}, y={wy:.2f}, z={wz:.2f}, 温度={temp:.2f}℃"
if packet_type == 0x53: # 姿态角
rxl, pitchl, yawl, temp = vals # 注意这里 vals 已经是有符号整数
# 使用第一段代码里的比例系数
k_angle = 180.0
roll = rxl / 32768.0 * k_angle
pitch = pitchl / 32768.0 * k_angle
yaw = yawl / 32768.0 * k_angle
temp /= 100 # 温度依然是 0.01°C
return f"姿态角 (°): roll={roll:.2f}, pitch={pitch:.2f}, yaw={yaw:.2f}, 温度={temp:.2f}"
# elif packet_type == 0x54: # 磁力计 (单位 uT)
# mx, my, mz, temp = vals
# temp /= 100
# return f"磁力计 (uT): x={mx}, y={my}, z={mz}, 温度={temp:.2f}℃"
# elif packet_type == 0x56: # 气压 (单位 Pa)
# p1, p2, p3, temp = vals
# pressure = ((p1 & 0xFFFF) | ((p2 & 0xFFFF) << 16)) / 100
# temp /= 100
# return f"气压 (Pa): pressure={pressure:.2f}, 温度={temp:.2f}℃"
# else:
# return f"未知包类型: {packet_type:#04x}"
def read_imu(port='COM6', baudrate=9600):
ser = serial.Serial(port, baudrate, timeout=1)
buffer = bytearray()
try:
while True:
bytes_waiting = ser.in_waiting
if bytes_waiting:
data = ser.read(bytes_waiting)
buffer.extend(data)
while len(buffer) >= 11:
if buffer[0] != 0x55:
buffer.pop(0)
continue
packet = buffer[:11]
result = parse_packet(packet)
if result:
print(result)
buffer = buffer[11:]
time.sleep(0.01)
except KeyboardInterrupt:
print("程序终止")
finally:
ser.close()
if __name__ == "__main__":
read_imu(port='COM8', baudrate=9600)

View File

@ -19,9 +19,11 @@ camera_index = 0
camera_width = 640 camera_width = 640
camera_height = 480 camera_height = 480
camera_fps = 30 camera_fps = 30
imu_port = COM3 imu_port = COM9
imu_baudrate = 9600
pressure_port = COM4 pressure_port = COM4
[DETECTION] [DETECTION]
default_duration = 60 default_duration = 60
sampling_rate = 30 sampling_rate = 30

View File

@ -42,6 +42,12 @@ def setup_debug_logging():
logging.getLogger('socketio').setLevel(logging.DEBUG) logging.getLogger('socketio').setLevel(logging.DEBUG)
logging.getLogger('engineio').setLevel(logging.DEBUG) logging.getLogger('engineio').setLevel(logging.DEBUG)
# 禁用第三方库的详细日志
logging.getLogger('PIL').setLevel(logging.WARNING)
logging.getLogger('PIL.PngImagePlugin').setLevel(logging.WARNING)
logging.getLogger('matplotlib').setLevel(logging.WARNING)
logging.getLogger('matplotlib.font_manager').setLevel(logging.WARNING)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.info('调试日志已启用') logger.info('调试日志已启用')
return logger return logger