212 lines
7.6 KiB
Python
212 lines
7.6 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
完整的相机断开连接测试脚本
|
||
模拟主程序的完整流程
|
||
"""
|
||
|
||
import sys
|
||
import os
|
||
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
||
|
||
import time
|
||
import threading
|
||
import logging
|
||
from datetime import datetime
|
||
from devices.camera_manager import CameraManager
|
||
from devices.utils.config_manager import ConfigManager
|
||
|
||
# 配置日志
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||
)
|
||
logger = logging.getLogger(__name__)
|
||
|
||
class MockSocketIO:
|
||
"""模拟SocketIO用于测试"""
|
||
|
||
def __init__(self):
|
||
self.events = []
|
||
self.lock = threading.Lock()
|
||
|
||
def emit(self, event, data, namespace=None):
|
||
"""记录发送的事件"""
|
||
with self.lock:
|
||
self.events.append({
|
||
'event': event,
|
||
'data': data,
|
||
'namespace': namespace,
|
||
'timestamp': time.time()
|
||
})
|
||
logger.info(f"Socket事件: {event} -> {data} (namespace: {namespace})")
|
||
|
||
def get_events(self):
|
||
"""获取所有事件"""
|
||
with self.lock:
|
||
return self.events.copy()
|
||
|
||
def clear_events(self):
|
||
"""清空事件记录"""
|
||
with self.lock:
|
||
self.events.clear()
|
||
|
||
class MockAppServer:
|
||
"""模拟主程序服务器"""
|
||
|
||
def __init__(self):
|
||
self.socketio = MockSocketIO()
|
||
self.logger = logger
|
||
self.device_managers = {}
|
||
|
||
def broadcast_device_status(self, device_name: str, is_connected: bool):
|
||
"""广播单个设备状态"""
|
||
if self.socketio:
|
||
try:
|
||
status_data = {
|
||
'device_type': device_name,
|
||
'status': is_connected,
|
||
'timestamp': datetime.now().isoformat()
|
||
}
|
||
self.socketio.emit('device_status', status_data, namespace='/devices')
|
||
self.logger.info(f'广播设备状态: {device_name} -> {"已连接" if is_connected else "未连接"}')
|
||
except Exception as e:
|
||
self.logger.error(f'广播设备状态失败: {e}')
|
||
|
||
def _on_device_status_change(self, device_name: str, is_connected: bool):
|
||
"""设备状态变化回调函数"""
|
||
self.logger.info(f'设备状态变化: {device_name} -> {"已连接" if is_connected else "未连接"}')
|
||
self.broadcast_device_status(device_name, is_connected)
|
||
|
||
def test_camera_disconnect_with_socket():
|
||
"""测试相机断开连接和Socket通知"""
|
||
logger.info("="*60)
|
||
logger.info("开始测试相机断开连接和Socket通知功能")
|
||
logger.info("="*60)
|
||
|
||
# 创建模拟服务器
|
||
app_server = MockAppServer()
|
||
|
||
try:
|
||
# 创建配置管理器
|
||
config_manager = ConfigManager()
|
||
|
||
# 创建相机管理器
|
||
camera_manager = CameraManager(app_server.socketio, config_manager)
|
||
app_server.device_managers['camera'] = camera_manager
|
||
|
||
# 添加状态变化回调(模拟主程序的回调注册)
|
||
camera_manager.add_status_change_callback(app_server._on_device_status_change)
|
||
|
||
# 1. 测试初始化
|
||
logger.info("\n步骤1: 初始化相机设备")
|
||
if camera_manager.initialize():
|
||
logger.info(f"✓ 相机初始化成功 - 连接状态: {camera_manager.is_connected}")
|
||
else:
|
||
logger.warning("✗ 相机初始化失败")
|
||
return False
|
||
|
||
# 清空初始化时的事件
|
||
app_server.socketio.clear_events()
|
||
|
||
# 2. 启动数据流(可选)
|
||
logger.info("\n步骤2: 启动相机数据流")
|
||
try:
|
||
if camera_manager.start_streaming(app_server.socketio):
|
||
logger.info("✓ 相机数据流启动成功")
|
||
time.sleep(2) # 让数据流稳定
|
||
else:
|
||
logger.warning("✗ 相机数据流启动失败")
|
||
except Exception as e:
|
||
logger.warning(f"数据流启动异常: {e}")
|
||
|
||
# 3. 监控连接状态变化
|
||
logger.info("\n步骤3: 监控连接状态变化 (30秒)")
|
||
logger.info("请在此期间拔出相机USB连接线来测试断开检测...")
|
||
|
||
start_time = time.time()
|
||
last_status = camera_manager.is_connected
|
||
disconnect_detected = False
|
||
|
||
while time.time() - start_time < 30:
|
||
current_status = camera_manager.is_connected
|
||
|
||
if current_status != last_status:
|
||
elapsed_time = time.time() - start_time
|
||
logger.info(f"检测到状态变化: {'连接' if current_status else '断开'} (耗时: {elapsed_time:.1f}秒)")
|
||
last_status = current_status
|
||
|
||
if not current_status:
|
||
logger.info("✓ 成功检测到相机断开!")
|
||
disconnect_detected = True
|
||
time.sleep(2) # 等待事件处理完成
|
||
break
|
||
|
||
time.sleep(0.5)
|
||
|
||
# 4. 检查Socket事件
|
||
logger.info("\n步骤4: 检查Socket事件")
|
||
events = app_server.socketio.get_events()
|
||
|
||
if events:
|
||
logger.info(f"✓ 共记录到 {len(events)} 个Socket事件:")
|
||
disconnect_events = 0
|
||
for i, event in enumerate(events, 1):
|
||
logger.info(f" {i}. 事件: {event['event']}, 数据: {event['data']}, 命名空间: {event['namespace']}")
|
||
if event['event'] == 'device_status' and event['data'].get('status') == False:
|
||
disconnect_events += 1
|
||
|
||
if disconnect_events > 0:
|
||
logger.info(f"✓ 检测到 {disconnect_events} 个设备断开事件")
|
||
else:
|
||
logger.warning("✗ 未检测到设备断开事件")
|
||
else:
|
||
logger.warning("✗ 未记录到任何Socket事件")
|
||
|
||
# 5. 测试结果总结
|
||
logger.info("\n步骤5: 测试结果总结")
|
||
|
||
if disconnect_detected:
|
||
logger.info("✓ 硬件断开检测: 成功")
|
||
else:
|
||
logger.warning("✗ 硬件断开检测: 失败 (30秒内未检测到断开)")
|
||
|
||
if events and any(e['event'] == 'device_status' and e['data'].get('status') == False for e in events):
|
||
logger.info("✓ Socket断开通知: 成功")
|
||
else:
|
||
logger.warning("✗ Socket断开通知: 失败")
|
||
|
||
return disconnect_detected and len(events) > 0
|
||
|
||
except Exception as e:
|
||
logger.error(f"测试过程中发生异常: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return False
|
||
|
||
finally:
|
||
# 清理资源
|
||
try:
|
||
if 'camera_manager' in locals():
|
||
camera_manager.stop_streaming()
|
||
camera_manager.disconnect()
|
||
logger.info("测试资源清理完成")
|
||
except Exception as e:
|
||
logger.error(f"清理资源时发生异常: {e}")
|
||
|
||
def main():
|
||
"""主函数"""
|
||
logger.info("相机断开连接完整测试脚本")
|
||
logger.info("此脚本将模拟主程序的完整流程,测试相机USB拔出时的断连检测和Socket通知功能")
|
||
|
||
success = test_camera_disconnect_with_socket()
|
||
|
||
logger.info("\n" + "="*60)
|
||
if success:
|
||
logger.info("测试完成: 相机断开连接检测和Socket通知功能正常")
|
||
else:
|
||
logger.info("测试完成: 相机断开连接检测和Socket通知功能存在问题")
|
||
logger.info("="*60)
|
||
|
||
if __name__ == "__main__":
|
||
main() |