75 lines
2.5 KiB
Python
75 lines
2.5 KiB
Python
"""后端通用工具函数。
|
||
|
||
该模块放置与业务无关的通用能力:
|
||
- UTC 时间获取
|
||
- 日志初始化(控制台 + 可选文件滚动)
|
||
- 受限路径拼接(防目录穿越)
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import logging
|
||
import os
|
||
from datetime import datetime, timezone
|
||
from logging.handlers import TimedRotatingFileHandler
|
||
from pathlib import Path
|
||
|
||
|
||
def utc_now() -> datetime:
|
||
"""返回当前 UTC 时间(timezone-aware)。"""
|
||
return datetime.now(timezone.utc)
|
||
|
||
|
||
def configure_logging(level: str, log_file: Path | None = None) -> None:
|
||
"""配置全局日志。
|
||
|
||
Args:
|
||
level: 日志级别字符串(例如 "INFO" / "DEBUG")。
|
||
log_file: 可选的日志文件路径;提供时启用按天滚动。
|
||
"""
|
||
level_value = getattr(logging, level.upper(), logging.INFO)
|
||
logging_handlers: list[logging.Handler] = [logging.StreamHandler()]
|
||
if log_file is not None:
|
||
log_file.parent.mkdir(parents=True, exist_ok=True)
|
||
rotating_handler = TimedRotatingFileHandler(
|
||
log_file,
|
||
when="midnight",
|
||
interval=1,
|
||
backupCount=90,
|
||
encoding="utf-8",
|
||
delay=True,
|
||
)
|
||
rotating_handler.suffix = "%Y-%m-%d"
|
||
logging_handlers.append(rotating_handler)
|
||
logging.basicConfig(
|
||
level=level_value,
|
||
format="%(asctime)s %(levelname)s %(name)s %(message)s",
|
||
handlers=logging_handlers,
|
||
force=True,
|
||
)
|
||
for logger_name in ("uvicorn", "uvicorn.error", "uvicorn.access", "uvicorn.asgi"):
|
||
logger = logging.getLogger(logger_name)
|
||
logger.handlers = []
|
||
logger.propagate = True
|
||
|
||
|
||
def project_root() -> Path:
|
||
"""返回项目根目录(backend 的上一级)。"""
|
||
return Path(__file__).resolve().parents[1]
|
||
|
||
|
||
def safe_join(root: Path, untrusted_path: str) -> Path:
|
||
"""将不可信路径拼接到 root 下,并阻止目录穿越/绝对路径/UNC 路径。
|
||
|
||
主要用于下载/文件访问等接口,避免访问到文件根目录之外。
|
||
"""
|
||
if untrusted_path.startswith(("\\\\", "//")):
|
||
raise ValueError("UNC path is not allowed")
|
||
if os.path.isabs(untrusted_path):
|
||
raise ValueError("Absolute path is not allowed")
|
||
candidate = (root / untrusted_path).resolve()
|
||
root_resolved = root.resolve()
|
||
if root_resolved not in candidate.parents and candidate != root_resolved:
|
||
raise ValueError("Path traversal detected")
|
||
return candidate
|