#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 配置管理工具 提供设备配置的统一管理和读取 """ import os import configparser import json import logging from typing import Dict, Any, Optional, Union from pathlib import Path class ConfigManager: """配置管理器""" def __init__(self, config_path: Optional[str] = None): """ 初始化配置管理器 Args: config_path: 配置文件路径,默认为backend/config.ini """ self.logger = logging.getLogger(f"{__name__}.ConfigManager") # 确定配置文件路径 if config_path: self.config_path = config_path else: # 自动查找配置文件 self.config_path = self._find_config_file() self.config = configparser.ConfigParser() self._device_configs = {} self._load_config() def _find_config_file(self) -> str: """ 查找配置文件 Returns: str: 配置文件路径 """ # 可能的配置文件路径 possible_paths = [ os.path.join(os.path.dirname(__file__), 'config.ini') ] for path in possible_paths: abs_path = os.path.abspath(path) if os.path.exists(abs_path): self.logger.info(f"找到配置文件: {abs_path}") return abs_path def _load_config(self): """ 加载配置文件 """ try: if os.path.exists(self.config_path): self.config.read(self.config_path, encoding='utf-8') self.logger.info(f"成功加载配置文件: {self.config_path}") else: self.logger.warning(f"配置文件不存在: {self.config_path}") self._create_default_config() except Exception as e: self.logger.error(f"加载配置文件失败: {e}") self._create_default_config() def _create_default_config(self): """ 创建默认配置 """ self.config.clear() # 默认设备配置 self.config['DEVICES'] = { 'imu_port': 'COM7', 'imu_baudrate': '9600', 'pressure_port': 'COM8', 'pressure_baudrate': '115200' } # 默认相机配置 self.config['CAMERA'] = { 'device_index': '0', 'width': '1280', 'height': '720', 'fps': '30' } # 默认FemtoBolt配置 self.config['FEMTOBOLT'] = { 'color_resolution': '1080P', 'depth_mode': 'NFOV_UNBINNED', 'fps': '15', 'depth_range_min': '500', 'depth_range_max': '4500' } # 默认系统配置 self.config['SYSTEM'] = { 'log_level': 'INFO', 'max_cache_size': '10', 'cache_timeout': '5.0' } self.logger.info("创建默认配置") def get_device_config(self, device_name: str) -> Dict[str, Any]: """ 获取设备配置 Args: device_name: 设备名称 (camera, femtobolt, imu, pressure) Returns: Dict[str, Any]: 设备配置字典 """ if device_name in self._device_configs: return self._device_configs[device_name].copy() config = {} if device_name == 'camera': config = self._get_camera_config() elif device_name == 'femtobolt': config = self._get_femtobolt_config() elif device_name == 'imu': config = self._get_imu_config() elif device_name == 'pressure': config = self._get_pressure_config() else: self.logger.warning(f"未知设备类型: {device_name}") # 缓存配置 self._device_configs[device_name] = config return config.copy() def _get_camera_config(self) -> Dict[str, Any]: """ 获取相机配置 Returns: Dict[str, Any]: 相机配置 """ return { 'device_index': self.config.getint('CAMERA', 'device_index', fallback=0), 'width': self.config.getint('CAMERA', 'width', fallback=1280), 'height': self.config.getint('CAMERA', 'height', fallback=720), 'fps': self.config.getint('CAMERA', 'fps', fallback=30), 'buffer_size': self.config.getint('CAMERA', 'buffer_size', fallback=1), 'fourcc': self.config.get('CAMERA', 'fourcc', fallback='MJPG') } def _get_femtobolt_config(self) -> Dict[str, Any]: """ 获取FemtoBolt配置 Returns: Dict[str, Any]: FemtoBolt配置 """ return { 'color_resolution': self.config.get('FEMTOBOLT', 'color_resolution', fallback='1080P'), 'depth_mode': self.config.get('FEMTOBOLT', 'depth_mode', fallback='NFOV_UNBINNED'), 'fps': self.config.getint('FEMTOBOLT', 'fps', fallback=15), 'depth_range_min': self.config.getint('FEMTOBOLT', 'depth_range_min', fallback=500), 'depth_range_max': self.config.getint('FEMTOBOLT', 'depth_range_max', fallback=4500), 'synchronized_images_only': self.config.getboolean('FEMTOBOLT', 'synchronized_images_only', fallback=False) } def _get_imu_config(self) -> Dict[str, Any]: """ 获取IMU配置 Returns: Dict[str, Any]: IMU配置 """ return { 'device_type': self.config.get('DEVICES', 'imu_device_type', fallback='mock'), 'port': self.config.get('DEVICES', 'imu_port', fallback='COM7'), 'baudrate': self.config.getint('DEVICES', 'imu_baudrate', fallback=9600), 'timeout': self.config.getfloat('DEVICES', 'imu_timeout', fallback=1.0), 'calibration_samples': self.config.getint('DEVICES', 'imu_calibration_samples', fallback=100), } def _get_pressure_config(self) -> Dict[str, Any]: """ 获取压力传感器配置 Returns: Dict[str, Any]: 压力传感器配置 """ return { 'device_type': self.config.get('DEVICES', 'pressure_device_type', fallback='mock'), 'port': self.config.get('DEVICES', 'pressure_port', fallback='COM8'), 'baudrate': self.config.getint('DEVICES', 'pressure_baudrate', fallback=115200), 'timeout': self.config.getfloat('DEVICES', 'pressure_timeout', fallback=1.0), 'calibration_samples': self.config.getint('DEVICES', 'pressure_calibration_samples', fallback=50) } def get_system_config(self) -> Dict[str, Any]: """ 获取系统配置 Returns: Dict[str, Any]: 系统配置 """ return { 'log_level': self.config.get('SYSTEM', 'log_level', fallback='INFO'), 'max_cache_size': self.config.getint('SYSTEM', 'max_cache_size', fallback=10), 'cache_timeout': self.config.getfloat('SYSTEM', 'cache_timeout', fallback=5.0), 'heartbeat_interval': self.config.getfloat('SYSTEM', 'heartbeat_interval', fallback=30.0) } def get_config_value(self, section: str, key: str, fallback: Any = None) -> Any: """ 获取配置值 Args: section: 配置段 key: 配置键 fallback: 默认值 Returns: Any: 配置值 """ try: if isinstance(fallback, bool): return self.config.getboolean(section, key, fallback=fallback) elif isinstance(fallback, int): return self.config.getint(section, key, fallback=fallback) elif isinstance(fallback, float): return self.config.getfloat(section, key, fallback=fallback) else: return self.config.get(section, key, fallback=fallback) except Exception as e: self.logger.warning(f"获取配置值失败 [{section}][{key}]: {e}") return fallback def set_config_value(self, section: str, key: str, value: Any): """ 设置配置值 Args: section: 配置段 key: 配置键 value: 配置值 """ if not self.config.has_section(section): self.config.add_section(section) self.config.set(section, key, str(value)) # 清除缓存 self._device_configs.clear() def save_config(self): """ 保存配置到文件 """ try: # 确保目录存在 os.makedirs(os.path.dirname(self.config_path), exist_ok=True) with open(self.config_path, 'w', encoding='utf-8') as f: self.config.write(f) self.logger.info(f"配置已保存到: {self.config_path}") except Exception as e: self.logger.error(f"保存配置失败: {e}") def reload_config(self): """ 重新加载配置 """ self._device_configs.clear() self._load_config() self.logger.info("配置已重新加载") def get_all_sections(self) -> Dict[str, Dict[str, str]]: """ 获取所有配置段 Returns: Dict[str, Dict[str, str]]: 所有配置 """ result = {} for section_name in self.config.sections(): result[section_name] = dict(self.config[section_name]) return result def validate_config(self) -> Dict[str, list]: """ 验证配置有效性 Returns: Dict[str, list]: 验证结果,包含错误和警告 """ errors = [] warnings = [] # 验证必需的配置段 required_sections = ['DEVICES', 'CAMERA', 'FEMTOBOLT', 'SYSTEM'] for section in required_sections: if not self.config.has_section(section): errors.append(f"缺少必需的配置段: {section}") # 验证设备配置 try: imu_config = self.get_device_config('imu') if not imu_config.get('port'): warnings.append("IMU串口未配置") except Exception as e: errors.append(f"IMU配置验证失败: {e}") return { 'errors': errors, 'warnings': warnings, 'valid': len(errors) == 0 } # HTTP接口设备参数设置方法 def set_imu_config(self, config_data: Dict[str, Any]) -> Dict[str, Any]: """ 设置IMU设备配置 Args: config_data: IMU配置数据 { 'device_type': 'real' | 'mock', 'port': 'COM6', 'baudrate': 9600 } Returns: Dict[str, Any]: 设置结果 """ try: # 验证必需参数 if 'device_type' in config_data: self.set_config_value('DEVICES', 'imu_device_type', config_data['device_type']) if 'port' in config_data: self.set_config_value('DEVICES', 'imu_port', config_data['port']) if 'baudrate' in config_data: self.set_config_value('DEVICES', 'imu_baudrate', str(config_data['baudrate'])) # 保存配置 self.save_config() self.logger.info(f"IMU配置已更新: {config_data}") return { 'success': True, 'message': 'IMU配置更新成功', 'config': self.get_device_config('imu') } except Exception as e: self.logger.error(f"设置IMU配置失败: {e}") return { 'success': False, 'message': f'设置IMU配置失败: {str(e)}' } def set_pressure_config(self, config_data: Dict[str, Any]) -> Dict[str, Any]: """ 设置压力板设备配置 Args: config_data: 压力板配置数据 { 'device_type': 'real' | 'mock', 'use_mock': False, 'port': 'COM5', 'baudrate': 115200 } Returns: Dict[str, Any]: 设置结果 """ try: # 验证必需参数 if 'device_type' in config_data: self.set_config_value('DEVICES', 'pressure_device_type', config_data['device_type']) if 'use_mock' in config_data: self.set_config_value('DEVICES', 'pressure_use_mock', str(config_data['use_mock'])) if 'port' in config_data: self.set_config_value('DEVICES', 'pressure_port', config_data['port']) if 'baudrate' in config_data: self.set_config_value('DEVICES', 'pressure_baudrate', str(config_data['baudrate'])) # 保存配置 self.save_config() self.logger.info(f"压力板配置已更新: {config_data}") return { 'success': True, 'message': '压力板配置更新成功', 'config': self.get_device_config('pressure') } except Exception as e: self.logger.error(f"设置压力板配置失败: {e}") return { 'success': False, 'message': f'设置压力板配置失败: {str(e)}' } def set_camera_config(self, config_data: Dict[str, Any]) -> Dict[str, Any]: """ 设置相机设备配置 Args: config_data: 相机配置数据 { 'device_index': 1, 'width': 1280, 'height': 720, 'fps': 30 } Returns: Dict[str, Any]: 设置结果 """ try: # 验证必需参数 if 'device_index' in config_data: self.set_config_value('CAMERA', 'device_index', str(config_data['device_index'])) if 'width' in config_data: self.set_config_value('CAMERA', 'width', str(config_data['width'])) if 'height' in config_data: self.set_config_value('CAMERA', 'height', str(config_data['height'])) if 'fps' in config_data: self.set_config_value('CAMERA', 'fps', str(config_data['fps'])) # 保存配置 self.save_config() self.logger.info(f"相机配置已更新: {config_data}") return { 'success': True, 'message': '相机配置更新成功', 'config': self.get_device_config('camera') } except Exception as e: self.logger.error(f"设置相机配置失败: {e}") return { 'success': False, 'message': f'设置相机配置失败: {str(e)}' } def set_femtobolt_config(self, config_data: Dict[str, Any]) -> Dict[str, Any]: """ 设置FemtoBolt设备配置 Args: config_data: FemtoBolt配置数据 { 'color_resolution': '1080P', 'depth_mode': 'NFOV_UNBINNED', 'fps': 30, 'depth_range_min': 1200, 'depth_range_max': 1500 } Returns: Dict[str, Any]: 设置结果 """ try: # 验证必需参数 if 'color_resolution' in config_data: self.set_config_value('FEMTOBOLT', 'color_resolution', config_data['color_resolution']) if 'depth_mode' in config_data: self.set_config_value('FEMTOBOLT', 'depth_mode', config_data['depth_mode']) if 'fps' in config_data: self.set_config_value('FEMTOBOLT', 'fps', str(config_data['fps'])) if 'depth_range_min' in config_data: self.set_config_value('FEMTOBOLT', 'depth_range_min', str(config_data['depth_range_min'])) if 'depth_range_max' in config_data: self.set_config_value('FEMTOBOLT', 'depth_range_max', str(config_data['depth_range_max'])) # 保存配置 self.save_config() self.logger.info(f"FemtoBolt配置已更新: {config_data}") return { 'success': True, 'message': 'FemtoBolt配置更新成功', 'config': self.get_device_config('femtobolt') } except Exception as e: self.logger.error(f"设置FemtoBolt配置失败: {e}") return { 'success': False, 'message': f'设置FemtoBolt配置失败: {str(e)}' } def get_all_device_configs(self) -> Dict[str, Any]: """ 获取所有设备配置 Returns: Dict[str, Any]: 所有设备配置 """ return { 'imu': self.get_device_config('imu'), 'pressure': self.get_device_config('pressure'), 'camera': self.get_device_config('camera'), 'femtobolt': self.get_device_config('femtobolt') }