#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 普通相机管理器 负责普通USB摄像头的连接、配置和数据采集 """ import cv2 import threading import time import base64 import numpy as np from typing import Optional, Dict, Any, Tuple import logging from collections import deque import gc try: from .base_device import BaseDevice from .utils.socket_manager import SocketManager from .utils.config_manager import ConfigManager except ImportError: from base_device import BaseDevice from utils.socket_manager import SocketManager from utils.config_manager import ConfigManager class CameraManager(BaseDevice): """普通相机管理器""" def __init__(self, socketio, config_manager: Optional[ConfigManager] = None): """ 初始化相机管理器 Args: socketio: SocketIO实例 config_manager: 配置管理器实例 """ # 配置管理 self.config_manager = config_manager or ConfigManager() config = self.config_manager.get_device_config('camera') super().__init__("camera", config) # 保存socketio实例 self._socketio = socketio # 相机相关属性 self.cap = None self.device_index = config.get('device_index', 0) self.width = config.get('width', 1280) self.height = config.get('height', 720) self.fps = config.get('fps', 30) self.buffer_size = config.get('buffer_size', 1) self.fourcc = config.get('fourcc', 'MJPG') # 额外可调的降采样宽度(不改变外部配置语义,仅内部优化传输) self._tx_max_width = int(config.get('tx_max_width', 640)) # 流控制 self.streaming_thread = None # 减小缓存长度,保留最近2帧即可,避免累计占用 self.frame_cache = deque(maxlen=int(config.get('frame_cache_len', 2))) self.last_frame = None self.frame_count = 0 self.dropped_frames = 0 # 性能监控 self.fps_counter = 0 self.fps_start_time = time.time() self.actual_fps = 0 # 重连机制 self.max_reconnect_attempts = 3 self.reconnect_delay = 2.0 # 设备标识和性能统计 self.device_id = f"camera_{self.device_index}" self.performance_stats = { 'frames_processed': 0, 'actual_fps': 0, 'dropped_frames': 0 } # OpenCV优化开关 try: cv2.setUseOptimized(True) except Exception: pass self.logger.info(f"相机管理器初始化完成 - 设备索引: {self.device_index}") def initialize(self) -> bool: """ 初始化相机设备 Returns: bool: 初始化是否成功 """ try: self.logger.info(f"正在初始化相机设备 {self.device_index}...") # 尝试多个后端(Windows下优先MSMF/DShow) backends = [cv2.CAP_MSMF, cv2.CAP_DSHOW, cv2.CAP_ANY] for backend in backends: try: self.cap = cv2.VideoCapture(self.device_index, backend) if self.cap.isOpened(): self.logger.info(f"使用后端 {backend} 成功打开相机") break except Exception as e: self.logger.warning(f"后端 {backend} 打开相机失败: {e}") continue else: raise Exception("所有后端都无法打开相机") # 设置相机属性 self._configure_camera() # 验证相机是否正常工作 if not self._test_camera(): raise Exception("相机测试失败") self.is_connected = True self._device_info.update({ 'device_index': self.device_index, 'resolution': f"{self.width}x{self.height}", 'fps': self.fps, 'backend': self.cap.getBackendName() if hasattr(self.cap, 'getBackendName') else 'Unknown' }) self.logger.info("相机初始化成功") return True except Exception as e: self.logger.error(f"相机初始化失败: {e}") self.is_connected = False if self.cap: try: self.cap.release() except Exception: pass self.cap = None return False def _configure_camera(self): """ 配置相机参数 """ if not self.cap: return try: # 设置FOURCC编码 if self.fourcc: fourcc_code = cv2.VideoWriter_fourcc(*self.fourcc) self.cap.set(cv2.CAP_PROP_FOURCC, fourcc_code) # 设置分辨率 self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.width) self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height) # 设置帧率 self.cap.set(cv2.CAP_PROP_FPS, self.fps) # 设置缓冲区大小(部分后端不生效) try: self.cap.set(cv2.CAP_PROP_BUFFERSIZE, self.buffer_size) except Exception: pass # 获取实际设置的值 actual_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) actual_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) actual_fps = self.cap.get(cv2.CAP_PROP_FPS) self.logger.info(f"相机配置 - 分辨率: {actual_width}x{actual_height}, FPS: {actual_fps}") except Exception as e: self.logger.warning(f"配置相机参数失败: {e}") def _test_camera(self) -> bool: """ 测试相机是否正常工作 Returns: bool: 测试是否成功 """ try: ret, frame = self.cap.read() if ret and frame is not None: self.logger.info(f"相机测试成功 - 帧大小: {frame.shape}") return True else: self.logger.error("相机测试失败 - 无法读取帧") return False except Exception as e: self.logger.error(f"相机测试异常: {e}") return False def calibrate(self) -> bool: """ 校准相机(对于普通相机,主要是验证连接和设置) Returns: bool: 校准是否成功 """ try: self.logger.info("开始相机校准...") if not self.is_connected: if not self.initialize(): return False # 读取几帧来稳定相机 for i in range(5): ret, _ = self.cap.read() if not ret: self.logger.warning(f"校准时读取第{i+1}帧失败") self.logger.info("相机校准完成") return True except Exception as e: self.logger.error(f"相机校准失败: {e}") return False def start_streaming(self) -> bool: """ 开始数据流推送 Returns: bool: 启动是否成功 """ if self.is_streaming: self.logger.warning("相机流已在运行") return True if not self.is_connected: if not self.initialize(): return False try: self.is_streaming = True self.streaming_thread = threading.Thread( target=self._streaming_worker, name=f"Camera-{self.device_index}-Stream", daemon=True ) self.streaming_thread.start() self.logger.info("相机流启动成功") return True except Exception as e: self.logger.error(f"启动相机流失败: {e}") self.is_streaming = False return False def stop_streaming(self) -> bool: """ 停止数据流推送 Returns: bool: 停止是否成功 """ try: self.is_streaming = False if self.streaming_thread and self.streaming_thread.is_alive(): # 等待线程退出 self.streaming_thread.join(timeout=3.0) self.streaming_thread = None self.logger.info("相机流已停止") return True except Exception as e: self.logger.error(f"停止相机流失败: {e}") return False def _streaming_worker(self): """ 流处理工作线程 """ self.logger.info("相机流工作线程启动") reconnect_attempts = 0 # 基于目标FPS的简单节拍器,防止无上限地读取/编码/发送导致对象堆积 frame_interval = 1.0 / max(self.fps, 1) next_tick = time.time() while self.is_streaming: loop_start = time.time() try: if not self.cap or not self.cap.isOpened(): if reconnect_attempts < self.max_reconnect_attempts: self.logger.warning(f"相机连接丢失,尝试重连 ({reconnect_attempts + 1}/{self.max_reconnect_attempts})") if self._reconnect(): reconnect_attempts = 0 continue else: reconnect_attempts += 1 time.sleep(self.reconnect_delay) continue else: self.logger.error("相机重连失败次数过多,停止流") break ret, frame = self.cap.read() if not ret or frame is None: self.dropped_frames += 1 if self.dropped_frames > 10: self.logger.warning(f"连续丢帧过多: {self.dropped_frames}") # 仅在异常情况下触发一次GC,避免高频强制GC try: gc.collect() except Exception: pass self.dropped_frames = 0 # 防止空转占满CPU time.sleep(0.005) continue # 重置丢帧计数 self.dropped_frames = 0 # 处理帧(降采样以优化传输负载) processed_frame = self._process_frame(frame) # 缓存帧(不复制,减少内存占用) # self.last_frame = processed_frame # self.frame_cache.append(processed_frame) # 发送帧数据 self._send_frame_data(processed_frame) # 更新统计 self._update_statistics() # 主动释放局部引用,帮助GC更快识别可回收对象 del frame # 注意:processed_frame 被 last_frame 和 frame_cache 引用,不可删除其对象本身 # 限速:保证不超过目标FPS,减小发送端积压 now = time.time() # 下一个tick基于固定间隔前移,避免误差累积 next_tick += frame_interval sleep_time = next_tick - now if sleep_time > 0: time.sleep(sleep_time) else: # 如果处理耗时超过间隔,纠正节拍器,避免持续为负 next_tick = now except Exception as e: self.logger.error(f"相机流处理异常: {e}") # 小退避,避免异常情况下空转 time.sleep(0.02) self.logger.info("相机流工作线程结束") def _process_frame(self, frame: np.ndarray) -> np.ndarray: """ 处理视频帧 Args: frame: 原始帧 Returns: np.ndarray: 处理后的帧 """ try: # 调整大小以优化传输(使用 INTER_AREA 质量好且更省内存/CPU) h, w = frame.shape[:2] if w > self._tx_max_width: scale = self._tx_max_width / float(w) new_w = self._tx_max_width new_h = int(h * scale) frame = cv2.resize(frame, (new_w, new_h), interpolation=cv2.INTER_AREA) return frame except Exception as e: self.logger.error(f"处理帧失败: {e}") return frame def _send_frame_data(self, frame: np.ndarray): """ 发送帧数据 Args: frame: 视频帧 """ # 将临时对象局部化,并在 finally 中删除引用,加速回收 buffer = None frame_bytes = None frame_data = None try: # 编码为JPEG encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 80] ok, buffer = cv2.imencode('.jpg', frame, encode_param) if not ok or buffer is None: self.logger.warning("帧JPEG编码失败") return # 转换为bytes再做base64,减少中间numpy对象的长生命周期 frame_bytes = buffer.tobytes() frame_data = base64.b64encode(frame_bytes).decode('utf-8') # 发送数据 data = { 'timestamp': time.time(), 'frame_count': self.frame_count, 'image': frame_data, 'fps': self.actual_fps, 'device_id': self.device_id } self._socketio.emit('camera_frame', data, namespace='/devices') except Exception as e: self.logger.error(f"发送帧数据失败: {e}") finally: # 显式删除临时大对象的引用,避免在高吞吐下堆积 del buffer del frame_bytes del frame_data def _update_statistics(self): """ 更新性能统计 """ self.frame_count += 1 self.fps_counter += 1 # 每秒计算一次实际FPS current_time = time.time() if current_time - self.fps_start_time >= 1.0: self.actual_fps = self.fps_counter / (current_time - self.fps_start_time) self.fps_counter = 0 self.fps_start_time = current_time # 更新性能统计 self.performance_stats.update({ 'frames_processed': self.frame_count, 'actual_fps': round(self.actual_fps, 2), 'dropped_frames': self.dropped_frames }) def _reconnect(self) -> bool: """ 重新连接相机 Returns: bool: 重连是否成功 """ try: if self.cap: try: self.cap.release() except Exception: pass self.cap = None time.sleep(1.0) # 等待设备释放 return self.initialize() except Exception as e: self.logger.error(f"相机重连失败: {e}") return False def get_status(self) -> Dict[str, Any]: """ 获取设备状态 Returns: Dict[str, Any]: 设备状态信息 """ status = super().get_status() status.update({ 'device_index': self.device_index, 'resolution': f"{self.width}x{self.height}", 'target_fps': self.fps, 'actual_fps': self.actual_fps, 'frame_count': self.frame_count, 'dropped_frames': self.dropped_frames, 'has_frame': self.last_frame is not None }) return status def capture_image(self, save_path: Optional[str] = None) -> Optional[np.ndarray]: """ 捕获单张图像 Args: save_path: 保存路径(可选) Returns: Optional[np.ndarray]: 捕获的图像,失败返回None """ try: if not self.is_connected or not self.cap: self.logger.error("相机未连接") return None ret, frame = self.cap.read() if not ret or frame is None: self.logger.error("捕获图像失败") return None if save_path: cv2.imwrite(save_path, frame) self.logger.info(f"图像已保存到: {save_path}") return frame except Exception as e: self.logger.error(f"捕获图像异常: {e}") return None def get_latest_frame(self) -> Optional[np.ndarray]: """ 获取最新帧 Returns: Optional[np.ndarray]: 最新帧,无帧返回None """ # 对外提供拷贝,内部保持原对象,避免重复持有 return self.last_frame.copy() if self.last_frame is not None else None def disconnect(self): """ 断开相机连接 """ try: self.stop_streaming() if self.cap: try: self.cap.release() except Exception: pass self.cap = None self.is_connected = False self.logger.info("相机已断开连接") except Exception as e: self.logger.error(f"断开相机连接失败: {e}") def cleanup(self): """ 清理资源 """ try: self.stop_streaming() if self.cap: try: self.cap.release() except Exception: pass self.cap = None self.frame_cache.clear() self.last_frame = None super().cleanup() self.logger.info("相机资源清理完成") except Exception as e: self.logger.error(f"清理相机资源失败: {e}")