BodyBalanceEvaluation/backend/devices/femtobolt_manager.py
2025-08-19 18:15:56 +08:00

887 lines
35 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
FemtoBolt深度相机管理器
负责FemtoBolt深度相机的连接、配置和深度图像数据采集
"""
import os
import sys
import threading
import time
import base64
import numpy as np
import cv2
from typing import Optional, Dict, Any, Tuple
import logging
from collections import deque
import gc
from matplotlib.colors import LinearSegmentedColormap
try:
from .base_device import BaseDevice
from .utils.socket_manager import SocketManager
from .utils.config_manager import ConfigManager
except ImportError:
from base_device import BaseDevice
from utils.socket_manager import SocketManager
from utils.config_manager import ConfigManager
class FemtoBoltManager(BaseDevice):
"""FemtoBolt深度相机管理器"""
def __init__(self, socketio, config_manager: Optional[ConfigManager] = None):
"""
初始化FemtoBolt管理器
Args:
socketio: SocketIO实例
config_manager: 配置管理器实例
"""
# 配置管理
self.config_manager = config_manager or ConfigManager()
self.config = self.config_manager.get_device_config('femtobolt')
# 调用父类初始化
super().__init__("femtobolt", self.config)
# 设置SocketIO实例
self.set_socketio(socketio)
# 设备信息字典
self.device_info = {}
# 设备ID
self.device_id = "femtobolt_001"
# 性能统计
self.performance_stats = {
'fps': 0.0,
'frame_count': 0,
'dropped_frames': 0,
'processing_time': 0.0
}
# FemtoBolt SDK相关
self.femtobolt = None
self.device_handle = None
self.sdk_initialized = False
# 设备配置
self.color_resolution = self.config.get('color_resolution', '1080P')
self.depth_mode = self.config.get('depth_mode', 'NFOV_UNBINNED')
self.fps = self.config.get('fps', 15)
self.depth_range_min = self.config.get('depth_range_min', 500)
self.depth_range_max = self.config.get('depth_range_max', 4500)
self.synchronized_images_only = self.config.get('synchronized_images_only', False)
# 数据处理
self.streaming_thread = None
self.depth_frame_cache = deque(maxlen=10)
self.color_frame_cache = deque(maxlen=10)
self.last_depth_frame = None
self.last_color_frame = None
self.frame_count = 0
# 图像处理参数
self.contrast_factor = 1.2
self.gamma_value = 0.8
self.use_pseudo_color = True
# 性能监控
self.fps_counter = 0
self.fps_start_time = time.time()
self.actual_fps = 0
self.dropped_frames = 0
# 重连机制
self.max_reconnect_attempts = 3
self.reconnect_delay = 3.0
# 发送频率控制(内存优化)
self.send_fps = self.config.get('send_fps', 20) # 默认20FPS发送
self._min_send_interval = 1.0 / self.send_fps if self.send_fps > 0 else 0.05
self._last_send_time = 0
# 编码参数缓存(避免每帧创建数组)
self._encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), int(self.config.get('jpeg_quality', 80))]
# 预计算伽马LUT避免每帧计算
self._gamma_lut = None
self._current_gamma = None
self._update_gamma_lut()
# 预生成网格背景(避免每帧创建)
self._grid_bg = None
self._grid_size = (480, 640) # 默认尺寸
# 自定义彩虹色 colormap参考testfemtobolt.py
colors = ['fuchsia', 'red', 'yellow', 'lime', 'cyan', 'blue',
'fuchsia', 'red', 'yellow', 'lime', 'cyan', 'blue']
self.custom_cmap = LinearSegmentedColormap.from_list("custom_cmap", colors)
self.logger.info("FemtoBolt管理器初始化完成")
def _update_gamma_lut(self):
"""更新伽马校正查找表"""
if self._current_gamma != self.gamma_value:
self._gamma_lut = np.array([((i / 255.0) ** self.gamma_value) * 255 for i in range(256)]).astype("uint8")
self._current_gamma = self.gamma_value
def initialize(self) -> bool:
"""
初始化FemtoBolt设备
Returns:
bool: 初始化是否成功
"""
try:
self.logger.info("正在初始化FemtoBolt设备...")
# 初始化SDK
if not self._initialize_sdk():
raise Exception("SDK初始化失败")
# 配置设备
if not self._configure_device():
raise Exception("设备配置失败")
# 启动设备
if not self._start_device():
raise Exception("设备启动失败")
self.is_connected = True
self.device_info.update({
'color_resolution': self.color_resolution,
'depth_mode': self.depth_mode,
'fps': self.fps,
'depth_range': f"{self.depth_range_min}-{self.depth_range_max}mm"
})
self.logger.info("FemtoBolt初始化成功")
return True
except Exception as e:
self.logger.error(f"FemtoBolt初始化失败: {e}")
self.is_connected = False
self._cleanup_device()
return False
def _initialize_sdk(self) -> bool:
"""
初始化FemtoBolt SDK (使用pykinect_azure)
Returns:
bool: SDK初始化是否成功
"""
try:
# 尝试导入pykinect_azure
real_pykinect = None
try:
import pykinect_azure as pykinect
real_pykinect = pykinect
self.logger.info("成功导入pykinect_azure库")
except ImportError as e:
self.logger.warning(f"无法导入pykinect_azure库使用模拟模式: {e}")
self.pykinect = self._create_mock_pykinect()
self.sdk_initialized = True
return True
# 查找并初始化SDK路径
sdk_initialized = False
if real_pykinect and hasattr(real_pykinect, 'initialize_libraries'):
sdk_paths = self._get_femtobolt_sdk_paths()
for sdk_path in sdk_paths:
if os.path.exists(sdk_path):
try:
real_pykinect.initialize_libraries(track_body=False, module_k4a_path=sdk_path)
self.logger.info(f'✓ 成功使用FemtoBolt SDK: {sdk_path}')
self.pykinect = real_pykinect
sdk_initialized = True
break
except Exception as e:
self.logger.warning(f'✗ FemtoBolt SDK路径失败: {sdk_path} - {e}')
continue
if not sdk_initialized:
self.logger.info('未找到真实SDK使用模拟模式')
self.pykinect = self._create_mock_pykinect()
self.sdk_initialized = True
return True
except Exception as e:
self.logger.error(f"SDK初始化失败: {e}")
return False
def _get_femtobolt_sdk_paths(self) -> list:
import platform
sdk_paths = []
if platform.system() == "Windows":
# 优先使用Orbbec SDK K4A Wrapper与azure_kinect_image_example.py一致
base_dir = os.path.dirname(os.path.abspath(__file__))
dll_path = os.path.join(base_dir,"..", "dll","femtobolt","bin", "k4a.dll")
self.logger.info(f"FemtoBolt SDK路径: {dll_path}")
sdk_paths.append(dll_path)
return sdk_paths
def _create_mock_pykinect(self):
"""
创建模拟pykinect_azure用于测试
Returns:
Mock pykinect对象
"""
class MockPyKinect:
def __init__(self):
self.default_configuration = self._create_mock_config()
def initialize_libraries(self, track_body=False, module_k4a_path=None):
pass
def start_device(self, config=None):
return MockDevice()
def _create_mock_config(self):
class MockConfig:
def __init__(self):
self.depth_mode = 'NFOV_UNBINNED'
self.camera_fps = 15
self.synchronized_images_only = False
self.color_resolution = 0
return MockConfig()
# 添加常量
K4A_DEPTH_MODE_NFOV_UNBINNED = 'NFOV_UNBINNED'
K4A_FRAMES_PER_SECOND_15 = 15
class MockDevice:
def __init__(self):
self.is_started = True
def update(self):
return MockCapture()
def stop(self):
self.is_started = False
def close(self):
pass
class MockCapture:
def __init__(self):
pass
def get_depth_image(self):
# 生成模拟深度图像
height, width = 480, 640
depth_image = np.full((height, width), 2000, dtype=np.uint16)
# 添加人体轮廓
center_x = width // 2
center_y = height // 2
# 头部
cv2.circle(depth_image, (center_x, center_y - 100), 40, 1500, -1)
# 身体
cv2.rectangle(depth_image, (center_x - 50, center_y - 60),
(center_x + 50, center_y + 100), 1600, -1)
# 手臂
cv2.rectangle(depth_image, (center_x - 80, center_y - 40),
(center_x - 50, center_y + 20), 1700, -1)
cv2.rectangle(depth_image, (center_x + 50, center_y - 40),
(center_x + 80, center_y + 20), 1700, -1)
return True, depth_image
def get_color_image(self):
return None
return MockPyKinect()
def _configure_device(self) -> bool:
"""
配置FemtoBolt设备
Returns:
bool: 配置是否成功
"""
try:
if not self.pykinect:
return False
# 配置FemtoBolt设备参数
self.femtobolt_config = self.pykinect.default_configuration
self.femtobolt_config.depth_mode = self.pykinect.K4A_DEPTH_MODE_NFOV_UNBINNED
self.femtobolt_config.camera_fps = self.pykinect.K4A_FRAMES_PER_SECOND_15
self.femtobolt_config.synchronized_images_only = False
self.femtobolt_config.color_resolution = 0
self.logger.info(f"FemtoBolt设备配置完成 - 深度模式: {self.depth_mode}, FPS: {self.fps}")
return True
except Exception as e:
self.logger.error(f"FemtoBolt设备配置失败: {e}")
return False
def _start_device(self) -> bool:
"""
启动FemtoBolt设备
Returns:
bool: 启动是否成功
"""
try:
# 启动FemtoBolt设备
self.logger.info(f'尝试启动FemtoBolt设备...')
if hasattr(self.pykinect, 'start_device'):
# 真实设备模式
self.device_handle = self.pykinect.start_device(config=self.femtobolt_config)
if self.device_handle:
self.logger.info('✓ FemtoBolt深度相机初始化成功!')
else:
raise Exception('设备启动返回None')
else:
# 模拟设备模式
self.device_handle = self.pykinect.start_device(config=self.femtobolt_config)
self.logger.info('✓ FemtoBolt深度相机模拟模式启动成功!')
# 等待设备稳定
time.sleep(1.0)
# 测试捕获
if not self._test_capture():
raise Exception("设备捕获测试失败")
self.logger.info("FemtoBolt设备启动成功")
return True
except Exception as e:
self.logger.error(f"FemtoBolt设备启动失败: {e}")
return False
def _test_capture(self) -> bool:
"""
测试设备捕获
Returns:
bool: 测试是否成功
"""
try:
for i in range(3):
capture = self.device_handle.update()
if capture:
ret, depth_image = capture.get_depth_image()
if ret and depth_image is not None:
self.logger.info(f"FemtoBolt捕获测试成功 - 深度图像大小: {depth_image.shape}")
return True
time.sleep(0.1)
self.logger.error("FemtoBolt捕获测试失败")
return False
except Exception as e:
self.logger.error(f"FemtoBolt捕获测试异常: {e}")
return False
def calibrate(self) -> bool:
"""
校准FemtoBolt设备
Returns:
bool: 校准是否成功
"""
try:
self.logger.info("开始FemtoBolt校准...")
if not self.is_connected:
if not self.initialize():
return False
# 对于FemtoBolt校准主要是验证设备工作状态
# 捕获几帧来确保设备稳定
for i in range(10):
capture = self.device_handle.get_capture()
if capture:
depth_image = capture.get_depth_image()
if depth_image is not None:
# 检查深度图像质量
valid_pixels = np.sum((depth_image >= self.depth_range_min) &
(depth_image <= self.depth_range_max))
total_pixels = depth_image.size
valid_ratio = valid_pixels / total_pixels
if valid_ratio > 0.1: # 至少10%的像素有效
self.logger.info(f"校准帧 {i+1}: 有效像素比例 {valid_ratio:.2%}")
else:
self.logger.warning(f"校准帧 {i+1}: 有效像素比例过低 {valid_ratio:.2%}")
capture.release()
else:
self.logger.warning(f"校时时无法获取第{i+1}")
time.sleep(0.1)
self.logger.info("FemtoBolt校准完成")
return True
except Exception as e:
self.logger.error(f"FemtoBolt校准失败: {e}")
return False
def start_streaming(self) -> bool:
"""
开始数据流推送
Returns:
bool: 启动是否成功
"""
if self.is_streaming:
self.logger.warning("FemtoBolt流已在运行")
return True
if not self.is_connected:
if not self.initialize():
return False
try:
self.is_streaming = True
self.streaming_thread = threading.Thread(
target=self._streaming_worker,
name="FemtoBolt-Stream",
daemon=True
)
self.streaming_thread.start()
self.logger.info("FemtoBolt流启动成功")
return True
except Exception as e:
self.logger.error(f"启动FemtoBolt流失败: {e}")
self.is_streaming = False
return False
def stop_streaming(self) -> bool:
"""
停止数据流推送
Returns:
bool: 停止是否成功
"""
try:
self.is_streaming = False
if self.streaming_thread and self.streaming_thread.is_alive():
self.streaming_thread.join(timeout=5.0)
self.logger.info("FemtoBolt流已停止")
return True
except Exception as e:
self.logger.error(f"停止FemtoBolt流失败: {e}")
return False
def _streaming_worker(self):
"""
流处理工作线程
"""
self.logger.info("FemtoBolt流工作线程启动")
frame_count = 0
try:
while self.is_streaming:
# 发送频率限制
now = time.time()
if now - self._last_send_time < self._min_send_interval:
time.sleep(0.001)
continue
if self.device_handle and self._socketio:
try:
capture = self.device_handle.update()
if capture is not None:
try:
ret, depth_image = capture.get_depth_image()
if ret and depth_image is not None:
# 确保二维数据
if depth_image.ndim == 3 and depth_image.shape[2] == 1:
depth_image = depth_image[:, :, 0]
rows, cols = depth_image.shape[:2]
# 生成或复用网格背景
if (self._grid_bg is None) or (self._grid_size != (rows, cols)):
bg = np.ones((rows, cols, 3), dtype=np.uint8) * 128
cell_size = 50
grid_color = (255, 255, 255)
grid = np.zeros_like(bg)
for x in range(0, cols, cell_size):
cv2.line(grid, (x, 0), (x, rows), grid_color, 1)
for y in range(0, rows, cell_size):
cv2.line(grid, (0, y), (cols, y), grid_color, 1)
mask_grid = (grid.sum(axis=2) > 0)
bg[mask_grid] = grid[mask_grid]
self._grid_bg = bg
self._grid_size = (rows, cols)
background = self._grid_bg.copy()
# 生成深度掩码,仅保留指定范围内的像素
mask_valid = (depth_image >= self.depth_range_min) & (depth_image <= self.depth_range_max)
depth_clipped = np.clip(depth_image, self.depth_range_min, self.depth_range_max)
normed = (depth_clipped.astype(np.float32) - self.depth_range_min) / (self.depth_range_max - self.depth_range_min)
# 反转映射,保证颜色方向与之前一致
normed = 1.0 - normed
# 应用自定义 colormap将深度值映射到 RGB
rgba = self.custom_cmap(normed)
rgb = (rgba[..., :3] * 255).astype(np.uint8)
# 叠加:在背景上覆盖彩色深度图(掩码处不覆盖,保留灰色背景+网格)
depth_colored_final = background.copy()
depth_colored_final[mask_valid] = rgb[mask_valid]
# 裁剪宽度
height, width = depth_colored_final.shape[:2]
target_width = height // 2
if width > target_width:
left = (width - target_width) // 2
right = left + target_width
depth_colored_final = depth_colored_final[:, left:right]
# 推送SocketIO
success, buffer = cv2.imencode('.jpg', depth_colored_final, self._encode_param)
if success and self._socketio:
jpg_as_text = base64.b64encode(memoryview(buffer).tobytes()).decode('utf-8')
self._socketio.emit('femtobolt_frame', {
'depth_image': jpg_as_text,
'frame_count': frame_count,
'timestamp': now,
'fps': self.actual_fps,
'device_id': self.device_id,
'depth_range': {
'min': self.depth_range_min,
'max': self.depth_range_max
}
}, namespace='/devices')
frame_count += 1
self._last_send_time = now
# 更新统计
self._update_statistics()
else:
time.sleep(0.005)
except Exception as e:
# 捕获处理过程中出现异常,记录并继续
self.logger.error(f"FemtoBolt捕获处理错误: {e}")
finally:
# 无论处理成功与否都应释放capture以回收内存:contentReference[oaicite:3]{index=3}
try:
if hasattr(capture, 'release'):
capture.release()
except Exception:
pass
else:
time.sleep(0.005)
except Exception as e:
self.logger.error(f'FemtoBolt帧推送失败: {e}')
time.sleep(0.05)
# 降低空转CPU
time.sleep(0.001)
except Exception as e:
self.logger.error(f"FemtoBolt流处理异常: {e}")
finally:
self.is_streaming = False
self.logger.info("FemtoBolt流工作线程结束")
def _process_depth_image(self, depth_image) -> np.ndarray:
"""
处理深度图像采用testfemtobolt.py的渲染方式
"""
try:
if not isinstance(depth_image, np.ndarray):
self.logger.error(f"输入的深度图像不是numpy数组: {type(depth_image)}")
return np.zeros((480, 640, 3), dtype=np.uint8)
# 确保二维数据
if depth_image.ndim == 3 and depth_image.shape[2] == 1:
depth_image = depth_image[:, :, 0]
h, w = depth_image.shape
# 生成灰色背景和白色网格参考testfemtobolt.py
background = np.full((h, w, 3), 128, dtype=np.uint8) # 灰色背景
# 绘制网格线
for x in range(0, w, 50): # 每50像素一条竖线
cv2.line(background, (x, 0), (x, h-1), (255, 255, 255), 1)
for y in range(0, h, 50): # 每50像素一条横线
cv2.line(background, (0, y), (w-1, y), (255, 255, 255), 1)
# 生成深度掩码,仅保留指定范围内的像素
mask_valid = (depth_image >= self.depth_range_min) & (depth_image <= self.depth_range_max)
depth_clipped = np.clip(depth_image, self.depth_range_min, self.depth_range_max)
normed = (depth_clipped.astype(np.float32) - self.depth_range_min) / (self.depth_range_max - self.depth_range_min)
# 反转映射,保证颜色方向与之前一致
normed = 1.0 - normed
# 应用自定义 colormap将深度值映射到 RGB
rgba = self.custom_cmap(normed)
rgb = (rgba[..., :3] * 255).astype(np.uint8)
# 叠加:在背景上覆盖彩色深度图(掩码处不覆盖,保留灰色背景+网格)
final_img = background.copy()
final_img[mask_valid] = rgb[mask_valid]
# 裁剪宽度(保持原有功能)
height, width = final_img.shape[:2]
target_width = height // 2
if width > target_width:
left = (width - target_width) // 2
right = left + target_width
final_img = final_img[:, left:right]
return final_img
except Exception as e:
self.logger.error(f"处理深度图像失败: {e}")
return np.zeros((480, 640, 3), dtype=np.uint8)
def _send_depth_data(self, depth_image: np.ndarray, color_image: Optional[np.ndarray] = None):
try:
_, depth_buffer = cv2.imencode('.jpg', depth_image, self._encode_param)
depth_data = base64.b64encode(memoryview(depth_buffer).tobytes()).decode('utf-8')
send_data = {
'timestamp': time.time(),
'frame_count': self.frame_count,
'depth_image': depth_data,
'fps': self.actual_fps,
'device_id': self.device_id,
'depth_range': {
'min': self.depth_range_min,
'max': self.depth_range_max
},
'last_update': time.strftime('%H:%M:%S')
}
if color_image is not None:
_, color_buffer = cv2.imencode('.jpg', color_image, self._encode_param)
color_data = base64.b64encode(memoryview(color_buffer).tobytes()).decode('utf-8')
send_data['color_image'] = color_data
self._socketio.emit('femtobolt_frame', send_data, namespace='/devices')
except Exception as e:
self.logger.error(f"发送深度数据失败: {e}")
def _update_statistics(self):
"""
更新性能统计
"""
self.frame_count += 1
self.fps_counter += 1
# 每秒计算一次实际FPS
current_time = time.time()
if current_time - self.fps_start_time >= 1.0:
self.actual_fps = self.fps_counter / (current_time - self.fps_start_time)
self.fps_counter = 0
self.fps_start_time = current_time
# 更新性能统计
self.performance_stats.update({
'frames_processed': self.frame_count,
'actual_fps': round(self.actual_fps, 2),
'dropped_frames': self.dropped_frames
})
def _reconnect(self) -> bool:
"""
重新连接FemtoBolt设备
Returns:
bool: 重连是否成功
"""
try:
self._cleanup_device()
time.sleep(2.0) # 等待设备释放
return self.initialize()
except Exception as e:
self.logger.error(f"FemtoBolt重连失败: {e}")
return False
def get_status(self) -> Dict[str, Any]:
"""
获取设备状态
Returns:
Dict[str, Any]: 设备状态信息
"""
status = super().get_status()
status.update({
'color_resolution': self.color_resolution,
'depth_mode': self.depth_mode,
'target_fps': self.fps,
'actual_fps': self.actual_fps,
'frame_count': self.frame_count,
'dropped_frames': self.dropped_frames,
'depth_range': f"{self.depth_range_min}-{self.depth_range_max}mm",
'has_depth_frame': self.last_depth_frame is not None,
'has_color_frame': self.last_color_frame is not None
})
return status
def capture_body_image(self, save_path: Optional[str] = None) -> Optional[np.ndarray]:
"""
捕获身体图像
Args:
save_path: 保存路径(可选)
Returns:
Optional[np.ndarray]: 捕获的图像失败返回None
"""
try:
if not self.is_connected or not self.device_handle:
self.logger.error("FemtoBolt设备未连接")
return None
capture = self.device_handle.get_capture()
if not capture:
self.logger.error("无法获取FemtoBolt捕获")
return None
depth_image = capture.get_depth_image()
if depth_image is None:
self.logger.error("无法获取深度图像")
capture.release()
return None
# 处理深度图像
processed_image = self._process_depth_image(depth_image)
if save_path:
cv2.imwrite(save_path, processed_image)
self.logger.info(f"身体图像已保存到: {save_path}")
capture.release()
return processed_image
except Exception as e:
self.logger.error(f"捕获身体图像异常: {e}")
return None
def get_latest_depth_frame(self) -> Optional[np.ndarray]:
"""
获取最新深度帧
Returns:
Optional[np.ndarray]: 最新深度帧无帧返回None
"""
return self.last_depth_frame.copy() if self.last_depth_frame is not None else None
def get_latest_color_frame(self) -> Optional[np.ndarray]:
"""
获取最新彩色帧
Returns:
Optional[np.ndarray]: 最新彩色帧无帧返回None
"""
return self.last_color_frame.copy() if self.last_color_frame is not None else None
def collect_body_pose_data(self) -> Optional[Dict[str, Any]]:
"""
采集身体姿态数据(兼容原接口)
Returns:
Optional[Dict[str, Any]]: 身体姿态数据
"""
# 这里可以集成姿态估计算法
# 目前返回模拟数据
if not self.last_depth_frame is not None:
return None
# 模拟身体姿态数据
mock_keypoints = [
{'name': 'head', 'x': 320, 'y': 100, 'confidence': 0.9},
{'name': 'neck', 'x': 320, 'y': 150, 'confidence': 0.8},
{'name': 'left_shoulder', 'x': 280, 'y': 160, 'confidence': 0.7},
{'name': 'right_shoulder', 'x': 360, 'y': 160, 'confidence': 0.7},
{'name': 'left_hip', 'x': 300, 'y': 300, 'confidence': 0.6},
{'name': 'right_hip', 'x': 340, 'y': 300, 'confidence': 0.6}
]
return {
'timestamp': time.time(),
'keypoints': mock_keypoints,
'balance_score': np.random.uniform(0.7, 0.9),
'center_of_mass': {'x': 320, 'y': 240},
'device_id': self.device_id
}
def _cleanup_device(self):
"""
清理设备资源
"""
try:
if self.device_handle:
# 尝试停止设备如果有stop方法
if hasattr(self.device_handle, 'stop'):
try:
self.device_handle.stop()
self.logger.info("FemtoBolt设备已停止")
except Exception as e:
self.logger.warning(f"停止FemtoBolt设备时出现警告: {e}")
# 尝试关闭设备如果有close方法
if hasattr(self.device_handle, 'close'):
try:
self.device_handle.close()
self.logger.info("FemtoBolt设备连接已关闭")
except Exception as e:
self.logger.warning(f"关闭FemtoBolt设备时出现警告: {e}")
self.device_handle = None
except Exception as e:
self.logger.error(f"清理FemtoBolt设备失败: {e}")
def disconnect(self):
"""
断开FemtoBolt设备连接
"""
try:
self.stop_streaming()
self._cleanup_device()
self.is_connected = False
self.logger.info("FemtoBolt设备已断开连接")
except Exception as e:
self.logger.error(f"断开FemtoBolt设备连接失败: {e}")
def cleanup(self):
"""
清理资源
"""
try:
self.stop_streaming()
self._cleanup_device()
self.depth_frame_cache.clear()
self.color_frame_cache.clear()
self.last_depth_frame = None
self.last_color_frame = None
super().cleanup()
self.logger.info("FemtoBolt资源清理完成")
except Exception as e:
self.logger.error(f"清理FemtoBolt资源失败: {e}")