"""JSON-file-backed storage for device, channel and setting configuration."""

from __future__ import annotations

import json
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

from app.core.config import settings
from app.utils.backup import backup_file


class JsonConfigRepository:
    """Read and write JSON configuration documents stored in one directory.

    Three documents have dedicated accessors: ``device.json``,
    ``channel.json`` and ``setting.json``. Any other file in the directory
    can be read through :meth:`read_json`.
    """

    def __init__(self, config_dir: Optional[Path] = None) -> None:
        """Bind the repository to a directory, creating it if necessary.

        Args:
            config_dir: Directory holding the JSON files. When omitted (or
                otherwise falsy), ``settings.config_dir`` is used.
        """
        self.config_dir = config_dir or settings.config_dir
        self.config_dir.mkdir(parents=True, exist_ok=True)

    def _write_json(
        self, filename: str, payload: Union[Dict[str, Any], List[Any]]
    ) -> Path:
        """Serialize ``payload`` to ``filename`` inside the config directory.

        The data is written to a sibling ``.tmp`` file first and then moved
        over the target with ``Path.replace``, so the target is never
        truncated in place. If writing or replacing fails, the temporary
        file is removed and the original exception propagates.

        Args:
            filename: Name of the target file, relative to ``config_dir``.
            payload: JSON-serializable dict or list.

        Returns:
            The path of the file that was written.

        Raises:
            TypeError: If ``payload`` is not JSON serializable (via
                ``json.dump``).
            OSError: If the temporary file cannot be written or moved.
        """
        path = self.config_dir / filename
        # NOTE(review): backup_file is defined elsewhere; presumably it
        # copies the current file into settings.backup_dir -- confirm how it
        # behaves when `path` does not exist yet.
        backup_file(path, settings.backup_dir)

        temp_path = path.with_suffix(".tmp")
        try:
            with temp_path.open("w", encoding="utf-8") as file:
                json.dump(payload, file, ensure_ascii=False, indent=2)
            temp_path.replace(path)
        except BaseException:
            # Do not leave a half-written temp file behind. Cleanup is
            # best-effort: the file may never have been created, and the
            # original error is the one the caller needs to see.
            try:
                temp_path.unlink()
            except OSError:
                pass
            raise
        return path

    def write_device_config(self, payload: Dict[str, Any]) -> Path:
        """Write ``payload`` to ``device.json`` and return its path."""
        return self._write_json("device.json", payload)

    def write_channel_config(self, payload: Dict[str, Any]) -> Path:
        """Write ``payload`` to ``channel.json`` and return its path."""
        return self._write_json("channel.json", payload)

    def write_setting_config(self, payload: Dict[str, Any]) -> Path:
        """Write ``payload`` to ``setting.json`` and return its path."""
        return self._write_json("setting.json", payload)

    def _read_mapping(self, filename: str) -> Dict[str, Any]:
        """Return the content of ``filename`` if it is a JSON object, else ``{}``."""
        data = self.read_json(filename)
        return data if isinstance(data, dict) else {}

    def read_device_config(self) -> Dict[str, Any]:
        """Return ``device.json`` as a dict (``{}`` if missing or not an object)."""
        return self._read_mapping("device.json")

    def read_channel_config(self) -> Dict[str, Any]:
        """Return ``channel.json`` as a dict (``{}`` if missing or not an object)."""
        return self._read_mapping("channel.json")

    def read_setting_config(self) -> Dict[str, Any]:
        """Return ``setting.json`` as a dict (``{}`` if missing or not an object)."""
        return self._read_mapping("setting.json")

    def read_setting_section(self, section: str) -> Any:
        """Return one top-level key of ``setting.json``, or ``None`` if absent."""
        return self.read_setting_config().get(section)

    def read_json(self, filename: str) -> Union[Dict[str, Any], List[Any]]:
        """Load and return the parsed JSON content of ``filename``.

        Args:
            filename: Name of the file, relative to ``config_dir``.

        Returns:
            The parsed document, or ``{}`` when the file does not exist.

        Raises:
            json.JSONDecodeError: If the file exists but is not valid JSON.
        """
        path = self.config_dir / filename
        # Open directly instead of checking exists() first: the file could
        # disappear between the check and the open.
        try:
            with path.open("r", encoding="utf-8") as file:
                return json.load(file)
        except FileNotFoundError:
            return {}