#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 设备抽象基类 定义所有设备管理器的通用接口和基础功能 """ from abc import ABC, abstractmethod from typing import Dict, Any, Optional import threading import logging import time from datetime import datetime try: from .utils.socket_manager import SocketManager except ImportError: from utils.socket_manager import SocketManager class BaseDevice(ABC): """设备抽象基类""" def __init__(self, device_name: str, config: Dict[str, Any]): """ 初始化设备基类 Args: device_name: 设备名称 config: 设备配置字典 """ self.device_name = device_name self.config = config self.is_connected = False self.is_streaming = False self.socket_namespace = f"/{device_name}" self.logger = logging.getLogger(f"device.{device_name}") self._lock = threading.RLock() # 可重入锁 self._streaming_thread = None self._stop_event = threading.Event() self._socketio = None self._last_heartbeat = time.time() # 状态变化回调 self._status_change_callbacks = [] # 设备连接监控 self._connection_monitor_thread = None self._monitor_stop_event = threading.Event() self._connection_check_interval = config.get('connection_check_interval', 5.0) # 默认5秒检查一次 self._connection_timeout = config.get('connection_timeout', 30.0) # 默认30秒超时 # 设备状态信息 self._device_info = { 'name': device_name, 'type': self.__class__.__name__, 'version': '1.0.0', 'initialized_at': None, 'last_error': None } @abstractmethod def initialize(self) -> bool: """ 初始化设备 Returns: bool: 初始化是否成功 """ pass @abstractmethod def calibrate(self) -> Dict[str, Any]: """ 校准设备 Returns: Dict[str, Any]: 校准结果 """ pass @abstractmethod def start_streaming(self, socketio) -> bool: """ 启动数据推流 Args: socketio: SocketIO实例 Returns: bool: 启动是否成功 """ pass @abstractmethod def stop_streaming(self) -> bool: """ 停止数据推流 Returns: bool: 停止是否成功 """ pass @abstractmethod def get_status(self) -> Dict[str, Any]: """ 获取设备状态 Returns: Dict[str, Any]: 设备状态信息 """ pass @abstractmethod def disconnect(self) -> None: """ 断开设备连接 """ pass @abstractmethod def cleanup(self) -> None: """ 清理资源 """ pass @abstractmethod def reload_config(self) -> bool: """ 重新加载设备配置 Returns: bool: 重新加载是否成功 """ pass @abstractmethod def check_hardware_connection(self) -> bool: """ 检查设备硬件连接状态 Returns: bool: 设备是否物理连接 """ pass def set_socketio(self, socketio): """ 设置SocketIO实例 Args: socketio: SocketIO实例 """ self._socketio = socketio def add_status_change_callback(self, callback): """ 添加状态变化回调函数 Args: callback: 回调函数,接收参数 (device_name, is_connected) """ if callback not in self._status_change_callbacks: self._status_change_callbacks.append(callback) def remove_status_change_callback(self, callback): """ 移除状态变化回调函数 Args: callback: 要移除的回调函数 """ if callback in self._status_change_callbacks: self._status_change_callbacks.remove(callback) def _notify_status_change(self, is_connected: bool): """ 通知状态变化 Args: is_connected: 连接状态 """ for callback in self._status_change_callbacks: try: callback(self.device_name, is_connected) except Exception as e: self.logger.error(f"状态变化回调执行失败: {e}") def set_connected(self, is_connected: bool): """ 设置连接状态并触发回调 Args: is_connected: 连接状态 """ old_status = self.is_connected self.is_connected = is_connected # 只有状态真正改变时才触发回调 if old_status != is_connected: self._notify_status_change(is_connected) def emit_data(self, event: str, data: Any, namespace: Optional[str] = None): """ 发送数据到前端 Args: event: 事件名称 data: 数据 namespace: 命名空间,默认使用设备命名空间 """ if self._socketio: ns = namespace or self.socket_namespace try: self._socketio.emit(event, data, namespace=ns) except Exception as e: self.logger.error(f"发送数据失败: {e}") def update_heartbeat(self): """ 更新心跳时间 """ self._last_heartbeat = time.time() def is_alive(self, timeout: float = 30.0) -> bool: """ 检查设备是否存活 Args: timeout: 超时时间(秒) Returns: bool: 设备是否存活 """ return (time.time() - self._last_heartbeat) < timeout def get_device_info(self) -> Dict[str, Any]: """ 获取设备信息 Returns: Dict[str, Any]: 设备信息 """ with self._lock: return self._device_info.copy() def _set_error(self, error_msg: str): """ 设置错误信息 Args: error_msg: 错误消息 """ with self._lock: self._device_info['last_error'] = { 'message': error_msg, 'timestamp': datetime.now().isoformat() } def _clear_error(self): """ 清除错误信息 """ with self._lock: self._device_info['last_error'] = None def _start_connection_monitor(self): """ 启动连接监控线程 """ if self._connection_monitor_thread and self._connection_monitor_thread.is_alive(): return self._monitor_stop_event.clear() self._connection_monitor_thread = threading.Thread( target=self._connection_monitor_worker, name=f"{self.device_name}_connection_monitor", daemon=True ) self._connection_monitor_thread.start() self.logger.info(f"设备 {self.device_name} 连接监控线程已启动") def _stop_connection_monitor(self): """ 停止连接监控线程 """ if self._connection_monitor_thread: self._monitor_stop_event.set() if self._connection_monitor_thread.is_alive(): self._connection_monitor_thread.join(timeout=2.0) self._connection_monitor_thread = None self.logger.info(f"设备 {self.device_name} 连接监控线程已停止") def _connection_monitor_worker(self): """ 连接监控工作线程 """ self.logger.info(f"设备 {self.device_name} 连接监控开始") while not self._monitor_stop_event.is_set(): try: # 检查硬件连接状态 hardware_connected = self.check_hardware_connection() self.logger.info(f"检测到设备 {self.device_name} 硬件连接状态: {hardware_connected} is_connected:{self.is_connected}") # 如果硬件断开但软件状态仍为连接,则更新状态 if not hardware_connected and self.is_connected: self.logger.warning(f"检测到设备 {self.device_name} 硬件连接断开") # 直接更新状态,避免在监控线程中调用set_connected导致死锁 self.is_connected = False self._notify_status_change(False) # 如果硬件重新连接但软件状态仍为断开,则更新状态 elif hardware_connected and not self.is_connected: self.logger.info(f"检测到设备 {self.device_name} 硬件重新连接") # 直接更新状态,避免在监控线程中调用set_connected导致死锁 self.is_connected = True # 重置心跳时间,避免立即触发心跳超时 self.update_heartbeat() self._notify_status_change(True) # 检查心跳超时(仅在当前状态为连接时检查) if self.is_connected and time.time() - self._last_heartbeat > self._connection_timeout: self.logger.warning(f"设备 {self.device_name} 心跳超时,判定为断开连接") # 直接更新状态,避免在监控线程中调用set_connected导致死锁 self.is_connected = False self._notify_status_change(False) except Exception as e: self.logger.error(f"设备 {self.device_name} 连接监控异常: {e}") # 等待下次检查 self._monitor_stop_event.wait(self._connection_check_interval) self.logger.info(f"设备 {self.device_name} 连接监控结束") # 清理线程引用 self._connection_monitor_thread = None def __enter__(self): """ 上下文管理器入口 """ return self def __exit__(self, exc_type, exc_val, exc_tb): """ 上下文管理器出口 """ self.cleanup() def _cleanup_monitoring(self): """ 清理监控线程 """ self._stop_connection_monitor() def __repr__(self): return f"<{self.__class__.__name__}(name='{self.device_name}', connected={self.is_connected}, streaming={self.is_streaming})>"