emcp/backend/app/ws/manager.py
2026-05-18 09:12:14 +08:00

33 lines
1.0 KiB
Python

from __future__ import annotations
import json
from collections import defaultdict
from typing import Any, DefaultDict, Dict, List, Set
from fastapi import WebSocket
class WebSocketManager:
    """Keep track of WebSocket connections per channel and broadcast to them.

    Connections are grouped by an arbitrary channel name. ``broadcast``
    serializes a payload to JSON once and sends it to every socket that is
    registered on the channel, dropping sockets whose send fails.
    """

    def __init__(self) -> None:
        # Maps a channel name to the set of sockets registered on it.
        self._connections: DefaultDict[str, Set[WebSocket]] = defaultdict(set)

    async def connect(self, channel: str, websocket: WebSocket) -> None:
        """Accept ``websocket`` and register it under ``channel``."""
        await websocket.accept()
        self._connections[channel].add(websocket)

    def disconnect(self, channel: str, websocket: WebSocket) -> None:
        """Unregister ``websocket`` from ``channel``.

        Does nothing when the channel or the socket is not registered.
        """
        # Use .get() so that an unknown channel does not create an empty
        # entry in the defaultdict as a side effect of the lookup.
        sockets = self._connections.get(channel)
        if sockets is None:
            return
        sockets.discard(websocket)
        if not sockets:
            # Prune emptied channels so the mapping does not grow without
            # bound as channels come and go.
            del self._connections[channel]

    async def broadcast(self, channel: str, payload: Dict[str, Any]) -> None:
        """Send ``payload`` as JSON text to every socket on ``channel``.

        Sockets whose ``send_text`` raises are removed from the channel.
        Nothing is serialized or sent when the channel has no sockets.

        Raises:
            TypeError, ValueError: if ``payload`` cannot be serialized by
                ``json.dumps`` (for example a circular reference). No
                connection is dropped in that case.
        """
        sockets = self._connections.get(channel)
        if not sockets:
            return

        # Serialize once, outside the per-socket try block: a payload that
        # cannot be encoded is a caller error and must not be mistaken for
        # a broken connection.
        message = json.dumps(payload, ensure_ascii=False, default=str)

        disconnected: List[WebSocket] = []
        # Iterate over a snapshot: other coroutines may connect/disconnect
        # while we are suspended in send_text, and mutating a set during
        # iteration raises RuntimeError.
        for websocket in tuple(sockets):
            try:
                await websocket.send_text(message)
            except Exception:
                # Best effort: any send failure is treated as a dead socket.
                disconnected.append(websocket)

        for websocket in disconnected:
            self.disconnect(channel, websocket)
# Module-level WebSocketManager instance, created when this module is imported.
ws_manager = WebSocketManager()