diff --git a/.trae/skills/py-zh-commenter/SKILL.md b/.trae/skills/py-zh-commenter/SKILL.md new file mode 100644 index 0000000..74c68fa --- /dev/null +++ b/.trae/skills/py-zh-commenter/SKILL.md @@ -0,0 +1,84 @@ +--- +name: "py-zh-commenter" +description: "为 Python 代码补充清晰的中文注释与必要 docstring。用户要求“加中文注释/解释这段代码/补齐注释/可读性提升”时调用。" +--- + +# Python 中文注释助手(py-zh-commenter) + +## 目标 + +在**不改变代码行为**的前提下,为 Python 代码补充高质量的中文注释与必要的 docstring,提升可读性与可维护性。 + +## 何时调用 + +当用户提出以下需求时调用: +- “给这段 Python 代码加中文注释/补齐注释” +- “解释代码在做什么,希望在源码里体现” +- “帮我把关键逻辑注释清楚/写 docstring” + +不适用于: +- 纯代码审查(请用 code-reviewer / pr-reviewer) +- 重构/改逻辑(除非用户明确要求) + +## 输出原则(必须遵守) + +1. **不改逻辑**:仅添加注释/docstring 与必要的类型提示(仅当非常明确且不影响行为)。 +2. **少而精**:不写“显而易见”的注释;只解释“为什么/边界/约束/不变量/副作用/复杂算法/业务含义”。 +3. **就近放置**: + - 模块:文件头 docstring(可选,描述模块职责) + - 类/函数:docstring(参数/返回/异常/副作用/线程安全/性能) + - 复杂分支/循环:行内注释说明意图与约束 +4. **风格一致**:遵循项目现有格式(注释语气、标点、缩进、是否偏好 docstring)。 +5. **安全合规**:不在注释中写入密钥、口令、内网地址等敏感信息;不把用户数据或隐私写进注释。 +6. **避免噪音**:不要对每行都加注释;避免把代码翻译成中文。 + +## 注释内容清单(优先级从高到低) + +- 业务含义:字段/表/状态机/权限点的语义 +- 不变量:必须满足的条件(例如“role_id 必须存在且启用”) +- 边界情况:空值/异常路径/并发/重试/超时 +- 副作用:数据库写入、网络请求、文件读写、全局状态变更 +- 性能:复杂度、批处理、索引依赖、潜在慢点 +- 安全:鉴权、权限校验点、数据校验原因 + +## 注释模板(可直接复用) + +### 函数 docstring(推荐) + +```python +def foo(x: int) -> int: + \"\"\"一句话说明做什么。 + + Args: + x: 参数含义(业务语义/单位/范围)。 + + Returns: + 返回值含义(单位/范围/是否可能为空)。 + + Raises: + ValueError: 触发条件说明。 + \"\"\" +``` + +### 复杂分支注释(示例) + +```python +# 这里做 X 的原因:…… +# 约束:……(例如必须在事务内/必须先校验权限) +if cond: + ... +``` + +## 执行步骤(给模型的工作流) + +1. 先快速阅读:识别模块职责、关键数据结构、外部依赖(DB/HTTP/文件/线程/协程)。 +2. 找“复杂点”:多层嵌套、隐藏副作用、异常处理、权限判断、数据转换。 +3. 优先写 docstring:为对外接口(路由函数/服务方法/工具函数)补齐输入输出语义。 +4. 补关键行注释:只在需要解释“意图/原因/约束”的地方写。 +5. 自检:确保不引入代码变更、不新增导入导致 lint/test 变化。 + +## 交付格式 + +- 如果是仓库内改动:直接在对应 `.py` 文件中增加注释/docstring。 +- 如果用户只要解释:给出“建议应写在源码中的注释内容”,并可选附带 patch。 + diff --git a/INSTALL_DB.md b/PostgreSQL及实时数据库安装指南.md similarity index 100% rename from INSTALL_DB.md rename to PostgreSQL及实时数据库安装指南.md diff --git a/DB_PERFORMANCE_ANALYSIS.md b/TimescaleDB性能测试分析报告.md similarity index 100% rename from DB_PERFORMANCE_ANALYSIS.md rename to TimescaleDB性能测试分析报告.md diff --git a/backend/__init__.py b/backend/__init__.py index 8b13789..b54784d 100644 --- a/backend/__init__.py +++ b/backend/__init__.py @@ -1 +1,2 @@ +"""SmartEDT 后端包。""" diff --git a/backend/api/__init__.py b/backend/api/__init__.py index 8b13789..f021d7c 100644 --- a/backend/api/__init__.py +++ b/backend/api/__init__.py @@ -1 +1,2 @@ +"""API 子包:HTTP 路由、WebSocket 处理与 Pydantic schema。""" diff --git a/backend/api/auth_routes.py b/backend/api/auth_routes.py new file mode 100644 index 0000000..fc67d1a --- /dev/null +++ b/backend/api/auth_routes.py @@ -0,0 +1,81 @@ +"""认证相关路由。 + +提供: +- 登录获取 Bearer Token +- 系统首次初始化(bootstrap:创建首个管理员) +- 获取当前登录用户信息(/me) +""" + +from __future__ import annotations + +from fastapi import APIRouter, Depends, HTTPException +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker + +from backend.api.schemas import BootstrapRequest, BootstrapResponse, LoginRequest, LoginResponse, MeResponse, TokenResponse, UserResponse +from backend.auth.deps import get_current_user +from backend.database.schema import sys_user +from backend.services.auth_service import AuthService +from backend.services.user_service import UserService + + +def get_router(session_factory: async_sessionmaker[AsyncSession]) -> APIRouter: + """构造认证路由。""" + router = APIRouter(prefix="/api/auth", tags=["auth"]) + auth = AuthService(session_factory) + users = UserService(session_factory) + + @router.post("/login", response_model=LoginResponse) + async def login(body: LoginRequest) -> LoginResponse: + """用户名密码登录,返回 access_token。""" + try: + token, user_row = await auth.login(username=body.username, password=body.password, expires_in_seconds=3600) + except PermissionError: + raise HTTPException(status_code=403, detail="inactive user") + except ValueError: + raise HTTPException(status_code=401, detail="invalid credentials") + user = await users.get_user(str(user_row["user_id"])) + if not user: + raise HTTPException(status_code=401, detail="invalid credentials") + return LoginResponse(token=TokenResponse(access_token=token, expires_in=3600), user=UserResponse(**user)) + + @router.post("/bootstrap", response_model=BootstrapResponse) + async def bootstrap(body: BootstrapRequest) -> BootstrapResponse: + """初始化系统:当系统还没有任何用户时,创建首个管理员并返回 token。""" + async with session_factory() as session: + exists = (await session.execute(select(sys_user.c.user_id).limit(1))).first() + if exists: + raise HTTPException(status_code=409, detail="already initialized") + try: + created = await users.create_user( + user_id=None, + username=body.username, + display_name=body.display_name, + password=body.password, + role_id="admin", + is_active=True, + extra=None, + ) + except Exception: + raise HTTPException(status_code=400, detail="bootstrap failed") + token, _ = await auth.login(username=body.username, password=body.password, expires_in_seconds=3600) + return BootstrapResponse(token=TokenResponse(access_token=token, expires_in=3600), user=UserResponse(**created)) + + @router.get("/me", response_model=MeResponse) + async def me(current_user: dict = Depends(get_current_user(session_factory))) -> MeResponse: + """返回当前登录用户(由 Authorization: Bearer token 解析)。""" + user = UserResponse( + user_id=str(current_user["user_id"]), + username=str(current_user["username"]), + display_name=current_user.get("display_name"), + role_id=str(current_user["role_id"]), + role_name=current_user.get("role_name"), + is_active=bool(current_user.get("is_active")), + last_login_at=current_user.get("last_login_at"), + created_at=current_user.get("created_at"), + updated_at=current_user.get("updated_at"), + extra=current_user.get("extra"), + ) + return MeResponse(user=user) + + return router diff --git a/backend/api/rbac_routes.py b/backend/api/rbac_routes.py new file mode 100644 index 0000000..ac65258 --- /dev/null +++ b/backend/api/rbac_routes.py @@ -0,0 +1,155 @@ +"""角色/权限(RBAC)管理路由。 + +说明: +- 该模块提供角色与权限的增删改查,以及“给角色配置权限点”的接口。 +- 当前实现采用最小化策略:仅允许系统管理员(role_id=admin)访问。 +""" + +from __future__ import annotations + +from fastapi import APIRouter, Depends, HTTPException +from sqlalchemy.exc import IntegrityError +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker + +from backend.api.schemas import ( + PermissionCreateRequest, + PermissionResponse, + RoleCreateRequest, + RolePermissionsResponse, + RolePermissionsUpdateRequest, + RoleResponse, + RoleUpdateRequest, +) +from backend.auth.deps import get_current_user +from backend.services.rbac_service import RbacService + + +def get_router(session_factory: async_sessionmaker[AsyncSession]) -> APIRouter: + """构造 RBAC 路由。""" + router = APIRouter(prefix="/api", tags=["rbac"]) + rbac = RbacService(session_factory) + + def _require_admin(user: dict) -> None: + """管理员校验(当前仅按 role_id 判断)。""" + if user.get("role_id") != "admin": + raise HTTPException(status_code=403, detail="forbidden") + + @router.get("/roles", response_model=list[RoleResponse]) + async def list_roles(current_user: dict = Depends(get_current_user(session_factory))) -> list[RoleResponse]: + """查询角色列表。""" + _require_admin(current_user) + rows = await rbac.list_roles() + return [RoleResponse(**r) for r in rows] + + @router.get("/roles/{role_id}", response_model=RoleResponse) + async def get_role(role_id: str, current_user: dict = Depends(get_current_user(session_factory))) -> RoleResponse: + """查询角色详情。""" + _require_admin(current_user) + role = await rbac.get_role(role_id) + if not role: + raise HTTPException(status_code=404, detail="not found") + return RoleResponse(**role) + + @router.post("/roles", response_model=RoleResponse) + async def create_role(body: RoleCreateRequest, current_user: dict = Depends(get_current_user(session_factory))) -> RoleResponse: + """创建角色。""" + _require_admin(current_user) + try: + role = await rbac.create_role( + role_id=body.role_id, + role_name=body.role_name, + role_desc=body.role_desc, + is_active=body.is_active, + extra=body.extra, + ) + except IntegrityError: + raise HTTPException(status_code=409, detail="conflict") + return RoleResponse(**role) + + @router.patch("/roles/{role_id}", response_model=RoleResponse) + async def update_role( + role_id: str, body: RoleUpdateRequest, current_user: dict = Depends(get_current_user(session_factory)) + ) -> RoleResponse: + """更新角色。""" + _require_admin(current_user) + try: + role = await rbac.update_role( + role_id, + role_name=body.role_name, + role_desc=body.role_desc, + is_active=body.is_active, + extra=body.extra, + ) + except IntegrityError: + raise HTTPException(status_code=409, detail="conflict") + if not role: + raise HTTPException(status_code=404, detail="not found") + return RoleResponse(**role) + + @router.delete("/roles/{role_id}") + async def delete_role(role_id: str, current_user: dict = Depends(get_current_user(session_factory))) -> dict: + """禁用角色(软删除)。""" + _require_admin(current_user) + ok = await rbac.disable_role(role_id) + if not ok: + raise HTTPException(status_code=404, detail="not found") + return {"ok": True} + + @router.get("/permissions", response_model=list[PermissionResponse]) + async def list_permissions(current_user: dict = Depends(get_current_user(session_factory))) -> list[PermissionResponse]: + """查询权限点列表。""" + _require_admin(current_user) + rows = await rbac.list_permissions() + return [PermissionResponse(**r) for r in rows] + + @router.post("/permissions", response_model=PermissionResponse) + async def create_permission( + body: PermissionCreateRequest, current_user: dict = Depends(get_current_user(session_factory)) + ) -> PermissionResponse: + """创建权限点(perm_code 支持自定义命名)。""" + _require_admin(current_user) + try: + perm = await rbac.create_permission( + perm_code=body.perm_code, perm_name=body.perm_name, perm_group=body.perm_group, perm_desc=body.perm_desc + ) + except IntegrityError: + raise HTTPException(status_code=409, detail="conflict") + return PermissionResponse(**perm) + + @router.delete("/permissions/{perm_code}") + async def delete_permission(perm_code: str, current_user: dict = Depends(get_current_user(session_factory))) -> dict: + """删除权限点。""" + _require_admin(current_user) + ok = await rbac.delete_permission(perm_code) + if not ok: + raise HTTPException(status_code=404, detail="not found") + return {"ok": True} + + @router.get("/roles/{role_id}/permissions", response_model=RolePermissionsResponse) + async def get_role_permissions(role_id: str, current_user: dict = Depends(get_current_user(session_factory))) -> RolePermissionsResponse: + """查询角色拥有的权限点集合。""" + _require_admin(current_user) + role = await rbac.get_role(role_id) + if not role: + raise HTTPException(status_code=404, detail="not found") + codes = await rbac.get_role_permissions(role_id) + return RolePermissionsResponse(role_id=role_id, perm_codes=codes) + + @router.put("/roles/{role_id}/permissions", response_model=RolePermissionsResponse) + async def set_role_permissions( + role_id: str, + body: RolePermissionsUpdateRequest, + current_user: dict = Depends(get_current_user(session_factory)), + ) -> RolePermissionsResponse: + """覆盖设置角色权限点集合。""" + _require_admin(current_user) + role = await rbac.get_role(role_id) + if not role: + raise HTTPException(status_code=404, detail="not found") + try: + codes = await rbac.set_role_permissions(role_id=role_id, perm_codes=body.perm_codes) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + return RolePermissionsResponse(role_id=role_id, perm_codes=codes) + + return router diff --git a/backend/api/routes.py b/backend/api/routes.py index 20976d8..84da575 100644 --- a/backend/api/routes.py +++ b/backend/api/routes.py @@ -1,3 +1,12 @@ +"""基础业务路由(仿真/设备/文件下载等)。 + +该文件保留项目早期的示例接口与基础能力: +- 健康检查 +- 设备连接状态(示例) +- 启停仿真 +- 文件下载(带目录穿越保护) +""" + from __future__ import annotations from pathlib import Path @@ -11,14 +20,20 @@ from backend.utils import safe_join def get_router(simulation_manager: SimulationManager, file_root: Path) -> APIRouter: + """构造基础业务路由。 + + 说明:此项目采用“router 工厂函数”风格,通过参数注入 service/配置,而不是全局依赖容器。 + """ router = APIRouter() @router.get("/health", response_model=HealthResponse) async def health() -> HealthResponse: + """健康检查(用于容器编排/负载均衡探活)。""" return HealthResponse() @router.get("/api/devices") async def devices(): + """返回设备列表(当前为示例数据,反映仿真运行时状态)。""" runtime = simulation_manager.current() return { "data": [ @@ -32,16 +47,19 @@ def get_router(simulation_manager: SimulationManager, file_root: Path) -> APIRou @router.post("/api/simulation/start", response_model=SimulationStartResponse) async def start_simulation(body: SimulationStartRequest) -> SimulationStartResponse: + """启动仿真。""" simulation_id = await simulation_manager.start(body.model_dump()) return SimulationStartResponse(simulation_id=simulation_id) @router.post("/api/simulation/{simulation_id}/stop", response_model=SimulationStopResponse) async def stop_simulation(simulation_id: str) -> SimulationStopResponse: + """停止仿真。""" await simulation_manager.stop(simulation_id) return SimulationStopResponse(simulation_id=simulation_id, status="stopped") @router.get("/files/{file_path:path}") async def files(file_path: str): + """下载文件(相对 file_root),并校验路径合法性。""" try: resolved = safe_join(file_root, file_path) except ValueError: @@ -51,4 +69,3 @@ def get_router(simulation_manager: SimulationManager, file_root: Path) -> APIRou return FileResponse(str(resolved)) return router - diff --git a/backend/api/schemas.py b/backend/api/schemas.py index 19791c4..a665e12 100644 --- a/backend/api/schemas.py +++ b/backend/api/schemas.py @@ -1,15 +1,28 @@ +"""API 层的请求/响应数据模型(Pydantic)。 + +该文件集中定义后端 HTTP 接口的入参与返回结构,便于: +- 校验请求字段(长度、范围等) +- 生成 OpenAPI 文档 +- 在路由层与前端之间形成稳定契约 +""" + from __future__ import annotations +from datetime import datetime from typing import Any from pydantic import BaseModel, Field class HealthResponse(BaseModel): + """健康检查返回。""" + status: str = "ok" class SimulationStartRequest(BaseModel): + """启动仿真的请求体。""" + scenario: str | None = None weather: str | None = None time_period: str | None = None @@ -27,3 +40,146 @@ class SimulationStopResponse(BaseModel): simulation_id: str status: str + +class TokenResponse(BaseModel): + """登录成功后返回的 token 信息。""" + + access_token: str + token_type: str = "bearer" + expires_in: int + + +class LoginRequest(BaseModel): + """用户名密码登录请求。""" + + username: str = Field(min_length=1, max_length=64) + password: str = Field(min_length=1, max_length=128) + + +class RoleCreateRequest(BaseModel): + """创建角色请求。""" + + role_id: str | None = Field(default=None, max_length=64) + role_name: str = Field(min_length=1, max_length=64) + role_desc: str | None = Field(default=None, max_length=255) + is_active: bool = True + extra: dict[str, Any] | None = None + + +class RoleUpdateRequest(BaseModel): + role_name: str | None = Field(default=None, max_length=64) + role_desc: str | None = Field(default=None, max_length=255) + is_active: bool | None = None + extra: dict[str, Any] | None = None + + +class RoleResponse(BaseModel): + role_id: str + role_name: str + role_desc: str | None = None + is_active: bool + created_at: datetime | None = None + updated_at: datetime | None = None + extra: dict[str, Any] | None = None + + +class PermissionCreateRequest(BaseModel): + """创建权限点请求(perm_code 支持自定义命名规则)。""" + + perm_code: str = Field(min_length=1, max_length=128) + perm_name: str = Field(min_length=1, max_length=128) + perm_group: str | None = Field(default=None, max_length=64) + perm_desc: str | None = Field(default=None, max_length=255) + + +class PermissionResponse(BaseModel): + perm_code: str + perm_name: str + perm_group: str | None = None + perm_desc: str | None = None + created_at: datetime | None = None + + +class RolePermissionsUpdateRequest(BaseModel): + perm_codes: list[str] = Field(default_factory=list) + + +class RolePermissionsResponse(BaseModel): + role_id: str + perm_codes: list[str] + + +class UserCreateRequest(BaseModel): + """创建用户请求(包含明文密码,服务端将保存为哈希)。""" + + user_id: str | None = Field(default=None, max_length=64) + username: str = Field(min_length=1, max_length=64) + display_name: str | None = Field(default=None, max_length=64) + password: str = Field(min_length=1, max_length=128) + role_id: str = Field(min_length=1, max_length=64) + is_active: bool = True + extra: dict[str, Any] | None = None + + +class UserUpdateRequest(BaseModel): + display_name: str | None = Field(default=None, max_length=64) + role_id: str | None = Field(default=None, max_length=64) + is_active: bool | None = None + extra: dict[str, Any] | None = None + + +class UserPasswordUpdateRequest(BaseModel): + new_password: str = Field(min_length=1, max_length=128) + + +class UserResponse(BaseModel): + user_id: str + username: str + display_name: str | None = None + role_id: str + role_name: str | None = None + is_active: bool + last_login_at: datetime | None = None + created_at: datetime | None = None + updated_at: datetime | None = None + extra: dict[str, Any] | None = None + + +class MeResponse(BaseModel): + """当前登录用户信息返回。""" + + user: UserResponse + + +class LoginResponse(BaseModel): + token: TokenResponse + user: UserResponse + + +class BootstrapRequest(BaseModel): + """系统首次初始化请求(仅允许在系统尚无任何用户时调用)。""" + + username: str = Field(min_length=1, max_length=64) + password: str = Field(min_length=1, max_length=128) + display_name: str | None = Field(default=None, max_length=64) + + +class BootstrapResponse(BaseModel): + token: TokenResponse + user: UserResponse + + +class UnityInitConfigRequest(BaseModel): + payload: dict[str, Any] + + +class UnityInitConfigResponse(BaseModel): + simulation_id: str + + +class UnityCommandRequest(BaseModel): + payload: dict[str, Any] + + +class UnityCommandResponse(BaseModel): + ok: bool = True diff --git a/backend/api/unity_routes.py b/backend/api/unity_routes.py new file mode 100644 index 0000000..bfc4d9d --- /dev/null +++ b/backend/api/unity_routes.py @@ -0,0 +1,35 @@ +from __future__ import annotations + +from fastapi import APIRouter, Depends, HTTPException +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker + +from backend.api.schemas import UnityCommandRequest, UnityCommandResponse, UnityInitConfigRequest, UnityInitConfigResponse +from backend.auth.deps import get_current_user +from backend.services.simulation_manager import SimulationManager + + +def get_router(simulation_manager: SimulationManager, session_factory: async_sessionmaker[AsyncSession]) -> APIRouter: + router = APIRouter(prefix="/api/unity", tags=["unity"]) + + def _require_admin(user: dict) -> None: + if user.get("role_id") != "admin": + raise HTTPException(status_code=403, detail="forbidden") + + @router.post("/initconfig", response_model=UnityInitConfigResponse) + async def initconfig( + body: UnityInitConfigRequest, current_user: dict = Depends(get_current_user(session_factory)) + ) -> UnityInitConfigResponse: + _require_admin(current_user) + simulation_id = await simulation_manager.init_config(body.payload) + return UnityInitConfigResponse(simulation_id=simulation_id) + + @router.post("/command", response_model=UnityCommandResponse) + async def command( + body: UnityCommandRequest, current_user: dict = Depends(get_current_user(session_factory)) + ) -> UnityCommandResponse: + _require_admin(current_user) + await simulation_manager.send_command(body.payload) + return UnityCommandResponse() + + return router + diff --git a/backend/api/user_routes.py b/backend/api/user_routes.py new file mode 100644 index 0000000..3a31e52 --- /dev/null +++ b/backend/api/user_routes.py @@ -0,0 +1,111 @@ +"""系统用户管理路由。 + +提供用户的增删改查与密码重置,并支持为用户分配角色。 +当前版本仅允许系统管理员(role_id=admin)访问。 +""" + +from __future__ import annotations + +from fastapi import APIRouter, Depends, HTTPException +from sqlalchemy.exc import IntegrityError +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker + +from backend.api.schemas import UserCreateRequest, UserPasswordUpdateRequest, UserResponse, UserUpdateRequest +from backend.auth.deps import get_current_user +from backend.services.rbac_service import RbacService +from backend.services.user_service import UserService + + +def get_router(session_factory: async_sessionmaker[AsyncSession]) -> APIRouter: + """构造用户管理路由。""" + router = APIRouter(prefix="/api", tags=["users"]) + users = UserService(session_factory) + rbac = RbacService(session_factory) + + def _require_admin(user: dict) -> None: + """管理员校验(当前仅按 role_id 判断)。""" + if user.get("role_id") != "admin": + raise HTTPException(status_code=403, detail="forbidden") + + @router.get("/users", response_model=list[UserResponse]) + async def list_users(current_user: dict = Depends(get_current_user(session_factory))) -> list[UserResponse]: + """查询用户列表。""" + _require_admin(current_user) + rows = await users.list_users() + return [UserResponse(**r) for r in rows] + + @router.get("/users/{user_id}", response_model=UserResponse) + async def get_user(user_id: str, current_user: dict = Depends(get_current_user(session_factory))) -> UserResponse: + """查询用户详情。""" + _require_admin(current_user) + row = await users.get_user(user_id) + if not row: + raise HTTPException(status_code=404, detail="not found") + return UserResponse(**row) + + @router.post("/users", response_model=UserResponse) + async def create_user(body: UserCreateRequest, current_user: dict = Depends(get_current_user(session_factory))) -> UserResponse: + """创建用户并写入密码哈希。""" + _require_admin(current_user) + role = await rbac.get_role(body.role_id) + if not role: + raise HTTPException(status_code=400, detail="invalid role_id") + try: + user = await users.create_user( + user_id=body.user_id, + username=body.username, + display_name=body.display_name, + password=body.password, + role_id=body.role_id, + is_active=body.is_active, + extra=body.extra, + ) + except IntegrityError: + raise HTTPException(status_code=409, detail="conflict") + return UserResponse(**user) + + @router.patch("/users/{user_id}", response_model=UserResponse) + async def update_user( + user_id: str, body: UserUpdateRequest, current_user: dict = Depends(get_current_user(session_factory)) + ) -> UserResponse: + """更新用户信息(可更新角色、启用状态与扩展字段)。""" + _require_admin(current_user) + if body.role_id is not None: + role = await rbac.get_role(body.role_id) + if not role: + raise HTTPException(status_code=400, detail="invalid role_id") + try: + updated = await users.update_user( + user_id, + display_name=body.display_name, + role_id=body.role_id, + is_active=body.is_active, + extra=body.extra, + ) + except IntegrityError: + raise HTTPException(status_code=409, detail="conflict") + if not updated: + raise HTTPException(status_code=404, detail="not found") + return UserResponse(**updated) + + @router.delete("/users/{user_id}") + async def delete_user(user_id: str, current_user: dict = Depends(get_current_user(session_factory))) -> dict: + """禁用用户(软删除)。""" + _require_admin(current_user) + ok = await users.disable_user(user_id) + if not ok: + raise HTTPException(status_code=404, detail="not found") + return {"ok": True} + + @router.put("/users/{user_id}/password") + async def set_password( + user_id: str, body: UserPasswordUpdateRequest, current_user: dict = Depends(get_current_user(session_factory)) + ) -> dict: + """管理员重置指定用户密码。""" + _require_admin(current_user) + ok = await users.set_password(user_id, body.new_password) + if not ok: + raise HTTPException(status_code=404, detail="not found") + return {"ok": True} + + return router diff --git a/backend/api/ws.py b/backend/api/ws.py index 4f81703..00e65ca 100644 --- a/backend/api/ws.py +++ b/backend/api/ws.py @@ -1,3 +1,8 @@ +"""WebSocket 路由处理器。 + +当前实现仅用于维持连接并将连接注册到 Broadcaster,便于服务端主动推送消息。 +""" + from __future__ import annotations from fastapi import WebSocket, WebSocketDisconnect @@ -6,8 +11,10 @@ from backend.services.broadcaster import Broadcaster async def websocket_handler(ws: WebSocket, broadcaster: Broadcaster) -> None: + """WebSocket 连接处理:接入、注册、保持心跳、断开清理。""" await ws.accept() await broadcaster.add(ws) + logger.info("WebSocket 连接接入:%s", ws.client) try: while True: await ws.receive_text() @@ -16,4 +23,4 @@ async def websocket_handler(ws: WebSocket, broadcaster: Broadcaster) -> None: pass finally: await broadcaster.remove(ws) - + logger.info("WebSocket 连接断开:%s", ws.client) diff --git a/backend/auth/__init__.py b/backend/auth/__init__.py new file mode 100644 index 0000000..4320405 --- /dev/null +++ b/backend/auth/__init__.py @@ -0,0 +1,3 @@ +"""认证子包:密码哈希、token 签发与 FastAPI 依赖。""" + +from __future__ import annotations diff --git a/backend/auth/deps.py b/backend/auth/deps.py new file mode 100644 index 0000000..b0fc711 --- /dev/null +++ b/backend/auth/deps.py @@ -0,0 +1,56 @@ +"""FastAPI 鉴权相关依赖(Depends)。 + +当前提供 get_current_user(session_factory),从 Authorization: Bearer 解析并加载用户信息。 +""" + +from __future__ import annotations + +from typing import Callable + +from fastapi import HTTPException, Request +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker + +from backend.auth.tokens import verify_access_token +from backend.database.schema import sys_role, sys_user + + +def get_current_user(session_factory: async_sessionmaker[AsyncSession]) -> Callable[..., dict]: + """返回一个依赖函数:用于解析当前用户并从 DB 校验用户/角色启用状态。""" + async def _dep(request: Request) -> dict: + auth = request.headers.get("Authorization", "") + if not auth.startswith("Bearer "): + raise HTTPException(status_code=401, detail="missing token") + token = auth[len("Bearer ") :].strip() + try: + payload = verify_access_token(token) + except ValueError: + raise HTTPException(status_code=401, detail="invalid token") + + async with session_factory() as session: + q = ( + select( + sys_user.c.user_id, + sys_user.c.username, + sys_user.c.display_name, + sys_user.c.role_id, + sys_user.c.is_active, + sys_user.c.last_login_at, + sys_user.c.created_at, + sys_user.c.updated_at, + sys_user.c.extra, + sys_role.c.role_name, + sys_role.c.is_active.label("role_is_active"), + ) + .select_from(sys_user.join(sys_role, sys_user.c.role_id == sys_role.c.role_id)) + .where(sys_user.c.user_id == payload.user_id) + .limit(1) + ) + row = (await session.execute(q)).mappings().first() + if not row: + raise HTTPException(status_code=401, detail="user not found") + if not row["is_active"] or not row["role_is_active"]: + raise HTTPException(status_code=403, detail="inactive user") + return dict(row) + + return _dep diff --git a/backend/auth/passwords.py b/backend/auth/passwords.py new file mode 100644 index 0000000..793715f --- /dev/null +++ b/backend/auth/passwords.py @@ -0,0 +1,52 @@ +"""密码哈希与校验。 + +当前实现使用 PBKDF2-HMAC-SHA256(内置 hashlib),以避免引入额外依赖。 +存储格式: + pbkdf2_sha256$$$ +""" + +from __future__ import annotations + +import base64 +import hashlib +import hmac +import secrets + + +_ALGO = "pbkdf2_sha256" +_ITERATIONS = 210_000 +_SALT_BYTES = 16 +_DKLEN = 32 + + +def hash_password(password: str) -> str: + """对明文密码进行哈希并返回可存储字符串。""" + if not isinstance(password, str) or not password: + raise ValueError("password required") + salt = secrets.token_bytes(_SALT_BYTES) + dk = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, _ITERATIONS, dklen=_DKLEN) + salt_b64 = base64.urlsafe_b64encode(salt).decode("ascii").rstrip("=") + dk_b64 = base64.urlsafe_b64encode(dk).decode("ascii").rstrip("=") + return f"{_ALGO}${_ITERATIONS}${salt_b64}${dk_b64}" + + +def verify_password(password: str, stored_hash: str) -> bool: + """校验明文密码是否匹配已存储的哈希。""" + try: + algo, iters_s, salt_b64, dk_b64 = stored_hash.split("$", 3) + if algo != _ALGO: + return False + iterations = int(iters_s) + salt = _b64url_decode(salt_b64) + expected = _b64url_decode(dk_b64) + except Exception: + return False + + dk = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, iterations, dklen=len(expected)) + return hmac.compare_digest(dk, expected) + + +def _b64url_decode(value: str) -> bytes: + """解码不带 padding 的 base64url 字符串。""" + padded = value + "=" * (-len(value) % 4) + return base64.urlsafe_b64decode(padded.encode("ascii")) diff --git a/backend/auth/tokens.py b/backend/auth/tokens.py new file mode 100644 index 0000000..237a0b7 --- /dev/null +++ b/backend/auth/tokens.py @@ -0,0 +1,99 @@ +"""轻量 access token 签发与校验。 + +说明: +- 当前实现不是标准 JWT(避免引入额外依赖),而是“base64url(payload) + HMAC 签名”的轻量令牌。 +- 适用于内部系统的最小化认证需求;如需与第三方兼容,可替换为 JWT。 +""" + +from __future__ import annotations + +import base64 +import hashlib +import hmac +import json +import os +import time +from dataclasses import dataclass + + +@dataclass(frozen=True) +class TokenPayload: + """解析后的 token 载荷。""" + + user_id: str + username: str + role_id: str + exp: int + iat: int + + +def issue_access_token(*, user_id: str, username: str, role_id: str, expires_in_seconds: int = 3600) -> str: + """签发 access token。""" + now = int(time.time()) + payload = { + "sub": user_id, + "username": username, + "role_id": role_id, + "iat": now, + "exp": now + int(expires_in_seconds), + "v": 1, + } + payload_bytes = json.dumps(payload, separators=(",", ":"), ensure_ascii=False).encode("utf-8") + payload_b64 = _b64url_encode(payload_bytes) + sig = _sign(payload_b64.encode("ascii")) + sig_b64 = _b64url_encode(sig) + return f"v1.{payload_b64}.{sig_b64}" + + +def verify_access_token(token: str) -> TokenPayload: + """校验 access token 并返回载荷。 + + Raises: + ValueError: token 非法或已过期。 + """ + if not token or not isinstance(token, str): + raise ValueError("invalid token") + parts = token.split(".") + if len(parts) != 3 or parts[0] != "v1": + raise ValueError("invalid token") + payload_b64, sig_b64 = parts[1], parts[2] + expected_sig = _sign(payload_b64.encode("ascii")) + actual_sig = _b64url_decode(sig_b64) + if not hmac.compare_digest(expected_sig, actual_sig): + raise ValueError("invalid token") + payload_raw = _b64url_decode(payload_b64) + payload = json.loads(payload_raw.decode("utf-8")) + exp = int(payload.get("exp")) + if int(time.time()) >= exp: + raise ValueError("token expired") + return TokenPayload( + user_id=str(payload.get("sub")), + username=str(payload.get("username")), + role_id=str(payload.get("role_id")), + exp=exp, + iat=int(payload.get("iat")), + ) + + +def access_token_secret() -> bytes: + """获取 token 签名密钥(来自环境变量 SMARTEDT_AUTH_SECRET)。""" + secret = os.getenv("SMARTEDT_AUTH_SECRET", "").strip() + if not secret: + secret = "smartedt-dev-secret-change-me" + return secret.encode("utf-8") + + +def _sign(message: bytes) -> bytes: + """对 message 做 HMAC-SHA256 签名。""" + return hmac.new(access_token_secret(), message, hashlib.sha256).digest() + + +def _b64url_encode(data: bytes) -> str: + """编码为不带 padding 的 base64url 字符串。""" + return base64.urlsafe_b64encode(data).decode("ascii").rstrip("=") + + +def _b64url_decode(value: str) -> bytes: + """解码不带 padding 的 base64url 字符串。""" + padded = value + "=" * (-len(value) % 4) + return base64.urlsafe_b64decode(padded.encode("ascii")) diff --git a/backend/config/__init__.py b/backend/config/__init__.py index 8b13789..523b120 100644 --- a/backend/config/__init__.py +++ b/backend/config/__init__.py @@ -1 +1,2 @@ +"""后端配置子包。""" diff --git a/backend/config/config.ini b/backend/config/config.ini index e6635c6..6f17784 100644 --- a/backend/config/config.ini +++ b/backend/config/config.ini @@ -10,3 +10,6 @@ path = data url = postgresql+psycopg://smartedt:postgres@127.0.0.1:5432/smartedt timescaledb = True +[UNITY] +host = 127.0.0.1 +port = 6000 diff --git a/backend/config/settings.py b/backend/config/settings.py index 1d6f473..41ec9a9 100644 --- a/backend/config/settings.py +++ b/backend/config/settings.py @@ -1,3 +1,12 @@ +"""配置加载与设置模型。 + +优先级(高 -> 低): +1. 环境变量 SMARTEDT_CONFIG 指定的配置文件 +2. 在若干候选位置寻找 config.ini(兼容 PyInstaller 打包运行态) +3. 环境变量的 fallback +4. 内置默认值 +""" + from __future__ import annotations import configparser @@ -8,6 +17,8 @@ from pathlib import Path @dataclass(frozen=True) class ServerSettings: + """服务监听配置。""" + host: str = "0.0.0.0" port: int = 5000 debug: bool = False @@ -15,25 +26,39 @@ class ServerSettings: @dataclass(frozen=True) class FileSettings: + """文件存储相关配置。""" + root_path: Path @dataclass(frozen=True) class DatabaseSettings: + """数据库连接相关配置。""" + url: str timescaledb: bool = True +@dataclass(frozen=True) +class UnitySettings: + host: str = "127.0.0.1" + port: int = 6000 + + @dataclass(frozen=True) class AppSettings: + """应用聚合配置。""" + server: ServerSettings files: FileSettings database: DatabaseSettings + unity: UnitySettings import sys def _find_config_file() -> Path | None: + """尝试从若干候选位置定位 config.ini(包含 PyInstaller 运行态)。""" # Handle PyInstaller frozen state if getattr(sys, 'frozen', False): # If onefile, _MEIPASS. If onedir, executable dir or _internal @@ -63,6 +88,7 @@ def _find_config_file() -> Path | None: def load_settings() -> AppSettings: + """加载并返回应用配置。""" config = configparser.ConfigParser() config_path = os.getenv("SMARTEDT_CONFIG") if config_path: @@ -93,9 +119,14 @@ def load_settings() -> AppSettings: fallback=os.getenv("SMARTEDT_TIMESCALEDB", "True").lower() == "true", ) + unity = UnitySettings( + host=config.get("UNITY", "host", fallback=os.getenv("SMARTEDT_UNITY_HOST", "127.0.0.1")), + port=config.getint("UNITY", "port", fallback=int(os.getenv("SMARTEDT_UNITY_PORT", "6000"))), + ) + return AppSettings( server=server, files=FileSettings(root_path=root_path), database=DatabaseSettings(url=database_url, timescaledb=timescaledb), + unity=unity, ) - diff --git a/backend/database/__init__.py b/backend/database/__init__.py index 8b13789..a7de1ec 100644 --- a/backend/database/__init__.py +++ b/backend/database/__init__.py @@ -1 +1 @@ - +"""数据库子包:schema 定义与 DB 工具脚本。""" diff --git a/backend/database/check_db.py b/backend/database/check_db.py index 0ca5625..e666032 100644 --- a/backend/database/check_db.py +++ b/backend/database/check_db.py @@ -1,7 +1,17 @@ +"""数据库连通性检查脚本。 + +用途: +- 快速验证 PostgreSQL 是否可连接 +- 尝试加载 TimescaleDB 扩展,并输出版本信息(若可用) + +注意:脚本只做连通性验证,不会打印密码。 +""" + import os import sys def check_database(): + """检查数据库连接与 TimescaleDB 扩展可用性。""" print("正在检查数据库连接...") # 连接参数:通过环境变量覆盖(不在输出中打印密码) @@ -57,6 +67,7 @@ def check_database(): return False if __name__ == "__main__": + """作为脚本运行时的入口。""" try: import psycopg # noqa: F401 except ImportError: diff --git a/backend/database/engine.py b/backend/database/engine.py index 145ec5b..b19d435 100644 --- a/backend/database/engine.py +++ b/backend/database/engine.py @@ -1,3 +1,8 @@ +"""数据库引擎与 Session 工厂。 + +集中封装 SQLAlchemy async engine 与 async_sessionmaker 的创建逻辑,便于在主程序中统一注入。 +""" + from __future__ import annotations from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine @@ -6,6 +11,7 @@ from backend.config.settings import DatabaseSettings def create_engine(settings: DatabaseSettings) -> AsyncEngine: + """根据配置创建 AsyncEngine。""" return create_async_engine( settings.url, pool_pre_ping=True, @@ -14,5 +20,5 @@ def create_engine(settings: DatabaseSettings) -> AsyncEngine: def create_session_factory(engine: AsyncEngine) -> async_sessionmaker[AsyncSession]: + """创建异步 Session 工厂(expire_on_commit=False 便于返回已提交对象)。""" return async_sessionmaker(engine, expire_on_commit=False) - diff --git a/backend/database/init_db.py b/backend/database/init_db.py new file mode 100644 index 0000000..09bef74 --- /dev/null +++ b/backend/database/init_db.py @@ -0,0 +1,68 @@ +from __future__ import annotations + +import argparse +import os +import platform +import sys +from pathlib import Path + +from sqlalchemy.engine.url import make_url +from sqlalchemy.ext.asyncio import create_async_engine + +_PROJECT_ROOT = Path(__file__).resolve().parents[2] +if str(_PROJECT_ROOT) not in sys.path: + sys.path.insert(0, str(_PROJECT_ROOT)) + +from backend.config.settings import load_settings +from backend.database.schema import init_schema, init_timescaledb + + +def _redact_url(url: str) -> str: + try: + parsed = make_url(url) + if parsed.password: + parsed = parsed.set(password="***") + return str(parsed) + except Exception: + return url + + +async def _run(url: str, enable_timescaledb: bool) -> None: + engine = create_async_engine(url, echo=False, pool_pre_ping=True) + try: + await init_schema(engine) + if enable_timescaledb: + await init_timescaledb(engine) + finally: + await engine.dispose() + + +def main() -> int: + parser = argparse.ArgumentParser() + parser.add_argument("--url", default=None) + parser.add_argument("--timescaledb", action="store_true") + parser.add_argument("--no-timescaledb", action="store_true") + args = parser.parse_args() + + settings = load_settings() + url = (args.url or os.getenv("SMARTEDT_DATABASE_URL") or settings.database.url).strip() + enable_timescaledb = settings.database.timescaledb + if args.timescaledb: + enable_timescaledb = True + if args.no_timescaledb: + enable_timescaledb = False + + print(f"Connecting to DB: {_redact_url(url)}") + print(f"Init schema: yes; init timescaledb: {'yes' if enable_timescaledb else 'no'}") + + import asyncio + if platform.system() == "Windows" and hasattr(asyncio, "WindowsSelectorEventLoopPolicy"): + asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) + + asyncio.run(_run(url, enable_timescaledb)) + print("✅ 初始化完成") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/backend/database/schema.py b/backend/database/schema.py index b9b7ffa..6ce790a 100644 --- a/backend/database/schema.py +++ b/backend/database/schema.py @@ -1,47 +1,79 @@ +"""数据库 Schema 定义。 + +说明: +- ORM 模型:用于结构相对稳定、需要 ORM 能力的表(例如 Simulation) +- Core Table:用于时序/大数据量写入或更灵活的 SQL 操作(例如 vehicle_signals、server_metrics、RBAC 表等) +""" + from __future__ import annotations from datetime import datetime -from sqlalchemy import JSON, BigInteger, Boolean, Column, DateTime, Float, ForeignKey, Index, String, Table, text +from sqlalchemy import JSON, BigInteger, Boolean, Column, DateTime, Float, ForeignKey, Index, Integer, String, Table, text from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column class Base(DeclarativeBase): + """SQLAlchemy Declarative Base。""" + pass -class Simulation(Base): - __tablename__ = "simulations" +class SimulationScene(Base): + """仿真场景配置(非时序数据)。""" + __tablename__ = "sim_scenes" + scene_id: Mapped[str] = mapped_column(String(64), primary_key=True, comment="场景 ID") + scene_name: Mapped[str] = mapped_column(String(255), nullable=False, unique=True, index=True, comment="场景名称") + scene_desc: Mapped[str | None] = mapped_column(String(255), nullable=True, comment="场景描述(可选)") + scene_config: Mapped[dict] = mapped_column(JSON, default=dict, comment="场景配置信息(JSON)") + is_active: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False, index=True, comment="是否启用") + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False, server_default=text("now()"), index=True, comment="创建时间(UTC)") + updated_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True, comment="更新时间(UTC)") - simulation_id: Mapped[str] = mapped_column(String(64), primary_key=True, comment="仿真 ID") - status: Mapped[str] = mapped_column(String(32), index=True, comment="仿真状态(running/stopped 等)") + +class SimulationTask(Base): + """仿真任务记录(非时序数据)。""" + __tablename__ = "sim_tasks" + """以下为仿真任务相关配置""" + task_id: Mapped[str] = mapped_column(String(64), primary_key=True, comment="任务 ID") + task_name: Mapped[str | None] = mapped_column(String(255), nullable=True, comment="任务名称") + scene_id: Mapped[str | None] = mapped_column( + String(64), ForeignKey("sim_scenes.scene_id"), nullable=True, index=True, comment="仿真场景 ID(场景表中选择)" + ) + scene_name: Mapped[str | None] = mapped_column(String(255), nullable=True, index=True, comment="仿真场景名称") + scene_config: Mapped[dict] = mapped_column(JSON, default=dict, comment="仿真场景配置信息") + config_created_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True, index=True, comment="配置创建时间(UTC)") + """以下为仿真启停操作状态相关记录信息""" started_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), index=True, comment="开始时间(UTC)") ended_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True, index=True, comment="结束时间(UTC)") - scenario_name: Mapped[str | None] = mapped_column(String(255), nullable=True, index=True, comment="仿真场景名称") - scenario_config: Mapped[dict] = mapped_column(JSON, default=dict, comment="仿真场景配置(JSON)") - config_created_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True, index=True, comment="配置创建时间(UTC)") - operator: Mapped[str | None] = mapped_column(String(64), nullable=True, index=True, comment="仿真操作员") - archived: Mapped[bool] = mapped_column(Boolean, default=False, comment="是否归档") + status: Mapped[str] = mapped_column(String(32), index=True, comment="仿真任务状态(wait/running/stopped/archived 等)") + operator: Mapped[str | None] = mapped_column(String(64), nullable=True, index=True, comment="仿真操作员") + """以下为开始时发送给Unity程序的连接及初始化配置""" + unity_host: Mapped[str | None] = mapped_column(String(64), nullable=True, comment="Unity Socket 主机") + unity_port: Mapped[int | None] = mapped_column(Integer, nullable=True, comment="Unity Socket 端口") + sync_timestamp: Mapped[int | None] = mapped_column(BigInteger, nullable=True, index=True, comment="同步基准时间戳(毫秒)") + init_config: Mapped[dict | None] = mapped_column(JSONB, nullable=True, comment="InitConfig 原始内容(主控→Unity,JSONB)") + init_sent_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True, index=True, comment="InitConfig 发送时间(UTC)") vehicle_signals = Table( "sim_vehicle_signals", Base.metadata, Column("ts", DateTime(timezone=True), nullable=False, index=True, comment="信号采样时间(UTC)"), - Column("simulation_id", String(64), nullable=False, index=True, comment="仿真 ID"), - Column("device_id", String(64), nullable=False, index=True, comment="设备 ID"), + Column("simulation_id", String(64), nullable=False, index=True, comment="仿真任务 ID(sim_tasks.task_id)"), + Column("vehicle_id", String(64), nullable=False, index=True, comment="实物车辆ID,默认值为'0'"), Column("seq", BigInteger, nullable=False, comment="信号序列号(单仿真内递增)"), Column("signals", JSONB, nullable=False, comment="车辆信号载荷(JSONB)"), Index("idx_vehicle_signals_sim_ts", "simulation_id", "ts"), comment="车辆信号时序数据(TimescaleDB hypertable)", ) -unity_vehicle_frames = Table( - "sim_unity_vehicle_frames", +unity_frames = Table( + "sim_unity_frames", Base.metadata, Column("ts", DateTime(timezone=True), nullable=False, index=True, comment="帧时间(UTC)"), - Column("simulation_id", String(64), nullable=False, index=True, comment="仿真 ID"), + Column("simulation_id", String(64), nullable=False, index=True, comment="仿真任务 ID(sim_tasks.task_id)"), Column("vehicle_id", String(64), nullable=False, index=True, comment="虚拟车辆 ID"), Column("seq", BigInteger, nullable=False, comment="帧序号(单仿真单车内递增)"), Column("pos_x", Float, nullable=False, comment="位置 X(世界坐标)"), @@ -67,7 +99,7 @@ screen_recordings = Table( "sim_screen_videos", Base.metadata, Column("video_id", String(64), primary_key=True, comment="录制文件记录 ID"), - Column("simulation_id", String(64), nullable=False, index=True, comment="仿真 ID"), + Column("simulation_id", String(64), nullable=False, index=True, comment="仿真任务 ID(sim_tasks.task_id)"), Column("screen_type", String(32), nullable=False, index=True, comment="屏幕类型(big_screen/vehicle_screen 等)"), Column("source_name", String(64), nullable=True, index=True, comment="录制源名称(可选,如设备号/通道号)"), Column("status", String(32), nullable=False, index=True, comment="状态(recording/ready/failed 等)"), @@ -174,22 +206,60 @@ server_metrics = Table( async def init_schema(engine) -> None: + """初始化数据库表结构与必要的兼容性变更。 + + 该函数会: + - create_all:创建 Base.metadata 里声明的表 + - 插入默认角色(若不存在) + - 对历史表做列/索引补齐(兼容升级) + """ from sqlalchemy.ext.asyncio import AsyncEngine if not isinstance(engine, AsyncEngine): raise TypeError("engine must be AsyncEngine") async with engine.begin() as conn: + await conn.execute(text("DROP INDEX IF EXISTS idx_vehicle_signals_sim_ts")) + await conn.execute(text("DROP INDEX IF EXISTS idx_unity_frames_sim_vehicle_ts")) + await conn.execute(text("DROP INDEX IF EXISTS idx_screen_recordings_sim_screen_created")) + await conn.execute(text("DROP INDEX IF EXISTS idx_screen_recordings_sim_screen_time")) + await conn.execute(text("DROP INDEX IF EXISTS idx_sys_role_permission_role")) + await conn.execute(text("DROP INDEX IF EXISTS idx_sys_role_permission_perm")) + await conn.execute(text("DROP INDEX IF EXISTS idx_sys_logs_user_ts")) + await conn.execute(text("DROP INDEX IF EXISTS idx_sys_logs_action_ts")) + await conn.execute(text("DROP INDEX IF EXISTS idx_server_metrics_host_ts")) await conn.run_sync(Base.metadata.create_all) - await conn.execute(text("ALTER TABLE simulations ADD COLUMN IF NOT EXISTS scenario_name VARCHAR(255)")) - await conn.execute(text("ALTER TABLE simulations ADD COLUMN IF NOT EXISTS config_created_at TIMESTAMPTZ")) - await conn.execute(text("ALTER TABLE simulations ADD COLUMN IF NOT EXISTS operator VARCHAR(64)")) - await conn.execute(text("CREATE INDEX IF NOT EXISTS idx_simulations_scenario_name ON simulations (scenario_name)")) - await conn.execute(text("CREATE INDEX IF NOT EXISTS idx_simulations_config_created_at ON simulations (config_created_at)")) - await conn.execute(text("CREATE INDEX IF NOT EXISTS idx_simulations_operator ON simulations (operator)")) + await conn.execute( + text( + """ + INSERT INTO sys_role (role_id, role_name, role_desc, is_active) + VALUES + ('admin', '系统管理员', '系统管理员', TRUE), + ('auditor', '审计员', '审计员', TRUE), + ('teacher', '老师', '老师', TRUE), + ('student', '学生', '学生', TRUE) + ON CONFLICT (role_id) DO NOTHING + """ + ) + ) + await conn.execute( + text( + """ + INSERT INTO sim_scenes (scene_id, scene_name, scene_desc, scene_config, is_active) + VALUES + ('scene_01', '城市道路', '默认场景 1', '{}'::json, TRUE), + ('scene_02', '高速公路', '默认场景 2', '{}'::json, TRUE), + ('scene_03', '学校道路', '默认场景 3', '{}'::json, TRUE), + ('scene_04', '场地训练', '默认场景 4', '{}'::json, TRUE), + ('scene_05', '综合测试', '默认场景 5', '{}'::json, TRUE) + ON CONFLICT (scene_id) DO NOTHING + """ + ) + ) async def init_timescaledb(engine) -> None: + """初始化 TimescaleDB 扩展与 hypertable/索引(若启用)。""" async with engine.begin() as conn: await conn.execute(text("CREATE EXTENSION IF NOT EXISTS timescaledb")) await conn.execute( @@ -214,11 +284,11 @@ async def init_timescaledb(engine) -> None: ) await conn.execute( text( - "SELECT create_hypertable('sim_unity_vehicle_frames', 'ts', if_not_exists => TRUE)" + "SELECT create_hypertable('sim_unity_frames', 'ts', if_not_exists => TRUE)" ) ) await conn.execute( text( - "CREATE INDEX IF NOT EXISTS idx_unity_frames_sim_vehicle_ts_desc ON sim_unity_vehicle_frames (simulation_id, vehicle_id, ts DESC)" + "CREATE INDEX IF NOT EXISTS idx_unity_frames_sim_vehicle_ts_desc ON sim_unity_frames (simulation_id, vehicle_id, ts DESC)" ) ) diff --git a/backend/database/test_db.py b/backend/database/test_db.py index c8674ae..37a1bab 100644 --- a/backend/database/test_db.py +++ b/backend/database/test_db.py @@ -1,3 +1,13 @@ +"""数据库性能与功能测试脚本(开发/压测用途)。 + +该脚本会: +- 初始化 schema 与 TimescaleDB(若可用) +- 批量写入模拟车辆信号(JSONB) +- 运行几类常见查询并输出耗时 + +注意:该脚本会写入大量数据,请不要在生产库中执行。 +""" + import asyncio import os import time @@ -7,11 +17,12 @@ from datetime import datetime, timezone from sqlalchemy import insert, select, text from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy.engine.url import make_url -from backend.database.schema import vehicle_signals, Simulation, init_schema, init_timescaledb +from backend.database.schema import SimulationTask, unity_frames, vehicle_signals, init_schema, init_timescaledb from backend.config.settings import load_settings # 模拟数据生成 def generate_payload(): + """生成一条模拟车辆信号负载(用于写入 JSONB)。""" return { "steering_wheel_angle_deg": round(random.uniform(-450, 450), 1), "brake_pedal_travel_mm": round(random.uniform(0, 100), 1), @@ -38,6 +49,7 @@ def generate_payload(): } def _redact_url(url: str) -> str: + """隐藏数据库 URL 中的密码,避免误打印敏感信息。""" try: parsed = make_url(url) if parsed.password: @@ -47,6 +59,7 @@ def _redact_url(url: str) -> str: return url async def run_test(): + """执行写入/查询性能测试。""" settings = load_settings() db_url = os.getenv("SMARTEDT_TEST_DATABASE_URL", settings.database.url).strip() diff --git a/backend/device/__init__.py b/backend/device/__init__.py index 8b13789..979c335 100644 --- a/backend/device/__init__.py +++ b/backend/device/__init__.py @@ -1 +1 @@ - +"""设备子包:设备抽象与具体实现。""" diff --git a/backend/device/base.py b/backend/device/base.py index 244b1e0..7152d3d 100644 --- a/backend/device/base.py +++ b/backend/device/base.py @@ -1,3 +1,8 @@ +"""设备抽象层。 + +用于统一不同设备(真实硬件、仿真设备、Mock)的连接与状态查询接口。 +""" + from __future__ import annotations from dataclasses import dataclass @@ -5,21 +10,27 @@ from dataclasses import dataclass @dataclass(frozen=True) class DeviceInfo: + """设备信息快照(可用于 API 输出)。""" + device_id: str device_type: str connected: bool class DeviceAdapter: + """设备适配器接口(异步)。""" + device_id: str device_type: str async def connect(self) -> None: + """建立与设备的连接。""" raise NotImplementedError async def disconnect(self) -> None: + """断开与设备的连接。""" raise NotImplementedError async def is_connected(self) -> bool: + """返回当前连接状态。""" raise NotImplementedError - diff --git a/backend/device/mock_vehicle.py b/backend/device/mock_vehicle.py index f5242b3..94f7289 100644 --- a/backend/device/mock_vehicle.py +++ b/backend/device/mock_vehicle.py @@ -1,3 +1,8 @@ +"""Mock 车辆设备实现。 + +用于在没有真实硬件接入时,生成可用于联调的车辆信号数据。 +""" + from __future__ import annotations import random @@ -8,6 +13,8 @@ from backend.device.base import DeviceAdapter @dataclass(frozen=True) class VehicleSignalPayload: + """一帧车辆信号载荷(用于广播/落库)。""" + steering_wheel_angle_deg: float brake_pedal_travel_mm: float throttle_pedal_travel_mm: float @@ -22,6 +29,7 @@ class VehicleSignalPayload: temperature_c: float def to_dict(self) -> dict: + """转为可 JSON 序列化的 dict。""" return { "steering_wheel_angle_deg": self.steering_wheel_angle_deg, "brake_pedal_travel_mm": self.brake_pedal_travel_mm, @@ -39,21 +47,27 @@ class VehicleSignalPayload: class MockVehicleDevice(DeviceAdapter): + """模拟车辆设备:提供 connect/disconnect/is_connected 与 sample()。""" + def __init__(self, device_id: str = "controlbox_01") -> None: self.device_id = device_id self.device_type = "mock_vehicle" self._connected = False async def connect(self) -> None: + """模拟建立连接。""" self._connected = True async def disconnect(self) -> None: + """模拟断开连接。""" self._connected = False async def is_connected(self) -> bool: + """返回当前连接状态。""" return self._connected def sample(self) -> VehicleSignalPayload: + """采样生成一帧模拟信号。""" steering = random.uniform(-180.0, 180.0) brake = max(0.0, random.gauss(2.0, 1.0)) throttle = max(0.0, random.gauss(15.0, 5.0)) @@ -86,4 +100,3 @@ class MockVehicleDevice(DeviceAdapter): current_a=current, temperature_c=temp, ) - diff --git a/backend/main.py b/backend/main.py index c49f64d..cf12aa9 100644 --- a/backend/main.py +++ b/backend/main.py @@ -1,3 +1,12 @@ +"""SmartEDT 后端服务入口。 + +主要职责: +- 加载配置与初始化日志 +- 初始化数据库 schema/TimescaleDB +- 构造核心服务(仿真、监控、鉴权/RBAC) +- 挂载 HTTP/WebSocket 路由并启动 uvicorn +""" + from __future__ import annotations import argparse @@ -35,19 +44,22 @@ from backend.database.schema import init_schema, init_timescaledb from backend.services.broadcaster import Broadcaster from backend.services.simulation_manager import SimulationManager from backend.services.server_monitor import ServerMonitorService +from backend.services.unity_socket_client import UnitySocketClient from backend.device.mock_vehicle import MockVehicleDevice -from backend.api import routes, ws +from backend.api import auth_routes, rbac_routes, routes, unity_routes, user_routes, ws from backend.utils import configure_logging logger = logging.getLogger("backend") def _default_backend_log_file() -> Path | None: + """在打包运行态下返回默认日志文件路径;开发态返回 None。""" if getattr(sys, "frozen", False): exe_dir = Path(sys.executable).resolve().parent return exe_dir / "logs" / "backend.log" return None def _force_windows_selector_event_loop_for_uvicorn() -> None: + """避免 uvicorn 在 Windows 上切换到 ProactorEventLoop(与 psycopg async 不兼容)。""" if platform.system() != "Windows": return try: @@ -62,6 +74,8 @@ def _force_windows_selector_event_loop_for_uvicorn() -> None: # 全局单例容器(简单实现) class Container: + """简易全局容器:集中创建与持有配置、DB 引擎、session 工厂与各服务单例。""" + def __init__(self): load_dotenv() self.settings = load_settings() @@ -76,11 +90,13 @@ class Container: self.engine = create_engine(self.settings.database) self.session_factory = create_session_factory(self.engine) self.broadcaster = Broadcaster() + self.unity_client = UnitySocketClient(self.settings.unity.host, self.settings.unity.port) # 实例化服务 self.simulation_manager = SimulationManager( self.session_factory, - self.broadcaster + self.broadcaster, + unity_client=self.unity_client, ) # 实例化监控服务 @@ -93,6 +109,7 @@ container = Container() @asynccontextmanager async def lifespan(app: FastAPI): + """FastAPI 生命周期:启动初始化与停机清理。""" # 启动前初始化 await init_schema(container.engine) if container.settings.database.timescaledb: @@ -118,6 +135,10 @@ async def lifespan(app: FastAPI): app = FastAPI(title="SmartEDT Backend", version="0.1.0", lifespan=lifespan) app.include_router(routes.get_router(simulation_manager=container.simulation_manager, file_root=container.file_root)) +app.include_router(auth_routes.get_router(session_factory=container.session_factory)) +app.include_router(rbac_routes.get_router(session_factory=container.session_factory)) +app.include_router(user_routes.get_router(session_factory=container.session_factory)) +app.include_router(unity_routes.get_router(simulation_manager=container.simulation_manager, session_factory=container.session_factory)) @app.websocket("/ws") async def ws_endpoint(websocket: WebSocket): @@ -125,6 +146,7 @@ async def ws_endpoint(websocket: WebSocket): def main() -> None: + """命令行入口:解析参数并启动 uvicorn。""" parser = argparse.ArgumentParser() parser.add_argument("--host", default=None) parser.add_argument("--port", type=int, default=None) diff --git a/backend/services/__init__.py b/backend/services/__init__.py index 8b13789..69b9e80 100644 --- a/backend/services/__init__.py +++ b/backend/services/__init__.py @@ -1 +1 @@ - +"""后端业务服务子包。""" diff --git a/backend/services/auth_service.py b/backend/services/auth_service.py new file mode 100644 index 0000000..8dec263 --- /dev/null +++ b/backend/services/auth_service.py @@ -0,0 +1,45 @@ +"""认证服务。 + +该模块实现“用户名 + 密码”的登录校验,并签发 access token。 +""" + +from __future__ import annotations + +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker + +from backend.auth.passwords import verify_password +from backend.auth.tokens import issue_access_token +from backend.services.user_service import UserService + + +class AuthService: + """认证相关业务逻辑(不直接绑定 HTTP 框架)。""" + + def __init__(self, session_factory: async_sessionmaker[AsyncSession]) -> None: + self._session_factory = session_factory + self._users = UserService(session_factory) + + async def login(self, *, username: str, password: str, expires_in_seconds: int = 3600) -> tuple[str, dict]: + """登录并返回 (token, 用户行数据)。 + + Raises: + ValueError: 用户名或密码错误。 + PermissionError: 用户已被禁用。 + """ + user_row = await self._users.get_user_by_username(username) + if not user_row: + raise ValueError("用户名或密码错误!") + if not user_row.get("is_active", True): + raise PermissionError("用户已被禁用!") + stored = user_row.get("password_hash") or "" + if not verify_password(password, stored): + raise ValueError("用户名或密码错误!") + + await self._users.touch_last_login(str(user_row["user_id"])) + token = issue_access_token( + user_id=str(user_row["user_id"]), + username=str(user_row["username"]), + role_id=str(user_row["role_id"]), + expires_in_seconds=expires_in_seconds, + ) + return token, user_row diff --git a/backend/services/broadcaster.py b/backend/services/broadcaster.py index c1a0a7f..e2a3cbb 100644 --- a/backend/services/broadcaster.py +++ b/backend/services/broadcaster.py @@ -1,3 +1,8 @@ +"""WebSocket 广播器。 + +维护当前在线的 WebSocket 连接集合,并支持向所有连接广播 JSON 消息。 +""" + from __future__ import annotations import asyncio @@ -7,19 +12,27 @@ from starlette.websockets import WebSocket class Broadcaster: + """简单的 WebSocket 广播器(线程安全:使用 asyncio.Lock)。""" + def __init__(self) -> None: self._clients: set[WebSocket] = set() self._lock = asyncio.Lock() async def add(self, ws: WebSocket) -> None: + """注册连接。""" async with self._lock: self._clients.add(ws) async def remove(self, ws: WebSocket) -> None: + """移除连接(若不存在则忽略)。""" async with self._lock: self._clients.discard(ws) async def broadcast_json(self, message: dict[str, Any]) -> None: + """向所有连接广播 JSON。 + + 若某个连接发送失败,会被自动移除,避免集合泄漏。 + """ async with self._lock: clients = list(self._clients) for ws in clients: @@ -27,4 +40,3 @@ class Broadcaster: await ws.send_json(message) except Exception: await self.remove(ws) - diff --git a/backend/services/rbac_service.py b/backend/services/rbac_service.py new file mode 100644 index 0000000..6ebb81b --- /dev/null +++ b/backend/services/rbac_service.py @@ -0,0 +1,186 @@ +"""RBAC(角色/权限)服务。 + +该模块围绕 sys_role / sys_permission / sys_role_permission 三张表提供基本的增删改查与绑定关系维护。 +""" + +from __future__ import annotations + +import secrets +from typing import Any + +from sqlalchemy import delete, insert, select, update +from sqlalchemy.exc import IntegrityError +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker + +from backend.database.schema import sys_permission, sys_role, sys_role_permission +from backend.utils import utc_now + + +class RbacService: + """角色与权限点管理服务(SQLAlchemy Core)。""" + + def __init__(self, session_factory: async_sessionmaker[AsyncSession]) -> None: + self._session_factory = session_factory + + async def list_roles(self) -> list[dict[str, Any]]: + """查询角色列表。""" + async with self._session_factory() as session: + q = ( + select( + sys_role.c.role_id, + sys_role.c.role_name, + sys_role.c.role_desc, + sys_role.c.is_active, + sys_role.c.created_at, + sys_role.c.updated_at, + sys_role.c.extra, + ) + .order_by(sys_role.c.role_name.asc()) + ) + return [dict(r) for r in (await session.execute(q)).mappings().all()] + + async def get_role(self, role_id: str) -> dict[str, Any] | None: + """按 role_id 查询角色。""" + async with self._session_factory() as session: + q = select(sys_role).where(sys_role.c.role_id == role_id).limit(1) + row = (await session.execute(q)).mappings().first() + return dict(row) if row else None + + async def create_role( + self, + *, + role_id: str | None, + role_name: str, + role_desc: str | None = None, + is_active: bool = True, + extra: dict | None = None, + ) -> dict[str, Any]: + """创建角色。 + + 说明:role_id 不传时会自动生成。 + """ + rid = role_id or ("role_" + secrets.token_hex(8)) + values: dict[str, Any] = { + "role_id": rid, + "role_name": role_name, + "role_desc": role_desc, + "is_active": is_active, + "updated_at": utc_now(), + "extra": extra, + } + async with self._session_factory() as session: + try: + await session.execute(insert(sys_role).values(**values)) + await session.commit() + except IntegrityError: + await session.rollback() + raise + created = await self.get_role(rid) + if not created: + raise RuntimeError("failed to create role") + return created + + async def update_role( + self, + role_id: str, + *, + role_name: str | None = None, + role_desc: str | None = None, + is_active: bool | None = None, + extra: dict | None = None, + ) -> dict[str, Any] | None: + """更新角色字段(仅更新传入的字段)。""" + patch: dict[str, Any] = {"updated_at": utc_now()} + if role_name is not None: + patch["role_name"] = role_name + if role_desc is not None: + patch["role_desc"] = role_desc + if is_active is not None: + patch["is_active"] = is_active + if extra is not None: + patch["extra"] = extra + async with self._session_factory() as session: + try: + res = await session.execute(update(sys_role).where(sys_role.c.role_id == role_id).values(**patch)) + await session.commit() + except IntegrityError: + await session.rollback() + raise + if res.rowcount == 0: + return None + return await self.get_role(role_id) + + async def disable_role(self, role_id: str) -> bool: + """禁用角色(软删除)。""" + async with self._session_factory() as session: + res = await session.execute( + update(sys_role).where(sys_role.c.role_id == role_id).values(is_active=False, updated_at=utc_now()) + ) + await session.commit() + return bool(res.rowcount) + + async def list_permissions(self) -> list[dict[str, Any]]: + """查询权限点列表。""" + async with self._session_factory() as session: + q = select(sys_permission).order_by(sys_permission.c.perm_group.asc().nulls_last(), sys_permission.c.perm_code.asc()) + return [dict(r) for r in (await session.execute(q)).mappings().all()] + + async def create_permission( + self, *, perm_code: str, perm_name: str, perm_group: str | None = None, perm_desc: str | None = None + ) -> dict[str, Any]: + """创建权限点。""" + async with self._session_factory() as session: + try: + await session.execute( + insert(sys_permission).values( + perm_code=perm_code, perm_name=perm_name, perm_group=perm_group, perm_desc=perm_desc + ) + ) + await session.commit() + except IntegrityError: + await session.rollback() + raise + q = select(sys_permission).where(sys_permission.c.perm_code == perm_code).limit(1) + row = (await session.execute(q)).mappings().first() + if not row: + raise RuntimeError("failed to create permission") + return dict(row) + + async def delete_permission(self, perm_code: str) -> bool: + """删除权限点,并清理与角色的关联。""" + async with self._session_factory() as session: + await session.execute(delete(sys_role_permission).where(sys_role_permission.c.perm_code == perm_code)) + res = await session.execute(delete(sys_permission).where(sys_permission.c.perm_code == perm_code)) + await session.commit() + return bool(res.rowcount) + + async def get_role_permissions(self, role_id: str) -> list[str]: + """查询指定角色拥有的权限点编码列表。""" + async with self._session_factory() as session: + q = select(sys_role_permission.c.perm_code).where(sys_role_permission.c.role_id == role_id) + rows = (await session.execute(q)).scalars().all() + return list(rows) + + async def set_role_permissions(self, *, role_id: str, perm_codes: list[str]) -> list[str]: + """覆盖设置角色权限点集合(先删后插)。""" + unique = list(dict.fromkeys(perm_codes)) + async with self._session_factory() as session: + if unique: + q = select(sys_permission.c.perm_code).where(sys_permission.c.perm_code.in_(unique)) + existing = set((await session.execute(q)).scalars().all()) + missing = [c for c in unique if c not in existing] + if missing: + raise ValueError(f"missing permissions: {', '.join(missing)}") + + try: + await session.execute(delete(sys_role_permission).where(sys_role_permission.c.role_id == role_id)) + if unique: + await session.execute( + insert(sys_role_permission), + [{"role_id": role_id, "perm_code": code} for code in unique], + ) + await session.commit() + except IntegrityError: + await session.rollback() + raise + return await self.get_role_permissions(role_id) diff --git a/backend/services/server_monitor.py b/backend/services/server_monitor.py index 46e175f..a23f1fb 100644 --- a/backend/services/server_monitor.py +++ b/backend/services/server_monitor.py @@ -1,3 +1,10 @@ +"""服务器监控采集服务。 + +以较高频率采样系统指标(CPU/内存),并以较低频率进行下采样后: +- 通过 WebSocket 广播给前端 +- 写入 TimescaleDB(server_metrics hypertable) +""" + import asyncio import time import logging @@ -14,6 +21,8 @@ from backend.services.broadcaster import Broadcaster logger = logging.getLogger("backend.monitor") class ServerMonitorService: + """服务器资源监控服务(采样 + 下采样 + 广播 + 落库)。""" + def __init__(self, session_factory: async_sessionmaker, broadcaster: Broadcaster): self._session_factory = session_factory self._broadcaster = broadcaster @@ -29,6 +38,7 @@ class ServerMonitorService: self._buffer_mem = [] async def start(self): + """启动监控循环(幂等)。""" if self._running: return self._running = True @@ -36,6 +46,7 @@ class ServerMonitorService: logger.info("ServerMonitorService started") async def stop(self): + """停止监控循环并等待任务结束。""" self._running = False if self._task: try: @@ -45,6 +56,7 @@ class ServerMonitorService: logger.info("ServerMonitorService stopped") async def _run_loop(self): + """采样循环:50Hz 采样,10Hz 报告。""" loop = asyncio.get_running_loop() next_time = loop.time() @@ -76,6 +88,7 @@ class ServerMonitorService: await asyncio.sleep(0) async def _process_and_report(self): + """对采样缓冲区做下采样,并完成广播与落库。""" if not self._buffer_cpu: return diff --git a/backend/services/simulation_manager.py b/backend/services/simulation_manager.py index 6508054..f67c087 100644 --- a/backend/services/simulation_manager.py +++ b/backend/services/simulation_manager.py @@ -1,3 +1,12 @@ +"""仿真管理服务。 + +负责: +- 仿真生命周期(start/stop) +- 设备接入(目前为 MockVehicleDevice) +- 信号采样与广播(WebSocket) +- 信号落库(TimescaleDB hypertable) +""" + from __future__ import annotations import asyncio @@ -9,9 +18,10 @@ from typing import Any from sqlalchemy import insert from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker -from backend.database.schema import Simulation, vehicle_signals +from backend.database.schema import SimulationTask, vehicle_signals from backend.device.mock_vehicle import MockVehicleDevice from backend.services.broadcaster import Broadcaster +from backend.services.unity_socket_client import UnitySocketClient from backend.utils import utc_now @@ -20,51 +30,158 @@ logger = logging.getLogger("backend.simulation") @dataclass class SimulationRuntime: + """运行中的仿真信息(内存态)。""" + simulation_id: str status: str task: asyncio.Task | None = None class SimulationManager: + """仿真生命周期管理器。""" + def __init__( self, session_factory: async_sessionmaker[AsyncSession], broadcaster: Broadcaster, + unity_client: UnitySocketClient | None = None, ) -> None: self._session_factory = session_factory self._broadcaster = broadcaster + self._unity_client = unity_client self._runtime: SimulationRuntime | None = None self._device = MockVehicleDevice() self._seq = 0 + self._command_seq = 0 def current(self) -> SimulationRuntime | None: + """返回当前运行中的仿真(若无则为 None)。""" return self._runtime async def register_device(self, device: MockVehicleDevice) -> None: + """注册仿真设备实现(用于采样)。""" self._device = device + async def init_config(self, init_config: dict[str, Any]) -> str: + session_info = init_config.get("session") or {} + driver_info = init_config.get("driver") or {} + vehicle_info = init_config.get("vehicle") or {} + scene_info = init_config.get("scene") or {} + + task_id = str(session_info.get("taskId") or "").strip() or None + simulation_id = None + if task_id and len(task_id) <= 64: + simulation_id = task_id + if not simulation_id: + simulation_id = "SIM" + utc_now().strftime("%Y%m%d%H%M%S") + secrets.token_hex(2).upper() + + now = utc_now() + task_name = (session_info.get("taskName") or None) + sync_timestamp = session_info.get("syncTimestamp") + driver_id = (driver_info.get("driverId") or None) + vehicle_id = (vehicle_info.get("vehicleId") or None) + scene_id = (scene_info.get("sceneId") or None) + scene_name = (scene_info.get("sceneName") or None) + scene_config = scene_info if isinstance(scene_info, dict) else {} + operator = driver_info.get("name") or None + + async with self._session_factory() as session: + sim = await session.get(SimulationTask, simulation_id) + if sim is None: + sim = SimulationTask( + task_id=simulation_id, + task_name=task_name, + scene_id=scene_id, + scene_name=scene_name, + scene_config=scene_config, + config_created_at=now, + started_at=now, + ended_at=None, + status="wait", + operator=operator, + unity_host=self._unity_client.host if self._unity_client is not None else None, + unity_port=self._unity_client.port if self._unity_client is not None else None, + sync_timestamp=int(sync_timestamp) if sync_timestamp is not None else None, + init_config=init_config, + init_sent_at=None, + ) + session.add(sim) + else: + sim.task_name = task_name + sim.scene_id = scene_id + sim.scene_name = scene_name + sim.scene_config = scene_config + sim.config_created_at = now + sim.operator = operator + sim.sync_timestamp = int(sync_timestamp) if sync_timestamp is not None else None + sim.init_config = init_config + if self._unity_client is not None: + sim.unity_host = self._unity_client.host + sim.unity_port = self._unity_client.port + + await session.commit() + + if self._unity_client is not None: + payload = dict(init_config) + payload.setdefault("msgType", "init") + await self._unity_client.send_json(payload) + async with self._session_factory() as session: + sim = await session.get(SimulationTask, simulation_id) + if sim is not None: + sim.init_sent_at = utc_now() + await session.commit() + + await self._broadcaster.broadcast_json( + {"type": "simulation.init_config", "ts": now.timestamp(), "simulation_id": simulation_id, "payload": init_config} + ) + return simulation_id + + async def send_command(self, command: dict[str, Any]) -> None: + if self._unity_client is None: + raise RuntimeError("unity client not configured") + payload = dict(command) + payload.setdefault("msgType", "command") + payload.setdefault("timestamp", int(utc_now().timestamp() * 1000)) + if "seqId" not in payload: + self._command_seq += 1 + payload["seqId"] = self._command_seq + await self._unity_client.send_json(payload) + await self._broadcaster.broadcast_json( + {"type": "simulation.command", "ts": utc_now().timestamp(), "payload": payload} + ) + async def start(self, scenario_config: dict[str, Any]) -> str: + """启动仿真并返回 simulation_id。 + + 说明:如果已有仿真在运行,会直接返回当前 simulation_id(幂等)。 + """ if self._runtime and self._runtime.status == "running": return self._runtime.simulation_id simulation_id = "SIM" + utc_now().strftime("%Y%m%d%H%M%S") + secrets.token_hex(2).upper() started_at = utc_now() - scenario_name = scenario_config.get("scenario") + task_name = scenario_config.get("scenario") operator = scenario_config.get("driver") or scenario_config.get("operator") config_created_at = started_at async with self._session_factory() as session: session.add( - Simulation( - simulation_id=simulation_id, - status="running", + SimulationTask( + task_id=simulation_id, + task_name=task_name, + scene_id=None, + scene_name=None, + scene_config=scenario_config, + config_created_at=config_created_at, started_at=started_at, ended_at=None, - scenario_name=scenario_name, - scenario_config=scenario_config, - config_created_at=config_created_at, + status="running", operator=operator, - archived=False, + unity_host=self._unity_client.host if self._unity_client is not None else None, + unity_port=self._unity_client.port if self._unity_client is not None else None, + sync_timestamp=None, + init_config=None, + init_sent_at=None, ) ) await session.commit() @@ -75,9 +192,12 @@ class SimulationManager: await self._broadcaster.broadcast_json( {"type": "simulation.status", "ts": started_at.timestamp(), "simulation_id": simulation_id, "payload": {"status": "running"}} ) + if self._unity_client is not None: + await self.send_command({"action": "start"}) return simulation_id async def stop(self, simulation_id: str) -> None: + """停止仿真(若 simulation_id 不匹配当前运行实例则忽略)。""" runtime = self._runtime if not runtime or runtime.simulation_id != simulation_id: return @@ -94,7 +214,7 @@ class SimulationManager: ended_at = utc_now() async with self._session_factory() as session: - sim = await session.get(Simulation, simulation_id) + sim = await session.get(SimulationTask, simulation_id) if sim: sim.status = "stopped" sim.ended_at = ended_at @@ -103,9 +223,12 @@ class SimulationManager: await self._broadcaster.broadcast_json( {"type": "simulation.status", "ts": ended_at.timestamp(), "simulation_id": simulation_id, "payload": {"status": "stopped"}} ) + if self._unity_client is not None: + await self.send_command({"action": "stop"}) self._runtime = None async def _run_loop(self, simulation_id: str) -> None: + """仿真运行循环:采样设备信号、广播并写入数据库。""" try: while True: await asyncio.sleep(0.05) @@ -131,6 +254,7 @@ class SimulationManager: logger.exception("simulation loop crashed") async def _persist_signal(self, ts, simulation_id: str, device_id: str, seq: int, signals: dict[str, Any]) -> None: + """将单条信号写入 sim_vehicle_signals(TimescaleDB)。""" async with self._session_factory() as session: await session.execute( insert(vehicle_signals).values( diff --git a/backend/services/unity_socket_client.py b/backend/services/unity_socket_client.py new file mode 100644 index 0000000..8ac39cc --- /dev/null +++ b/backend/services/unity_socket_client.py @@ -0,0 +1,48 @@ +from __future__ import annotations + +import asyncio +import json +from typing import Any + + +class UnitySocketClient: + def __init__(self, host: str, port: int) -> None: + self._host = host + self._port = port + self._lock = asyncio.Lock() + self._writer: asyncio.StreamWriter | None = None + + @property + def host(self) -> str: + return self._host + + @property + def port(self) -> int: + return self._port + + async def send_json(self, payload: dict[str, Any]) -> None: + data = (json.dumps(payload, ensure_ascii=False, separators=(",", ":")) + "\n").encode("utf-8") + async with self._lock: + writer = await self._ensure_connected() + try: + writer.write(data) + await writer.drain() + except Exception: + await self._close_writer() + raise + + async def _ensure_connected(self) -> asyncio.StreamWriter: + if self._writer is not None and not self._writer.is_closing(): + return self._writer + reader, writer = await asyncio.open_connection(self._host, self._port) + self._writer = writer + return writer + + async def _close_writer(self) -> None: + if self._writer is None: + return + try: + self._writer.close() + await self._writer.wait_closed() + finally: + self._writer = None diff --git a/backend/services/user_service.py b/backend/services/user_service.py new file mode 100644 index 0000000..0c1c30a --- /dev/null +++ b/backend/services/user_service.py @@ -0,0 +1,169 @@ +"""系统用户服务。 + +围绕 sys_user 表提供用户的增删改查、密码设置(写入哈希)、登录时间维护等能力。 +""" + +from __future__ import annotations + +import secrets +from typing import Any + +from sqlalchemy import select, update +from sqlalchemy.exc import IntegrityError +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker + +from backend.auth.passwords import hash_password +from backend.database.schema import sys_role, sys_user +from backend.utils import utc_now + + +class UserService: + """系统用户管理服务(SQLAlchemy Core)。""" + + def __init__(self, session_factory: async_sessionmaker[AsyncSession]) -> None: + self._session_factory = session_factory + + async def list_users(self) -> list[dict[str, Any]]: + """查询用户列表(包含角色名称)。""" + async with self._session_factory() as session: + q = ( + select( + sys_user.c.user_id, + sys_user.c.username, + sys_user.c.display_name, + sys_user.c.role_id, + sys_role.c.role_name, + sys_user.c.is_active, + sys_user.c.last_login_at, + sys_user.c.created_at, + sys_user.c.updated_at, + sys_user.c.extra, + ) + .select_from(sys_user.join(sys_role, sys_user.c.role_id == sys_role.c.role_id)) + .order_by(sys_user.c.created_at.desc()) + ) + return [dict(r) for r in (await session.execute(q)).mappings().all()] + + async def get_user(self, user_id: str) -> dict[str, Any] | None: + """按 user_id 查询用户(包含角色名称)。""" + async with self._session_factory() as session: + q = ( + select( + sys_user.c.user_id, + sys_user.c.username, + sys_user.c.display_name, + sys_user.c.role_id, + sys_role.c.role_name, + sys_user.c.is_active, + sys_user.c.last_login_at, + sys_user.c.created_at, + sys_user.c.updated_at, + sys_user.c.extra, + ) + .select_from(sys_user.join(sys_role, sys_user.c.role_id == sys_role.c.role_id)) + .where(sys_user.c.user_id == user_id) + .limit(1) + ) + row = (await session.execute(q)).mappings().first() + return dict(row) if row else None + + async def get_user_by_username(self, username: str) -> dict[str, Any] | None: + """按 username 查询用户(用于登录)。""" + async with self._session_factory() as session: + q = select(sys_user).where(sys_user.c.username == username).limit(1) + row = (await session.execute(q)).mappings().first() + return dict(row) if row else None + + async def create_user( + self, + *, + user_id: str | None, + username: str, + password: str, + role_id: str, + display_name: str | None = None, + is_active: bool = True, + extra: dict | None = None, + ) -> dict[str, Any]: + """创建用户并写入密码哈希。""" + uid = user_id or ("user_" + secrets.token_hex(8)) + password_hash = hash_password(password) + async with self._session_factory() as session: + try: + await session.execute( + sys_user.insert().values( + user_id=uid, + username=username, + display_name=display_name, + password_hash=password_hash, + role_id=role_id, + is_active=is_active, + updated_at=utc_now(), + extra=extra, + ) + ) + await session.commit() + except IntegrityError: + await session.rollback() + raise + created = await self.get_user(uid) + if not created: + raise RuntimeError("failed to create user") + return created + + async def update_user( + self, + user_id: str, + *, + display_name: str | None = None, + role_id: str | None = None, + is_active: bool | None = None, + extra: dict | None = None, + ) -> dict[str, Any] | None: + """更新用户字段(仅更新传入的字段)。""" + patch: dict[str, Any] = {"updated_at": utc_now()} + if display_name is not None: + patch["display_name"] = display_name + if role_id is not None: + patch["role_id"] = role_id + if is_active is not None: + patch["is_active"] = is_active + if extra is not None: + patch["extra"] = extra + async with self._session_factory() as session: + try: + res = await session.execute(update(sys_user).where(sys_user.c.user_id == user_id).values(**patch)) + await session.commit() + except IntegrityError: + await session.rollback() + raise + if res.rowcount == 0: + return None + return await self.get_user(user_id) + + async def disable_user(self, user_id: str) -> bool: + """禁用用户(软删除)。""" + async with self._session_factory() as session: + res = await session.execute( + update(sys_user).where(sys_user.c.user_id == user_id).values(is_active=False, updated_at=utc_now()) + ) + await session.commit() + return bool(res.rowcount) + + async def set_password(self, user_id: str, new_password: str) -> bool: + """设置用户密码(保存为哈希,不存明文)。""" + password_hash = hash_password(new_password) + async with self._session_factory() as session: + res = await session.execute( + update(sys_user).where(sys_user.c.user_id == user_id).values(password_hash=password_hash, updated_at=utc_now()) + ) + await session.commit() + return bool(res.rowcount) + + async def touch_last_login(self, user_id: str) -> None: + """更新用户最近登录时间。""" + async with self._session_factory() as session: + await session.execute( + update(sys_user).where(sys_user.c.user_id == user_id).values(last_login_at=utc_now(), updated_at=utc_now()) + ) + await session.commit() diff --git a/backend/utils.py b/backend/utils.py index 382163a..c199c44 100644 --- a/backend/utils.py +++ b/backend/utils.py @@ -1,3 +1,11 @@ +"""后端通用工具函数。 + +该模块放置与业务无关的通用能力: +- UTC 时间获取 +- 日志初始化(控制台 + 可选文件滚动) +- 受限路径拼接(防目录穿越) +""" + from __future__ import annotations import logging @@ -8,10 +16,17 @@ from pathlib import Path def utc_now() -> datetime: + """返回当前 UTC 时间(timezone-aware)。""" return datetime.now(timezone.utc) def configure_logging(level: str, log_file: Path | None = None) -> None: + """配置全局日志。 + + Args: + level: 日志级别字符串(例如 "INFO" / "DEBUG")。 + log_file: 可选的日志文件路径;提供时启用按天滚动。 + """ level_value = getattr(logging, level.upper(), logging.INFO) logging_handlers: list[logging.Handler] = [logging.StreamHandler()] if log_file is not None: @@ -39,10 +54,15 @@ def configure_logging(level: str, log_file: Path | None = None) -> None: def project_root() -> Path: + """返回项目根目录(backend 的上一级)。""" return Path(__file__).resolve().parents[1] def safe_join(root: Path, untrusted_path: str) -> Path: + """将不可信路径拼接到 root 下,并阻止目录穿越/绝对路径/UNC 路径。 + + 主要用于下载/文件访问等接口,避免访问到文件根目录之外。 + """ if untrusted_path.startswith(("\\\\", "//")): raise ValueError("UNC path is not allowed") if os.path.isabs(untrusted_path): diff --git a/tools/docx_to_md.py b/tools/docx_to_md.py new file mode 100644 index 0000000..043b205 --- /dev/null +++ b/tools/docx_to_md.py @@ -0,0 +1,78 @@ +from __future__ import annotations + +"""将 .docx 文档转换为 Markdown。 + +实现思路: +- 使用 mammoth 将 docx 转为 HTML(对 Word 样式有较好兼容) +- 再使用 markdownify 将 HTML 转为 Markdown +- 可选导出文档内图片到 assets 目录,并在 Markdown 中引用相对路径 +""" + +import argparse +import os +from pathlib import Path + + +def main() -> int: + """命令行入口:执行 docx -> md 转换。""" + parser = argparse.ArgumentParser() + parser.add_argument("docx_path") + parser.add_argument("md_path") + parser.add_argument("--assets-dir", default=None) + args = parser.parse_args() + + docx_path = Path(args.docx_path).expanduser().resolve() + md_path = Path(args.md_path).expanduser().resolve() + assets_dir = Path(args.assets_dir).expanduser().resolve() if args.assets_dir else None + + if not docx_path.exists() or not docx_path.is_file(): + raise FileNotFoundError(str(docx_path)) + + import mammoth + from markdownify import markdownify as md + + image_index = 0 + + def _convert_image(image): + """将 docx 内嵌图片写入 assets 目录,并返回 Markdown 可用的相对路径。""" + nonlocal image_index + if assets_dir is None: + # 不导出图片时,返回空 src,避免把图片内容直接内联到 Markdown + return {"src": ""} + assets_dir.mkdir(parents=True, exist_ok=True) + ext = _guess_image_extension(image.content_type) + image_index += 1 + name = f"image_{image_index:03d}{ext}" + target = assets_dir / name + with target.open("wb") as f: + f.write(image.read()) + rel = os.path.relpath(target, md_path.parent) + rel = rel.replace("\\", "/") + return {"src": rel} + + result = mammoth.convert_to_html(docx_path, convert_image=mammoth.images.img_element(_convert_image)) + html = result.value + markdown = md(html, heading_style="ATX", bullets="-") + + md_path.parent.mkdir(parents=True, exist_ok=True) + md_path.write_text(markdown, encoding="utf-8") + return 0 + + +def _guess_image_extension(content_type: str) -> str: + """根据图片的 MIME 类型推断文件扩展名。""" + mapping = { + "image/png": ".png", + "image/jpeg": ".jpg", + "image/jpg": ".jpg", + "image/gif": ".gif", + "image/bmp": ".bmp", + "image/tiff": ".tiff", + "image/webp": ".webp", + "image/svg+xml": ".svg", + } + return mapping.get(content_type, "") + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/汽车数字孪生系统_数据交互协议.docx b/汽车数字孪生系统_数据交互协议.docx new file mode 100644 index 0000000..0e83379 Binary files /dev/null and b/汽车数字孪生系统_数据交互协议.docx differ diff --git a/汽车数字孪生系统_数据交互协议.md b/汽车数字孪生系统_数据交互协议.md new file mode 100644 index 0000000..2f470e7 --- /dev/null +++ b/汽车数字孪生系统_数据交互协议.md @@ -0,0 +1,571 @@ +# 汽车数字孪生系统 + +主控程序和Unity数据交互协议文档 + +# 一、系统交互 + +系统由两台独立主机组成: + +• 主控程序(负责传感器数据采集、系统控制、数据记录存储等) + +• Unity数字孪生(负责三维场景渲染、运动计算、视频录制) + +数据流向: + +| | | | | +| --- | --- | --- | --- | +| **数据类型** | **方向** | **频率** | **说明** | +| InitConfig | 主控 → Unity | 1次/任务 | 初始化车辆、驾驶员、场景配置 | +| Command | 主控 → Unity | 低频 | 开始/停止/暂停等控制指令 | +| DriveData | 主控 → Unity | 50Hz | 传感器数据驱动虚拟车运动 | +| FrameRecord | Unity → 主控 | 50Hz | 车辆位置记录用于回放 | +| Status | Unity → 主控 | 按需 | Unity运行状态反馈 | + +# 二、InitConfig - 初始化配置 + +方向:主控 → Unity + +频率:任务开始时发送 1 次 + +用途:配置车辆信息、驾驶员信息、场景参数、录制设置 + +{ + +"msgType": "init", // 消息类型:初始化配置 + +"timestamp": 1737388800000, // 发送时间戳 + +"session": { // ===== 会话信息 ===== + +"sessionId": "sess\_20250120\_143000\_001", // 会话唯一ID + +"taskId": "task\_brake\_001", // 任务ID + +"taskName": "紧急制动测试", // 任务名称(显示用) + +"createTime": "2025-01-20 14:30:00", // 创建时间(可读格式) + +"syncTimestamp": 1737388800000 // 同步基准时间戳(数据/视频对齐用) + +}, + +"driver": { // ===== 驾驶员信息 ===== + +"driverId": "D20250001", // 驾驶员编号 + +"name": "张三", // 驾驶员姓名 + +"department": "培训一部", // 所属部门 + +"level": "初级学员", // 学员等级/身份 + +"avatar": "" // 头像URL(或预制) + +}, + +"vehicle": { // ===== 车辆配置 ===== + +"vehicleId": "V003", // 车辆编号 + +"model": "改装教学车A型", // 车型名称 + +"plateNo": "沪A·12345", // 车牌号 + +"color": "#FFFFFF", // 车身颜色(十六进制) + +"colorName": "珍珠白" // 颜色名称(显示用) + +}, + +"scene": { // ===== 场景配置 ===== + +"sceneId": "scene\_03", // 场景ID + +"sceneName": "城市道路", // 场景名称 + +"sceneFile": "CityRoad", // Unity场景文件名 + +"weather": "sunny", // 天气:sunny/rain/fog/night + +"spawnPoint": "SpawnPoint\_A", // 出生点预制体名称 + +"hasGpsMapping": true // 该场景是否有GPS坐标映射 + +}, + +"recording": { // ===== 录制设置 ===== + +"enabled": true, // 是否启用录制 + +"recordId": "rec\_20250120\_143000\_001", // 录制ID(与视频文件关联) + +"frameRate": 50, // 数据录制帧率 + +"videoFrameRate": 30, // 视频录制帧率 + +"videoResolution": "1280x720" // 视频分辨率 + +}, + +"vehicleParams": { // ===== 车辆物理参数 ===== + +"wheelRadius": 0.32, // 轮胎半径(米) + +"steeringRatio": 15.5, // 方向盘转向比 + +"wheelbase": 2.68 // 轴距(米) + +} + +} + +# 三、Command - 控制指令 + +方向:主控 → Unity + +频率:低频,人工触发时发送 + +用途:系统控制、模式切换、回放控制、相机和UI控制 + +{ + +"msgType": "command", // 消息类型:控制指令 + +"timestamp": 1737388800000, // 发送时间戳 + +"seqId": 1, // 指令序列号(递增) + +"action": "start", // ===== 主动作 ===== + +// start - 开始任务 + +// stop - 停止任务 + +// pause - 暂停 + +// resume - 恢复 + +// reset - 重置到起点 + +// emergency\_stop - 紧急制动 + +"mode": { // ===== 模式设置(可选)===== + +"type": "realtime", // realtime - 实时驱动模式 + +// playback - 回放模式 + +// standby - 待机模式 + +"playbackId": null // 回放模式时指定录制ID + +}, + +"playback": { // ===== 回放控制(回放模式有效)===== + +"action": "play", // play/pause/seek/setSpeed + +"seekFrame": 0, // 跳转到指定帧号 + +"seekTime": 0.0, // 跳转到指定时间(秒) + +"playSpeed": 1.0 // 播放速度倍率 + +}, + +"camera": { // ===== 相机控制(可选)===== + +"viewMode": "chase", // driver - 驾驶员视角 + +// chase - 追尾视角 + +"fov": 60 // 视野角度 + +}, + +"ui": { // ===== UI控制(暂定,根据UI调整)===== + +"showDashboard": true, // 显示仪表盘 + +"showTelemetry": true, // 显示遥测数据面板 + +"showTrajectory": false, // 显示行驶轨迹线 + +"showMinimap": true // 显示小地图 + +} + +} + +# 四、DriveData - 实时驱动数据 + +方向:主控 → Unity + +频率:50Hz(每 20ms 一帧) + +用途:传感器数据驱动Unity虚拟车辆运动 + +{ + +"msgType": "drive", // 消息类型:驱动数据 + +"ts": 1737388800123, // 时间戳(毫秒) + +"seq": 50001, // 帧序列号(递增,丢帧检测用) + +// ==================== 运动控制核心 ==================== + +"speed": 45.6, // 车速(km/h) + +"steer": 12.5, // 方向盘角度(度)左负右正 + +"wheelRpm": { // 各轮转速(rpm) + +"fl": 650.0, // 左前轮 Front-Left + +"fr": 652.0, // 右前轮 Front-Right + +"rl": 648.0, // 左后轮 Rear-Left + +"rr": 649.0 // 右后轮 Rear-Right + +}, + +// ==================== 踏板状态 ==================== + +"throttle": 0.35, // 油门/电门开度 0.0~1.0 + +"brake": 0.0, // 刹车开度 0.0~1.0 + +"handbrake": 0.0, // 手刹开度 0.0~1.0 + +// ==================== 传动系统 ==================== + +"gear": 3, // 挡位:-1倒挡/0空挡/1~N前进挡 + +"engine": { // 发动机/电机数据(可选) + +"rpm": 2800, // 转速(rpm)(可选) + +"torque": 180.5 // 输出扭矩(N·m)(可选) + +}, + +// ==================== IMU数据(可选)==================== + +"imu": { // 惯性测量单元 + +"ax": 0.15, // X轴加速度(m/s²)纵向 + +"ay": -0.02, // Y轴加速度(m/s²)垂直 + +"az": 0.05, // Z轴加速度(m/s²)横向 + +"gx": 0.01, // X轴角速度(rad/s)俯仰 + +"gy": 0.005, // Y轴角速度(rad/s)横滚 + +"gz": 0.12 // Z轴角速度(rad/s)偏航 + +}, + +// ==================== 车灯状态 ==================== + +"lights": { // 车灯 + +"head": 1, // 大灯:0关/1近光/2远光 + +"turn": 0, // 转向灯:0关/1左/2右/3双闪 + +"brake": false, // 刹车灯 + +"reverse": false, // 倒车灯 + +"fog": false // 雾灯 + +}, + +// ==================== 其他状态 ==================== + +"misc": { // 其他 + +"horn": false, // 喇叭 + +"wiper": 0, // 雨刷:0关/1低速/2高速 + +"seatbelt": true // 安全带是否系好 + +} + +} + +# 五、FrameRecord - 帧记录数据 + +方向:Unity → 主控 + +频率:50Hz(每 20ms 一帧) + +用途:记录Unity计算的车辆绝对位置,用于精确回放 + +{ + +"msgType": "frame", // 消息类型:帧记录 + +"ts": 1737388800123, // 时间戳(毫秒) + +"seq": 50001, // 帧序列号(与DriveData对应) + +"elapsed": 100.02, // 相对任务开始的时间(秒) + +// ==================== 车辆位置(Unity世界坐标)==================== + +"pos": { // 位置 + +"x": 1250.35, // X坐标(米) + +"y": 0.42, // Y坐标/高度(米) + +"z": 3560.78 // Z坐标(米) + +}, + +// ==================== 车辆旋转 ==================== + +"rot": { // 旋转(四元数)程序用 + +"x": 0.0, + +"y": 0.383, + +"z": 0.0, + +"w": 0.924 + +}, + +"euler": { // 旋转(欧拉角,度)便于查看 + +"pitch": 0.5, // 俯仰角(抬头为正) + +"yaw": 45.2, // 偏航角/航向(顺时针为正) + +"roll": -0.1 // 横滚角(右倾为正) + +}, + +// ==================== 速度向量 ==================== + +"vel": { // 速度 + +"x": 32.1, // X方向速度(m/s) + +"y": 0.0, // Y方向速度(m/s) + +"z": 31.8, // Z方向速度(m/s) + +"speed": 45.2 // 合速度(km/h) + +}, + +// ==================== 车轮状态(视觉还原用)==================== + +"wheels": { // 车轮 + +"steerAngle": 12.5, // 前轮实际转向角(度) + +"fl": { // 左前轮 + +"rot": 1250.6, // 累计旋转角度(度) + +"susp": 0.05 // 悬挂压缩量(米) + +}, + +"fr": { // 右前轮 + +"rot": 1252.1, + +"susp": 0.04 + +}, + +"rl": { // 左后轮 + +"rot": 1248.3, + +"susp": 0.03 + +}, + +"rr": { // 右后轮 + +"rot": 1249.0, + +"susp": 0.03 + +} + +}, + +// ==================== 视觉状态 ==================== + +"visual": { // 视觉状态 + +"gear": 3, // 挡位显示 + +"steerWheel": 45.0, // 方向盘角度(内饰视角用) + +"speedometer": 45.6, // 速度表读数 + +"tachometer": 2800, // 转速表读数 + +"lights": { // 车灯状态 + +"head": 1, + +"turn": 0, + +"brake": false, + +"reverse": false + +} + +}, + +// ==================== GPS显示数据(学校场景用) ==================== + +"gps": { // GPS坐标(从Unity坐标换算) + +"valid": true, // 该场景是否有GPS映射 + +"lat": 31.230416, // 纬度 + +"lng": 121.473701, // 经度 + +"alt": 4.5, // 海拔(米) + +"heading": 45.2 // GPS航向(度) + +} + +} + +# 六、Status - Unity状态反馈 + +方向:Unity → 主控 + +频率:状态变化时发送,或主控查询时响应 + +用途:反馈Unity运行状态、性能指标、错误信息 + +{ + +"msgType": "status", // 消息类型:状态反馈 + +"ts": 1737388800000, // 时间戳 + +"state": "running", // ===== Unity当前状态 ===== + +// loading - 加载场景中 + +// ready - 就绪,等待开始 + +// running - 任务运行中 + +// paused - 已暂停 + +// stopped - 已停止 + +// playback - 回放中 + +// error - 出错 + +"scene": { // ===== 场景状态 ===== + +"loaded": true, // 场景是否加载完成 + +"sceneId": "scene\_03", // 当前场景ID + +"spawnPointFound": true // 出生点预制体是否找到 + +}, + +"recording": { // ===== 录制状态 ===== + +"isRecording": true, // 是否正在录制 + +"recordId": "rec\_20250120\_143000\_001", // 当前录制ID + +"frameCount": 15000, // 已录制帧数 + +"duration": 300.0 // 已录制时长(秒) + +}, + +"performance": { // ===== 性能指标 ===== + +"fps": 58, // 当前帧率 + +"renderTime": 12.5, // 渲染耗时(ms) + +"physicsTime": 2.3, // 物理计算耗时(ms) + +"encodeTime": 8.2 // 视频编码耗时(ms) + +}, + +"connection": { // ===== 连接状态 ===== + +"lastDriveDataSeq": 50001, // 最后收到的DriveData序列号 + +"lastDriveDataTime": 1737388800123, // 最后收到DriveData的时间 + +"dataLossCount": 0 // 累计丢帧数 + +}, + +"error": { // ===== 错误信息 ===== + +"code": 0, // 错误码:0表示无错误 + +"message": "" // 错误描述 + +} + +} + +# 状态汇总: + +## 车灯状态 + +| | | | +| --- | --- | --- | +| **字段** | **值** | **说明** | +| head | 0 / 1 / 2 | 大灯:关 / 近光 / 远光 | +| turn | 0 / 1 / 2 / 3 | 转向灯:关 / 左 / 右 / 双闪 | +| wiper | 0 / 1 / 2 | 雨刷:关 / 低速 / 高速 | +| gear | -1 / 0 / 1~N | 挡位:倒挡 / 空挡(N/P) / 前进挡 | + +## Unity状态 + +| | | +| --- | --- | +| **state值** | **说明** | +| loading | 加载场景中 | +| ready | 就绪,等待开始指令 | +| running | 任务运行中(实时模式) | +| paused | 已暂停 | +| stopped | 已停止 | +| playback | 回放模式运行中 | +| error | 发生错误 | + +## 天气 + +| | | +| --- | --- | +| **weather值** | **说明** | +| sunny | 晴天 | +| rain | 雨天 | +| fog | 雾天 | +| night | 夜间 | \ No newline at end of file