系统清理

This commit is contained in:
root 2025-10-28 11:41:02 +08:00
parent 566c199413
commit a948501aaf
23 changed files with 138 additions and 8100 deletions

517
README.md
View File

@ -1,458 +1,129 @@
# 身体平衡评估系统
一个基于多传感器融合技术的专业身体平衡评估与分析系统,为用户提供准确的平衡能力评估和康复指导
一个基于多传感器融合的身体平衡评估与分析系统,提供姿态检测、视频录制、数据采集与评估报告等功能
## 系统特性
### 🎯 核心功能
- **实时姿态检测**: 基于MediaPipe的高精度人体姿态识别
- **多传感器融合**: 整合摄像头、IMU传感器和压力传感器数据
- **智能分析引擎**: 多维度平衡能力评估算法
- **实时视频流**: WebSocket实时视频传输和显示
- **检测数据采集**: 一键采集当前检测状态的数据快照
- **视频录制功能**: 支持检测过程的视频录制和回放
- **可视化报告**: 直观的数据图表和分析报告
- **历史数据管理**: 完整的检测记录存储和对比分析
- 实时姿态检测与数据采集
- 多设备管理摄像头、IMU、压力传感器
- 视频录制与文件存储规范化
- 本地数据库管理与历史记录
- 静态文件HTTP访问映射
- 开发与打包环境兼容
- 数据安全:本地存储,保护用户隐私
### 🔧 技术特点
- **现代化架构**: Vue 3 + Python Flask 前后端分离
- **实时通信**: WebSocket 实时数据传输和视频流
- **多媒体支持**: 集成视频录制、截图和数据采集功能
- **跨平台支持**: Windows、macOS、Linux
- **模块化设计**: 清晰的目录结构,易于扩展和维护
- **数据安全**: 本地存储,保护用户隐私
- **开发友好**: 独立的前后端开发环境,支持热重载
- **部署简化**: 一键安装和启动脚本,降低部署复杂度
## 系统架构
## 目录结构(当前)
```
身体平衡评估系统/
├── backend/ # 后端服务
│ ├── main.py # 主启动脚本
│ ├── app.py # Flask 主应用
│ ├── database.py # 数据库管理
│ ├── device_manager.py # 设备管理
│ ├── detection_engine.py # 检测引擎
│ ├── data_processor.py # 数据处理
│ ├── utils.py # 工具函数
│ └── requirements.txt # Python 依赖
├── frontend/ # 前端应用
│ └── src/renderer/ # 前端源码
│ ├── src/
│ │ ├── views/ # 页面组件
│ │ ├── stores/ # 状态管理
│ │ ├── services/ # API 服务
│ │ └── router/ # 路由配置
│ ├── package.json # Node.js 依赖
│ └── vite.config.js # 构建配置
├── data/ # 数据目录
├── logs/ # 日志目录
├── venv/ # Python 虚拟环境
├── install.bat # 安装脚本
├── start_dev.bat # 开发模式启动脚本
└── start_prod.bat # 生产模式启动脚本
BodyBalanceEvaluation/
├── backend/
│ ├── main.py # AppServer 后端入口(推荐)
│ ├── app.py # 备用后端入口(历史版本)
│ ├── database.py # 数据库管理
│ ├── device_manager.py # 设备统一管理(旧版)
│ ├── check_monitor_status.py # 设备连接状态检查脚本
│ ├── build_app.py # 打包相关脚本
│ ├── config.ini # 后端配置文件(可选)
│ ├── data/ # 默认数据目录(开发环境)
│ ├── devices/
│ │ ├── camera_manager.py
│ │ ├── screen_recorder.py
│ │ ├── imu_manager.py
│ │ ├── pressure_manager.py
│ │ ├── femtobolt_manager.py
│ │ ├── device_coordinator.py
│ │ └── utils/
│ │ └── config_manager.py # 配置管理器(设备侧)
│ ├── requirements.txt # 运行依赖
│ └── requirements_build.txt # 打包依赖
├── frontend/
│ └── src/renderer/ # Electron + 前端资源
├── config.ini # 顶层配置(可选,优先就近)
├── data/
│ └── patients/ # 示例数据目录
└── README.md
```
## 快速开始
## 安装与运行
### 环境要求
### 后端(开发)
- **Python**: 3.8 或更高版本
- **Node.js**: 16.0 或更高版本 (开发模式)
- **操作系统**: Windows 10/11, macOS 10.15+, Ubuntu 18.04+
- 创建并激活虚拟环境
- `python -m venv venv`
- `venv\Scripts\activate`Windows
- 安装依赖
- `pip install -r backend/requirements.txt`
- 启动后端服务(推荐入口)
- `python backend/main.py --host 0.0.0.0 --port 5000 --debug`
### 安装步骤
说明:`main.py` 使用 `AppServer` 类,启动时会初始化 `ConfigManager`、数据库管理器、设备协调器及相关路由。
#### 方式一:一键安装 (Windows 推荐)
1. **克隆项目**
```bash
git clone <repository-url>
cd BodyBalanceEvaluation
```
### 前端与打包
2. **运行安装脚本**
```bash
install.bat
```
安装脚本会自动完成:
- 检查 Python 和 Node.js 环境
- 创建 Python 虚拟环境
- 安装后端依赖
- 安装前端依赖
- 创建必要的目录结构
- Electron 在 `frontend/src/renderer/main/main.js` 中启动本地静态服务器,并在打包环境下拉起后端可执行文件:
- `resources/backend/BodyBalanceBackend/BodyBalanceBackend.exe`
- 前端调用后端 API 与静态文件路由相互独立。
#### 方式二:手动安装
## 配置说明
1. **克隆项目**
```bash
git clone <repository-url>
cd BodyBalanceEvaluation
```
系统配置由设备侧 `ConfigManager` 统一读取(`backend/devices/utils/config_manager.py`)。常用节:
2. **创建虚拟环境**
```bash
python -m venv venv
venv\Scripts\activate # Windows
# source venv/bin/activate # macOS/Linux
```
- `[FILEPATH]`
- `path`:文件存储根目录。支持绝对路径或相对路径。
- 绝对路径示例:`D:/BodyCheck/file`
- 相对路径示例:`data` 或 `../data`(相对 `main.py` 所在目录或打包 `exe` 同级目录)
- `[DATABASE]`
- `path`:数据库文件路径(支持相对/绝对)。
3. **安装 Python 依赖**
```bash
pip install -r backend/requirements.txt
```
示例(`backend/config.ini` 或项目根 `config.ini`
4. **安装前端依赖** (开发模式)
```bash
cd frontend/src/renderer
npm install
cd ../../..
```
```
[FILEPATH]
path = D:/BodyCheck/file
### 启动应用程序
**Windows 用户 (推荐)**:
```bash
# 一键安装 (首次使用)
install.bat
# 开发模式
start_dev.bat
# 生产模式
start_prod.bat
```
**手动启动**:
```bash
# 激活虚拟环境
venv\Scripts\activate
# 进入后端目录
cd backend
# 开发模式
python main.py --mode development
# 生产模式
python main.py --mode production
```
### 命令行参数
```bash
cd backend
python main.py [选项]
选项:
--mode {development,production} 运行模式 (默认: development)
--host HOST 服务器主机 (默认: 127.0.0.1)
--port PORT 服务器端口 (默认: 5000)
--no-browser 不自动打开浏览器
--log-level {DEBUG,INFO,WARNING,ERROR} 日志级别 (默认: INFO)
[DATABASE]
path = data/body_balance.db
```
## 使用指南
## 静态文件访问
### 1. 系统设置
- 后端提供静态文件映射路由(`backend/main.py`
- `GET /<path:filename>`
- 访问路径会在服务端拼接至 `[FILEPATH].path` 指定的存储根目录下。
- 例如:配置 `path = D:/BodyCheck/file`,则访问 `http://host:port/202508190001/20251014111329/video_111331358/screen.mp4`
会映射到 `D:/BodyCheck/file/202508190001/20251014111329/video_111331358/screen.mp4`
- 安全校验:
- 路径规范化与越界拦截(拒绝 `..`、绝对路径、UNC 路径)。
- 仅允许访问 `[FILEPATH].path` 根目录内的资源。
- 视频类文件设置正确的 `Content-Type``Accept-Ranges`(流式播放友好)。
首次使用前,请进入「系统设置」页面配置:
注:如果需要固定前缀路由(如 `/data/<path:filename>`),可以在 `main.py` 中将路由前缀改为 `/data`,逻辑保持不变。
- **设备配置**: 选择摄像头、配置串口设备
- **检测参数**: 设置默认检测时长、采样频率等
- **数据管理**: 配置数据存储路径和清理策略
## 录制与数据存储
### 2. 患者管理
- `screen_recorder.py` 在类初始化时注入 `ConfigManager`,并在录制/采集流程中统一从 `[FILEPATH].path` 读取文件根目录。
- 目录规范建议:`<root>/<patient_id>/<session_id>/<timestamp>`,业务层可按需要扩展子目录与文件命名。
- 当配置为相对路径时,开发环境相对 `backend/` 目录,打包环境相对 `exe` 同级目录。
- 添加患者基本信息(姓名、年龄、性别等)
- 记录患者病史和康复目标
- 管理患者档案和检测记录
## 主要 API摘录
### 3. 姿态检测
- `GET /health`:健康检查
- `POST /api/detection/start`:开始检测(创建会话)
- `POST /api/detection/<session_id>/stop`:停止检测并保存录制内容
- `GET /<path:filename>`:静态文件映射至 `[FILEPATH].path`
1. **选择患者**: 从患者列表中选择或新建患者
2. **设备准备**: 确保摄像头和传感器正常连接
3. **开始检测**: 点击开始按钮进行实时检测
4. **实时监控**: 通过实时视频流观察检测过程
5. **数据采集**: 在检测过程中点击"检测数据采集"按钮获取当前状态数据
6. **视频录制**: 使用"开始录制"/"停止录制"按钮记录检测过程
7. **停止检测**: 完成检测并自动计算检测时长
8. **查看结果**: 检测完成后查看分析结果和建议
说明:更多设备管理、状态广播与检测过程接口,参见 `backend/main.py` 内路由与 SocketIO 事件注册。
### 4. 数据分析
## 开发建议
- **单次分析**: 查看单次检测的详细数据和图表
- **对比分析**: 比较多次检测结果,观察变化趋势
- **报告生成**: 生成 PDF 格式的专业评估报告
- **数据导出**: 导出原始数据用于进一步分析
- 统一通过 `ConfigManager` 访问配置,避免硬编码路径。
- 读取与写入文件路径时使用 `os.path.normpath/abspath/realpath` 进行规范化。
- 对外提供文件访问统一通过静态路由,确保安全校验与越界防护。
- 开发与打包环境下路径基础目录不同,尽量通过配置与规范化处理屏蔽差异。
### 5. 历史记录
## 数据安全
- 浏览所有历史检测记录
- 按患者、日期、状态等条件筛选
- 批量导出和删除操作
- 时间线视图查看检测历史
## 设备支持
### 摄像头
- USB 摄像头 (推荐 1080p 30fps)
- 内置摄像头
- 网络摄像头
### IMU 传感器
- 支持串口通信的 9 轴 IMU
- 波特率: 9600, 115200, 230400
- 数据格式: 加速度、陀螺仪、磁力计
### 压力传感器
- 多点压力传感器阵列
- 串口通信接口
- 支持 1-16 个传感器点
## 开发指南
### 后端开发
后端使用 Flask 框架,主要模块:
- `main.py`: 主启动脚本和进程管理
- `app.py`: 主应用和 API 路由
- `database.py`: SQLite 数据库操作
- `device_manager.py`: 硬件设备管理和视频流处理
- `detection_engine.py`: 检测算法引擎
- `data_processor.py`: 数据分析和处理
### 主要API端点
- `POST /api/detection/start`: 开始检测会话
- `POST /api/detection/{session_id}/stop`: 停止检测会话
- `POST /api/detection/{session_id}/collect`: 采集检测数据
- `WebSocket /ws`: 实时数据和视频流传输
### 前端开发
前端使用 Vue 3 + Element Plus主要特性
- **组合式 API**: 使用 Vue 3 Composition API
- **状态管理**: Pinia 状态管理库
- **UI 组件**: Element Plus 组件库
- **图表库**: ECharts 数据可视化
- **构建工具**: Vite 快速构建
### 添加新功能
1. **后端 API**:
```python
@app.route('/api/new-feature', methods=['POST'])
def new_feature():
# 实现新功能逻辑
return jsonify({'status': 'success'})
```
2. **前端服务**:
```javascript
// frontend/src/renderer/src/services/api.js
export const newFeatureAPI = {
doSomething: (data) => api.post('/new-feature', data)
}
```
3. **前端组件**:
```vue
<template>
<!-- 新功能界面 -->
</template>
<script setup>
import { newFeatureAPI } from '../services/api'
</script>
```
### 项目结构优势
新的项目结构带来以下优势:
1. **清晰分离**: 前后端代码完全分离,便于团队协作开发
2. **独立部署**: 前后端可以独立部署和扩展
3. **开发效率**: 前后端可以并行开发,提高开发效率
4. **维护性**: 模块化结构便于代码维护和功能扩展
5. **版本管理**: 前后端可以独立进行版本控制
6. **技术栈**: 前后端可以选择最适合的技术栈
### 开发环境配置
**后端开发**:
```bash
# 激活虚拟环境
venv\Scripts\activate
# 进入后端目录
cd backend
# 启动开发服务器
python main.py --mode development --log-level DEBUG
```
python debug_server.py
**前端开发**:
```bash
# 进入前端目录
cd frontend/src/renderer
# 启动开发服务器
npm run dev
```
**同时开发** (推荐):
```bash
# 使用开发脚本同时启动前后端
start_dev.bat
```
## 故障排除
### 常见问题
**Q: 摄像头无法识别**
A: 检查摄像头连接,确保没有被其他应用占用,在设备设置中刷新摄像头列表。
**Q: 传感器连接失败**
A: 确认串口设置正确,检查设备驱动是否安装,尝试不同的波特率设置。
**Q: 前端页面无法加载**
A: 检查后端服务是否正常启动,确认防火墙设置,查看浏览器控制台错误信息。
**Q: 检测结果不准确**
A: 确保设备已正确校准,检查环境光线条件,调整检测参数设置。
**Q: 视频录制失败**
A: 检查磁盘空间是否充足,确认录制权限设置,查看后端日志中的错误信息。
**Q: WebSocket连接断开**
A: 检查网络连接稳定性确认防火墙未阻止WebSocket连接尝试刷新页面重新连接。
### 日志查看
系统日志保存在 `logs/` 目录下:
- `app.log`: 应用程序主日志
- `device.log`: 设备管理日志
- `detection.log`: 检测引擎日志
- `error.log`: 错误日志
### 性能优化
1. **硬件要求**:
- CPU: Intel i5 或同等性能
- 内存: 8GB RAM
- 存储: 10GB 可用空间
2. **软件优化**:
- 关闭不必要的后台程序
- 使用 SSD 存储提高 I/O 性能
- 定期清理历史数据和日志
## 数据格式
### 检测数据结构
```json
{
"session_id": "uuid",
"patient_id": "patient_uuid",
"timestamp": "2024-01-01T12:00:00Z",
"duration": 60,
"data": {
"camera": {
"landmarks": [...],
"confidence": 0.95
},
"imu": {
"acceleration": [x, y, z],
"gyroscope": [x, y, z],
"magnetometer": [x, y, z]
},
"pressure": {
"sensors": [p1, p2, p3, p4],
"center_of_pressure": [x, y]
}
},
"recording": {
"video_path": "path/to/video.mp4",
"screenshots": ["path/to/screenshot1.png"]
}
}
```
### 分析结果格式
```json
{
"session_id": "uuid",
"analysis_time": "2024-01-01T12:01:00Z",
"overall_assessment": "good",
"balance_score": 85,
"posture_score": 78,
"metrics": {
"sway_area": 2.5,
"sway_velocity": 1.2,
"postural_stability": 0.85
},
"recommendations": [
"建议加强核心肌群训练",
"注意保持正确站姿"
]
}
```
## 许可证
本项目采用 MIT 许可证。详见 [LICENSE](LICENSE) 文件。
## 贡献指南
欢迎贡献代码!请遵循以下步骤:
1. Fork 本仓库
2. 创建特性分支 (`git checkout -b feature/AmazingFeature`)
3. 提交更改 (`git commit -m 'Add some AmazingFeature'`)
4. 推送到分支 (`git push origin feature/AmazingFeature`)
5. 开启 Pull Request
## 支持
如果您遇到问题或有建议,请:
- 查看 [FAQ](docs/FAQ.md)
- 提交 [Issue](issues)
- 发送邮件至: support@example.com
## 更新日志
### v1.2.0 (2024-01-20)
- **检测功能增强**: 新增检测数据采集功能,支持实时数据快照
- **视频录制**: 集成视频录制功能,支持检测过程的完整记录
- **实时视频流**: 优化WebSocket视频传输提供流畅的实时监控
- **API优化**: 重构检测相关API使用RESTful设计模式
- **用户体验**: 改进检测界面,添加录制控制和数据采集按钮
- **生命周期管理**: 完善组件生命周期处理,确保资源正确释放
### v1.1.0 (2024-01-15)
- **项目重构**: 前后端完全分离,优化项目结构
- **新增脚本**: 添加一键安装和启动脚本 (install.bat, start_dev.bat, start_prod.bat)
- **开发体验**: 改进开发环境配置,支持独立的前后端开发
- **文档更新**: 完善 README 文档,添加详细的安装和使用说明
- **路径优化**: 统一使用虚拟环境,规范化依赖管理
### v1.0.0 (2024-01-01)
- 初始版本发布
- 基础检测功能
- 患者管理系统
- 数据分析和报告生成
---
**身体平衡评估系统** - 专业的平衡能力评估解决方案
- 所有数据采用本地存储,避免敏感信息外泄。
- 静态文件访问包含越界保护,限制访问至配置的存储根目录内。
- 建议对患者身份信息进行匿名化处理如ID映射

File diff suppressed because it is too large Load Diff

View File

@ -1,118 +0,0 @@
# -*- mode: python ; coding: utf-8 -*-
block_cipher = None
a = Analysis(
['main.py'],
pathex=['D:/Trae_space/pyKinectAzure'],
binaries=[
# FemtoBolt相关库文件
('dll/femtobolt/k4a.dll', 'dll/femtobolt'), # K4A动态库
('dll/femtobolt/k4arecord.dll', 'dll/femtobolt'), # K4A录制库
('dll/femtobolt/depthengine_2_0.dll', 'dll/femtobolt'), # 深度引擎
('dll/femtobolt/OrbbecSDK.dll', 'dll/femtobolt'), # Orbbec SDK
('dll/femtobolt/k4a.lib', 'dll/femtobolt'), # K4A静态库
('dll/femtobolt/k4arecord.lib', 'dll/femtobolt'), # K4A录制静态库
('dll/femtobolt/k4arecorder.exe', 'dll/femtobolt'), # K4A录制工具
('dll/femtobolt/k4aviewer.exe', 'dll/femtobolt'), # K4A查看器
# SMiTSense相关库文件
('dll/smitsense/SMiTSenseUsb-F3.0.dll', 'dll/smitsense'), # SMiTSense传感器库
('dll/smitsense/Wrapper.dll', 'dll/smitsense'), # SMiTSense传感器库包装类
],
hiddenimports=[
'flask',
'flask_socketio',
'flask_cors',
'cv2',
'numpy',
'pandas',
'scipy',
'matplotlib',
'seaborn',
'sklearn',
'PIL',
'reportlab',
'sqlite3',
'configparser',
'logging',
'threading',
'queue',
'base64',
'psutil',
'pykinect_azure',
'pykinect_azure.k4a',
'pykinect_azure.k4abt',
'pykinect_azure.k4arecord',
'pykinect_azure.pykinect',
'pykinect_azure.utils',
'pykinect_azure._k4a',
'pykinect_azure._k4abt',
'pyserial',
'requests',
'yaml',
'click',
'colorama',
'tqdm',
'database',
'device_manager',
'utils',
'eventlet',
'socketio',
'engineio',
'engineio.async_drivers.threading',
'engineio.async_drivers.eventlet',
'engineio.async_eventlet',
'socketio.async_eventlet',
'greenlet',
'gevent',
'gevent.socket',
'gevent.select',
'dns',
'dns.resolver',
'dns.reversename',
'dns.e164',
],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=[],
win_no_prefer_redirects=False,
win_private_assemblies=False,
cipher=block_cipher,
noarchive=False,
)
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)
exe = EXE(
pyz,
a.scripts,
[],
exclude_binaries=True,
name='BodyBalanceBackend',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
upx_exclude=[],
runtime_tmpdir=None,
console=True,
disable_windowed_traceback=False,
argv_emulation=False,
target_arch=None,
codesign_identity=None,
entitlements_file=None,
icon=None
)
coll = COLLECT(
exe,
a.binaries,
a.zipfiles,
a.datas,
strip=False,
upx=True,
upx_exclude=[],
name='BodyBalanceBackend'
)

