This commit is contained in:
zhaozilong12 2025-08-13 17:11:09 +08:00
commit dcb06c996b
53 changed files with 12819 additions and 368589 deletions

9970
.gitignore vendored

File diff suppressed because it is too large Load Diff

View File

@ -1,243 +0,0 @@
# 调试指南
本文档介绍如何在开发过程中进行调试,包括断点调试、日志调试等方法。
## 调试方式概览
### 1. 批处理文件调试 (推荐新手)
```bash
# 启动调试模式
./start_debug.bat
```
### 2. Python脚本调试 (推荐)
```bash
# 直接运行调试服务器
python debug_server.py
```
### 3. VS Code调试 (推荐开发者)
- 打开VS Code
- 按F5或点击调试按钮
- 选择"Debug Backend Server"配置
### 4. 命令行调试
```bash
# 激活虚拟环境
backend\venv\Scripts\activate
# 设置环境变量
set FLASK_ENV=development
set FLASK_DEBUG=1
set PYTHONPATH=%cd%
# 启动调试服务器
python -u backend\app.py
```
## 详细调试方法
### VS Code 断点调试
1. **安装Python扩展**
- 确保VS Code已安装Python扩展
2. **打开项目**
```bash
code .
```
3. **设置断点**
- 在代码行号左侧点击设置断点
- 红色圆点表示断点已设置
4. **启动调试**
- 按F5或点击调试面板的播放按钮
- 选择"Debug Backend Server"配置
5. **调试操作**
- F10: 单步跳过
- F11: 单步进入
- Shift+F11: 单步跳出
- F5: 继续执行
- Shift+F5: 停止调试
### PyCharm 调试
1. **打开项目**
- File -> Open -> 选择项目目录
2. **配置Python解释器**
- File -> Settings -> Project -> Python Interpreter
- 选择backend/venv/Scripts/python.exe
3. **创建运行配置**
- Run -> Edit Configurations
- 添加新的Python配置
- Script path: debug_server.py
- Working directory: 项目根目录
4. **设置断点并调试**
- 点击行号左侧设置断点
- 点击调试按钮启动
### 命令行调试
1. **使用pdb调试器**
```python
import pdb
pdb.set_trace() # 在需要调试的地方插入
```
2. **使用ipdb (增强版pdb)**
```bash
pip install ipdb
```
```python
import ipdb
ipdb.set_trace()
```
## 调试配置说明
### 环境变量
- `FLASK_ENV=development`: 启用开发模式
- `FLASK_DEBUG=1`: 启用调试模式
- `PYTHONPATH`: 设置Python模块搜索路径
### 调试端口
- 后端服务: http://127.0.0.1:5000
- 健康检查: http://127.0.0.1:5000/health
- WebSocket: ws://127.0.0.1:5000/socket.io/
## 常见调试场景
### 1. WebSocket连接问题
```python
# 在handle_connect函数中设置断点
@socketio.on('connect')
def handle_connect():
print(f'客户端连接: {request.sid}') # 添加调试输出
# 设置断点在这里
emit('connect_status', {'status': 'connected'})
```
### 2. RTSP流问题
```python
# 在generate_video_frames函数中设置断点
def generate_video_frames():
print(f'RTSP URL: {rtsp_url}') # 调试输出
# 设置断点检查rtsp_url值
cap = cv2.VideoCapture(rtsp_url)
```
### 3. API请求问题
```python
# 在API路由中设置断点
@app.route('/api/patients', methods=['GET'])
def get_patients():
print(f'请求参数: {request.args}') # 调试输出
# 设置断点检查请求参数
```
### 4. 数据库操作问题
```python
# 在数据库操作中设置断点
def get_patients(self, page, size, keyword):
print(f'查询参数: page={page}, size={size}, keyword={keyword}')
# 设置断点检查SQL查询
```
## 日志调试
### 查看日志文件
```bash
# 实时查看日志
tail -f logs/debug.log
tail -f logs/backend.log
```
### 调整日志级别
```python
# 在代码中临时调整日志级别
import logging
logging.getLogger().setLevel(logging.DEBUG)
```
## 前端调试
### 浏览器开发者工具
1. 按F12打开开发者工具
2. Console标签页查看JavaScript错误
3. Network标签页查看网络请求
4. WebSocket连接在Network -> WS中查看
### 前端调试技巧
```javascript
// 在浏览器控制台中测试WebSocket连接
const socket = io('http://127.0.0.1:5000');
socket.on('connect', () => console.log('连接成功'));
socket.emit('start_video', {});
```
## 性能调试
### 使用cProfile
```bash
python -m cProfile -o profile_output.prof debug_server.py
```
### 内存使用监控
```bash
pip install memory-profiler
python -m memory_profiler debug_server.py
```
## 故障排除
### 常见问题
1. **端口被占用**
```bash
netstat -ano | findstr :5000
taskkill /PID <PID> /F
```
2. **模块导入错误**
- 检查PYTHONPATH设置
- 确认虚拟环境已激活
3. **权限问题**
- 以管理员身份运行
- 检查文件夹权限
4. **依赖包问题**
```bash
pip install -r backend/requirements.txt --force-reinstall
```
### 调试检查清单
- [ ] Python虚拟环境已激活
- [ ] 所有依赖包已安装
- [ ] 环境变量设置正确
- [ ] 端口5000未被占用
- [ ] config.ini文件存在且配置正确
- [ ] 日志文件可以正常写入
- [ ] 断点设置在正确位置
## 调试最佳实践
1. **逐步调试**: 从简单的断点开始,逐步深入
2. **日志记录**: 在关键位置添加详细的日志输出
3. **单元测试**: 编写测试用例验证功能
4. **代码审查**: 定期检查代码逻辑
5. **版本控制**: 使用Git跟踪代码变更
## 获取帮助
如果遇到调试问题,可以:
1. 查看logs目录下的日志文件
2. 检查控制台输出信息
3. 使用浏览器开发者工具
4. 参考项目文档和代码注释

File diff suppressed because it is too large Load Diff

View File

