BodyBalanceEvaluation/backend/test_camera_simple.py

148 lines
4.7 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
简化的相机断开连接测试脚本
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
import time
import threading
import logging
from devices.camera_manager import CameraManager
from devices.utils.config_manager import ConfigManager
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class MockSocketIO:
"""模拟SocketIO用于测试"""
def __init__(self):
self.events = []
self.lock = threading.Lock()
def emit(self, event, data, namespace=None):
"""记录发送的事件"""
with self.lock:
self.events.append({
'event': event,
'data': data,
'namespace': namespace,
'timestamp': time.time()
})
logger.info(f"Socket事件: {event} -> {data} (namespace: {namespace})")
def get_events(self):
"""获取所有事件"""
with self.lock:
return self.events.copy()
def test_camera_connection():
"""测试相机连接和断开检测"""
logger.info("="*60)
logger.info("开始测试相机连接和断开检测功能")
logger.info("="*60)
# 创建模拟SocketIO
mock_socketio = MockSocketIO()
try:
# 创建配置管理器
config_manager = ConfigManager()
# 创建相机管理器
camera_manager = CameraManager(mock_socketio, config_manager)
# 添加状态变化回调
def status_callback(device_name, is_connected):
logger.info(f"状态回调: {device_name} -> {'连接' if is_connected else '断开'}")
camera_manager.add_status_change_callback(status_callback)
# 1. 测试初始化
logger.info("\n步骤1: 初始化相机设备")
if camera_manager.initialize():
logger.info(f"✓ 相机初始化成功 - 连接状态: {camera_manager.is_connected}")
else:
logger.warning("✗ 相机初始化失败")
return False
# 2. 测试硬件连接检查
logger.info("\n步骤2: 测试硬件连接检查")
hardware_connected = camera_manager.check_hardware_connection()
logger.info(f"硬件连接状态: {hardware_connected}")
# 3. 启动连接监控
logger.info("\n步骤3: 启动连接监控")
camera_manager._start_connection_monitor()
logger.info("连接监控已启动")
# 4. 监控连接状态变化
logger.info("\n步骤4: 监控连接状态 (30秒)")
logger.info("请在此期间拔出相机USB连接线来测试断开检测...")
start_time = time.time()
last_status = camera_manager.is_connected
while time.time() - start_time < 30:
current_status = camera_manager.is_connected
if current_status != last_status:
logger.info(f"检测到状态变化: {'连接' if current_status else '断开'} (耗时: {time.time() - start_time:.1f}秒)")
last_status = current_status
if not current_status:
logger.info("✓ 成功检测到相机断开!")
break
time.sleep(1)
# 5. 检查Socket事件
logger.info("\n步骤5: 检查Socket事件")
events = mock_socketio.get_events()
if events:
logger.info(f"共记录到 {len(events)} 个Socket事件:")
for i, event in enumerate(events, 1):
logger.info(f" {i}. {event['event']} -> {event['data']}")
else:
logger.info("未记录到Socket事件")
return True
except Exception as e:
logger.error(f"测试过程中发生异常: {e}")
return False
finally:
# 清理资源
try:
if 'camera_manager' in locals():
camera_manager._stop_connection_monitor()
camera_manager.disconnect()
logger.info("测试资源清理完成")
except Exception as e:
logger.error(f"清理资源时发生异常: {e}")
def main():
"""主函数"""
logger.info("相机断开连接测试脚本")
success = test_camera_connection()
logger.info("\n" + "="*60)
if success:
logger.info("测试完成: 相机断开连接检测功能测试完成")
else:
logger.info("测试完成: 相机断开连接检测功能存在问题")
logger.info("="*60)
if __name__ == "__main__":
main()