BodyBalanceEvaluation/backend/test_camera_full.py

212 lines
7.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
完整的相机断开连接测试脚本
模拟主程序的完整流程
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
import time
import threading
import logging
from datetime import datetime
from devices.camera_manager import CameraManager
from devices.utils.config_manager import ConfigManager
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class MockSocketIO:
"""模拟SocketIO用于测试"""
def __init__(self):
self.events = []
self.lock = threading.Lock()
def emit(self, event, data, namespace=None):
"""记录发送的事件"""
with self.lock:
self.events.append({
'event': event,
'data': data,
'namespace': namespace,
'timestamp': time.time()
})
logger.info(f"Socket事件: {event} -> {data} (namespace: {namespace})")
def get_events(self):
"""获取所有事件"""
with self.lock:
return self.events.copy()
def clear_events(self):
"""清空事件记录"""
with self.lock:
self.events.clear()
class MockAppServer:
"""模拟主程序服务器"""
def __init__(self):
self.socketio = MockSocketIO()
self.logger = logger
self.device_managers = {}
def broadcast_device_status(self, device_name: str, is_connected: bool):
"""广播单个设备状态"""
if self.socketio:
try:
status_data = {
'device_type': device_name,
'status': is_connected,
'timestamp': datetime.now().isoformat()
}
self.socketio.emit('device_status', status_data, namespace='/devices')
self.logger.info(f'广播设备状态: {device_name} -> {"已连接" if is_connected else "未连接"}')
except Exception as e:
self.logger.error(f'广播设备状态失败: {e}')
def _on_device_status_change(self, device_name: str, is_connected: bool):
"""设备状态变化回调函数"""
self.logger.info(f'设备状态变化: {device_name} -> {"已连接" if is_connected else "未连接"}')
self.broadcast_device_status(device_name, is_connected)
def test_camera_disconnect_with_socket():
"""测试相机断开连接和Socket通知"""
logger.info("="*60)
logger.info("开始测试相机断开连接和Socket通知功能")
logger.info("="*60)
# 创建模拟服务器
app_server = MockAppServer()
try:
# 创建配置管理器
config_manager = ConfigManager()
# 创建相机管理器
camera_manager = CameraManager(app_server.socketio, config_manager)
app_server.device_managers['camera'] = camera_manager
# 添加状态变化回调(模拟主程序的回调注册)
camera_manager.add_status_change_callback(app_server._on_device_status_change)
# 1. 测试初始化
logger.info("\n步骤1: 初始化相机设备")
if camera_manager.initialize():
logger.info(f"✓ 相机初始化成功 - 连接状态: {camera_manager.is_connected}")
else:
logger.warning("✗ 相机初始化失败")
return False
# 清空初始化时的事件
app_server.socketio.clear_events()
# 2. 启动数据流(可选)
logger.info("\n步骤2: 启动相机数据流")
try:
if camera_manager.start_streaming(app_server.socketio):
logger.info("✓ 相机数据流启动成功")
time.sleep(2) # 让数据流稳定
else:
logger.warning("✗ 相机数据流启动失败")
except Exception as e:
logger.warning(f"数据流启动异常: {e}")
# 3. 监控连接状态变化
logger.info("\n步骤3: 监控连接状态变化 (30秒)")
logger.info("请在此期间拔出相机USB连接线来测试断开检测...")
start_time = time.time()
last_status = camera_manager.is_connected
disconnect_detected = False
while time.time() - start_time < 30:
current_status = camera_manager.is_connected
if current_status != last_status:
elapsed_time = time.time() - start_time
logger.info(f"检测到状态变化: {'连接' if current_status else '断开'} (耗时: {elapsed_time:.1f}秒)")
last_status = current_status
if not current_status:
logger.info("✓ 成功检测到相机断开!")
disconnect_detected = True
time.sleep(2) # 等待事件处理完成
break
time.sleep(0.5)
# 4. 检查Socket事件
logger.info("\n步骤4: 检查Socket事件")
events = app_server.socketio.get_events()
if events:
logger.info(f"✓ 共记录到 {len(events)} 个Socket事件:")
disconnect_events = 0
for i, event in enumerate(events, 1):
logger.info(f" {i}. 事件: {event['event']}, 数据: {event['data']}, 命名空间: {event['namespace']}")
if event['event'] == 'device_status' and event['data'].get('status') == False:
disconnect_events += 1
if disconnect_events > 0:
logger.info(f"✓ 检测到 {disconnect_events} 个设备断开事件")
else:
logger.warning("✗ 未检测到设备断开事件")
else:
logger.warning("✗ 未记录到任何Socket事件")
# 5. 测试结果总结
logger.info("\n步骤5: 测试结果总结")
if disconnect_detected:
logger.info("✓ 硬件断开检测: 成功")
else:
logger.warning("✗ 硬件断开检测: 失败 (30秒内未检测到断开)")
if events and any(e['event'] == 'device_status' and e['data'].get('status') == False for e in events):
logger.info("✓ Socket断开通知: 成功")
else:
logger.warning("✗ Socket断开通知: 失败")
return disconnect_detected and len(events) > 0
except Exception as e:
logger.error(f"测试过程中发生异常: {e}")
import traceback
traceback.print_exc()
return False
finally:
# 清理资源
try:
if 'camera_manager' in locals():
camera_manager.stop_streaming()
camera_manager.disconnect()
logger.info("测试资源清理完成")
except Exception as e:
logger.error(f"清理资源时发生异常: {e}")
def main():
"""主函数"""
logger.info("相机断开连接完整测试脚本")
logger.info("此脚本将模拟主程序的完整流程测试相机USB拔出时的断连检测和Socket通知功能")
success = test_camera_disconnect_with_socket()
logger.info("\n" + "="*60)
if success:
logger.info("测试完成: 相机断开连接检测和Socket通知功能正常")
else:
logger.info("测试完成: 相机断开连接检测和Socket通知功能存在问题")
logger.info("="*60)
if __name__ == "__main__":
main()