BodyBalanceEvaluation/backend/test_camera_full.py

212 lines
7.6 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
完整的相机断开连接测试脚本
模拟主程序的完整流程
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
import time
import threading
import logging
from datetime import datetime
from devices.camera_manager import CameraManager
from devices.utils.config_manager import ConfigManager
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class MockSocketIO:
"""模拟SocketIO用于测试"""
def __init__(self):
self.events = []
self.lock = threading.Lock()
def emit(self, event, data, namespace=None):
"""记录发送的事件"""
with self.lock:
self.events.append({
'event': event,
'data': data,
'namespace': namespace,
'timestamp': time.time()
})
logger.info(f"Socket事件: {event} -> {data} (namespace: {namespace})")
def get_events(self):
"""获取所有事件"""
with self.lock:
return self.events.copy()
def clear_events(self):
"""清空事件记录"""
with self.lock:
self.events.clear()
class MockAppServer:
"""模拟主程序服务器"""
def __init__(self):
self.socketio = MockSocketIO()
self.logger = logger
self.device_managers = {}
def broadcast_device_status(self, device_name: str, is_connected: bool):
"""广播单个设备状态"""
if self.socketio:
try:
status_data = {
'device_type': device_name,
'status': is_connected,
'timestamp': datetime.now().isoformat()
}
self.socketio.emit('device_status', status_data, namespace='/devices')
self.logger.info(f'广播设备状态: {device_name} -> {"已连接" if is_connected else "未连接"}')
except Exception as e:
self.logger.error(f'广播设备状态失败: {e}')
def _on_device_status_change(self, device_name: str, is_connected: bool):
"""设备状态变化回调函数"""
self.logger.info(f'设备状态变化: {device_name} -> {"已连接" if is_connected else "未连接"}')
self.broadcast_device_status(device_name, is_connected)
def test_camera_disconnect_with_socket():
"""测试相机断开连接和Socket通知"""
logger.info("="*60)
logger.info("开始测试相机断开连接和Socket通知功能")
logger.info("="*60)
# 创建模拟服务器
app_server = MockAppServer()
try:
# 创建配置管理器
config_manager = ConfigManager()
# 创建相机管理器
camera_manager = CameraManager(app_server.socketio, config_manager)
app_server.device_managers['camera'] = camera_manager
# 添加状态变化回调(模拟主程序的回调注册)
camera_manager.add_status_change_callback(app_server._on_device_status_change)
# 1. 测试初始化
logger.info("\n步骤1: 初始化相机设备")
if camera_manager.initialize():
logger.info(f"✓ 相机初始化成功 - 连接状态: {camera_manager.is_connected}")
else:
logger.warning("✗ 相机初始化失败")
return False
# 清空初始化时的事件
app_server.socketio.clear_events()
# 2. 启动数据流(可选)
logger.info("\n步骤2: 启动相机数据流")
try:
if camera_manager.start_streaming(app_server.socketio):
logger.info("✓ 相机数据流启动成功")
time.sleep(2) # 让数据流稳定
else:
logger.warning("✗ 相机数据流启动失败")
except Exception as e:
logger.warning(f"数据流启动异常: {e}")
# 3. 监控连接状态变化
logger.info("\n步骤3: 监控连接状态变化 (30秒)")
logger.info("请在此期间拔出相机USB连接线来测试断开检测...")
start_time = time.time()
last_status = camera_manager.is_connected
disconnect_detected = False
while time.time() - start_time < 30:
current_status = camera_manager.is_connected
if current_status != last_status:
elapsed_time = time.time() - start_time
logger.info(f"检测到状态变化: {'连接' if current_status else '断开'} (耗时: {elapsed_time:.1f}秒)")
last_status = current_status
if not current_status:
logger.info("✓ 成功检测到相机断开!")
disconnect_detected = True
time.sleep(2) # 等待事件处理完成
break
time.sleep(0.5)
# 4. 检查Socket事件
logger.info("\n步骤4: 检查Socket事件")
events = app_server.socketio.get_events()
if events:
logger.info(f"✓ 共记录到 {len(events)} 个Socket事件:")
disconnect_events = 0
for i, event in enumerate(events, 1):
logger.info(f" {i}. 事件: {event['event']}, 数据: {event['data']}, 命名空间: {event['namespace']}")
if event['event'] == 'device_status' and event['data'].get('status') == False:
disconnect_events += 1
if disconnect_events > 0:
logger.info(f"✓ 检测到 {disconnect_events} 个设备断开事件")
else:
logger.warning("✗ 未检测到设备断开事件")
else:
logger.warning("✗ 未记录到任何Socket事件")
# 5. 测试结果总结
logger.info("\n步骤5: 测试结果总结")
if disconnect_detected:
logger.info("✓ 硬件断开检测: 成功")
else:
logger.warning("✗ 硬件断开检测: 失败 (30秒内未检测到断开)")
if events and any(e['event'] == 'device_status' and e['data'].get('status') == False for e in events):
logger.info("✓ Socket断开通知: 成功")
else:
logger.warning("✗ Socket断开通知: 失败")
return disconnect_detected and len(events) > 0
except Exception as e:
logger.error(f"测试过程中发生异常: {e}")
import traceback
traceback.print_exc()
return False
finally:
# 清理资源
try:
if 'camera_manager' in locals():
camera_manager.stop_streaming()
camera_manager.disconnect()
logger.info("测试资源清理完成")
except Exception as e:
logger.error(f"清理资源时发生异常: {e}")
def main():
"""主函数"""
logger.info("相机断开连接完整测试脚本")
logger.info("此脚本将模拟主程序的完整流程测试相机USB拔出时的断连检测和Socket通知功能")
success = test_camera_disconnect_with_socket()
logger.info("\n" + "="*60)
if success:
logger.info("测试完成: 相机断开连接检测和Socket通知功能正常")
else:
logger.info("测试完成: 相机断开连接检测和Socket通知功能存在问题")
logger.info("="*60)
if __name__ == "__main__":
main()