"""系统用户管理路由。 提供用户的增删改查与密码重置,并支持为用户分配角色。 当前版本仅允许系统管理员(role_id=admin)访问。 """ from __future__ import annotations from fastapi import APIRouter, Depends, HTTPException from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker from backend.api.schemas import UserCreateRequest, UserPasswordUpdateRequest, UserResponse, UserUpdateRequest from backend.auth.deps import get_current_user from backend.services.rbac_service import RbacService from backend.services.user_service import UserService def get_router(session_factory: async_sessionmaker[AsyncSession]) -> APIRouter: """构造用户管理路由。""" router = APIRouter(prefix="/api", tags=["users"]) users = UserService(session_factory) rbac = RbacService(session_factory) def _require_admin(user: dict) -> None: """管理员校验(当前仅按 role_id 判断)。""" if user.get("role_id") != "admin": raise HTTPException(status_code=403, detail="forbidden") @router.get("/users", response_model=list[UserResponse]) async def list_users(current_user: dict = Depends(get_current_user(session_factory))) -> list[UserResponse]: """查询用户列表。""" _require_admin(current_user) rows = await users.list_users() return [UserResponse(**r) for r in rows] @router.get("/users/{user_id}", response_model=UserResponse) async def get_user(user_id: str, current_user: dict = Depends(get_current_user(session_factory))) -> UserResponse: """查询用户详情。""" _require_admin(current_user) row = await users.get_user(user_id) if not row: raise HTTPException(status_code=404, detail="not found") return UserResponse(**row) @router.post("/users", response_model=UserResponse) async def create_user(body: UserCreateRequest, current_user: dict = Depends(get_current_user(session_factory))) -> UserResponse: """创建用户并写入密码哈希。""" _require_admin(current_user) role = await rbac.get_role(body.role_id) if not role: raise HTTPException(status_code=400, detail="invalid role_id") try: user = await users.create_user( user_id=body.user_id, username=body.username, display_name=body.display_name, password=body.password, role_id=body.role_id, is_active=body.is_active, extra=body.extra, ) except IntegrityError: raise HTTPException(status_code=409, detail="conflict") return UserResponse(**user) @router.patch("/users/{user_id}", response_model=UserResponse) async def update_user( user_id: str, body: UserUpdateRequest, current_user: dict = Depends(get_current_user(session_factory)) ) -> UserResponse: """更新用户信息(可更新角色、启用状态与扩展字段)。""" _require_admin(current_user) if body.role_id is not None: role = await rbac.get_role(body.role_id) if not role: raise HTTPException(status_code=400, detail="invalid role_id") try: updated = await users.update_user( user_id, display_name=body.display_name, role_id=body.role_id, is_active=body.is_active, extra=body.extra, ) except IntegrityError: raise HTTPException(status_code=409, detail="conflict") if not updated: raise HTTPException(status_code=404, detail="not found") return UserResponse(**updated) @router.delete("/users/{user_id}") async def delete_user(user_id: str, current_user: dict = Depends(get_current_user(session_factory))) -> dict: """禁用用户(软删除)。""" _require_admin(current_user) ok = await users.disable_user(user_id) if not ok: raise HTTPException(status_code=404, detail="not found") return {"ok": True} @router.put("/users/{user_id}/password") async def set_password( user_id: str, body: UserPasswordUpdateRequest, current_user: dict = Depends(get_current_user(session_factory)) ) -> dict: """管理员重置指定用户密码。""" _require_admin(current_user) ok = await users.set_password(user_id, body.new_password) if not ok: raise HTTPException(status_code=404, detail="not found") return {"ok": True} return router