emcp/backend/app/cache/memory_store.py

38 lines
1.1 KiB
Python
Raw Permalink Normal View History

2026-05-18 09:12:14 +08:00
from collections import deque
from dataclasses import dataclass, field
from threading import Lock
from typing import Deque, Optional
from app.schemas.platform import AlarmEvent, DeviceStatus, RealtimeData
@dataclass
class MemoryStore:
realtime_data: Optional[RealtimeData] = None
device_status: Optional[DeviceStatus] = None
recent_alarms: Deque[AlarmEvent] = field(default_factory=lambda: deque(maxlen=100))
_lock: Lock = field(default_factory=Lock)
def set_realtime(self, data: RealtimeData) -> None:
with self._lock:
self.realtime_data = data
def get_realtime(self) -> Optional[RealtimeData]:
with self._lock:
return self.realtime_data
def set_status(self, status: DeviceStatus) -> None:
with self._lock:
self.device_status = status
def get_status(self) -> Optional[DeviceStatus]:
with self._lock:
return self.device_status
def push_alarm(self, alarm: AlarmEvent) -> None:
with self._lock:
self.recent_alarms.appendleft(alarm)
memory_store = MemoryStore()