SmartEDT/backend/api/user_routes.py

112 lines
4.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

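"""System user management routes.

Provides create, read, update and delete operations plus password reset for
users, and supports assigning a role to a user.
In the current version only the system administrator (role_id=admin) may
access these routes.
"""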
from __future__ import annotations
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from backend.api.schemas import UserCreateRequest, UserPasswordUpdateRequest, UserResponse, UserUpdateRequest
from backend.auth.deps import get_current_user
from backend.services.rbac_service import RbacService
from backend.services.user_service import UserService
def get_router(session_factory: async_sessionmaker[AsyncSession]) -> APIRouter:
    """Build the user-management router.

    All routes are registered under the ``/api`` prefix with the ``users``
    tag. Every handler resolves the caller through
    ``get_current_user(session_factory)`` and then rejects any caller whose
    ``role_id`` is not ``"admin"`` with HTTP 403.

    Args:
        session_factory: Async session factory passed to ``UserService``,
            ``RbacService`` and ``get_current_user``.

    Returns:
        The ``APIRouter`` carrying the user list/detail/create/update/disable
        and password-reset routes.
    """
    router = APIRouter(prefix="/api", tags=["users"])
    users = UserService(session_factory)
    rbac = RbacService(session_factory)

    def _require_admin(user: dict) -> None:
        """Reject callers that are not administrators.

        The check currently looks at ``role_id`` only; a missing ``role_id``
        key is treated the same as a non-admin role.

        Raises:
            HTTPException: 403 ``forbidden`` when ``role_id`` is not
                ``"admin"``.
        """
        if user.get("role_id") != "admin":
            raise HTTPException(status_code=403, detail="forbidden")

    @router.get("/users", response_model=list[UserResponse])
    async def list_users(
        current_user: dict = Depends(get_current_user(session_factory)),
    ) -> list[UserResponse]:
        """List users (admin only)."""
        _require_admin(current_user)
        rows = await users.list_users()
        return [UserResponse(**r) for r in rows]

    @router.get("/users/{user_id}", response_model=UserResponse)
    async def get_user(
        user_id: str,
        current_user: dict = Depends(get_current_user(session_factory)),
    ) -> UserResponse:
        """Return the details of one user (admin only).

        Raises:
            HTTPException: 404 ``not found`` when the service returns a
                falsy result for ``user_id``.
        """
        _require_admin(current_user)
        row = await users.get_user(user_id)
        if not row:
            raise HTTPException(status_code=404, detail="not found")
        return UserResponse(**row)

    @router.post("/users", response_model=UserResponse)
    async def create_user(
        body: UserCreateRequest,
        current_user: dict = Depends(get_current_user(session_factory)),
    ) -> UserResponse:
        """Create a user; the service stores the password hash (admin only).

        Raises:
            HTTPException: 400 ``invalid role_id`` when ``rbac.get_role``
                finds no role for ``body.role_id``; 409 ``conflict`` when the
                service raises ``IntegrityError``.
        """
        _require_admin(current_user)
        role = await rbac.get_role(body.role_id)
        if not role:
            raise HTTPException(status_code=400, detail="invalid role_id")
        try:
            user = await users.create_user(
                user_id=body.user_id,
                username=body.username,
                display_name=body.display_name,
                password=body.password,
                role_id=body.role_id,
                is_active=body.is_active,
                extra=body.extra,
            )
        except IntegrityError as exc:
            # Chain explicitly so the database error is recorded as the cause
            # of the 409 instead of looking like a failure inside the handler.
            raise HTTPException(status_code=409, detail="conflict") from exc
        return UserResponse(**user)

    @router.patch("/users/{user_id}", response_model=UserResponse)
    async def update_user(
        user_id: str,
        body: UserUpdateRequest,
        current_user: dict = Depends(get_current_user(session_factory)),
    ) -> UserResponse:
        """Update a user's display name, role, active flag and extra fields.

        The role is validated only when the request supplies a ``role_id``.

        Raises:
            HTTPException: 400 ``invalid role_id`` when the supplied role
                does not exist; 409 ``conflict`` when the service raises
                ``IntegrityError``; 404 ``not found`` when the service
                returns a falsy result.
        """
        _require_admin(current_user)
        if body.role_id is not None:
            role = await rbac.get_role(body.role_id)
            if not role:
                raise HTTPException(status_code=400, detail="invalid role_id")
        try:
            updated = await users.update_user(
                user_id,
                display_name=body.display_name,
                role_id=body.role_id,
                is_active=body.is_active,
                extra=body.extra,
            )
        except IntegrityError as exc:
            # Same explicit chaining as in create_user.
            raise HTTPException(status_code=409, detail="conflict") from exc
        if not updated:
            raise HTTPException(status_code=404, detail="not found")
        return UserResponse(**updated)

    @router.delete("/users/{user_id}")
    async def delete_user(
        user_id: str,
        current_user: dict = Depends(get_current_user(session_factory)),
    ) -> dict:
        """Disable a user (soft delete) via ``users.disable_user`` (admin only).

        Raises:
            HTTPException: 404 ``not found`` when the service reports
                failure.
        """
        _require_admin(current_user)
        ok = await users.disable_user(user_id)
        if not ok:
            raise HTTPException(status_code=404, detail="not found")
        return {"ok": True}

    @router.put("/users/{user_id}/password")
    async def set_password(
        user_id: str,
        body: UserPasswordUpdateRequest,
        current_user: dict = Depends(get_current_user(session_factory)),
    ) -> dict:
        """Reset the password of the given user (admin only).

        Raises:
            HTTPException: 404 ``not found`` when the service reports
                failure.
        """
        _require_admin(current_user)
        ok = await users.set_password(user_id, body.new_password)
        if not ok:
            raise HTTPException(status_code=404, detail="not found")
        return {"ok": True}

    return router