BodyBalanceEvaluation/backend/devices/camera_manager.py
2025-08-20 10:30:51 +08:00

666 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
普通相机管理器
负责普通USB摄像头的连接、配置和数据采集
"""
import cv2
import threading
import time
import base64
import numpy as np
from typing import Optional, Dict, Any, Tuple
import logging
from collections import deque
import gc
try:
from .base_device import BaseDevice
from .utils.socket_manager import SocketManager
from .utils.config_manager import ConfigManager
except ImportError:
from base_device import BaseDevice
from utils.socket_manager import SocketManager
from utils.config_manager import ConfigManager
class CameraManager(BaseDevice):
"""普通相机管理器"""
def __init__(self, socketio, config_manager: Optional[ConfigManager] = None):
"""
初始化相机管理器
Args:
socketio: SocketIO实例
config_manager: 配置管理器实例
"""
# 配置管理
self.config_manager = config_manager or ConfigManager()
config = self.config_manager.get_device_config('camera')
super().__init__("camera", config)
# 保存socketio实例
self._socketio = socketio
# 相机相关属性
self.cap = None
self.device_index = config.get('device_index', 0)
self.width = config.get('width', 1280)
self.height = config.get('height', 720)
self.fps = config.get('fps', 30)
self.buffer_size = config.get('buffer_size', 1)
self.fourcc = config.get('fourcc', 'MJPG')
# 额外可调的降采样宽度(不改变外部配置语义,仅内部优化传输)
self._tx_max_width = int(config.get('tx_max_width', 640))
# 流控制
self.streaming_thread = None
# 减小缓存长度保留最近2帧即可避免累计占用
self.frame_cache = deque(maxlen=int(config.get('frame_cache_len', 2)))
self.last_frame = None
self.frame_count = 0
self.dropped_frames = 0
# 性能监控
self.fps_counter = 0
self.fps_start_time = time.time()
self.actual_fps = 0
# 重连机制
self.max_reconnect_attempts = 3
self.reconnect_delay = 2.0
# 设备标识和性能统计
self.device_id = f"camera_{self.device_index}"
self.performance_stats = {
'frames_processed': 0,
'actual_fps': 0,
'dropped_frames': 0
}
# 全局帧缓存(用于录制)
self.global_frame_cache = {}
self.frame_cache_lock = threading.Lock()
self.max_cache_size = 10
self.cache_timeout = 5.0 # 5秒超时
# OpenCV优化开关
try:
cv2.setUseOptimized(True)
except Exception:
pass
self.logger.info(f"相机管理器初始化完成 - 设备索引: {self.device_index}")
def initialize(self) -> bool:
"""
初始化相机设备
Returns:
bool: 初始化是否成功
"""
try:
self.logger.info(f"正在初始化相机设备 {self.device_index}...")
# 尝试多个后端Windows下优先MSMF/DShow
backends = [cv2.CAP_MSMF, cv2.CAP_DSHOW, cv2.CAP_ANY]
for backend in backends:
try:
self.cap = cv2.VideoCapture(self.device_index, backend)
if self.cap.isOpened():
self.logger.info(f"使用后端 {backend} 成功打开相机")
break
except Exception as e:
self.logger.warning(f"后端 {backend} 打开相机失败: {e}")
continue
else:
raise Exception("所有后端都无法打开相机")
# 设置相机属性
self._configure_camera()
# 验证相机是否正常工作
if not self._test_camera():
raise Exception("相机测试失败")
self.is_connected = True
self._device_info.update({
'device_index': self.device_index,
'resolution': f"{self.width}x{self.height}",
'fps': self.fps,
'backend': self.cap.getBackendName() if hasattr(self.cap, 'getBackendName') else 'Unknown'
})
self.logger.info("相机初始化成功")
return True
except Exception as e:
self.logger.error(f"相机初始化失败: {e}")
self.is_connected = False
if self.cap:
try:
self.cap.release()
except Exception:
pass
self.cap = None
return False
def _configure_camera(self):
"""
配置相机参数
"""
if not self.cap:
return
try:
# 设置FOURCC编码
if self.fourcc:
fourcc_code = cv2.VideoWriter_fourcc(*self.fourcc)
self.cap.set(cv2.CAP_PROP_FOURCC, fourcc_code)
# 设置分辨率
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.width)
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.height)
# 设置帧率
self.cap.set(cv2.CAP_PROP_FPS, self.fps)
# 设置缓冲区大小(部分后端不生效)
try:
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, self.buffer_size)
except Exception:
pass
# 获取实际设置的值
actual_width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
actual_height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
actual_fps = self.cap.get(cv2.CAP_PROP_FPS)
self.logger.info(f"相机配置 - 分辨率: {actual_width}x{actual_height}, FPS: {actual_fps}")
except Exception as e:
self.logger.warning(f"配置相机参数失败: {e}")
def _test_camera(self) -> bool:
"""
测试相机是否正常工作
Returns:
bool: 测试是否成功
"""
try:
ret, frame = self.cap.read()
if ret and frame is not None:
self.logger.info(f"相机测试成功 - 帧大小: {frame.shape}")
return True
else:
self.logger.error("相机测试失败 - 无法读取帧")
return False
except Exception as e:
self.logger.error(f"相机测试异常: {e}")
return False
def calibrate(self) -> bool:
"""
校准相机(对于普通相机,主要是验证连接和设置)
Returns:
bool: 校准是否成功
"""
try:
self.logger.info("开始相机校准...")
if not self.is_connected:
if not self.initialize():
return False
# 读取几帧来稳定相机
for i in range(5):
ret, _ = self.cap.read()
if not ret:
self.logger.warning(f"校准时读取第{i+1}帧失败")
self.logger.info("相机校准完成")
return True
except Exception as e:
self.logger.error(f"相机校准失败: {e}")
return False
def start_streaming(self) -> bool:
"""
开始数据流推送
Returns:
bool: 启动是否成功
"""
if self.is_streaming:
self.logger.warning("相机流已在运行")
return True
if not self.is_connected:
if not self.initialize():
return False
try:
self.is_streaming = True
self.streaming_thread = threading.Thread(
target=self._streaming_worker,
name=f"Camera-{self.device_index}-Stream",
daemon=True
)
self.streaming_thread.start()
self.logger.info("相机流启动成功")
return True
except Exception as e:
self.logger.error(f"启动相机流失败: {e}")
self.is_streaming = False
return False
def stop_streaming(self) -> bool:
"""
停止数据流推送
Returns:
bool: 停止是否成功
"""
try:
self.is_streaming = False
if self.streaming_thread and self.streaming_thread.is_alive():
# 等待线程退出
self.streaming_thread.join(timeout=3.0)
self.streaming_thread = None
self.logger.info("相机流已停止")
return True
except Exception as e:
self.logger.error(f"停止相机流失败: {e}")
return False
def _streaming_worker(self):
"""
流处理工作线程
"""
self.logger.info("相机流工作线程启动")
reconnect_attempts = 0
# 基于目标FPS的简单节拍器防止无上限地读取/编码/发送导致对象堆积
frame_interval = 1.0 / max(self.fps, 1)
next_tick = time.time()
while self.is_streaming:
loop_start = time.time()
try:
if not self.cap or not self.cap.isOpened():
if reconnect_attempts < self.max_reconnect_attempts:
self.logger.warning(f"相机连接丢失,尝试重连 ({reconnect_attempts + 1}/{self.max_reconnect_attempts})")
if self._reconnect():
reconnect_attempts = 0
continue
else:
reconnect_attempts += 1
time.sleep(self.reconnect_delay)
continue
else:
self.logger.error("相机重连失败次数过多,停止流")
break
ret, frame = self.cap.read()
if not ret or frame is None:
self.dropped_frames += 1
if self.dropped_frames > 10:
self.logger.warning(f"连续丢帧过多: {self.dropped_frames}")
# 仅在异常情况下触发一次GC避免高频强制GC
try:
gc.collect()
except Exception:
pass
self.dropped_frames = 0
# 防止空转占满CPU
time.sleep(0.005)
continue
# 重置丢帧计数
self.dropped_frames = 0
# 保存原始帧到全局缓存(用于录制)
self._save_frame_to_cache(frame, 'camera')
# 处理帧(降采样以优化传输负载)
processed_frame = self._process_frame(frame)
# 缓存帧(不复制,减少内存占用)
self.last_frame = processed_frame
self.frame_cache.append(processed_frame)
# 发送帧数据
self._send_frame_data(processed_frame)
# 更新统计
self._update_statistics()
# 主动释放局部引用帮助GC更快识别可回收对象
del frame
# 注意processed_frame 被 last_frame 和 frame_cache 引用,不可删除其对象本身
# 限速保证不超过目标FPS减小发送端积压
now = time.time()
# 下一个tick基于固定间隔前移避免误差累积
next_tick += frame_interval
sleep_time = next_tick - now
if sleep_time > 0:
time.sleep(sleep_time)
else:
# 如果处理耗时超过间隔,纠正节拍器,避免持续为负
next_tick = now
except Exception as e:
self.logger.error(f"相机流处理异常: {e}")
# 小退避,避免异常情况下空转
time.sleep(0.02)
self.logger.info("相机流工作线程结束")
def _process_frame(self, frame: np.ndarray) -> np.ndarray:
"""
处理视频帧
Args:
frame: 原始帧
Returns:
np.ndarray: 处理后的帧
"""
try:
# 调整大小以优化传输(使用 INTER_AREA 质量好且更省内存/CPU
h, w = frame.shape[:2]
if w > self._tx_max_width:
scale = self._tx_max_width / float(w)
new_w = self._tx_max_width
new_h = int(h * scale)
frame = cv2.resize(frame, (new_w, new_h), interpolation=cv2.INTER_AREA)
return frame
except Exception as e:
self.logger.error(f"处理帧失败: {e}")
return frame
def _send_frame_data(self, frame: np.ndarray):
"""
发送帧数据
Args:
frame: 视频帧
"""
# 将临时对象局部化,并在 finally 中删除引用,加速回收
buffer = None
frame_bytes = None
frame_data = None
try:
# 编码为JPEG
encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), 80]
ok, buffer = cv2.imencode('.jpg', frame, encode_param)
if not ok or buffer is None:
self.logger.warning("帧JPEG编码失败")
return
# 转换为bytes再做base64减少中间numpy对象的长生命周期
frame_bytes = buffer.tobytes()
frame_data = base64.b64encode(frame_bytes).decode('utf-8')
# 发送数据
data = {
'timestamp': time.time(),
'frame_count': self.frame_count,
'image': frame_data,
'fps': self.actual_fps,
'device_id': self.device_id
}
self._socketio.emit('camera_frame', data, namespace='/devices')
except Exception as e:
self.logger.error(f"发送帧数据失败: {e}")
finally:
# 显式删除临时大对象的引用,避免在高吞吐下堆积
del buffer
del frame_bytes
del frame_data
def _update_statistics(self):
"""
更新性能统计
"""
self.frame_count += 1
self.fps_counter += 1
# 每秒计算一次实际FPS
current_time = time.time()
if current_time - self.fps_start_time >= 1.0:
self.actual_fps = self.fps_counter / (current_time - self.fps_start_time)
self.fps_counter = 0
self.fps_start_time = current_time
# 更新性能统计
self.performance_stats.update({
'frames_processed': self.frame_count,
'actual_fps': round(self.actual_fps, 2),
'dropped_frames': self.dropped_frames
})
def _reconnect(self) -> bool:
"""
重新连接相机
Returns:
bool: 重连是否成功
"""
try:
if self.cap:
try:
self.cap.release()
except Exception:
pass
self.cap = None
time.sleep(1.0) # 等待设备释放
return self.initialize()
except Exception as e:
self.logger.error(f"相机重连失败: {e}")
return False
def get_status(self) -> Dict[str, Any]:
"""
获取设备状态
Returns:
Dict[str, Any]: 设备状态信息
"""
status = super().get_status()
status.update({
'device_index': self.device_index,
'resolution': f"{self.width}x{self.height}",
'target_fps': self.fps,
'actual_fps': self.actual_fps,
'frame_count': self.frame_count,
'dropped_frames': self.dropped_frames,
'has_frame': self.last_frame is not None
})
return status
def capture_image(self, save_path: Optional[str] = None) -> Optional[np.ndarray]:
"""
捕获单张图像
Args:
save_path: 保存路径(可选)
Returns:
Optional[np.ndarray]: 捕获的图像失败返回None
"""
try:
if not self.is_connected or not self.cap:
self.logger.error("相机未连接")
return None
ret, frame = self.cap.read()
if not ret or frame is None:
self.logger.error("捕获图像失败")
return None
if save_path:
cv2.imwrite(save_path, frame)
self.logger.info(f"图像已保存到: {save_path}")
return frame
except Exception as e:
self.logger.error(f"捕获图像异常: {e}")
return None
def get_latest_frame(self) -> Optional[np.ndarray]:
"""
获取最新帧
Returns:
Optional[np.ndarray]: 最新帧无帧返回None
"""
# 对外提供拷贝,内部保持原对象,避免重复持有
return self.last_frame.copy() if self.last_frame is not None else None
def disconnect(self):
"""
断开相机连接
"""
try:
self.stop_streaming()
if self.cap:
try:
self.cap.release()
except Exception:
pass
self.cap = None
self.is_connected = False
self.logger.info("相机已断开连接")
except Exception as e:
self.logger.error(f"断开相机连接失败: {e}")
def cleanup(self):
"""
清理资源
"""
try:
self.stop_streaming()
if self.cap:
try:
self.cap.release()
except Exception:
pass
self.cap = None
self.frame_cache.clear()
self.last_frame = None
# 清理全局帧缓存
with self.frame_cache_lock:
self.global_frame_cache.clear()
super().cleanup()
self.logger.info("相机资源清理完成")
except Exception as e:
self.logger.error(f"清理相机资源失败: {e}")
def _save_frame_to_cache(self, frame, frame_type='camera'):
"""保存帧到全局缓存"""
try:
with self.frame_cache_lock:
current_time = time.time()
# 清理过期帧
self._cleanup_expired_frames()
# 如果缓存已满,移除最旧的帧
if frame_type in self.global_frame_cache and len(self.global_frame_cache[frame_type]) >= self.max_cache_size:
oldest_key = min(self.global_frame_cache[frame_type].keys())
del self.global_frame_cache[frame_type][oldest_key]
# 初始化帧类型缓存
if frame_type not in self.global_frame_cache:
self.global_frame_cache[frame_type] = {}
# 保存帧(深拷贝避免引用问题)
frame_data = {
'frame': frame.copy(),
'timestamp': current_time,
'frame_id': len(self.global_frame_cache[frame_type])
}
self.global_frame_cache[frame_type][current_time] = frame_data
except Exception as e:
self.logger.error(f'保存帧到缓存失败: {e}')
def _get_latest_frame_from_cache(self, frame_type='camera'):
"""从缓存获取最新帧"""
try:
with self.frame_cache_lock:
if frame_type not in self.global_frame_cache:
self.logger.debug(f'缓存中不存在帧类型: {frame_type}, 可用类型: {list(self.global_frame_cache.keys())}')
return None, None
if not self.global_frame_cache[frame_type]:
self.logger.debug(f'帧类型 {frame_type} 的缓存为空')
return None, None
# 清理过期帧
self._cleanup_expired_frames()
if not self.global_frame_cache[frame_type]:
self.logger.debug(f'清理过期帧后,帧类型 {frame_type} 的缓存为空')
return None, None
# 获取最新帧
latest_timestamp = max(self.global_frame_cache[frame_type].keys())
frame_data = self.global_frame_cache[frame_type][latest_timestamp]
return frame_data['frame'].copy(), frame_data['timestamp']
except Exception as e:
self.logger.error(f'从缓存获取帧失败: {e}')
return None, None
def _cleanup_expired_frames(self):
"""清理过期的缓存帧"""
try:
current_time = time.time()
for frame_type in list(self.global_frame_cache.keys()):
expired_keys = []
for timestamp in self.global_frame_cache[frame_type].keys():
if current_time - timestamp > self.cache_timeout:
expired_keys.append(timestamp)
# 删除过期帧
for key in expired_keys:
del self.global_frame_cache[frame_type][key]
except Exception as e:
self.logger.error(f'清理过期帧失败: {e}')