SmartEDT/backend/services/broadcaster.py

43 lines
1.2 KiB
Python
Raw Permalink Normal View History

"""WebSocket 广播器。
维护当前在线的 WebSocket 连接集合并支持向所有连接广播 JSON 消息
"""
from __future__ import annotations
import asyncio
from typing import Any
from starlette.websockets import WebSocket
class Broadcaster:
"""简单的 WebSocket 广播器(线程安全:使用 asyncio.Lock"""
def __init__(self) -> None:
self._clients: set[WebSocket] = set()
self._lock = asyncio.Lock()
async def add(self, ws: WebSocket) -> None:
"""注册连接。"""
async with self._lock:
self._clients.add(ws)
async def remove(self, ws: WebSocket) -> None:
"""移除连接(若不存在则忽略)。"""
async with self._lock:
self._clients.discard(ws)
async def broadcast_json(self, message: dict[str, Any]) -> None:
"""向所有连接广播 JSON。
若某个连接发送失败会被自动移除避免集合泄漏
"""
async with self._lock:
clients = list(self._clients)
for ws in clients:
try:
await ws.send_json(message)
except Exception:
await self.remove(ws)