Merge branch 'main' of http://121.37.111.42:3000/ThbTech/BodyBalanceEvaluation
This commit is contained in:
commit
aa7000c0e2
@ -1,274 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
设备配置HTTP API接口
|
||||
提供通过HTTP方式设置设备参数的功能
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import sys
|
||||
import os
|
||||
from flask import Flask, request, jsonify
|
||||
from typing import Dict, Any
|
||||
|
||||
# 添加路径以支持导入
|
||||
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
||||
from utils.config_manager import ConfigManager
|
||||
|
||||
|
||||
class ConfigAPI:
|
||||
"""配置API类"""
|
||||
|
||||
def __init__(self, app: Flask = None):
|
||||
"""
|
||||
初始化配置API
|
||||
|
||||
Args:
|
||||
app: Flask应用实例
|
||||
"""
|
||||
self.logger = logging.getLogger(f"{__name__}.ConfigAPI")
|
||||
self.config_manager = ConfigManager()
|
||||
|
||||
if app:
|
||||
self.init_app(app)
|
||||
|
||||
def init_app(self, app: Flask):
|
||||
"""
|
||||
初始化Flask应用
|
||||
|
||||
Args:
|
||||
app: Flask应用实例
|
||||
"""
|
||||
self.app = app
|
||||
self._register_routes()
|
||||
|
||||
def _register_routes(self):
|
||||
"""
|
||||
注册路由
|
||||
"""
|
||||
# 获取所有设备配置
|
||||
@self.app.route('/api/config/devices', methods=['GET'])
|
||||
def get_all_device_configs():
|
||||
"""获取所有设备配置"""
|
||||
try:
|
||||
configs = self.config_manager.get_all_device_configs()
|
||||
return jsonify({
|
||||
'success': True,
|
||||
'data': configs
|
||||
})
|
||||
except Exception as e:
|
||||
self.logger.error(f"获取设备配置失败: {e}")
|
||||
return jsonify({
|
||||
'success': False,
|
||||
'message': f'获取设备配置失败: {str(e)}'
|
||||
}), 500
|
||||
|
||||
# 获取单个设备配置
|
||||
@self.app.route('/api/config/devices/<device_name>', methods=['GET'])
|
||||
def get_device_config(device_name: str):
|
||||
"""获取单个设备配置"""
|
||||
try:
|
||||
if device_name not in ['imu', 'pressure', 'camera', 'femtobolt']:
|
||||
return jsonify({
|
||||
'success': False,
|
||||
'message': f'不支持的设备类型: {device_name}'
|
||||
}), 400
|
||||
|
||||
config = self.config_manager.get_device_config(device_name)
|
||||
return jsonify({
|
||||
'success': True,
|
||||
'data': config
|
||||
})
|
||||
except Exception as e:
|
||||
self.logger.error(f"获取{device_name}配置失败: {e}")
|
||||
return jsonify({
|
||||
'success': False,
|
||||
'message': f'获取{device_name}配置失败: {str(e)}'
|
||||
}), 500
|
||||
|
||||
# 设置IMU配置
|
||||
@self.app.route('/api/config/devices/imu', methods=['POST'])
|
||||
def set_imu_config():
|
||||
"""设置IMU配置"""
|
||||
try:
|
||||
data = request.get_json()
|
||||
if not data:
|
||||
return jsonify({
|
||||
'success': False,
|
||||
'message': '请求数据不能为空'
|
||||
}), 400
|
||||
|
||||
result = self.config_manager.set_imu_config(data)
|
||||
status_code = 200 if result['success'] else 400
|
||||
return jsonify(result), status_code
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"设置IMU配置失败: {e}")
|
||||
return jsonify({
|
||||
'success': False,
|
||||
'message': f'设置IMU配置失败: {str(e)}'
|
||||
}), 500
|
||||
|
||||
# 设置压力板配置
|
||||
@self.app.route('/api/config/devices/pressure', methods=['POST'])
|
||||
def set_pressure_config():
|
||||
"""设置压力板配置"""
|
||||
try:
|
||||
data = request.get_json()
|
||||
if not data:
|
||||
return jsonify({
|
||||
'success': False,
|
||||
'message': '请求数据不能为空'
|
||||
}), 400
|
||||
|
||||
result = self.config_manager.set_pressure_config(data)
|
||||
status_code = 200 if result['success'] else 400
|
||||
return jsonify(result), status_code
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"设置压力板配置失败: {e}")
|
||||
return jsonify({
|
||||
'success': False,
|
||||
'message': f'设置压力板配置失败: {str(e)}'
|
||||
}), 500
|
||||
|
||||
# 设置相机配置
|
||||
@self.app.route('/api/config/devices/camera', methods=['POST'])
|
||||
def set_camera_config():
|
||||
"""设置相机配置"""
|
||||
try:
|
||||
data = request.get_json()
|
||||
if not data:
|
||||
return jsonify({
|
||||
'success': False,
|
||||
'message': '请求数据不能为空'
|
||||
}), 400
|
||||
|
||||
result = self.config_manager.set_camera_config(data)
|
||||
status_code = 200 if result['success'] else 400
|
||||
return jsonify(result), status_code
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"设置相机配置失败: {e}")
|
||||
return jsonify({
|
||||
'success': False,
|
||||
'message': f'设置相机配置失败: {str(e)}'
|
||||
}), 500
|
||||
|
||||
# 设置FemtoBolt配置
|
||||
@self.app.route('/api/config/devices/femtobolt', methods=['POST'])
|
||||
def set_femtobolt_config():
|
||||
"""设置FemtoBolt配置"""
|
||||
try:
|
||||
data = request.get_json()
|
||||
if not data:
|
||||
return jsonify({
|
||||
'success': False,
|
||||
'message': '请求数据不能为空'
|
||||
}), 400
|
||||
|
||||
result = self.config_manager.set_femtobolt_config(data)
|
||||
status_code = 200 if result['success'] else 400
|
||||
return jsonify(result), status_code
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"设置FemtoBolt配置失败: {e}")
|
||||
return jsonify({
|
||||
'success': False,
|
||||
'message': f'设置FemtoBolt配置失败: {str(e)}'
|
||||
}), 500
|
||||
|
||||
# 批量设置所有设备配置
|
||||
@self.app.route('/api/config/devices/all', methods=['POST'])
|
||||
def set_all_device_configs():
|
||||
"""批量设置所有设备配置"""
|
||||
try:
|
||||
data = request.get_json()
|
||||
if not data:
|
||||
return jsonify({
|
||||
'success': False,
|
||||
'message': '请求数据不能为空'
|
||||
}), 400
|
||||
|
||||
# 验证数据格式
|
||||
supported_devices = ['imu', 'pressure', 'camera', 'femtobolt']
|
||||
for device_name in data.keys():
|
||||
if device_name not in supported_devices:
|
||||
return jsonify({
|
||||
'success': False,
|
||||
'message': f'不支持的设备类型: {device_name},支持的设备类型: {", ".join(supported_devices)}'
|
||||
}), 400
|
||||
|
||||
result = self.config_manager.set_all_device_configs(data)
|
||||
status_code = 200 if result['success'] else 400
|
||||
return jsonify(result), status_code
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"批量设置设备配置失败: {e}")
|
||||
return jsonify({
|
||||
'success': False,
|
||||
'message': f'批量设置设备配置失败: {str(e)}'
|
||||
}), 500
|
||||
|
||||
# 重新加载配置
|
||||
@self.app.route('/api/config/reload', methods=['POST'])
|
||||
def reload_config():
|
||||
"""重新加载配置"""
|
||||
try:
|
||||
self.config_manager.reload_config()
|
||||
return jsonify({
|
||||
'success': True,
|
||||
'message': '配置重新加载成功'
|
||||
})
|
||||
except Exception as e:
|
||||
self.logger.error(f"重新加载配置失败: {e}")
|
||||
return jsonify({
|
||||
'success': False,
|
||||
'message': f'重新加载配置失败: {str(e)}'
|
||||
}), 500
|
||||
|
||||
# 验证配置
|
||||
@self.app.route('/api/config/validate', methods=['GET'])
|
||||
def validate_config():
|
||||
"""验证配置"""
|
||||
try:
|
||||
result = self.config_manager.validate_config()
|
||||
return jsonify({
|
||||
'success': True,
|
||||
'data': result
|
||||
})
|
||||
except Exception as e:
|
||||
self.logger.error(f"验证配置失败: {e}")
|
||||
return jsonify({
|
||||
'success': False,
|
||||
'message': f'验证配置失败: {str(e)}'
|
||||
}), 500
|
||||
|
||||
|
||||
# 创建独立的Flask应用用于测试
|
||||
def create_config_app():
|
||||
"""
|
||||
创建配置API应用
|
||||
|
||||
Returns:
|
||||
Flask: Flask应用实例
|
||||
"""
|
||||
app = Flask(__name__)
|
||||
|
||||
# 配置日志
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||||
)
|
||||
|
||||
# 初始化配置API
|
||||
config_api = ConfigAPI(app)
|
||||
|
||||
return app
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# 创建并运行应用
|
||||
app = create_config_app()
|
||||
app.run(host='0.0.0.0', port=5002, debug=True)
|
@ -29,8 +29,8 @@ depth_range_max = 1700
|
||||
|
||||
[DEVICES]
|
||||
imu_device_type = real
|
||||
imu_port = COM7
|
||||
imu_baudrate = 9600
|
||||
imu_port = COM3
|
||||
imu_baudrate = 115200
|
||||
pressure_device_type = real
|
||||
pressure_use_mock = False
|
||||
pressure_port = COM5
|
||||
|
@ -686,6 +686,79 @@ class AppServer:
|
||||
self.logger.error(f'刷新设备失败: {e}')
|
||||
return jsonify({'success': False, 'error': str(e)}), 500
|
||||
|
||||
# ==================== 设备配置API ====================
|
||||
|
||||
@self.app.route('/api/config/devices', methods=['GET'])
|
||||
def get_all_device_configs():
|
||||
"""获取所有设备配置"""
|
||||
try:
|
||||
if self.config_manager:
|
||||
configs = self.config_manager.get_all_device_configs()
|
||||
return jsonify({
|
||||
'success': True,
|
||||
'data': configs
|
||||
})
|
||||
else:
|
||||
return jsonify({'success': False, 'error': '配置管理器未初始化'}), 500
|
||||
except Exception as e:
|
||||
self.logger.error(f"获取设备配置失败: {e}")
|
||||
return jsonify({
|
||||
'success': False,
|
||||
'message': f'获取设备配置失败: {str(e)}'
|
||||
}), 500
|
||||
|
||||
@self.app.route('/api/config/devices/all', methods=['POST'])
|
||||
def set_all_device_configs():
|
||||
"""批量设置所有设备配置"""
|
||||
try:
|
||||
if not self.config_manager:
|
||||
return jsonify({'success': False, 'error': '配置管理器未初始化'}), 500
|
||||
|
||||
data = flask_request.get_json()
|
||||
if not data:
|
||||
return jsonify({
|
||||
'success': False,
|
||||
'message': '请求数据不能为空'
|
||||
}), 400
|
||||
|
||||
# 验证数据格式
|
||||
supported_devices = ['imu', 'pressure', 'camera', 'femtobolt']
|
||||
for device_name in data.keys():
|
||||
if device_name not in supported_devices:
|
||||
return jsonify({
|
||||
'success': False,
|
||||
'message': f'不支持的设备类型: {device_name},支持的设备类型: {", ".join(supported_devices)}'
|
||||
}), 400
|
||||
|
||||
result = self.config_manager.set_all_device_configs(data)
|
||||
|
||||
# 如果配置设置成功,重启设备数据推送
|
||||
if result['success']:
|
||||
try:
|
||||
self.logger.info("设备配置更新成功,重启设备数据推送...")
|
||||
# 先停止当前的数据推送
|
||||
if self.is_pushing_data:
|
||||
self.stop_device_push_data()
|
||||
time.sleep(1) # 等待停止完成
|
||||
|
||||
# 重新启动设备数据推送
|
||||
self.start_device_push_data()
|
||||
result['message'] = result.get('message', '') + ' 设备已重启数据推送。'
|
||||
self.logger.info("设备配置更新并重启数据推送完成")
|
||||
except Exception as restart_error:
|
||||
self.logger.error(f"重启设备数据推送失败: {restart_error}")
|
||||
result['message'] = result.get('message', '') + f' 但重启设备数据推送失败: {str(restart_error)}'
|
||||
|
||||
status_code = 200 if result['success'] else 400
|
||||
return jsonify(result), status_code
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"批量设置设备配置失败: {e}")
|
||||
return jsonify({
|
||||
'success': False,
|
||||
'message': f'批量设置设备配置失败: {str(e)}'
|
||||
}), 500
|
||||
|
||||
@self.app.route('/api/devices/calibrate', methods=['POST'])
|
||||
def calibrate_device():
|
||||
"""校准设备"""
|
||||
|
Loading…
Reference in New Issue
Block a user