"""In-process, lock-guarded store for the latest platform state."""

from collections import deque
from dataclasses import dataclass, field
from threading import Lock
from typing import Deque, List, Optional

from app.schemas.platform import AlarmEvent, DeviceStatus, RealtimeData


@dataclass
class MemoryStore:
    """Hold the most recent realtime sample, device status and alarms.

    Every accessor method takes ``_lock`` before touching state, so the
    methods may be called from several threads. Getters hand back the stored
    object itself (no copy is made), so callers should not mutate it.

    Attributes:
        realtime_data: Last value given to ``set_realtime``; ``None`` until
            the first call.
        device_status: Last value given to ``set_status``; ``None`` until
            the first call.
        recent_alarms: Alarms, newest first. The deque is bounded at 100
            entries, so pushing onto a full deque drops the oldest alarm.
    """

    realtime_data: Optional[RealtimeData] = None
    device_status: Optional[DeviceStatus] = None
    recent_alarms: Deque[AlarmEvent] = field(
        default_factory=lambda: deque(maxlen=100)
    )
    # The lock is plumbing, not data: keep it out of the generated __repr__
    # and __eq__. Lock objects compare by identity, so leaving it in would
    # make two stores with identical contents always compare unequal.
    _lock: Lock = field(default_factory=Lock, repr=False, compare=False)

    def set_realtime(self, data: RealtimeData) -> None:
        """Replace the stored realtime sample with *data*."""
        with self._lock:
            self.realtime_data = data

    def get_realtime(self) -> Optional[RealtimeData]:
        """Return the stored realtime sample, or ``None`` if none was set."""
        with self._lock:
            return self.realtime_data

    def set_status(self, status: DeviceStatus) -> None:
        """Replace the stored device status with *status*."""
        with self._lock:
            self.device_status = status

    def get_status(self) -> Optional[DeviceStatus]:
        """Return the stored device status, or ``None`` if none was set."""
        with self._lock:
            return self.device_status

    def push_alarm(self, alarm: AlarmEvent) -> None:
        """Insert *alarm* at the front of ``recent_alarms``."""
        with self._lock:
            self.recent_alarms.appendleft(alarm)

    def get_alarms(self, limit: Optional[int] = None) -> List[AlarmEvent]:
        """Return a snapshot list of the stored alarms, newest first.

        Iterating ``recent_alarms`` directly while another thread calls
        ``push_alarm`` can raise ``RuntimeError`` (deque mutated during
        iteration); this method copies the deque while holding the lock.

        Args:
            limit: Maximum number of alarms to return. ``None`` (default)
                returns all of them; zero or a negative value returns an
                empty list.

        Returns:
            A new list; mutating it does not affect the store.
        """
        with self._lock:
            snapshot = list(self.recent_alarms)
        if limit is None:
            return snapshot
        # Clamp so a negative limit cannot turn into a "drop from the end"
        # slice.
        return snapshot[: max(limit, 0)]


# Module-level shared instance, created at import time.
memory_store = MemoryStore()