@ -1,20 +0,0 @@
[08/06 16:05:29.947597][debug][29212][Context.cpp:30] Context creating, work_dir=D:\Trae_space\BodyBalanceEvaluation\backend
[08/06 16:05:29.947770][debug][29212][Context.cpp:49] Config file version=1.1
[08/06 16:05:29.947818][debug][29212][FrameBufferManager.cpp:23] Max global frame buffer size updated! size=2048.000MB
[08/06 16:05:29.947857][info][29212][Context.cpp:68] Context created with config: default config!
[08/06 16:05:29.948048][info][29212][Context.cpp:73] Work directory=D:\Trae_space\BodyBalanceEvaluation\backend, SDK version=v1.10.11-20240724-aeaa107e5
[08/06 16:05:29.948256][debug][29212][DeviceManager.cpp:30] DeviceManager init ...
[08/06 16:05:29.948529][info][29212][MfPal.cpp:105] createObPal: create WinPal!
[08/06 16:05:29.948659][debug][29212][MfPal.cpp:110] WmfPal init ...
[08/06 16:05:29.984405][debug][29212][MfPal.cpp:117] WmfPal created!
[08/06 16:05:29.984501][debug][29212][DeviceManager.cpp:34] Enable USB Device Enumerator ...
[08/06 16:05:30.020645][debug][29212][EnumeratorLibusb.cpp:321] queryDevicesInfo done!
[08/06 16:05:30.021037][debug][29212][MfPal.cpp:216] Create WinEventDeviceWatcher!
[08/06 16:05:30.021274][debug][29212][UsbDeviceEnumerator.cpp:78] No matched usb device found!
[08/06 16:05:30.021308][info][29212][DeviceManager.cpp:15] Current found device(s): (0)
[08/06 16:05:30.021507][debug][29212][DeviceManager.cpp:52] DeviceManager construct done!
[08/06 16:05:30.021539][debug][29212][Context.cpp:81] Context destroying ...
[08/06 16:05:30.021552][debug][29212][DeviceManager.cpp:56] DeviceManager destroy ...
[08/06 16:05:30.021562][debug][29212][DeviceManager.cpp:64] DeviceManager Destructors done
[08/06 16:05:30.030937][debug][29212][MfPal.cpp:128] WmfPal destroyed!
[08/06 16:05:30.031392][info][29212][Context.cpp:84] Context destroyed

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -30,12 +30,24 @@ from database import DatabaseManager
from device_manager import DeviceManager, VideoStreamManager
from utils import config as app_config
# 确定日志文件路径
if getattr(sys, 'frozen', False):
# 如果是打包后的exe日志文件在exe同目录下的logs文件夹
log_dir = os.path.join(os.path.dirname(sys.executable), 'logs')
else:
# 如果是开发环境使用当前目录的logs文件夹
log_dir = 'logs'
# 确保日志目录存在
os.makedirs(log_dir, exist_ok=True)
log_file = os.path.join(log_dir, 'backend.log')
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('logs/backend.log', encoding='utf-8'),
logging.FileHandler(log_file, encoding='utf-8'),
logging.StreamHandler()
]
)
@ -44,11 +56,28 @@ logger = logging.getLogger(__name__)
# 创建Flask应用
app = Flask(__name__)
app.config['SECRET_KEY'] = 'body-balance-detection-system-2024'
socketio = SocketIO(app,
cors_allowed_origins='*',
async_mode='threading',
logger=False,
engineio_logger=False)
# 初始化SocketIO
try:
socketio = SocketIO(
app,
cors_allowed_origins='*',
async_mode='threading',
logger=False,
engineio_logger=False,
ping_timeout=60,
ping_interval=25,
manage_session=False,
always_connect=False,
transports=['polling', 'websocket'], # 优先使用polling
allow_upgrades=True, # 允许升级到websocket
cookie=None # 禁用cookie
)
logger.info('SocketIO初始化成功')
except Exception as e:
logger.error(f'SocketIO初始化失败: {e}')
socketio = None
import logging
logging.getLogger('socketio').setLevel(logging.WARNING)
@ -61,7 +90,16 @@ CORS(app, origins='*', supports_credentials=True, allow_headers=['Content-Type',
# 读取RTSP配置
config = configparser.ConfigParser()
config.read(os.path.join(os.path.dirname(__file__), '..', 'config.ini'), encoding='utf-8')
# 确定配置文件路径
if getattr(sys, 'frozen', False):
# 如果是打包后的exe配置文件在exe同目录下
config_path = os.path.join(os.path.dirname(sys.executable), 'config.ini')
else:
# 如果是开发环境,配置文件在上级目录
config_path = os.path.join(os.path.dirname(__file__), 'config.ini')
config.read(config_path, encoding='utf-8')
device_index = config.get('CAMERA', 'device_index', fallback=None)
# 全局变量
@ -78,20 +116,28 @@ def init_app():
global db_manager, device_manager, video_stream_manager
try:
# 确定基础目录
if getattr(sys, 'frozen', False):
# 如果是打包后的exe使用exe同目录
base_dir = os.path.dirname(sys.executable)
else:
# 如果是开发环境,使用当前脚本目录
base_dir = os.path.dirname(os.path.abspath(__file__))
# 创建必要的目录
os.makedirs('logs', exist_ok=True)
os.makedirs('data', exist_ok=True)
logs_dir = os.path.join(base_dir, 'logs')
data_dir = os.path.join(base_dir, 'data')
os.makedirs(logs_dir, exist_ok=True)
os.makedirs(data_dir, exist_ok=True)
# 从配置文件读取数据库路径
db_path_config = app_config.get('DATABASE', 'path', 'backend/data/body_balance.db')
# 如果是相对路径,基于当前脚本目录解析
db_path_config = app_config.get('DATABASE', 'path', 'data/body_balance.db')
# 如果是相对路径,基于基础目录解析
if not os.path.isabs(db_path_config):
# 获取当前脚本所在目录backend目录
current_dir = os.path.dirname(os.path.abspath(__file__))
# 如果配置路径以 'backend/' 开头,去掉这个前缀
if db_path_config.startswith('backend/'):
db_path_config = db_path_config[8:] # 去掉 'backend/' 前缀
db_path = os.path.join(current_dir, db_path_config)
db_path = os.path.join(base_dir, db_path_config)
else:
db_path = db_path_config
@ -99,16 +145,51 @@ def init_app():
db_dir = os.path.dirname(db_path)
os.makedirs(db_dir, exist_ok=True)
# 输出数据库路径信息
print(f"\n=== 系统初始化 ===")
print(f"数据库配置路径: {db_path_config}")
print(f"数据库实际路径: {db_path}")
print(f"数据库目录: {db_dir}")
print(f"当前工作目录: {os.getcwd()}")
print(f"数据库文件存在: {'' if os.path.exists(db_path) else ''}")
print(f"==================\n")
# 初始化数据库
db_manager = DatabaseManager(db_path)
db_manager.init_database()
# 初始化设备管理器
# 初始化设备管理器(不自动初始化设备)
device_manager = DeviceManager(db_manager)
device_manager.set_socketio(socketio) # 设置WebSocket连接
# 初始化视频流管理器
video_stream_manager = VideoStreamManager(socketio, device_manager)
if socketio is not None:
logger.info('SocketIO已启用')
device_manager.set_socketio(socketio) # 设置WebSocket连接
# 初始化视频流管理器
video_stream_manager = VideoStreamManager(socketio, device_manager)
else:
logger.info('SocketIO未启用跳过WebSocket相关初始化')
video_stream_manager = None
# 可选:在后台线程中初始化设备,避免阻塞应用启动
def init_devices_async():
try:
device_manager._init_devices()
logger.info('后台设备初始化完成')
except Exception as e:
logger.error(f'后台设备初始化失败: {e}')
# 启动后台设备初始化线程
device_init_thread = threading.Thread(target=init_devices_async, daemon=True)
device_init_thread.start()
socketio.run(
app,
host='0.0.0.0', # 允许所有IP访问
port=5000,
debug=True,
use_reloader=False, # 禁用热重载以避免FemtoBolt设备资源冲突
log_output=True, # 输出详细日志
allow_unsafe_werkzeug=True
)
logger.info('应用初始化完成')
except Exception as e:
@ -120,13 +201,18 @@ def init_app():
@app.route('/health', methods=['GET'])
def health_check():
"""健康检查"""
"""健康检查接口"""
return jsonify({
'status': 'ok',
'status': 'healthy',
'timestamp': datetime.now().isoformat(),
'version': '1.0.0'
})
@app.route('/test-socketio')
def test_socketio():
"""SocketIO连接测试页面"""
return send_file('test_socketio_connection.html')
@app.route('/api/health', methods=['GET'])
def api_health_check():
"""API健康检查"""
@ -591,56 +677,6 @@ def calibrate_imu():
return jsonify({'success': False, 'error': str(e)}), 500
# ==================== 视频推流API ====================
@app.route('/api/streaming/start', methods=['POST'])
def start_video_streaming():
"""启动视频推流"""
try:
if not device_manager:
return jsonify({'success': False, 'error': '设备管理器未初始化'}), 500
# 设置WebSocket连接
device_manager.set_socketio(socketio)
result = device_manager.start_streaming()
logger.info(f'视频推流启动结果: {result}')
return jsonify({
'success': True,
'message': '视频推流已启动',
'streaming_status': result
})
except Exception as e:
logger.error(f'启动视频推流失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/streaming/stop', methods=['POST'])
def stop_video_streaming():
"""停止视频推流"""
try:
if not device_manager:
return jsonify({'success': False, 'error': '设备管理器未初始化'}), 500
result = device_manager.stop_streaming()
if result:
logger.info('视频推流已停止')
return jsonify({
'success': True,
'message': '视频推流已停止'
})
else:
return jsonify({
'success': False,
'error': '停止推流失败'
}), 500
except Exception as e:
logger.error(f'停止视频推流失败: {e}')
return jsonify({'success': False, 'error': str(e)}), 500
# ==================== 检测API ====================
@app.route('/api/detection/start', methods=['POST'])
@ -995,115 +1031,77 @@ def get_detection_sessions():
return jsonify({'success': False, 'error': str(e)}), 500
# ==================== 错误处理 ====================
@app.errorhandler(404)
def not_found(error):
return jsonify({'success': False, 'error': 'API接口不存在'}), 404
@app.errorhandler(500)
def internal_error(error):
return jsonify({'success': False, 'error': '服务器内部错误'}), 500
if __name__ == '__main__':
import argparse
# 解析命令行参数
parser = argparse.ArgumentParser(description='Body Balance Evaluation System Backend')
parser.add_argument('--host', default=None, help='Host address to bind to')
parser.add_argument('--port', type=int, default=None, help='Port number to bind to')
parser.add_argument('--debug', action='store_true', help='Enable debug mode')
args = parser.parse_args()
try:
# 初始化应用
init_app()
# 确定主机和端口
host = args.host if args.host else config.get('SERVER', 'host', fallback='127.0.0.1')
port = args.port if args.port else config.getint('SERVER', 'port', fallback=5000)
debug = args.debug if args.debug else config.getboolean('APP', 'debug', fallback=False)
# 启动Flask+SocketIO服务
logger.info(f'启动后端服务... Host: {host}, Port: {port}, Debug: {debug}')
socketio.run(app,
host=host,
port=port,
debug=debug,
use_reloader=False, # 禁用热重载以避免进程问题
log_output=True, # 启用详细日志
allow_unsafe_werkzeug=True
)
except KeyboardInterrupt:
logger.info('服务被用户中断')
except Exception as e:
logger.error(f'服务启动失败: {e}')
sys.exit(1)
finally:
logger.info('后端服务已停止')
# ==================== WebSocket 事件处理 ====================
# 简单的测试事件处理器
@socketio.on('connect')
def handle_connect():
# print('CLIENT CONNECTED!!!', flush=True) # 控制台打印测试已关闭
logger.info('客户端已连接')
# 只有当socketio不为None时才注册事件处理器
if socketio is not None:
# 简单的测试事件处理器
@socketio.on('connect')
def handle_connect():
# print('CLIENT CONNECTED!!!', flush=True) # 控制台打印测试已关闭
logger.info('客户端已连接')
return True
@socketio.on('disconnect')
def handle_disconnect():
# print('CLIENT DISCONNECTED!!!', flush=True) # 控制台打印测试已关闭
logger.info('客户端已断开连接')
@socketio.on('disconnect')
def handle_disconnect():
# print('CLIENT DISCONNECTED!!!', flush=True) # 控制台打印测试已关闭
logger.info('客户端已断开连接')
# 原始的start_video处理逻辑暂时注释
@socketio.on('start_video_stream')
def handle_start_video(data=None):
try:
results = {'status': 'success', 'cameras': {}}
logger.info(f'video_stream_manager状态: {video_stream_manager is not None}')
logger.info(f'device_manager状态: {device_manager is not None}')
# 启动视频流管理器(普通摄像头)
if video_stream_manager:
logger.info('正在启动视频流管理器...')
video_result = video_stream_manager.start_video_stream()
logger.info(f'视频流管理器启动结果: {video_result}')
results['cameras']['normal'] = video_result
else:
logger.error('视频流管理器未初始化')
results['cameras']['normal'] = {'status': 'error', 'message': '视频流管理器未初始化'}
# 启动FemtoBolt深度相机
if device_manager:
logger.info('正在启动FemtoBolt深度相机...')
femtobolt_result = device_manager.start_femtobolt_stream()
logger.info(f'FemtoBolt启动结果: {femtobolt_result}')
if femtobolt_result:
results['cameras']['femtobolt'] = {'status': 'success', 'message': 'FemtoBolt深度相机推流已启动'}
# 原始的start_video处理逻辑暂时注释
@socketio.on('start_video_stream')
def handle_start_video(data=None):
try:
results = {'status': 'success', 'cameras': {}}
logger.info(f'video_stream_manager状态: {video_stream_manager is not None}')
logger.info(f'device_manager状态: {device_manager is not None}')
# 启动视频流管理器(普通摄像头)
if video_stream_manager:
logger.info('正在启动视频流管理器...')
video_result = video_stream_manager.start_video_stream()
logger.info(f'视频流管理器启动结果: {video_result}')
results['cameras']['normal'] = video_result
else:
results['cameras']['femtobolt'] = {'status': 'error', 'message': 'FemtoBolt深度相机启动失败'}
else:
logger.error('设备管理器未初始化')
results['cameras']['femtobolt'] = {'status': 'error', 'message': '设备管理器未初始化'}
# 检查是否有任何相机启动成功
success_count = sum(1 for cam_result in results['cameras'].values() if cam_result.get('status') == 'success')
if success_count == 0:
results['status'] = 'error'
results['message'] = '所有相机启动失败'
elif success_count < len(results['cameras']):
results['status'] = 'partial'
results['message'] = f'{success_count}/{len(results["cameras"])}个相机启动成功'
else:
results['message'] = '所有相机启动成功'
logger.info(f'发送video_status事件: {results}')
emit('video_status', results)
except Exception as e:
logger.error(f'启动视频流失败: {e}', exc_info=True)
emit('video_status', {'status': 'error', 'message': f'启动失败: {str(e)}'})
logger.error('视频流管理器未初始化')
results['cameras']['normal'] = {'status': 'error', 'message': '视频流管理器未初始化'}
# 启动FemtoBolt深度相机
if device_manager:
logger.info('正在启动FemtoBolt深度相机...')
femtobolt_result = device_manager.start_femtobolt_stream()
logger.info(f'FemtoBolt启动结果: {femtobolt_result}')
if femtobolt_result:
results['cameras']['femtobolt'] = {'status': 'success', 'message': 'FemtoBolt深度相机推流已启动'}
else:
results['cameras']['femtobolt'] = {'status': 'error', 'message': 'FemtoBolt深度相机启动失败'}
else:
logger.error('设备管理器未初始化')
results['cameras']['femtobolt'] = {'status': 'error', 'message': '设备管理器未初始化'}
# 检查是否有任何相机启动成功
success_count = sum(1 for cam_result in results['cameras'].values() if cam_result.get('status') == 'success')
if success_count == 0:
results['status'] = 'error'
results['message'] = '所有相机启动失败'
elif success_count < len(results['cameras']):
results['status'] = 'partial'
results['message'] = f'{success_count}/{len(results["cameras"])}个相机启动成功'
else:
results['message'] = '所有相机启动成功'
logger.info(f'发送video_status事件: {results}')
try:
emit('video_status', results)
except Exception as emit_error:
logger.error(f'发送video_status事件失败: {emit_error}')
except Exception as e:
logger.error(f'启动视频流失败: {e}', exc_info=True)
try:
emit('video_status', {'status': 'error', 'message': f'启动失败: {str(e)}'})
except Exception as emit_error:
logger.error(f'发送错误状态失败: {emit_error}')
@socketio.on('stop_video_stream')
def handle_stop_video(data=None):
@ -1137,11 +1135,17 @@ def handle_stop_video(data=None):
else:
results['message'] = '所有相机停止成功'
emit('video_status', results)
try:
emit('video_status', results)
except Exception as emit_error:
logger.error(f'发送video_status事件失败: {emit_error}')
except Exception as e:
logger.error(f'停止视频流失败: {e}')
emit('video_status', {'status': 'error', 'message': f'停止失败: {str(e)}'})
try:
emit('video_status', {'status': 'error', 'message': f'停止失败: {str(e)}'})
except Exception as emit_error:
logger.error(f'发送错误状态失败: {emit_error}')
@socketio.on('start_imu_streaming')
def handle_start_imu_streaming(data=None):
@ -1150,17 +1154,29 @@ def handle_start_imu_streaming(data=None):
if device_manager:
result = device_manager.start_imu_streaming()
if result:
emit('imu_status', {'status': 'success', 'message': 'IMU头部姿态数据推流已启动'})
try:
emit('imu_status', {'status': 'success', 'message': 'IMU头部姿态数据推流已启动'})
except Exception as emit_error:
logger.error(f'发送IMU状态失败: {emit_error}')
logger.info('IMU头部姿态数据推流已启动')
else:
emit('imu_status', {'status': 'error', 'message': 'IMU头部姿态数据推流启动失败'})
try:
emit('imu_status', {'status': 'error', 'message': 'IMU头部姿态数据推流启动失败'})
except Exception as emit_error:
logger.error(f'发送IMU状态失败: {emit_error}')
logger.error('IMU头部姿态数据推流启动失败')
else:
emit('imu_status', {'status': 'error', 'message': '设备管理器未初始化'})
try:
emit('imu_status', {'status': 'error', 'message': '设备管理器未初始化'})
except Exception as emit_error:
logger.error(f'发送IMU状态失败: {emit_error}')
logger.error('设备管理器未初始化')
except Exception as e:
logger.error(f'启动IMU数据推流失败: {e}')
emit('imu_status', {'status': 'error', 'message': f'启动失败: {str(e)}'})
try:
emit('imu_status', {'status': 'error', 'message': f'启动失败: {str(e)}'})
except Exception as emit_error:
logger.error(f'发送IMU状态失败: {emit_error}')
@socketio.on('stop_imu_streaming')
def handle_stop_imu_streaming(data=None):
@ -1169,17 +1185,29 @@ def handle_stop_imu_streaming(data=None):
if device_manager:
result = device_manager.stop_imu_streaming()
if result:
emit('imu_status', {'status': 'success', 'message': 'IMU头部姿态数据推流已停止'})
try:
emit('imu_status', {'status': 'success', 'message': 'IMU头部姿态数据推流已停止'})
except Exception as emit_error:
logger.error(f'发送IMU状态失败: {emit_error}')
logger.info('IMU头部姿态数据推流已停止')
else:
emit('imu_status', {'status': 'error', 'message': 'IMU头部姿态数据推流停止失败'})
try:
emit('imu_status', {'status': 'error', 'message': 'IMU头部姿态数据推流停止失败'})
except Exception as emit_error:
logger.error(f'发送IMU状态失败: {emit_error}')
logger.error('IMU头部姿态数据推流停止失败')
else:
emit('imu_status', {'status': 'error', 'message': '设备管理器未初始化'})
try:
emit('imu_status', {'status': 'error', 'message': '设备管理器未初始化'})
except Exception as emit_error:
logger.error(f'发送IMU状态失败: {emit_error}')
logger.error('设备管理器未初始化')
except Exception as e:
logger.error(f'停止IMU数据推流失败: {e}')
emit('imu_status', {'status': 'error', 'message': f'停止失败: {str(e)}'})
try:
emit('imu_status', {'status': 'error', 'message': f'停止失败: {str(e)}'})
except Exception as emit_error:
logger.error(f'发送IMU状态失败: {emit_error}')
@socketio.on('start_pressure_streaming')
def handle_start_pressure_streaming(data=None):
@ -1219,29 +1247,34 @@ def handle_stop_pressure_streaming(data=None):
logger.error(f'停止压力传感器数据推流失败: {e}')
emit('pressure_status', {'status': 'error', 'message': f'停止失败: {str(e)}'})
# 重复的事件处理器已删除,使用前面定义的版本
# ==================== 错误处理 ====================
@app.errorhandler(404)
def not_found(error):
return jsonify({'success': False, 'error': 'API接口不存在'}), 404
@app.errorhandler(500)
def internal_error(error):
return jsonify({'success': False, 'error': '服务器内部错误'}), 500
if __name__ == '__main__':
"""主入口点 - 用于直接运行和PyInstaller打包"""
import argparse
# 解析命令行参数
parser = argparse.ArgumentParser(description='Body Balance Evaluation System Backend')
parser.add_argument('--host', default=None, help='Host address to bind to')
parser.add_argument('--port', type=int, default=None, help='Port number to bind to')
parser.add_argument('--debug', action='store_true', help='Enable debug mode')
args = parser.parse_args()
try:
# 初始化应用
init_app()
# 启动服务器
logger.info("启动身体平衡评估系统后端服务...")
logger.info("服务器地址: http://localhost:5000")
socketio.run(
app,
host='0.0.0.0',
port=5000,
debug=False,
allow_unsafe_werkzeug=True
)
init_app()
except KeyboardInterrupt:
logger.info("用户中断,正在关闭服务器...")
logger.info('服务被用户中断')
except Exception as e:
logger.error(f"启动失败: {e}")
input("按回车键退出...")
sys.exit(1)
logger.error(f'服务启动失败: {e}')
sys.exit(1)
finally:
logger.info('后端服务已停止')

102
backend/app.spec Normal file
View File

@ -0,0 +1,102 @@
# -*- mode: python ; coding: utf-8 -*-
block_cipher = None
a = Analysis(
['app.py'],
pathex=[],
binaries=[
('dll/femtobolt/bin/k4a.dll', '.'), # K4A动态库
('dll/femtobolt/bin/k4arecord.dll', '.'), # K4A录制库
('dll/femtobolt/bin/depthengine_2_0.dll', '.'), # 深度引擎
('dll/femtobolt/bin/OrbbecSDK.dll', '.'), # Orbbec SDK
('dll/femtobolt/bin/ob_usb.dll', '.'), # Orbbec USB库
('dll/femtobolt/bin/live555.dll', '.'), # Live555库
('dll/smitsense/SMiTSenseUsb-F3.0.dll', '.'), # SMiTSense传感器库
('dll/femtobolt/bin/OrbbecSDKConfig_v1.0.xml', '.'), # Orbbec配置文件
],
datas=[
('config.ini', '.'), # 配置文件
('data', 'data'), # 数据文件夹
],
hiddenimports=[
'flask',
'flask_socketio',
'flask_cors',
'cv2',
'numpy',
'pandas',
'scipy',
'matplotlib',
'seaborn',
'sklearn',
'PIL',
'reportlab',
'sqlite3',
'configparser',
'logging',
'threading',
'queue',
'base64',
'psutil',
'pykinect_azure',
'pyserial',
'requests',
'yaml',
'click',
'colorama',
'tqdm',
'database',
'device_manager',
'utils',
'eventlet',
'socketio',
'engineio',
'engineio.async_drivers.threading',
'engineio.async_drivers.eventlet',
'engineio.async_eventlet',
'socketio.async_eventlet',
'greenlet',
'gevent',
'gevent.socket',
'gevent.select',
'dns',
'dns.resolver',
'dns.reversename',
'dns.e164',
],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=[],
win_no_prefer_redirects=False,
win_private_assemblies=False,
cipher=block_cipher,
noarchive=False,
)
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)
exe = EXE(
pyz,
a.scripts,
a.binaries,
a.zipfiles,
a.datas,
[],
name='BodyBalanceBackend_Full',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
upx_exclude=[],
runtime_tmpdir=None,
console=True,
disable_windowed_traceback=False,
argv_emulation=False,
target_arch=None,
codesign_identity=None,
entitlements_file=None,
icon=None
)

242
backend/app_minimal.py Normal file
View File

@ -0,0 +1,242 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
平衡体态检测系统 - 后端服务最小版本
基于Flask的本地API服务暂时移除SocketIO功能
"""
import os
import sys
import json
import time
import threading
from datetime import datetime
from flask import Flask, jsonify, send_file
from flask import request as flask_request
from flask_cors import CORS
import sqlite3
import logging
from pathlib import Path
import cv2
import configparser
# 添加当前目录到Python路径
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
# 导入自定义模块
from database import DatabaseManager
from device_manager import DeviceManager
from utils import config as app_config
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('logs/backend.log', encoding='utf-8'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# 创建Flask应用
app = Flask(__name__)
app.config['SECRET_KEY'] = 'body-balance-detection-system-2024'
# 启用CORS
CORS(app, origins=['http://localhost:3000', 'http://127.0.0.1:3000', 'http://192.168.1.58:3000'])
# 全局变量
db_manager = None
device_manager = None
socketio = None # 暂时禁用SocketIO
def init_app():
"""初始化应用"""
global db_manager, device_manager
try:
# 创建必要的目录
os.makedirs('logs', exist_ok=True)
os.makedirs('data', exist_ok=True)
# 从配置文件读取数据库路径
db_path_config = app_config.get('DATABASE', 'path', 'data/body_balance.db')
# 对于打包后的exe确保数据库路径基于exe所在目录
if getattr(sys, 'frozen', False):
# 如果是打包后的exe使用exe所在目录
exe_dir = os.path.dirname(sys.executable)
if not os.path.isabs(db_path_config):
db_path = os.path.join(exe_dir, db_path_config)
else:
db_path = db_path_config
else:
# 如果是开发环境,使用脚本所在目录
if not os.path.isabs(db_path_config):
current_dir = os.path.dirname(os.path.abspath(__file__))
if db_path_config.startswith('backend/'):
db_path_config = db_path_config[8:] # 去掉 'backend/' 前缀
db_path = os.path.join(current_dir, db_path_config)
else:
db_path = db_path_config
# 确保数据库目录存在
db_dir = os.path.dirname(db_path)
os.makedirs(db_dir, exist_ok=True)
# 打印数据库路径信息用于调试
logger.info(f'数据库配置路径: {db_path_config}')
logger.info(f'数据库实际路径: {db_path}')
logger.info(f'数据库目录: {db_dir}')
logger.info(f'当前工作目录: {os.getcwd()}')
# 初始化数据库
db_manager = DatabaseManager(db_path)
db_manager.init_database()
# 初始化设备管理器不使用SocketIO
device_manager = DeviceManager(db_manager)
logger.info('应用初始化完成最小版本无SocketIO')
except Exception as e:
logger.error(f'应用初始化失败: {e}')
logger.warning('部分功能可能不可用,但服务将继续运行')
# ==================== 基础API ====================
@app.route('/health', methods=['GET'])
def health_check():
"""健康检查"""
return jsonify({
'status': 'ok',
'timestamp': datetime.now().isoformat(),
'version': '1.0.0-minimal',
'socketio_enabled': False
})
@app.route('/api/health', methods=['GET'])
def api_health_check():
"""API健康检查"""
return jsonify({
'status': 'ok',
'timestamp': datetime.now().isoformat(),
'version': '1.0.0-minimal',
'socketio_enabled': False
})
@app.route('/api/test', methods=['GET'])
def test_endpoint():
"""测试端点"""
return jsonify({
'message': 'Backend is running!',
'timestamp': datetime.now().isoformat(),
'database_status': 'connected' if db_manager else 'not_connected',
'device_manager_status': 'initialized' if device_manager else 'not_initialized'
})
# ==================== 认证API ====================
@app.route('/api/auth/login', methods=['POST'])
def login():
"""用户登录"""
try:
data = flask_request.get_json()
username = data.get('username')
password = data.get('password')
remember = data.get('remember', False)
if not username or not password:
return jsonify({
'success': False,
'message': '用户名或密码不能为空'
}), 400
# 简单的测试登录逻辑
if username == 'admin' and password == 'admin':
# 生成token实际项目中应使用JWT等安全token
token = f"token_{username}_{int(time.time())}"
return jsonify({
'success': True,
'message': '登录成功',
'data': {
'token': token,
'user': {
'id': 1,
'username': username,
'role': 'admin'
}
}
})
else:
return jsonify({
'success': False,
'message': '用户名或密码错误'
}), 401
except Exception as e:
logger.error(f'登录失败: {e}')
return jsonify({
'success': False,
'message': '登录过程中发生错误'
}), 500
@app.route('/api/auth/logout', methods=['POST'])
def logout():
"""用户登出"""
return jsonify({
'success': True,
'message': '登出成功'
})
# ==================== 设备API ====================
@app.route('/api/devices/status', methods=['GET'])
def get_device_status():
"""获取设备状态"""
try:
if device_manager:
status = {
'cameras': device_manager.get_camera_status(),
'sensors': device_manager.get_sensor_status()
}
else:
status = {
'cameras': {},
'sensors': {},
'error': '设备管理器未初始化'
}
return jsonify({
'success': True,
'data': status
})
except Exception as e:
logger.error(f'获取设备状态失败: {e}')
return jsonify({
'success': False,
'message': f'获取设备状态失败: {str(e)}'
}), 500
# ==================== 错误处理 ====================
@app.errorhandler(404)
def not_found(error):
return jsonify({
'success': False,
'message': '接口不存在'
}), 404
@app.errorhandler(500)
def internal_error(error):
return jsonify({
'success': False,
'message': '服务器内部错误'
}), 500
if __name__ == '__main__':
init_app()
app.run(host='0.0.0.0', port=5000, debug=True)

71
backend/app_minimal.spec Normal file
View File

@ -0,0 +1,71 @@
# -*- mode: python ; coding: utf-8 -*-
block_cipher = None
a = Analysis(
['app_minimal.py'],
pathex=[],
binaries=[
('dll/smitsense/SMiTSenseUsb-F3.0.dll', '.'), # SMiTSense传感器库
],
datas=[
('config.ini', '.'), # 配置文件
('data', 'data'), # 数据文件夹
],
hiddenimports=[
'flask',
'flask_cors',
'sqlite3',
'configparser',
'logging',
'threading',
'queue',
'base64',
'psutil',
'database',
'device_manager',
'utils',
'cv2',
'numpy',
],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=[
'flask_socketio',
'socketio',
'engineio',
'eventlet',
'gevent',
],
win_no_prefer_redirects=False,
win_private_assemblies=False,
cipher=block_cipher,
noarchive=False,
)
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)
exe = EXE(
pyz,
a.scripts,
a.binaries,
a.zipfiles,
a.datas,
[],
name='BodyBalanceBackend',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
upx_exclude=[],
runtime_tmpdir=None,
console=True,
disable_windowed_traceback=False,
argv_emulation=False,
target_arch=None,
codesign_identity=None,
entitlements_file=None,
icon=None
)

View File

@ -1,62 +1,62 @@
# -*- mode: python ; coding: utf-8 -*-
block_cipher = None
a = Analysis(
['app_simple.py'],
['main_exe.py'],
pathex=[],
binaries=[
('dll/femtobolt/bin/k4a.dll', '.'), # K4A动态库
('dll/femtobolt/bin/k4arecord.dll', '.'), # K4A录制库
('dll/femtobolt/bin/depthengine_2_0.dll', '.'), # 深度引擎
('dll/femtobolt/bin/OrbbecSDK.dll', '.'), # Orbbec SDK
('dll/femtobolt/bin/ob_usb.dll', '.'), # Orbbec USB库
('dll/femtobolt/bin/live555.dll', '.'), # Live555库
('dll/smitsense/SMiTSenseUsb-F3.0.dll', '.'), # SMiTSense传感器库
('dll/femtobolt/bin/OrbbecSDKConfig_v1.0.xml', '.'), # Orbbec配置文件
# 只包含确实存在的DLL文件
],
datas=[
('..\config.ini', '.'), # 配置文件
('..\config.json', '.'), # JSON配置文件
('tests', 'tests'), # 测试文件夹
('config.ini', '.'), # 配置文件
('data', 'data'), # 数据文件夹
('tests', 'tests'), # 测试文件夹
('app_minimal.py', '.'), # 应用入口文件
],
hiddenimports=[
'flask',
'flask_socketio',
'flask_cors',
'socketio',
'engineio',
'engineio.async_drivers.threading',
'socketio.namespace',
'cv2',
'numpy',
'psutil',
'sqlite3',
'configparser',
'logging',
'threading',
'queue',
'base64',
'psutil',
'pykinect_azure',
'requests',
'click',
'colorama',
'database',
'device_manager',
'utils',
'dns',
'dns.resolver',
'dns.asyncresolver'
],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=[],
win_no_prefer_redirects=False,
win_private_assemblies=False,
cipher=block_cipher,
excludes=[
'eventlet',
'gevent',
'gevent_uwsgi'
],
noarchive=False,
)
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)
pyz = PYZ(a.pure)
exe = EXE(
pyz,
a.scripts,
a.binaries,
a.zipfiles,
a.datas,
[],
name='BodyBalanceBackend',
@ -72,5 +72,5 @@ exe = EXE(
target_arch=None,
codesign_identity=None,
entitlements_file=None,
icon='..\document\icon.ico' if os.path.exists('..\document\icon.ico') else None,
icon=None
)

367
backend/build_app.py Normal file
View File

@ -0,0 +1,367 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
app.py 完整版打包脚本
使用PyInstaller将完整版Flask应用程序包含SocketIO打包成独立的exe文件
"""
import os
import sys
import shutil
import subprocess
from pathlib import Path
def clean_build_dirs():
"""清理构建目录"""
print("清理构建目录...")
dirs_to_clean = ['build', 'dist', '__pycache__']
for dir_name in dirs_to_clean:
if os.path.exists(dir_name):
try:
shutil.rmtree(dir_name)
print(f"✓ 已删除 {dir_name}")
except Exception as e:
print(f"⚠️ 删除 {dir_name} 失败: {e}")
def create_app_spec():
"""创建app.py的PyInstaller spec文件"""
spec_content = '''
# -*- mode: python ; coding: utf-8 -*-
block_cipher = None
a = Analysis(
['app.py'],
pathex=[],
binaries=[
('dll/femtobolt/bin/k4a.dll', '.'), # K4A动态库
('dll/femtobolt/bin/k4arecord.dll', '.'), # K4A录制库
('dll/femtobolt/bin/depthengine_2_0.dll', '.'), # 深度引擎
('dll/femtobolt/bin/OrbbecSDK.dll', '.'), # Orbbec SDK
('dll/femtobolt/bin/ob_usb.dll', '.'), # Orbbec USB库
('dll/femtobolt/bin/live555.dll', '.'), # Live555库
('dll/smitsense/SMiTSenseUsb-F3.0.dll', '.'), # SMiTSense传感器库
('dll/femtobolt/bin/OrbbecSDKConfig_v1.0.xml', '.'), # Orbbec配置文件
],
datas=[
('config.ini', '.'), # 配置文件
('data', 'data'), # 数据文件夹
],
hiddenimports=[
'flask',
'flask_socketio',
'flask_cors',
'cv2',
'numpy',
'pandas',
'scipy',
'matplotlib',
'seaborn',
'sklearn',
'PIL',
'reportlab',
'sqlite3',
'configparser',
'logging',
'threading',
'queue',
'base64',
'psutil',
'pykinect_azure',
'pyserial',
'requests',
'yaml',
'click',
'colorama',
'tqdm',
'database',
'device_manager',
'utils',
'eventlet',
'socketio',
'engineio',
'engineio.async_drivers.threading',
'engineio.async_drivers.eventlet',
'engineio.async_eventlet',
'socketio.async_eventlet',
'greenlet',
'gevent',
'gevent.socket',
'gevent.select',
'dns',
'dns.resolver',
'dns.reversename',
'dns.e164',
],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=[],
win_no_prefer_redirects=False,
win_private_assemblies=False,
cipher=block_cipher,
noarchive=False,
)
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)
exe = EXE(
pyz,
a.scripts,
a.binaries,
a.zipfiles,
a.datas,
[],
name='BodyBalanceBackend_Full',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
upx_exclude=[],
runtime_tmpdir=None,
console=True,
disable_windowed_traceback=False,
argv_emulation=False,
target_arch=None,
codesign_identity=None,
entitlements_file=None,
icon=None
)
'''
with open('app.spec', 'w', encoding='utf-8') as f:
f.write(spec_content)
print("✓ 已创建 app.spec 文件")
def build_exe():
"""构建exe文件"""
print("\n开始构建exe文件...")
try:
# 使用PyInstaller构建
cmd = [sys.executable, '-m', 'PyInstaller', 'app.spec', '--clean', '--noconfirm']
print(f"执行命令: {' '.join(cmd)}")
# 修改编码处理避免UTF-8错误
result = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8', errors='ignore')
if result.returncode == 0:
print("✓ 构建成功!")
return True
else:
print(f"✗ 构建失败")
print(f"错误输出: {result.stderr}")
return False
except Exception as e:
print(f"✗ 构建过程出错: {e}")
return False
def create_startup_script():
"""创建启动脚本"""
startup_script = '''@echo off
echo 启动身体平衡评估系统后端服务完整版...
echo.
echo 服务信息:
echo - HTTP API: http://localhost:5000
echo - SocketIO: ws://localhost:5000
echo - 管理界面: http://localhost:5000
echo.
echo 按Ctrl+C停止服务
echo.
"BodyBalanceBackend_Full.exe"
if %errorlevel% neq 0 (
echo.
echo 服务启动失败请检查错误信息
pause
)
'''
dist_dir = 'dist'
if not os.path.exists(dist_dir):
os.makedirs(dist_dir)
with open(os.path.join(dist_dir, 'start_backend_full.bat'), 'w', encoding='utf-8') as f:
f.write(startup_script)
print("✓ 创建启动脚本: dist/start_backend_full.bat")
def create_directories():
"""创建必要的目录结构"""
print("创建目录结构...")
dist_dir = 'dist'
directories = ['dll', 'data', 'logs']
for directory in directories:
dir_path = os.path.join(dist_dir, directory)
if not os.path.exists(dir_path):
try:
os.makedirs(dir_path)
print(f"✓ 创建目录: {directory}/")
except Exception as e:
print(f"⚠️ 创建目录 {directory} 失败: {e}")
else:
print(f"✓ 目录已存在: {directory}/")
def copy_dll_files():
"""复制DLL文件到dll目录"""
print("复制DLL文件...")
dll_dir = os.path.join('dist', 'dll')
# 查找所有DLL文件
dll_files = []
# 从当前目录查找DLL文件
for file in os.listdir('.'):
if file.endswith('.dll'):
dll_files.append(file)
# 从子目录查找DLL文件
for root, dirs, files in os.walk('.'):
if 'dist' in root or 'build' in root or '__pycache__' in root:
continue
for file in files:
if file.endswith('.dll'):
dll_files.append(os.path.join(root, file))
if not dll_files:
print("⚠️ 未找到DLL文件")
return
for dll_file in dll_files:
if os.path.exists(dll_file):
try:
shutil.copy2(dll_file, dll_dir)
print(f"✓ 已复制 {os.path.basename(dll_file)}")
except Exception as e:
print(f"⚠️ 复制 {dll_file} 失败: {e}")
def copy_data_files():
"""复制数据库文件到data目录"""
print("复制数据库文件...")
data_dir = os.path.join('dist', 'data')
# 数据库文件列表
db_files = ['body_balance.db', 'database.db']
for db_file in db_files:
if os.path.exists(db_file):
try:
shutil.copy2(db_file, data_dir)
print(f"✓ 已复制 {db_file}")
except Exception as e:
print(f"⚠️ 复制 {db_file} 失败: {e}")
else:
print(f"⚠️ 数据库文件不存在: {db_file}")
def copy_config_files():
"""复制配置文件到dist目录"""
print("复制配置文件...")
config_files = ['config.ini']
dist_dir = 'dist'
if not os.path.exists(dist_dir):
os.makedirs(dist_dir)
for config_file in config_files:
if os.path.exists(config_file):
try:
shutil.copy2(config_file, dist_dir)
print(f"✓ 已复制 {config_file}")
except Exception as e:
print(f"⚠️ 复制 {config_file} 失败: {e}")
else:
print(f"⚠️ 配置文件不存在: {config_file}")
def main():
"""主函数"""
print("=" * 60)
print("身体平衡评估系统 - app.py 完整版打包工具")
print("=" * 60)
print()
# 检查当前目录
if not os.path.exists('app.py'):
print("✗ 错误: 找不到 app.py 文件")
print("请确保在backend目录下运行此脚本")
input("按回车键退出...")
return
try:
# 清理构建目录
clean_build_dirs()
print()
# 创建spec文件
print("创建PyInstaller配置...")
create_app_spec()
print()
# 构建exe
if build_exe():
print()
print("后处理...")
# 检查生成的exe文件
exe_path = 'dist/BodyBalanceBackend_Full.exe'
if os.path.exists(exe_path):
print(f"✓ exe文件位置: {exe_path}")
# 创建目录结构
create_directories()
print()
# 复制DLL文件
copy_dll_files()
print()
# 复制数据库文件
copy_data_files()
print()
# 复制配置文件
copy_config_files()
print()
# 创建启动脚本
create_startup_script()
print()
print("🎉 打包完成!")
print()
print("输出文件:")
print(f"- 可执行文件: {exe_path}")
print("- 启动脚本: dist/start_backend_full.bat")
print("- 配置文件: dist/config.ini")
print()
print("目录结构:")
print("- dll/ - DLL文件")
print("- data/ - 数据库文件")
print("- logs/ - 日志文件")
print()
print("使用方式:")
print("1. 直接运行: dist/BodyBalanceBackend_Full.exe")
print("2. 使用脚本: dist/start_backend_full.bat")
print()
print("服务地址: http://localhost:5000")
print("SocketIO: ws://localhost:5000")
else:
print("✗ 错误: 未找到生成的exe文件")
else:
print("\n✗ 打包失败")
except Exception as e:
print(f"\n✗ 打包过程出错: {e}")
print()
input("按回车键退出...")
if __name__ == '__main__':
main()

View File

@ -1,76 +1,108 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
后端应用程序打包脚本
使用PyInstaller将Flask应用程序打包成独立的exe文件
身体平衡评估系统后端打包脚本
基于最小测试框架的成功经验简化打包配置
"""
import os
import sys
import shutil
import subprocess
from pathlib import Path
def check_dependencies():
"""检查必需的依赖"""
print("检查依赖模块...")
required_modules = [
'flask',
'flask_socketio',
'flask_cors',
'socketio',
'engineio',
'numpy',
'cv2',
'psutil'
]
missing_modules = []
for module in required_modules:
try:
__import__(module)
print(f"{module}")
except ImportError:
print(f"{module} (缺失)")
missing_modules.append(module)
if missing_modules:
print(f"\n缺失模块: {', '.join(missing_modules)}")
print("请运行: pip install -r requirements_build.txt")
return False
print("✓ 所有依赖模块检查通过")
return True
def create_spec_file():
"""创建PyInstaller spec文件"""
spec_content = '''
# -*- mode: python ; coding: utf-8 -*-
block_cipher = None
a = Analysis(
['app_simple.py'],
['main_exe.py'],
pathex=[],
binaries=[
('dll/femtobolt/bin/k4a.dll', '.'), # K4A动态库
('dll/femtobolt/bin/k4arecord.dll', '.'), # K4A录制库
('dll/femtobolt/bin/depthengine_2_0.dll', '.'), # 深度引擎
('dll/femtobolt/bin/OrbbecSDK.dll', '.'), # Orbbec SDK
('dll/femtobolt/bin/ob_usb.dll', '.'), # Orbbec USB库
('dll/femtobolt/bin/live555.dll', '.'), # Live555库
('dll/smitsense/SMiTSenseUsb-F3.0.dll', '.'), # SMiTSense传感器库
('dll/femtobolt/bin/OrbbecSDKConfig_v1.0.xml', '.'), # Orbbec配置文件
# 只包含确实存在的DLL文件
],
datas=[
('..\\config.ini', '.'), # 配置文件
('..\\config.json', '.'), # JSON配置文件
('tests', 'tests'), # 测试文件夹
('config.ini', '.'), # 配置文件
('data', 'data'), # 数据文件夹
('tests', 'tests'), # 测试文件夹
('app_minimal.py', '.'), # 应用入口文件
],
hiddenimports=[
'flask',
'flask_socketio',
'flask_cors',
'socketio',
'engineio',
'engineio.async_drivers.threading',
'socketio.namespace',
'cv2',
'numpy',
'psutil',
'sqlite3',
'configparser',
'logging',
'threading',
'queue',
'base64',
'psutil',
'pykinect_azure',
'requests',
'click',
'colorama',
'database',
'device_manager',
'utils',
'dns',
'dns.resolver',
'dns.asyncresolver'
],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=[],
win_no_prefer_redirects=False,
win_private_assemblies=False,
cipher=block_cipher,
excludes=[
'eventlet',
'gevent',
'gevent_uwsgi'
],
noarchive=False,
)
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)
pyz = PYZ(a.pure)
exe = EXE(
pyz,
a.scripts,
a.binaries,
a.zipfiles,
a.datas,
[],
name='BodyBalanceBackend',
@ -86,106 +118,35 @@ exe = EXE(
target_arch=None,
codesign_identity=None,
entitlements_file=None,
icon='..\\document\\icon.ico' if os.path.exists('..\\document\\icon.ico') else None,
icon=None
)
'''
with open('backend.spec', 'w', encoding='utf-8') as f:
f.write(spec_content)
print("✓ 已创建 backend.spec 文件")
def create_main_entry():
"""创建主入口文件"""
main_content = '''
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
身体平衡评估系统 - 后端服务入口
独立运行的exe版本
"""
import os
import sys
import logging
from pathlib import Path
# 设置工作目录
if getattr(sys, 'frozen', False):
# 如果是打包后的exe
application_path = os.path.dirname(sys.executable)
else:
# 如果是开发环境
application_path = os.path.dirname(os.path.abspath(__file__))
os.chdir(application_path)
sys.path.insert(0, application_path)
# 创建必要的目录
os.makedirs('logs', exist_ok=True)
os.makedirs('data', exist_ok=True)
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('logs/backend.log', encoding='utf-8'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
def main():
"""主函数"""
try:
logger.info("启动身体平衡评估系统后端服务...")
logger.info(f"工作目录: {os.getcwd()}")
# 导入并启动Flask应用
from app import app, socketio, init_app
# 初始化应用
init_app()
# 启动服务器
logger.info("后端服务器启动在 http://localhost:5000")
socketio.run(
app,
host='0.0.0.0',
port=5000,
debug=False,
allow_unsafe_werkzeug=True
)
except KeyboardInterrupt:
logger.info("用户中断,正在关闭服务器...")
except Exception as e:
logger.error(f"启动失败: {e}")
input("按回车键退出...")
sys.exit(1)
if __name__ == '__main__':
main()
'''
with open('main_exe.py', 'w', encoding='utf-8') as f:
f.write(main_content)
print("✓ 已创建 main_exe.py 入口文件")
print("✓ 已创建 backend.spec 文件")
def build_exe():
"""构建exe文件"""
print("开始构建exe文件...")
# 检查依赖模块
print("\n检查依赖模块...")
if not check_dependencies():
print("\n❌ 依赖检查失败,请先安装缺失的模块")
return False
# 检查PyInstaller是否安装
try:
import PyInstaller
print(f"✓ PyInstaller版本: {PyInstaller.__version__}")
print(f"\n✓ PyInstaller版本: {PyInstaller.__version__}")
except ImportError:
print("❌ PyInstaller未安装正在安装...")
os.system('pip install pyinstaller')
subprocess.run([sys.executable, '-m', 'pip', 'install', 'pyinstaller>=6.10.0'])
# 清理之前的构建
print("\n清理构建目录...")
if os.path.exists('build'):
shutil.rmtree('build')
print("✓ 清理build目录")
@ -194,20 +155,25 @@ def build_exe():
shutil.rmtree('dist')
print("✓ 清理dist目录")
# 使用spec文件构建
cmd = 'python -m PyInstaller backend.spec --clean --noconfirm'
print(f"执行命令: {cmd}")
result = os.system(cmd)
# 创建spec文件
create_spec_file()
if result == 0:
# 使用spec文件构建
print("\n开始打包...")
cmd = [sys.executable, '-m', 'PyInstaller', 'backend.spec', '--clean', '--noconfirm']
print(f"执行命令: {' '.join(cmd)}")
result = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8')
if result.returncode == 0:
print("\n✓ 构建成功!")
print("exe文件位置: dist/BodyBalanceBackend.exe")
# 复制额外的文件
dist_path = Path('dist')
if dist_path.exists():
# 复制配置文件
config_files = ['../config.ini', '../config.json']
# 复制配置文件(如果存在)
config_files = ['config.ini', 'config.json']
for config_file in config_files:
if os.path.exists(config_file):
shutil.copy2(config_file, dist_path)
@ -217,37 +183,38 @@ def build_exe():
start_script = dist_path / 'start_backend.bat'
with open(start_script, 'w', encoding='utf-8') as f:
f.write('@echo off\n')
f.write('echo 启动身体平衡评估系统后端服务...\n')
f.write('chcp 65001 >nul\n')
f.write('echo Starting Body Balance Backend...\n')
f.write('BodyBalanceBackend.exe\n')
f.write('pause\n')
print("✓ 创建启动脚本: start_backend.bat")
print("\n🎉 打包完成!")
print("运行方式:")
print("\n测试方法:")
print("1. 直接运行: dist/BodyBalanceBackend.exe")
print("2. 使用脚本: dist/start_backend.bat")
print("3. 在浏览器中访问: http://localhost:5000")
return True
else:
print("❌ 构建失败")
print("\n❌ 构建失败")
print(f"错误信息: {result.stderr}")
return False
return True
def main():
"""主函数"""
print("=" * 50)
print("身体平衡评估系统 - 后端打包工具")
print("=" * 50)
print("================================================")
print("身体平衡评估系统 - 后端打包工具 (简化版)")
print("基于最小测试框架的成功经验")
print("================================================")
print()
# 检查当前目录
if not os.path.exists('app.py'):
print("❌ 请在backend目录下运行此脚本")
return
try:
# 创建配置文件
create_spec_file()
create_main_entry()
# 检查是否在正确的目录
if not os.path.exists('main_exe.py'):
print("❌ 错误请在backend目录下运行此脚本")
print("当前目录应包含 main_exe.py 文件")
return
# 构建exe
if build_exe():
@ -257,6 +224,8 @@ def main():
except Exception as e:
print(f"❌ 错误: {e}")
import traceback
traceback.print_exc()
input("\n按回车键退出...")

View File

@ -0,0 +1,524 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
后端应用程序打包脚本
使用PyInstaller将Flask应用程序打包成独立的exe文件
"""
import os
import sys
import shutil
import subprocess
from pathlib import Path
def create_spec_file():
"""创建PyInstaller spec文件"""
spec_content = '''
# -*- mode: python ; coding: utf-8 -*-
block_cipher = None
a = Analysis(
['main_exe.py'],
pathex=[],
binaries=[
('dll/femtobolt/bin/k4a.dll', '.'), # K4A动态库
('dll/femtobolt/bin/k4arecord.dll', '.'), # K4A录制库
('dll/femtobolt/bin/depthengine_2_0.dll', '.'), # 深度引擎
('dll/femtobolt/bin/OrbbecSDK.dll', '.'), # Orbbec SDK
('dll/femtobolt/bin/ob_usb.dll', '.'), # Orbbec USB库
('dll/femtobolt/bin/live555.dll', '.'), # Live555库
('dll/smitsense/SMiTSenseUsb-F3.0.dll', '.'), # SMiTSense传感器库
('dll/femtobolt/bin/OrbbecSDKConfig_v1.0.xml', '.'), # Orbbec配置文件
],
datas=[
('..\\config.ini', '.'), # 配置文件
('..\\config.json', '.'), # JSON配置文件
('tests', 'tests'), # 测试文件夹
('data', 'data'), # 数据文件夹
],
hiddenimports=[
'flask',
'flask_socketio',
'flask_cors',
'cv2',
'numpy',
'pandas',
'scipy',
'matplotlib',
'seaborn',
'sklearn',
'PIL',
'reportlab',
'sqlite3',
'configparser',
'logging',
'threading',
'queue',
'base64',
'psutil',
'pykinect_azure',
'pyserial',
'requests',
'yaml',
'click',
'colorama',
'tqdm',
'database',
'device_manager',
'utils',
],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=[],
win_no_prefer_redirects=False,
win_private_assemblies=False,
cipher=block_cipher,
noarchive=False,
)
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)
exe = EXE(
pyz,
a.scripts,
a.binaries,
a.zipfiles,
a.datas,
[],
name='BodyBalanceBackend',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
upx_exclude=[],
runtime_tmpdir=None,
console=True,
disable_windowed_traceback=False,
argv_emulation=False,
target_arch=None,
codesign_identity=None,
entitlements_file=None,
icon=None, # 图标文件不存在,暂时禁用
)
'''
with open('backend.spec', 'w', encoding='utf-8') as f:
f.write(spec_content)
print("✓ 已创建 backend.spec 文件")
def create_debug_spec():
"""创建诊断脚本的spec文件"""
import subprocess
spec_content = f'''# -*- mode: python ; coding: utf-8 -*-
block_cipher = None
a = Analysis(
['test_socketio_debug.py'],
pathex=['{os.getcwd()}'],
binaries=[],
datas=[],
hiddenimports=[
'flask',
'flask_socketio',
'eventlet',
'threading',
'engineio.async_drivers.threading',
'engineio.async_drivers.eventlet',
'engineio.async_eventlet',
'socketio.async_eventlet',
'greenlet',
],
hookspath=[],
hooksconfig={{}},
runtime_hooks=[],
excludes=[],
win_no_prefer_redirects=False,
win_private_assemblies=False,
cipher=block_cipher,
noarchive=False,
)
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)
exe = EXE(
pyz,
a.scripts,
a.binaries,
a.zipfiles,
a.datas,
[],
name='test_socketio_debug',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
upx_exclude=[],
runtime_tmpdir=None,
console=True,
disable_windowed_traceback=False,
argv_emulation=False,
target_arch=None,
codesign_identity=None,
entitlements_file=None,
)
'''
with open('debug.spec', 'w', encoding='utf-8') as f:
f.write(spec_content)
print("✓ 已创建 debug.spec 文件")
# 直接构建诊断exe
print("开始构建诊断exe文件...")
result = subprocess.run([sys.executable, '-m', 'PyInstaller', 'debug.spec'],
capture_output=True, text=True, encoding='utf-8')
if result.returncode == 0:
print("✓ 诊断exe构建成功")
print("运行诊断: dist/test_socketio_debug.exe")
else:
print(f"❌ 诊断exe构建失败: {result.stderr}")
def create_minimal_spec():
"""创建最小化测试脚本的spec文件"""
print("创建最小化测试脚本的spec文件...")
spec_content = '''# -*- mode: python ; coding: utf-8 -*-
block_cipher = None
a = Analysis(
['test_socketio_minimal.py'],
pathex=[],
binaries=[],
datas=[],
hiddenimports=[
'flask',
'flask_socketio',
'eventlet',
'eventlet.hubs',
'eventlet.hubs.epolls',
'eventlet.hubs.kqueue',
'eventlet.hubs.selects',
'threading',
'engineio',
'engineio.async_drivers',
'engineio.async_drivers.threading',
'engineio.async_drivers.eventlet',
'engineio.async_drivers.gevent',
'socketio',
'socketio.namespace'
],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=[],
win_no_prefer_redirects=False,
win_private_assemblies=False,
cipher=block_cipher,
noarchive=False,
)
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)
exe = EXE(
pyz,
a.scripts,
a.binaries,
a.zipfiles,
a.datas,
[],
name='test_socketio_minimal',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
upx_exclude=[],
runtime_tmpdir=None,
console=True,
disable_windowed_traceback=False,
argv_emulation=False,
target_arch=None,
codesign_identity=None,
entitlements_file=None,
)
'''
with open('minimal.spec', 'w', encoding='utf-8') as f:
f.write(spec_content)
print("✓ 已创建 minimal.spec 文件")
# 直接构建
print("开始构建最小化测试exe...")
result = subprocess.run([sys.executable, '-m', 'PyInstaller', 'minimal.spec', '--clean'],
capture_output=True, text=True, encoding='utf-8')
if result.returncode == 0:
print("✓ 最小化测试exe构建完成: dist/test_socketio_minimal.exe")
else:
print(f"❌ 最小化测试exe构建失败: {result.stderr}")
def create_main_entry():
"""创建主入口文件"""
main_content = '''
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
身体平衡评估系统 - 后端服务入口
独立运行的exe版本
"""
import os
import sys
import logging
from pathlib import Path
# 设置工作目录
if getattr(sys, 'frozen', False):
# 如果是打包后的exe
application_path = os.path.dirname(sys.executable)
else:
# 如果是开发环境
application_path = os.path.dirname(os.path.abspath(__file__))
os.chdir(application_path)
sys.path.insert(0, application_path)
# 创建必要的目录
os.makedirs('logs', exist_ok=True)
os.makedirs('data', exist_ok=True)
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('logs/backend.log', encoding='utf-8'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
def main():
"""主函数"""
try:
logger.info("启动身体平衡评估系统后端服务...")
logger.info(f"工作目录: {os.getcwd()}")
# 导入并启动Flask应用
from app import app, socketio, init_app
# 初始化应用
init_app()
# 启动服务器
logger.info("后端服务器启动在 http://localhost:5000")
socketio.run(
app,
host='0.0.0.0',
port=5000,
debug=False
)
except KeyboardInterrupt:
logger.info("用户中断,正在关闭服务器...")
except Exception as e:
logger.error(f"启动失败: {e}")
input("按回车键退出...")
sys.exit(1)
if __name__ == '__main__':
main()
'''
with open('main_exe.py', 'w', encoding='utf-8') as f:
f.write(main_content)
print("✓ 已创建 main_exe.py 入口文件")
def check_dependencies():
"""检查必需的依赖模块"""
required_modules = [
'flask', 'flask_cors', 'flask_socketio', 'numpy', 'pandas',
'cv2', 'matplotlib', 'seaborn', 'PIL', 'reportlab',
'serial', 'requests', 'sqlite3', 'configparser', 'scipy', 'eventlet'
]
missing_modules = []
for module in required_modules:
try:
__import__(module)
print(f"{module}")
except ImportError:
missing_modules.append(module)
print(f"{module} - 未安装")
if missing_modules:
print(f"\n警告: 发现 {len(missing_modules)} 个缺失的依赖模块")
print("建议运行: pip install -r requirements.txt")
return False
print("✓ 所有必需依赖模块检查通过")
return True
def build_exe():
"""构建exe文件"""
print("开始构建exe文件...")
# 检查依赖模块
print("\n检查依赖模块...")
if not check_dependencies():
print("\n❌ 依赖检查失败,请先安装缺失的模块")
return False
# 检查PyInstaller是否安装
try:
import PyInstaller
print(f"\n✓ PyInstaller版本: {PyInstaller.__version__}")
except ImportError:
print("❌ PyInstaller未安装正在安装...")
os.system('pip install pyinstaller')
# 清理之前的构建
if os.path.exists('build'):
shutil.rmtree('build')
print("✓ 清理build目录")
if os.path.exists('dist'):
shutil.rmtree('dist')
print("✓ 清理dist目录")
# 使用spec文件构建
cmd = 'python -m PyInstaller backend.spec --clean --noconfirm'
print(f"执行命令: {cmd}")
result = os.system(cmd)
if result == 0:
print("\n✓ 构建成功!")
print("exe文件位置: dist/BodyBalanceBackend.exe")
# 复制额外的文件
dist_path = Path('dist')
if dist_path.exists():
# 复制配置文件
config_files = ['../config.ini', '../config.json']
for config_file in config_files:
if os.path.exists(config_file):
shutil.copy2(config_file, dist_path)
print(f"✓ 复制配置文件: {config_file}")
# 创建启动脚本
start_script = dist_path / 'start_backend.bat'
with open(start_script, 'w', encoding='utf-8') as f:
f.write('@echo off\n')
f.write('echo 启动身体平衡评估系统后端服务...\n')
f.write('BodyBalanceBackend.exe\n')
f.write('pause\n')
print("✓ 创建启动脚本: start_backend.bat")
print("\n🎉 打包完成!")
print("运行方式:")
print("1. 直接运行: dist/BodyBalanceBackend.exe")
print("2. 使用脚本: dist/start_backend.bat")
else:
print("❌ 构建失败")
return False
return True
def check_required_files():
"""检查必需的文件是否存在"""
required_files = {
'app.py': '主应用文件',
'database.py': '数据库模块',
'device_manager.py': '设备管理模块',
'utils.py': '工具模块',
'../config.ini': '配置文件',
'../config.json': 'JSON配置文件'
}
required_dlls = {
'dll/femtobolt/bin/k4a.dll': 'K4A动态库',
'dll/femtobolt/bin/k4arecord.dll': 'K4A录制库',
'dll/femtobolt/bin/depthengine_2_0.dll': '深度引擎',
'dll/femtobolt/bin/OrbbecSDK.dll': 'Orbbec SDK',
'dll/femtobolt/bin/ob_usb.dll': 'Orbbec USB库',
'dll/femtobolt/bin/live555.dll': 'Live555库',
'dll/smitsense/SMiTSenseUsb-F3.0.dll': 'SMiTSense传感器库',
'dll/femtobolt/bin/OrbbecSDKConfig_v1.0.xml': 'Orbbec配置文件'
}
missing_files = []
print("检查必需文件...")
for file_path, description in required_files.items():
if os.path.exists(file_path):
print(f"{description}: {file_path}")
else:
missing_files.append((file_path, description))
print(f"{description}: {file_path} - 文件不存在")
print("\n检查DLL文件...")
for dll_path, description in required_dlls.items():
if os.path.exists(dll_path):
print(f"{description}: {dll_path}")
else:
missing_files.append((dll_path, description))
print(f"{description}: {dll_path} - 文件不存在")
if missing_files:
print(f"\n警告: 发现 {len(missing_files)} 个缺失文件")
print("缺失的文件可能导致打包失败或运行时错误")
return False
print("\n✓ 所有必需文件检查通过")
return True
def main():
"""主函数"""
print("=" * 50)
print("身体平衡评估系统 - 后端打包工具")
print("=" * 50)
print()
# 检查当前目录
if not os.path.exists('app.py'):
print("❌ 请在backend目录下运行此脚本")
return
# 检查必需文件
print("\n检查必需文件...")
if not check_required_files():
print("\n⚠️ 文件检查发现问题,但将继续执行打包")
print("如果打包失败,请检查上述缺失的文件")
input("按回车键继续...")
try:
# 创建配置文件
create_spec_file()
# create_main_entry() # 注释掉以保留现有的main_exe.py修改
# 临时为诊断脚本创建spec文件
if len(sys.argv) > 1 and sys.argv[1] == 'debug':
create_debug_spec()
return
elif len(sys.argv) > 1 and sys.argv[1] == 'minimal':
create_minimal_spec()
return
# 构建exe
if build_exe():
print("\n✅ 所有操作完成!")
else:
print("\n❌ 构建过程中出现错误")
except Exception as e:
print(f"❌ 错误: {e}")
input("\n按回车键退出...")
if __name__ == '__main__':
main()

244
backend/build_minimal.py Normal file
View File

@ -0,0 +1,244 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
app_minimal.py 打包脚本
使用PyInstaller将最小版本Flask应用程序打包成独立的exe文件
"""
import os
import sys
import shutil
import subprocess
from pathlib import Path
def clean_build_dirs():
"""清理构建目录"""
print("清理构建目录...")
dirs_to_clean = ['build', 'dist', '__pycache__']
for dir_name in dirs_to_clean:
if os.path.exists(dir_name):
try:
shutil.rmtree(dir_name)
print(f"✓ 已删除 {dir_name}")
except Exception as e:
print(f"⚠️ 删除 {dir_name} 失败: {e}")
def create_minimal_spec():
"""创建app_minimal.py的PyInstaller spec文件"""
spec_content = '''
# -*- mode: python ; coding: utf-8 -*-
block_cipher = None
a = Analysis(
['app_minimal.py'],
pathex=[],
binaries=[
('dll/smitsense/SMiTSenseUsb-F3.0.dll', '.'), # SMiTSense传感器库
],
datas=[
('config.ini', '.'), # 配置文件
('data', 'data'), # 数据文件夹
],
hiddenimports=[
'flask',
'flask_cors',
'sqlite3',
'configparser',
'logging',
'threading',
'queue',
'base64',
'psutil',
'database',
'device_manager',
'utils',
'cv2',
'numpy',
],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=[
'flask_socketio',
'socketio',
'engineio',
'eventlet',
'gevent',
],
win_no_prefer_redirects=False,
win_private_assemblies=False,
cipher=block_cipher,
noarchive=False,
)
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)
exe = EXE(
pyz,
a.scripts,
a.binaries,
a.zipfiles,
a.datas,
[],
name='BodyBalanceBackend',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
upx_exclude=[],
runtime_tmpdir=None,
console=True,
disable_windowed_traceback=False,
argv_emulation=False,
target_arch=None,
codesign_identity=None,
entitlements_file=None,
icon=None
)
'''
with open('app_minimal.spec', 'w', encoding='utf-8') as f:
f.write(spec_content)
print("✓ 已创建 app_minimal.spec 文件")
def build_exe():
"""构建exe文件"""
print("\n开始构建exe文件...")
try:
# 使用PyInstaller构建
cmd = [sys.executable, '-m', 'PyInstaller', 'app_minimal.spec', '--clean', '--noconfirm']
print(f"执行命令: {' '.join(cmd)}")
result = subprocess.run(cmd, capture_output=True, text=True, encoding='utf-8')
if result.returncode == 0:
print("✓ 构建成功!")
return True
else:
print(f"✗ 构建失败")
print(f"错误输出: {result.stderr}")
return False
except Exception as e:
print(f"✗ 构建过程出错: {e}")
return False
def create_startup_script():
"""创建启动脚本"""
startup_script = '''@echo off
echo 启动身体平衡评估系统后端服务最小版本...
echo.
echo 服务信息:
echo - HTTP API: http://localhost:5000
echo - 管理界面: http://localhost:5000
echo.
echo 按Ctrl+C停止服务
echo.
"BodyBalanceBackend.exe"
if %errorlevel% neq 0 (
echo.
echo 服务启动失败请检查错误信息
pause
)
'''
dist_dir = 'dist'
if not os.path.exists(dist_dir):
os.makedirs(dist_dir)
with open(os.path.join(dist_dir, 'start_backend.bat'), 'w', encoding='utf-8') as f:
f.write(startup_script)
print("✓ 创建启动脚本: dist/start_backend.bat")
def copy_config_files():
"""复制配置文件到dist目录"""
print("复制配置文件...")
config_files = ['config.ini']
dist_dir = 'dist'
if not os.path.exists(dist_dir):
os.makedirs(dist_dir)
for config_file in config_files:
if os.path.exists(config_file):
try:
shutil.copy2(config_file, dist_dir)
print(f"✓ 已复制 {config_file}")
except Exception as e:
print(f"⚠️ 复制 {config_file} 失败: {e}")
else:
print(f"⚠️ 配置文件不存在: {config_file}")
def main():
"""主函数"""
print("=" * 60)
print("身体平衡评估系统 - app_minimal.py 打包工具")
print("=" * 60)
print()
# 检查当前目录
if not os.path.exists('app_minimal.py'):
print("✗ 错误: 找不到 app_minimal.py 文件")
print("请确保在backend目录下运行此脚本")
input("按回车键退出...")
return
try:
# 清理构建目录
clean_build_dirs()
print()
# 创建spec文件
print("创建PyInstaller配置...")
create_minimal_spec()
print()
# 构建exe
if build_exe():
print()
print("后处理...")
# 检查生成的exe文件
exe_path = 'dist/BodyBalanceBackend.exe'
if os.path.exists(exe_path):
print(f"✓ exe文件位置: {exe_path}")
# 复制配置文件
copy_config_files()
# 创建启动脚本
create_startup_script()
print()
print("🎉 打包完成!")
print()
print("输出文件:")
print(f"- 可执行文件: {exe_path}")
print("- 启动脚本: dist/start_backend.bat")
print("- 配置文件: dist/config.ini")
print()
print("使用方式:")
print("1. 直接运行: dist/BodyBalanceBackend.exe")
print("2. 使用脚本: dist/start_backend.bat")
print()
print("服务地址: http://localhost:5000")
else:
print("✗ 错误: 未找到生成的exe文件")
else:
print("\n✗ 打包失败")
except Exception as e:
print(f"\n✗ 打包过程出错: {e}")
print()
input("按回车键退出...")
if __name__ == '__main__':
main()

