SmartEDT/backend/services/unity_socket_client.py

49 lines
1.4 KiB
Python
Raw Permalink Normal View History

from __future__ import annotations
import asyncio
import json
from typing import Any
class UnitySocketClient:
    """Send newline-delimited JSON messages to a TCP endpoint.

    The connection is opened lazily by the first :meth:`send_json` call and
    reused by later calls. When a send fails the connection is discarded, so
    the next call transparently reconnects.

    NOTE(review): judging by the class name the peer is a Unity application;
    nothing in this class depends on that.
    """

    def __init__(self, host: str, port: int) -> None:
        """Remember the endpoint; no connection is opened here.

        Args:
            host: Host name or address passed to ``asyncio.open_connection``.
            port: TCP port passed to ``asyncio.open_connection``.
        """
        self._host = host
        self._port = port
        # Serializes connect + write + drain so concurrent callers share a
        # single connection instead of each opening their own.
        self._lock = asyncio.Lock()
        # Current stream writer, or None while disconnected.
        self._writer: asyncio.StreamWriter | None = None

    @property
    def host(self) -> str:
        """Host this client connects to (read-only)."""
        return self._host

    @property
    def port(self) -> int:
        """Port this client connects to (read-only)."""
        return self._port

    async def send_json(self, payload: dict[str, Any]) -> None:
        """Serialize *payload* as compact JSON and send it as one line.

        The message is UTF-8 encoded, uses ``","``/``":"`` separators without
        spaces, keeps non-ASCII characters unescaped and is terminated by a
        single ``"\\n"``.

        Args:
            payload: JSON-serializable mapping to send.

        Raises:
            TypeError, ValueError: If *payload* cannot be serialized; raised
                before any connection is touched.
            OSError: If connecting, writing or draining fails. After a failed
                write/drain the connection is closed and the original error
                is re-raised.
        """
        text = json.dumps(payload, ensure_ascii=False, separators=(",", ":"))
        data = (text + "\n").encode("utf-8")
        async with self._lock:
            writer = await self._ensure_connected()
            try:
                writer.write(data)
                await writer.drain()
            except Exception:
                # Drop the broken connection so the next call reconnects,
                # then surface the error that actually caused the failure.
                await self._close_writer()
                raise

    async def close(self) -> None:
        """Close the connection if one is open.

        Safe to call repeatedly; a later :meth:`send_json` reconnects.
        """
        async with self._lock:
            await self._close_writer()

    async def _ensure_connected(self) -> asyncio.StreamWriter:
        """Return a usable writer, opening a new connection when needed.

        Must be called with ``self._lock`` held.
        """
        current = self._writer
        if current is not None and not current.is_closing():
            return current
        # Only the writer half is used; the reader is not kept.
        _, writer = await asyncio.open_connection(self._host, self._port)
        self._writer = writer
        return writer

    async def _close_writer(self) -> None:
        """Close and forget the current writer, if any (best effort).

        Must be called with ``self._lock`` held.
        """
        # Detach first so the instance never keeps a half-closed writer,
        # whatever happens while closing.
        writer, self._writer = self._writer, None
        if writer is None:
            return
        try:
            writer.close()
            await writer.wait_closed()
        except OSError:
            # wait_closed() re-raises the transport error of an already
            # broken connection; letting it escape would mask the original
            # send error in send_json(). The socket is gone either way.
            pass