SmartEDT/backend/api/rbac_routes.py

156 lines
6.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""角色/权限RBAC管理路由。
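Notes:
- This module exposes create/read/update/delete endpoints for roles and permissions, plus endpoints for assigning permission points to a role.
- The current implementation uses a minimal policy: only the system administrator (role_id == "admin") may access these endpoints.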
"""
from __future__ import annotations
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from backend.api.schemas import (
PermissionCreateRequest,
PermissionResponse,
RoleCreateRequest,
RolePermissionsResponse,
RolePermissionsUpdateRequest,
RoleResponse,
RoleUpdateRequest,
)
from backend.auth.deps import get_current_user
from backend.services.rbac_service import RbacService
def get_router(session_factory: async_sessionmaker[AsyncSession]) -> APIRouter:
    """Build the RBAC (role / permission) management router.

    All routes are registered under the ``/api`` prefix with the ``rbac`` tag.
    Every handler first resolves the current user through the
    ``get_current_user`` dependency and then rejects callers whose
    ``role_id`` is not ``"admin"`` with HTTP 403.

    Args:
        session_factory: Async SQLAlchemy session factory. It is passed to
            ``RbacService`` and to the ``get_current_user`` dependency factory.

    Returns:
        The configured ``APIRouter``.
    """
    router = APIRouter(prefix="/api", tags=["rbac"])
    rbac = RbacService(session_factory)
    # Build the authentication dependency once and reuse it for every route
    # instead of calling the factory again in each handler signature.
    current_user_dep = get_current_user(session_factory)

    def _require_admin(user: dict) -> None:
        """Raise HTTP 403 unless ``user["role_id"]`` equals ``"admin"``."""
        if user.get("role_id") != "admin":
            raise HTTPException(status_code=403, detail="forbidden")

    @router.get("/roles", response_model=list[RoleResponse])
    async def list_roles(current_user: dict = Depends(current_user_dep)) -> list[RoleResponse]:
        """List all roles."""
        _require_admin(current_user)
        rows = await rbac.list_roles()
        return [RoleResponse(**row) for row in rows]

    @router.get("/roles/{role_id}", response_model=RoleResponse)
    async def get_role(role_id: str, current_user: dict = Depends(current_user_dep)) -> RoleResponse:
        """Return a single role, or HTTP 404 when the service finds none."""
        _require_admin(current_user)
        role = await rbac.get_role(role_id)
        if not role:
            raise HTTPException(status_code=404, detail="not found")
        return RoleResponse(**role)

    @router.post("/roles", response_model=RoleResponse)
    async def create_role(body: RoleCreateRequest, current_user: dict = Depends(current_user_dep)) -> RoleResponse:
        """Create a role; an ``IntegrityError`` from the service maps to HTTP 409."""
        _require_admin(current_user)
        try:
            role = await rbac.create_role(
                role_id=body.role_id,
                role_name=body.role_name,
                role_desc=body.role_desc,
                is_active=body.is_active,
                extra=body.extra,
            )
        except IntegrityError as exc:
            # Chain explicitly so the database error stays attached as the cause.
            raise HTTPException(status_code=409, detail="conflict") from exc
        return RoleResponse(**role)

    @router.patch("/roles/{role_id}", response_model=RoleResponse)
    async def update_role(
        role_id: str,
        body: RoleUpdateRequest,
        current_user: dict = Depends(current_user_dep),
    ) -> RoleResponse:
        """Update a role.

        Responds with HTTP 409 on ``IntegrityError`` and HTTP 404 when the
        service returns a falsy result.
        """
        _require_admin(current_user)
        try:
            role = await rbac.update_role(
                role_id,
                role_name=body.role_name,
                role_desc=body.role_desc,
                is_active=body.is_active,
                extra=body.extra,
            )
        except IntegrityError as exc:
            raise HTTPException(status_code=409, detail="conflict") from exc
        if not role:
            raise HTTPException(status_code=404, detail="not found")
        return RoleResponse(**role)

    @router.delete("/roles/{role_id}")
    async def delete_role(role_id: str, current_user: dict = Depends(current_user_dep)) -> dict:
        """Disable a role (soft delete) via ``rbac.disable_role``; HTTP 404 on a falsy result."""
        _require_admin(current_user)
        ok = await rbac.disable_role(role_id)
        if not ok:
            raise HTTPException(status_code=404, detail="not found")
        return {"ok": True}

    @router.get("/permissions", response_model=list[PermissionResponse])
    async def list_permissions(current_user: dict = Depends(current_user_dep)) -> list[PermissionResponse]:
        """List all permission points."""
        _require_admin(current_user)
        rows = await rbac.list_permissions()
        return [PermissionResponse(**row) for row in rows]

    @router.post("/permissions", response_model=PermissionResponse)
    async def create_permission(
        body: PermissionCreateRequest,
        current_user: dict = Depends(current_user_dep),
    ) -> PermissionResponse:
        """Create a permission point (``perm_code`` may be custom-named).

        An ``IntegrityError`` from the service maps to HTTP 409.
        """
        _require_admin(current_user)
        try:
            perm = await rbac.create_permission(
                perm_code=body.perm_code,
                perm_name=body.perm_name,
                perm_group=body.perm_group,
                perm_desc=body.perm_desc,
            )
        except IntegrityError as exc:
            raise HTTPException(status_code=409, detail="conflict") from exc
        return PermissionResponse(**perm)

    @router.delete("/permissions/{perm_code}")
    async def delete_permission(perm_code: str, current_user: dict = Depends(current_user_dep)) -> dict:
        """Delete a permission point; HTTP 404 when the service reports a falsy result."""
        _require_admin(current_user)
        ok = await rbac.delete_permission(perm_code)
        if not ok:
            raise HTTPException(status_code=404, detail="not found")
        return {"ok": True}

    @router.get("/roles/{role_id}/permissions", response_model=RolePermissionsResponse)
    async def get_role_permissions(
        role_id: str,
        current_user: dict = Depends(current_user_dep),
    ) -> RolePermissionsResponse:
        """Return the permission codes held by a role; HTTP 404 if the role is not found."""
        _require_admin(current_user)
        role = await rbac.get_role(role_id)
        if not role:
            raise HTTPException(status_code=404, detail="not found")
        codes = await rbac.get_role_permissions(role_id)
        return RolePermissionsResponse(role_id=role_id, perm_codes=codes)

    @router.put("/roles/{role_id}/permissions", response_model=RolePermissionsResponse)
    async def set_role_permissions(
        role_id: str,
        body: RolePermissionsUpdateRequest,
        current_user: dict = Depends(current_user_dep),
    ) -> RolePermissionsResponse:
        """Overwrite the set of permission codes assigned to a role.

        Responds with HTTP 404 if the role is not found and HTTP 400 (with the
        error text as detail) when the service raises ``ValueError``.
        """
        _require_admin(current_user)
        role = await rbac.get_role(role_id)
        if not role:
            raise HTTPException(status_code=404, detail="not found")
        try:
            codes = await rbac.set_role_permissions(role_id=role_id, perm_codes=body.perm_codes)
        except ValueError as exc:
            raise HTTPException(status_code=400, detail=str(exc)) from exc
        return RolePermissionsResponse(role_id=role_id, perm_codes=codes)

    return router