View File

@ -1,186 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
检查设备连接监控线程状态的测试脚本
"""
import sys
import os
import threading
import time
from typing import Dict, Any
# 添加项目路径
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from devices.utils.config_manager import ConfigManager
from devices.camera_manager import CameraManager
from devices.imu_manager import IMUManager
from devices.pressure_manager import PressureManager
from devices.femtobolt_manager import FemtoBoltManager
class MockCameraManager(CameraManager):
"""模拟摄像头管理器,用于测试监控线程"""
def __init__(self, socketio, config_manager):
super().__init__(socketio, config_manager)
self._mock_hardware_connected = True
def check_hardware_connection(self) -> bool:
"""模拟硬件连接检查"""
return self._mock_hardware_connected
def set_mock_hardware_status(self, connected: bool):
"""设置模拟硬件连接状态"""
self._mock_hardware_connected = connected
def check_device_monitor_status(device_manager, device_name: str):
"""
检查单个设备的监控线程状态
Args:
device_manager: 设备管理器实例
device_name: 设备名称
"""
print(f"\n=== {device_name.upper()} 设备监控状态检查 ===")
# 检查基本状态
print(f"设备连接状态: {device_manager.is_connected}")
print(f"设备流状态: {device_manager.is_streaming}")
# 检查监控线程相关属性
if hasattr(device_manager, '_connection_monitor_thread'):
monitor_thread = device_manager._connection_monitor_thread
print(f"监控线程对象: {monitor_thread}")
if monitor_thread:
print(f"监控线程存活状态: {monitor_thread.is_alive()}")
print(f"监控线程名称: {monitor_thread.name}")
print(f"监控线程守护状态: {monitor_thread.daemon}")
else:
print("监控线程对象: None")
else:
print("设备管理器没有监控线程属性")
# 检查监控停止事件
if hasattr(device_manager, '_monitor_stop_event'):
stop_event = device_manager._monitor_stop_event
print(f"监控停止事件: {stop_event}")
print(f"监控停止事件状态: {stop_event.is_set()}")
else:
print("设备管理器没有监控停止事件属性")
# 检查监控配置
if hasattr(device_manager, '_connection_check_interval'):
print(f"连接检查间隔: {device_manager._connection_check_interval}")
if hasattr(device_manager, '_connection_timeout'):
print(f"连接超时时间: {device_manager._connection_timeout}")
# 检查心跳时间
if hasattr(device_manager, '_last_heartbeat'):
last_heartbeat = device_manager._last_heartbeat
current_time = time.time()
heartbeat_age = current_time - last_heartbeat
print(f"上次心跳时间: {time.ctime(last_heartbeat)}")
print(f"心跳间隔: {heartbeat_age:.2f}秒前")
def main():
"""
主函数检查所有设备的监控状态
"""
print("设备连接监控状态检查工具")
print("=" * 50)
try:
# 初始化配置管理器
config_manager = ConfigManager()
# 创建模拟设备管理器实例
mock_camera = MockCameraManager(None, config_manager)
print("\n=== 初始状态检查 ===")
check_device_monitor_status(mock_camera, 'mock_camera')
print("\n=== 系统线程信息 ===")
active_threads = threading.active_count()
print(f"当前活跃线程数: {active_threads}")
print("\n活跃线程列表:")
for thread in threading.enumerate():
print(f" - {thread.name} (守护: {thread.daemon}, 存活: {thread.is_alive()})")
# 测试连接监控启动
print("\n=== 测试连接监控启动 ===")
print("\n1. 设置模拟硬件为连接状态...")
mock_camera.set_mock_hardware_status(True)
print("\n2. 设置设备为连接状态...")
mock_camera.set_connected(True)
time.sleep(0.5) # 等待线程启动
print("\n连接后的监控状态:")
check_device_monitor_status(mock_camera, 'mock_camera')
print("\n=== 系统线程信息 (启动监控后) ===")
active_threads = threading.active_count()
print(f"当前活跃线程数: {active_threads}")
print("\n活跃线程列表:")
for thread in threading.enumerate():
print(f" - {thread.name} (守护: {thread.daemon}, 存活: {thread.is_alive()})")
print("\n3. 等待3秒观察监控线程工作...")
time.sleep(3)
print("\n监控运行中的状态:")
check_device_monitor_status(mock_camera, 'mock_camera')
print("\n4. 模拟硬件断开...")
mock_camera.set_mock_hardware_status(False)
time.sleep(6) # 等待监控检测到断开检查间隔是5秒
print("\n硬件断开后的监控状态:")
check_device_monitor_status(mock_camera, 'mock_camera')
print("\n5. 重新连接测试...")
mock_camera.set_mock_hardware_status(True)
mock_camera.set_connected(True)
time.sleep(0.5)
print("\n重新连接后的监控状态:")
check_device_monitor_status(mock_camera, 'mock_camera')
print("\n6. 手动断开连接...")
mock_camera.set_connected(False)
time.sleep(0.5)
print("\n手动断开后的监控状态:")
check_device_monitor_status(mock_camera, 'mock_camera')
print("\n=== 最终系统线程信息 ===")
active_threads = threading.active_count()
print(f"当前活跃线程数: {active_threads}")
print("\n活跃线程列表:")
for thread in threading.enumerate():
print(f" - {thread.name} (守护: {thread.daemon}, 存活: {thread.is_alive()})")
except Exception as e:
print(f"检查过程中发生错误: {e}")
import traceback
traceback.print_exc()
finally:
# 清理资源
print("\n=== 清理资源 ===")
try:
if 'mock_camera' in locals():
mock_camera.cleanup()
print("mock_camera 设备资源已清理")
except Exception as e:
print(f"清理资源时发生错误: {e}")
if __name__ == '__main__':
main()

View File

@ -33,8 +33,8 @@ algorithm_type = plt
color_resolution = 1080P
depth_mode = NFOV_2X2BINNED
camera_fps = 20
depth_range_min = 800
depth_range_max = 1200
depth_range_min = 1200
depth_range_max = 1600
fps = 15
synchronized_images_only = False

File diff suppressed because it is too large Load Diff

View File

@ -1,133 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
简化的相机性能测试
"""
import sys
import os
import time
import logging
# 添加项目路径
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from devices.utils.config_manager import ConfigManager
from devices.camera_manager import CameraManager
# 设置日志级别
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s'
)
def test_camera_init_time():
"""
测试相机初始化时间
"""
print("相机初始化性能测试")
print("=" * 50)
try:
# 创建管理器
config_manager = ConfigManager()
camera_manager = CameraManager(None, config_manager)
print("\n开始相机初始化测试...")
# 记录总时间
total_start = time.time()
# 执行初始化
success = camera_manager.initialize()
total_time = (time.time() - total_start) * 1000
print(f"\n初始化结果: {'成功' if success else '失败'}")
print(f"总耗时: {total_time:.1f}ms ({total_time/1000:.1f}秒)")
# 性能评估
if total_time < 1000:
print("性能评级: 优秀 ⭐⭐⭐ (< 1秒)")
elif total_time < 3000:
print("性能评级: 良好 ⭐⭐ (< 3秒)")
elif total_time < 5000:
print("性能评级: 一般 ⭐ (< 5秒)")
else:
print("性能评级: 需要优化 ❌ (> 5秒)")
if success:
# 测试校准
print("\n测试校准性能...")
calibrate_start = time.time()
calibrate_success = camera_manager.calibrate()
calibrate_time = (time.time() - calibrate_start) * 1000
print(f"校准结果: {'成功' if calibrate_success else '失败'}")
print(f"校准耗时: {calibrate_time:.1f}ms")
# 测试首帧获取
if camera_manager.cap:
print("\n测试首帧获取...")
frame_start = time.time()
ret, frame = camera_manager.cap.read()
frame_time = (time.time() - frame_start) * 1000
if ret and frame is not None:
print(f"首帧获取成功 - 耗时: {frame_time:.1f}ms, 帧大小: {frame.shape}")
del frame
else:
print(f"首帧获取失败 - 耗时: {frame_time:.1f}ms")
# 获取设备信息
print("\n设备信息:")
device_info = camera_manager.get_device_info()
for key, value in device_info.items():
if key in ['width', 'height', 'fps', 'fourcc']:
print(f" {key}: {value}")
# 清理
camera_manager.cleanup()
except Exception as e:
print(f"\n❌ 测试失败: {e}")
import traceback
traceback.print_exc()
def analyze_performance_bottlenecks():
"""
分析性能瓶颈
"""
print("\n" + "=" * 50)
print("性能瓶颈分析")
print("=" * 50)
print("\n根据测试结果,主要性能瓶颈可能包括:")
print("1. 相机打开 (CAP_PROP设置) - 通常耗时3-4秒")
print("2. 分辨率设置 - 可能耗时5-6秒")
print("3. FPS设置 - 可能耗时2-3秒")
print("4. 首帧读取 - 通常耗时300-400ms")
print("\n优化建议:")
print("• 使用更快的相机后端 (如DirectShow)")
print("• 减少不必要的属性设置")
print("• 使用较低的分辨率进行初始化")
print("• 启用OpenCV优化")
print("• 设置合适的缓冲区大小")
def main():
print("相机启动性能优化测试")
print("目标: 将启动时间从10+秒优化到3秒以内")
# 执行测试
test_camera_init_time()
# 分析结果
analyze_performance_bottlenecks()
print("\n" + "=" * 50)
print("测试完成")
print("=" * 50)
if __name__ == '__main__':
main()

