SmartEDT/backend/services/auth_service.py

46 lines
1.7 KiB
Python
Raw Permalink Normal View History

"""认证服务。
该模块实现用户名 + 密码的登录校验并签发 access token
"""
from __future__ import annotations
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from backend.auth.passwords import verify_password
from backend.auth.tokens import issue_access_token
from backend.services.user_service import UserService
class AuthService:
"""认证相关业务逻辑(不直接绑定 HTTP 框架)。"""
def __init__(self, session_factory: async_sessionmaker[AsyncSession]) -> None:
self._session_factory = session_factory
self._users = UserService(session_factory)
async def login(self, *, username: str, password: str, expires_in_seconds: int = 3600) -> tuple[str, dict]:
"""登录并返回 (token, 用户行数据)。
Raises:
ValueError: 用户名或密码错误
PermissionError: 用户已被禁用
"""
user_row = await self._users.get_user_by_username(username)
if not user_row:
raise ValueError("用户名或密码错误!")
if not user_row.get("is_active", True):
raise PermissionError("用户已被禁用!")
stored = user_row.get("password_hash") or ""
if not verify_password(password, stored):
raise ValueError("用户名或密码错误!")
await self._users.touch_last_login(str(user_row["user_id"]))
token = issue_access_token(
user_id=str(user_row["user_id"]),
username=str(user_row["username"]),
role_id=str(user_row["role_id"]),
expires_in_seconds=expires_in_seconds,
)
return token, user_row