#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 普通相机管理器 负责普通USB摄像头的连接、配置和数据采集 """ import cv2 import threading import time import base64 import numpy as np from typing import Optional, Dict, Any, Tuple import logging import queue import gc try: from .base_device import BaseDevice from .utils.socket_manager import SocketManager from .utils.config_manager import ConfigManager except ImportError: from base_device import BaseDevice from utils.socket_manager import SocketManager from utils.config_manager import ConfigManager class CameraManager(BaseDevice): """普通相机管理器""" def __init__(self, socketio, config_manager: Optional[ConfigManager] = None): """ 初始化相机管理器 Args: socketio: SocketIO实例 config_manager: 配置管理器实例 """ # 配置管理 self.config_manager = config_manager or ConfigManager() config = self.config_manager.get_device_config('camera') super().__init__("camera", config) # 保存socketio实例 self._socketio = socketio # 相机相关属性 self.cap = None self.device_index = config.get('device_index', 0) self.width = config.get('width', 1280) self.height = config.get('height', 720) self.fps = config.get('fps', 30) self.buffer_size = config.get('buffer_size', 1) self.fourcc = config.get('fourcc', 'MJPG') # 额外可调的降采样宽度(不改变外部配置语义,仅内部优化传输) self._tx_max_width = int(config.get('tx_max_width', 640)) # 流控制 self.streaming_thread = None # 减小缓存长度,保留最近2帧即可,避免累计占用 self.frame_cache = queue.Queue(maxsize=int(config.get('frame_cache_len', 2))) self.last_frame = None self.frame_count = 0 self.dropped_frames = 0 # 性能监控 self.fps_counter = 0 self.fps_start_time = time.time() self.actual_fps = 0 # 重连机制 self.max_reconnect_attempts = 3 self.reconnect_delay = 2.0 # 设备标识和性能统计 self.device_id = f"camera_{self.device_index}" self.performance_stats = { 'frames_processed': 0, 'actual_fps': 0, 'dropped_frames': 0 } # 全局帧队列(用于录制) self.frame_queue = queue.Queue(maxsize=10) # 最大长度10,自动丢弃旧帧 # OpenCV优化开关 try: cv2.setUseOptimized(True) except Exception: pass self.logger.info(f"相机管理器初始化完成 - 设备索引: {self.device_index}") def initialize(self) -> bool: """ 初始化相机设备 Returns: bool: 初始化是否成功 """ try: self.logger.info(f"正在初始化相机设备 {self.device_index}...") # 尝试多个后端(Windows下优先MSMF/DShow) backends = [cv2.CAP_MSMF, cv2.CAP_DSHOW, cv2.CAP_ANY] for backend in backends: try: self.cap = cv2.VideoCapture(self.device_index, backend) if self.cap.isOpened(): self.logger.info(f"使用后端 {backend} 成功打开相机") break except Exception as e: self.logger.warning(f"后端 {backend} 打开相机失败: {e}") continue else: raise Exception("所有后端都无法打开相机") # 设置相机属性 self._configure_camera() # 验证相机是否正常工作 if not self._test_camera(): raise Exception("相机测试失败") self.is_connected = True self._device_info.update({ 'device_index': self.device_index, 'resolution': f"{self.width}x{self.height}", 'fps': self.fps, 'backend': self.cap.getBackendName() if hasattr(self.cap, 'getBackendName') else 'Unknown' }) self.logger.info("相机初始化成功") return True except Exception as e: self.logger.error(f"相机初始化失败: {e}") self.is_connected = False if self.cap: try: self.cap.release() except Exception: pass self.cap = None return False def _configure_camera(self): """ 配置相机参数 """ if not self.cap: return try: # 设置FOURCC编码 if self.fourcc: fourcc_code = cv2.VideoWriter_fourcc(*self.fourcc) self.cap.set(cv2.CAP_PROP_FOURCC, fourcc_code) # 设置分辨率 self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.width) self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height) # 设置帧率 self.cap.set(cv2.CAP_PROP_FPS, self.fps) # 设置缓冲区大小(部分后端不生效) try: self.cap.set(cv2.CAP_PROP_BUFFERSIZE, self.buffer_size) except Exception: pass # 获取实际设置的值 actual_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) actual_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) actual_fps = self.cap.get(cv2.CAP_PROP_FPS) self.logger.info(f"相机配置 - 分辨率: {actual_width}x{actual_height}, FPS: {actual_fps}") except Exception as e: self.logger.warning(f"配置相机参数失败: {e}") def _test_camera(self) -> bool: """ 测试相机是否正常工作 Returns: bool: 测试是否成功 """ try: ret, frame = self.cap.read() if ret and frame is not None: self.logger.info(f"相机测试成功 - 帧大小: {frame.shape}") return True else: self.logger.error("相机测试失败 - 无法读取帧") return False except Exception as e: self.logger.error(f"相机测试异常: {e}") return False def calibrate(self) -> bool: """ 校准相机(对于普通相机,主要是验证连接和设置) Returns: bool: 校准是否成功 """ try: self.logger.info("开始相机校准...") if not self.is_connected: if not self.initialize(): return False # 读取几帧来稳定相机 for i in range(5): ret, _ = self.cap.read() if not ret: self.logger.warning(f"校准时读取第{i+1}帧失败") self.logger.info("相机校准完成") return True except Exception as e: self.logger.error(f"相机校准失败: {e}") return False def start_streaming(self) -> bool: """ 开始数据流推送 Returns: bool: 启动是否成功 """ if self.is_streaming: self.logger.warning("相机流已在运行") return True if not self.is_connected: if not self.initialize(): return False try: self.is_streaming = True self.streaming_thread = threading.Thread( target=self._streaming_worker, name=f"Camera-{self.device_index}-Stream", daemon=True ) self.streaming_thread.start() self.logger.info("相机流启动成功") return True except Exception as e: self.logger.error(f"启动相机流失败: {e}") self.is_streaming = False return False def stop_streaming(self) -> bool: """ 停止数据流推送 Returns: bool: 停止是否成功 """ try: self.is_streaming = False if self.streaming_thread and self.streaming_thread.is_alive(): # 等待线程退出 self.streaming_thread.join(timeout=3.0) self.streaming_thread = None self.logger.info("相机流已停止") return True except Exception as e: self.logger.error(f"停止相机流失败: {e}") return False def _streaming_worker(self): """ 流处理工作线程 """ self.logger.info("相机流工作线程启动") reconnect_attempts = 0 # 基于目标FPS的简单节拍器,防止无上限地读取/编码/发送导致对象堆积 frame_interval = 1.0 / max(self.fps, 1) next_tick = time.time() while self.is_streaming: loop_start = time.time() try: if not self.cap or not self.cap.isOpened(): if reconnect_attempts < self.max_reconnect_attempts: self.logger.warning(f"相机连接丢失,尝试重连 ({reconnect_attempts + 1}/{self.max_reconnect_attempts})") if self._reconnect(): reconnect_attempts = 0 continue else: reconnect_attempts += 1 time.sleep(self.reconnect_delay) continue else: self.logger.error("相机重连失败次数过多,停止流") break ret, frame = self.cap.read() if not ret or frame is None: self.dropped_frames += 1 if self.dropped_frames > 10: self.logger.warning(f"连续丢帧过多: {self.dropped_frames}") # 仅在异常情况下触发一次GC,避免高频强制GC try: gc.collect() except Exception: pass self.dropped_frames = 0 # 防止空转占满CPU time.sleep(0.005) continue # 重置丢帧计数 self.dropped_frames = 0 # 保存原始帧到队列(用于录制) try: self.frame_queue.put_nowait({ 'frame': frame.copy(), 'timestamp': time.time() }) except queue.Full: # 队列满时丢弃最旧的帧,添加新帧 try: self.frame_queue.get_nowait() # 移除最旧的帧 self.frame_queue.put_nowait({ 'frame': frame.copy(), 'timestamp': time.time() }) except queue.Empty: pass # 队列为空,忽略 # 处理帧(降采样以优化传输负载) processed_frame = self._process_frame(frame) # # 缓存帧(不复制,减少内存占用) # self.last_frame = processed_frame # self.frame_cache.append(processed_frame) # 发送帧数据 self._send_frame_data(processed_frame) # 更新统计 self._update_statistics() # 主动释放局部引用,帮助GC更快识别可回收对象 del frame # 注意:processed_frame 被 last_frame 和 frame_cache 引用,不可删除其对象本身 # 限速:保证不超过目标FPS,减小发送端积压 now = time.time() # 下一个tick基于固定间隔前移,避免误差累积 next_tick += frame_interval sleep_time = next_tick - now if sleep_time > 0: time.sleep(sleep_time) else: # 如果处理耗时超过间隔,纠正节拍器,避免持续为负 next_tick = now except Exception as e: self.logger.error(f"相机流处理异常: {e}") # 小退避,避免异常情况下空转 time.sleep(0.02) self.logger.info("相机流工作线程结束") def _process_frame(self, frame: np.ndarray) -> np.ndarray: """ 处理视频帧 Args: frame: 原始帧 Returns: np.ndarray: 处理后的帧 """ try: # 调整大小以优化传输(使用 INTER_AREA 质量好且更省内存/CPU) h, w = frame.shape[:2] if w > self._tx_max_width: scale = self._tx_max_width / float(w) new_w = self._tx_max_width new_h = int(h * scale) frame = cv2.resize(frame, (new_w, new_h), interpolation=cv2.INTER_AREA) return frame except Exception as e: self.logger.error(f"处理帧失败: {e}") return frame def _send_frame_data(self, frame: np.ndarray): """ 发送帧数据 Args: frame: 视频帧 """ # 将临时对象局部化,并在 finally 中删除引用,加速回收 buffer = None frame_bytes = None frame_data = None try: # 编码为JPEG encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 80] ok, buffer = cv2.imencode('.jpg', frame, encode_param) if not ok or buffer is None: self.logger.warning("帧JPEG编码失败") return # 转换为bytes再做base64,减少中间numpy对象的长生命周期 frame_bytes = buffer.tobytes() frame_data = base64.b64encode(frame_bytes).decode('utf-8') # 发送数据 data = { 'timestamp': time.time(), 'frame_count': self.frame_count, 'image': frame_data, 'fps': self.actual_fps, 'device_id': self.device_id } self._socketio.emit('camera_frame', data, namespace='/devices') except Exception as e: self.logger.error(f"发送帧数据失败: {e}") finally: # 显式删除临时大对象的引用,避免在高吞吐下堆积 del buffer del frame_bytes del frame_data def _update_statistics(self): """ 更新性能统计 """ self.frame_count += 1 self.fps_counter += 1 # 每秒计算一次实际FPS current_time = time.time() if current_time - self.fps_start_time >= 1.0: self.actual_fps = self.fps_counter / (current_time - self.fps_start_time) self.fps_counter = 0 self.fps_start_time = current_time # 更新性能统计 self.performance_stats.update({ 'frames_processed': self.frame_count, 'actual_fps': round(self.actual_fps, 2), 'dropped_frames': self.dropped_frames }) def _reconnect(self) -> bool: """ 重新连接相机 Returns: bool: 重连是否成功 """ try: if self.cap: try: self.cap.release() except Exception: pass self.cap = None time.sleep(1.0) # 等待设备释放 return self.initialize() except Exception as e: self.logger.error(f"相机重连失败: {e}") return False def get_status(self) -> Dict[str, Any]: """ 获取设备状态 Returns: Dict[str, Any]: 设备状态信息 """ status = super().get_status() status.update({ 'device_index': self.device_index, 'resolution': f"{self.width}x{self.height}", 'target_fps': self.fps, 'actual_fps': self.actual_fps, 'frame_count': self.frame_count, 'dropped_frames': self.dropped_frames, 'has_frame': self.last_frame is not None }) return status def capture_image(self, save_path: Optional[str] = None) -> Optional[np.ndarray]: """ 捕获单张图像 Args: save_path: 保存路径(可选) Returns: Optional[np.ndarray]: 捕获的图像,失败返回None """ try: if not self.is_connected or not self.cap: self.logger.error("相机未连接") return None ret, frame = self.cap.read() if not ret or frame is None: self.logger.error("捕获图像失败") return None if save_path: cv2.imwrite(save_path, frame) self.logger.info(f"图像已保存到: {save_path}") return frame except Exception as e: self.logger.error(f"捕获图像异常: {e}") return None def disconnect(self): """ 断开相机连接 """ try: self.stop_streaming() if self.cap: try: self.cap.release() except Exception: pass self.cap = None self.is_connected = False self.logger.info("相机已断开连接") except Exception as e: self.logger.error(f"断开相机连接失败: {e}") def cleanup(self): """ 清理资源 """ try: self.stop_streaming() if self.cap: try: self.cap.release() except Exception: pass self.cap = None # 清理帧缓存 while not self.frame_cache.empty(): try: self.frame_cache.get_nowait() except queue.Empty: break self.last_frame = None # 清理帧队列 while not self.frame_queue.empty(): try: self.frame_queue.get_nowait() except queue.Empty: break super().cleanup() self.logger.info("相机资源清理完成") except Exception as e: self.logger.error(f"清理相机资源失败: {e}") def _save_frame_to_cache(self, frame, frame_type='camera'): """保存帧到全局缓存""" try: with self.frame_cache_lock: current_time = time.time() # 清理过期帧 self._cleanup_expired_frames() # 如果缓存已满,移除最旧的帧 if frame_type in self.global_frame_cache and len(self.global_frame_cache[frame_type]) >= self.max_cache_size: oldest_key = min(self.global_frame_cache[frame_type].keys()) del self.global_frame_cache[frame_type][oldest_key] # 初始化帧类型缓存 if frame_type not in self.global_frame_cache: self.global_frame_cache[frame_type] = {} # 保存帧(深拷贝避免引用问题) frame_data = { 'frame': frame.copy(), 'timestamp': current_time, 'frame_id': len(self.global_frame_cache[frame_type]) } self.global_frame_cache[frame_type][current_time] = frame_data except Exception as e: self.logger.error(f'保存帧到缓存失败: {e}') def _get_latest_frame_from_cache(self, frame_type='camera'): """从队列获取最新帧""" try: if self.frame_queue.empty(): self.logger.debug('帧队列为空') return None, None # 获取队列中的所有帧,保留最新的一个 frames = [] while not self.frame_queue.empty(): try: frames.append(self.frame_queue.get_nowait()) except queue.Empty: break if not frames: return None, None # 获取最新帧(最后一个) latest_frame = frames[-1] # 将最新帧重新放回队列 try: self.frame_queue.put_nowait(latest_frame) except queue.Full: pass # 队列满时忽略 return latest_frame['frame'].copy(), latest_frame['timestamp'] except Exception as e: self.logger.error(f'从队列获取帧失败: {e}') return None, None