BodyBalanceEvaluation/backend/devices/device_coordinator.py
2025-08-17 16:42:05 +08:00

606 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
设备协调器
负责统一管理和协调所有设备的生命周期、数据流和状态同步
"""
import threading
import time
import logging
from typing import Dict, List, Optional, Any, Callable
from collections import defaultdict
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
try:
from .camera_manager import CameraManager
from .imu_manager import IMUManager
from .pressure_manager import PressureManager
from .femtobolt_manager import FemtoBoltManager
from .utils.config_manager import ConfigManager
from .utils.socket_manager import SocketManager
except ImportError:
from camera_manager import CameraManager
from imu_manager import IMUManager
from pressure_manager import PressureManager
from femtobolt_manager import FemtoBoltManager
from utils.config_manager import ConfigManager
from utils.socket_manager import SocketManager
class DeviceCoordinator:
"""设备协调器 - 统一管理所有设备"""
def __init__(self, socketio, config_path: Optional[str] = None):
"""
初始化设备协调器
Args:
socketio: SocketIO实例
config_path: 配置文件路径
"""
self.socketio = socketio
self.logger = logging.getLogger(self.__class__.__name__)
# 配置管理
self.config_manager = ConfigManager(config_path)
self.socket_manager = SocketManager(socketio)
# 设备管理器
self.devices: Dict[str, Any] = {}
self.device_configs = self.config_manager.get_system_config().get('devices', {})
# 状态管理
self.is_initialized = False
self.is_running = False
self.coordinator_lock = threading.RLock()
# 监控线程
self.monitor_thread = None
self.health_check_interval = 5.0 # 健康检查间隔(秒)
# 事件回调
self.event_callbacks: Dict[str, List[Callable]] = defaultdict(list)
# 性能统计
self.stats = {
'start_time': None,
'total_frames': 0,
'device_errors': defaultdict(int),
'reconnect_attempts': defaultdict(int)
}
# 线程池
self.executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="DeviceCoord")
self.logger.info("设备协调器初始化完成")
def initialize(self) -> bool:
"""
初始化所有设备
Returns:
bool: 初始化是否成功
"""
with self.coordinator_lock:
if self.is_initialized:
self.logger.warning("设备协调器已初始化")
return True
try:
self.logger.info("开始初始化设备协调器...")
# 注册Socket.IO命名空间
self._register_namespaces()
# 初始化设备
if not self._initialize_devices():
raise Exception("设备初始化失败")
# 启动监控线程
self._start_monitor()
self.is_initialized = True
self.stats['start_time'] = time.time()
self.logger.info("设备协调器初始化成功")
self._emit_event('coordinator_initialized', {'devices': list(self.devices.keys())})
return True
except Exception as e:
self.logger.error(f"设备协调器初始化失败: {e}")
self._cleanup_devices()
return False
def _register_namespaces(self):
"""
注册Socket.IO命名空间
"""
namespaces = ['/devices', '/coordinator']
for namespace in namespaces:
self.socket_manager.register_namespace(namespace)
self.logger.info(f"已注册Socket.IO命名空间: {namespaces}")
def _initialize_devices(self) -> bool:
"""
初始化所有设备
Returns:
bool: 初始化是否成功
"""
try:
# 并行初始化设备
futures = []
# 普通相机
if self.device_configs.get('camera', {}).get('enabled', False):
future = self.executor.submit(self._init_camera)
futures.append(('camera', future))
# IMU传感器
if self.device_configs.get('imu', {}).get('enabled', False):
future = self.executor.submit(self._init_imu)
futures.append(('imu', future))
# 压力传感器
if self.device_configs.get('pressure', {}).get('enabled', False):
future = self.executor.submit(self._init_pressure)
futures.append(('pressure', future))
# FemtoBolt深度相机
if self.device_configs.get('femtobolt', {}).get('enabled', False):
future = self.executor.submit(self._init_femtobolt)
futures.append(('femtobolt', future))
# 等待所有设备初始化完成
success_count = 0
for device_name, future in futures:
try:
result = future.result(timeout=30) # 30秒超时
if result:
success_count += 1
self.logger.info(f"{device_name}设备初始化成功")
else:
self.logger.error(f"{device_name}设备初始化失败")
except Exception as e:
self.logger.error(f"{device_name}设备初始化异常: {e}")
# 至少需要一个设备初始化成功
if success_count == 0:
raise Exception("没有设备初始化成功")
self.logger.info(f"设备初始化完成,成功: {success_count}/{len(futures)}")
return True
except Exception as e:
self.logger.error(f"设备初始化失败: {e}")
return False
def _init_camera(self) -> bool:
"""
初始化普通相机
Returns:
bool: 初始化是否成功
"""
try:
camera = CameraManager(self.socketio, self.config_manager)
if camera.initialize():
self.devices['camera'] = camera
return True
return False
except Exception as e:
self.logger.error(f"初始化相机失败: {e}")
return False
def _init_imu(self) -> bool:
"""
初始化IMU传感器
Returns:
bool: 初始化是否成功
"""
try:
imu = IMUManager(self.socketio, self.config_manager)
if imu.initialize():
self.devices['imu'] = imu
return True
return False
except Exception as e:
self.logger.error(f"初始化IMU失败: {e}")
return False
def _init_pressure(self) -> bool:
"""
初始化压力传感器
Returns:
bool: 初始化是否成功
"""
try:
pressure = PressureManager(self.socketio, self.config_manager)
if pressure.initialize():
self.devices['pressure'] = pressure
return True
return False
except Exception as e:
self.logger.error(f"初始化压力传感器失败: {e}")
return False
def _init_femtobolt(self) -> bool:
"""
初始化FemtoBolt深度相机
Returns:
bool: 初始化是否成功
"""
try:
femtobolt = FemtoBoltManager(self.socketio, self.config_manager)
if femtobolt.initialize():
self.devices['femtobolt'] = femtobolt
return True
return False
except Exception as e:
self.logger.error(f"初始化FemtoBolt失败: {e}")
return False
def start_all_streaming(self) -> bool:
"""
启动所有设备的数据流
Returns:
bool: 启动是否成功
"""
with self.coordinator_lock:
if not self.is_initialized:
self.logger.error("设备协调器未初始化")
return False
if self.is_running:
self.logger.warning("设备流已在运行")
return True
try:
self.logger.info("启动所有设备数据流...")
# 并行启动所有设备流
futures = []
for device_name, device in self.devices.items():
future = self.executor.submit(device.start_streaming)
futures.append((device_name, future))
# 等待所有设备启动完成
success_count = 0
for device_name, future in futures:
try:
result = future.result(timeout=10) # 10秒超时
if result:
success_count += 1
self.logger.info(f"{device_name}数据流启动成功")
else:
self.logger.error(f"{device_name}数据流启动失败")
except Exception as e:
self.logger.error(f"{device_name}数据流启动异常: {e}")
self.is_running = success_count > 0
if self.is_running:
self.logger.info(f"设备数据流启动完成,成功: {success_count}/{len(futures)}")
self._emit_event('streaming_started', {'active_devices': success_count})
else:
self.logger.error("没有设备数据流启动成功")
return self.is_running
except Exception as e:
self.logger.error(f"启动设备数据流失败: {e}")
return False
def stop_all_streaming(self) -> bool:
"""
停止所有设备的数据流
Returns:
bool: 停止是否成功
"""
with self.coordinator_lock:
if not self.is_running:
self.logger.warning("设备流未运行")
return True
try:
self.logger.info("停止所有设备数据流...")
# 并行停止所有设备流
futures = []
for device_name, device in self.devices.items():
if hasattr(device, 'stop_streaming'):
future = self.executor.submit(device.stop_streaming)
futures.append((device_name, future))
# 等待所有设备停止完成
for device_name, future in futures:
try:
future.result(timeout=5) # 5秒超时
self.logger.info(f"{device_name}数据流已停止")
except Exception as e:
self.logger.error(f"停止{device_name}数据流异常: {e}")
self.is_running = False
self.logger.info("所有设备数据流已停止")
self._emit_event('streaming_stopped', {})
return True
except Exception as e:
self.logger.error(f"停止设备数据流失败: {e}")
return False
def calibrate_all_devices(self) -> Dict[str, bool]:
"""
校准所有设备
Returns:
Dict[str, bool]: 各设备校准结果
"""
results = {}
try:
self.logger.info("开始校准所有设备...")
# 并行校准所有设备
futures = []
for device_name, device in self.devices.items():
if hasattr(device, 'calibrate'):
future = self.executor.submit(device.calibrate)
futures.append((device_name, future))
# 等待所有设备校准完成
for device_name, future in futures:
try:
result = future.result(timeout=30) # 30秒超时
results[device_name] = result
if result:
self.logger.info(f"{device_name}校准成功")
else:
self.logger.error(f"{device_name}校准失败")
except Exception as e:
self.logger.error(f"{device_name}校准异常: {e}")
results[device_name] = False
success_count = sum(results.values())
self.logger.info(f"设备校准完成,成功: {success_count}/{len(results)}")
self._emit_event('calibration_completed', results)
return results
except Exception as e:
self.logger.error(f"设备校准失败: {e}")
return results
def get_device_status(self, device_name: Optional[str] = None) -> Dict[str, Any]:
"""
获取设备状态
Args:
device_name: 设备名称None表示获取所有设备状态
Returns:
Dict[str, Any]: 设备状态信息
"""
if device_name:
if device_name in self.devices:
return self.devices[device_name].get_status()
else:
return {'error': f'设备 {device_name} 不存在'}
else:
# 获取所有设备状态
status = {
'coordinator': {
'is_initialized': self.is_initialized,
'is_running': self.is_running,
'device_count': len(self.devices),
'uptime': time.time() - self.stats['start_time'] if self.stats['start_time'] else 0
},
'devices': {}
}
for name, device in self.devices.items():
try:
status['devices'][name] = device.get_status()
except Exception as e:
status['devices'][name] = {'error': str(e)}
return status
def get_device(self, device_name: str) -> Optional[Any]:
"""
获取指定设备实例
Args:
device_name: 设备名称
Returns:
Optional[Any]: 设备实例不存在返回None
"""
return self.devices.get(device_name)
def restart_device(self, device_name: str) -> bool:
"""
重启指定设备
Args:
device_name: 设备名称
Returns:
bool: 重启是否成功
"""
if device_name not in self.devices:
self.logger.error(f"设备 {device_name} 不存在")
return False
try:
self.logger.info(f"重启设备: {device_name}")
device = self.devices[device_name]
# 停止数据流
if hasattr(device, 'stop_streaming'):
device.stop_streaming()
# 清理资源
if hasattr(device, 'cleanup'):
device.cleanup()
# 重新初始化
if device.initialize():
self.logger.info(f"设备 {device_name} 重启成功")
self._emit_event('device_restarted', {'device': device_name})
return True
else:
self.logger.error(f"设备 {device_name} 重启失败")
return False
except Exception as e:
self.logger.error(f"重启设备 {device_name} 异常: {e}")
return False
def _start_monitor(self):
"""
启动监控线程
"""
if self.monitor_thread and self.monitor_thread.is_alive():
return
self.monitor_thread = threading.Thread(
target=self._monitor_worker,
name="DeviceMonitor",
daemon=True
)
self.monitor_thread.start()
self.logger.info("设备监控线程已启动")
def _monitor_worker(self):
"""
监控工作线程
"""
self.logger.info("设备监控线程开始运行")
while self.is_initialized:
try:
# 检查设备健康状态
for device_name, device in self.devices.items():
try:
status = device.get_status()
if not status.get('is_connected', False):
self.logger.warning(f"设备 {device_name} 连接丢失")
self.stats['device_errors'][device_name] += 1
# 尝试重连
if self.stats['device_errors'][device_name] <= 3:
self.logger.info(f"尝试重连设备: {device_name}")
if self.restart_device(device_name):
self.stats['device_errors'][device_name] = 0
except Exception as e:
self.logger.error(f"检查设备 {device_name} 状态异常: {e}")
# 发送状态更新
if self.is_running:
status = self.get_device_status()
self.socket_manager.emit_to_namespace(
'/coordinator', 'status_update', status
)
time.sleep(self.health_check_interval)
except Exception as e:
self.logger.error(f"监控线程异常: {e}")
time.sleep(1.0)
self.logger.info("设备监控线程结束")
def register_event_callback(self, event_name: str, callback: Callable):
"""
注册事件回调
Args:
event_name: 事件名称
callback: 回调函数
"""
self.event_callbacks[event_name].append(callback)
def _emit_event(self, event_name: str, data: Dict[str, Any]):
"""
触发事件
Args:
event_name: 事件名称
data: 事件数据
"""
# 调用注册的回调
for callback in self.event_callbacks[event_name]:
try:
callback(data)
except Exception as e:
self.logger.error(f"事件回调异常 {event_name}: {e}")
# 发送到Socket.IO
self.socket_manager.emit_to_namespace(
'/coordinator', event_name, data
)
def _cleanup_devices(self):
"""
清理所有设备
"""
for device_name, device in self.devices.items():
try:
if hasattr(device, 'cleanup'):
device.cleanup()
self.logger.info(f"设备 {device_name} 清理完成")
except Exception as e:
self.logger.error(f"清理设备 {device_name} 失败: {e}")
self.devices.clear()
def shutdown(self):
"""
关闭设备协调器
"""
with self.coordinator_lock:
try:
self.logger.info("关闭设备协调器...")
# 停止数据流
self.stop_all_streaming()
# 停止监控
self.is_initialized = False
if self.monitor_thread and self.monitor_thread.is_alive():
self.monitor_thread.join(timeout=5.0)
# 清理设备
self._cleanup_devices()
# 关闭线程池
self.executor.shutdown(wait=True)
# 清理Socket管理器
self.socket_manager.cleanup()
self.logger.info("设备协调器已关闭")
except Exception as e:
self.logger.error(f"关闭设备协调器失败: {e}")
def __enter__(self):
"""上下文管理器入口"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""上下文管理器出口"""
self.shutdown()