BodyBalanceEvaluation/backend/devices/config_api.py

274 lines
9.6 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
设备配置HTTP API接口
提供通过HTTP方式设置设备参数的功能
"""
import json
import logging
import sys
import os
from flask import Flask, request, jsonify
from typing import Dict, Any
# 添加路径以支持导入
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from utils.config_manager import ConfigManager
class ConfigAPI:
"""配置API类"""
def __init__(self, app: Flask = None):
"""
初始化配置API
Args:
app: Flask应用实例
"""
self.logger = logging.getLogger(f"{__name__}.ConfigAPI")
self.config_manager = ConfigManager()
if app:
self.init_app(app)
def init_app(self, app: Flask):
"""
初始化Flask应用
Args:
app: Flask应用实例
"""
self.app = app
self._register_routes()
def _register_routes(self):
"""
注册路由
"""
# 获取所有设备配置
@self.app.route('/api/config/devices', methods=['GET'])
def get_all_device_configs():
"""获取所有设备配置"""
try:
configs = self.config_manager.get_all_device_configs()
return jsonify({
'success': True,
'data': configs
})
except Exception as e:
self.logger.error(f"获取设备配置失败: {e}")
return jsonify({
'success': False,
'message': f'获取设备配置失败: {str(e)}'
}), 500
# 获取单个设备配置
@self.app.route('/api/config/devices/<device_name>', methods=['GET'])
def get_device_config(device_name: str):
"""获取单个设备配置"""
try:
if device_name not in ['imu', 'pressure', 'camera', 'femtobolt']:
return jsonify({
'success': False,
'message': f'不支持的设备类型: {device_name}'
}), 400
config = self.config_manager.get_device_config(device_name)
return jsonify({
'success': True,
'data': config
})
except Exception as e:
self.logger.error(f"获取{device_name}配置失败: {e}")
return jsonify({
'success': False,
'message': f'获取{device_name}配置失败: {str(e)}'
}), 500
# 设置IMU配置
@self.app.route('/api/config/devices/imu', methods=['POST'])
def set_imu_config():
"""设置IMU配置"""
try:
data = request.get_json()
if not data:
return jsonify({
'success': False,
'message': '请求数据不能为空'
}), 400
result = self.config_manager.set_imu_config(data)
status_code = 200 if result['success'] else 400
return jsonify(result), status_code
except Exception as e:
self.logger.error(f"设置IMU配置失败: {e}")
return jsonify({
'success': False,
'message': f'设置IMU配置失败: {str(e)}'
}), 500
# 设置压力板配置
@self.app.route('/api/config/devices/pressure', methods=['POST'])
def set_pressure_config():
"""设置压力板配置"""
try:
data = request.get_json()
if not data:
return jsonify({
'success': False,
'message': '请求数据不能为空'
}), 400
result = self.config_manager.set_pressure_config(data)
status_code = 200 if result['success'] else 400
return jsonify(result), status_code
except Exception as e:
self.logger.error(f"设置压力板配置失败: {e}")
return jsonify({
'success': False,
'message': f'设置压力板配置失败: {str(e)}'
}), 500
# 设置相机配置
@self.app.route('/api/config/devices/camera', methods=['POST'])
def set_camera_config():
"""设置相机配置"""
try:
data = request.get_json()
if not data:
return jsonify({
'success': False,
'message': '请求数据不能为空'
}), 400
result = self.config_manager.set_camera_config(data)
status_code = 200 if result['success'] else 400
return jsonify(result), status_code
except Exception as e:
self.logger.error(f"设置相机配置失败: {e}")
return jsonify({
'success': False,
'message': f'设置相机配置失败: {str(e)}'
}), 500
# 设置FemtoBolt配置
@self.app.route('/api/config/devices/femtobolt', methods=['POST'])
def set_femtobolt_config():
"""设置FemtoBolt配置"""
try:
data = request.get_json()
if not data:
return jsonify({
'success': False,
'message': '请求数据不能为空'
}), 400
result = self.config_manager.set_femtobolt_config(data)
status_code = 200 if result['success'] else 400
return jsonify(result), status_code
except Exception as e:
self.logger.error(f"设置FemtoBolt配置失败: {e}")
return jsonify({
'success': False,
'message': f'设置FemtoBolt配置失败: {str(e)}'
}), 500
# 批量设置所有设备配置
@self.app.route('/api/config/devices/all', methods=['POST'])
def set_all_device_configs():
"""批量设置所有设备配置"""
try:
data = request.get_json()
if not data:
return jsonify({
'success': False,
'message': '请求数据不能为空'
}), 400
# 验证数据格式
supported_devices = ['imu', 'pressure', 'camera', 'femtobolt']
for device_name in data.keys():
if device_name not in supported_devices:
return jsonify({
'success': False,
'message': f'不支持的设备类型: {device_name},支持的设备类型: {", ".join(supported_devices)}'
}), 400
result = self.config_manager.set_all_device_configs(data)
status_code = 200 if result['success'] else 400
return jsonify(result), status_code
except Exception as e:
self.logger.error(f"批量设置设备配置失败: {e}")
return jsonify({
'success': False,
'message': f'批量设置设备配置失败: {str(e)}'
}), 500
# 重新加载配置
@self.app.route('/api/config/reload', methods=['POST'])
def reload_config():
"""重新加载配置"""
try:
self.config_manager.reload_config()
return jsonify({
'success': True,
'message': '配置重新加载成功'
})
except Exception as e:
self.logger.error(f"重新加载配置失败: {e}")
return jsonify({
'success': False,
'message': f'重新加载配置失败: {str(e)}'
}), 500
# 验证配置
@self.app.route('/api/config/validate', methods=['GET'])
def validate_config():
"""验证配置"""
try:
result = self.config_manager.validate_config()
return jsonify({
'success': True,
'data': result
})
except Exception as e:
self.logger.error(f"验证配置失败: {e}")
return jsonify({
'success': False,
'message': f'验证配置失败: {str(e)}'
}), 500
# 创建独立的Flask应用用于测试
def create_config_app():
"""
创建配置API应用
Returns:
Flask: Flask应用实例
"""
app = Flask(__name__)
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# 初始化配置API
config_api = ConfigAPI(app)
return app
if __name__ == '__main__':
# 创建并运行应用
app = create_config_app()
app.run(host='0.0.0.0', port=5002, debug=True)