View File

@ -1,75 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试避免重复初始化功能
"""
import requests
import time
import json
def test_avoid_duplicate_initialization():
"""
测试避免重复初始化功能
"""
base_url = "http://localhost:5000"
print("=== 测试避免重复初始化功能 ===")
# 1. 获取初始设备状态
print("\n1. 获取初始设备状态")
try:
response = requests.get(f"{base_url}/api/devices/status")
if response.status_code == 200:
data = response.json()
print(f"设备状态: {json.dumps(data, indent=2, ensure_ascii=False)}")
else:
print(f"获取设备状态失败: {response.status_code}")
except Exception as e:
print(f"请求失败: {e}")
# 2. 第一次启动设备数据推送
print("\n2. 第一次启动设备数据推送")
try:
response = requests.post(f"{base_url}/api/devices/start_push")
if response.status_code == 200:
data = response.json()
print(f"第一次启动结果: {json.dumps(data, indent=2, ensure_ascii=False)}")
else:
print(f"第一次启动失败: {response.status_code}")
except Exception as e:
print(f"请求失败: {e}")
# 等待一段时间
print("\n等待5秒...")
time.sleep(5)
# 3. 第二次启动设备数据推送(应该避免重复初始化)
print("\n3. 第二次启动设备数据推送(测试避免重复初始化)")
try:
response = requests.post(f"{base_url}/api/devices/start_push")
if response.status_code == 200:
data = response.json()
print(f"第二次启动结果: {json.dumps(data, indent=2, ensure_ascii=False)}")
else:
print(f"第二次启动失败: {response.status_code}")
except Exception as e:
print(f"请求失败: {e}")
# 4. 再次获取设备状态
print("\n4. 获取最终设备状态")
try:
response = requests.get(f"{base_url}/api/devices/status")
if response.status_code == 200:
data = response.json()
print(f"最终设备状态: {json.dumps(data, indent=2, ensure_ascii=False)}")
else:
print(f"获取设备状态失败: {response.status_code}")
except Exception as e:
print(f"请求失败: {e}")
print("\n=== 测试完成 ===")
print("请查看终端日志,确认第二次启动时是否显示'已连接,跳过初始化'的消息")
if __name__ == "__main__":
test_avoid_duplicate_initialization()

View File

@ -1,235 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
OpenCV后端优化验证脚本
测试DirectShow后端相对于MSMF的性能提升
"""
import sys
import os
import time
import logging
# 添加项目路径
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from devices.camera_manager import CameraManager
from devices.utils.config_manager import ConfigManager
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def test_backend_performance(backend_name, test_name):
"""
测试指定后端的性能
Args:
backend_name: 后端名称 (directshow, msmf)
test_name: 测试名称
Returns:
dict: 性能数据
"""
print(f"\n{'='*60}")
print(f"📷 测试 {test_name} 后端性能")
print(f"{'='*60}")
# 创建配置管理器并设置后端
config_manager = ConfigManager()
# 获取原始配置
original_config = config_manager.get_device_config('camera')
# 设置测试后端
test_config = {
'backend': backend_name
}
config_manager.set_camera_config(test_config)
try:
# 创建相机管理器
camera = CameraManager(None, config_manager)
# 测试初始化性能
start_time = time.time()
success = camera.initialize()
total_time = (time.time() - start_time) * 1000
if success:
print(f"✅ 相机初始化成功: {total_time:.1f}ms")
# 获取实际后端信息
if camera.cap:
backend_info = camera.cap.getBackendName() if hasattr(camera.cap, 'getBackendName') else 'Unknown'
actual_width = int(camera.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
actual_height = int(camera.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
actual_fps = camera.cap.get(cv2.CAP_PROP_FPS)
print(f"🎯 实际后端: {backend_info}")
print(f"📐 实际分辨率: {actual_width}x{actual_height}@{actual_fps:.1f}fps")
# 测试首帧获取
frame_start = time.time()
ret, frame = camera.cap.read() if camera.cap else (False, None)
frame_time = (time.time() - frame_start) * 1000
if ret and frame is not None:
print(f"🖼️ 首帧获取: {frame_time:.1f}ms, 帧大小: {frame.shape}")
else:
print(f"❌ 首帧获取失败")
frame_time = -1
# 测试连续帧性能
print(f"🎬 测试连续帧获取性能...")
frame_times = []
for i in range(10):
frame_start = time.time()
ret, frame = camera.cap.read() if camera.cap else (False, None)
if ret:
frame_times.append((time.time() - frame_start) * 1000)
time.sleep(0.01) # 小延迟
if frame_times:
avg_frame_time = sum(frame_times) / len(frame_times)
min_frame_time = min(frame_times)
max_frame_time = max(frame_times)
print(f"📈 连续帧性能: 平均 {avg_frame_time:.1f}ms, 最快 {min_frame_time:.1f}ms, 最慢 {max_frame_time:.1f}ms")
else:
avg_frame_time = -1
print(f"❌ 连续帧测试失败")
# 清理资源
camera.cleanup()
print(f"🧹 相机资源已释放")
return {
'backend': backend_name,
'success': True,
'init_time': total_time,
'first_frame_time': frame_time,
'avg_frame_time': avg_frame_time,
'backend_info': backend_info if camera.cap else 'Unknown',
'resolution': f"{actual_width}x{actual_height}@{actual_fps:.1f}fps" if camera.cap else "未知"
}
else:
print(f"❌ 初始化失败")
return {
'backend': backend_name,
'success': False,
'init_time': total_time,
'first_frame_time': -1,
'avg_frame_time': -1,
'backend_info': 'Failed',
'resolution': "失败"
}
except Exception as e:
print(f"❌ 测试异常: {e}")
return {
'backend': backend_name,
'success': False,
'init_time': -1,
'first_frame_time': -1,
'avg_frame_time': -1,
'backend_info': 'Exception',
'resolution': "异常",
'error': str(e)
}
finally:
# 恢复原始配置
try:
restore_config = {
'backend': original_config['backend']
}
config_manager.set_camera_config(restore_config)
except Exception as e:
print(f"⚠️ 恢复配置失败: {e}")
def main():
"""
主函数测试不同后端的性能
"""
print("\n" + "="*80)
print("🚀 OpenCV后端性能优化验证")
print("="*80)
# 测试用例
test_cases = [
('directshow', 'DirectShow (推荐)'),
('msmf', 'MSMF (默认)')
]
results = []
# 执行测试
for backend, name in test_cases:
result = test_backend_performance(backend, name)
results.append(result)
time.sleep(1) # 测试间隔
# 汇总结果
print(f"\n\n{'='*80}")
print(f"📊 后端性能优化验证汇总")
print(f"{'='*80}")
# 表格头
print(f"{'后端':<12} {'状态':<8} {'初始化':<12} {'首帧':<12} {'连续帧':<12} {'实际后端':<15}")
print("-" * 80)
successful_results = [r for r in results if r['success']]
for result in results:
status = "✅成功" if result['success'] else "❌失败"
init_time = f"{result['init_time']:.1f}ms" if result['init_time'] > 0 else "失败"
first_frame = f"{result['first_frame_time']:.1f}ms" if result['first_frame_time'] > 0 else "失败"
avg_frame = f"{result['avg_frame_time']:.1f}ms" if result['avg_frame_time'] > 0 else "失败"
backend_info = result['backend_info'][:14] if len(result['backend_info']) > 14 else result['backend_info']
print(f"{result['backend']:<12} {status:<8} {init_time:<12} {first_frame:<12} {avg_frame:<12} {backend_info:<15}")
# 性能分析
if len(successful_results) >= 2:
print(f"\n📈 性能分析:")
# 找到最快的后端
fastest = min(successful_results, key=lambda x: x['init_time'])
slowest = max(successful_results, key=lambda x: x['init_time'])
print(f"🏆 最快后端: {fastest['backend']} - {fastest['init_time']:.1f}ms")
print(f"🐌 最慢后端: {slowest['backend']} - {slowest['init_time']:.1f}ms")
if fastest['init_time'] > 0 and slowest['init_time'] > 0:
improvement = ((slowest['init_time'] - fastest['init_time']) / slowest['init_time']) * 100
print(f"💡 性能提升: {improvement:.1f}% (使用最快后端)")
print(f"\n📋 详细性能对比:")
for result in successful_results:
if result != fastest:
slowdown = ((result['init_time'] - fastest['init_time']) / fastest['init_time']) * 100
print(f" {result['backend']}: 比最快后端慢 {slowdown:.1f}% ({result['init_time']:.1f}ms vs {fastest['init_time']:.1f}ms)")
print(f"\n🎯 建议:")
if successful_results:
fastest = min(successful_results, key=lambda x: x['init_time'])
print(f"✅ 推荐使用 {fastest['backend']} 后端以获得最佳性能")
print(f"📝 配置建议: 在配置文件中设置 backend = {fastest['backend']}")
if fastest['init_time'] > 5000:
print(f"⚠️ 性能评级: 需要优化 (> 5秒)")
elif fastest['init_time'] > 2000:
print(f"⚠️ 性能评级: 一般 (2-5秒)")
else:
print(f"✅ 性能评级: 良好 (< 2秒)")
else:
print(f"❌ 所有后端测试失败,请检查相机连接")
print(f"\n{'='*80}")
print(f"测试完成")
print(f"{'='*80}")
if __name__ == "__main__":
import cv2 # 需要导入cv2用于相机操作
main()

View File

@ -1,143 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
深入分析相机设备的行为
"""
import cv2
import numpy as np
import time
def analyze_camera_frame(frame, device_index):
"""
分析相机帧的特征
"""
print(f"\n=== 设备 {device_index} 帧分析 ===")
print(f"帧形状: {frame.shape}")
print(f"数据类型: {frame.dtype}")
print(f"数据范围: {frame.min()} - {frame.max()}")
# 检查是否是纯色帧(可能是虚拟设备)
unique_colors = len(np.unique(frame.reshape(-1, frame.shape[-1]), axis=0))
print(f"唯一颜色数量: {unique_colors}")
# 检查帧的统计信息
mean_values = np.mean(frame, axis=(0, 1))
std_values = np.std(frame, axis=(0, 1))
print(f"各通道均值: {mean_values}")
print(f"各通道标准差: {std_values}")
# 检查是否是静态帧
if np.all(std_values < 1.0):
print("⚠️ 这可能是一个静态/虚拟帧(标准差很小)")
# 检查是否是纯黑帧
if np.all(mean_values < 10):
print("⚠️ 这可能是一个黑色帧")
# 检查帧的变化
return frame
def test_camera_devices():
"""
测试多个相机设备并比较帧内容
"""
print("=== 相机设备详细分析 ===")
devices_to_test = [0, 1]
frames = {}
for device_index in devices_to_test:
print(f"\n--- 测试设备 {device_index} ---")
try:
cap = cv2.VideoCapture(device_index, cv2.CAP_DSHOW)
if cap.isOpened():
print(f"设备 {device_index}: 成功打开")
# 获取设备属性
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
fourcc = int(cap.get(cv2.CAP_PROP_FOURCC))
print(f"分辨率: {width}x{height}")
print(f"帧率: {fps}")
print(f"编码: {fourcc}")
# 读取多帧进行分析
frames_list = []
for i in range(5):
ret, frame = cap.read()
if ret and frame is not None:
frames_list.append(frame.copy())
if i == 0: # 只分析第一帧
frames[device_index] = analyze_camera_frame(frame, device_index)
else:
print(f"{i+1}帧读取失败")
break
# 检查帧间变化
if len(frames_list) > 1:
diff = cv2.absdiff(frames_list[0], frames_list[-1])
total_diff = np.sum(diff)
print(f"首末帧差异总和: {total_diff}")
if total_diff < 1000: # 阈值可调整
print("⚠️ 帧内容几乎没有变化,可能是虚拟设备")
else:
print("✓ 帧内容有变化,可能是真实相机")
else:
print(f"设备 {device_index}: 无法打开")
cap.release()
except Exception as e:
print(f"设备 {device_index} 测试异常: {e}")
# 比较不同设备的帧
if 0 in frames and 1 in frames:
print("\n=== 设备间帧比较 ===")
diff = cv2.absdiff(frames[0], frames[1])
total_diff = np.sum(diff)
print(f"设备0和设备1帧差异总和: {total_diff}")
if total_diff < 1000:
print("⚠️ 两个设备的帧几乎相同设备1可能是设备0的镜像或虚拟设备")
else:
print("✓ 两个设备的帧不同,可能是独立的相机")
def check_system_cameras():
"""
检查系统中可用的相机设备
"""
print("\n=== 系统相机设备检查 ===")
available_cameras = []
# 测试前10个设备索引
for i in range(10):
cap = cv2.VideoCapture(i, cv2.CAP_DSHOW)
if cap.isOpened():
ret, _ = cap.read()
if ret:
available_cameras.append(i)
print(f"设备 {i}: 可用")
else:
print(f"设备 {i}: 打开但无法读取")
else:
print(f"设备 {i}: 不可用")
cap.release()
# 避免测试太多设备
if len(available_cameras) >= 3:
break
print(f"\n发现 {len(available_cameras)} 个可用相机设备: {available_cameras}")
return available_cameras
if __name__ == "__main__":
check_system_cameras()
test_camera_devices()

View File

@ -1,194 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
相机断开连接测试脚本
测试相机USB拔出时是否能正常检测设备断连并发送socket信息
"""
import time
import threading
import logging
from devices.camera_manager import CameraManager
from devices.utils.config_manager import ConfigManager
from unittest.mock import Mock
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class MockSocketIO:
"""模拟SocketIO用于测试"""
def __init__(self):
self.events = []
self.lock = threading.Lock()
def emit(self, event, data, namespace=None):
"""记录发送的事件"""
with self.lock:
self.events.append({
'event': event,
'data': data,
'namespace': namespace,
'timestamp': time.time()
})
logger.info(f"Socket事件: {event} -> {data} (namespace: {namespace})")
def get_events(self):
"""获取所有事件"""
with self.lock:
return self.events.copy()
def clear_events(self):
"""清空事件记录"""
with self.lock:
self.events.clear()
def test_camera_disconnect_detection():
"""
测试相机断开连接检测功能
"""
logger.info("="*60)
logger.info("开始测试相机断开连接检测功能")
logger.info("="*60)
# 创建模拟SocketIO
mock_socketio = MockSocketIO()
# 创建配置管理器
config_manager = ConfigManager()
# 创建相机管理器
camera_manager = CameraManager(mock_socketio, config_manager)
try:
# 1. 初始化相机
logger.info("\n步骤1: 初始化相机设备")
if not camera_manager.initialize():
logger.error("相机初始化失败,无法进行测试")
return False
logger.info(f"相机初始化成功 - 连接状态: {camera_manager.is_connected}")
# 2. 启动数据流
logger.info("\n步骤2: 启动相机数据流")
if not camera_manager.start_streaming():
logger.error("相机数据流启动失败")
return False
logger.info("相机数据流启动成功")
# 3. 等待一段时间让系统稳定
logger.info("\n步骤3: 等待系统稳定 (5秒)")
time.sleep(5)
# 清空之前的事件记录
mock_socketio.clear_events()
# 4. 提示用户拔出USB
logger.info("\n步骤4: 请拔出相机USB连接线")
logger.info("等待30秒来检测断开连接...")
start_time = time.time()
disconnect_detected = False
# 监控30秒
while time.time() - start_time < 30:
# 检查连接状态
if camera_manager.is_connected:
logger.debug(f"相机仍然连接中... (已等待 {time.time() - start_time:.1f}秒)")
else:
logger.info(f"检测到相机断开连接! (耗时 {time.time() - start_time:.1f}秒)")
disconnect_detected = True
break
time.sleep(1)
# 5. 分析结果
logger.info("\n步骤5: 分析测试结果")
if disconnect_detected:
logger.info("✓ 相机断开连接检测: 成功")
else:
logger.warning("✗ 相机断开连接检测: 失败 (30秒内未检测到断开)")
# 检查Socket事件
events = mock_socketio.get_events()
disconnect_events = [e for e in events if 'status' in str(e.get('data', {})) and 'disconnect' in str(e.get('data', {})).lower()]
if disconnect_events:
logger.info(f"✓ Socket断开通知: 成功 (发送了 {len(disconnect_events)} 个断开事件)")
for event in disconnect_events:
logger.info(f" - 事件: {event['event']}, 数据: {event['data']}")
else:
logger.warning("✗ Socket断开通知: 失败 (未发送断开事件)")
# 6. 测试重连机制
if disconnect_detected:
logger.info("\n步骤6: 测试重连机制")
logger.info("请重新插入相机USB连接线")
logger.info("等待30秒来检测重新连接...")
start_time = time.time()
reconnect_detected = False
while time.time() - start_time < 30:
if camera_manager.is_connected:
logger.info(f"检测到相机重新连接! (耗时 {time.time() - start_time:.1f}秒)")
reconnect_detected = True
break
time.sleep(1)
if reconnect_detected:
logger.info("✓ 相机重连检测: 成功")
else:
logger.warning("✗ 相机重连检测: 失败 (30秒内未检测到重连)")
# 7. 显示所有Socket事件
logger.info("\n步骤7: 所有Socket事件记录")
all_events = mock_socketio.get_events()
if all_events:
for i, event in enumerate(all_events, 1):
logger.info(f" {i}. 事件: {event['event']}, 数据: {event['data']}, 时间: {time.strftime('%H:%M:%S', time.localtime(event['timestamp']))}")
else:
logger.info(" 无Socket事件记录")
return disconnect_detected
except Exception as e:
logger.error(f"测试过程中发生异常: {e}")
return False
finally:
# 清理资源
try:
camera_manager.stop_streaming()
camera_manager.disconnect()
logger.info("测试资源清理完成")
except Exception as e:
logger.error(f"清理资源时发生异常: {e}")
def main():
"""
主函数
"""
logger.info("相机断开连接测试脚本")
logger.info("此脚本将测试相机USB拔出时的断连检测和Socket通知功能")
logger.info("")
# 运行测试
success = test_camera_disconnect_detection()
logger.info("\n" + "="*60)
if success:
logger.info("测试完成: 相机断开连接检测功能正常")
else:
logger.info("测试完成: 相机断开连接检测功能存在问题")
logger.info("="*60)
if __name__ == "__main__":
main()

View File

@ -1,212 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
完整的相机断开连接测试脚本
模拟主程序的完整流程
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
import time
import threading
import logging
from datetime import datetime
from devices.camera_manager import CameraManager
from devices.utils.config_manager import ConfigManager
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class MockSocketIO:
"""模拟SocketIO用于测试"""
def __init__(self):
self.events = []
self.lock = threading.Lock()
def emit(self, event, data, namespace=None):
"""记录发送的事件"""
with self.lock:
self.events.append({
'event': event,
'data': data,
'namespace': namespace,
'timestamp': time.time()
})
logger.info(f"Socket事件: {event} -> {data} (namespace: {namespace})")
def get_events(self):
"""获取所有事件"""
with self.lock:
return self.events.copy()
def clear_events(self):
"""清空事件记录"""
with self.lock:
self.events.clear()
class MockAppServer:
"""模拟主程序服务器"""
def __init__(self):
self.socketio = MockSocketIO()
self.logger = logger
self.device_managers = {}
def broadcast_device_status(self, device_name: str, is_connected: bool):
"""广播单个设备状态"""
if self.socketio:
try:
status_data = {
'device_type': device_name,
'status': is_connected,
'timestamp': datetime.now().isoformat()
}
self.socketio.emit('device_status', status_data, namespace='/devices')
self.logger.info(f'广播设备状态: {device_name} -> {"已连接" if is_connected else "未连接"}')
except Exception as e:
self.logger.error(f'广播设备状态失败: {e}')
def _on_device_status_change(self, device_name: str, is_connected: bool):
"""设备状态变化回调函数"""
self.logger.info(f'设备状态变化: {device_name} -> {"已连接" if is_connected else "未连接"}')
self.broadcast_device_status(device_name, is_connected)
def test_camera_disconnect_with_socket():
"""测试相机断开连接和Socket通知"""
logger.info("="*60)
logger.info("开始测试相机断开连接和Socket通知功能")
logger.info("="*60)
# 创建模拟服务器
app_server = MockAppServer()
try:
# 创建配置管理器
config_manager = ConfigManager()
# 创建相机管理器
camera_manager = CameraManager(app_server.socketio, config_manager)
app_server.device_managers['camera'] = camera_manager
# 添加状态变化回调(模拟主程序的回调注册)
camera_manager.add_status_change_callback(app_server._on_device_status_change)
# 1. 测试初始化
logger.info("\n步骤1: 初始化相机设备")
if camera_manager.initialize():
logger.info(f"✓ 相机初始化成功 - 连接状态: {camera_manager.is_connected}")
else:
logger.warning("✗ 相机初始化失败")
return False
# 清空初始化时的事件
app_server.socketio.clear_events()
# 2. 启动数据流(可选)
logger.info("\n步骤2: 启动相机数据流")
try:
if camera_manager.start_streaming(app_server.socketio):
logger.info("✓ 相机数据流启动成功")
time.sleep(2) # 让数据流稳定
else:
logger.warning("✗ 相机数据流启动失败")
except Exception as e:
logger.warning(f"数据流启动异常: {e}")
# 3. 监控连接状态变化
logger.info("\n步骤3: 监控连接状态变化 (30秒)")
logger.info("请在此期间拔出相机USB连接线来测试断开检测...")
start_time = time.time()
last_status = camera_manager.is_connected
disconnect_detected = False
while time.time() - start_time < 30:
current_status = camera_manager.is_connected
if current_status != last_status:
elapsed_time = time.time() - start_time
logger.info(f"检测到状态变化: {'连接' if current_status else '断开'} (耗时: {elapsed_time:.1f}秒)")
last_status = current_status
if not current_status:
logger.info("✓ 成功检测到相机断开!")
disconnect_detected = True
time.sleep(2) # 等待事件处理完成
break
time.sleep(0.5)
# 4. 检查Socket事件
logger.info("\n步骤4: 检查Socket事件")
events = app_server.socketio.get_events()
if events:
logger.info(f"✓ 共记录到 {len(events)} 个Socket事件:")
disconnect_events = 0
for i, event in enumerate(events, 1):
logger.info(f" {i}. 事件: {event['event']}, 数据: {event['data']}, 命名空间: {event['namespace']}")
if event['event'] == 'device_status' and event['data'].get('status') == False:
disconnect_events += 1
if disconnect_events > 0:
logger.info(f"✓ 检测到 {disconnect_events} 个设备断开事件")
else:
logger.warning("✗ 未检测到设备断开事件")
else:
logger.warning("✗ 未记录到任何Socket事件")
# 5. 测试结果总结
logger.info("\n步骤5: 测试结果总结")
if disconnect_detected:
logger.info("✓ 硬件断开检测: 成功")
else:
logger.warning("✗ 硬件断开检测: 失败 (30秒内未检测到断开)")
if events and any(e['event'] == 'device_status' and e['data'].get('status') == False for e in events):
logger.info("✓ Socket断开通知: 成功")
else:
logger.warning("✗ Socket断开通知: 失败")
return disconnect_detected and len(events) > 0
except Exception as e:
logger.error(f"测试过程中发生异常: {e}")
import traceback
traceback.print_exc()
return False
finally:
# 清理资源
try:
if 'camera_manager' in locals():
camera_manager.stop_streaming()
camera_manager.disconnect()
logger.info("测试资源清理完成")
except Exception as e:
logger.error(f"清理资源时发生异常: {e}")
def main():
"""主函数"""
logger.info("相机断开连接完整测试脚本")
logger.info("此脚本将模拟主程序的完整流程测试相机USB拔出时的断连检测和Socket通知功能")
success = test_camera_disconnect_with_socket()
logger.info("\n" + "="*60)
if success:
logger.info("测试完成: 相机断开连接检测和Socket通知功能正常")
else:
logger.info("测试完成: 相机断开连接检测和Socket通知功能存在问题")
logger.info("="*60)
if __name__ == "__main__":
main()

View File

@ -1,217 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
相机启动性能测试脚本
"""
import sys
import os
import time
import logging
from typing import Dict, Any
# 添加项目路径
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from devices.utils.config_manager import ConfigManager
from devices.camera_manager import CameraManager
# 设置日志级别为DEBUG以查看详细信息
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
def test_camera_startup_performance():
"""
测试相机启动性能
"""
print("=" * 60)
print("相机启动性能测试")
print("=" * 60)
try:
# 初始化配置管理器
print("\n1. 初始化配置管理器...")
config_start = time.time()
config_manager = ConfigManager()
config_time = (time.time() - config_start) * 1000
print(f"配置管理器初始化完成 (耗时: {config_time:.1f}ms)")
# 创建相机管理器
print("\n2. 创建相机管理器...")
manager_start = time.time()
camera_manager = CameraManager(None, config_manager)
manager_time = (time.time() - manager_start) * 1000
print(f"相机管理器创建完成 (耗时: {manager_time:.1f}ms)")
# 测试多次初始化以获得平均性能
print("\n3. 执行相机初始化性能测试...")
test_rounds = 3
init_times = []
for i in range(test_rounds):
print(f"\n--- 第 {i+1} 轮测试 ---")
# 如果之前已连接,先断开
if camera_manager.is_connected:
disconnect_start = time.time()
camera_manager.disconnect()
disconnect_time = (time.time() - disconnect_start) * 1000
print(f"断开连接耗时: {disconnect_time:.1f}ms")
time.sleep(0.5) # 等待设备完全断开
# 执行初始化
init_start = time.time()
success = camera_manager.initialize()
init_time = (time.time() - init_start) * 1000
if success:
print(f"✓ 初始化成功 (总耗时: {init_time:.1f}ms)")
init_times.append(init_time)
# 测试校准性能
calibrate_start = time.time()
calibrate_success = camera_manager.calibrate()
calibrate_time = (time.time() - calibrate_start) * 1000
if calibrate_success:
print(f"✓ 校准成功 (耗时: {calibrate_time:.1f}ms)")
else:
print(f"✗ 校准失败 (耗时: {calibrate_time:.1f}ms)")
# 测试第一帧获取时间
if camera_manager.cap:
first_frame_start = time.time()
ret, frame = camera_manager.cap.read()
first_frame_time = (time.time() - first_frame_start) * 1000
if ret and frame is not None:
print(f"✓ 首帧获取成功 (耗时: {first_frame_time:.1f}ms, 帧大小: {frame.shape})")
del frame # 释放内存
else:
print(f"✗ 首帧获取失败 (耗时: {first_frame_time:.1f}ms)")
else:
print(f"✗ 初始化失败 (耗时: {init_time:.1f}ms)")
time.sleep(1) # 测试间隔
# 性能统计
print("\n" + "=" * 60)
print("性能统计结果")
print("=" * 60)
if init_times:
avg_init_time = sum(init_times) / len(init_times)
min_init_time = min(init_times)
max_init_time = max(init_times)
print(f"初始化性能统计 ({len(init_times)} 次成功测试):")
print(f" 平均耗时: {avg_init_time:.1f}ms")
print(f" 最快耗时: {min_init_time:.1f}ms")
print(f" 最慢耗时: {max_init_time:.1f}ms")
# 性能评估
if avg_init_time < 1000: # 1秒以内
print(f" 性能评级: 优秀 ⭐⭐⭐")
elif avg_init_time < 3000: # 3秒以内
print(f" 性能评级: 良好 ⭐⭐")
elif avg_init_time < 5000: # 5秒以内
print(f" 性能评级: 一般 ⭐")
else:
print(f" 性能评级: 需要优化 ❌")
else:
print("❌ 所有初始化测试都失败了")
# 获取设备信息
if camera_manager.is_connected:
print("\n设备信息:")
device_info = camera_manager.get_device_info()
for key, value in device_info.items():
print(f" {key}: {value}")
# 清理资源
print("\n4. 清理资源...")
cleanup_start = time.time()
camera_manager.cleanup()
cleanup_time = (time.time() - cleanup_start) * 1000
print(f"资源清理完成 (耗时: {cleanup_time:.1f}ms)")
except Exception as e:
print(f"\n❌ 测试过程中发生错误: {e}")
import traceback
traceback.print_exc()
def test_streaming_startup():
"""
测试流媒体启动性能
"""
print("\n" + "=" * 60)
print("流媒体启动性能测试")
print("=" * 60)
try:
config_manager = ConfigManager()
camera_manager = CameraManager(None, config_manager)
# 初始化相机
print("\n1. 初始化相机...")
if not camera_manager.initialize():
print("❌ 相机初始化失败,无法进行流媒体测试")
return
# 测试流媒体启动
print("\n2. 启动流媒体...")
streaming_start = time.time()
streaming_success = camera_manager.start_streaming()
streaming_time = (time.time() - streaming_start) * 1000
if streaming_success:
print(f"✓ 流媒体启动成功 (耗时: {streaming_time:.1f}ms)")
# 等待几秒钟收集帧数据
print("\n3. 收集性能数据...")
time.sleep(3)
# 获取统计信息
stats = camera_manager.get_stats()
print(f"\n流媒体性能统计:")
for key, value in stats.items():
print(f" {key}: {value}")
# 停止流媒体
print("\n4. 停止流媒体...")
stop_start = time.time()
camera_manager.stop_streaming()
stop_time = (time.time() - stop_start) * 1000
print(f"✓ 流媒体停止完成 (耗时: {stop_time:.1f}ms)")
else:
print(f"❌ 流媒体启动失败 (耗时: {streaming_time:.1f}ms)")
# 清理
camera_manager.cleanup()
except Exception as e:
print(f"\n❌ 流媒体测试过程中发生错误: {e}")
import traceback
traceback.print_exc()
def main():
"""
主函数
"""
print("相机性能测试工具")
print("测试目标优化相机启动时间目标从10+秒降低到3秒以内")
# 执行基本启动性能测试
test_camera_startup_performance()
# 执行流媒体启动性能测试
test_streaming_startup()
print("\n" + "=" * 60)
print("测试完成!")
print("=" * 60)
if __name__ == '__main__':
main()

View File

@ -1,148 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
简化的相机断开连接测试脚本
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
import time
import threading
import logging
from devices.camera_manager import CameraManager
from devices.utils.config_manager import ConfigManager
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class MockSocketIO:
"""模拟SocketIO用于测试"""
def __init__(self):
self.events = []
self.lock = threading.Lock()
def emit(self, event, data, namespace=None):
"""记录发送的事件"""
with self.lock:
self.events.append({
'event': event,
'data': data,
'namespace': namespace,
'timestamp': time.time()
})
logger.info(f"Socket事件: {event} -> {data} (namespace: {namespace})")
def get_events(self):
"""获取所有事件"""
with self.lock:
return self.events.copy()
def test_camera_connection():
"""测试相机连接和断开检测"""
logger.info("="*60)
logger.info("开始测试相机连接和断开检测功能")
logger.info("="*60)
# 创建模拟SocketIO
mock_socketio = MockSocketIO()
try:
# 创建配置管理器
config_manager = ConfigManager()
# 创建相机管理器
camera_manager = CameraManager(mock_socketio, config_manager)
# 添加状态变化回调
def status_callback(device_name, is_connected):
logger.info(f"状态回调: {device_name} -> {'连接' if is_connected else '断开'}")
camera_manager.add_status_change_callback(status_callback)
# 1. 测试初始化
logger.info("\n步骤1: 初始化相机设备")
if camera_manager.initialize():
logger.info(f"✓ 相机初始化成功 - 连接状态: {camera_manager.is_connected}")
else:
logger.warning("✗ 相机初始化失败")
return False
# 2. 测试硬件连接检查
logger.info("\n步骤2: 测试硬件连接检查")
hardware_connected = camera_manager.check_hardware_connection()
logger.info(f"硬件连接状态: {hardware_connected}")
# 3. 启动连接监控
logger.info("\n步骤3: 启动连接监控")
camera_manager._start_connection_monitor()
logger.info("连接监控已启动")
# 4. 监控连接状态变化
logger.info("\n步骤4: 监控连接状态 (30秒)")
logger.info("请在此期间拔出相机USB连接线来测试断开检测...")
start_time = time.time()
last_status = camera_manager.is_connected
while time.time() - start_time < 30:
current_status = camera_manager.is_connected
if current_status != last_status:
logger.info(f"检测到状态变化: {'连接' if current_status else '断开'} (耗时: {time.time() - start_time:.1f}秒)")
last_status = current_status
if not current_status:
logger.info("✓ 成功检测到相机断开!")
break
time.sleep(1)
# 5. 检查Socket事件
logger.info("\n步骤5: 检查Socket事件")
events = mock_socketio.get_events()
if events:
logger.info(f"共记录到 {len(events)} 个Socket事件:")
for i, event in enumerate(events, 1):
logger.info(f" {i}. {event['event']} -> {event['data']}")
else:
logger.info("未记录到Socket事件")
return True
except Exception as e:
logger.error(f"测试过程中发生异常: {e}")
return False
finally:
# 清理资源
try:
if 'camera_manager' in locals():
camera_manager._stop_connection_monitor()
camera_manager.disconnect()
logger.info("测试资源清理完成")
except Exception as e:
logger.error(f"清理资源时发生异常: {e}")
def main():
"""主函数"""
logger.info("相机断开连接测试脚本")
success = test_camera_connection()
logger.info("\n" + "="*60)
if success:
logger.info("测试完成: 相机断开连接检测功能测试完成")
else:
logger.info("测试完成: 相机断开连接检测功能存在问题")
logger.info("="*60)
if __name__ == "__main__":
main()

View File

@ -1,305 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
OpenCV后端性能测试脚本
测试不同OpenCV后端DirectShow vs MSMF的性能差异
"""
import sys
import os
import time
import logging
import cv2
# 添加项目路径
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def test_opencv_backend(backend_name, backend_id, device_index=0, width=1280, height=720, fps=30):
"""
测试指定OpenCV后端的性能
Args:
backend_name: 后端名称
backend_id: 后端ID
device_index: 设备索引
width: 宽度
height: 高度
fps: 帧率
Returns:
dict: 性能数据
"""
print(f"\n{'='*70}")
print(f"测试 {backend_name} 后端 (ID: {backend_id})")
print(f"分辨率: {width}x{height}, FPS: {fps}")
print(f"{'='*70}")
result = {
'backend_name': backend_name,
'backend_id': backend_id,
'success': False,
'init_time': -1,
'config_time': -1,
'first_frame_time': -1,
'total_time': -1,
'actual_resolution': 'N/A',
'error': None
}
cap = None
try:
# 1. 测试相机初始化时间
print(f"📷 初始化相机 (后端: {backend_name})...")
init_start = time.time()
# 创建VideoCapture对象并指定后端
cap = cv2.VideoCapture(device_index, backend_id)
if not cap.isOpened():
print(f"❌ 无法打开相机 (后端: {backend_name})")
result['error'] = f"无法打开相机 (后端: {backend_name})"
return result
init_time = (time.time() - init_start) * 1000
result['init_time'] = init_time
print(f"✅ 相机初始化成功: {init_time:.1f}ms")
# 2. 测试配置时间
print(f"⚙️ 配置相机参数...")
config_start = time.time()
# 设置分辨率和帧率
cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
cap.set(cv2.CAP_PROP_FPS, fps)
# 设置缓冲区大小
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
# 性能优化设置
try:
cap.set(cv2.CAP_PROP_AUTO_EXPOSURE, 0.25) # 手动曝光
cap.set(cv2.CAP_PROP_AUTO_WB, 0) # 禁用自动白平衡
cap.set(cv2.CAP_PROP_EXPOSURE, -6) # 设置曝光值
except Exception as e:
print(f"⚠️ 性能优化设置警告: {e}")
config_time = (time.time() - config_start) * 1000
result['config_time'] = config_time
print(f"✅ 配置完成: {config_time:.1f}ms")
# 3. 获取实际分辨率
actual_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
actual_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
actual_fps = cap.get(cv2.CAP_PROP_FPS)
result['actual_resolution'] = f"{actual_width}x{actual_height}@{actual_fps:.1f}fps"
print(f"🎯 实际参数: {actual_width}x{actual_height}, FPS: {actual_fps:.1f}")
# 4. 测试首帧获取时间
print(f"🖼️ 获取首帧...")
frame_start = time.time()
ret, frame = cap.read()
if ret and frame is not None:
first_frame_time = (time.time() - frame_start) * 1000
result['first_frame_time'] = first_frame_time
print(f"✅ 首帧获取成功: {first_frame_time:.1f}ms, 帧大小: {frame.shape}")
else:
print(f"❌ 首帧获取失败")
result['error'] = "首帧获取失败"
return result
# 5. 计算总时间
total_time = init_time + config_time + first_frame_time
result['total_time'] = total_time
result['success'] = True
print(f"📊 总耗时: {total_time:.1f}ms ({total_time/1000:.2f}秒)")
# 6. 测试连续帧获取性能
print(f"🎬 测试连续帧获取性能...")
frame_times = []
test_frames = 10
for i in range(test_frames):
frame_start = time.time()
ret, frame = cap.read()
if ret:
frame_time = (time.time() - frame_start) * 1000
frame_times.append(frame_time)
else:
break
if frame_times:
avg_frame_time = sum(frame_times) / len(frame_times)
max_frame_time = max(frame_times)
min_frame_time = min(frame_times)
print(f"📈 连续帧性能: 平均 {avg_frame_time:.1f}ms, 最快 {min_frame_time:.1f}ms, 最慢 {max_frame_time:.1f}ms")
result['avg_frame_time'] = avg_frame_time
result['max_frame_time'] = max_frame_time
result['min_frame_time'] = min_frame_time
return result
except Exception as e:
print(f"❌ 测试异常: {e}")
result['error'] = str(e)
return result
finally:
if cap:
cap.release()
print(f"🧹 相机资源已释放")
def get_available_backends():
"""
获取可用的OpenCV后端
Returns:
list: 可用后端列表
"""
backends = [
('DirectShow', cv2.CAP_DSHOW),
('MSMF', cv2.CAP_MSMF),
('V4L2', cv2.CAP_V4L2),
('GStreamer', cv2.CAP_GSTREAMER),
('Any', cv2.CAP_ANY)
]
available_backends = []
for name, backend_id in backends:
try:
# 尝试创建VideoCapture对象
cap = cv2.VideoCapture(0, backend_id)
if cap.isOpened():
available_backends.append((name, backend_id))
cap.release()
except Exception:
pass
return available_backends
def main():
"""
主测试函数
"""
print("🚀 OpenCV后端性能测试")
print(f"OpenCV版本: {cv2.__version__}")
# 获取可用后端
print("\n🔍 检测可用的OpenCV后端...")
available_backends = get_available_backends()
if not available_backends:
print("❌ 未找到可用的相机后端")
return
print(f"✅ 找到 {len(available_backends)} 个可用后端:")
for name, backend_id in available_backends:
print(f" - {name} (ID: {backend_id})")
# 测试参数
test_params = {
'device_index': 0,
'width': 1280,
'height': 720,
'fps': 30
}
print(f"\n📋 测试参数: {test_params['width']}x{test_params['height']}@{test_params['fps']}fps")
# 执行测试
results = []
for backend_name, backend_id in available_backends:
result = test_opencv_backend(
backend_name,
backend_id,
**test_params
)
results.append(result)
# 等待一下,避免设备冲突
time.sleep(2)
# 输出汇总结果
print(f"\n{'='*90}")
print("📈 OpenCV后端性能测试汇总")
print(f"{'='*90}")
print(f"{'后端':<12} {'状态':<8} {'初始化':<10} {'配置':<10} {'首帧':<10} {'总计':<10} {'实际分辨率':<20}")
print("-" * 90)
successful_results = []
for result in results:
status = "✅成功" if result['success'] else "❌失败"
init_time = f"{result['init_time']:.1f}ms" if result['init_time'] > 0 else "N/A"
config_time = f"{result['config_time']:.1f}ms" if result['config_time'] > 0 else "N/A"
frame_time = f"{result['first_frame_time']:.1f}ms" if result['first_frame_time'] > 0 else "N/A"
total_time = f"{result['total_time']:.1f}ms" if result['total_time'] > 0 else "N/A"
print(f"{result['backend_name']:<12} {status:<8} {init_time:<10} {config_time:<10} {frame_time:<10} {total_time:<10} {result['actual_resolution']:<20}")
if result['success']:
successful_results.append(result)
# 性能分析
if len(successful_results) >= 2:
print(f"\n📊 性能分析:")
# 找到最快和最慢的后端
fastest = min(successful_results, key=lambda x: x['total_time'])
slowest = max(successful_results, key=lambda x: x['total_time'])
print(f"🏆 最快后端: {fastest['backend_name']} - {fastest['total_time']:.1f}ms")
print(f"🐌 最慢后端: {slowest['backend_name']} - {slowest['total_time']:.1f}ms")
if slowest['total_time'] > fastest['total_time']:
improvement = ((slowest['total_time'] - fastest['total_time']) / slowest['total_time']) * 100
print(f"💡 性能提升: {improvement:.1f}% (使用最快后端)")
# 详细对比
print(f"\n📋 详细性能对比:")
for result in successful_results:
if result != fastest:
if result['total_time'] > fastest['total_time']:
slower = ((result['total_time'] - fastest['total_time']) / fastest['total_time']) * 100
print(f" {result['backend_name']}: 比最快后端慢 {slower:.1f}% ({result['total_time']:.1f}ms vs {fastest['total_time']:.1f}ms)")
elif len(successful_results) == 1:
result = successful_results[0]
print(f"\n📊 只有一个后端测试成功: {result['backend_name']} - {result['total_time']:.1f}ms")
# 推荐建议
print(f"\n🎯 建议:")
if successful_results:
fastest = min(successful_results, key=lambda x: x['total_time'])
print(f"✅ 推荐使用 {fastest['backend_name']} 后端以获得最佳性能")
print(f"📝 配置建议: 在相机初始化时指定后端 cv2.VideoCapture(device_index, cv2.CAP_{fastest['backend_name'].upper()})")
if fastest['total_time'] < 3000:
print(f"🚀 性能评级: 优秀 (< 3秒)")
elif fastest['total_time'] < 5000:
print(f"⚡ 性能评级: 良好 (< 5秒)")
else:
print(f"⚠️ 性能评级: 需要优化 (> 5秒)")
else:
print(f"❌ 所有后端测试都失败了,请检查相机连接和驱动")
print(f"\n{'='*90}")
print("测试完成")
print(f"{'='*90}")
if __name__ == "__main__":
main()

View File

@ -1,113 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试OpenCV VideoCapture的行为
验证当设备索引不存在时VideoCapture的表现
"""
import cv2
import time
def test_video_capture_behavior():
"""
测试不同设备索引的VideoCapture行为
"""
print("=== OpenCV VideoCapture 行为测试 ===")
print(f"OpenCV版本: {cv2.__version__}")
print()
# 测试不同的设备索引
test_indices = [0, 1, 2, 3, -1]
backends = [cv2.CAP_DSHOW, cv2.CAP_MSMF, cv2.CAP_ANY]
backend_names = ['CAP_DSHOW', 'CAP_MSMF', 'CAP_ANY']
for device_index in test_indices:
print(f"\n--- 测试设备索引 {device_index} ---")
for backend, backend_name in zip(backends, backend_names):
print(f"\n后端: {backend_name}")
try:
start_time = time.time()
cap = cv2.VideoCapture(device_index, backend)
open_time = (time.time() - start_time) * 1000
print(f" VideoCapture创建: 成功 (耗时: {open_time:.1f}ms)")
print(f" isOpened(): {cap.isOpened()}")
if cap.isOpened():
# 尝试读取帧
start_time = time.time()
ret, frame = cap.read()
read_time = (time.time() - start_time) * 1000
print(f" read()返回值: ret={ret}")
if ret and frame is not None:
print(f" 帧形状: {frame.shape}")
print(f" 读取耗时: {read_time:.1f}ms")
else:
print(f" 读取失败 (耗时: {read_time:.1f}ms)")
# 获取一些属性
try:
width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)
height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
fps = cap.get(cv2.CAP_PROP_FPS)
print(f" 分辨率: {int(width)}x{int(height)}")
print(f" 帧率: {fps}")
except Exception as e:
print(f" 获取属性失败: {e}")
else:
print(" 相机未打开")
cap.release()
except Exception as e:
print(f" 异常: {e}")
print("\n=== 测试完成 ===")
def test_specific_case():
"""
专门测试device_index=1的情况
"""
print("\n=== 专门测试 device_index=1 ===")
try:
# 使用DSHOW后端Windows默认
cap = cv2.VideoCapture(1, cv2.CAP_DSHOW)
print(f"VideoCapture(1, CAP_DSHOW) 创建成功")
print(f"isOpened(): {cap.isOpened()}")
if cap.isOpened():
print("相机显示为已打开,但这可能是虚假的")
# 尝试多次读取
for i in range(3):
print(f"\n{i+1}次读取:")
start_time = time.time()
ret, frame = cap.read()
read_time = (time.time() - start_time) * 1000
print(f" ret: {ret}")
print(f" frame is None: {frame is None}")
print(f" 耗时: {read_time:.1f}ms")
if ret and frame is not None:
print(f" 帧形状: {frame.shape}")
print(f" 帧数据类型: {frame.dtype}")
print(f" 帧数据范围: {frame.min()} - {frame.max()}")
else:
print(" 读取失败或帧为空")
break
else:
print("相机未打开")
cap.release()
except Exception as e:
print(f"异常: {e}")
if __name__ == "__main__":
test_video_capture_behavior()
test_specific_case()

View File

@ -1,98 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
设备重连机制测试脚本
测试设备断开后的自动重连功能
"""
import time
import threading
from devices.camera_manager import CameraManager
from devices.imu_manager import IMUManager
from devices.femtobolt_manager import FemtoBoltManager
from devices.pressure_manager import PressureManager
import logging
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
class MockSocketIO:
"""模拟SocketIO用于测试"""
def emit(self, event, data):
print(f"[SocketIO] 发送事件: {event}, 数据: {data}")
def test_device_reconnection(device_manager, device_name):
"""测试设备重连机制"""
print(f"\n=== 测试 {device_name} 重连机制 ===")
# 初始化设备
print(f"1. 初始化 {device_name} 设备...")
success = device_manager.initialize()
print(f" 初始化结果: {'成功' if success else '失败'}")
if success:
print(f" 设备连接状态: {'已连接' if device_manager.is_connected else '未连接'}")
# 等待一段时间让连接稳定
print("2. 等待连接稳定...")
time.sleep(3)
# 模拟设备断开
print("3. 模拟设备断开连接...")
device_manager.disconnect()
print(f" 断开后连接状态: {'已连接' if device_manager.is_connected else '未连接'}")
# 等待一段时间
print("4. 等待重连机制触发...")
time.sleep(5)
# 尝试重新连接
print("5. 尝试重新连接...")
reconnect_success = device_manager.initialize()
print(f" 重连结果: {'成功' if reconnect_success else '失败'}")
print(f" 重连后连接状态: {'已连接' if device_manager.is_connected else '未连接'}")
# 清理
device_manager.disconnect()
print(f"=== {device_name} 重连测试完成 ===\n")
return success
def main():
"""主测试函数"""
print("开始设备重连机制测试...")
# 创建模拟SocketIO
mock_socketio = MockSocketIO()
# 测试相机重连
print("\n测试相机重连机制...")
camera_manager = CameraManager(mock_socketio)
test_device_reconnection(camera_manager, "相机")
# 测试IMU重连
print("\n测试IMU重连机制...")
imu_manager = IMUManager(mock_socketio)
test_device_reconnection(imu_manager, "IMU")
# 测试FemtoBolt重连
print("\n测试FemtoBolt重连机制...")
femtobolt_manager = FemtoBoltManager(mock_socketio)
test_device_reconnection(femtobolt_manager, "FemtoBolt")
# 测试压力传感器重连
print("\n测试压力传感器重连机制...")
pressure_manager = PressureManager(mock_socketio)
test_device_reconnection(pressure_manager, "压力传感器")
print("\n所有设备重连测试完成!")
print("\n注意事项:")
print("1. 某些设备可能需要物理连接才能成功初始化")
print("2. 重连机制的效果取决于设备的实际可用性")
print("3. 观察日志中的连接监控线程启动和停止信息")
if __name__ == "__main__":
main()

View File

@ -1,214 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
分辨率性能测试脚本
测试不同分辨率下相机配置的性能差异
"""
import sys
import os
import time
import logging
# 添加项目路径
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from devices.camera_manager import CameraManager
from devices.utils.config_manager import ConfigManager
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def test_resolution_performance(width, height, test_name):
"""
测试指定分辨率的性能
Args:
width: 宽度
height: 高度
test_name: 测试名称
Returns:
dict: 性能数据
"""
print(f"\n{'='*60}")
print(f"测试 {test_name}: {width}x{height}")
print(f"{'='*60}")
# 创建配置管理器并设置分辨率
config_manager = ConfigManager()
# 获取原始配置
original_config = config_manager.get_device_config('camera')
# 临时设置测试分辨率
test_config = {
'width': width,
'height': height
}
config_manager.set_camera_config(test_config)
try:
# 创建相机管理器
camera = CameraManager(None, config_manager)
# 测试初始化性能
start_time = time.time()
success = camera.initialize()
total_time = (time.time() - start_time) * 1000
if success:
print(f"✅ 初始化成功")
print(f"📊 总耗时: {total_time:.1f}ms ({total_time/1000:.1f}秒)")
# 获取实际分辨率
if camera.cap:
actual_width = int(camera.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
actual_height = int(camera.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
print(f"🎯 实际分辨率: {actual_width}x{actual_height}")
# 测试首帧获取
frame_start = time.time()
ret, frame = camera.cap.read() if camera.cap else (False, None)
frame_time = (time.time() - frame_start) * 1000
if ret and frame is not None:
print(f"🖼️ 首帧获取: {frame_time:.1f}ms, 帧大小: {frame.shape}")
else:
print(f"❌ 首帧获取失败")
frame_time = -1
# 清理资源
camera.cleanup()
return {
'resolution': f"{width}x{height}",
'success': True,
'total_time': total_time,
'frame_time': frame_time,
'actual_resolution': f"{actual_width}x{actual_height}" if camera.cap else "未知"
}
else:
print(f"❌ 初始化失败")
return {
'resolution': f"{width}x{height}",
'success': False,
'total_time': total_time,
'frame_time': -1,
'actual_resolution': "失败"
}
except Exception as e:
print(f"❌ 测试异常: {e}")
return {
'resolution': f"{width}x{height}",
'success': False,
'total_time': -1,
'frame_time': -1,
'actual_resolution': "异常",
'error': str(e)
}
finally:
# 恢复原始配置
try:
restore_config = {
'width': original_config['width'],
'height': original_config['height']
}
config_manager.set_camera_config(restore_config)
except Exception as e:
print(f"⚠️ 恢复配置失败: {e}")
def main():
"""
主测试函数
"""
print("🚀 开始分辨率性能测试")
# 测试不同分辨率
test_cases = [
(1280, 720, "当前分辨率"),
(640, 480, "标准VGA"),
(320, 240, "QVGA小分辨率"),
(160, 120, "极小分辨率")
]
results = []
for width, height, name in test_cases:
result = test_resolution_performance(width, height, name)
results.append(result)
# 等待一下,避免设备冲突
time.sleep(1)
# 输出汇总结果
print(f"\n{'='*80}")
print("📈 性能测试汇总")
print(f"{'='*80}")
print(f"{'分辨率':<15} {'状态':<8} {'初始化耗时':<12} {'首帧耗时':<10} {'实际分辨率':<15}")
print("-" * 80)
successful_results = []
for result in results:
status = "✅成功" if result['success'] else "❌失败"
init_time = f"{result['total_time']:.1f}ms" if result['total_time'] > 0 else "N/A"
frame_time = f"{result['frame_time']:.1f}ms" if result['frame_time'] > 0 else "N/A"
print(f"{result['resolution']:<15} {status:<8} {init_time:<12} {frame_time:<10} {result['actual_resolution']:<15}")
if result['success'] and result['total_time'] > 0:
successful_results.append(result)
# 性能分析
if len(successful_results) >= 2:
print(f"\n📊 性能分析:")
# 找到最快和最慢的
fastest = min(successful_results, key=lambda x: x['total_time'])
slowest = max(successful_results, key=lambda x: x['total_time'])
print(f"🏆 最快配置: {fastest['resolution']} - {fastest['total_time']:.1f}ms")
print(f"🐌 最慢配置: {slowest['resolution']} - {slowest['total_time']:.1f}ms")
if slowest['total_time'] > fastest['total_time']:
improvement = ((slowest['total_time'] - fastest['total_time']) / slowest['total_time']) * 100
print(f"💡 性能提升: {improvement:.1f}% (使用最小分辨率)")
# 基准对比
baseline = next((r for r in successful_results if "1280x720" in r['resolution']), None)
if baseline:
print(f"\n📋 相对于当前分辨率(1280x720)的性能对比:")
for result in successful_results:
if result != baseline:
if result['total_time'] < baseline['total_time']:
improvement = ((baseline['total_time'] - result['total_time']) / baseline['total_time']) * 100
print(f" {result['resolution']}: 快 {improvement:.1f}% ({result['total_time']:.1f}ms vs {baseline['total_time']:.1f}ms)")
else:
degradation = ((result['total_time'] - baseline['total_time']) / baseline['total_time']) * 100
print(f" {result['resolution']}: 慢 {degradation:.1f}% ({result['total_time']:.1f}ms vs {baseline['total_time']:.1f}ms)")
print(f"\n🎯 建议:")
if successful_results:
fastest = min(successful_results, key=lambda x: x['total_time'])
if fastest['total_time'] < 3000: # 小于3秒
print(f"✅ 推荐使用 {fastest['resolution']} 以获得最佳性能")
else:
print(f"⚠️ 即使最快的分辨率 {fastest['resolution']} 仍需 {fastest['total_time']:.1f}ms")
print(f" 建议考虑其他优化方案(如更换相机后端)")
else:
print(f"❌ 所有测试都失败了,请检查相机连接")
print(f"\n{'='*80}")
print("测试完成")
print(f"{'='*80}")
if __name__ == "__main__":
import cv2 # 在这里导入cv2避免在函数中导入
main()

View File

@ -1,126 +0,0 @@
import cv2
class CameraViewer:
def __init__(self, device_index=0):
self.device_index = device_index
self.window_name = "Camera Viewer"
def start_stream(self):
cap = cv2.VideoCapture(self.device_index)
if not cap.isOpened():
print(f"无法打开摄像头设备 {self.device_index}")
return
cv2.namedWindow(self.window_name, cv2.WINDOW_NORMAL)
while True:
ret, frame = cap.read()
if not ret:
print("无法获取视频帧")
break
cv2.imshow(self.window_name, frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
if __name__ == "__main__":
# 修改这里的数字可以切换不同摄像头设备
viewer = CameraViewer(device_index=3)
viewer.start_stream()
# import ctypes
# from ctypes import c_int, c_uint16, c_uint8, c_char, c_char_p, Structure, POINTER, byref
# # 设备结构体对应wrapper中FPMS_DEVICE_C
# class FPMS_DEVICE_C(Structure):
# _pack_ = 1
# _fields_ = [
# ("mn", c_uint16),
# ("sn", c_char * 64),
# ("fwVersion", c_uint16),
# ("protoVer", c_uint8),
# ("pid", c_uint16),
# ("vid", c_uint16),
# ("rows", c_uint16),
# ("cols", c_uint16),
# ]
# # 加载DLL
# dll_path = r"D:\BodyBalanceEvaluation\backend\SMiTSenseUsbWrapper.dll"
# dll = ctypes.windll.LoadLibrary(dll_path)
# # 函数原型声明
# # int fpms_usb_init_c(int debugFlag);
# dll.fpms_usb_init_c.argtypes = [c_int]
# dll.fpms_usb_init_c.restype = c_int
# dll.fpms_usb_get_device_list_c.argtypes = [POINTER(FPMS_DEVICE_C), c_int]
# dll.fpms_usb_get_device_list_c.restype = c_int
# dll.fpms_usb_open_c.argtypes = [POINTER(FPMS_DEVICE_C), POINTER(ctypes.c_void_p)]
# dll.fpms_usb_open_c.restype = c_int
# # int fpms_usb_read_frame_c(void* handle, uint16_t* frame);
# dll.fpms_usb_read_frame_c.argtypes = [ctypes.c_void_p, POINTER(c_uint16)]
# dll.fpms_usb_read_frame_c.restype = c_int
# # int fpms_usb_close_c(void* handle);
# dll.fpms_usb_close_c.argtypes = [ctypes.c_void_p]
# dll.fpms_usb_close_c.restype = c_int
# # 其他函数如果需要可以类似声明
# def main():
# # 初始化
# ret = dll.fpms_usb_init_c(0)
# print(f"fpms_usb_init_c 返回值: {ret}")
# if ret != 0:
# print("初始化失败")
# return
# MAX_DEVICES = 8
# devices = (FPMS_DEVICE_C * MAX_DEVICES)() # 创建数组
# count = dll.fpms_usb_get_device_list_c(devices, MAX_DEVICES)
# print(f"设备数量: {count}")
# if count <= 0:
# print("未找到设备或错误")
# return
# for i in range(count):
# dev = devices[i]
# print(f"设备{i}: mn={dev.mn}, sn={dev.sn.decode(errors='ignore').rstrip(chr(0))}, fwVersion={dev.fwVersion}")
# # 打开第一个设备
# handle = ctypes.c_void_p()
# ret = dll.fpms_usb_open_c(byref(devices[0]), byref(handle))
# print(f"fpms_usb_open_c 返回值: {ret}")
# if ret != 0:
# print("打开设备失败")
# return
# # 假设帧大小是 rows * cols
# rows = devices[0].rows
# cols = devices[0].cols
# frame_size = rows * cols
# frame_buffer = (c_uint16 * frame_size)()
# ret = dll.fpms_usb_read_frame_c(handle, frame_buffer)
# print(f"fpms_usb_read_frame_c 返回值: {ret}")
# if ret == 0:
# # 打印前10个数据看看
# print("帧数据前10个点:", list(frame_buffer[:10]))
# else:
# print("读取帧失败")
# # 关闭设备
# ret = dll.fpms_usb_close_c(handle)
# print(f"fpms_usb_close_c 返回值: {ret}")
# if __name__ == "__main__":
# main()

View File

@ -35,7 +35,7 @@ chart_dpi = 300
export_format = csv
[SECURITY]
secret_key = 332fe6a0e5b58a60e61eeee09cad362a7c47051202db7fa334256c2527371ecf
secret_key = 74d2f5ad774b449e6958cc5d30d77411c3560c9d0279e48154a847b744688989
session_timeout = 3600
max_login_attempts = 5

View File

@ -0,0 +1,41 @@
[APP]
name = Body Balance Evaluation System
version = 1.0.0
debug = false
log_level = INFO
[SERVER]
host = 0.0.0.0
port = 5000
cors_origins = *
[DATABASE]
path = backend/data/body_balance.db
backup_interval = 24
max_backups = 7
[DEVICES]
camera_index = 0
camera_width = 640
camera_height = 480
camera_fps = 30
imu_port = COM3
pressure_port = COM4
[DETECTION]
default_duration = 60
sampling_rate = 30
balance_threshold = 0.2
posture_threshold = 5.0
[DATA_PROCESSING]
filter_window = 5
outlier_threshold = 2.0
chart_dpi = 300
export_format = csv
[SECURITY]
secret_key = c4939b252df4fff97f62644697ab798d7c0ccff8a8d9a592d0ffeb7675a44f92
session_timeout = 3600
max_login_attempts = 5

View File

@ -1,301 +0,0 @@
# 设备管理优化方案
## 1. 现状分析
### 1.1 当前架构问题
当前的 `device_manager.py` 文件3694行存在以下问题
1. **单一职责原则违反**:一个类管理四种不同类型的设备
2. **代码耦合度高**:设备间相互依赖,一个设备故障可能影响其他设备
3. **维护困难**:代码量庞大,修改一个设备功能可能影响其他设备
4. **性能瓶颈**:所有设备共享同一个推流线程池,资源竞争严重
5. **扩展性差**:添加新设备类型需要修改核心管理器
6. **测试复杂**:单元测试需要模拟所有设备
### 1.2 当前设备类型
- **FemtoBolt深度相机**:负责身体姿态检测和深度图像采集
- **普通相机**:负责足部监控视频流
- **IMU传感器**:负责头部姿态数据采集
- **压力板传感器**:负责足底压力数据采集
## 2. 优化方案设计
### 2.1 架构设计原则
1. **单一职责原则**:每个设备类只负责自身的管理
2. **开闭原则**:对扩展开放,对修改封闭
3. **依赖倒置原则**:依赖抽象而非具体实现
4. **接口隔离原则**:设备间通过标准接口通信
### 2.2 目标架构
```
设备管理系统
├── 抽象基类 (BaseDevice)
├── FemtoBolt深度相机管理器 (FemtoBoltManager)
├── 普通相机管理器 (CameraManager)
├── IMU传感器管理器 (IMUManager)
├── 压力板管理器 (PressureManager)
└── 设备协调器 (DeviceCoordinator)
```
### 2.3 文件结构
```
backend/devices/
├── __init__.py
├── base_device.py # 抽象基类
├── femtobolt_manager.py # FemtoBolt深度相机管理
├── camera_manager.py # 普通相机管理
├── imu_manager.py # IMU传感器管理
├── pressure_manager.py # 压力板管理
├── device_coordinator.py # 设备协调器
└── utils/
├── __init__.py
├── socket_manager.py # Socket连接管理
└── config_manager.py # 配置管理
```
## 3. 详细设计
### 3.1 抽象基类设计
```python
# base_device.py
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional
import threading
import logging
class BaseDevice(ABC):
"""设备抽象基类"""
def __init__(self, device_name: str, config: Dict[str, Any]):
self.device_name = device_name
self.config = config
self.is_connected = False
self.is_streaming = False
self.socket_namespace = f"/{device_name}"
self.logger = logging.getLogger(f"device.{device_name}")
self._lock = threading.RLock()
@abstractmethod
def initialize(self) -> bool:
"""初始化设备"""
pass
@abstractmethod
def calibrate(self) -> Dict[str, Any]:
"""校准设备"""
pass
@abstractmethod
def start_streaming(self, socketio) -> bool:
"""启动数据推流"""
pass
@abstractmethod
def stop_streaming(self) -> bool:
"""停止数据推流"""
pass
@abstractmethod
def get_status(self) -> Dict[str, Any]:
"""获取设备状态"""
pass
@abstractmethod
def cleanup(self) -> None:
"""清理资源"""
pass
```
### 3.2 FemtoBolt深度相机管理器
```python
# femtobolt_manager.py
class FemtoBoltManager(BaseDevice):
"""FemtoBolt深度相机管理器"""
def __init__(self, config: Dict[str, Any]):
super().__init__("femtobolt", config)
self.camera = None
self.streaming_thread = None
self.frame_cache = {}
def initialize(self) -> bool:
"""初始化FemtoBolt深度相机"""
try:
# FemtoBolt初始化逻辑
return True
except Exception as e:
self.logger.error(f"FemtoBolt初始化失败: {e}")
return False
def start_streaming(self, socketio) -> bool:
"""启动深度图像推流"""
# 独立的Socket.IO命名空间
# 独立的推流线程
pass
```
### 3.3 设备协调器
```python
# device_coordinator.py
class DeviceCoordinator:
"""设备协调器 - 管理所有设备的生命周期"""
def __init__(self):
self.devices = {}
self.socketio = None
def register_device(self, device: BaseDevice):
"""注册设备"""
self.devices[device.device_name] = device
def initialize_all(self) -> Dict[str, bool]:
"""初始化所有设备"""
results = {}
for name, device in self.devices.items():
results[name] = device.initialize()
return results
def start_all_streaming(self) -> Dict[str, bool]:
"""启动所有设备推流"""
results = {}
for name, device in self.devices.items():
if device.is_connected:
results[name] = device.start_streaming(self.socketio)
return results
```
## 4. 优势分析
### 4.1 性能优势
1. **并行处理**每个设备独立的Socket.IO命名空间减少数据传输冲突
2. **资源隔离**:每个设备独立的线程池,避免资源竞争
3. **内存优化**:设备级别的缓存管理,减少内存占用
4. **故障隔离**:单个设备故障不影响其他设备运行
### 4.2 开发优势
1. **代码可维护性**每个设备类代码量控制在500-800行
2. **团队协作**:不同开发者可以并行开发不同设备
3. **单元测试**:每个设备可以独立测试
4. **版本控制**:设备功能变更影响范围小
### 4.3 扩展优势
1. **新设备接入**只需实现BaseDevice接口
2. **功能扩展**:设备功能扩展不影响其他设备
3. **配置管理**:每个设备独立配置文件
4. **部署灵活**:可以选择性部署某些设备
## 5. 劣势分析
### 5.1 复杂性增加
1. **架构复杂度**:从单一类变为多类协作
2. **通信开销**:设备间通信需要额外的协调机制
3. **状态同步**:多设备状态同步复杂度增加
### 5.2 开发成本
1. **重构工作量**:需要大量重构现有代码
2. **测试工作量**:需要重新设计集成测试
3. **文档更新**需要更新相关文档和API
### 5.3 运维复杂度
1. **监控复杂**:需要监控多个独立服务
2. **故障排查**:跨设备问题排查难度增加
3. **配置管理**:多个配置文件管理复杂
## 6. 实施方案
### 6.1 分阶段实施
#### 第一阶段基础架构搭建1-2周
- 创建抽象基类和工具类
- 设计Socket.IO命名空间方案
- 搭建设备协调器框架
#### 第二阶段设备迁移3-4周
- 按优先级迁移设备Camera → IMU → Pressure → FemtoBolt
- 每个设备迁移后进行充分测试
- 保持向后兼容性
#### 第三阶段优化和集成1-2周
- 性能优化和内存管理
- 集成测试和压力测试
- 文档更新和代码审查
### 6.2 风险控制
1. **渐进式迁移**:保留原有代码作为备份
2. **功能开关**:通过配置控制使用新旧架构
3. **充分测试**:每个阶段都进行完整测试
4. **回滚方案**:准备快速回滚到原架构的方案
### 6.3 Socket.IO命名空间设计
```javascript
// 前端连接示例
const cameraSocket = io('/camera');
const femtoboltSocket = io('/femtobolt');
const imuSocket = io('/imu');
const pressureSocket = io('/pressure');
// 独立的事件监听
cameraSocket.on('video_frame', handleCameraFrame);
femtoboltSocket.on('depth_frame', handleDepthFrame);
imuSocket.on('imu_data', handleIMUData);
pressureSocket.on('pressure_data', handlePressureData);
```
## 7. 性能预期
### 7.1 性能提升预期
- **并发处理能力**提升40-60%
- **内存使用效率**降低20-30%
- **故障恢复时间**减少50-70%
- **开发效率**提升30-50%
### 7.2 资源消耗
- **CPU使用**可能增加5-10%(多线程开销)
- **内存使用**减少20-30%(更好的缓存管理)
- **网络带宽**:基本持平(优化的数据传输)
## 8. 结论和建议
### 8.1 可行性评估
**高度可行** - 该优化方案在技术上完全可行,且能显著改善系统的可维护性和性能。
### 8.2 推荐实施
**强烈推荐** - 考虑到当前代码的复杂度和未来的扩展需求,建议尽快实施该优化方案。
### 8.3 关键成功因素
1. **充分的测试**:确保每个阶段都有完整的测试覆盖
2. **团队协作**:需要前后端团队密切配合
3. **渐进式实施**:避免一次性大规模重构的风险
4. **性能监控**:实施过程中持续监控系统性能
### 8.4 后续优化方向
1. **微服务化**:将设备管理器进一步拆分为独立的微服务
2. **容器化部署**使用Docker容器化部署各个设备服务
3. **负载均衡**:为高负载设备添加负载均衡机制
4. **监控告警**:建立完善的设备监控和告警系统
---
*本优化方案基于对现有代码的深入分析,结合软件工程最佳实践制定。实施过程中应根据实际情况灵活调整。*