View File

@ -1,7 +1,8 @@
[APP]
name = Body Balance Evaluation System
version = 1.0.0
debug = false
debug = True
log_level = INFO
[SERVER]
@ -10,7 +11,7 @@ port = 5000
cors_origins = *
[DATABASE]
path = backend/data/body_balance.db
path = data/body_balance.db
backup_interval = 24
max_backups = 7
@ -19,9 +20,11 @@ camera_index = 0
camera_width = 640
camera_height = 480
camera_fps = 30
imu_port = COM3
imu_port = COM4
imu_baudrate = 9600
pressure_port = COM4
[DETECTION]
default_duration = 60
sampling_rate = 30
@ -35,7 +38,12 @@ chart_dpi = 300
export_format = csv
[SECURITY]
secret_key = 9179711d0d1bed10e105f39c9210cce273cbd73f85fbdfcd41e2d1e20d5c50bd
secret_key = 79fcc4983d478c2ee672f3305d5e12c7c84fd1b58a18acb650e9f8125bfa805f
session_timeout = 3600
max_login_attempts = 5
[DEFAULT]
# FemtoBolt深度相机配置
femtobolt_color_resolution = 720P
femtobolt_depth_range_min = 1400
femtobolt_depth_range_max = 1900

View File

@ -43,7 +43,7 @@ from database import DatabaseManager
try:
import pykinect_azure as pykinect
# 重新启用FemtoBolt功能使用正确的Orbbec SDK K4A Wrapper路径
FEMTOBOLT_AVAILABLE = True
FEMTOBOLT_AVAILABLE = False
print("信息: pykinect_azure库已安装FemtoBolt深度相机功能已启用")
print("使用Orbbec SDK K4A Wrapper以确保与FemtoBolt设备的兼容性")
except ImportError:
@ -125,21 +125,38 @@ class DeviceManager:
# WebSocket连接用于推流
self.socketio = None
# 初始化设备
self._init_devices()
# 延迟设备初始化,避免启动时阻塞
# self._init_devices() # 注释掉自动初始化,改为按需初始化
def _init_devices(self):
"""初始化所有设备"""
# 分别初始化各个设备,单个设备失败不影响其他设备
try:
self._init_camera()
self._init_femtobolt_camera()
self._init_imu()
self._init_pressure_sensor()
logger.info('设备初始化完成')
except Exception as e:
logger.error(f'设备初始化失败: {e}')
logger.error(f'摄像头初始化失败: {e}')
try:
if FEMTOBOLT_AVAILABLE:
self._init_femtobolt_camera()
except Exception as e:
logger.error(f'FemtoBolt深度相机初始化失败: {e}')
try:
logger.error('IMU传感器初始化!!!!!!!!!!!!!!!!')
self._init_imu()
except Exception as e:
logger.error(f'IMU传感器初始化失败: {e}')
try:
self._init_pressure_sensor()
except Exception as e:
logger.error(f'压力传感器初始化失败: {e}')
logger.info('设备初始化完成(部分设备可能初始化失败)')
def _init_camera(self):
"""初始化足部监视摄像头"""
@ -157,23 +174,10 @@ class DeviceManager:
except Exception as e:
logger.warning(f'读取摄像头设备索引配置失败使用默认值0: {e}')
else:
logger.warning('数据库管理器未初始化使用默认摄像头索引0')
# 尝试连接指定索引的摄像头
# self.camera = cv2.VideoCapture(device_index)
# if self.camera.isOpened():
# # 设置摄像头参数
# self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
# self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
# self.camera.set(cv2.CAP_PROP_FPS, 30)
# # 设置缓冲区大小为1避免帧积累
# self.camera.set(cv2.CAP_PROP_BUFFERSIZE, 1)
logger.warning('数据库管理器未初始化使用默认摄像头索引0')
self.device_status['camera'] = True
# logger.info(f'摄像头初始化成功,设备索引: {device_index}')
# else:
# logger.warning(f'摄像头连接失败,设备索引: {device_index}')
# self.camera = None
except Exception as e:
logger.error(f'摄像头初始化异常: {e}')
self.camera = None
@ -206,7 +210,7 @@ class DeviceManager:
# 从config.ini读取配置
import configparser
config = configparser.ConfigParser()
config.read(os.path.join(os.path.dirname(__file__), '..', 'config.ini'))
config.read(os.path.join(os.path.dirname(__file__), 'config.ini'))
# color_res_str = config.get('DEFAULT', 'femtobolt_color_resolution', fallback='1080P')
# depth_range_min = config.getint('DEFAULT', 'femtobolt_depth_range_min', fallback=500)
# depth_range_max = config.getint('DEFAULT', 'femtobolt_depth_range_max', fallback=4500)
@ -283,7 +287,7 @@ class DeviceManager:
# 从config.ini读取串口配置
config = configparser.ConfigParser()
# 优先读取根目录config.ini否则读取backend/config.ini
root_config_path = os.path.join(os.path.dirname(__file__), '..', 'config.ini')
root_config_path = os.path.join(os.path.dirname(__file__), 'config.ini')
app_root_config_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'config.ini')
logger.debug(f'尝试读取配置文件: {root_config_path}, {app_root_config_path}')
@ -1921,7 +1925,7 @@ class DeviceManager:
logger.error(f'清理过期帧失败: {e}')
class RealIMUDevice:
"""真实IMU设备通过串口读取姿态数据"""
def __init__(self, port: str = 'COM7', baudrate: int = 9600):
def __init__(self, port: str = 'COM4', baudrate: int = 9600):
self.port = port
self.baudrate = baudrate
self.ser = None
@ -2286,9 +2290,9 @@ class VideoStreamManager:
"""加载RTSP配置"""
try:
config = configparser.ConfigParser()
config_path = os.path.join(os.path.dirname(__file__), '..', 'config.ini')
config_path = os.path.join(os.path.dirname(__file__), 'config.ini')
config.read(config_path, encoding='utf-8')
device_index_str = config.get('CAMERA', 'device_index', fallback='0')
device_index_str = config.get('DEVICES', 'camera_index', fallback='0')
self.device_index = int(device_index_str) if device_index_str else 0
logger.info(f'视频监控设备配置加载完成,设备号: {self.device_index}')
except Exception as e:

