#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 设备抽象基类 定义所有设备管理器的通用接口和基础功能 """ from abc import ABC, abstractmethod from typing import Dict, Any, Optional import threading import logging import time from datetime import datetime try: from .utils.socket_manager import SocketManager except ImportError: from utils.socket_manager import SocketManager class BaseDevice(ABC): """设备抽象基类""" def __init__(self, device_name: str, config: Dict[str, Any]): """ 初始化设备基类 Args: device_name: 设备名称 config: 设备配置字典 """ self.device_name = device_name self.config = config self.is_connected = False self.is_streaming = False self.socket_namespace = f"/{device_name}" self.logger = logging.getLogger(f"device.{device_name}") self._lock = threading.RLock() # 可重入锁 self._streaming_thread = None self._stop_event = threading.Event() self._socketio = None self._last_heartbeat = time.time() # 状态变化回调 self._status_change_callbacks = [] # 设备状态信息 self._device_info = { 'name': device_name, 'type': self.__class__.__name__, 'version': '1.0.0', 'initialized_at': None, 'last_error': None } # 性能统计 self._stats = { 'frames_processed': 0, 'errors_count': 0, 'start_time': None, 'last_frame_time': None } @abstractmethod def initialize(self) -> bool: """ 初始化设备 Returns: bool: 初始化是否成功 """ pass @abstractmethod def calibrate(self) -> Dict[str, Any]: """ 校准设备 Returns: Dict[str, Any]: 校准结果 """ pass @abstractmethod def start_streaming(self, socketio) -> bool: """ 启动数据推流 Args: socketio: SocketIO实例 Returns: bool: 启动是否成功 """ pass @abstractmethod def stop_streaming(self) -> bool: """ 停止数据推流 Returns: bool: 停止是否成功 """ pass @abstractmethod def get_status(self) -> Dict[str, Any]: """ 获取设备状态 Returns: Dict[str, Any]: 设备状态信息 """ pass @abstractmethod def disconnect(self) -> None: """ 断开设备连接 """ pass @abstractmethod def cleanup(self) -> None: """ 清理资源 """ pass def set_socketio(self, socketio): """ 设置SocketIO实例 Args: socketio: SocketIO实例 """ self._socketio = socketio def add_status_change_callback(self, callback): """ 添加状态变化回调函数 Args: callback: 回调函数,接收参数 (device_name, is_connected) """ if callback not in self._status_change_callbacks: self._status_change_callbacks.append(callback) def remove_status_change_callback(self, callback): """ 移除状态变化回调函数 Args: callback: 要移除的回调函数 """ if callback in self._status_change_callbacks: self._status_change_callbacks.remove(callback) def _notify_status_change(self, is_connected: bool): """ 通知状态变化 Args: is_connected: 连接状态 """ for callback in self._status_change_callbacks: try: callback(self.device_name, is_connected) except Exception as e: self.logger.error(f"状态变化回调执行失败: {e}") def set_connected(self, is_connected: bool): """ 设置连接状态并触发回调 Args: is_connected: 连接状态 """ old_status = self.is_connected self.is_connected = is_connected # 只有状态真正改变时才触发回调 if old_status != is_connected: self._notify_status_change(is_connected) def emit_data(self, event: str, data: Any, namespace: Optional[str] = None): """ 发送数据到前端 Args: event: 事件名称 data: 数据 namespace: 命名空间,默认使用设备命名空间 """ if self._socketio: ns = namespace or self.socket_namespace try: self._socketio.emit(event, data, namespace=ns) except Exception as e: self.logger.error(f"发送数据失败: {e}") def update_heartbeat(self): """ 更新心跳时间 """ self._last_heartbeat = time.time() def is_alive(self, timeout: float = 30.0) -> bool: """ 检查设备是否存活 Args: timeout: 超时时间(秒) Returns: bool: 设备是否存活 """ return (time.time() - self._last_heartbeat) < timeout def get_device_info(self) -> Dict[str, Any]: """ 获取设备信息 Returns: Dict[str, Any]: 设备信息 """ with self._lock: return self._device_info.copy() def get_stats(self) -> Dict[str, Any]: """ 获取性能统计 Returns: Dict[str, Any]: 性能统计信息 """ with self._lock: stats = self._stats.copy() if stats['start_time']: stats['uptime'] = time.time() - stats['start_time'] if stats['frames_processed'] > 0 and stats['uptime'] > 0: stats['fps'] = stats['frames_processed'] / stats['uptime'] else: stats['fps'] = 0 return stats def _update_stats(self, frame_processed: bool = True, error: bool = False): """ 更新统计信息 Args: frame_processed: 是否处理了一帧 error: 是否发生错误 """ with self._lock: if frame_processed: self._stats['frames_processed'] += 1 self._stats['last_frame_time'] = time.time() if error: self._stats['errors_count'] += 1 def _set_error(self, error_msg: str): """ 设置错误信息 Args: error_msg: 错误消息 """ with self._lock: self._device_info['last_error'] = { 'message': error_msg, 'timestamp': datetime.now().isoformat() } def _clear_error(self): """ 清除错误信息 """ with self._lock: self._device_info['last_error'] = None def _start_stats_tracking(self): """ 开始统计跟踪 """ with self._lock: self._stats['start_time'] = time.time() self._stats['frames_processed'] = 0 self._stats['errors_count'] = 0 def _stop_stats_tracking(self): """ 停止统计跟踪 """ with self._lock: self._stats['start_time'] = None def __enter__(self): """ 上下文管理器入口 """ return self def __exit__(self, exc_type, exc_val, exc_tb): """ 上下文管理器出口 """ self.cleanup() def __repr__(self): return f"<{self.__class__.__name__}(name='{self.device_name}', connected={self.is_connected}, streaming={self.is_streaming})>"