"""Password hashing and API-token verification helpers."""

import hashlib
import secrets
from typing import Optional

from fastapi import Header, HTTPException, status

from app.core.config import settings
from app.repositories.json_config_repo import JsonConfigRepository


def _constant_time_equals(left: str, right: str) -> bool:
    """Compare two strings without leaking where they first differ.

    ``secrets.compare_digest`` only accepts ``str`` arguments that are pure
    ASCII, so both sides are encoded to UTF-8 bytes first; this keeps the
    comparison from raising ``TypeError`` on non-ASCII input.
    """
    return secrets.compare_digest(left.encode("utf-8"), right.encode("utf-8"))


def hash_password(password: str) -> str:
    """Return the hex-encoded SHA-256 digest of the UTF-8 encoded *password*.

    NOTE(review): a single unsalted SHA-256 round is weak for password
    storage. The algorithm is kept as-is because previously stored hashes
    are compared against this output; migrating to a salted KDF (e.g.
    ``hashlib.scrypt``) needs a coordinated change of the stored values.
    """
    return hashlib.sha256(password.encode("utf-8")).hexdigest()


def verify_password(password: str, expected_hash: str) -> bool:
    """Return True when the hash of *password* equals *expected_hash*.

    The comparison is constant-time.
    """
    return _constant_time_equals(hash_password(password), expected_hash)


def get_access_password_hash() -> str:
    """Return the password hash that access attempts are checked against.

    Reads the device configuration through ``JsonConfigRepository``. When it
    is a dict whose ``"password"`` entry is a non-blank string, that value is
    returned unchanged (it is not stripped). Otherwise the hash of
    ``settings.auth_password`` is used as the fallback.
    """
    device_config = JsonConfigRepository().read_device_config()

    stored_hash = ""
    if isinstance(device_config, dict):
        stored_hash = device_config.get("password", "")

    if isinstance(stored_hash, str) and stored_hash.strip():
        return stored_hash
    return hash_password(settings.auth_password)


def verify_access_password(password: str) -> bool:
    """Return True when *password* matches the current access password hash."""
    return verify_password(password, get_access_password_hash())


def verify_api_token(x_api_token: Optional[str] = Header(default=None)) -> None:
    """Reject the request unless the API token equals ``settings.auth_password``.

    Args:
        x_api_token: Token taken from the request header by FastAPI;
            ``None`` when the header is absent.

    Raises:
        HTTPException: 401 when the token is missing or does not match.
    """
    # A missing header can never match; check it first so the constant-time
    # comparison below only ever sees strings.
    token_is_valid = x_api_token is not None and _constant_time_equals(
        x_api_token, settings.auth_password
    )
    if not token_is_valid:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="无效的访问令牌",
        )