BodyBalanceEvaluation/backend/devices/device_coordinator.py

667 lines
24 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
设备协调器
负责统一管理和协调所有设备的生命周期数据流和状态同步
"""
import threading
import time
import logging
from typing import Dict, List, Optional, Any, Callable
from collections import defaultdict
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
try:
from .camera_manager import CameraManager
from .imu_manager import IMUManager
from .pressure_manager import PressureManager
from .femtobolt_manager import FemtoBoltManager
from .utils.config_manager import ConfigManager
from .utils.socket_manager import SocketManager
except ImportError:
from camera_manager import CameraManager
from imu_manager import IMUManager
from pressure_manager import PressureManager
from femtobolt_manager import FemtoBoltManager
from utils.config_manager import ConfigManager
from utils.socket_manager import SocketManager
class DeviceCoordinator:
"""设备协调器 - 统一管理所有设备"""
def __init__(self, socketio, config_path: Optional[str] = None):
"""
初始化设备协调器
Args:
socketio: SocketIO实例
config_path: 配置文件路径
"""
self.socketio = socketio
self.logger = logging.getLogger(self.__class__.__name__)
# 配置管理
self.config_manager = ConfigManager(config_path)
self.socket_manager = SocketManager(socketio)
# 设备管理器
self.devices: Dict[str, Any] = {}
# 获取设备配置
self.device_configs = self.config_manager.get_all_device_configs()
# 状态管理
self.is_initialized = False
self.is_running = False
self.coordinator_lock = threading.RLock()
# 监控线程
self.monitor_thread = None
self.health_check_interval = 5.0 # 健康检查间隔(秒)
# 事件回调
self.event_callbacks: Dict[str, List[Callable]] = defaultdict(list)
# 性能统计
self.stats = {
'start_time': None,
'total_frames': 0,
'device_errors': defaultdict(int),
'reconnect_attempts': defaultdict(int)
}
# 线程池
self.executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="DeviceCoord")
self.logger.info("设备协调器初始化完成")
def initialize(self) -> bool:
"""
初始化所有设备
Returns:
bool: 初始化是否成功
"""
with self.coordinator_lock:
if self.is_initialized:
self.logger.warning("设备协调器已初始化")
return True
try:
self.logger.info("开始初始化设备协调器...")
# 注册Socket.IO命名空间
self._register_namespaces()
# 初始化设备
if not self._initialize_devices():
self.logger.warning("设备初始化失败,将以降级模式继续运行")
# 启动监控线程
self._start_monitor()
self.is_initialized = True
self.stats['start_time'] = time.time()
self.logger.info("设备协调器初始化成功")
self._emit_event('coordinator_initialized', {'devices': list(self.devices.keys())})
return True
except Exception as e:
self.logger.error(f"设备协调器初始化失败: {e}")
self._cleanup_devices()
return False
def _register_namespaces(self):
"""
注册Socket.IO命名空间
"""
namespace_mappings = {
'/devices': 'devices',
'/coordinator': 'coordinator'
}
for namespace, device_name in namespace_mappings.items():
self.socket_manager.register_namespace(namespace, device_name)
self.logger.info(f"已注册Socket.IO命名空间: {list(namespace_mappings.keys())}")
def _initialize_devices(self) -> bool:
"""
初始化所有设备
Returns:
bool: 初始化是否成功
"""
try:
# 并行初始化设备
futures = []
# FemtoBolt深度相机
if self.device_configs.get('femtobolt', {}).get('enabled', False):
future = self.executor.submit(self._init_femtobolt)
futures.append(('femtobolt', future))
# 普通相机
if self.device_configs.get('camera', {}).get('enabled', False):
future = self.executor.submit(self._init_camera)
futures.append(('camera', future))
# IMU传感器
if self.device_configs.get('imu', {}).get('enabled', False):
future = self.executor.submit(self._init_imu)
futures.append(('imu', future))
# 压力传感器
if self.device_configs.get('pressure', {}).get('enabled', False):
future = self.executor.submit(self._init_pressure)
futures.append(('pressure', future))
# 等待所有设备初始化完成
success_count = 0
for device_name, future in futures:
try:
result = future.result(timeout=30) # 30秒超时
if result:
success_count += 1
self.logger.info(f"{device_name}设备初始化成功")
else:
self.logger.error(f"{device_name}设备初始化失败")
except Exception as e:
self.logger.error(f"{device_name}设备初始化异常: {e}")
# 至少需要一个设备初始化成功
if success_count == 0:
self.logger.warning("没有设备初始化成功,但系统将继续运行")
return False
self.logger.info(f"设备初始化完成,成功: {success_count}/{len(futures)}")
return True
except Exception as e:
self.logger.error(f"设备初始化失败: {e}")
return False
def _init_camera(self) -> bool:
"""
初始化普通相机
Returns:
bool: 初始化是否成功
"""
try:
camera = CameraManager(self.socketio, self.config_manager)
self.devices['camera'] = camera
if camera.initialize():
return True
return False
except Exception as e:
self.logger.error(f"初始化相机失败: {e}")
return False
def _init_imu(self) -> bool:
"""
初始化IMU传感器
Returns:
bool: 初始化是否成功
"""
try:
imu = IMUManager(self.socketio, self.config_manager)
self.devices['imu'] = imu
if imu.initialize():
return True
return False
except Exception as e:
self.logger.error(f"初始化IMU失败: {e}")
return False
def _init_pressure(self) -> bool:
"""
初始化压力传感器
Returns:
bool: 初始化是否成功
"""
try:
pressure = PressureManager(self.socketio, self.config_manager)
self.devices['pressure'] = pressure
if pressure.initialize():
return True
return False
except Exception as e:
self.logger.error(f"初始化压力传感器失败: {e}")
return False
def _init_femtobolt(self) -> bool:
"""
初始化FemtoBolt深度相机
Returns:
bool: 初始化是否成功
"""
try:
femtobolt = FemtoBoltManager(self.socketio, self.config_manager)
self.devices['femtobolt'] = femtobolt
if femtobolt.initialize():
return True
return False
except Exception as e:
self.logger.error(f"初始化FemtoBolt失败: {e}")
return False
def start_all_streaming(self) -> bool:
"""
启动所有设备的数据流
Returns:
bool: 启动是否成功
"""
with self.coordinator_lock:
if not self.is_initialized:
self.logger.error("设备协调器未初始化")
return False
if self.is_running:
self.logger.warning("设备流已在运行")
return True
try:
self.logger.info("启动所有设备数据流...")
# 并行启动所有设备流
futures = []
for device_name, device in self.devices.items():
future = self.executor.submit(device.start_streaming)
futures.append((device_name, future))
# 等待所有设备启动完成
success_count = 0
for device_name, future in futures:
try:
result = future.result(timeout=10) # 10秒超时
if result:
success_count += 1
self.logger.info(f"{device_name}数据流启动成功")
else:
self.logger.error(f"{device_name}数据流启动失败")
except Exception as e:
self.logger.error(f"{device_name}数据流启动异常: {e}")
self.is_running = success_count > 0
if self.is_running:
self.logger.info(f"设备数据流启动完成,成功: {success_count}/{len(futures)}")
self._emit_event('streaming_started', {'active_devices': success_count})
else:
self.logger.error("没有设备数据流启动成功")
return self.is_running
except Exception as e:
self.logger.error(f"启动设备数据流失败: {e}")
return False
def stop_all_streaming(self) -> bool:
"""
停止所有设备的数据流
Returns:
bool: 停止是否成功
"""
with self.coordinator_lock:
if not self.is_running:
self.logger.warning("设备流未运行")
return True
try:
self.logger.info("停止所有设备数据流...")
# 并行停止所有设备流
futures = []
for device_name, device in self.devices.items():
if hasattr(device, 'stop_streaming'):
future = self.executor.submit(device.stop_streaming)
futures.append((device_name, future))
# 等待所有设备停止完成
for device_name, future in futures:
try:
future.result(timeout=5) # 5秒超时
self.logger.info(f"{device_name}数据流已停止")
except Exception as e:
self.logger.error(f"停止{device_name}数据流异常: {e}")
self.is_running = False
self.logger.info("所有设备数据流已停止")
self._emit_event('streaming_stopped', {})
return True
except Exception as e:
self.logger.error(f"停止设备数据流失败: {e}")
return False
def get_device_status(self, device_name: Optional[str] = None) -> Dict[str, Any]:
"""
获取设备状态
Args:
device_name: 设备名称None表示获取所有设备状态
Returns:
Dict[str, Any]: 设备状态信息
"""
if device_name:
if device_name in self.devices:
return self.devices[device_name].get_status()
else:
return {'error': f'设备 {device_name} 不存在'}
else:
# 获取所有设备状态
status = {
'coordinator': {
'is_initialized': self.is_initialized,
'is_running': self.is_running,
'device_count': len(self.devices),
'uptime': time.time() - self.stats['start_time'] if self.stats['start_time'] else 0
},
'devices': {}
}
for name, device in self.devices.items():
try:
status['devices'][name] = device.get_status()
except Exception as e:
status['devices'][name] = {'error': str(e)}
return status
def get_device(self, device_name: str) -> Optional[Any]:
"""
获取指定设备实例
Args:
device_name: 设备名称
Returns:
Optional[Any]: 设备实例不存在返回None
"""
return self.devices.get(device_name)
def get_device_managers(self) -> Dict[str, Any]:
"""
获取所有设备管理器实例
Returns:
Dict[str, Any]: 设备管理器字典
"""
return self.devices.copy()
def start_all_connection_monitor(self) -> bool:
"""
启动所有设备的连接监控
Returns:
bool: 启动是否成功
"""
with self.coordinator_lock:
if not self.is_initialized:
self.logger.error("设备协调器未初始化")
return False
try:
self.logger.info("启动所有设备连接监控...")
success_count = 0
for device_name, device in self.devices.items():
try:
# 对深度相机(femtobolt)和普通相机(camera)直接调用初始化和启动推流
if device_name in ['femtobolt', 'camera']:
continue
if hasattr(device, '_start_connection_monitor'):
device._start_connection_monitor()
success_count += 1
self.logger.info(f"{device_name}设备连接监控已启动")
else:
self.logger.warning(f"{device_name}设备不支持连接监控")
except Exception as e:
self.logger.error(f"启动{device_name}设备失败: {e}")
self.logger.info(f"设备连接监控启动完成,成功: {success_count}/{len(self.devices)}")
return success_count > 0
except Exception as e:
self.logger.error(f"启动设备连接监控失败: {e}")
return False
def stop_all_connection_monitor(self) -> bool:
"""
停止所有设备的连接监控
Returns:
bool: 停止是否成功
"""
with self.coordinator_lock:
try:
self.logger.info("停止所有设备连接监控...")
success_count = 0
for device_name, device in self.devices.items():
try:
# 对深度相机(femtobolt)和普通相机(camera)直接调用停止推流
if device_name in ['femtobolt', 'camera']:
self.logger.info(f"停止{device_name}设备推流")
# # 调用设备的cleanup方法清理资源,停止推流
# if hasattr(device, 'cleanup'):
# if device.cleanup():
# success_count += 1
# self.logger.info(f"{device_name}设备推流已停止")
# else:
# self.logger.warning(f"{device_name}设备推流停止失败")
# else:
# self.logger.warning(f"{device_name}设备不支持推流停止")
continue
if hasattr(device, '_stop_connection_monitor'):
device._stop_connection_monitor()
success_count += 1
self.logger.info(f"{device_name}设备连接监控已停止")
else:
self.logger.warning(f"{device_name}设备不支持连接监控")
except Exception as e:
self.logger.error(f"停止{device_name}设备失败: {e}")
self.logger.info(f"设备连接监控停止完成,成功: {success_count}/{len(self.devices)}")
return True
except Exception as e:
self.logger.error(f"停止设备连接监控失败: {e}")
return False
def restart_device(self, device_name: str) -> bool:
"""
重启指定设备
Args:
device_name: 设备名称
Returns:
bool: 重启是否成功
"""
if device_name not in self.devices:
self.logger.error(f"设备 {device_name} 不存在")
return False
try:
self.logger.info(f"重启设备: {device_name}")
device = self.devices[device_name]
# 停止数据流
if hasattr(device, 'stop_streaming'):
device.stop_streaming()
# 清理资源
if hasattr(device, 'cleanup'):
device.cleanup()
# 重新初始化
if device.initialize():
self.logger.info(f"设备 {device_name} 重启成功")
self._emit_event('device_restarted', {'device': device_name})
return True
else:
self.logger.error(f"设备 {device_name} 重启失败")
return False
except Exception as e:
self.logger.error(f"重启设备 {device_name} 异常: {e}")
return False
def _start_monitor(self):
"""
启动监控线程
"""
if self.monitor_thread and self.monitor_thread.is_alive():
return
self.monitor_thread = threading.Thread(
target=self._monitor_worker,
name="DeviceMonitor",
daemon=True
)
self.monitor_thread.start()
self.logger.info("设备监控线程已启动")
def _monitor_worker(self):
"""
监控工作线程
"""
self.logger.info("设备监控线程开始运行")
while self.is_initialized:
try:
# 检查设备健康状态
for device_name, device in self.devices.items():
try:
status = device.get_status()
if not status.get('is_connected', False):
self.logger.warning(f"设备 {device_name} 连接丢失")
self.stats['device_errors'][device_name] += 1
# 尝试重连
if self.stats['device_errors'][device_name] <= 3:
self.logger.info(f"尝试重连设备: {device_name}")
if self.restart_device(device_name):
self.stats['device_errors'][device_name] = 0
except Exception as e:
self.logger.error(f"检查设备 {device_name} 状态异常: {e}")
# 发送状态更新
if self.is_running:
status = self.get_device_status()
self.socket_manager.emit_to_namespace(
'/coordinator', 'status_update', status
)
time.sleep(self.health_check_interval)
except Exception as e:
self.logger.error(f"监控线程异常: {e}")
time.sleep(1.0)
self.logger.info("设备监控线程结束")
def register_event_callback(self, event_name: str, callback: Callable):
"""
注册事件回调
Args:
event_name: 事件名称
callback: 回调函数
"""
self.event_callbacks[event_name].append(callback)
def _emit_event(self, event_name: str, data: Dict[str, Any]):
"""
触发事件
Args:
event_name: 事件名称
data: 事件数据
"""
# 调用注册的回调
for callback in self.event_callbacks[event_name]:
try:
callback(data)
except Exception as e:
self.logger.error(f"事件回调异常 {event_name}: {e}")
# 发送到Socket.IO
self.socket_manager.emit_to_namespace(
'/coordinator', event_name, data
)
def _cleanup_devices(self):
"""
清理所有设备
"""
for device_name, device in self.devices.items():
try:
if hasattr(device, 'cleanup'):
device.cleanup()
self.logger.info(f"设备 {device_name} 清理完成")
except Exception as e:
self.logger.error(f"清理设备 {device_name} 失败: {e}")
self.devices.clear()
def shutdown(self):
"""
关闭设备协调器
"""
with self.coordinator_lock:
try:
self.logger.info("关闭设备协调器...")
# 停止数据流
self.stop_all_streaming()
# 停止监控
self.is_initialized = False
if self.monitor_thread and self.monitor_thread.is_alive():
self.monitor_thread.join(timeout=5.0)
# 清理设备
self._cleanup_devices()
# 关闭线程池
self.executor.shutdown(wait=True)
# 清理Socket管理器
self.socket_manager.cleanup()
self.logger.info("设备协调器已关闭")
except Exception as e:
self.logger.error(f"关闭设备协调器失败: {e}")
def __enter__(self):
"""上下文管理器入口"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""上下文管理器出口"""
self.shutdown()