View File

@ -45,20 +45,27 @@ def main():
logger.info(f"工作目录: {os.getcwd()}")
# 导入并启动Flask应用
from app import app, socketio, init_app
from app_minimal import app, socketio, init_app
# 初始化应用
init_app()
# 启动服务器
logger.info("后端服务器启动在 http://localhost:5000")
socketio.run(
app,
host='0.0.0.0',
port=5000,
debug=False,
allow_unsafe_werkzeug=True
)
if socketio is not None:
socketio.run(
app,
host='0.0.0.0',
port=5000,
debug=False
)
else:
logger.info("使用基本Flask模式启动无SocketIO")
app.run(
host='0.0.0.0',
port=5000,
debug=False
)
except KeyboardInterrupt:
logger.info("用户中断,正在关闭服务器...")

View File

@ -2,6 +2,7 @@
Flask
Flask-CORS
Flask-SocketIO
eventlet
# Data processing and scientific computing
numpy

View File

@ -7,14 +7,14 @@ Flask-CORS==4.0.0
Flask-SocketIO==5.3.6
# Core dependencies
numpy==1.24.3
opencv-python==4.8.1.78
numpy>=1.24.0
opencv-python>=4.8.0
psutil==5.9.5
requests==2.31.0
python-dateutil==2.8.2
# PyInstaller for building
PyInstaller==6.2.0
PyInstaller>=6.10.0
# Optional - only if available
# pykinect_azure # Comment out if not available

