BodyBalanceEvaluation/backend/devices/camera_manager.py

663 lines
22 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
普通相机管理器
负责普通USB摄像头的连接配置和数据采集
"""
import cv2
import threading
import time
import base64
import numpy as np
from typing import Optional, Dict, Any, Tuple
import logging
2025-08-21 12:04:14 +08:00
import queue
import gc
try:
from .base_device import BaseDevice
from .utils.socket_manager import SocketManager
from .utils.config_manager import ConfigManager
except ImportError:
from base_device import BaseDevice
from utils.socket_manager import SocketManager
from utils.config_manager import ConfigManager
class CameraManager(BaseDevice):
"""普通相机管理器"""
def __init__(self, socketio, config_manager: Optional[ConfigManager] = None):
"""
初始化相机管理器
Args:
socketio: SocketIO实例
config_manager: 配置管理器实例
"""
# 配置管理
self.config_manager = config_manager or ConfigManager()
config = self.config_manager.get_device_config('camera')
super().__init__("camera", config)
# 保存socketio实例
self._socketio = socketio
# 相机相关属性
self.cap = None
self.device_index = config.get('device_index', 0)
self.width = config.get('width', 1280)
self.height = config.get('height', 720)
self.fps = config.get('fps', 30)
self.buffer_size = config.get('buffer_size', 1)
self.fourcc = config.get('fourcc', 'MJPG')
2025-08-18 18:30:49 +08:00
# 额外可调的降采样宽度(不改变外部配置语义,仅内部优化传输)
self._tx_max_width = int(config.get('tx_max_width', 640))
# 流控制
self.streaming_thread = None
2025-08-18 18:30:49 +08:00
# 减小缓存长度保留最近2帧即可避免累计占用
2025-08-21 12:04:14 +08:00
self.frame_cache = queue.Queue(maxsize=int(config.get('frame_cache_len', 2)))
self.last_frame = None
self.frame_count = 0
self.dropped_frames = 0
# 性能监控
self.fps_counter = 0
self.fps_start_time = time.time()
self.actual_fps = 0
# 重连机制
self.max_reconnect_attempts = 3
self.reconnect_delay = 2.0
# 设备标识和性能统计
self.device_id = f"camera_{self.device_index}"
self.performance_stats = {
'frames_processed': 0,
'actual_fps': 0,
'dropped_frames': 0
}
2025-08-20 10:30:51 +08:00
2025-08-21 12:04:14 +08:00
# 全局帧队列(用于录制)
self.frame_queue = queue.Queue(maxsize=10) # 最大长度10自动丢弃旧帧
2025-08-18 18:30:49 +08:00
# OpenCV优化开关
try:
cv2.setUseOptimized(True)
except Exception:
pass
self.logger.info(f"相机管理器初始化完成 - 设备索引: {self.device_index}")
def initialize(self) -> bool:
"""
初始化相机设备
Returns:
bool: 初始化是否成功
"""
try:
self.logger.info(f"正在初始化相机设备 {self.device_index}...")
2025-08-18 18:30:49 +08:00
# 尝试多个后端Windows下优先MSMF/DShow
backends = [cv2.CAP_MSMF, cv2.CAP_DSHOW, cv2.CAP_ANY]
for backend in backends:
try:
self.cap = cv2.VideoCapture(self.device_index, backend)
if self.cap.isOpened():
self.logger.info(f"使用后端 {backend} 成功打开相机")
break
except Exception as e:
self.logger.warning(f"后端 {backend} 打开相机失败: {e}")
continue
else:
raise Exception("所有后端都无法打开相机")
# 设置相机属性
self._configure_camera()
# 验证相机是否正常工作
if not self._test_camera():
raise Exception("相机测试失败")
self.is_connected = True
self._device_info.update({
'device_index': self.device_index,
'resolution': f"{self.width}x{self.height}",
'fps': self.fps,
'backend': self.cap.getBackendName() if hasattr(self.cap, 'getBackendName') else 'Unknown'
})
self.logger.info("相机初始化成功")
return True
except Exception as e:
self.logger.error(f"相机初始化失败: {e}")
self.is_connected = False
if self.cap:
2025-08-18 18:30:49 +08:00
try:
self.cap.release()
except Exception:
pass
self.cap = None
return False
def _configure_camera(self):
"""
配置相机参数
"""
if not self.cap:
return
try:
# 设置FOURCC编码
if self.fourcc:
fourcc_code = cv2.VideoWriter_fourcc(*self.fourcc)
self.cap.set(cv2.CAP_PROP_FOURCC, fourcc_code)
# 设置分辨率
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.width)
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height)
# 设置帧率
self.cap.set(cv2.CAP_PROP_FPS, self.fps)
2025-08-18 18:30:49 +08:00
# 设置缓冲区大小(部分后端不生效)
try:
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, self.buffer_size)
except Exception:
pass
# 获取实际设置的值
actual_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
actual_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
actual_fps = self.cap.get(cv2.CAP_PROP_FPS)
self.logger.info(f"相机配置 - 分辨率: {actual_width}x{actual_height}, FPS: {actual_fps}")
except Exception as e:
self.logger.warning(f"配置相机参数失败: {e}")
def _test_camera(self) -> bool:
"""
测试相机是否正常工作
Returns:
bool: 测试是否成功
"""
try:
ret, frame = self.cap.read()
if ret and frame is not None:
self.logger.info(f"相机测试成功 - 帧大小: {frame.shape}")
return True
else:
self.logger.error("相机测试失败 - 无法读取帧")
return False
except Exception as e:
self.logger.error(f"相机测试异常: {e}")
return False
def calibrate(self) -> bool:
"""
校准相机对于普通相机主要是验证连接和设置
Returns:
bool: 校准是否成功
"""
try:
self.logger.info("开始相机校准...")
if not self.is_connected:
if not self.initialize():
return False
# 读取几帧来稳定相机
for i in range(5):
2025-08-18 18:30:49 +08:00
ret, _ = self.cap.read()
if not ret:
self.logger.warning(f"校准时读取第{i+1}帧失败")
self.logger.info("相机校准完成")
return True
except Exception as e:
self.logger.error(f"相机校准失败: {e}")
return False
def start_streaming(self) -> bool:
"""
开始数据流推送
Returns:
bool: 启动是否成功
"""
if self.is_streaming:
self.logger.warning("相机流已在运行")
return True
if not self.is_connected:
if not self.initialize():
return False
try:
self.is_streaming = True
self.streaming_thread = threading.Thread(
target=self._streaming_worker,
name=f"Camera-{self.device_index}-Stream",
daemon=True
)
self.streaming_thread.start()
self.logger.info("相机流启动成功")
return True
except Exception as e:
self.logger.error(f"启动相机流失败: {e}")
self.is_streaming = False
return False
def stop_streaming(self) -> bool:
"""
停止数据流推送
Returns:
bool: 停止是否成功
"""
try:
self.is_streaming = False
if self.streaming_thread and self.streaming_thread.is_alive():
2025-08-18 18:30:49 +08:00
# 等待线程退出
self.streaming_thread.join(timeout=3.0)
2025-08-18 18:30:49 +08:00
self.streaming_thread = None
self.logger.info("相机流已停止")
return True
except Exception as e:
self.logger.error(f"停止相机流失败: {e}")
return False
def _streaming_worker(self):
"""
流处理工作线程
"""
self.logger.info("相机流工作线程启动")
reconnect_attempts = 0
2025-08-18 18:30:49 +08:00
# 基于目标FPS的简单节拍器防止无上限地读取/编码/发送导致对象堆积
frame_interval = 1.0 / max(self.fps, 1)
next_tick = time.time()
while self.is_streaming:
2025-08-18 18:30:49 +08:00
loop_start = time.time()
try:
if not self.cap or not self.cap.isOpened():
if reconnect_attempts < self.max_reconnect_attempts:
self.logger.warning(f"相机连接丢失,尝试重连 ({reconnect_attempts + 1}/{self.max_reconnect_attempts})")
if self._reconnect():
reconnect_attempts = 0
continue
else:
reconnect_attempts += 1
time.sleep(self.reconnect_delay)
continue
else:
self.logger.error("相机重连失败次数过多,停止流")
break
ret, frame = self.cap.read()
if not ret or frame is None:
self.dropped_frames += 1
if self.dropped_frames > 10:
self.logger.warning(f"连续丢帧过多: {self.dropped_frames}")
2025-08-18 18:30:49 +08:00
# 仅在异常情况下触发一次GC避免高频强制GC
try:
gc.collect()
except Exception:
pass
self.dropped_frames = 0
2025-08-18 18:30:49 +08:00
# 防止空转占满CPU
time.sleep(0.005)
continue
# 重置丢帧计数
self.dropped_frames = 0
2025-08-21 12:04:14 +08:00
# 保存原始帧到队列(用于录制)
try:
self.frame_queue.put_nowait({
'frame': frame.copy(),
'timestamp': time.time()
})
except queue.Full:
# 队列满时丢弃最旧的帧,添加新帧
try:
self.frame_queue.get_nowait() # 移除最旧的帧
self.frame_queue.put_nowait({
'frame': frame.copy(),
'timestamp': time.time()
})
except queue.Empty:
pass # 队列为空,忽略
2025-08-20 10:30:51 +08:00
2025-08-18 18:30:49 +08:00
# 处理帧(降采样以优化传输负载)
processed_frame = self._process_frame(frame)
2025-08-18 18:30:49 +08:00
2025-08-21 12:04:14 +08:00
# # 缓存帧(不复制,减少内存占用)
# self.last_frame = processed_frame
# self.frame_cache.append(processed_frame)
# 发送帧数据
self._send_frame_data(processed_frame)
# 更新统计
self._update_statistics()
2025-08-18 18:30:49 +08:00
# 主动释放局部引用帮助GC更快识别可回收对象
del frame
# 注意processed_frame 被 last_frame 和 frame_cache 引用,不可删除其对象本身
# 限速保证不超过目标FPS减小发送端积压
now = time.time()
# 下一个tick基于固定间隔前移避免误差累积
next_tick += frame_interval
sleep_time = next_tick - now
if sleep_time > 0:
time.sleep(sleep_time)
else:
# 如果处理耗时超过间隔,纠正节拍器,避免持续为负
next_tick = now
except Exception as e:
self.logger.error(f"相机流处理异常: {e}")
2025-08-18 18:30:49 +08:00
# 小退避,避免异常情况下空转
time.sleep(0.02)
self.logger.info("相机流工作线程结束")
def _process_frame(self, frame: np.ndarray) -> np.ndarray:
"""
处理视频帧
Args:
frame: 原始帧
Returns:
np.ndarray: 处理后的帧
"""
try:
2025-08-18 18:30:49 +08:00
# 调整大小以优化传输(使用 INTER_AREA 质量好且更省内存/CPU
h, w = frame.shape[:2]
if w > self._tx_max_width:
scale = self._tx_max_width / float(w)
new_w = self._tx_max_width
new_h = int(h * scale)
frame = cv2.resize(frame, (new_w, new_h), interpolation=cv2.INTER_AREA)
return frame
except Exception as e:
self.logger.error(f"处理帧失败: {e}")
return frame
def _send_frame_data(self, frame: np.ndarray):
"""
发送帧数据
Args:
frame: 视频帧
"""
2025-08-18 18:30:49 +08:00
# 将临时对象局部化,并在 finally 中删除引用,加速回收
buffer = None
frame_bytes = None
frame_data = None
try:
# 编码为JPEG
encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 80]
2025-08-18 18:30:49 +08:00
ok, buffer = cv2.imencode('.jpg', frame, encode_param)
if not ok or buffer is None:
self.logger.warning("帧JPEG编码失败")
return
2025-08-18 18:30:49 +08:00
# 转换为bytes再做base64减少中间numpy对象的长生命周期
frame_bytes = buffer.tobytes()
frame_data = base64.b64encode(frame_bytes).decode('utf-8')
# 发送数据
data = {
'timestamp': time.time(),
'frame_count': self.frame_count,
'image': frame_data,
'fps': self.actual_fps,
'device_id': self.device_id
}
2025-08-17 16:42:05 +08:00
self._socketio.emit('camera_frame', data, namespace='/devices')
except Exception as e:
self.logger.error(f"发送帧数据失败: {e}")
2025-08-18 18:30:49 +08:00
finally:
# 显式删除临时大对象的引用,避免在高吞吐下堆积
del buffer
del frame_bytes
del frame_data
def _update_statistics(self):
"""
更新性能统计
"""
self.frame_count += 1
self.fps_counter += 1
# 每秒计算一次实际FPS
current_time = time.time()
if current_time - self.fps_start_time >= 1.0:
self.actual_fps = self.fps_counter / (current_time - self.fps_start_time)
self.fps_counter = 0
self.fps_start_time = current_time
# 更新性能统计
self.performance_stats.update({
'frames_processed': self.frame_count,
'actual_fps': round(self.actual_fps, 2),
'dropped_frames': self.dropped_frames
})
def _reconnect(self) -> bool:
"""
重新连接相机
Returns:
bool: 重连是否成功
"""
try:
if self.cap:
2025-08-18 18:30:49 +08:00
try:
self.cap.release()
except Exception:
pass
self.cap = None
time.sleep(1.0) # 等待设备释放
return self.initialize()
except Exception as e:
self.logger.error(f"相机重连失败: {e}")
return False
def get_status(self) -> Dict[str, Any]:
"""
获取设备状态
Returns:
Dict[str, Any]: 设备状态信息
"""
status = super().get_status()
status.update({
'device_index': self.device_index,
'resolution': f"{self.width}x{self.height}",
'target_fps': self.fps,
'actual_fps': self.actual_fps,
'frame_count': self.frame_count,
'dropped_frames': self.dropped_frames,
'has_frame': self.last_frame is not None
})
return status
def capture_image(self, save_path: Optional[str] = None) -> Optional[np.ndarray]:
"""
捕获单张图像
Args:
save_path: 保存路径可选
Returns:
Optional[np.ndarray]: 捕获的图像失败返回None
"""
try:
if not self.is_connected or not self.cap:
self.logger.error("相机未连接")
return None
ret, frame = self.cap.read()
if not ret or frame is None:
self.logger.error("捕获图像失败")
return None
if save_path:
cv2.imwrite(save_path, frame)
self.logger.info(f"图像已保存到: {save_path}")
return frame
except Exception as e:
self.logger.error(f"捕获图像异常: {e}")
return None
2025-08-21 12:04:14 +08:00
def disconnect(self):
"""
断开相机连接
"""
try:
self.stop_streaming()
if self.cap:
2025-08-18 18:30:49 +08:00
try:
self.cap.release()
except Exception:
pass
self.cap = None
self.is_connected = False
self.logger.info("相机已断开连接")
except Exception as e:
self.logger.error(f"断开相机连接失败: {e}")
def cleanup(self):
"""
清理资源
"""
try:
self.stop_streaming()
if self.cap:
2025-08-18 18:30:49 +08:00
try:
self.cap.release()
except Exception:
pass
self.cap = None
2025-08-21 12:04:14 +08:00
# 清理帧缓存
while not self.frame_cache.empty():
try:
self.frame_cache.get_nowait()
except queue.Empty:
break
self.last_frame = None
2025-08-21 12:04:14 +08:00
# 清理帧队列
while not self.frame_queue.empty():
try:
self.frame_queue.get_nowait()
except queue.Empty:
break
2025-08-20 10:30:51 +08:00
super().cleanup()
self.logger.info("相机资源清理完成")
except Exception as e:
2025-08-18 18:30:49 +08:00
self.logger.error(f"清理相机资源失败: {e}")
2025-08-20 10:30:51 +08:00
def _save_frame_to_cache(self, frame, frame_type='camera'):
"""保存帧到全局缓存"""
try:
with self.frame_cache_lock:
current_time = time.time()
# 清理过期帧
self._cleanup_expired_frames()
# 如果缓存已满,移除最旧的帧
if frame_type in self.global_frame_cache and len(self.global_frame_cache[frame_type]) >= self.max_cache_size:
oldest_key = min(self.global_frame_cache[frame_type].keys())
del self.global_frame_cache[frame_type][oldest_key]
# 初始化帧类型缓存
if frame_type not in self.global_frame_cache:
self.global_frame_cache[frame_type] = {}
# 保存帧(深拷贝避免引用问题)
frame_data = {
'frame': frame.copy(),
'timestamp': current_time,
'frame_id': len(self.global_frame_cache[frame_type])
}
self.global_frame_cache[frame_type][current_time] = frame_data
except Exception as e:
self.logger.error(f'保存帧到缓存失败: {e}')
def _get_latest_frame_from_cache(self, frame_type='camera'):
2025-08-21 12:04:14 +08:00
"""从队列获取最新帧"""
2025-08-20 10:30:51 +08:00
try:
2025-08-21 12:04:14 +08:00
if self.frame_queue.empty():
self.logger.debug('帧队列为空')
return None, None
2025-08-20 10:30:51 +08:00
2025-08-21 12:04:14 +08:00
# 获取队列中的所有帧,保留最新的一个
frames = []
while not self.frame_queue.empty():
try:
frames.append(self.frame_queue.get_nowait())
except queue.Empty:
break
if not frames:
return None, None
# 获取最新帧(最后一个)
latest_frame = frames[-1]
# 将最新帧重新放回队列
try:
self.frame_queue.put_nowait(latest_frame)
except queue.Full:
pass # 队列满时忽略
return latest_frame['frame'].copy(), latest_frame['timestamp']
2025-08-20 10:30:51 +08:00
except Exception as e:
2025-08-21 12:04:14 +08:00
self.logger.error(f'从队列获取帧失败: {e}')
return None, None