SmartEDT/backend/auth/deps.py

57 lines
2.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""FastAPI 鉴权相关依赖Depends
当前提供 get_current_user(session_factory),从 Authorization: Bearer <token> 解析并加载用户信息。
"""
from __future__ import annotations
from typing import Callable
from fastapi import HTTPException, Request
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from backend.auth.tokens import verify_access_token
from backend.database.schema import sys_role, sys_user
def get_current_user(session_factory: async_sessionmaker[AsyncSession]) -> Callable[..., dict]:
"""返回一个依赖函数:用于解析当前用户并从 DB 校验用户/角色启用状态。"""
async def _dep(request: Request) -> dict:
auth = request.headers.get("Authorization", "")
if not auth.startswith("Bearer "):
raise HTTPException(status_code=401, detail="missing token")
token = auth[len("Bearer ") :].strip()
try:
payload = verify_access_token(token)
except ValueError:
raise HTTPException(status_code=401, detail="invalid token")
async with session_factory() as session:
q = (
select(
sys_user.c.user_id,
sys_user.c.username,
sys_user.c.display_name,
sys_user.c.role_id,
sys_user.c.is_active,
sys_user.c.last_login_at,
sys_user.c.created_at,
sys_user.c.updated_at,
sys_user.c.extra,
sys_role.c.role_name,
sys_role.c.is_active.label("role_is_active"),
)
.select_from(sys_user.join(sys_role, sys_user.c.role_id == sys_role.c.role_id))
.where(sys_user.c.user_id == payload.user_id)
.limit(1)
)
row = (await session.execute(q)).mappings().first()
if not row:
raise HTTPException(status_code=401, detail="user not found")
if not row["is_active"] or not row["role_is_active"]:
raise HTTPException(status_code=403, detail="inactive user")
return dict(row)
return _dep