View File

@ -1,58 +0,0 @@
<!DOCTYPE html>
<html>
<head>
<title>Socket.IO Test</title>
<script src="https://cdn.socket.io/4.7.2/socket.io.min.js"></script>
</head>
<body>
<h1>Socket.IO Test Page</h1>
<div id="status">未连接</div>
<button onclick="sendStartVideo()">发送 start_video 事件</button>
<button onclick="sendTestEvent()">发送 test_event 事件</button>
<div id="messages"></div>
<script>
const socket = io('http://localhost:5001');
const statusDiv = document.getElementById('status');
const messagesDiv = document.getElementById('messages');
function addMessage(message) {
const div = document.createElement('div');
div.textContent = new Date().toLocaleTimeString() + ': ' + message;
messagesDiv.appendChild(div);
}
socket.on('connect', function() {
statusDiv.textContent = '已连接 - Socket ID: ' + socket.id;
addMessage('连接成功');
});
socket.on('disconnect', function() {
statusDiv.textContent = '已断开连接';
addMessage('连接断开');
});
socket.on('connect_response', function(data) {
addMessage('收到连接响应: ' + JSON.stringify(data));
});
socket.on('video_status', function(data) {
addMessage('收到视频状态: ' + JSON.stringify(data));
});
socket.on('test_response', function(data) {
addMessage('收到测试响应: ' + JSON.stringify(data));
});
function sendStartVideo() {
addMessage('发送 start_video 事件');
socket.emit('start_video', {});
}
function sendTestEvent() {
addMessage('发送 test_event 事件');
socket.emit('test_event', {});
}
</script>
</body>
</html>

View File

@ -1,44 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from flask import Flask
from flask_socketio import SocketIO, emit
import logging
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = Flask(__name__)
app.config['SECRET_KEY'] = 'test-secret-key'
socketio = SocketIO(app, cors_allowed_origins='*', logger=True, engineio_logger=True)
@socketio.on('connect')
def handle_connect():
print('=== CLIENT CONNECTED ===', flush=True)
logger.info('客户端已连接')
emit('connect_response', {'message': '连接成功'})
@socketio.on('disconnect')
def handle_disconnect():
print('=== CLIENT DISCONNECTED ===', flush=True)
logger.info('客户端已断开连接')
@socketio.on('start_video')
def handle_start_video(data=None):
print('=== START VIDEO EVENT RECEIVED ===', flush=True)
print(f'Data: {data}', flush=True)
logger.info('=== START VIDEO EVENT RECEIVED ===')
logger.info(f'Data: {data}')
emit('video_status', {'status': 'received', 'message': 'start_video事件已接收'})
return {'status': 'success'}
@socketio.on('test_event')
def handle_test_event(data=None):
print('=== TEST EVENT RECEIVED ===', flush=True)
logger.info('=== TEST EVENT RECEIVED ===')
emit('test_response', {'message': 'Test event received'})
if __name__ == '__main__':
print('启动测试Socket.IO服务器...')
socketio.run(app, host='0.0.0.0', port=5001, debug=False)

View File

@ -30,7 +30,7 @@ class CameraViewer:
if __name__ == "__main__":
# 修改这里的数字可以切换不同摄像头设备
viewer = CameraViewer(device_index=3)
viewer = CameraViewer(device_index=0)
viewer.start_stream()
# import ctypes

View File

@ -19,11 +19,9 @@ camera_index = 0
camera_width = 640
camera_height = 480
camera_fps = 30
imu_port = COM9
imu_baudrate = 9600
imu_port = COM3
pressure_port = COM4
[DETECTION]
default_duration = 60
sampling_rate = 30
@ -37,15 +35,7 @@ chart_dpi = 300
export_format = csv
[SECURITY]
secret_key = 79fcc4983d478c2ee672f3305d5e12c7c84fd1b58a18acb650e9f8125bfa805f
secret_key = 4bfd3d60ddecef23973b64d761371610130034f23ce90dd6f405756ad2f89495
session_timeout = 3600
max_login_attempts = 5
[CAMERA]
device_index = 3
[DEFAULT]
# FemtoBolt深度相机配置
femtobolt_color_resolution = 720P
femtobolt_depth_range_min = 1400
femtobolt_depth_range_max = 1900

View File

