BodyBalanceEvaluation/backend/check_monitor_status.py

186 lines
6.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
检查设备连接监控线程状态的测试脚本
"""
import sys
import os
import threading
import time
from typing import Dict, Any
# 添加项目路径
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from devices.utils.config_manager import ConfigManager
from devices.camera_manager import CameraManager
from devices.imu_manager import IMUManager
from devices.pressure_manager import PressureManager
from devices.femtobolt_manager import FemtoBoltManager
class MockCameraManager(CameraManager):
"""模拟摄像头管理器,用于测试监控线程"""
def __init__(self, socketio, config_manager):
super().__init__(socketio, config_manager)
self._mock_hardware_connected = True
def check_hardware_connection(self) -> bool:
"""模拟硬件连接检查"""
return self._mock_hardware_connected
def set_mock_hardware_status(self, connected: bool):
"""设置模拟硬件连接状态"""
self._mock_hardware_connected = connected
def check_device_monitor_status(device_manager, device_name: str):
"""
检查单个设备的监控线程状态
Args:
device_manager: 设备管理器实例
device_name: 设备名称
"""
print(f"\n=== {device_name.upper()} 设备监控状态检查 ===")
# 检查基本状态
print(f"设备连接状态: {device_manager.is_connected}")
print(f"设备流状态: {device_manager.is_streaming}")
# 检查监控线程相关属性
if hasattr(device_manager, '_connection_monitor_thread'):
monitor_thread = device_manager._connection_monitor_thread
print(f"监控线程对象: {monitor_thread}")
if monitor_thread:
print(f"监控线程存活状态: {monitor_thread.is_alive()}")
print(f"监控线程名称: {monitor_thread.name}")
print(f"监控线程守护状态: {monitor_thread.daemon}")
else:
print("监控线程对象: None")
else:
print("设备管理器没有监控线程属性")
# 检查监控停止事件
if hasattr(device_manager, '_monitor_stop_event'):
stop_event = device_manager._monitor_stop_event
print(f"监控停止事件: {stop_event}")
print(f"监控停止事件状态: {stop_event.is_set()}")
else:
print("设备管理器没有监控停止事件属性")
# 检查监控配置
if hasattr(device_manager, '_connection_check_interval'):
print(f"连接检查间隔: {device_manager._connection_check_interval}")
if hasattr(device_manager, '_connection_timeout'):
print(f"连接超时时间: {device_manager._connection_timeout}")
# 检查心跳时间
if hasattr(device_manager, '_last_heartbeat'):
last_heartbeat = device_manager._last_heartbeat
current_time = time.time()
heartbeat_age = current_time - last_heartbeat
print(f"上次心跳时间: {time.ctime(last_heartbeat)}")
print(f"心跳间隔: {heartbeat_age:.2f}秒前")
def main():
"""
主函数:检查所有设备的监控状态
"""
print("设备连接监控状态检查工具")
print("=" * 50)
try:
# 初始化配置管理器
config_manager = ConfigManager()
# 创建模拟设备管理器实例
mock_camera = MockCameraManager(None, config_manager)
print("\n=== 初始状态检查 ===")
check_device_monitor_status(mock_camera, 'mock_camera')
print("\n=== 系统线程信息 ===")
active_threads = threading.active_count()
print(f"当前活跃线程数: {active_threads}")
print("\n活跃线程列表:")
for thread in threading.enumerate():
print(f" - {thread.name} (守护: {thread.daemon}, 存活: {thread.is_alive()})")
# 测试连接监控启动
print("\n=== 测试连接监控启动 ===")
print("\n1. 设置模拟硬件为连接状态...")
mock_camera.set_mock_hardware_status(True)
print("\n2. 设置设备为连接状态...")
mock_camera.set_connected(True)
time.sleep(0.5) # 等待线程启动
print("\n连接后的监控状态:")
check_device_monitor_status(mock_camera, 'mock_camera')
print("\n=== 系统线程信息 (启动监控后) ===")
active_threads = threading.active_count()
print(f"当前活跃线程数: {active_threads}")
print("\n活跃线程列表:")
for thread in threading.enumerate():
print(f" - {thread.name} (守护: {thread.daemon}, 存活: {thread.is_alive()})")
print("\n3. 等待3秒观察监控线程工作...")
time.sleep(3)
print("\n监控运行中的状态:")
check_device_monitor_status(mock_camera, 'mock_camera')
print("\n4. 模拟硬件断开...")
mock_camera.set_mock_hardware_status(False)
time.sleep(6) # 等待监控检测到断开检查间隔是5秒
print("\n硬件断开后的监控状态:")
check_device_monitor_status(mock_camera, 'mock_camera')
print("\n5. 重新连接测试...")
mock_camera.set_mock_hardware_status(True)
mock_camera.set_connected(True)
time.sleep(0.5)
print("\n重新连接后的监控状态:")
check_device_monitor_status(mock_camera, 'mock_camera')
print("\n6. 手动断开连接...")
mock_camera.set_connected(False)
time.sleep(0.5)
print("\n手动断开后的监控状态:")
check_device_monitor_status(mock_camera, 'mock_camera')
print("\n=== 最终系统线程信息 ===")
active_threads = threading.active_count()
print(f"当前活跃线程数: {active_threads}")
print("\n活跃线程列表:")
for thread in threading.enumerate():
print(f" - {thread.name} (守护: {thread.daemon}, 存活: {thread.is_alive()})")
except Exception as e:
print(f"检查过程中发生错误: {e}")
import traceback
traceback.print_exc()
finally:
# 清理资源
print("\n=== 清理资源 ===")
try:
if 'mock_camera' in locals():
mock_camera.cleanup()
print("mock_camera 设备资源已清理")
except Exception as e:
print(f"清理资源时发生错误: {e}")
if __name__ == '__main__':
main()