98 lines
3.2 KiB
Python
98 lines
3.2 KiB
Python
|
#!/usr/bin/env python3
|
||
|
# -*- coding: utf-8 -*-
|
||
|
"""
|
||
|
设备重连机制测试脚本
|
||
|
测试设备断开后的自动重连功能
|
||
|
"""
|
||
|
|
||
|
import time
|
||
|
import threading
|
||
|
from devices.camera_manager import CameraManager
|
||
|
from devices.imu_manager import IMUManager
|
||
|
from devices.femtobolt_manager import FemtoBoltManager
|
||
|
from devices.pressure_manager import PressureManager
|
||
|
import logging
|
||
|
|
||
|
# 配置日志
|
||
|
logging.basicConfig(
|
||
|
level=logging.INFO,
|
||
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||
|
)
|
||
|
|
||
|
class MockSocketIO:
|
||
|
"""模拟SocketIO用于测试"""
|
||
|
def emit(self, event, data):
|
||
|
print(f"[SocketIO] 发送事件: {event}, 数据: {data}")
|
||
|
|
||
|
def test_device_reconnection(device_manager, device_name):
|
||
|
"""测试设备重连机制"""
|
||
|
print(f"\n=== 测试 {device_name} 重连机制 ===")
|
||
|
|
||
|
# 初始化设备
|
||
|
print(f"1. 初始化 {device_name} 设备...")
|
||
|
success = device_manager.initialize()
|
||
|
print(f" 初始化结果: {'成功' if success else '失败'}")
|
||
|
|
||
|
if success:
|
||
|
print(f" 设备连接状态: {'已连接' if device_manager.is_connected else '未连接'}")
|
||
|
|
||
|
# 等待一段时间让连接稳定
|
||
|
print("2. 等待连接稳定...")
|
||
|
time.sleep(3)
|
||
|
|
||
|
# 模拟设备断开
|
||
|
print("3. 模拟设备断开连接...")
|
||
|
device_manager.disconnect()
|
||
|
print(f" 断开后连接状态: {'已连接' if device_manager.is_connected else '未连接'}")
|
||
|
|
||
|
# 等待一段时间
|
||
|
print("4. 等待重连机制触发...")
|
||
|
time.sleep(5)
|
||
|
|
||
|
# 尝试重新连接
|
||
|
print("5. 尝试重新连接...")
|
||
|
reconnect_success = device_manager.initialize()
|
||
|
print(f" 重连结果: {'成功' if reconnect_success else '失败'}")
|
||
|
print(f" 重连后连接状态: {'已连接' if device_manager.is_connected else '未连接'}")
|
||
|
|
||
|
# 清理
|
||
|
device_manager.disconnect()
|
||
|
|
||
|
print(f"=== {device_name} 重连测试完成 ===\n")
|
||
|
return success
|
||
|
|
||
|
def main():
|
||
|
"""主测试函数"""
|
||
|
print("开始设备重连机制测试...")
|
||
|
|
||
|
# 创建模拟SocketIO
|
||
|
mock_socketio = MockSocketIO()
|
||
|
|
||
|
# 测试相机重连
|
||
|
print("\n测试相机重连机制...")
|
||
|
camera_manager = CameraManager(mock_socketio)
|
||
|
test_device_reconnection(camera_manager, "相机")
|
||
|
|
||
|
# 测试IMU重连
|
||
|
print("\n测试IMU重连机制...")
|
||
|
imu_manager = IMUManager(mock_socketio)
|
||
|
test_device_reconnection(imu_manager, "IMU")
|
||
|
|
||
|
# 测试FemtoBolt重连
|
||
|
print("\n测试FemtoBolt重连机制...")
|
||
|
femtobolt_manager = FemtoBoltManager(mock_socketio)
|
||
|
test_device_reconnection(femtobolt_manager, "FemtoBolt")
|
||
|
|
||
|
# 测试压力传感器重连
|
||
|
print("\n测试压力传感器重连机制...")
|
||
|
pressure_manager = PressureManager(mock_socketio)
|
||
|
test_device_reconnection(pressure_manager, "压力传感器")
|
||
|
|
||
|
print("\n所有设备重连测试完成!")
|
||
|
print("\n注意事项:")
|
||
|
print("1. 某些设备可能需要物理连接才能成功初始化")
|
||
|
print("2. 重连机制的效果取决于设备的实际可用性")
|
||
|
print("3. 观察日志中的连接监控线程启动和停止信息")
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
main()
|