@ -1,278 +0,0 @@
{
"app": {
"name": "身体平衡评估系统",
"version": "1.0.0",
"description": "基于多传感器融合技术的专业平衡能力评估与分析系统",
"author": "身体平衡评估系统开发团队",
"language": "zh-CN",
"theme": "light",
"auto_start": false,
"auto_update": true,
"window": {
"width": 1200,
"height": 800,
"min_width": 1000,
"min_height": 600,
"resizable": true,
"center": true,
"show": true,
"frame": true,
"transparent": false,
"always_on_top": false
}
},
"server": {
"host": "0.0.0.0",
"port": 5000,
"debug": false,
"cors": {
"enabled": true,
"origins": ["http://192.168.1.38:3000"]
},
"ssl": {
"enabled": false,
"cert_file": "",
"key_file": ""
},
"rate_limit": {
"enabled": true,
"requests_per_minute": 100
}
},
"database": {
"type": "sqlite",
"path": "backend/data/body_balance.db",
"backup": {
"enabled": true,
"interval_hours": 24,
"max_backups": 7
},
"connection": {
"timeout": 30,
"pool_size": 10
}
},
"devices": {
"camera": {
"enabled": true,
"device_id": 0,
"resolution": {
"width": 1280,
"height": 720
},
"fps": 30,
"format": "MJPG",
"auto_exposure": true,
"brightness": 0,
"contrast": 0,
"saturation": 0,
"calibration": {
"enabled": false,
"matrix": null,
"distortion": null
}
},
"imu": {
"enabled": true,
"port": "COM3",
"baudrate": 115200,
"timeout": 1.0,
"sample_rate": 100,
"range": {
"accelerometer": 16,
"gyroscope": 2000,
"magnetometer": 4800
},
"calibration": {
"enabled": false,
"offset": {
"accel": [0, 0, 0],
"gyro": [0, 0, 0],
"mag": [0, 0, 0]
},
"scale": {
"accel": [1, 1, 1],
"gyro": [1, 1, 1],
"mag": [1, 1, 1]
}
}
},
"pressure": {
"enabled": true,
"port": "COM4",
"baudrate": 9600,
"timeout": 1.0,
"sample_rate": 50,
"sensors": 4,
"range": {
"min": 0,
"max": 1000
},
"calibration": {
"enabled": false,
"zero_offset": [0, 0, 0, 0],
"scale_factor": [1, 1, 1, 1]
}
}
},
"detection": {
"default_duration": 60,
"min_duration": 10,
"max_duration": 300,
"sample_rate": 30,
"recording": {
"enabled": true,
"format": "mp4",
"quality": "medium",
"fps": 30
},
"analysis": {
"real_time": true,
"pose_detection": {
"enabled": true,
"model": "mediapipe",
"confidence": 0.5,
"tracking": true
},
"balance_metrics": {
"cop_analysis": true,
"sway_analysis": true,
"stability_index": true,
"frequency_analysis": true
},
"filters": {
"low_pass": {
"enabled": true,
"cutoff": 10.0
},
"median": {
"enabled": true,
"window_size": 5
}
}
},
"alerts": {
"enabled": true,
"thresholds": {
"excessive_sway": 50.0,
"instability": 0.8,
"device_disconnect": 5.0
}
}
},
"data": {
"storage": {
"base_path": "data",
"patients_path": "data/patients",
"sessions_path": "data/sessions",
"exports_path": "data/exports",
"backups_path": "data/backups",
"temp_path": "temp",
"logs_path": "logs"
},
"cleanup": {
"enabled": true,
"temp_files_days": 7,
"log_files_days": 30,
"session_files_days": 365
},
"compression": {
"enabled": true,
"algorithm": "gzip",
"level": 6
},
"export": {
"formats": ["csv", "json", "pdf"],
"include_raw_data": true,
"include_analysis": true,
"include_charts": true
}
},
"logging": {
"level": "INFO",
"format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
"file": {
"enabled": true,
"path": "logs/app.log",
"max_size": "10MB",
"backup_count": 5,
"rotation": "daily"
},
"console": {
"enabled": true,
"level": "INFO"
},
"modules": {
"flask": "WARNING",
"werkzeug": "WARNING",
"urllib3": "WARNING",
"matplotlib": "WARNING"
}
},
"security": {
"session": {
"timeout_minutes": 60,
"secret_key": "your-secret-key-here",
"secure_cookies": false
},
"api": {
"rate_limiting": true,
"request_timeout": 30,
"max_request_size": "100MB"
},
"data": {
"encryption": false,
"backup_encryption": false,
"anonymization": {
"enabled": false,
"fields": ["name", "phone", "email"]
}
}
},
"ui": {
"language": "zh-CN",
"theme": "light",
"animations": true,
"sound_effects": true,
"notifications": {
"enabled": true,
"position": "top-right",
"duration": 5000
},
"charts": {
"default_type": "line",
"colors": {
"primary": "#409EFF",
"success": "#67C23A",
"warning": "#E6A23C",
"danger": "#F56C6C",
"info": "#909399"
},
"animation_duration": 1000
},
"table": {
"page_size": 20,
"show_pagination": true,
"sortable": true,
"filterable": true
}
},
"performance": {
"monitoring": {
"enabled": true,
"interval_seconds": 60,
"metrics": ["cpu", "memory", "disk", "network"]
},
"optimization": {
"image_compression": true,
"data_caching": true,
"lazy_loading": true,
"batch_processing": true
},
"limits": {
"max_concurrent_sessions": 5,
"max_file_size_mb": 100,
"max_session_duration_minutes": 300
}
}
}

View File

@ -1,159 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
身体平衡评估系统 - 调试服务器启动脚本
这个脚本专门用于调试模式提供更好的调试体验
1. 支持IDE断点调试
2. 详细的错误信息输出
3. 热重载功能
4. 调试日志输出
"""
import os
import sys
import logging
import socket
from pathlib import Path
# 添加项目路径
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))
sys.path.insert(0, str(project_root / 'backend'))
def setup_debug_logging():
"""设置调试日志"""
# 创建logs目录
logs_dir = project_root / 'logs'
logs_dir.mkdir(exist_ok=True)
# 配置日志格式
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
handlers=[
logging.FileHandler(logs_dir / 'debug.log', encoding='utf-8'),
logging.StreamHandler(sys.stdout)
]
)
# 设置Flask和SocketIO的日志级别
logging.getLogger('werkzeug').setLevel(logging.DEBUG)
logging.getLogger('socketio').setLevel(logging.DEBUG)
logging.getLogger('engineio').setLevel(logging.DEBUG)
# 禁用第三方库的详细日志
logging.getLogger('PIL').setLevel(logging.WARNING)
logging.getLogger('PIL.PngImagePlugin').setLevel(logging.WARNING)
logging.getLogger('matplotlib').setLevel(logging.WARNING)
logging.getLogger('matplotlib.font_manager').setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
logger.info('调试日志已启用')
return logger
def get_local_ip():
"""获取本机IP地址"""
try:
# 创建一个UDP socket连接到外部地址来获取本机IP
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.connect(("8.8.8.8", 80))
return s.getsockname()[0]
except Exception:
return "127.0.0.1"
def check_debug_environment():
"""检查调试环境"""
logger = logging.getLogger(__name__)
# 检查Python版本
if sys.version_info < (3, 8):
logger.error('需要Python 3.8或更高版本')
return False
# 检查必要文件
required_files = [
'backend/app.py',
'config.ini',
'backend/requirements.txt'
]
for file_path in required_files:
if not (project_root / file_path).exists():
logger.error(f'缺少必要文件: {file_path}')
return False
logger.info('调试环境检查通过')
return True
def start_debug_server():
"""启动调试服务器"""
logger = logging.getLogger(__name__)
try:
# 设置环境变量
os.environ['FLASK_ENV'] = 'development'
os.environ['FLASK_DEBUG'] = '1'
os.environ['PYTHONPATH'] = str(project_root)
# 导入Flask应用
from backend.app import app, socketio, init_app
# 初始化应用
logger.info('初始化应用...')
init_app()
# 获取本机IP地址
local_ip = get_local_ip()
# 启动调试服务器
logger.info('启动调试服务器...')
logger.info('调试模式已启用 - 可以在IDE中设置断点')
logger.info('本地访问: http://127.0.0.1:5000')
logger.info(f'远程访问: http://{local_ip}:5000')
logger.info('健康检查: http://127.0.0.1:5000/health')
logger.info('按 Ctrl+C 停止服务器')
# 启动SocketIO服务器支持调试和远程访问
socketio.run(
app,
host='0.0.0.0', # 允许所有IP访问
port=5000,
debug=True,
use_reloader=False, # 禁用热重载以避免FemtoBolt设备资源冲突
log_output=True, # 输出详细日志
allow_unsafe_werkzeug=True
)
except KeyboardInterrupt:
logger.info('服务器被用户中断')
except Exception as e:
logger.error(f'服务器启动失败: {e}', exc_info=True)
return False
return True
def main():
"""主函数"""
print('='*50)
print('身体平衡评估系统 - 调试模式')
print('='*50)
print()
# 设置调试日志
logger = setup_debug_logging()
# 检查环境
if not check_debug_environment():
input('按任意键退出...')
sys.exit(1)
# 启动调试服务器
success = start_debug_server()
if not success:
input('按任意键退出...')
sys.exit(1)
if __name__ == '__main__':
main()

View File

@ -7,7 +7,7 @@ const log = require('electron-log');
let mainWindow;
let pythonProcess;
const BACKEND_PORT = 5000;
const BACKEND_HOST = process.env.BACKEND_HOST || '0.0.0.0';
const BACKEND_HOST = process.env.BACKEND_HOST || '127.0.0.1';
const BACKEND_URL = `http://${BACKEND_HOST}:${BACKEND_PORT}`;
// 配置日志

View File

@ -16,7 +16,7 @@ api.interceptors.request.use(
if (window.electronAPI) {
config.baseURL = window.electronAPI.getBackendUrl()
} else {
config.baseURL = 'http://192.168.1.173:5000'
config.baseURL = 'http://localhost:5000'
}
@ -600,7 +600,7 @@ export const getBackendUrl = () => {
if (window.electronAPI) {
return window.electronAPI.getBackendUrl()
} else {
return 'http://192.168.1.173:5000'
return 'http://localhost:5000'
}
}

View File

@ -1,205 +0,0 @@
@echo off
chcp 65001 >nul
echo ====================================
echo 身体平衡评估系统 - 安装脚本
echo ====================================
echo.
echo 此脚本将帮助您安装所有必要的依赖和配置环境
echo.
:: 检查管理员权限(可选)
net session >nul 2>&1
if %errorlevel% == 0 (
echo [信息] 检测到管理员权限
) else (
echo [警告] 未检测到管理员权限,某些操作可能失败
)
echo.
:: 检查Python
echo [步骤 1/6] 检查Python环境...
python --version >nul 2>&1
if %errorlevel% neq 0 (
echo [错误] 未找到Python
echo [提示] 请从 https://www.python.org/downloads/ 下载并安装Python 3.8或更高版本
echo [提示] 安装时请勾选 "Add Python to PATH"
pause
exit /b 1
) else (
for /f "tokens=*" %%i in ('python --version') do echo [成功] 找到 %%i
)
:: 检查Python版本
for /f "tokens=2 delims= " %%i in ('python --version') do set python_version=%%i
for /f "tokens=1,2 delims=." %%a in ("%python_version%") do (
set major=%%a
set minor=%%b
)
if %major% lss 3 (
echo [错误] Python版本过低需要3.8或更高版本
pause
exit /b 1
)
if %major% equ 3 if %minor% lss 8 (
echo [错误] Python版本过低需要3.8或更高版本
pause
exit /b 1
)
echo.
:: 检查Node.js
echo [步骤 2/6] 检查Node.js环境...
node --version >nul 2>&1
if %errorlevel% neq 0 (
echo [错误] 未找到Node.js
echo [提示] 请从 https://nodejs.org/ 下载并安装Node.js 16.0或更高版本
pause
exit /b 1
) else (
for /f "tokens=*" %%i in ('node --version') do echo [成功] 找到Node.js %%i
for /f "tokens=*" %%i in ('npm --version') do echo [成功] 找到npm %%i
)
echo.
:: 创建虚拟环境
echo [步骤 3/6] 创建Python虚拟环境...
if exist "backend\venv" (
echo [信息] 虚拟环境已存在,跳过创建
else (
echo [信息] 正在创建虚拟环境...
python -m venv backend\venv
if %errorlevel% neq 0 (
echo [错误] 创建虚拟环境失败
pause
exit /b 1
)
echo [成功] 虚拟环境创建完成
)
:: 激活虚拟环境
echo [信息] 激活虚拟环境...
call backend\venv\Scripts\activate.bat
if %errorlevel% neq 0 (
echo [错误] 激活虚拟环境失败
pause
exit /b 1
)
echo [成功] 虚拟环境已激活
echo.
:: 升级pip
echo [步骤 4/6] 升级pip...
python -m pip install --upgrade pip
if %errorlevel% neq 0 (
echo [警告] pip升级失败继续安装
) else (
echo [成功] pip升级完成
)
echo.
:: 安装Python依赖
echo [步骤 5/6] 安装Python依赖...
if not exist "backend\requirements.txt" (
echo [错误] 未找到backend\requirements.txt文件
pause
exit /b 1
)
echo [信息] 正在安装Python包这可能需要几分钟...
pip install -r backend\requirements.txt
if %errorlevel% neq 0 (
echo [错误] 安装Python依赖失败
echo [提示] 请检查网络连接和requirements.txt文件
pause
exit /b 1
)
echo [成功] Python依赖安装完成
echo.
:: 安装前端依赖
echo [步骤 6/6] 安装前端依赖...
if exist "frontend\src\renderer\package.json" (
cd frontend\src\renderer
echo [信息] 正在安装前端包,这可能需要几分钟...
npm install
if %errorlevel% neq 0 (
echo [错误] 安装前端依赖失败
echo [提示] 请检查网络连接和package.json文件
cd ..\..\..
pause
exit /b 1
)
echo [成功] 前端依赖安装完成
cd ..\..\..
) else (
echo [警告] 未找到前端package.json文件跳过前端依赖安装
)
echo.
:: 创建目录结构
echo [信息] 创建目录结构...
if not exist "data" mkdir data
if not exist "data\patients" mkdir data\patients
if not exist "data\sessions" mkdir data\sessions
if not exist "data\exports" mkdir data\exports
if not exist "data\backups" mkdir data\backups
if not exist "logs" mkdir logs
if not exist "temp" mkdir temp
echo [成功] 目录结构创建完成
echo.
:: 检查配置文件
echo [信息] 检查配置文件...
if exist "config.json" (
echo [成功] 配置文件已存在
) else (
echo [信息] 配置文件已创建,使用默认配置
)
echo.
:: 运行测试(可选)
echo [信息] 运行基础测试...
echo [测试] 检查Python导入...
python -c "import sys; print('Python路径:', sys.executable)" 2>nul
if %errorlevel% neq 0 (
echo [警告] Python测试失败
) else (
echo [成功] Python测试通过
)
echo [测试] 检查主要依赖...
python -c "import flask, numpy, opencv-python, mediapipe; print('主要依赖检查通过')" 2>nul
if %errorlevel% neq 0 (
echo [警告] 依赖测试失败,某些功能可能不可用
) else (
echo [成功] 依赖测试通过
)
echo.
:: 安装完成
echo ====================================
echo 安装完成!
echo ====================================
echo.
echo [成功] 所有依赖已安装完成
echo [信息] 您现在可以使用以下命令启动应用:
echo.
echo 开发环境: start_dev.bat
echo 生产环境: start_prod.bat
echo 或直接运行: python main.py
echo.
echo [提示] 首次运行建议使用开发环境进行测试
echo [提示] 如遇到问题,请查看 README.md 文件
echo.
set /p choice="是否现在启动开发环境?(y/n): "
if /i "%choice%"=="y" (
echo.
echo [信息] 启动开发环境...
call start_dev.bat
) else (
echo.
echo [信息] 安装完成,您可以稍后手动启动应用
)
pause

200
install/README.md Normal file
View File

@ -0,0 +1,200 @@
# 最小功能测试框架
## 概述
这是一个最小功能测试框架,用于验证 Flask + SocketIO + threading 技术栈在打包成 exe 后的可用性。
## 功能特性
- ✅ Flask HTTP 服务
- ✅ SocketIO WebSocket 服务
- ✅ 强制使用 threading 异步模式
- ✅ 完整的前端测试界面
- ✅ HTTP API 测试
- ✅ WebSocket 连接测试
- ✅ 实时消息收发测试
- ✅ 系统信息显示
## 文件结构
```
install/
├── minimal_test_app.py # 主应用文件
├── build_minimal.py # 打包脚本
├── requirements_minimal.txt # 最小依赖列表
└── README.md # 说明文档
```
## 快速开始
### 1. 安装依赖
```bash
pip install -r requirements_minimal.txt
```
### 2. 测试运行
```bash
python minimal_test_app.py
```
然后在浏览器中访问 http://localhost:5000 进行测试。
### 3. 打包成 exe
```bash
python build_minimal.py
```
### 4. 测试 exe
打包完成后,运行:
```bash
# 方式1直接运行
dist/MinimalTestApp.exe
# 方式2使用脚本
dist/start_test.bat
```
## 测试步骤
### HTTP API 测试
1. 点击「测试 HTTP API」按钮
2. 确认返回成功响应
3. 检查响应数据包含服务器信息
### WebSocket 测试
1. 页面加载时会自动连接 WebSocket
2. 确认连接状态显示为「已连接」
3. 点击「发送测试消息」
4. 确认能收到服务器响应
5. 测试断开和重连功能
### 系统信息检查
- 服务器时间:确认时间正确
- SocketIO模式确认显示为 "threading"
- Flask版本确认版本信息
## 技术要点
### 异步模式选择
```python
# 强制使用 threading 模式,避免 eventlet/gevent 依赖问题
socketio = SocketIO(
app,
cors_allowed_origins='*',
async_mode='threading', # 关键配置
logger=False,
engineio_logger=False
)
```
### 打包配置
```python
# PyInstaller 隐藏导入配置
hiddenimports=[
'flask',
'flask_socketio',
'socketio',
'engineio',
'engineio.async_drivers.threading', # 关键threading 驱动
'socketio.namespace',
'dns',
'dns.resolver',
'dns.asyncresolver'
],
# 排除不需要的异步模式
excludes=[
'eventlet',
'gevent',
'gevent_uwsgi'
]
```
## 故障排除
### 常见问题
1. **"Invalid async_mode specified" 错误**
- 确认已安装所有依赖
- 检查 PyInstaller 隐藏导入配置
- 确认排除了不需要的异步模式
2. **WebSocket 连接失败**
- 检查防火墙设置
- 确认端口 5000 未被占用
- 查看控制台错误信息
3. **打包失败**
- 确认 PyInstaller 版本兼容
- 检查依赖版本冲突
- 查看详细错误输出
### 调试模式
如需调试,可以修改 `minimal_test_app.py`
```python
# 启用调试模式
socketio.run(
app,
host='0.0.0.0',
port=5000,
debug=True, # 启用调试
allow_unsafe_werkzeug=True
)
```
## 扩展指南
### 添加新功能
1. **新增 HTTP 路由**
```python
@app.route('/api/new-endpoint')
def new_endpoint():
return jsonify({'message': 'New endpoint'})
```
2. **新增 SocketIO 事件**
```python
@socketio.on('new_event')
def handle_new_event(data):
emit('response', {'status': 'received'})
```
3. **添加依赖**
- 更新 `requirements_minimal.txt`
- 更新 `build_minimal.py` 中的 `hiddenimports`
- 重新测试打包
### 渐进式集成
按照以下顺序逐步添加业务功能:
1. ✅ 基础 Flask + SocketIO当前阶段
2. 🔄 添加数据库支持
3. 🔄 添加文件操作
4. 🔄 添加外部库依赖
5. 🔄 添加硬件设备支持
每个阶段都要确保打包和运行正常,出现问题时更容易定位。
## 版本信息
- Flask: 2.3.3
- Flask-SocketIO: 5.3.6
- PyInstaller: 6.1.0
- Python: 3.8+
## 许可证
MIT License

224
install/build_minimal.py Normal file
View File

@ -0,0 +1,224 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
最小功能测试框架打包脚本
用于将Flask + SocketIO应用打包成exe文件
"""
import os
import sys
import shutil
import subprocess
from pathlib import Path
def check_dependencies():
"""检查必需的依赖"""
print("检查依赖模块...")
required_modules = [
'flask',
'flask_socketio',
'socketio',
'engineio'
]
missing_modules = []
for module in required_modules:
try:
__import__(module)
print(f"{module}")
except ImportError:
print(f"{module} (缺失)")
missing_modules.append(module)
if missing_modules:
print(f"\n缺失模块: {', '.join(missing_modules)}")
print("请运行: pip install -r requirements_minimal.txt")
return False
print("✓ 所有依赖模块检查通过")
return True
def create_spec_file():
"""创建PyInstaller spec文件"""
spec_content = '''
# -*- mode: python ; coding: utf-8 -*-
a = Analysis(
['minimal_test_app.py'],
pathex=[],
binaries=[],
datas=[],
hiddenimports=[
'flask',
'flask_socketio',
'socketio',
'engineio',
'engineio.async_drivers.threading',
'socketio.namespace',
'dns',
'dns.resolver',
'dns.asyncresolver'
],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=[
'eventlet',
'gevent',
'gevent_uwsgi'
],
noarchive=False,
)
pyz = PYZ(a.pure)
exe = EXE(
pyz,
a.scripts,
a.binaries,
a.datas,
[],
name='MinimalTestApp',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
upx_exclude=[],
runtime_tmpdir=None,
console=True,
disable_windowed_traceback=False,
argv_emulation=False,
target_arch=None,
codesign_identity=None,
entitlements_file=None,
icon=None
)
'''
with open('minimal_test.spec', 'w', encoding='utf-8') as f:
f.write(spec_content)
print("✓ 已创建 minimal_test.spec 文件")
def clean_build_dirs():
"""清理构建目录"""
dirs_to_clean = ['build', 'dist', '__pycache__']
for dir_name in dirs_to_clean:
if os.path.exists(dir_name):
try:
shutil.rmtree(dir_name)
print(f"✓ 清理目录: {dir_name}")
except Exception as e:
print(f"警告: 无法清理目录 {dir_name}: {e}")
def build_exe():
"""构建exe文件"""
print("\n开始构建exe文件...")
try:
# 使用PyInstaller构建
cmd = [sys.executable, '-m', 'PyInstaller', 'minimal_test.spec', '--clean']
print(f"执行命令: {' '.join(cmd)}")
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
print("✓ 构建成功!")
return True
else:
print(f"✗ 构建失败")
print(f"错误输出: {result.stderr}")
return False
except Exception as e:
print(f"✗ 构建过程出错: {e}")
return False
def create_test_script():
"""创建测试脚本"""
test_script = '''
@echo off
echo 启动最小功能测试应用...
echo.
echo 测试说明:
echo 1. 应用启动后在浏览器中访问 http://localhost:5000
echo 2. 测试HTTP API和WebSocket功能
echo 3. 按Ctrl+C停止应用
echo.
"MinimalTestApp.exe"
pause
'''
with open('dist/start_test.bat', 'w', encoding='utf-8') as f:
f.write(test_script)
print("✓ 创建测试脚本: dist/start_test.bat")
def main():
"""主函数"""
print("="*60)
print("最小功能测试框架 - 打包工具")
print("="*60)
print()
# 检查当前目录
if not os.path.exists('minimal_test_app.py'):
print("✗ 错误: 找不到 minimal_test_app.py 文件")
print("请确保在正确的目录下运行此脚本")
input("按回车键退出...")
return
# 检查依赖
if not check_dependencies():
input("按回车键退出...")
return
print()
# 清理构建目录
print("清理构建目录...")
clean_build_dirs()
print()
# 创建spec文件
print("创建PyInstaller配置...")
create_spec_file()
print()
# 构建exe
if build_exe():
print()
print("后处理...")
# 检查生成的exe文件
exe_path = 'dist/MinimalTestApp.exe'
if os.path.exists(exe_path):
print(f"✓ exe文件位置: {exe_path}")
# 创建测试脚本
create_test_script()
print()
print("🎉 打包完成!")
print()
print("测试方式:")
print("1. 直接运行: dist/MinimalTestApp.exe")
print("2. 使用脚本: dist/start_test.bat")
print()
print("测试步骤:")
print("1. 启动应用")
print("2. 浏览器访问 http://localhost:5000")
print("3. 测试HTTP API和WebSocket功能")
print("4. 确认所有功能正常工作")
else:
print("✗ 错误: 未找到生成的exe文件")
else:
print("\n✗ 打包失败")
print()
input("按回车键退出...")
if __name__ == '__main__':
main()

