BodyBalanceEvaluation/backend/devices/camera_manager.py

1105 lines
44 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
普通相机管理器
负责普通USB摄像头的连接、配置和数据采集
"""
import cv2
import threading
import time
import base64
import numpy as np
from typing import Optional, Dict, Any
import logging
import queue
import gc
try:
from .base_device import BaseDevice
from .utils.config_manager import ConfigManager
except ImportError:
from base_device import BaseDevice
from utils.config_manager import ConfigManager
class CameraManager(BaseDevice):
"""普通相机管理器"""
def __init__(self, socketio, config_manager: Optional[ConfigManager] = None):
"""
初始化相机管理器
Args:
socketio: SocketIO实例
config_manager: 配置管理器实例
"""
# 配置管理
self.config_manager = config_manager or ConfigManager()
config = self.config_manager.get_device_config('camera')
super().__init__("camera", config)
# 保存socketio实例
self._socketio = socketio
# 相机相关属性
self.cap = None
self.device_index = config.get('device_index', 0)
self.width = config.get('width', 1280)
self.height = config.get('height', 720)
self.fps = config.get('fps', 30)
self.buffer_size = config.get('buffer_size', 1)
self.fourcc = config.get('fourcc', 'MJPG')
# OpenCV后端配置 (DirectShow性能最佳)
backend_name = config.get('backend', 'directshow').lower()
self.backend_map = {
'directshow': cv2.CAP_DSHOW,
'dshow': cv2.CAP_DSHOW,
'msmf': cv2.CAP_MSMF,
'any': cv2.CAP_ANY
}
self.preferred_backend = self.backend_map.get(backend_name, cv2.CAP_DSHOW)
self.backend_name = backend_name
# 额外可调的降采样宽度(不改变外部配置语义,仅内部优化传输)
self._tx_max_width = int(config.get('tx_max_width', 640))
# 流控制
self.streaming_thread = None
# 减小缓存长度保留最近2帧即可避免累计占用
self.frame_cache = queue.Queue(maxsize=int(config.get('frame_cache_len', 2)))
self.last_frame = None
self.frame_count = 0
self.dropped_frames = 0
# 性能监控
self.fps_counter = 0
self.fps_start_time = time.time()
self.actual_fps = 0
# 重连与断连检测机制(-1 表示无限重连)
self.max_reconnect_attempts = int(config.get('max_reconnect_attempts', -1))
self.reconnect_delay = float(config.get('reconnect_delay', 2.0))
self.read_fail_threshold = int(config.get('read_fail_threshold', 30))
self._last_connected_state = None
# 设备标识和性能统计
self.device_id = f"camera_{self.device_index}"
self.performance_stats = {
'frames_processed': 0,
'actual_fps': 0,
'dropped_frames': 0
}
# 全局帧队列(用于录制)
self.frame_queue = queue.Queue(maxsize=10) # 最大长度10自动丢弃旧帧
# 属性缓存机制 - 避免重复设置相同属性值
self._property_cache = {}
self._cache_enabled = True
# OpenCV优化开关和性能设置
try:
cv2.setUseOptimized(True)
# 设置OpenCV线程数以提高性能
cv2.setNumThreads(4) # 使用4个线程
except Exception:
pass
self.logger.info(f"相机管理器初始化完成 - 设备索引: {self.device_index}")
def _set_property_optimized(self, prop, value):
"""
优化的属性设置方法,使用缓存避免重复设置
Args:
prop: OpenCV属性常量
value: 属性值
Returns:
bool: 是否实际执行了设置操作
"""
if not self.cap:
return False
# 检查缓存,避免重复设置相同值
if self._cache_enabled and prop in self._property_cache:
if self._property_cache[prop] == value:
return False # 值未改变,跳过设置
# 执行属性设置
result = self.cap.set(prop, value)
# 更新缓存
if self._cache_enabled:
self._property_cache[prop] = value
return True
def initialize(self) -> bool:
"""
初始化相机设备
Returns:
bool: 初始化是否成功
"""
start_time = time.time()
try:
self.logger.info(f"正在初始化相机设备...")
# 使用构造函数中已加载的配置,避免并发读取配置文件
config_time = time.time()
self.logger.info(f"使用已加载配置: device_index={self.device_index}, resolution={self.width}x{self.height}, fps={self.fps} (耗时: {(config_time - start_time)*1000:.1f}ms)")
# 使用配置的后端,如果失败则尝试其他后端
if self.preferred_backend == cv2.CAP_DSHOW:
backends = [cv2.CAP_DSHOW, cv2.CAP_MSMF, cv2.CAP_ANY]
elif self.preferred_backend == cv2.CAP_MSMF:
backends = [cv2.CAP_MSMF, cv2.CAP_DSHOW, cv2.CAP_ANY]
else:
backends = [self.preferred_backend, cv2.CAP_DSHOW, cv2.CAP_MSMF, cv2.CAP_ANY]
camera_open_time = time.time()
for backend in backends:
backend_start = time.time()
try:
# 快速打开相机,减少超时等待
self.cap = cv2.VideoCapture(self.device_index, backend)
# 设置较短的超时时间以加快检测
if hasattr(cv2, 'CAP_PROP_OPEN_TIMEOUT_MSEC'):
self.cap.set(cv2.CAP_PROP_OPEN_TIMEOUT_MSEC, 3000) # 3秒超时
if self.cap.isOpened():
backend_time = (time.time() - backend_start) * 1000
self.logger.info(f"使用后端 {backend} 成功打开相机 (耗时: {backend_time:.1f}ms)")
break
except Exception as e:
backend_time = (time.time() - backend_start) * 1000
self.logger.warning(f"后端 {backend} 打开相机失败: {e} (耗时: {backend_time:.1f}ms)")
continue
else:
total_open_time = (time.time() - camera_open_time) * 1000
self.logger.warning(f"所有后端都无法打开相机,相机设备不可用 (总耗时: {total_open_time:.1f}ms)")
return False
total_open_time = (time.time() - camera_open_time) * 1000
self.logger.info(f"相机打开完成 (总耗时: {total_open_time:.1f}ms)")
# 设置相机属性
config_start = time.time()
self._configure_camera()
config_time = (time.time() - config_start) * 1000
self.logger.info(f"相机配置完成 (耗时: {config_time:.1f}ms)")
# 验证相机是否正常工作(优化测试过程)
test_start = time.time()
if not self._test_camera():
test_time = (time.time() - test_start) * 1000
self.logger.warning(f"相机测试失败,相机设备不可用 (耗时: {test_time:.1f}ms)")
return False
test_time = (time.time() - test_start) * 1000
self.logger.info(f"相机测试完成 (耗时: {test_time:.1f}ms)")
# 使用set_connected方法来正确启动连接监控线程
self.set_connected(True)
self._last_connected_state = True
self._device_info.update({
'device_index': self.device_index,
'resolution': f"{self.width}x{self.height}",
'fps': self.fps,
'backend': self.cap.getBackendName() if hasattr(self.cap, 'getBackendName') else 'Unknown'
})
total_time = (time.time() - start_time) * 1000
self.logger.info(f"相机初始化成功 (总耗时: {total_time:.1f}ms)")
return True
except Exception as e:
error_msg = f"相机初始化失败: {e}"
# 提供更详细的错误信息和解决建议
if "所有后端都无法打开相机" in str(e):
error_msg += f"\n可能原因:\n1. 相机设备索引 {self.device_index} 不存在或被其他程序占用\n2. 相机驱动程序未正确安装\n3. 相机硬件连接问题\n建议:检查相机连接状态,尝试更换设备索引或重启相机设备"
elif "Camera index out of range" in str(e):
error_msg += f"\n相机设备索引 {self.device_index} 超出范围,请检查可用的相机设备"
elif "backend" in str(e).lower():
error_msg += "\n相机后端初始化失败,可能是驱动程序问题"
self.logger.error(error_msg)
self.is_connected = False
if self.cap:
try:
self.cap.release()
except Exception:
pass
self.cap = None
return False
def _configure_camera(self):
"""
配置相机参数
"""
if not self.cap:
return
try:
# 批量设置相机属性以提高效率
config_start = time.time()
# 设置缓冲区大小(优先设置,减少延迟)
buffer_start = time.time()
try:
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, min(self.buffer_size, 1)) # 使用最小缓冲区减少延迟
except Exception:
pass
buffer_time = (time.time() - buffer_start) * 1000
self.logger.debug(f"缓冲区设置耗时: {buffer_time:.1f}ms")
# 性能优化设置:禁用可能导致延迟的自动功能
optimization_start = time.time()
try:
# 禁用自动曝光以减少处理时间
self.cap.set(cv2.CAP_PROP_AUTO_EXPOSURE, 0.25) # 手动曝光模式
# 禁用自动白平衡
self.cap.set(cv2.CAP_PROP_AUTO_WB, 0)
# 设置较低的曝光值以减少延迟
self.cap.set(cv2.CAP_PROP_EXPOSURE, -6)
except Exception as e:
self.logger.debug(f"设置性能优化参数时出现警告: {e}")
optimization_time = (time.time() - optimization_start) * 1000
self.logger.debug(f"性能优化设置耗时: {optimization_time:.1f}ms")
# 激进优化:跳过非关键属性设置,只设置必要属性
resolution_start = time.time()
# 优先级属性设置:只设置最关键的属性
critical_properties = [
(cv2.CAP_PROP_FRAME_WIDTH, self.width),
(cv2.CAP_PROP_FRAME_HEIGHT, self.height)
]
# 可选属性(在某些情况下可以跳过)
optional_properties = [
(cv2.CAP_PROP_FPS, self.fps)
]
batch_start = time.time()
actual_sets = 0
skipped_sets = 0
# 设置关键属性
for prop, value in critical_properties:
try:
if self._set_property_optimized(prop, value):
actual_sets += 1
else:
skipped_sets += 1
except Exception as e:
self.logger.debug(f"设置属性 {prop} 失败: {e}")
skipped_sets += 1
# 条件设置可选属性(如果时间允许)
current_time = (time.time() - batch_start) * 1000
if current_time < 1000: # 如果已用时间少于1秒才设置FPS
for prop, value in optional_properties:
try:
if self._set_property_optimized(prop, value):
actual_sets += 1
else:
skipped_sets += 1
except Exception as e:
self.logger.debug(f"设置可选属性 {prop} 失败: {e}")
skipped_sets += 1
else:
self.logger.debug(f"跳过FPS设置以节省时间 (已耗时: {current_time:.1f}ms)")
skipped_sets += len(optional_properties)
batch_time = (time.time() - batch_start) * 1000
resolution_time = (time.time() - resolution_start) * 1000
self.logger.debug(f"优化属性设置耗时: {batch_time:.1f}ms, 实际设置: {actual_sets}, 跳过: {skipped_sets}, 总计: {resolution_time:.1f}ms")
# 将原来的单独计时变量设为批量时间的一部分用于兼容性
width_time = batch_time * 0.5 # 估算宽度设置占比
height_time = batch_time * 0.4 # 估算高度设置占比
fps_time = batch_time * 0.1 # 估算帧率设置占比(可能被跳过)
# 设置FOURCC编码如果需要
if self.fourcc:
fourcc_start = time.time()
fourcc_code = cv2.VideoWriter_fourcc(*self.fourcc)
self.cap.set(cv2.CAP_PROP_FOURCC, fourcc_code)
fourcc_time = (time.time() - fourcc_start) * 1000
self.logger.debug(f"FOURCC设置耗时: {fourcc_time:.1f}ms")
# 激进优化:延迟验证机制 - 只在必要时验证
verification_start = time.time()
# 检查是否需要验证(基于配置或调试需求)
need_verification = self.logger.isEnabledFor(logging.DEBUG) or getattr(self, '_force_verification', False)
if need_verification:
# 只读取关键属性进行验证
batch_read_start = time.time()
try:
actual_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
actual_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# FPS验证可选因为某些相机不支持精确FPS设置
actual_fps = self.cap.get(cv2.CAP_PROP_FPS) if actual_sets > 2 else self.fps
batch_read_time = (time.time() - batch_read_start) * 1000
self.logger.debug(f"验证读取耗时: {batch_read_time:.1f}ms")
except Exception as e:
# 验证失败不影响主流程
actual_width, actual_height, actual_fps = self.width, self.height, self.fps
batch_read_time = (time.time() - batch_read_start) * 1000
self.logger.debug(f"属性验证失败,使用默认值: {e}")
else:
# 跳过验证,使用设置值
actual_width, actual_height, actual_fps = self.width, self.height, self.fps
batch_read_time = 0.0
self.logger.debug("跳过属性验证以节省时间")
verification_time = (time.time() - verification_start) * 1000
# 为兼容性保留单独计时变量
width_read_time = batch_read_time * 0.4
height_read_time = batch_read_time * 0.4
fps_read_time = batch_read_time * 0.2
self.logger.debug(f"延迟验证耗时: {batch_read_time:.1f}ms, 总计: {verification_time:.1f}ms")
total_config_time = (time.time() - config_start) * 1000
self.logger.info(f"相机配置完成 - 分辨率: {actual_width}x{actual_height}, FPS: {actual_fps}")
self.logger.info(f"配置耗时统计 - 缓冲区: {buffer_time:.1f}ms, 优化设置: {optimization_time:.1f}ms, 分辨率: {resolution_time:.1f}ms, 帧率: {fps_time:.1f}ms, 验证: {verification_time:.1f}ms, 总计: {total_config_time:.1f}ms")
self.logger.debug(f"配置详情 - 分辨率设置: {resolution_time:.1f}ms, FPS设置: {fps_time:.1f}ms, 验证: {verification_time:.1f}ms, 总计: {total_config_time:.1f}ms")
except Exception as e:
self.logger.warning(f"配置相机参数失败: {e}")
def _test_camera(self) -> bool:
"""
测试相机是否正常工作
Returns:
bool: 测试是否成功
"""
try:
# 快速测试:只读取一帧进行验证
read_start = time.time()
ret, frame = self.cap.read()
read_time = (time.time() - read_start) * 1000
if ret and frame is not None:
# 基本帧验证
if len(frame.shape) >= 2 and frame.shape[0] > 0 and frame.shape[1] > 0:
self.logger.info(f"相机测试成功 - 帧大小: {frame.shape}, 读取耗时: {read_time:.1f}ms")
# 清理测试帧内存
del frame
return True
else:
self.logger.error(f"相机测试失败 - 帧数据无效: {frame.shape if frame is not None else 'None'}")
return False
else:
self.logger.error(f"相机测试失败 - 无法读取帧, 读取耗时: {read_time:.1f}ms")
return False
except Exception as e:
self.logger.error(f"相机测试异常: {e}")
return False
def calibrate(self) -> bool:
"""
校准相机(对于普通相机,主要是验证连接和设置)
Returns:
bool: 校准是否成功
"""
calibrate_start = time.time()
try:
self.logger.info("开始相机校准...")
if not self.is_connected:
init_start = time.time()
if not self.initialize():
return False
init_time = (time.time() - init_start) * 1000
self.logger.info(f"校准中初始化完成 (耗时: {init_time:.1f}ms)")
# 优化减少稳定帧数量只读取2帧来稳定相机
stabilize_start = time.time()
stable_frames = 2 # 减少从5帧到2帧
for i in range(stable_frames):
frame_start = time.time()
ret, frame = self.cap.read()
frame_time = (time.time() - frame_start) * 1000
if not ret:
self.logger.warning(f"校准时读取第{i+1}帧失败 (耗时: {frame_time:.1f}ms)")
else:
# 立即释放帧内存
if frame is not None:
del frame
self.logger.debug(f"校准帧{i+1}读取成功 (耗时: {frame_time:.1f}ms)")
stabilize_time = (time.time() - stabilize_start) * 1000
total_time = (time.time() - calibrate_start) * 1000
self.logger.info(f"相机校准完成 - 稳定化耗时: {stabilize_time:.1f}ms, 总耗时: {total_time:.1f}ms")
return True
except Exception as e:
total_time = (time.time() - calibrate_start) * 1000
self.logger.error(f"相机校准失败: {e} (耗时: {total_time:.1f}ms)")
return False
def start_streaming(self) -> bool:
"""
开始数据流推送
Returns:
bool: 启动是否成功
"""
if self.is_streaming:
self.logger.warning("相机流已在运行")
return True
if not self.is_connected:
if not self.initialize():
return False
try:
self.is_streaming = True
self.streaming_thread = threading.Thread(
target=self._streaming_worker,
name=f"Camera-{self.device_index}-Stream",
daemon=True
)
self.streaming_thread.start()
self.logger.info("相机流启动成功")
return True
except Exception as e:
self.logger.error(f"启动相机流失败: {e}")
self.is_streaming = False
return False
def stop_streaming(self) -> bool:
"""
停止数据流推送
Returns:
bool: 停止是否成功
"""
try:
self.is_streaming = False
if self.streaming_thread and self.streaming_thread.is_alive():
# 等待线程退出
self.streaming_thread.join(timeout=3.0)
self.streaming_thread = None
self.logger.info("相机流已停止")
return True
except Exception as e:
self.logger.error(f"停止相机流失败: {e}")
return False
def _streaming_worker(self):
"""
流处理工作线程
"""
self.logger.info("相机流工作线程启动")
reconnect_attempts = 0
consecutive_read_failures = 0
# 基于目标FPS的简单节拍器防止无上限地读取/编码/发送导致对象堆积
frame_interval = 1.0 / max(self.fps, 1)
next_tick = time.time()
while self.is_streaming:
loop_start = time.time()
try:
# 如果设备未打开,进入重连流程
if not self.cap or not self.cap.isOpened():
# 仅在状态变化时广播一次断连状态
if self._last_connected_state is not False:
try:
self._socketio.emit('camera_status', {
'status': 'disconnected',
'device_id': self.device_id,
'timestamp': time.time()
}, namespace='/devices')
except Exception:
pass
self._last_connected_state = False
# 无限重连max_reconnect_attempts == -1否则按次数重试
if self.max_reconnect_attempts == -1 or reconnect_attempts < self.max_reconnect_attempts:
self.logger.warning(f"相机连接丢失,尝试重连 ({'' if self.max_reconnect_attempts == -1 else reconnect_attempts + 1}/{self.max_reconnect_attempts if self.max_reconnect_attempts != -1 else ''})")
if not self.is_streaming:
break
if self._reconnect():
reconnect_attempts = 0
consecutive_read_failures = 0
# 广播恢复
try:
self._socketio.emit('camera_status', {
'status': 'connected',
'device_id': self.device_id,
'timestamp': time.time()
}, namespace='/devices')
except Exception:
pass
self._last_connected_state = True
continue
else:
reconnect_attempts += 1
time.sleep(self.reconnect_delay)
continue
else:
# 超过次数也不退出线程,降频重试,防止永久停机
self.logger.error("相机重连失败次数过多,进入降频重试模式")
time.sleep(max(self.reconnect_delay, 5.0))
# 重置计数以便继续尝试
reconnect_attempts = 0
continue
ret, frame = self.cap.read()
if not ret or frame is None:
consecutive_read_failures += 1
self.dropped_frames += 1
if consecutive_read_failures >= getattr(self, 'read_fail_threshold', 30):
self.logger.warning(f"连续读帧失败 {consecutive_read_failures} 次,执行相机软复位并进入重连")
try:
if self.cap:
try:
self.cap.release()
except Exception:
pass
self.cap = None
self.is_connected = False
except Exception:
pass
# 进入下一轮循环会走到未打开分支
consecutive_read_failures = 0
time.sleep(self.reconnect_delay)
continue
if self.dropped_frames > 10:
self.logger.warning(f"连续丢帧过多: {self.dropped_frames}")
# 仅在异常情况下触发一次GC避免高频强制GC
try:
gc.collect()
except Exception:
pass
self.dropped_frames = 0
# 防止空转占满CPU
time.sleep(0.02)
continue
# 读帧成功,重置失败计数
consecutive_read_failures = 0
self.dropped_frames = 0
# 更新心跳时间,防止连接监控线程判定为超时
self.update_heartbeat()
# 保存原始帧到队列(用于录制)
try:
self.frame_queue.put_nowait({
'frame': frame.copy(),
'timestamp': time.time()
})
except queue.Full:
# 队列满时丢弃最旧的帧,添加新帧
try:
self.frame_queue.get_nowait() # 移除最旧的帧
self.frame_queue.put_nowait({
'frame': frame.copy(),
'timestamp': time.time()
})
except queue.Empty:
pass # 队列为空,忽略
# 处理帧(降采样以优化传输负载)
processed_frame = self._process_frame(frame)
# 发送帧数据
self._send_frame_data(processed_frame)
# 更新统计
self._update_statistics()
# 主动释放局部引用帮助GC更快识别可回收对象
del frame
# 限速保证不超过目标FPS减小发送端积压
now = time.time()
# 下一个tick基于固定间隔前移避免误差累积
next_tick += frame_interval
sleep_time = next_tick - now
if sleep_time > 0:
time.sleep(sleep_time)
else:
# 如果处理耗时超过间隔,纠正节拍器,避免持续为负
next_tick = now
except Exception as e:
self.logger.error(f"相机流处理异常: {e}")
# 小退避,避免异常情况下空转
time.sleep(0.05)
self.logger.info("相机流工作线程结束")
def _process_frame(self, frame: np.ndarray) -> np.ndarray:
"""
处理视频帧
Args:
frame: 原始帧
Returns:
np.ndarray: 处理后的帧
"""
try:
# 调整大小以优化传输(使用 INTER_AREA 质量好且更省内存/CPU
h, w = frame.shape[:2]
if w > self._tx_max_width:
scale = self._tx_max_width / float(w)
new_w = self._tx_max_width
new_h = int(h * scale)
frame = cv2.resize(frame, (new_w, new_h), interpolation=cv2.INTER_AREA)
return frame
except Exception as e:
self.logger.error(f"处理帧失败: {e}")
return frame
def _send_frame_data(self, frame: np.ndarray):
"""
发送帧数据
Args:
frame: 视频帧
"""
# 将临时对象局部化,并在 finally 中删除引用,加速回收
buffer = None
frame_bytes = None
frame_data = None
try:
# 编码为JPEG
encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 80]
ok, buffer = cv2.imencode('.jpg', frame, encode_param)
if not ok or buffer is None:
self.logger.warning("帧JPEG编码失败")
return
# 转换为bytes再做base64减少中间numpy对象的长生命周期
frame_bytes = buffer.tobytes()
frame_data = base64.b64encode(frame_bytes).decode('utf-8')
# 发送数据
data = {
'timestamp': time.time(),
'frame_count': self.frame_count,
'image': frame_data,
'fps': self.actual_fps,
'device_id': self.device_id
}
self._socketio.emit('camera_frame', data, namespace='/devices')
except Exception as e:
self.logger.error(f"发送帧数据失败: {e}")
finally:
# 显式删除临时大对象的引用,避免在高吞吐下堆积
del buffer
del frame_bytes
del frame_data
def _update_statistics(self):
"""
更新性能统计
"""
self.frame_count += 1
self.fps_counter += 1
# 每秒计算一次实际FPS
current_time = time.time()
if current_time - self.fps_start_time >= 1.0:
self.actual_fps = self.fps_counter / (current_time - self.fps_start_time)
self.fps_counter = 0
self.fps_start_time = current_time
# 更新性能统计
self.performance_stats.update({
'frames_processed': self.frame_count,
'actual_fps': round(self.actual_fps, 2),
'dropped_frames': self.dropped_frames
})
def _reconnect(self) -> bool:
"""
重新连接相机
Returns:
bool: 重连是否成功
"""
try:
if self.cap:
try:
self.cap.release()
except Exception:
pass
self.cap = None
time.sleep(1.0) # 等待设备释放
return self.initialize()
except Exception as e:
self.logger.error(f"相机重连失败: {e}")
return False
def get_status(self) -> Dict[str, Any]:
"""
获取设备状态
Returns:
Dict[str, Any]: 设备状态信息
"""
status = super().get_status()
status.update({
'device_index': self.device_index,
'resolution': f"{self.width}x{self.height}",
'target_fps': self.fps,
'actual_fps': self.actual_fps,
'frame_count': self.frame_count,
'dropped_frames': self.dropped_frames,
'has_frame': self.last_frame is not None
})
return status
def capture_image(self, save_path: Optional[str] = None) -> Optional[np.ndarray]:
"""
捕获单张图像
Args:
save_path: 保存路径(可选)
Returns:
Optional[np.ndarray]: 捕获的图像失败返回None
"""
try:
if not self.is_connected or not self.cap:
self.logger.error("相机未连接")
return None
ret, frame = self.cap.read()
if not ret or frame is None:
self.logger.error("捕获图像失败")
return None
if save_path:
cv2.imwrite(save_path, frame)
self.logger.info(f"图像已保存到: {save_path}")
return frame
except Exception as e:
self.logger.error(f"捕获图像异常: {e}")
return None
def disconnect(self):
"""
断开相机连接
"""
try:
self.stop_streaming()
if self.cap:
try:
self.cap.release()
except Exception:
pass
self.cap = None
self.is_connected = False
self.logger.info("相机已断开连接")
except Exception as e:
self.logger.error(f"断开相机连接失败: {e}")
def reload_config(self) -> bool:
"""
重新加载设备配置
Returns:
bool: 重新加载是否成功
"""
try:
self.logger.info("正在重新加载相机配置...")
# 获取最新配置
config = self.config_manager.get_device_config('camera')
# 更新配置属性
self.device_index = config.get('device_index', 0)
self.width = config.get('width', 1280)
self.height = config.get('height', 720)
self.fps = config.get('fps', 30)
self.buffer_size = config.get('buffer_size', 1)
self.fourcc = config.get('fourcc', 'MJPG')
self._tx_max_width = int(config.get('tx_max_width', 640))
# 新增:动态更新重连/阈值配置
self.max_reconnect_attempts = int(config.get('max_reconnect_attempts', self.max_reconnect_attempts))
self.reconnect_delay = float(config.get('reconnect_delay', self.reconnect_delay))
self.read_fail_threshold = int(config.get('read_fail_threshold', self.read_fail_threshold))
# 更新帧缓存队列大小
frame_cache_len = int(config.get('frame_cache_len', 2))
if frame_cache_len != self.frame_cache.maxsize:
# 清空旧队列
while not self.frame_cache.empty():
try:
self.frame_cache.get_nowait()
except queue.Empty:
break
# 创建新队列
self.frame_cache = queue.Queue(maxsize=frame_cache_len)
# 更新设备信息
self.device_id = f"camera_{self.device_index}"
self.logger.info(f"相机配置重新加载成功 - 设备索引: {self.device_index}, 分辨率: {self.width}x{self.height}, FPS: {self.fps}")
return True
except Exception as e:
self.logger.error(f"重新加载相机配置失败: {e}")
return False
def check_hardware_connection(self) -> bool:
"""
检查相机硬件连接状态
Returns:
bool: 相机是否物理连接
"""
try:
if not self.cap:
# 如果相机实例不存在,尝试重新创建
self.logger.info("相机实例不存在,尝试重新创建-----------------")
return self._attempt_device_reconnection()
if not self.cap.isOpened():
# 相机未打开,尝试重连
self.logger.info("相机未打开,尝试重新连接-----------------")
return self._attempt_device_reconnection()
# 多层次验证相机连接状态
try:
# 第一步使用grab()方法快速清除所有缓存帧
self.logger.debug("快速清除相机缓存帧...")
try:
# grab()方法只获取帧但不解码,速度更快
# 连续grab多次以清空内部缓冲区
for _ in range(15): # 增加清除次数,确保缓存完全清空
if not self.cap.grab():
break # 如果grab失败说明没有更多缓存帧
except Exception as e:
self.logger.debug(f"清除缓存帧时出现异常: {e}")
# 第二步:严格的连续帧检测
failed_count = 0
total_frames = 15 # 增加检测帧数
consecutive_failures = 0 # 连续失败计数
for i in range(total_frames):
try:
ret, frame = self.cap.read()
if ret and frame is not None:
# 验证帧数据的有效性
if self._validate_frame_data(frame):
del frame
consecutive_failures = 0 # 重置连续失败计数
else:
failed_count += 1
consecutive_failures += 1
del frame
else:
failed_count += 1
consecutive_failures += 1
# 如果连续3帧失败立即判定为断开
if consecutive_failures >= 3:
self.logger.warning(f"相机连接检测失败:连续{consecutive_failures}帧失败")
return False
# 如果总失败帧数超过30%,判定为断开
if failed_count > total_frames * 0.3:
self.logger.warning(f"相机连接检测失败:{failed_count}/{i+1}帧读取失败超过30%阈值")
return False
except Exception as e:
failed_count += 1
consecutive_failures += 1
self.logger.debug(f"读取第{i+1}帧时异常: {e}")
# 连续异常也判定为断开
if consecutive_failures >= 3:
self.logger.warning(f"相机连接检测异常:连续{consecutive_failures}帧异常")
return False
# 短暂延时,避免过快读取
time.sleep(0.005) # 减少延时提高检测速度
# 第三步:最终判断
success_rate = (total_frames - failed_count) / total_frames
if success_rate >= 0.7: # 成功率需要达到70%
self.logger.info(f"相机连接检测成功:{total_frames-failed_count}/{total_frames}帧读取成功,成功率{success_rate:.1%}")
return True
else:
self.logger.warning(f"相机连接检测失败:成功率{success_rate:.1%}低于70%阈值")
return False
except Exception as e:
self.logger.warning(f"相机连接检测过程中发生异常: {e}")
return False
except Exception as e:
self.logger.debug(f"检查相机硬件连接时发生异常: {e}")
return False
def _validate_frame_data(self, frame) -> bool:
"""
验证帧数据的有效性
Args:
frame: 要验证的帧数据
Returns:
bool: 帧数据是否有效
"""
try:
if frame is None:
return False
# 检查帧尺寸
if frame.shape[0] < 10 or frame.shape[1] < 10:
return False
# 检查帧数据是否全为零(可能是无效帧)
if np.all(frame == 0):
return False
# 检查帧数据的方差(全黑或全白帧可能是无效的)
if np.var(frame) < 1.0:
return False
return True
except Exception:
return False
def _attempt_device_reconnection(self) -> bool:
"""
尝试重新连接相机设备
Returns:
bool: 重连是否成功
"""
try:
self.logger.info("检测到相机设备断开,尝试重新连接...")
# 清理旧的相机实例
if self.cap:
try:
self.cap.release()
except Exception as e:
self.logger.debug(f"清理旧相机实例时出错: {e}")
self.cap = None
# 等待设备释放
time.sleep(0.5)
# 重新初始化相机
if self.initialize():
self._notify_status_change(True)
# 重连成功后,确保数据流正在运行
if not self.is_streaming:
self.logger.info("重连成功,启动相机数据流")
self.start_streaming()
# 更新设备信息
self._device_info.update({
'device_index': self.device_index,
'resolution': f"{self.width}x{self.height}",
'fps': self.fps,
'backend': self.cap.getBackendName() if hasattr(self.cap, 'getBackendName') else 'Unknown'
})
return True
else:
self.logger.warning("相机设备重连失败")
return False
except Exception as e:
self.logger.error(f"相机设备重连过程中出错: {e}")
self.cap = None
return False
def cleanup(self):
"""
清理资源
"""
try:
self.logger.info("开始清理相机资源")
# 清理监控线程
self._cleanup_monitoring()
# 停止流
if self.is_streaming:
self.stop_streaming()
# 断开连接
self.disconnect()
# 清理帧缓存
while not self.frame_cache.empty():
try:
self.frame_cache.get_nowait()
except queue.Empty:
break
# 清理全局帧队列
while not self.frame_queue.empty():
try:
self.frame_queue.get_nowait()
except queue.Empty:
break
self.last_frame = None
super().cleanup()
self.logger.info("相机资源清理完成")
except Exception as e:
self.logger.error(f"清理相机资源时发生错误: {e}")