156 lines
6.5 KiB
Python
156 lines
6.5 KiB
Python
|
|
"""角色/权限(RBAC)管理路由。
|
|||
|
|
|
|||
|
|
说明:
|
|||
|
|
- 该模块提供角色与权限的增删改查,以及“给角色配置权限点”的接口。
|
|||
|
|
- 当前实现采用最小化策略:仅允许系统管理员(role_id=admin)访问。
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
from __future__ import annotations
|
|||
|
|
|
|||
|
|
from fastapi import APIRouter, Depends, HTTPException
|
|||
|
|
from sqlalchemy.exc import IntegrityError
|
|||
|
|
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
|
|||
|
|
|
|||
|
|
from backend.api.schemas import (
|
|||
|
|
PermissionCreateRequest,
|
|||
|
|
PermissionResponse,
|
|||
|
|
RoleCreateRequest,
|
|||
|
|
RolePermissionsResponse,
|
|||
|
|
RolePermissionsUpdateRequest,
|
|||
|
|
RoleResponse,
|
|||
|
|
RoleUpdateRequest,
|
|||
|
|
)
|
|||
|
|
from backend.auth.deps import get_current_user
|
|||
|
|
from backend.services.rbac_service import RbacService
|
|||
|
|
|
|||
|
|
|
|||
|
|
def get_router(session_factory: async_sessionmaker[AsyncSession]) -> APIRouter:
|
|||
|
|
"""构造 RBAC 路由。"""
|
|||
|
|
router = APIRouter(prefix="/api", tags=["rbac"])
|
|||
|
|
rbac = RbacService(session_factory)
|
|||
|
|
|
|||
|
|
def _require_admin(user: dict) -> None:
|
|||
|
|
"""管理员校验(当前仅按 role_id 判断)。"""
|
|||
|
|
if user.get("role_id") != "admin":
|
|||
|
|
raise HTTPException(status_code=403, detail="forbidden")
|
|||
|
|
|
|||
|
|
@router.get("/roles", response_model=list[RoleResponse])
|
|||
|
|
async def list_roles(current_user: dict = Depends(get_current_user(session_factory))) -> list[RoleResponse]:
|
|||
|
|
"""查询角色列表。"""
|
|||
|
|
_require_admin(current_user)
|
|||
|
|
rows = await rbac.list_roles()
|
|||
|
|
return [RoleResponse(**r) for r in rows]
|
|||
|
|
|
|||
|
|
@router.get("/roles/{role_id}", response_model=RoleResponse)
|
|||
|
|
async def get_role(role_id: str, current_user: dict = Depends(get_current_user(session_factory))) -> RoleResponse:
|
|||
|
|
"""查询角色详情。"""
|
|||
|
|
_require_admin(current_user)
|
|||
|
|
role = await rbac.get_role(role_id)
|
|||
|
|
if not role:
|
|||
|
|
raise HTTPException(status_code=404, detail="not found")
|
|||
|
|
return RoleResponse(**role)
|
|||
|
|
|
|||
|
|
@router.post("/roles", response_model=RoleResponse)
|
|||
|
|
async def create_role(body: RoleCreateRequest, current_user: dict = Depends(get_current_user(session_factory))) -> RoleResponse:
|
|||
|
|
"""创建角色。"""
|
|||
|
|
_require_admin(current_user)
|
|||
|
|
try:
|
|||
|
|
role = await rbac.create_role(
|
|||
|
|
role_id=body.role_id,
|
|||
|
|
role_name=body.role_name,
|
|||
|
|
role_desc=body.role_desc,
|
|||
|
|
is_active=body.is_active,
|
|||
|
|
extra=body.extra,
|
|||
|
|
)
|
|||
|
|
except IntegrityError:
|
|||
|
|
raise HTTPException(status_code=409, detail="conflict")
|
|||
|
|
return RoleResponse(**role)
|
|||
|
|
|
|||
|
|
@router.patch("/roles/{role_id}", response_model=RoleResponse)
|
|||
|
|
async def update_role(
|
|||
|
|
role_id: str, body: RoleUpdateRequest, current_user: dict = Depends(get_current_user(session_factory))
|
|||
|
|
) -> RoleResponse:
|
|||
|
|
"""更新角色。"""
|
|||
|
|
_require_admin(current_user)
|
|||
|
|
try:
|
|||
|
|
role = await rbac.update_role(
|
|||
|
|
role_id,
|
|||
|
|
role_name=body.role_name,
|
|||
|
|
role_desc=body.role_desc,
|
|||
|
|
is_active=body.is_active,
|
|||
|
|
extra=body.extra,
|
|||
|
|
)
|
|||
|
|
except IntegrityError:
|
|||
|
|
raise HTTPException(status_code=409, detail="conflict")
|
|||
|
|
if not role:
|
|||
|
|
raise HTTPException(status_code=404, detail="not found")
|
|||
|
|
return RoleResponse(**role)
|
|||
|
|
|
|||
|
|
@router.delete("/roles/{role_id}")
|
|||
|
|
async def delete_role(role_id: str, current_user: dict = Depends(get_current_user(session_factory))) -> dict:
|
|||
|
|
"""禁用角色(软删除)。"""
|
|||
|
|
_require_admin(current_user)
|
|||
|
|
ok = await rbac.disable_role(role_id)
|
|||
|
|
if not ok:
|
|||
|
|
raise HTTPException(status_code=404, detail="not found")
|
|||
|
|
return {"ok": True}
|
|||
|
|
|
|||
|
|
@router.get("/permissions", response_model=list[PermissionResponse])
|
|||
|
|
async def list_permissions(current_user: dict = Depends(get_current_user(session_factory))) -> list[PermissionResponse]:
|
|||
|
|
"""查询权限点列表。"""
|
|||
|
|
_require_admin(current_user)
|
|||
|
|
rows = await rbac.list_permissions()
|
|||
|
|
return [PermissionResponse(**r) for r in rows]
|
|||
|
|
|
|||
|
|
@router.post("/permissions", response_model=PermissionResponse)
|
|||
|
|
async def create_permission(
|
|||
|
|
body: PermissionCreateRequest, current_user: dict = Depends(get_current_user(session_factory))
|
|||
|
|
) -> PermissionResponse:
|
|||
|
|
"""创建权限点(perm_code 支持自定义命名)。"""
|
|||
|
|
_require_admin(current_user)
|
|||
|
|
try:
|
|||
|
|
perm = await rbac.create_permission(
|
|||
|
|
perm_code=body.perm_code, perm_name=body.perm_name, perm_group=body.perm_group, perm_desc=body.perm_desc
|
|||
|
|
)
|
|||
|
|
except IntegrityError:
|
|||
|
|
raise HTTPException(status_code=409, detail="conflict")
|
|||
|
|
return PermissionResponse(**perm)
|
|||
|
|
|
|||
|
|
@router.delete("/permissions/{perm_code}")
|
|||
|
|
async def delete_permission(perm_code: str, current_user: dict = Depends(get_current_user(session_factory))) -> dict:
|
|||
|
|
"""删除权限点。"""
|
|||
|
|
_require_admin(current_user)
|
|||
|
|
ok = await rbac.delete_permission(perm_code)
|
|||
|
|
if not ok:
|
|||
|
|
raise HTTPException(status_code=404, detail="not found")
|
|||
|
|
return {"ok": True}
|
|||
|
|
|
|||
|
|
@router.get("/roles/{role_id}/permissions", response_model=RolePermissionsResponse)
|
|||
|
|
async def get_role_permissions(role_id: str, current_user: dict = Depends(get_current_user(session_factory))) -> RolePermissionsResponse:
|
|||
|
|
"""查询角色拥有的权限点集合。"""
|
|||
|
|
_require_admin(current_user)
|
|||
|
|
role = await rbac.get_role(role_id)
|
|||
|
|
if not role:
|
|||
|
|
raise HTTPException(status_code=404, detail="not found")
|
|||
|
|
codes = await rbac.get_role_permissions(role_id)
|
|||
|
|
return RolePermissionsResponse(role_id=role_id, perm_codes=codes)
|
|||
|
|
|
|||
|
|
@router.put("/roles/{role_id}/permissions", response_model=RolePermissionsResponse)
|
|||
|
|
async def set_role_permissions(
|
|||
|
|
role_id: str,
|
|||
|
|
body: RolePermissionsUpdateRequest,
|
|||
|
|
current_user: dict = Depends(get_current_user(session_factory)),
|
|||
|
|
) -> RolePermissionsResponse:
|
|||
|
|
"""覆盖设置角色权限点集合。"""
|
|||
|
|
_require_admin(current_user)
|
|||
|
|
role = await rbac.get_role(role_id)
|
|||
|
|
if not role:
|
|||
|
|
raise HTTPException(status_code=404, detail="not found")
|
|||
|
|
try:
|
|||
|
|
codes = await rbac.set_role_permissions(role_id=role_id, perm_codes=body.perm_codes)
|
|||
|
|
except ValueError as e:
|
|||
|
|
raise HTTPException(status_code=400, detail=str(e))
|
|||
|
|
return RolePermissionsResponse(role_id=role_id, perm_codes=codes)
|
|||
|
|
|
|||
|
|
return router
|