SmartEDT/backend/auth/deps.py

57 lines
2.2 KiB
Python
Raw Permalink Normal View History

"""FastAPI 鉴权相关依赖Depends
当前提供 get_current_user(session_factory) Authorization: Bearer <token> 解析并加载用户信息
"""
from __future__ import annotations
from typing import Callable
from fastapi import HTTPException, Request
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from backend.auth.tokens import verify_access_token
from backend.database.schema import sys_role, sys_user
def get_current_user(session_factory: async_sessionmaker[AsyncSession]) -> Callable[..., dict]:
"""返回一个依赖函数:用于解析当前用户并从 DB 校验用户/角色启用状态。"""
async def _dep(request: Request) -> dict:
auth = request.headers.get("Authorization", "")
if not auth.startswith("Bearer "):
raise HTTPException(status_code=401, detail="missing token")
token = auth[len("Bearer ") :].strip()
try:
payload = verify_access_token(token)
except ValueError:
raise HTTPException(status_code=401, detail="invalid token")
async with session_factory() as session:
q = (
select(
sys_user.c.user_id,
sys_user.c.username,
sys_user.c.display_name,
sys_user.c.role_id,
sys_user.c.is_active,
sys_user.c.last_login_at,
sys_user.c.created_at,
sys_user.c.updated_at,
sys_user.c.extra,
sys_role.c.role_name,
sys_role.c.is_active.label("role_is_active"),
)
.select_from(sys_user.join(sys_role, sys_user.c.role_id == sys_role.c.role_id))
.where(sys_user.c.user_id == payload.user_id)
.limit(1)
)
row = (await session.execute(q)).mappings().first()
if not row:
raise HTTPException(status_code=401, detail="user not found")
if not row["is_active"] or not row["role_is_active"]:
raise HTTPException(status_code=403, detail="inactive user")
return dict(row)
return _dep