#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ FemtoBolt深度相机管理器 负责FemtoBolt深度相机的连接、配置和深度图像数据采集 """ import os import sys import threading import time import base64 import numpy as np import cv2 from typing import Optional, Dict, Any, Tuple import logging from collections import deque import gc from matplotlib.colors import LinearSegmentedColormap import matplotlib.pyplot as plt import matplotlib from scipy import ndimage from scipy.interpolate import griddata import io import importlib from typing import Optional import subprocess import tempfile try: from .base_device import BaseDevice from .utils.socket_manager import SocketManager from .utils.config_manager import ConfigManager except ImportError: from base_device import BaseDevice from utils.socket_manager import SocketManager from utils.config_manager import ConfigManager class FemtoBoltManager(BaseDevice): """FemtoBolt深度相机管理器""" def __init__(self, socketio, config_manager: Optional[ConfigManager] = None): """ 初始化FemtoBolt管理器 Args: socketio: SocketIO实例 config_manager: 配置管理器实例 """ # 配置管理 self.config_manager = config_manager or ConfigManager() self.config = self.config_manager.get_device_config('femtobolt') # 调用父类初始化 super().__init__("femtobolt", self.config) # 设置SocketIO实例 self.set_socketio(socketio) # 设备信息字典 self.device_info = {} # 设备ID self.device_id = "femtobolt_001" # 性能统计 self.performance_stats = { 'fps': 0.0, 'frame_count': 0, 'dropped_frames': 0, 'processing_time': 0.0 } # FemtoBolt SDK相关 self.femtobolt = None self.device_handle = None self.sdk_initialized = False # 新增:记录已使用的k4a.dll路径,供子进程探测使用 self.k4a_dll_path: Optional[str] = None # 设备配置 self.algorithm_type = self.config.get('algorithm_type', 'opencv') self.color_resolution = self.config.get('color_resolution', '1080P') self.depth_mode = self.config.get('depth_mode', 'NFOV_2X2BINNED') self.color_format = self.config.get('color_format', 'COLOR_BGRA32') self.fps = self.config.get('camera_fps', 20) self.depth_range_min = self.config.get('depth_range_min', 500) self.depth_range_max = self.config.get('depth_range_max', 4500) self.synchronized_images_only = self.config.get('synchronized_images_only', False) # 数据处理 self.streaming_thread = None self.depth_frame_cache = deque(maxlen=10) self.color_frame_cache = deque(maxlen=10) self.last_depth_frame = None self.last_color_frame = None self.frame_count = 0 # 图像处理参数 self.contrast_factor = 1.2 self.gamma_value = 0.8 self.use_pseudo_color = True # 性能监控 self.fps_counter = 0 # 图像渲染缓存 self.background = None self.output_buffer = None self._depth_filtered = None # 用于复用深度图过滤结果 self._blur_buffer = None # 用于复用高斯模糊结果 self._current_gamma = None self.fps_start_time = time.time() self.actual_fps = 0 self.dropped_frames = 0 # 重连机制 self.max_reconnect_attempts = 3 self.reconnect_delay = 3.0 # 发送频率控制(内存优化) self.send_fps = self.config.get('send_fps', 15) # 默认20FPS发送 self._min_send_interval = 1.0 / self.send_fps if self.send_fps > 0 else 0.05 self._last_send_time = 0 # 编码参数缓存(避免每帧创建数组)- 为plt模式优化编码质量 jpeg_quality = 40 if self.algorithm_type == 'plt' else int(self.config.get('jpeg_quality', 60)) self._encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), jpeg_quality] # 预计算伽马LUT(避免每帧计算) self._gamma_lut = None self._current_gamma = None self._update_gamma_lut() # 预分配缓冲区和变量(减少内存分配) self._grid_background = None self._reusable_buffer = None self._crop_params = None # 缓存裁剪参数 # 帧预测和插值机制 self._last_frame = None self._frame_history = deque(maxlen=3) # 保存最近3帧用于预测 self._prediction_enabled = True if self.algorithm_type == 'plt' else False self._grid_bg = None self._grid_size = (480, 640) # 默认尺寸 self.background = None # 用于缓存等高线渲染的背景 # 自定义彩虹色 colormap(参考testfemtobolt.py) colors = [ 'fuchsia', 'red', 'yellow', 'lime', 'cyan', 'blue', 'fuchsia', 'red', 'yellow', 'lime', 'cyan', 'blue', 'fuchsia', 'red', 'yellow', 'lime', 'cyan', 'blue', 'fuchsia', 'red', 'yellow', 'lime', 'cyan', 'blue', 'fuchsia', 'red', 'yellow', 'lime', 'cyan', 'blue', 'fuchsia', 'red', 'yellow', 'lime', 'cyan', 'blue'] self.custom_cmap = LinearSegmentedColormap.from_list("custom_cmap", colors) # 设置matplotlib为非交互模式 matplotlib.use('Agg') # 创建matplotlib图形对象(复用以提高性能) self.fig, self.ax = plt.subplots(figsize=(9, 6)) self.ax.set_aspect('equal') plt.subplots_adjust(left=0, right=1, top=1, bottom=0) self.logger.info(f"FemtoBolt设备配置完成 - 算法类型: {self.algorithm_type}, 深度模式: {self.depth_mode}, FPS: {self.fps}") def _update_gamma_lut(self): """更新伽马校正查找表""" if self._current_gamma != self.gamma_value: self._gamma_lut = np.array([((i / 255.0) ** (1.0 / self.gamma_value)) * 255 for i in range(256)], dtype=np.uint8) self._current_gamma = self.gamma_value def _generate_contour_image_opencv(self, depth): """改进版 OpenCV 等高线渲染,梯度平滑、局部对比增强""" try: # 初始化 depth_filtered 缓冲区 if self._depth_filtered is None or self._depth_filtered.shape != depth.shape: self._depth_filtered = np.zeros_like(depth, dtype=np.uint16) np.copyto(self._depth_filtered, depth) # 直接覆盖,不生成新数组 depth_filtered = self._depth_filtered depth_filtered[depth_filtered > self.depth_range_max] = 0 depth_filtered[depth_filtered < self.depth_range_min] = 0 height, width = depth_filtered.shape # 背景缓存 if self.background is None or self.background.shape[:2] != (height, width): background_gray = int(0.5 * 255 * 0.3 + 255 * (1 - 0.3)) self.background = np.ones((height, width, 3), dtype=np.uint8) * background_gray grid_spacing = max(height // 20, width // 20, 10) for x in range(0, width, grid_spacing): cv2.line(self.background, (x, 0), (x, height-1), (255, 255, 255), 1) for y in range(0, height, grid_spacing): cv2.line(self.background, (0, y), (width-1, y), (255, 255, 255), 1) # 初始化输出缓存和模糊缓存 self.output_buffer = np.empty_like(self.background) self._blur_buffer = np.empty_like(self.background) # 复用输出缓存,避免 copy() np.copyto(self.output_buffer, self.background) output = self.output_buffer valid_mask = depth_filtered > 0 if np.any(valid_mask): # 连续归一化深度值 norm_depth = np.zeros_like(depth_filtered, dtype=np.float32) norm_depth[valid_mask] = (depth_filtered[valid_mask] - self.depth_range_min) / (self.depth_range_max - self.depth_range_min) norm_depth = np.clip(norm_depth, 0, 1) ** 0.8 # Gamma增强 # 使用 colormap 映射 cmap_colors = (self.custom_cmap(norm_depth)[..., :3] * 255).astype(np.uint8) output[valid_mask] = cmap_colors[valid_mask] # Sobel 边界检测 + cv2.magnitude 替换 np.hypot depth_uint8 = (norm_depth * 255).astype(np.uint8) gx = cv2.Sobel(depth_uint8, cv2.CV_32F, 1, 0, ksize=3) gy = cv2.Sobel(depth_uint8, cv2.CV_32F, 0, 1, ksize=3) grad_mag = cv2.magnitude(gx, gy) grad_mag = grad_mag.astype(np.uint8) # 自适应局部对比度增强(向量化) edge_mask = grad_mag > 30 output[edge_mask] = np.clip(output[edge_mask].astype(np.float32) * 1.5, 0, 255).astype(np.uint8) # 高斯平滑,复用 dst 缓冲区 cv2.GaussianBlur(output, (3, 3), 0.3, dst=self._blur_buffer) # 注意:这里不进行裁剪,而是返回完整图像 # 推迟裁剪到显示阶段,与 testfemtobolt.py 保持一致 # 原代码在这里进行了裁剪: # target_width = height // 2 # if width > target_width: # left = (width - target_width) // 2 # right = left + target_width # output = output[:, left:right] return self._blur_buffer except Exception as e: self.logger.error(f"优化等高线生成失败: {e}") return None def _generate_contour_image_plt(self, depth): """使用matplotlib生成等高线图像(性能优化版本)""" try: # 设置图形背景色和边距 self.fig.patch.set_facecolor((50/255, 50/255, 50/255)) # 设置深灰色背景 rgb(50, 50, 50) self.fig.tight_layout(pad=0) # 移除所有边距 # 清除之前的绘图 self.ax.clear() self.ax.set_facecolor((50/255, 50/255, 50/255)) # 设置坐标区域背景色为黑色 # 深度数据过滤(与display_x.py完全一致) depth[depth > self.depth_range_max] = 0 depth[depth < self.depth_range_min] = 0 # 背景图(深灰色背景) # 创建RGB格式的背景图,确保颜色准确性 background_gray_value = 50 # RGB(50, 50, 50) background = np.full((*depth.shape, 3), background_gray_value, dtype=np.uint8) # 使用 np.ma.masked_equal() 来屏蔽深度图中的零值 depth = np.ma.masked_equal(depth, 0) # 绘制背景(不使用colormap,直接显示RGB图像) self.ax.imshow(background, origin='lower', alpha=1.0) # 优化:简化网格绘制,减少绘制操作,使用深灰色网格以适应黑色背景 self.ax.grid(True, which='major', axis='both', color='#333333', linestyle='-', linewidth=0.5, zorder=0) # 隐藏坐标轴 self.ax.set_xticks([]) self.ax.set_yticks([]) # 优化:减少等高线级别从100到30,大幅提升性能 start_time = time.perf_counter() self.ax.contourf(depth, levels=80, cmap=self.custom_cmap, vmin=self.depth_range_min, vmax=self.depth_range_max, origin='upper', zorder=2) # 优化:直接从canvas获取图像数据,避免PNG编码/解码 try: # 绘制到canvas self.fig.canvas.draw() # 兼容不同版本的matplotlib获取图像数据 canvas_width, canvas_height = self.fig.canvas.get_width_height() # 尝试使用新版本的方法 try: # matplotlib 3.8+ 使用 buffer_rgba() buf = np.frombuffer(self.fig.canvas.buffer_rgba(), dtype=np.uint8) buf = buf.reshape((canvas_height, canvas_width, 4)) # 转换RGBA到RGB img = cv2.cvtColor(buf, cv2.COLOR_RGBA2BGR) except AttributeError: try: # matplotlib 3.0-3.7 使用 tostring_argb() buf = np.frombuffer(self.fig.canvas.tostring_argb(), dtype=np.uint8) buf = buf.reshape((canvas_height, canvas_width, 4)) # 转换ARGB到BGR img = cv2.cvtColor(buf, cv2.COLOR_BGRA2BGR) except AttributeError: # 旧版本matplotlib使用 tostring_rgb() buf = np.frombuffer(self.fig.canvas.tostring_rgb(), dtype=np.uint8) buf = buf.reshape((canvas_height, canvas_width, 3)) # 转换RGB到BGR img = cv2.cvtColor(buf, cv2.COLOR_RGB2BGR) return img except Exception as e: self.logger.error(f"Canvas图像转换失败: {e}") # 降级到PNG方法 return None except Exception as e: self.logger.error(f"生成等高线图像失败: {e}") return None def _generate_contour_image_source(self, depth): """直接读取深度相机图片数据,不做处理返回深度相机图片(用于对比分析原始图像的延时情况)""" generation_start = time.perf_counter() try: # 将深度数据转换为可视化图像 # 深度数据通常是单通道的,需要转换为3通道BGR格式 if len(depth.shape) == 2: # 深度数据过滤(与其他方法保持一致) depth_filtered = depth.copy() depth_filtered[depth_filtered > self.depth_range_max] = 0 depth_filtered[depth_filtered < self.depth_range_min] = 0 # 将深度数据归一化到0-255范围 depth_normalized = cv2.normalize(depth_filtered, None, 0, 255, cv2.NORM_MINMAX, dtype=cv2.CV_8U) # 应用彩色映射创建彩色深度图 # 使用JET色彩映射:蓝色(近)到红色(远) img_colored = cv2.applyColorMap(depth_normalized, cv2.COLORMAP_JET) # 将零值区域设为黑色(无效深度) mask = (depth_filtered == 0) img_colored[mask] = [0, 0, 0] # 黑色背景 img = img_colored else: # 如果已经是3通道,直接使用 img = depth.astype(np.uint8) return img except Exception as e: self.logger.error(f"生成原始深度图像失败: {e}") return None def initialize(self) -> bool: """ 初始化FemtoBolt设备 Returns: bool: 初始化是否成功 """ try: self.logger.info("正在初始化FemtoBolt设备...") # 使用构造函数中已加载的配置,避免并发读取配置文件 self.logger.info(f"使用已加载配置: algorithm_type={self.algorithm_type}, fps={self.fps}, depth_mode={self.depth_mode}") # 初始化SDK if not self._initialize_sdk(): raise Exception("SDK初始化失败") # 配置设备 if not self._configure_device(): raise Exception("设备配置失败") # 启动设备 if not self._start_device(): raise Exception("设备启动失败") # 使用set_connected方法启动连接监控线程 self.set_connected(True) self.device_info.update({ 'color_resolution': self.color_resolution, 'depth_mode': self.depth_mode, 'camera_fps': self.fps, 'depth_range': f"{self.depth_range_min}-{self.depth_range_max}mm" }) self.logger.info("FemtoBolt初始化成功") return True except Exception as e: self.logger.error(f"FemtoBolt初始化失败: {e}") # 使用set_connected方法停止连接监控线程 self.set_connected(False) self._cleanup_device() return False def _initialize_sdk(self) -> bool: """ 初始化FemtoBolt SDK (使用pykinect_azure) Returns: bool: SDK初始化是否成功 """ try: # 尝试导入pykinect_azure real_pykinect = None try: import pykinect_azure as pykinect real_pykinect = pykinect self.logger.info("成功导入pykinect_azure库") except ImportError as e: self.logger.error(f"无法导入pykinect_azure库: {e}") self.sdk_initialized = False return False # 查找并初始化SDK路径 sdk_initialized = False if real_pykinect and hasattr(real_pykinect, 'initialize_libraries'): sdk_paths = self._get_femtobolt_sdk_paths() for sdk_path in sdk_paths: if os.path.exists(sdk_path): try: real_pykinect.initialize_libraries(track_body=False, module_k4a_path=sdk_path) self.logger.info(f'✓ 成功使用FemtoBolt SDK: {sdk_path}') self.pykinect = real_pykinect sdk_initialized = True break except Exception as e: self.logger.warning(f'✗ FemtoBolt SDK路径失败: {sdk_path} - {e}') continue if not sdk_initialized: self.logger.error('未找到真实SDK,初始化失败') self.sdk_initialized = False return False self.sdk_initialized = True return True except Exception as e: self.logger.error(f"SDK初始化失败: {e}") return False def _get_femtobolt_sdk_paths(self) -> list: import platform sdk_paths = [] if platform.system() == "Windows": # 优先使用Orbbec SDK K4A Wrapper(与azure_kinect_image_example.py一致) base_dir = os.path.dirname(os.path.abspath(__file__)) dll_path = os.path.join(base_dir,"..", "dll","femtobolt", "k4a.dll") self.logger.info(f"FemtoBolt SDK路径: {dll_path}") sdk_paths.append(dll_path) return sdk_paths def _configure_device(self) -> bool: """ 配置FemtoBolt设备 Returns: bool: 配置是否成功 """ try: if not self.pykinect: return False # 配置FemtoBolt设备参数 self.femtobolt_config = self.pykinect.default_configuration self.femtobolt_config.depth_mode = self.pykinect.K4A_DEPTH_MODE_NFOV_2X2BINNED self.femtobolt_config.color_format = self.pykinect.K4A_IMAGE_FORMAT_COLOR_BGRA32 self.femtobolt_config.color_resolution = self.pykinect.K4A_COLOR_RESOLUTION_720P self.femtobolt_config.camera_fps = self.pykinect.K4A_FRAMES_PER_SECOND_30 # 30FPS self.femtobolt_config.synchronized_images_only = False return True except Exception as e: self.logger.error(f"FemtoBolt设备配置失败: {e}") return False def _start_device(self) -> bool: """ 启动FemtoBolt设备 Returns: bool: 启动是否成功 """ try: if not self.pykinect: return False # 通过探测后再真正启动 # 在真正调用 start_device 之前,先通过 k4a.dll 查询已安装设备数量,0 则跳过 try: import os, ctypes k4a_path = getattr(self, "k4a_dll_path", None) if not k4a_path: base_dir = os.path.dirname(os.path.abspath(__file__)) k4a_path = os.path.normpath(os.path.join(base_dir, "..", "dll", "femtobolt", "k4a.dll")) k4a = ctypes.CDLL(k4a_path) try: # 有些环境需要显式声明返回类型 k4a.k4a_device_get_installed_count.restype = ctypes.c_uint32 except Exception: pass device_count = int(k4a.k4a_device_get_installed_count()) except Exception as e: self.logger.warning(f"获取FemtoBolt设备数量失败,跳过启动: {e}") return False if device_count <= 0: self.logger.warning("未检测到FemtoBolt深度相机,跳过启动") return False else: self.logger.info(f"检测到 FemtoBolt 设备数量: {device_count}") self.device_handle = self.pykinect.start_device(config=self.femtobolt_config) if self.device_handle: self.logger.info('✓ FemtoBolt深度相机初始化成功!') else: self.logger.warning('FemtoBolt设备启动返回None,设备可能未连接') return False self.logger.info('FemtoBolt设备启动成功') return True except Exception as e: self.logger.error(f"FemtoBolt设备启动失败: {e}") return False def _test_capture(self) -> bool: """ 测试设备捕获 Returns: bool: 测试是否成功 """ try: for i in range(3): capture = self.device_handle.update() if capture: ret, depth_image = capture.get_depth_image() if ret and depth_image is not None: self.logger.info(f"FemtoBolt捕获测试成功 - 深度图像大小: {depth_image.shape}") return True time.sleep(0.1) self.logger.error("FemtoBolt捕获测试失败") return False except Exception as e: self.logger.error(f"FemtoBolt捕获测试异常: {e}") return False def calibrate(self) -> bool: """ 校准FemtoBolt设备 Returns: bool: 校准是否成功 """ try: self.logger.info("开始FemtoBolt校准...") if not self.is_connected: if not self.initialize(): return False # 对于FemtoBolt,校准主要是验证设备工作状态 # 捕获几帧来确保设备稳定 for i in range(10): capture = self.device_handle.get_capture() if capture: depth_image = capture.get_depth_image() if depth_image is not None: # 检查深度图像质量 valid_pixels = np.sum((depth_image >= self.depth_range_min) & (depth_image <= self.depth_range_max)) total_pixels = depth_image.size valid_ratio = valid_pixels / total_pixels if valid_ratio > 0.1: # 至少10%的像素有效 self.logger.info(f"校准帧 {i+1}: 有效像素比例 {valid_ratio:.2%}") else: self.logger.warning(f"校准帧 {i+1}: 有效像素比例过低 {valid_ratio:.2%}") capture.release() else: self.logger.warning(f"校时时无法获取第{i+1}帧") time.sleep(0.1) self.logger.info("FemtoBolt校准完成") return True except Exception as e: self.logger.error(f"FemtoBolt校准失败: {e}") return False def start_streaming(self) -> bool: """ 开始数据流推送 Returns: bool: 启动是否成功 """ if self.is_streaming: self.logger.warning("FemtoBolt流已在运行") return True try: self.is_streaming = True self.streaming_thread = threading.Thread( target=self._streaming_worker, name="FemtoBolt-Stream", daemon=True ) self.streaming_thread.start() self.logger.info("FemtoBolt流启动成功") return True except Exception as e: self.logger.error(f"启动FemtoBolt流失败: {e}") self.is_streaming = False return False def stop_streaming(self) -> bool: """ 停止数据流推送 Returns: bool: 停止是否成功 """ try: self.is_streaming = False # 等待流线程自然结束 if self.streaming_thread and self.streaming_thread.is_alive(): self.logger.info("等待FemtoBolt流线程结束...") self.streaming_thread.join(timeout=3.0) if self.streaming_thread.is_alive(): self.logger.warning("FemtoBolt流线程未能在超时时间内结束") else: self.logger.info("FemtoBolt流工作线程结束") self.logger.info("FemtoBolt流已停止") return True except Exception as e: self.logger.error(f"停止FemtoBolt流失败: {e}") return False def _streaming_worker(self): """ 流处理工作线程 """ self.logger.info("FemtoBolt流工作线程启动") frame_count = 0 last_generation_time = 0.0 skip_frame_count = 0 # 根据算法类型设置自适应采样推流频率 if self.algorithm_type == 'source': # source算法:图像生成时间很短,提高推流频率以减少延时 adaptive_fps = 25 # 提高到25fps,接近Camera的30fps self.logger.info(f"FemtoBolt使用source算法,设置推流频率为{adaptive_fps}fps(优化延时)") elif self.algorithm_type == 'plt': # plt算法:图像生成时间约0.3-0.4秒,设置较低fps推流以减少延时 adaptive_fps = 5 self.logger.info(f"FemtoBolt使用plt算法,设置推流频率为{adaptive_fps}fps(优化延时)") elif self.algorithm_type == 'opencv': # opencv算法:图像生成时间约0.006秒,设置高fps推流 adaptive_fps = 20 self.logger.info(f"FemtoBolt使用opencv算法,设置推流频率为{adaptive_fps}fps") else: # 默认使用配置文件中的设置 adaptive_fps = self.send_fps self.logger.info(f"FemtoBolt使用默认推流频率{adaptive_fps}fps") # 计算自适应最小发送间隔 adaptive_min_interval = 1.0 / adaptive_fps if adaptive_fps > 0 else 0.05 try: while self.is_streaming: # 动态调整发送间隔,避免延时累积 now = time.time() # 对于source算法,简化间隔计算,减少延时 if self.algorithm_type == 'source': # source算法处理时间很短,使用固定间隔 dynamic_interval = adaptive_min_interval elif self.algorithm_type == 'plt' and last_generation_time > 0: # 根据实际生成时间动态调整间隔,避免延时累积 dynamic_interval = max(adaptive_min_interval, last_generation_time * 0.8) else: dynamic_interval = adaptive_min_interval if now - self._last_send_time < dynamic_interval: time.sleep(0.001) continue if self.device_handle and self._socketio: try: capture = self.device_handle.update() if capture is not None: try: ret, depth_image = capture.get_depth_image() if ret and depth_image is not None: # 对于source算法,跳过复杂的跳帧逻辑,直接处理 if self.algorithm_type == 'source': # source算法处理时间很短,不需要跳帧 pass elif self.algorithm_type == 'plt': # 对于plt算法,如果延时累积过多,跳过部分帧 time_since_last = now - self._last_send_time if time_since_last > adaptive_min_interval * 2: skip_frame_count += 1 if skip_frame_count % 2 == 0: # 每2帧跳过1帧 # 如果启用帧预测,发送预测帧而不是跳过 if self._prediction_enabled and self._last_frame is not None: self._send_predicted_frame(frame_count, now) frame_count += 1 continue # 测试:记录图像生成开始时间 generation_start_time = time.time() # 根据配置选择不同的等高线生成方法 if self.algorithm_type == 'source': depth_colored_final = self._generate_contour_image_source(depth_image) elif self.algorithm_type == 'plt': depth_colored_final = self._generate_contour_image_plt(depth_image) elif self.algorithm_type == 'opencv': depth_colored_final = self._generate_contour_image_opencv(depth_image) # 测试:计算并打印图像生成时间 generation_time = time.time() - generation_start_time last_generation_time = generation_time # print(f"[FemtoBolt] 帧图像生成时间: {generation_time:.4f}秒 ({self.algorithm_type}算法) - 跳帧数: {skip_frame_count}") if depth_colored_final is None: # 如果等高线生成失败,跳过这一帧 continue # 优化裁剪处理(缓存裁剪参数) h, w = depth_colored_final.shape[:2] # self.logger.info(f"深度图像尺寸: 宽={w}, 高={h}") target_width = h // 2 # 缓存裁剪参数,避免重复计算 if self._crop_params is None or self._crop_params[0] != (h, w): if w > target_width: left = (w - target_width) // 2 right = left + target_width self._crop_params = ((h, w), left, right, target_width) else: self._crop_params = ((h, w), None, None, w) # 应用裁剪 if self._crop_params[1] is not None: display_image = depth_colored_final[:, self._crop_params[1]:self._crop_params[2]] else: display_image = depth_colored_final # 优化编码:使用更低的质量和更快的编码 encode_start = time.time() success, buffer = cv2.imencode('.jpg', display_image, self._encode_param) encode_time = time.time() - encode_start if success and self._socketio: # 优化base64编码:直接使用memoryview避免额外拷贝 base64_start = time.time() jpg_as_text = base64.b64encode(memoryview(buffer)).decode('utf-8') base64_time = time.time() - base64_start # 添加编码时间监控 if encode_time > 0.01 or base64_time > 0.01: # 超过10ms记录 print(f"[FemtoBolt] 编码时间: JPEG={encode_time:.4f}s, Base64={base64_time:.4f}s") self._socketio.emit('femtobolt_frame', { 'depth_image': jpg_as_text, 'frame_count': frame_count, 'timestamp': now, 'fps': self.actual_fps, 'device_id': self.device_id, 'depth_range': { 'min': self.depth_range_min, 'max': self.depth_range_max } }, namespace='/devices') frame_count += 1 self._last_send_time = now # 更新帧历史用于预测(仅对需要预测的算法) if self._prediction_enabled and self.algorithm_type in ['plt', 'opencv']: # 优化内存管理:重用缓冲区 if self._reusable_buffer is None or self._reusable_buffer.shape != display_image.shape: self._reusable_buffer = np.empty_like(display_image) # 使用预分配的缓冲区复制图像 np.copyto(self._reusable_buffer, display_image) self._last_frame = self._reusable_buffer # 限制历史帧数量,减少内存占用 frame_data = { 'image': display_image.copy(), # 这里仍需要复制,因为display_image可能被修改 'timestamp': now, 'depth_range': { 'min': self.depth_range_min, 'max': self.depth_range_max } } self._frame_history.append(frame_data) # 主动触发垃圾回收(仅在必要时) if frame_count % 100 == 0: # 每100帧触发一次 gc.collect() else: time.sleep(0.005) except Exception as e: # 捕获处理过程中出现异常,记录并继续 self.logger.error(f"FemtoBolt捕获处理错误: {e}") finally: # 无论处理成功与否,都应释放capture以回收内存:contentReference[oaicite:3]{index=3} try: if hasattr(capture, 'release'): capture.release() except Exception: pass else: time.sleep(0.001) except Exception as e: self.logger.error(f'FemtoBolt帧推送失败: {e}') time.sleep(0.05) # 降低空转CPU time.sleep(0.001) except Exception as e: self.logger.error(f"FemtoBolt流处理异常: {e}") finally: self.is_streaming = False self.logger.info("FemtoBolt流工作线程结束") def _send_predicted_frame(self, frame_count: int, timestamp: float): """ 发送预测帧以减少延时感知 Args: frame_count: 帧计数 timestamp: 时间戳 """ try: if self._last_frame is None: return # 简单的帧预测:使用最后一帧 predicted_frame = self._last_frame # 如果有足够的历史帧,可以进行简单的运动预测 if len(self._frame_history) >= 2: # 这里可以实现更复杂的预测算法 # 目前使用最新帧作为预测帧 predicted_frame = self._frame_history[-1]['image'] # 编码并发送预测帧 success, buffer = cv2.imencode('.jpg', predicted_frame, self._encode_param) if success and self._socketio: jpg_as_text = base64.b64encode(memoryview(buffer)).decode('utf-8') self._socketio.emit('femtobolt_frame', { 'depth_image': jpg_as_text, 'frame_count': frame_count, 'timestamp': timestamp, 'fps': self.actual_fps, 'device_id': self.device_id, 'predicted': True, # 标记为预测帧 'depth_range': { 'min': self.depth_range_min, 'max': self.depth_range_max } }, namespace='/devices') self._last_send_time = timestamp except Exception as e: self.logger.error(f"发送预测帧失败: {e}") def _update_statistics(self): """ 更新性能统计 """ self.frame_count += 1 self.fps_counter += 1 # 每秒计算一次实际FPS current_time = time.time() if current_time - self.fps_start_time >= 1.0: self.actual_fps = self.fps_counter / (current_time - self.fps_start_time) self.fps_counter = 0 self.fps_start_time = current_time # 更新性能统计 self.performance_stats.update({ 'frames_processed': self.frame_count, 'actual_fps': round(self.actual_fps, 2), 'dropped_frames': self.dropped_frames }) def _reconnect(self) -> bool: """ 重新连接FemtoBolt设备 Returns: bool: 重连是否成功 """ try: self._cleanup_device() time.sleep(2.0) # 等待设备释放 return self.initialize() except Exception as e: self.logger.error(f"FemtoBolt重连失败: {e}") return False def get_status(self) -> Dict[str, Any]: """ 获取设备状态 Returns: Dict[str, Any]: 设备状态信息 """ status = super().get_status() status.update({ 'color_resolution': self.color_resolution, 'depth_mode': self.depth_mode, 'target_fps': self.fps, 'actual_fps': self.actual_fps, 'frame_count': self.frame_count, 'dropped_frames': self.dropped_frames, 'depth_range': f"{self.depth_range_min}-{self.depth_range_max}mm", 'has_depth_frame': self.last_depth_frame is not None, 'has_color_frame': self.last_color_frame is not None }) return status def _cleanup_device(self): """ 清理设备资源 """ try: if self.device_handle: # 先停止Pipeline以释放设备资源 if hasattr(self, 'pipeline') and self.pipeline: try: self.logger.info("正在停止FemtoBolt Pipeline...") self.pipeline.stop() self.logger.info("FemtoBolt Pipeline已停止") # 等待Pipeline完全释放资源 time.sleep(0.5) except Exception as e: self.logger.warning(f"停止Pipeline时出现警告: {e}") finally: self.pipeline = None # 尝试停止设备(如果有stop方法) if hasattr(self.device_handle, 'stop'): try: self.device_handle.stop() self.logger.info("FemtoBolt设备已停止") # 等待设备完全停止 time.sleep(0.3) except Exception as e: self.logger.warning(f"停止FemtoBolt设备时出现警告: {e}") # 尝试关闭设备(如果有close方法) if hasattr(self.device_handle, 'close'): try: self.device_handle.close() self.logger.info("FemtoBolt设备连接已关闭") # 等待设备连接完全关闭 time.sleep(0.2) except Exception as e: self.logger.warning(f"关闭FemtoBolt设备时出现警告: {e}") self.device_handle = None except Exception as e: self.logger.error(f"清理FemtoBolt设备失败: {e}") finally: # 确保所有相关属性都被重置 self.pipeline = None self.device_handle = None def disconnect(self): """ 断开FemtoBolt设备连接 """ try: self.stop_streaming() self._cleanup_device() self.is_connected = False self.logger.info("FemtoBolt设备已断开连接") except Exception as e: self.logger.error(f"断开FemtoBolt设备连接失败: {e}") def reload_config(self) -> bool: """ 重新加载设备配置 Returns: bool: 重新加载是否成功 """ try: self.logger.info("正在重新加载FemtoBolt配置...") # 获取最新配置 self.config = self.config_manager.get_device_config('femtobolt') # 更新配置属性 self.algorithm_type = self.config.get('algorithm_type', 'opencv') self.color_resolution = self.config.get('color_resolution', '1080P') self.depth_mode = self.config.get('depth_mode', 'NFOV_2X2BINNED') self.color_format = self.config.get('color_format', 'COLOR_BGRA32') self.fps = self.config.get('camera_fps', 20) self.depth_range_min = self.config.get('depth_range_min', 500) self.depth_range_max = self.config.get('depth_range_max', 4500) self.synchronized_images_only = self.config.get('synchronized_images_only', False) # 更新图像处理参数 self.contrast_factor = self.config.get('contrast_factor', 1.2) self.gamma_value = self.config.get('gamma_value', 0.8) self.use_pseudo_color = self.config.get('use_pseudo_color', True) # 更新缓存队列大小 cache_size = self.config.get('frame_cache_size', 10) if cache_size != self.depth_frame_cache.maxlen: self.depth_frame_cache = deque(maxlen=cache_size) self.color_frame_cache = deque(maxlen=cache_size) # 更新gamma查找表 self._update_gamma_lut() self.logger.info(f"FemtoBolt配置重新加载成功 - 算法: {self.algorithm_type}, 分辨率: {self.color_resolution}, FPS: {self.fps}") return True except Exception as e: self.logger.error(f"重新加载FemtoBolt配置失败: {e}") return False def check_hardware_connection(self) -> bool: """ 相机连接检测太复杂,忽略连接检测 Returns: bool: 相机是否连接 """ return self.is_connected def cleanup(self): """ 清理资源 """ try: # 清理监控线程 # self._cleanup_monitoring() self.stop_streaming() self._cleanup_device() # 清理matplotlib图形对象 if hasattr(self, 'fig') and self.fig is not None: plt.close(self.fig) self.fig = None self.ax = None self.depth_frame_cache.clear() self.color_frame_cache.clear() self.last_depth_frame = None self.last_color_frame = None super().cleanup() self.logger.info("FemtoBolt资源清理完成") except Exception as e: self.logger.error(f"清理FemtoBolt资源失败: {e}")