足底压力仪、足底可见光拔插重连功能提交

This commit is contained in:
zhaozilong12 2025-09-11 17:40:03 +08:00
parent 3b5fce9be5
commit 147c66ada6
4 changed files with 262 additions and 83 deletions

View File

@ -70,9 +70,11 @@ class CameraManager(BaseDevice):
self.fps_start_time = time.time() self.fps_start_time = time.time()
self.actual_fps = 0 self.actual_fps = 0
# 重连机制 # 重连与断连检测机制(-1 表示无限重连)
self.max_reconnect_attempts = 3 self.max_reconnect_attempts = int(config.get('max_reconnect_attempts', -1))
self.reconnect_delay = 2.0 self.reconnect_delay = float(config.get('reconnect_delay', 2.0))
self.read_fail_threshold = int(config.get('read_fail_threshold', 30))
self._last_connected_state = None
# 设备标识和性能统计 # 设备标识和性能统计
self.device_id = f"camera_{self.device_index}" self.device_id = f"camera_{self.device_index}"
@ -131,6 +133,7 @@ class CameraManager(BaseDevice):
return False return False
self.is_connected = True self.is_connected = True
self._last_connected_state = True
self._device_info.update({ self._device_info.update({
'device_index': self.device_index, 'device_index': self.device_index,
'resolution': f"{self.width}x{self.height}", 'resolution': f"{self.width}x{self.height}",
@ -235,7 +238,7 @@ class CameraManager(BaseDevice):
for i in range(5): for i in range(5):
ret, _ = self.cap.read() ret, _ = self.cap.read()
if not ret: if not ret:
self.logger.warning(f"时读取第{i+1}帧失败") self.logger.warning(f"时读取第{i+1}帧失败")
self.logger.info("相机校准完成") self.logger.info("相机校准完成")
return True return True
@ -305,6 +308,7 @@ class CameraManager(BaseDevice):
self.logger.info("相机流工作线程启动") self.logger.info("相机流工作线程启动")
reconnect_attempts = 0 reconnect_attempts = 0
consecutive_read_failures = 0
# 基于目标FPS的简单节拍器防止无上限地读取/编码/发送导致对象堆积 # 基于目标FPS的简单节拍器防止无上限地读取/编码/发送导致对象堆积
frame_interval = 1.0 / max(self.fps, 1) frame_interval = 1.0 / max(self.fps, 1)
@ -313,24 +317,73 @@ class CameraManager(BaseDevice):
while self.is_streaming: while self.is_streaming:
loop_start = time.time() loop_start = time.time()
try: try:
# 如果设备未打开,进入重连流程
if not self.cap or not self.cap.isOpened(): if not self.cap or not self.cap.isOpened():
if reconnect_attempts < self.max_reconnect_attempts: # 仅在状态变化时广播一次断连状态
self.logger.warning(f"相机连接丢失,尝试重连 ({reconnect_attempts + 1}/{self.max_reconnect_attempts})") if self._last_connected_state is not False:
try:
self._socketio.emit('camera_status', {
'status': 'disconnected',
'device_id': self.device_id,
'timestamp': time.time()
}, namespace='/devices')
except Exception:
pass
self._last_connected_state = False
# 无限重连max_reconnect_attempts == -1否则按次数重试
if self.max_reconnect_attempts == -1 or reconnect_attempts < self.max_reconnect_attempts:
self.logger.warning(f"相机连接丢失,尝试重连 ({'' if self.max_reconnect_attempts == -1 else reconnect_attempts + 1}/{self.max_reconnect_attempts if self.max_reconnect_attempts != -1 else ''})")
if not self.is_streaming:
break
if self._reconnect(): if self._reconnect():
reconnect_attempts = 0 reconnect_attempts = 0
consecutive_read_failures = 0
# 广播恢复
try:
self._socketio.emit('camera_status', {
'status': 'connected',
'device_id': self.device_id,
'timestamp': time.time()
}, namespace='/devices')
except Exception:
pass
self._last_connected_state = True
continue continue
else: else:
reconnect_attempts += 1 reconnect_attempts += 1
time.sleep(self.reconnect_delay) time.sleep(self.reconnect_delay)
continue continue
else: else:
self.logger.error("相机重连失败次数过多,停止流") # 超过次数也不退出线程,降频重试,防止永久停机
break self.logger.error("相机重连失败次数过多,进入降频重试模式")
time.sleep(max(self.reconnect_delay, 5.0))
# 重置计数以便继续尝试
reconnect_attempts = 0
continue
ret, frame = self.cap.read() ret, frame = self.cap.read()
if not ret or frame is None: if not ret or frame is None:
consecutive_read_failures += 1
self.dropped_frames += 1 self.dropped_frames += 1
if consecutive_read_failures >= getattr(self, 'read_fail_threshold', 30):
self.logger.warning(f"连续读帧失败 {consecutive_read_failures} 次,执行相机软复位并进入重连")
try:
if self.cap:
try:
self.cap.release()
except Exception:
pass
self.cap = None
self.is_connected = False
except Exception:
pass
# 进入下一轮循环会走到未打开分支
consecutive_read_failures = 0
time.sleep(self.reconnect_delay)
continue
if self.dropped_frames > 10: if self.dropped_frames > 10:
self.logger.warning(f"连续丢帧过多: {self.dropped_frames}") self.logger.warning(f"连续丢帧过多: {self.dropped_frames}")
# 仅在异常情况下触发一次GC避免高频强制GC # 仅在异常情况下触发一次GC避免高频强制GC
@ -340,10 +393,11 @@ class CameraManager(BaseDevice):
pass pass
self.dropped_frames = 0 self.dropped_frames = 0
# 防止空转占满CPU # 防止空转占满CPU
time.sleep(0.005) time.sleep(0.02)
continue continue
# 重置丢帧计数 # 读帧成功,重置失败计数
consecutive_read_failures = 0
self.dropped_frames = 0 self.dropped_frames = 0
# 保存原始帧到队列(用于录制) # 保存原始帧到队列(用于录制)
@ -366,10 +420,6 @@ class CameraManager(BaseDevice):
# 处理帧(降采样以优化传输负载) # 处理帧(降采样以优化传输负载)
processed_frame = self._process_frame(frame) processed_frame = self._process_frame(frame)
# # 缓存帧(不复制,减少内存占用)
# self.last_frame = processed_frame
# self.frame_cache.append(processed_frame)
# 发送帧数据 # 发送帧数据
self._send_frame_data(processed_frame) self._send_frame_data(processed_frame)
@ -378,7 +428,6 @@ class CameraManager(BaseDevice):
# 主动释放局部引用帮助GC更快识别可回收对象 # 主动释放局部引用帮助GC更快识别可回收对象
del frame del frame
# 注意processed_frame 被 last_frame 和 frame_cache 引用,不可删除其对象本身
# 限速保证不超过目标FPS减小发送端积压 # 限速保证不超过目标FPS减小发送端积压
now = time.time() now = time.time()
@ -394,7 +443,7 @@ class CameraManager(BaseDevice):
except Exception as e: except Exception as e:
self.logger.error(f"相机流处理异常: {e}") self.logger.error(f"相机流处理异常: {e}")
# 小退避,避免异常情况下空转 # 小退避,避免异常情况下空转
time.sleep(0.02) time.sleep(0.05)
self.logger.info("相机流工作线程结束") self.logger.info("相机流工作线程结束")
@ -601,6 +650,10 @@ class CameraManager(BaseDevice):
self.buffer_size = config.get('buffer_size', 1) self.buffer_size = config.get('buffer_size', 1)
self.fourcc = config.get('fourcc', 'MJPG') self.fourcc = config.get('fourcc', 'MJPG')
self._tx_max_width = int(config.get('tx_max_width', 640)) self._tx_max_width = int(config.get('tx_max_width', 640))
# 新增:动态更新重连/阈值配置
self.max_reconnect_attempts = int(config.get('max_reconnect_attempts', self.max_reconnect_attempts))
self.reconnect_delay = float(config.get('reconnect_delay', self.reconnect_delay))
self.read_fail_threshold = int(config.get('read_fail_threshold', self.read_fail_threshold))
# 更新帧缓存队列大小 # 更新帧缓存队列大小
frame_cache_len = int(config.get('frame_cache_len', 2)) frame_cache_len = int(config.get('frame_cache_len', 2))

View File

@ -186,6 +186,18 @@ class RealPressureDevice:
r = self.dll.fpms_usb_read_frame_wrap(self.device_handle.value, self.buf, self.frame_size) r = self.dll.fpms_usb_read_frame_wrap(self.device_handle.value, self.buf, self.frame_size)
if r != 0: if r != 0:
logger.warning(f"读取帧失败, code= {r}") logger.warning(f"读取帧失败, code= {r}")
# 如果返回负数,多半表示物理断开或严重错误,标记断连并关闭句柄,触发上层重连
if r < 0:
try:
if self.device_handle:
try:
self.dll.fpms_usb_close_wrap(self.device_handle.value)
except Exception:
pass
self.device_handle = None
except Exception:
pass
self.is_connected = False
return self._get_empty_data() return self._get_empty_data()
# 转换为numpy数组 # 转换为numpy数组
@ -728,6 +740,12 @@ class PressureManager(BaseDevice):
self.error_count = 0 self.error_count = 0
self.last_data_time = None self.last_data_time = None
# 重连相关配置与camera_manager保持一致的键名和默认值
self.max_reconnect_attempts = int(self.config.get('max_reconnect_attempts', -1)) # -1 表示无限重连
self.reconnect_delay = float(self.config.get('reconnect_delay', 2.0))
self.read_fail_threshold = int(self.config.get('read_fail_threshold', 30))
self._last_connected_state = None # 去抖动状态广播
self.logger.info(f"压力板管理器初始化完成 - 设备类型: {self.device_type}") self.logger.info(f"压力板管理器初始化完成 - 设备类型: {self.device_type}")
def initialize(self) -> bool: def initialize(self) -> bool:
@ -825,14 +843,84 @@ class PressureManager(BaseDevice):
""" """
self.logger.info("压力数据流线程启动") self.logger.info("压力数据流线程启动")
reconnect_attempts = 0
consecutive_read_failures = 0
try: try:
while self.is_streaming and self.is_connected: while self.is_streaming:
try: try:
# 若设备未连接或不存在,进入重连流程
if not self.device or not self.is_connected or (hasattr(self.device, 'is_connected') and not self.device.is_connected):
# 广播断开状态(仅状态变化时)
if self._last_connected_state is not False:
try:
if self._socketio:
self._socketio.emit('pressure_status', {
'status': 'disconnected',
'timestamp': time.time()
}, namespace='/devices')
except Exception:
pass
self._last_connected_state = False
if self.max_reconnect_attempts == -1 or reconnect_attempts < self.max_reconnect_attempts:
self.logger.warning(f"压力设备连接丢失,尝试重连 ({'' if self.max_reconnect_attempts == -1 else reconnect_attempts + 1}/{self.max_reconnect_attempts if self.max_reconnect_attempts != -1 else ''})")
if not self.is_streaming:
break
if self._reconnect():
reconnect_attempts = 0
consecutive_read_failures = 0
# 广播恢复
try:
if self._socketio:
self._socketio.emit('pressure_status', {
'status': 'connected',
'timestamp': time.time()
}, namespace='/devices')
except Exception:
pass
self._last_connected_state = True
continue
else:
reconnect_attempts += 1
time.sleep(self.reconnect_delay)
continue
else:
self.logger.error("压力设备重连失败次数过多,进入降频重试模式")
time.sleep(max(self.reconnect_delay, 5.0))
reconnect_attempts = 0
continue
# 从设备读取数据 # 从设备读取数据
pressure_data = None
if self.device: if self.device:
pressure_data = self.device.read_data() pressure_data = self.device.read_data()
# 如果底层设备在读取时标记了断开,则在此处进入下一轮以触发重连
if hasattr(self.device, 'is_connected') and not self.device.is_connected:
self.is_connected = False
time.sleep(self.reconnect_delay)
continue
if not pressure_data or 'foot_pressure' not in pressure_data:
consecutive_read_failures += 1
if consecutive_read_failures >= self.read_fail_threshold:
self.logger.warning(f"连续读取压力数据失败 {consecutive_read_failures} 次,执行设备软复位并进入重连")
try:
if self.device and hasattr(self.device, 'close'):
self.device.close()
except Exception:
pass
self.is_connected = False
consecutive_read_failures = 0
time.sleep(self.reconnect_delay)
continue
time.sleep(0.05)
continue
# 读数成功,重置失败计数
consecutive_read_failures = 0
self.is_connected = True
if pressure_data and 'foot_pressure' in pressure_data:
foot_pressure = pressure_data['foot_pressure'] foot_pressure = pressure_data['foot_pressure']
# 获取各区域压力值 # 获取各区域压力值
left_front = foot_pressure['left_front'] left_front = foot_pressure['left_front']
@ -857,7 +945,6 @@ class PressureManager(BaseDevice):
# 构建完整的足部压力数据 # 构建完整的足部压力数据
complete_pressure_data = { complete_pressure_data = {
# 分区压力值
'pressure_zones': { 'pressure_zones': {
'left_front': left_front, 'left_front': left_front,
'left_rear': left_rear, 'left_rear': left_rear,
@ -867,7 +954,6 @@ class PressureManager(BaseDevice):
'right_total': right_total, 'right_total': right_total,
'total_pressure': total_pressure 'total_pressure': total_pressure
}, },
# 平衡分析
'balance_analysis': { 'balance_analysis': {
'balance_ratio': round(balance_ratio, 3), 'balance_ratio': round(balance_ratio, 3),
'pressure_center_offset': round(pressure_center_offset, 2), 'pressure_center_offset': round(pressure_center_offset, 2),
@ -875,7 +961,6 @@ class PressureManager(BaseDevice):
'left_front_ratio': round(left_front_ratio, 3), 'left_front_ratio': round(left_front_ratio, 3),
'right_front_ratio': round(right_front_ratio, 3) 'right_front_ratio': round(right_front_ratio, 3)
}, },
# 压力图片
'pressure_image': pressure_data.get('pressure_image', ''), 'pressure_image': pressure_data.get('pressure_image', ''),
'timestamp': pressure_data['timestamp'] 'timestamp': pressure_data['timestamp']
} }
@ -905,6 +990,42 @@ class PressureManager(BaseDevice):
finally: finally:
self.logger.info("压力数据流线程结束") self.logger.info("压力数据流线程结束")
def _reconnect(self) -> bool:
"""重新连接压力设备"""
try:
# 先清理旧设备
try:
if self.device and hasattr(self.device, 'close'):
self.device.close()
except Exception:
pass
self.device = None
self.is_connected = False
time.sleep(1.0) # 等待设备释放
# 重新创建设备
if self.device_type == 'real':
self.device = RealPressureDevice()
else:
self.device = MockPressureDevice()
self.is_connected = True
# 广播一次连接状态
try:
if self._socketio:
self._socketio.emit('pressure_status', {
'status': 'connected',
'timestamp': time.time()
}, namespace='/devices')
except Exception:
pass
return True
except Exception as e:
self.logger.error(f"压力设备重连失败: {e}")
self.is_connected = False
return False
def get_status(self) -> Dict[str, Any]: def get_status(self) -> Dict[str, Any]:
""" """
获取设备状态 获取设备状态
@ -990,6 +1111,10 @@ class PressureManager(BaseDevice):
self.config = new_config self.config = new_config
self.device_type = new_config.get('device_type', 'mock') self.device_type = new_config.get('device_type', 'mock')
self.stream_interval = new_config.get('stream_interval', 0.1) self.stream_interval = new_config.get('stream_interval', 0.1)
# 动态更新重连参数
self.max_reconnect_attempts = int(new_config.get('max_reconnect_attempts', self.max_reconnect_attempts))
self.reconnect_delay = float(new_config.get('reconnect_delay', self.reconnect_delay))
self.read_fail_threshold = int(new_config.get('read_fail_threshold', self.read_fail_threshold))
self.logger.info(f"压力板配置重新加载成功 - 设备类型: {self.device_type}, 流间隔: {self.stream_interval}") self.logger.info(f"压力板配置重新加载成功 - 设备类型: {self.device_type}, 流间隔: {self.stream_interval}")
return True return True

View File

@ -15,7 +15,7 @@ backup_interval = 24
max_backups = 7 max_backups = 7
[CAMERA] [CAMERA]
device_index = 1 device_index = 3
width = 1280 width = 1280
height = 720 height = 720
fps = 30 fps = 30
@ -33,8 +33,9 @@ fps = 15
synchronized_images_only = False synchronized_images_only = False
[DEVICES] [DEVICES]
imu_device_type = real imu_device_type = ble
imu_port = COM9 imu_port = COM9
imu_mac_address = ef:3c:1a:0a:fe:02
imu_baudrate = 9600 imu_baudrate = 9600
pressure_device_type = real pressure_device_type = real
pressure_use_mock = False pressure_use_mock = False

View File

@ -30,7 +30,7 @@ class CameraViewer:
if __name__ == "__main__": if __name__ == "__main__":
# 修改这里的数字可以切换不同摄像头设备 # 修改这里的数字可以切换不同摄像头设备
viewer = CameraViewer(device_index=1) viewer = CameraViewer(device_index=3)
viewer.start_stream() viewer.start_stream()
# import ctypes # import ctypes