SmartEDT/backend/api/auth_routes.py

82 lines
3.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""认证相关路由。
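Provides:
- Login to obtain a Bearer token
- First-time system initialization (bootstrap), which creates the first administrator
- Retrieval of the currently logged-in user's information (/me)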
"""
from __future__ import annotations
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from backend.api.schemas import BootstrapRequest, BootstrapResponse, LoginRequest, LoginResponse, MeResponse, TokenResponse, UserResponse
from backend.auth.deps import get_current_user
from backend.database.schema import sys_user
from backend.services.auth_service import AuthService
from backend.services.user_service import UserService
def get_router(session_factory: async_sessionmaker[AsyncSession]) -> APIRouter:
    """Build the authentication router.

    Registers three endpoints under the ``/api/auth`` prefix:

    - ``POST /login``: exchange a username/password for an access token.
    - ``POST /bootstrap``: create the first administrator when the user
      table is empty, then return a token for that account.
    - ``GET /me``: return the user resolved by the ``get_current_user``
      dependency.

    Args:
        session_factory: Async session factory. It is handed to
            ``AuthService``, ``UserService`` and ``get_current_user``, and is
            also used directly by the bootstrap emptiness check.

    Returns:
        The configured ``APIRouter``.
    """
    # Single source of truth for the token lifetime: the same value is passed
    # to auth.login() and reported back to the client as ``expires_in``.
    token_ttl_seconds = 3600

    router = APIRouter(prefix="/api/auth", tags=["auth"])
    auth = AuthService(session_factory)
    users = UserService(session_factory)

    @router.post("/login", response_model=LoginResponse)
    async def login(body: LoginRequest) -> LoginResponse:
        """Log in with username and password and return an access token.

        Raises:
            HTTPException: 403 when ``auth.login`` raises ``PermissionError``;
                401 when it raises ``ValueError`` or when the user record
                cannot be loaded afterwards.
        """
        try:
            token, user_row = await auth.login(
                username=body.username,
                password=body.password,
                expires_in_seconds=token_ttl_seconds,
            )
        except PermissionError as exc:
            raise HTTPException(status_code=403, detail="inactive user") from exc
        except ValueError as exc:
            raise HTTPException(status_code=401, detail="invalid credentials") from exc

        # Reload the user through UserService; its result is what gets
        # unpacked into UserResponse below.
        user = await users.get_user(str(user_row["user_id"]))
        if not user:
            raise HTTPException(status_code=401, detail="invalid credentials")

        return LoginResponse(
            token=TokenResponse(access_token=token, expires_in=token_ttl_seconds),
            user=UserResponse(**user),
        )

    @router.post("/bootstrap", response_model=BootstrapResponse)
    async def bootstrap(body: BootstrapRequest) -> BootstrapResponse:
        """Initialize the system by creating the first administrator.

        Only allowed while the user table holds no rows. On success the new
        account is logged in and its token is returned.

        Raises:
            HTTPException: 409 when at least one user already exists;
                400 when user creation fails for any reason.
        """
        async with session_factory() as session:
            exists = (await session.execute(select(sys_user.c.user_id).limit(1))).first()
        if exists:
            raise HTTPException(status_code=409, detail="already initialized")

        # NOTE(review): the emptiness check above and create_user() below run
        # as separate steps, so two concurrent bootstrap requests could both
        # pass the check - confirm whether create_user or a DB constraint
        # guards against that.
        try:
            created = await users.create_user(
                user_id=None,
                username=body.username,
                display_name=body.display_name,
                password=body.password,
                role_id="admin",
                is_active=True,
                extra=None,
            )
        except Exception as exc:
            # Deliberately broad: any creation failure is reported as a 400.
            # Chaining keeps the real cause available in tracebacks and logs.
            raise HTTPException(status_code=400, detail="bootstrap failed") from exc

        token, _ = await auth.login(
            username=body.username,
            password=body.password,
            expires_in_seconds=token_ttl_seconds,
        )
        return BootstrapResponse(
            token=TokenResponse(access_token=token, expires_in=token_ttl_seconds),
            user=UserResponse(**created),
        )

    @router.get("/me", response_model=MeResponse)
    async def me(current_user: dict = Depends(get_current_user(session_factory))) -> MeResponse:
        """Return the current user supplied by the ``get_current_user`` dependency."""
        # user_id, username and role_id are required keys (a missing one raises
        # KeyError); the remaining fields are optional and fall back to None.
        user = UserResponse(
            user_id=str(current_user["user_id"]),
            username=str(current_user["username"]),
            display_name=current_user.get("display_name"),
            role_id=str(current_user["role_id"]),
            role_name=current_user.get("role_name"),
            is_active=bool(current_user.get("is_active")),
            last_login_at=current_user.get("last_login_at"),
            created_at=current_user.get("created_at"),
            updated_at=current_user.get("updated_at"),
            extra=current_user.get("extra"),
        )
        return MeResponse(user=user)

    return router