BodyBalanceEvaluation/backend/test_reconnection.py

98 lines
3.2 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
设备重连机制测试脚本
测试设备断开后的自动重连功能
"""
import time
import threading
from devices.camera_manager import CameraManager
from devices.imu_manager import IMUManager
from devices.femtobolt_manager import FemtoBoltManager
from devices.pressure_manager import PressureManager
import logging
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
class MockSocketIO:
"""模拟SocketIO用于测试"""
def emit(self, event, data):
print(f"[SocketIO] 发送事件: {event}, 数据: {data}")
def test_device_reconnection(device_manager, device_name):
"""测试设备重连机制"""
print(f"\n=== 测试 {device_name} 重连机制 ===")
# 初始化设备
print(f"1. 初始化 {device_name} 设备...")
success = device_manager.initialize()
print(f" 初始化结果: {'成功' if success else '失败'}")
if success:
print(f" 设备连接状态: {'已连接' if device_manager.is_connected else '未连接'}")
# 等待一段时间让连接稳定
print("2. 等待连接稳定...")
time.sleep(3)
# 模拟设备断开
print("3. 模拟设备断开连接...")
device_manager.disconnect()
print(f" 断开后连接状态: {'已连接' if device_manager.is_connected else '未连接'}")
# 等待一段时间
print("4. 等待重连机制触发...")
time.sleep(5)
# 尝试重新连接
print("5. 尝试重新连接...")
reconnect_success = device_manager.initialize()
print(f" 重连结果: {'成功' if reconnect_success else '失败'}")
print(f" 重连后连接状态: {'已连接' if device_manager.is_connected else '未连接'}")
# 清理
device_manager.disconnect()
print(f"=== {device_name} 重连测试完成 ===\n")
return success
def main():
"""主测试函数"""
print("开始设备重连机制测试...")
# 创建模拟SocketIO
mock_socketio = MockSocketIO()
# 测试相机重连
print("\n测试相机重连机制...")
camera_manager = CameraManager(mock_socketio)
test_device_reconnection(camera_manager, "相机")
# 测试IMU重连
print("\n测试IMU重连机制...")
imu_manager = IMUManager(mock_socketio)
test_device_reconnection(imu_manager, "IMU")
# 测试FemtoBolt重连
print("\n测试FemtoBolt重连机制...")
femtobolt_manager = FemtoBoltManager(mock_socketio)
test_device_reconnection(femtobolt_manager, "FemtoBolt")
# 测试压力传感器重连
print("\n测试压力传感器重连机制...")
pressure_manager = PressureManager(mock_socketio)
test_device_reconnection(pressure_manager, "压力传感器")
print("\n所有设备重连测试完成!")
print("\n注意事项:")
print("1. 某些设备可能需要物理连接才能成功初始化")
print("2. 重连机制的效果取决于设备的实际可用性")
print("3. 观察日志中的连接监控线程启动和停止信息")
if __name__ == "__main__":
main()