BodyBalanceEvaluation/backend/check_monitor_status.py

186 lines
6.5 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
检查设备连接监控线程状态的测试脚本
"""
import sys
import os
import threading
import time
from typing import Dict, Any
# 添加项目路径
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from devices.utils.config_manager import ConfigManager
from devices.camera_manager import CameraManager
from devices.imu_manager import IMUManager
from devices.pressure_manager import PressureManager
from devices.femtobolt_manager import FemtoBoltManager
class MockCameraManager(CameraManager):
"""模拟摄像头管理器,用于测试监控线程"""
def __init__(self, socketio, config_manager):
super().__init__(socketio, config_manager)
self._mock_hardware_connected = True
def check_hardware_connection(self) -> bool:
"""模拟硬件连接检查"""
return self._mock_hardware_connected
def set_mock_hardware_status(self, connected: bool):
"""设置模拟硬件连接状态"""
self._mock_hardware_connected = connected
def check_device_monitor_status(device_manager, device_name: str):
"""
检查单个设备的监控线程状态
Args:
device_manager: 设备管理器实例
device_name: 设备名称
"""
print(f"\n=== {device_name.upper()} 设备监控状态检查 ===")
# 检查基本状态
print(f"设备连接状态: {device_manager.is_connected}")
print(f"设备流状态: {device_manager.is_streaming}")
# 检查监控线程相关属性
if hasattr(device_manager, '_connection_monitor_thread'):
monitor_thread = device_manager._connection_monitor_thread
print(f"监控线程对象: {monitor_thread}")
if monitor_thread:
print(f"监控线程存活状态: {monitor_thread.is_alive()}")
print(f"监控线程名称: {monitor_thread.name}")
print(f"监控线程守护状态: {monitor_thread.daemon}")
else:
print("监控线程对象: None")
else:
print("设备管理器没有监控线程属性")
# 检查监控停止事件
if hasattr(device_manager, '_monitor_stop_event'):
stop_event = device_manager._monitor_stop_event
print(f"监控停止事件: {stop_event}")
print(f"监控停止事件状态: {stop_event.is_set()}")
else:
print("设备管理器没有监控停止事件属性")
# 检查监控配置
if hasattr(device_manager, '_connection_check_interval'):
print(f"连接检查间隔: {device_manager._connection_check_interval}")
if hasattr(device_manager, '_connection_timeout'):
print(f"连接超时时间: {device_manager._connection_timeout}")
# 检查心跳时间
if hasattr(device_manager, '_last_heartbeat'):
last_heartbeat = device_manager._last_heartbeat
current_time = time.time()
heartbeat_age = current_time - last_heartbeat
print(f"上次心跳时间: {time.ctime(last_heartbeat)}")
print(f"心跳间隔: {heartbeat_age:.2f}秒前")
def main():
"""
主函数检查所有设备的监控状态
"""
print("设备连接监控状态检查工具")
print("=" * 50)
try:
# 初始化配置管理器
config_manager = ConfigManager()
# 创建模拟设备管理器实例
mock_camera = MockCameraManager(None, config_manager)
print("\n=== 初始状态检查 ===")
check_device_monitor_status(mock_camera, 'mock_camera')
print("\n=== 系统线程信息 ===")
active_threads = threading.active_count()
print(f"当前活跃线程数: {active_threads}")
print("\n活跃线程列表:")
for thread in threading.enumerate():
print(f" - {thread.name} (守护: {thread.daemon}, 存活: {thread.is_alive()})")
# 测试连接监控启动
print("\n=== 测试连接监控启动 ===")
print("\n1. 设置模拟硬件为连接状态...")
mock_camera.set_mock_hardware_status(True)
print("\n2. 设置设备为连接状态...")
mock_camera.set_connected(True)
time.sleep(0.5) # 等待线程启动
print("\n连接后的监控状态:")
check_device_monitor_status(mock_camera, 'mock_camera')
print("\n=== 系统线程信息 (启动监控后) ===")
active_threads = threading.active_count()
print(f"当前活跃线程数: {active_threads}")
print("\n活跃线程列表:")
for thread in threading.enumerate():
print(f" - {thread.name} (守护: {thread.daemon}, 存活: {thread.is_alive()})")
print("\n3. 等待3秒观察监控线程工作...")
time.sleep(3)
print("\n监控运行中的状态:")
check_device_monitor_status(mock_camera, 'mock_camera')
print("\n4. 模拟硬件断开...")
mock_camera.set_mock_hardware_status(False)
time.sleep(6) # 等待监控检测到断开检查间隔是5秒
print("\n硬件断开后的监控状态:")
check_device_monitor_status(mock_camera, 'mock_camera')
print("\n5. 重新连接测试...")
mock_camera.set_mock_hardware_status(True)
mock_camera.set_connected(True)
time.sleep(0.5)
print("\n重新连接后的监控状态:")
check_device_monitor_status(mock_camera, 'mock_camera')
print("\n6. 手动断开连接...")
mock_camera.set_connected(False)
time.sleep(0.5)
print("\n手动断开后的监控状态:")
check_device_monitor_status(mock_camera, 'mock_camera')
print("\n=== 最终系统线程信息 ===")
active_threads = threading.active_count()
print(f"当前活跃线程数: {active_threads}")
print("\n活跃线程列表:")
for thread in threading.enumerate():
print(f" - {thread.name} (守护: {thread.daemon}, 存活: {thread.is_alive()})")
except Exception as e:
print(f"检查过程中发生错误: {e}")
import traceback
traceback.print_exc()
finally:
# 清理资源
print("\n=== 清理资源 ===")
try:
if 'mock_camera' in locals():
mock_camera.cleanup()
print("mock_camera 设备资源已清理")
except Exception as e:
print(f"清理资源时发生错误: {e}")
if __name__ == '__main__':
main()