BodyBalanceEvaluation/backend/devices/utils/config_manager.py

531 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
配置管理工具
提供设备配置的统一管理和读取
"""
import os
import configparser
import json
import logging
from typing import Dict, Any, Optional, Union
from pathlib import Path
class ConfigManager:
"""配置管理器"""
def __init__(self, config_path: Optional[str] = None):
"""
初始化配置管理器
Args:
config_path: 配置文件路径默认为backend/config.ini
"""
self.logger = logging.getLogger(f"{__name__}.ConfigManager")
# 确定配置文件路径
if config_path:
self.config_path = config_path
else:
# 自动查找配置文件
self.config_path = self._find_config_file()
self.config = configparser.ConfigParser()
self._device_configs = {}
self._load_config()
def _find_config_file(self) -> str:
"""
查找配置文件
Returns:
str: 配置文件路径
"""
import sys
# 可能的配置文件路径
possible_paths = []
# 如果是打包后的exe文件从exe文件同级目录获取
if getattr(sys, 'frozen', False):
# 打包后的exe文件路径
exe_dir = os.path.dirname(sys.executable)
possible_paths.append(os.path.join(exe_dir, 'config.ini'))
# 开发环境下的路径
possible_paths.extend([
os.path.join(os.path.dirname(__file__), 'config.ini'),
os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'config.ini'), # backend/config.ini
os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))), 'config.ini') # 项目根目录/config.ini
])
for path in possible_paths:
abs_path = os.path.abspath(path)
if os.path.exists(abs_path):
self.logger.info(f"找到配置文件: {abs_path}")
return abs_path
# 如果都找不到返回默认路径exe同级目录或backend目录
if getattr(sys, 'frozen', False):
default_path = os.path.join(os.path.dirname(sys.executable), 'config.ini')
else:
default_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'config.ini')
self.logger.warning(f"未找到配置文件,使用默认路径: {default_path}")
return default_path
def _load_config(self):
"""
加载配置文件
"""
try:
if os.path.exists(self.config_path):
self.config.read(self.config_path, encoding='utf-8')
self.logger.info(f"成功加载配置文件: {self.config_path}")
else:
self.logger.warning(f"配置文件不存在: {self.config_path}")
self._create_default_config()
except Exception as e:
self.logger.error(f"加载配置文件失败: {e}")
self._create_default_config()
def _create_default_config(self):
"""
创建默认配置
"""
self.config.clear()
# 默认设备配置
self.config['DEVICES'] = {
'imu_port': 'COM7',
'imu_baudrate': '9600',
'pressure_port': 'COM8',
'pressure_baudrate': '115200'
}
# 默认相机配置
self.config['CAMERA'] = {
'device_index': '0',
'width': '1280',
'height': '720',
'fps': '30'
}
# 默认FemtoBolt配置
self.config['FEMTOBOLT'] = {
'color_resolution': '1080P',
'depth_mode': 'NFOV_UNBINNED',
'fps': '15',
'depth_range_min': '500',
'depth_range_max': '4500'
}
# 默认系统配置
self.config['SYSTEM'] = {
'log_level': 'INFO',
'max_cache_size': '10',
'cache_timeout': '5.0'
}
self.logger.info("创建默认配置")
def get_device_config(self, device_name: str) -> Dict[str, Any]:
"""
获取设备配置
Args:
device_name: 设备名称 (camera, femtobolt, imu, pressure)
Returns:
Dict[str, Any]: 设备配置字典
"""
if device_name in self._device_configs:
return self._device_configs[device_name].copy()
config = {}
if device_name == 'camera':
config = self._get_camera_config()
elif device_name == 'femtobolt':
config = self._get_femtobolt_config()
elif device_name == 'imu':
config = self._get_imu_config()
elif device_name == 'pressure':
config = self._get_pressure_config()
else:
self.logger.warning(f"未知设备类型: {device_name}")
# 缓存配置
self._device_configs[device_name] = config
return config.copy()
def _get_camera_config(self) -> Dict[str, Any]:
"""
获取相机配置
Returns:
Dict[str, Any]: 相机配置
"""
return {
'device_index': self.config.getint('CAMERA', 'device_index', fallback=0),
'width': self.config.getint('CAMERA', 'width', fallback=1280),
'height': self.config.getint('CAMERA', 'height', fallback=720),
'fps': self.config.getint('CAMERA', 'fps', fallback=30),
'buffer_size': self.config.getint('CAMERA', 'buffer_size', fallback=1),
'fourcc': self.config.get('CAMERA', 'fourcc', fallback='MJPG')
}
def _get_femtobolt_config(self) -> Dict[str, Any]:
"""
获取FemtoBolt配置
Returns:
Dict[str, Any]: FemtoBolt配置
"""
return {
'color_resolution': self.config.get('FEMTOBOLT', 'color_resolution', fallback='1080P'),
'depth_mode': self.config.get('FEMTOBOLT', 'depth_mode', fallback='NFOV_UNBINNED'),
'fps': self.config.getint('FEMTOBOLT', 'fps', fallback=15),
'depth_range_min': self.config.getint('FEMTOBOLT', 'depth_range_min', fallback=500),
'depth_range_max': self.config.getint('FEMTOBOLT', 'depth_range_max', fallback=4500),
'synchronized_images_only': self.config.getboolean('FEMTOBOLT', 'synchronized_images_only', fallback=False)
}
def _get_imu_config(self) -> Dict[str, Any]:
"""
获取IMU配置
Returns:
Dict[str, Any]: IMU配置
"""
return {
'device_type': self.config.get('DEVICES', 'imu_device_type', fallback='mock'),
'port': self.config.get('DEVICES', 'imu_port', fallback='COM7'),
'baudrate': self.config.getint('DEVICES', 'imu_baudrate', fallback=9600),
'timeout': self.config.getfloat('DEVICES', 'imu_timeout', fallback=1.0),
'calibration_samples': self.config.getint('DEVICES', 'imu_calibration_samples', fallback=100),
}
def _get_pressure_config(self) -> Dict[str, Any]:
"""
获取压力传感器配置
Returns:
Dict[str, Any]: 压力传感器配置
"""
return {
'device_type': self.config.get('DEVICES', 'pressure_device_type', fallback='mock'),
'port': self.config.get('DEVICES', 'pressure_port', fallback='COM8'),
'baudrate': self.config.getint('DEVICES', 'pressure_baudrate', fallback=115200),
'timeout': self.config.getfloat('DEVICES', 'pressure_timeout', fallback=1.0),
'calibration_samples': self.config.getint('DEVICES', 'pressure_calibration_samples', fallback=50)
}
def get_system_config(self) -> Dict[str, Any]:
"""
获取系统配置
Returns:
Dict[str, Any]: 系统配置
"""
return {
'log_level': self.config.get('SYSTEM', 'log_level', fallback='INFO'),
'max_cache_size': self.config.getint('SYSTEM', 'max_cache_size', fallback=10),
'cache_timeout': self.config.getfloat('SYSTEM', 'cache_timeout', fallback=5.0),
'heartbeat_interval': self.config.getfloat('SYSTEM', 'heartbeat_interval', fallback=30.0)
}
def get_config_value(self, section: str, key: str, fallback: Any = None) -> Any:
"""
获取配置值
Args:
section: 配置段
key: 配置键
fallback: 默认值
Returns:
Any: 配置值
"""
try:
if isinstance(fallback, bool):
return self.config.getboolean(section, key, fallback=fallback)
elif isinstance(fallback, int):
return self.config.getint(section, key, fallback=fallback)
elif isinstance(fallback, float):
return self.config.getfloat(section, key, fallback=fallback)
else:
return self.config.get(section, key, fallback=fallback)
except Exception as e:
self.logger.warning(f"获取配置值失败 [{section}][{key}]: {e}")
return fallback
def set_config_value(self, section: str, key: str, value: Any):
"""
设置配置值
Args:
section: 配置段
key: 配置键
value: 配置值
"""
if not self.config.has_section(section):
self.config.add_section(section)
self.config.set(section, key, str(value))
# 清除缓存
self._device_configs.clear()
def save_config(self):
"""
保存配置到文件
"""
try:
# 确保目录存在
os.makedirs(os.path.dirname(self.config_path), exist_ok=True)
with open(self.config_path, 'w', encoding='utf-8') as f:
self.config.write(f)
self.logger.info(f"配置已保存到: {self.config_path}")
except Exception as e:
self.logger.error(f"保存配置失败: {e}")
def reload_config(self):
"""
重新加载配置
"""
self._device_configs.clear()
self._load_config()
self.logger.info("配置已重新加载")
def get_all_sections(self) -> Dict[str, Dict[str, str]]:
"""
获取所有配置段
Returns:
Dict[str, Dict[str, str]]: 所有配置
"""
result = {}
for section_name in self.config.sections():
result[section_name] = dict(self.config[section_name])
return result
def validate_config(self) -> Dict[str, list]:
"""
验证配置有效性
Returns:
Dict[str, list]: 验证结果,包含错误和警告
"""
errors = []
warnings = []
# 验证必需的配置段
required_sections = ['DEVICES', 'CAMERA', 'FEMTOBOLT', 'SYSTEM']
for section in required_sections:
if not self.config.has_section(section):
errors.append(f"缺少必需的配置段: {section}")
# 验证设备配置
try:
imu_config = self.get_device_config('imu')
if not imu_config.get('port'):
warnings.append("IMU串口未配置")
except Exception as e:
errors.append(f"IMU配置验证失败: {e}")
return {
'errors': errors,
'warnings': warnings,
'valid': len(errors) == 0
}
# HTTP接口设备参数设置方法
def set_imu_config(self, config_data: Dict[str, Any]) -> Dict[str, Any]:
"""
设置IMU设备配置
Args:
config_data: IMU配置数据
{
'device_type': 'real' | 'mock',
'port': 'COM6',
'baudrate': 9600
}
Returns:
Dict[str, Any]: 设置结果
"""
try:
# 验证必需参数
if 'device_type' in config_data:
self.set_config_value('DEVICES', 'imu_device_type', config_data['device_type'])
if 'port' in config_data:
self.set_config_value('DEVICES', 'imu_port', config_data['port'])
if 'baudrate' in config_data:
self.set_config_value('DEVICES', 'imu_baudrate', str(config_data['baudrate']))
# 保存配置
self.save_config()
self.logger.info(f"IMU配置已更新: {config_data}")
return {
'success': True,
'message': 'IMU配置更新成功',
'config': self.get_device_config('imu')
}
except Exception as e:
self.logger.error(f"设置IMU配置失败: {e}")
return {
'success': False,
'message': f'设置IMU配置失败: {str(e)}'
}
def set_pressure_config(self, config_data: Dict[str, Any]) -> Dict[str, Any]:
"""
设置压力板设备配置
Args:
config_data: 压力板配置数据
{
'device_type': 'real' | 'mock',
'use_mock': False,
'port': 'COM5',
'baudrate': 115200
}
Returns:
Dict[str, Any]: 设置结果
"""
try:
# 验证必需参数
if 'device_type' in config_data:
self.set_config_value('DEVICES', 'pressure_device_type', config_data['device_type'])
if 'use_mock' in config_data:
self.set_config_value('DEVICES', 'pressure_use_mock', str(config_data['use_mock']))
if 'port' in config_data:
self.set_config_value('DEVICES', 'pressure_port', config_data['port'])
if 'baudrate' in config_data:
self.set_config_value('DEVICES', 'pressure_baudrate', str(config_data['baudrate']))
# 保存配置
self.save_config()
self.logger.info(f"压力板配置已更新: {config_data}")
return {
'success': True,
'message': '压力板配置更新成功',
'config': self.get_device_config('pressure')
}
except Exception as e:
self.logger.error(f"设置压力板配置失败: {e}")
return {
'success': False,
'message': f'设置压力板配置失败: {str(e)}'
}
def set_camera_config(self, config_data: Dict[str, Any]) -> Dict[str, Any]:
"""
设置相机设备配置
Args:
config_data: 相机配置数据
{
'device_index': 1,
'width': 1280,
'height': 720,
'fps': 30
}
Returns:
Dict[str, Any]: 设置结果
"""
try:
# 验证必需参数
if 'device_index' in config_data:
self.set_config_value('CAMERA', 'device_index', str(config_data['device_index']))
if 'width' in config_data:
self.set_config_value('CAMERA', 'width', str(config_data['width']))
if 'height' in config_data:
self.set_config_value('CAMERA', 'height', str(config_data['height']))
if 'fps' in config_data:
self.set_config_value('CAMERA', 'fps', str(config_data['fps']))
# 保存配置
self.save_config()
self.logger.info(f"相机配置已更新: {config_data}")
return {
'success': True,
'message': '相机配置更新成功',
'config': self.get_device_config('camera')
}
except Exception as e:
self.logger.error(f"设置相机配置失败: {e}")
return {
'success': False,
'message': f'设置相机配置失败: {str(e)}'
}
def set_femtobolt_config(self, config_data: Dict[str, Any]) -> Dict[str, Any]:
"""
设置FemtoBolt设备配置
Args:
config_data: FemtoBolt配置数据
{
'color_resolution': '1080P',
'depth_mode': 'NFOV_UNBINNED',
'fps': 30,
'depth_range_min': 1200,
'depth_range_max': 1500
}
Returns:
Dict[str, Any]: 设置结果
"""
try:
# 验证必需参数
if 'color_resolution' in config_data:
self.set_config_value('FEMTOBOLT', 'color_resolution', config_data['color_resolution'])
if 'depth_mode' in config_data:
self.set_config_value('FEMTOBOLT', 'depth_mode', config_data['depth_mode'])
if 'fps' in config_data:
self.set_config_value('FEMTOBOLT', 'fps', str(config_data['fps']))
if 'depth_range_min' in config_data:
self.set_config_value('FEMTOBOLT', 'depth_range_min', str(config_data['depth_range_min']))
if 'depth_range_max' in config_data:
self.set_config_value('FEMTOBOLT', 'depth_range_max', str(config_data['depth_range_max']))
# 保存配置
self.save_config()
self.logger.info(f"FemtoBolt配置已更新: {config_data}")
return {
'success': True,
'message': 'FemtoBolt配置更新成功',
'config': self.get_device_config('femtobolt')
}
except Exception as e:
self.logger.error(f"设置FemtoBolt配置失败: {e}")
return {
'success': False,
'message': f'设置FemtoBolt配置失败: {str(e)}'
}
def get_all_device_configs(self) -> Dict[str, Any]:
"""
获取所有设备配置
Returns:
Dict[str, Any]: 所有设备配置
"""
return {
'imu': self.get_device_config('imu'),
'pressure': self.get_device_config('pressure'),
'camera': self.get_device_config('camera'),
'femtobolt': self.get_device_config('femtobolt')
}