53
install/minimal_test.spec Normal file
View File

@ -0,0 +1,53 @@
# -*- mode: python ; coding: utf-8 -*-
a = Analysis(
['minimal_test_app.py'],
pathex=[],
binaries=[],
datas=[],
hiddenimports=[
'flask',
'flask_socketio',
'socketio',
'engineio',
'engineio.async_drivers.threading',
'socketio.namespace',
'dns',
'dns.resolver',
'dns.asyncresolver'
],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=[
'eventlet',
'gevent',
'gevent_uwsgi'
],
noarchive=False,
)
pyz = PYZ(a.pure)
exe = EXE(
pyz,
a.scripts,
a.binaries,
a.datas,
[],
name='MinimalTestApp',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
upx_exclude=[],
runtime_tmpdir=None,
console=True,
disable_windowed_traceback=False,
argv_emulation=False,
target_arch=None,
codesign_identity=None,
entitlements_file=None,
icon=None
)

274
install/minimal_test_app.py Normal file
View File

@ -0,0 +1,274 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
最小功能测试框架 - Flask + SocketIO + threading
用于验证打包exe后HTTP和WebSocket服务的可用性
"""
import os
import sys
from flask import Flask, render_template_string, jsonify
from flask_socketio import SocketIO, emit
import logging
from datetime import datetime
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# 创建Flask应用
app = Flask(__name__)
app.config['SECRET_KEY'] = 'minimal-test-secret-key-2024'
# 初始化SocketIO强制使用threading模式
try:
logger.info("初始化SocketIOthreading模式...")
socketio = SocketIO(
app,
cors_allowed_origins='*',
async_mode='threading',
logger=False,
engineio_logger=False
)
logger.info(f"SocketIO初始化成功异步模式: {socketio.async_mode}")
except Exception as e:
logger.error(f"SocketIO初始化失败: {e}")
sys.exit(1)
# HTML测试页面模板
HTML_TEMPLATE = """
<!DOCTYPE html>
<html>
<head>
<title>最小功能测试 - Flask + SocketIO</title>
<meta charset="utf-8">
<style>
body { font-family: Arial, sans-serif; margin: 40px; }
.container { max-width: 800px; margin: 0 auto; }
.section { margin: 20px 0; padding: 20px; border: 1px solid #ddd; border-radius: 5px; }
.success { color: green; }
.error { color: red; }
.info { color: blue; }
button { padding: 10px 20px; margin: 5px; cursor: pointer; }
#messages { height: 200px; overflow-y: auto; border: 1px solid #ccc; padding: 10px; background: #f9f9f9; }
.message { margin: 5px 0; }
</style>
<script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/4.0.1/socket.io.js"></script>
</head>
<body>
<div class="container">
<h1>最小功能测试框架</h1>
<p class="info">测试Flask HTTP服务和SocketIO WebSocket服务</p>
<!-- HTTP API测试 -->
<div class="section">
<h2>HTTP API 测试</h2>
<button onclick="testHttpApi()">测试 HTTP API</button>
<div id="http-result"></div>
</div>
<!-- WebSocket测试 -->
<div class="section">
<h2>WebSocket 测试</h2>
<button onclick="connectSocket()">连接 WebSocket</button>
<button onclick="disconnectSocket()">断开连接</button>
<button onclick="sendTestMessage()">发送测试消息</button>
<div>连接状态: <span id="connection-status" class="error">未连接</span></div>
<div id="messages"></div>
</div>
<!-- 系统信息 -->
<div class="section">
<h2>系统信息</h2>
<div id="system-info">
<p>服务器时间: {{ server_time }}</p>
<p>SocketIO模式: {{ socketio_mode }}</p>
<p>Flask版本: {{ flask_version }}</p>
</div>
</div>
</div>
<script>
let socket = null;
function addMessage(message, type = 'info') {
const messages = document.getElementById('messages');
const div = document.createElement('div');
div.className = `message ${type}`;
div.innerHTML = `[${new Date().toLocaleTimeString()}] ${message}`;
messages.appendChild(div);
messages.scrollTop = messages.scrollHeight;
}
function testHttpApi() {
const resultDiv = document.getElementById('http-result');
resultDiv.innerHTML = '测试中...';
fetch('/api/test')
.then(response => response.json())
.then(data => {
resultDiv.innerHTML = `<span class="success"> HTTP API 正常</span><br>响应: ${JSON.stringify(data, null, 2)}`;
})
.catch(error => {
resultDiv.innerHTML = `<span class="error"> HTTP API 错误: ${error}</span>`;
});
}
function connectSocket() {
if (socket && socket.connected) {
addMessage('WebSocket已连接', 'info');
return;
}
socket = io();
socket.on('connect', function() {
document.getElementById('connection-status').innerHTML = '<span class="success">已连接</span>';
addMessage('WebSocket连接成功', 'success');
});
socket.on('disconnect', function() {
document.getElementById('connection-status').innerHTML = '<span class="error">未连接</span>';
addMessage('WebSocket连接断开', 'error');
});
socket.on('test_response', function(data) {
addMessage(`收到服务器响应: ${JSON.stringify(data)}`, 'success');
});
socket.on('server_message', function(data) {
addMessage(`服务器消息: ${data.message}`, 'info');
});
}
function disconnectSocket() {
if (socket) {
socket.disconnect();
socket = null;
}
}
function sendTestMessage() {
if (!socket || !socket.connected) {
addMessage('请先连接WebSocket', 'error');
return;
}
const testData = {
message: 'Hello from client',
timestamp: new Date().toISOString()
};
socket.emit('test_message', testData);
addMessage(`发送消息: ${JSON.stringify(testData)}`, 'info');
}
// 页面加载时自动连接
window.onload = function() {
connectSocket();
};
</script>
</body>
</html>
"""
# HTTP路由
@app.route('/')
def index():
"""主页面"""
import flask
return render_template_string(HTML_TEMPLATE,
server_time=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
socketio_mode=socketio.async_mode,
flask_version=flask.__version__
)
@app.route('/api/test')
def api_test():
"""HTTP API测试接口"""
return jsonify({
'status': 'success',
'message': 'HTTP API工作正常',
'timestamp': datetime.now().isoformat(),
'server_info': {
'socketio_mode': socketio.async_mode,
'working_directory': os.getcwd()
}
})
@app.route('/health')
def health_check():
"""健康检查接口"""
return jsonify({
'status': 'healthy',
'services': {
'http': 'running',
'websocket': 'running',
'socketio_mode': socketio.async_mode
},
'timestamp': datetime.now().isoformat()
})
# SocketIO事件处理
@socketio.on('connect')
def handle_connect():
"""客户端连接事件"""
logger.info(f"客户端连接: {request.sid if 'request' in globals() else 'unknown'}")
emit('server_message', {
'message': 'WebSocket连接成功',
'timestamp': datetime.now().isoformat(),
'socketio_mode': socketio.async_mode
})
@socketio.on('disconnect')
def handle_disconnect():
"""客户端断开连接事件"""
logger.info(f"客户端断开连接: {request.sid if 'request' in globals() else 'unknown'}")
@socketio.on('test_message')
def handle_test_message(data):
"""处理测试消息"""
logger.info(f"收到测试消息: {data}")
# 回复客户端
emit('test_response', {
'status': 'received',
'original_message': data,
'server_timestamp': datetime.now().isoformat(),
'socketio_mode': socketio.async_mode
})
def main():
"""主函数"""
logger.info("="*50)
logger.info("最小功能测试框架启动")
logger.info("="*50)
logger.info(f"工作目录: {os.getcwd()}")
logger.info(f"Python版本: {sys.version}")
logger.info(f"SocketIO异步模式: {socketio.async_mode}")
try:
# 启动服务器
logger.info("启动服务器 http://localhost:5000")
logger.info("按 Ctrl+C 停止服务器")
socketio.run(
app,
host='0.0.0.0',
port=5000,
debug=False,
allow_unsafe_werkzeug=True
)
except KeyboardInterrupt:
logger.info("用户中断,正在关闭服务器...")
except Exception as e:
logger.error(f"服务器启动失败: {e}")
input("按回车键退出...")
sys.exit(1)
if __name__ == '__main__':
main()

110
install/quick_start.bat Normal file
View File

@ -0,0 +1,110 @@
@echo off
echo ========================================
echo Minimal Test Framework - Quick Start
echo ========================================
echo.
:menu
echo Please select an option:
echo 1. Install dependencies
echo 2. Test run (development mode)
echo 3. Build exe
echo 4. Run built exe
echo 5. Show help
echo 0. Exit
echo.
set /p choice=Enter option (0-5):
if "%choice%"=="1" goto install_deps
if "%choice%"=="2" goto test_run
if "%choice%"=="3" goto build_exe
if "%choice%"=="4" goto run_exe
if "%choice%"=="5" goto show_help
if "%choice%"=="0" goto exit
echo Invalid option, please try again
echo.
goto menu
:install_deps
echo.
echo Installing dependencies...
pip install -r requirements_minimal.txt
if %errorlevel% equ 0 (
echo Dependencies installed successfully
) else (
echo Failed to install dependencies
)
echo.
pause
goto menu
:test_run
echo.
echo Starting test server...
echo After server starts, visit http://localhost:5000 in your browser
echo Press Ctrl+C to stop the server
echo.
python minimal_test_app.py
echo.
pause
goto menu
:build_exe
echo.
echo Starting build...
python build_minimal.py
echo.
pause
goto menu
:run_exe
echo.
if exist "dist\MinimalTestApp.exe" (
echo Starting built application...
echo After app starts, visit http://localhost:5000 in your browser
echo.
cd dist
MinimalTestApp.exe
cd ..
) else (
echo dist\MinimalTestApp.exe not found
echo Please run option 3 to build first
)
echo.
pause
goto menu
:show_help
echo.
echo ========================================
echo Usage Instructions
echo ========================================
echo.
echo 1. First time: select "1. Install dependencies"
echo 2. After installation: select "2. Test run" to verify
echo 3. After testing: select "3. Build exe"
echo 4. After building: select "4. Run built exe" to verify
echo.
echo Test Steps:
echo - HTTP API test: Click "Test HTTP API" button on page
echo - WebSocket test: Page auto-connects, can send test messages
echo - System info: Check if SocketIO mode shows "threading"
echo.
echo Files:
echo - minimal_test_app.py: Main application
echo - build_minimal.py: Build script
echo - requirements_minimal.txt: Dependencies
echo - README.md: Detailed documentation
echo.
echo Troubleshooting:
echo - If "Invalid async_mode specified" error occurs
echo check if dependencies are fully installed
echo - If WebSocket connection fails, check firewall settings
echo - See README.md for detailed instructions
echo.
pause
goto menu
:exit
echo Thank you for using!
exit /b 0

View File

@ -0,0 +1,11 @@
# 最小功能测试框架依赖
# 只包含Flask + SocketIO + threading的核心依赖
Flask==2.3.3
Flask-SocketIO==5.3.6
Werkzeug==2.3.7
python-socketio==5.8.0
python-engineio==4.7.1
# 打包工具
PyInstaller>=6.10.0

View File

@ -1,100 +0,0 @@
@echo off
echo ====================================
echo Body Balance Evaluation System - Debug Mode
echo ====================================
echo.
:: Check Python
python --version >nul 2>&1
if %errorlevel% neq 0 (
echo [Error] Python not found, please install Python 3.8 or higher
pause
exit /b 1
)
:: Show version info
echo [Info] Checking environment versions...
for /f "tokens=*" %%i in ('python --version') do echo Python: %%i
echo.
:: Check virtual environment
if not exist "backend\venv" (
echo [Info] Creating Python virtual environment...
python -m venv backend\venv
if %errorlevel% neq 0 (
echo [Error] Failed to create virtual environment
pause
exit /b 1
)
)
:: Activate virtual environment
echo [Info] Activating virtual environment...
call backend\venv\Scripts\activate.bat
if %errorlevel% neq 0 (
echo [Error] Failed to activate virtual environment
pause
exit /b 1
)
:: Install Python dependencies
echo [Info] Checking and installing Python dependencies...
if not exist "backend\requirements.txt" (
echo [Error] requirements.txt file not found
pause
exit /b 1
)
pip install -r backend\requirements.txt
if %errorlevel% neq 0 (
echo [Error] Failed to install Python dependencies
pause
exit /b 1
)
:: Create necessary directories
echo [Info] Creating necessary directories...
if not exist "data" mkdir data
if not exist "data\patients" mkdir data\patients
if not exist "data\sessions" mkdir data\sessions
if not exist "data\exports" mkdir data\exports
if not exist "data\backups" mkdir data\backups
if not exist "logs" mkdir logs
if not exist "temp" mkdir temp
:: Start application in debug mode
echo.
echo ====================================
echo Starting Debug Mode
echo ====================================
echo [Info] Starting backend server in debug mode...
echo [Info] Backend address: http://127.0.0.1:5000
echo [Info] Debug mode enabled - you can set breakpoints in your IDE
echo [Info] Press Ctrl+C to stop service
echo.
echo [Debug Tips]:
echo 1. Open your IDE (VS Code, PyCharm, etc.)
echo 2. Set breakpoints in backend\app.py or other Python files
echo 3. Attach debugger to the running process if needed
echo 4. Use browser dev tools for frontend debugging
echo.
:: Set environment variables for debugging
set FLASK_ENV=development
set FLASK_DEBUG=1
set PYTHONPATH=%cd%
:: Start main program with debug flags
python -u backend\app.py
if %errorlevel% neq 0 (
echo.
echo [Error] Application startup failed
echo [Tip] Please check error messages and fix issues
pause
exit /b 1
)
echo.
echo [Info] Debug session ended
pause

View File

@ -1,126 +0,0 @@
@echo off
echo ====================================
echo Body Balance Evaluation System - Development Mode
echo ====================================
echo.
:: Check Python
python --version >nul 2>&1
if %errorlevel% neq 0 (
echo [Error] Python not found, please install Python 3.8 or higher
pause
exit /b 1
)
:: Check Node.js
node --version >nul 2>&1
if %errorlevel% neq 0 (
echo [Error] Node.js not found, please install Node.js 16.0 or higher
pause
exit /b 1
)
:: Show version info
echo [Info] Checking environment versions...
for /f "tokens=*" %%i in ('python --version') do echo Python: %%i
for /f "tokens=*" %%i in ('node --version') do echo Node.js: %%i
for /f "tokens=*" %%i in ('npm --version') do echo npm: %%i
echo.
:: Check virtual environment
if not exist "backend\venv" (
echo [Info] Creating Python virtual environment...
python -m venv backend\venv
if %errorlevel% neq 0 (
echo [Error] Failed to create virtual environment
pause
exit /b 1
)
)
:: Activate virtual environment
echo [Info] Activating virtual environment...
call backend\venv\Scripts\activate.bat
if %errorlevel% neq 0 (
echo [Error] Failed to activate virtual environment
pause
exit /b 1
)
:: Install Python dependencies
echo [Info] Checking and installing Python dependencies...
if not exist "backend\requirements.txt" (
echo [Error] requirements.txt file not found
pause
exit /b 1
)
pip install -r backend\requirements.txt
if %errorlevel% neq 0 (
echo [Error] Failed to install Python dependencies
pause
exit /b 1
)
:: Install frontend dependencies
echo [Info] Checking and installing frontend dependencies...
if exist "frontend\src\renderer\package.json" (
cd frontend\src\renderer
if not exist "node_modules" (
echo [Info] Installing frontend dependencies...
npm install
if %errorlevel% neq 0 (
echo [Error] Failed to install frontend dependencies
cd ..\..\..
pause
exit /b 1
)
) else (
echo [Info] Frontend dependencies already exist, skipping installation
)
cd ..\..\..
) else (
echo [Warning] Frontend package.json file not found
)
:: Create necessary directories
echo [Info] Creating necessary directories...
if not exist "data" mkdir data
if not exist "data\patients" mkdir data\patients
if not exist "data\sessions" mkdir data\sessions
if not exist "data\exports" mkdir data\exports
if not exist "data\backups" mkdir data\backups
if not exist "logs" mkdir logs
if not exist "temp" mkdir temp
:: Check config file
if not exist "config.json" (
echo [Warning] config.json configuration file not found
echo [Info] Will use default configuration
)
:: Start application
echo.
echo ====================================
echo Starting Development Environment
echo ====================================
echo [Info] Starting backend server...
echo [Info] Backend address: http://127.0.0.1:5000
echo [Info] Frontend address: http://127.0.0.1:5173
echo [Info] Press Ctrl+C to stop service
echo.
:: Start main program
python backend\main.py --mode development --log-level DEBUG
if %errorlevel% neq 0 (
echo.
echo [Error] Application startup failed
echo [Tip] Please check error messages and fix issues
pause
exit /b 1
)
echo.
echo [Info] Application stopped
pause

View File

@ -1,126 +0,0 @@
@echo off
echo ====================================
echo 身体平衡评估系统 - 开发环境启动脚本
echo ====================================
echo.
:: 检查Python是否安装
python --version >nul 2>&1
if %errorlevel% neq 0 (
echo [错误] 未找到Python请先安装Python 3.8或更高版本
pause
exit /b 1
)
:: 检查Node.js是否安装
node --version >nul 2>&1
if %errorlevel% neq 0 (
echo [错误] 未找到Node.js请先安装Node.js 16.0或更高版本
pause
exit /b 1
)
:: 显示版本信息
echo [信息] 检查环境版本...
for /f "tokens=*" %%i in ('python --version') do echo Python: %%i
for /f "tokens=*" %%i in ('node --version') do echo Node.js: %%i
for /f "tokens=*" %%i in ('npm --version') do echo npm: %%i
echo.
:: 检查虚拟环境
if not exist "backend\venv" (
echo [信息] 创建Python虚拟环境...
python -m venv backend\venv
if %errorlevel% neq 0 (
echo [错误] 创建虚拟环境失败
pause
exit /b 1
)
)
:: 激活虚拟环境
echo [信息] 激活虚拟环境...
call backend\venv\Scripts\activate.bat
if %errorlevel% neq 0 (
echo [错误] 激活虚拟环境失败
pause
exit /b 1
)
:: 安装Python依赖
echo [信息] 检查并安装Python依赖...
if not exist "backend\requirements.txt" (
echo [错误] 未找到requirements.txt文件
pause
exit /b 1
)
pip install -r backend\requirements.txt
if %errorlevel% neq 0 (
echo [错误] 安装Python依赖失败
pause
exit /b 1
)
:: 安装前端依赖
echo [信息] 检查并安装前端依赖...
if exist "frontend\src\renderer\package.json" (
cd frontend\src\renderer
if not exist "node_modules" (
echo [信息] 安装前端依赖...
npm install
if %errorlevel% neq 0 (
echo [错误] 安装前端依赖失败
cd ..\..\..
pause
exit /b 1
)
) else (
echo [信息] 前端依赖已存在,跳过安装
)
cd ..\..\..
) else (
echo [警告] 未找到前端package.json文件
)
:: 创建必要的目录
echo [信息] 创建必要的目录...
if not exist "data" mkdir data
if not exist "data\patients" mkdir data\patients
if not exist "data\sessions" mkdir data\sessions
if not exist "data\exports" mkdir data\exports
if not exist "data\backups" mkdir data\backups
if not exist "logs" mkdir logs
if not exist "temp" mkdir temp
:: 检查配置文件
if not exist "config.json" (
echo [警告] 未找到config.json配置文件
echo [信息] 将使用默认配置
)
:: 启动应用
echo.
echo ====================================
echo 启动开发环境
echo ====================================
echo [信息] 启动后端服务器...
echo [信息] 后端地址: http://127.0.0.1:5000
echo [信息] 前端地址: http://127.0.0.1:5173
echo [信息] 按 Ctrl+C 停止服务
echo.
:: 启动主程序
python backend\main.py --mode development --log-level DEBUG
if %errorlevel% neq 0 (
echo.
echo [错误] 应用启动失败
echo [提示] 请检查错误信息并修复问题
pause
exit /b 1
)
echo.
echo [信息] 应用已停止
pause

View File

@ -1,53 +0,0 @@
@echo off
echo Starting Body Balance Evaluation System - Development Mode
echo.
:: Check Python
python --version >nul 2>&1
if %errorlevel% neq 0 (
echo Error: Python not found
pause
exit /b 1
)
:: Check Node.js
node --version >nul 2>&1
if %errorlevel% neq 0 (
echo Error: Node.js not found
pause
exit /b 1
)
echo Environment check passed
echo.
:: Activate virtual environment
if exist "backend\venv\Scripts\activate.bat" (
echo Activating virtual environment...
call backend\venv\Scripts\activate.bat
) else (
echo Warning: Virtual environment not found
)
:: Install dependencies if needed
if exist "backend\requirements.txt" (
echo Installing Python dependencies...
pip install -r backend\requirements.txt
)
:: Create directories
if not exist "data" mkdir data
if not exist "logs" mkdir logs
:: Start application
echo.
echo Starting application...
echo Backend: http://127.0.0.1:5000
echo Frontend: http://127.0.0.1:5173
echo.
python backend\main.py --mode development --log-level DEBUG
echo.
echo Application stopped
pause

View File

@ -1,121 +0,0 @@
@echo off
chcp 65001 >nul
echo ====================================
echo 身体平衡评估系统 - 生产环境启动脚本
echo ====================================
echo.
:: 检查Python是否安装
python --version >nul 2>&1
if %errorlevel% neq 0 (
echo [错误] 未找到Python请先安装Python 3.8或更高版本
pause
exit /b 1
)
:: 显示版本信息
echo [信息] 检查环境版本...
for /f "tokens=*" %%i in ('python --version') do echo Python: %%i
echo.
:: 检查虚拟环境
if not exist "backend\venv" (
echo [错误] 未找到虚拟环境,请先运行 start_dev.bat 进行初始化
pause
exit /b 1
)
:: 激活虚拟环境
echo [信息] 激活虚拟环境...
call backend\venv\Scripts\activate.bat
if %errorlevel% neq 0 (
echo [错误] 激活虚拟环境失败
pause
exit /b 1
)
:: 检查依赖
echo [信息] 检查依赖安装状态...
pip check >nul 2>&1
if %errorlevel% neq 0 (
echo [警告] 依赖检查失败,建议重新安装依赖
)
:: 检查前端构建
if exist "frontend\src\renderer\dist" (
echo [信息] 前端已构建
else (
echo [信息] 前端未构建,正在构建...
if exist "frontend\src\renderer\package.json" (
cd frontend\src\renderer
npm run build
if %errorlevel% neq 0 (
echo [错误] 前端构建失败
cd ..\..\..
pause
exit /b 1
)
cd ..\..\..
) else (
echo [警告] 未找到前端项目
)
)
:: 检查必要目录
echo [信息] 检查目录结构...
if not exist "data" mkdir data
if not exist "data\patients" mkdir data\patients
if not exist "data\sessions" mkdir data\sessions
if not exist "data\exports" mkdir data\exports
if not exist "data\backups" mkdir data\backups
if not exist "logs" mkdir logs
if not exist "temp" mkdir temp
:: 检查配置文件
if not exist "config.json" (
echo [错误] 未找到config.json配置文件
echo [提示] 请确保配置文件存在
pause
exit /b 1
)
:: 清理临时文件
echo [信息] 清理临时文件...
if exist "temp\*" del /q temp\*
if exist "logs\*.tmp" del /q logs\*.tmp
:: 备份数据(可选)
if exist "data\database.db" (
echo [信息] 备份数据库...
for /f "tokens=2 delims==" %%a in ('wmic OS Get localdatetime /value') do set "dt=%%a"
set "YY=%dt:~2,2%" & set "YYYY=%dt:~0,4%" & set "MM=%dt:~4,2%" & set "DD=%dt:~6,2%"
set "HH=%dt:~8,2%" & set "Min=%dt:~10,2%" & set "Sec=%dt:~12,2%"
set "datestamp=%YYYY%%MM%%DD%_%HH%%Min%%Sec%"
copy "data\database.db" "data\backups\database_backup_%datestamp%.db" >nul 2>&1
)
:: 启动应用
echo.
echo ====================================
echo 启动生产环境
echo ====================================
echo [信息] 启动应用服务器...
echo [信息] 服务地址: http://127.0.0.1:5000
echo [信息] 按 Ctrl+C 停止服务
echo [信息] 日志文件: logs\app.log
echo.
:: 启动主程序
python backend\main.py --mode production
if %errorlevel% neq 0 (
echo.
echo [错误] 应用启动失败
echo [提示] 请检查日志文件: logs\app.log
pause
exit /b 1
)
echo.
echo [信息] 应用已停止
pause

View File

@ -1,71 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
测试动态视频生成脚本
用于验证RTSP帧是否真的在变化
"""
import cv2
import numpy as np
import time
from datetime import datetime
def create_test_video_source():
"""
创建一个测试视频源生成动态变化的图像
"""
# 创建一个640x480的黑色背景
width, height = 640, 480
frame_count = 0
while True:
# 创建黑色背景
frame = np.zeros((height, width, 3), dtype=np.uint8)
# 添加动态元素
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
# 添加时间戳
cv2.putText(frame, timestamp, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
# 添加帧计数
cv2.putText(frame, f'Frame: {frame_count}', (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
# 添加移动的圆形
center_x = int(320 + 200 * np.sin(frame_count * 0.1))
center_y = int(240 + 100 * np.cos(frame_count * 0.1))
cv2.circle(frame, (center_x, center_y), 30, (255, 0, 0), -1)
# 添加变化的矩形
rect_size = int(50 + 30 * np.sin(frame_count * 0.05))
cv2.rectangle(frame, (500, 200), (500 + rect_size, 200 + rect_size), (0, 0, 255), -1)
# 添加随机噪点
noise = np.random.randint(0, 50, (height, width, 3), dtype=np.uint8)
frame = cv2.add(frame, noise)
frame_count += 1
yield frame
time.sleep(1/30) # 30 FPS
def test_rtsp_replacement():
"""
测试用动态视频源替换RTSP
"""
print("开始生成测试视频源...")
print("'q' 键退出")
video_source = create_test_video_source()
for frame in video_source:
cv2.imshow('Test Video Source', frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cv2.destroyAllWindows()
if __name__ == '__main__':
test_rtsp_replacement()

View File

@ -1,41 +0,0 @@
@echo off
echo Testing start_dev.bat functionality
echo.
:: Check if virtual environment exists
if exist "backend\venv\Scripts\activate.bat" (
echo Virtual environment found
call backend\venv\Scripts\activate.bat
echo Virtual environment activated
) else (
echo Virtual environment not found
)
:: Check if main.py exists
if exist "backend\main.py" (
echo main.py found
) else (
echo main.py not found
)
:: Test basic Python execution
echo Testing Python execution...
python --version
if %errorlevel% equ 0 (
echo Python is working
) else (
echo Python test failed
)
:: Test Node.js
echo Testing Node.js...
node --version
if %errorlevel% equ 0 (
echo Node.js is working
) else (
echo Node.js test failed
)
echo.
echo Test completed
pause

View File

@ -1,53 +0,0 @@
import socketio
import time
# 创建SocketIO客户端
sio = socketio.SimpleClient()
print('连接WebSocket并监听事件...')
try:
# 连接到服务器
print('正在连接到192.168.1.173:5000...')
sio.connect('http://192.168.1.173:5000', wait_timeout=10)
print('WebSocket连接成功!')
# 发送启动视频流事件
sio.emit('start_video')
print('已发送start_video事件等待5秒接收数据...')
# 等待并接收事件
for i in range(5):
try:
# 接收事件
event = sio.receive(timeout=1)
if event:
event_name, data = event
print(f'收到事件: {event_name}, 数据类型: {type(data)}')
if event_name == 'video_frame' and isinstance(data, dict) and 'image' in data:
pass # 图像数据已接收
elif event_name == 'video_status':
print(f'视频状态: {data}')
except socketio.exceptions.TimeoutError:
print(f'等待事件超时 ({i+1}/5)')
# 发送停止视频流事件
sio.emit('stop_video')
print('已发送stop_video事件')
# 等待停止状态事件
try:
event = sio.receive(timeout=2)
if event:
event_name, data = event
print(f'收到停止事件: {event_name}, 数据: {data}')
except socketio.exceptions.TimeoutError:
print('等待停止事件超时')
sio.disconnect()
print('WebSocket连接已断开')
except Exception as e:
print(f'测试过程中发生错误: {e}')
print('WebSocket测试完成')