38 lines
1.1 KiB
Python
38 lines
1.1 KiB
Python
|
|
from collections import deque
|
||
|
|
from dataclasses import dataclass, field
|
||
|
|
from threading import Lock
|
||
|
|
from typing import Deque, Optional
|
||
|
|
|
||
|
|
from app.schemas.platform import AlarmEvent, DeviceStatus, RealtimeData
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
|
||
|
|
class MemoryStore:
|
||
|
|
realtime_data: Optional[RealtimeData] = None
|
||
|
|
device_status: Optional[DeviceStatus] = None
|
||
|
|
recent_alarms: Deque[AlarmEvent] = field(default_factory=lambda: deque(maxlen=100))
|
||
|
|
_lock: Lock = field(default_factory=Lock)
|
||
|
|
|
||
|
|
def set_realtime(self, data: RealtimeData) -> None:
|
||
|
|
with self._lock:
|
||
|
|
self.realtime_data = data
|
||
|
|
|
||
|
|
def get_realtime(self) -> Optional[RealtimeData]:
|
||
|
|
with self._lock:
|
||
|
|
return self.realtime_data
|
||
|
|
|
||
|
|
def set_status(self, status: DeviceStatus) -> None:
|
||
|
|
with self._lock:
|
||
|
|
self.device_status = status
|
||
|
|
|
||
|
|
def get_status(self) -> Optional[DeviceStatus]:
|
||
|
|
with self._lock:
|
||
|
|
return self.device_status
|
||
|
|
|
||
|
|
def push_alarm(self, alarm: AlarmEvent) -> None:
|
||
|
|
with self._lock:
|
||
|
|
self.recent_alarms.appendleft(alarm)
|
||
|
|
|
||
|
|
|
||
|
|
memory_store = MemoryStore()
|