408 lines
12 KiB
Python
408 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
设备抽象基类
|
|
定义所有设备管理器的通用接口和基础功能
|
|
"""
|
|
|
|
from abc import ABC, abstractmethod
|
|
from typing import Dict, Any, Optional
|
|
import threading
|
|
import logging
|
|
import time
|
|
from datetime import datetime
|
|
|
|
try:
|
|
from .utils.socket_manager import SocketManager
|
|
except ImportError:
|
|
from utils.socket_manager import SocketManager
|
|
|
|
|
|
class BaseDevice(ABC):
|
|
"""设备抽象基类"""
|
|
|
|
def __init__(self, device_name: str, config: Dict[str, Any]):
|
|
"""
|
|
初始化设备基类
|
|
|
|
Args:
|
|
device_name: 设备名称
|
|
config: 设备配置字典
|
|
"""
|
|
self.device_name = device_name
|
|
self.config = config
|
|
self.is_connected = False
|
|
self.is_streaming = False
|
|
self.socket_namespace = f"/{device_name}"
|
|
self.logger = logging.getLogger(f"device.{device_name}")
|
|
self._lock = threading.RLock() # 可重入锁
|
|
self._streaming_thread = None
|
|
self._stop_event = threading.Event()
|
|
self._socketio = None
|
|
self._last_heartbeat = time.time()
|
|
|
|
# 状态变化回调
|
|
self._status_change_callbacks = []
|
|
|
|
# 设备连接监控
|
|
self._connection_monitor_thread = None
|
|
self._monitor_stop_event = threading.Event()
|
|
self._connection_check_interval = config.get('connection_check_interval', 5.0) # 默认5秒检查一次
|
|
self._connection_timeout = config.get('connection_timeout', 30.0) # 默认30秒超时
|
|
|
|
# 设备状态信息
|
|
self._device_info = {
|
|
'name': device_name,
|
|
'type': self.__class__.__name__,
|
|
'version': '1.0.0',
|
|
'initialized_at': None,
|
|
'last_error': None
|
|
}
|
|
|
|
# 性能统计
|
|
self._stats = {
|
|
'frames_processed': 0,
|
|
'errors_count': 0,
|
|
'start_time': None,
|
|
'last_frame_time': None
|
|
}
|
|
|
|
@abstractmethod
|
|
def initialize(self) -> bool:
|
|
"""
|
|
初始化设备
|
|
|
|
Returns:
|
|
bool: 初始化是否成功
|
|
"""
|
|
pass
|
|
|
|
@abstractmethod
|
|
def calibrate(self) -> Dict[str, Any]:
|
|
"""
|
|
校准设备
|
|
|
|
Returns:
|
|
Dict[str, Any]: 校准结果
|
|
"""
|
|
pass
|
|
|
|
@abstractmethod
|
|
def start_streaming(self, socketio) -> bool:
|
|
"""
|
|
启动数据推流
|
|
|
|
Args:
|
|
socketio: SocketIO实例
|
|
|
|
Returns:
|
|
bool: 启动是否成功
|
|
"""
|
|
pass
|
|
|
|
@abstractmethod
|
|
def stop_streaming(self) -> bool:
|
|
"""
|
|
停止数据推流
|
|
|
|
Returns:
|
|
bool: 停止是否成功
|
|
"""
|
|
pass
|
|
|
|
@abstractmethod
|
|
def get_status(self) -> Dict[str, Any]:
|
|
"""
|
|
获取设备状态
|
|
|
|
Returns:
|
|
Dict[str, Any]: 设备状态信息
|
|
"""
|
|
pass
|
|
|
|
@abstractmethod
|
|
def disconnect(self) -> None:
|
|
"""
|
|
断开设备连接
|
|
"""
|
|
pass
|
|
|
|
@abstractmethod
|
|
def cleanup(self) -> None:
|
|
"""
|
|
清理资源
|
|
"""
|
|
pass
|
|
|
|
@abstractmethod
|
|
def reload_config(self) -> bool:
|
|
"""
|
|
重新加载设备配置
|
|
|
|
Returns:
|
|
bool: 重新加载是否成功
|
|
"""
|
|
pass
|
|
|
|
@abstractmethod
|
|
def check_hardware_connection(self) -> bool:
|
|
"""
|
|
检查设备硬件连接状态
|
|
|
|
Returns:
|
|
bool: 设备是否物理连接
|
|
"""
|
|
pass
|
|
|
|
def set_socketio(self, socketio):
|
|
"""
|
|
设置SocketIO实例
|
|
|
|
Args:
|
|
socketio: SocketIO实例
|
|
"""
|
|
self._socketio = socketio
|
|
|
|
def add_status_change_callback(self, callback):
|
|
"""
|
|
添加状态变化回调函数
|
|
|
|
Args:
|
|
callback: 回调函数,接收参数 (device_name, is_connected)
|
|
"""
|
|
if callback not in self._status_change_callbacks:
|
|
self._status_change_callbacks.append(callback)
|
|
|
|
def remove_status_change_callback(self, callback):
|
|
"""
|
|
移除状态变化回调函数
|
|
|
|
Args:
|
|
callback: 要移除的回调函数
|
|
"""
|
|
if callback in self._status_change_callbacks:
|
|
self._status_change_callbacks.remove(callback)
|
|
|
|
def _notify_status_change(self, is_connected: bool):
|
|
"""
|
|
通知状态变化
|
|
|
|
Args:
|
|
is_connected: 连接状态
|
|
"""
|
|
for callback in self._status_change_callbacks:
|
|
try:
|
|
callback(self.device_name, is_connected)
|
|
except Exception as e:
|
|
self.logger.error(f"状态变化回调执行失败: {e}")
|
|
|
|
def set_connected(self, is_connected: bool):
|
|
"""
|
|
设置连接状态并触发回调
|
|
|
|
Args:
|
|
is_connected: 连接状态
|
|
"""
|
|
old_status = self.is_connected
|
|
self.is_connected = is_connected
|
|
|
|
# 只有状态真正改变时才触发回调
|
|
if old_status != is_connected:
|
|
self._notify_status_change(is_connected)
|
|
|
|
# 启动或停止连接监控
|
|
if is_connected and not self._connection_monitor_thread:
|
|
self._start_connection_monitor()
|
|
elif not is_connected and self._connection_monitor_thread:
|
|
self._stop_connection_monitor()
|
|
|
|
def emit_data(self, event: str, data: Any, namespace: Optional[str] = None):
|
|
"""
|
|
发送数据到前端
|
|
|
|
Args:
|
|
event: 事件名称
|
|
data: 数据
|
|
namespace: 命名空间,默认使用设备命名空间
|
|
"""
|
|
if self._socketio:
|
|
ns = namespace or self.socket_namespace
|
|
try:
|
|
self._socketio.emit(event, data, namespace=ns)
|
|
except Exception as e:
|
|
self.logger.error(f"发送数据失败: {e}")
|
|
|
|
def update_heartbeat(self):
|
|
"""
|
|
更新心跳时间
|
|
"""
|
|
self._last_heartbeat = time.time()
|
|
|
|
def is_alive(self, timeout: float = 30.0) -> bool:
|
|
"""
|
|
检查设备是否存活
|
|
|
|
Args:
|
|
timeout: 超时时间(秒)
|
|
|
|
Returns:
|
|
bool: 设备是否存活
|
|
"""
|
|
return (time.time() - self._last_heartbeat) < timeout
|
|
|
|
def get_device_info(self) -> Dict[str, Any]:
|
|
"""
|
|
获取设备信息
|
|
|
|
Returns:
|
|
Dict[str, Any]: 设备信息
|
|
"""
|
|
with self._lock:
|
|
return self._device_info.copy()
|
|
|
|
def get_stats(self) -> Dict[str, Any]:
|
|
"""
|
|
获取性能统计
|
|
|
|
Returns:
|
|
Dict[str, Any]: 性能统计信息
|
|
"""
|
|
with self._lock:
|
|
stats = self._stats.copy()
|
|
if stats['start_time']:
|
|
stats['uptime'] = time.time() - stats['start_time']
|
|
if stats['frames_processed'] > 0 and stats['uptime'] > 0:
|
|
stats['fps'] = stats['frames_processed'] / stats['uptime']
|
|
else:
|
|
stats['fps'] = 0
|
|
return stats
|
|
|
|
def _update_stats(self, frame_processed: bool = True, error: bool = False):
|
|
"""
|
|
更新统计信息
|
|
|
|
Args:
|
|
frame_processed: 是否处理了一帧
|
|
error: 是否发生错误
|
|
"""
|
|
with self._lock:
|
|
if frame_processed:
|
|
self._stats['frames_processed'] += 1
|
|
self._stats['last_frame_time'] = time.time()
|
|
if error:
|
|
self._stats['errors_count'] += 1
|
|
|
|
def _set_error(self, error_msg: str):
|
|
"""
|
|
设置错误信息
|
|
|
|
Args:
|
|
error_msg: 错误消息
|
|
"""
|
|
with self._lock:
|
|
self._device_info['last_error'] = {
|
|
'message': error_msg,
|
|
'timestamp': datetime.now().isoformat()
|
|
}
|
|
|
|
def _clear_error(self):
|
|
"""
|
|
清除错误信息
|
|
"""
|
|
with self._lock:
|
|
self._device_info['last_error'] = None
|
|
|
|
def _start_stats_tracking(self):
|
|
"""
|
|
开始统计跟踪
|
|
"""
|
|
with self._lock:
|
|
self._stats['start_time'] = time.time()
|
|
self._stats['frames_processed'] = 0
|
|
self._stats['errors_count'] = 0
|
|
|
|
def _stop_stats_tracking(self):
|
|
"""
|
|
停止统计跟踪
|
|
"""
|
|
with self._lock:
|
|
self._stats['start_time'] = None
|
|
|
|
def _start_connection_monitor(self):
|
|
"""
|
|
启动连接监控线程
|
|
"""
|
|
if self._connection_monitor_thread and self._connection_monitor_thread.is_alive():
|
|
return
|
|
|
|
self._monitor_stop_event.clear()
|
|
self._connection_monitor_thread = threading.Thread(
|
|
target=self._connection_monitor_worker,
|
|
name=f"{self.device_name}_connection_monitor",
|
|
daemon=True
|
|
)
|
|
self._connection_monitor_thread.start()
|
|
self.logger.info(f"设备 {self.device_name} 连接监控线程已启动")
|
|
|
|
def _stop_connection_monitor(self):
|
|
"""
|
|
停止连接监控线程
|
|
"""
|
|
if self._connection_monitor_thread:
|
|
self._monitor_stop_event.set()
|
|
if self._connection_monitor_thread.is_alive():
|
|
self._connection_monitor_thread.join(timeout=2.0)
|
|
self._connection_monitor_thread = None
|
|
self.logger.info(f"设备 {self.device_name} 连接监控线程已停止")
|
|
|
|
def _connection_monitor_worker(self):
|
|
"""
|
|
连接监控工作线程
|
|
"""
|
|
self.logger.info(f"设备 {self.device_name} 连接监控开始")
|
|
|
|
while not self._monitor_stop_event.is_set():
|
|
try:
|
|
# 检查硬件连接状态
|
|
hardware_connected = self.check_hardware_connection()
|
|
|
|
# 如果硬件断开但软件状态仍为连接,则更新状态
|
|
if not hardware_connected and self.is_connected:
|
|
self.logger.warning(f"检测到设备 {self.device_name} 硬件连接断开")
|
|
self.set_connected(False)
|
|
break # 硬件断开后停止监控
|
|
|
|
# 检查心跳超时
|
|
if self.is_connected and time.time() - self._last_heartbeat > self._connection_timeout:
|
|
self.logger.warning(f"设备 {self.device_name} 心跳超时,判定为断开连接")
|
|
self.set_connected(False)
|
|
break # 超时后停止监控
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"设备 {self.device_name} 连接监控异常: {e}")
|
|
|
|
# 等待下次检查
|
|
self._monitor_stop_event.wait(self._connection_check_interval)
|
|
|
|
self.logger.info(f"设备 {self.device_name} 连接监控结束")
|
|
|
|
def __enter__(self):
|
|
"""
|
|
上下文管理器入口
|
|
"""
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
"""
|
|
上下文管理器出口
|
|
"""
|
|
self.cleanup()
|
|
|
|
def _cleanup_monitoring(self):
|
|
"""
|
|
清理监控线程
|
|
"""
|
|
self._stop_connection_monitor()
|
|
|
|
def __repr__(self):
|
|
return f"<{self.__class__.__name__}(name='{self.device_name}', connected={self.is_connected}, streaming={self.is_streaming})>" |