emcp/backend/app/adapters/device_client.py

158 lines
5.5 KiB
Python
Raw Permalink Normal View History

2026-05-18 09:12:14 +08:00
from __future__ import annotations
from abc import ABC, abstractmethod
from datetime import datetime
from random import Random
from typing import Any, Dict, List
from app.schemas.platform import (
AlarmEvent,
ChannelConfigIn,
DeviceConfigIn,
DeviceStatus,
LineAlarmSettingIn,
LineData,
RealtimeData,
SystemConfigIn,
SwitchControlIn,
ValueGroup,
)
class DeviceClient(ABC):
    """Abstract contract that every device backend has to fulfil.

    The interface is split in two groups: ``read_*`` methods take no
    arguments and return schema objects, while ``send_*`` methods accept a
    payload and return a ``Dict[str, Any]`` describing the outcome. Every
    method is abstract, so a subclass must override all of them before it
    can be instantiated; the bodies here only raise ``NotImplementedError``.
    """

    # ---- read side -------------------------------------------------------

    @abstractmethod
    def read_realtime_data(self) -> RealtimeData:
        """Return a ``RealtimeData`` snapshot."""
        raise NotImplementedError

    @abstractmethod
    def read_device_status(self) -> DeviceStatus:
        """Return a ``DeviceStatus`` object."""
        raise NotImplementedError

    @abstractmethod
    def read_alarm_events(self) -> List[AlarmEvent]:
        """Return a list of ``AlarmEvent`` objects (possibly empty)."""
        raise NotImplementedError

    # ---- send side -------------------------------------------------------

    @abstractmethod
    def send_device_config(self, payload: DeviceConfigIn) -> Dict[str, Any]:
        """Send a ``DeviceConfigIn`` payload and return the result mapping."""
        raise NotImplementedError

    @abstractmethod
    def send_channel_config(self, payload: ChannelConfigIn) -> Dict[str, Any]:
        """Send a ``ChannelConfigIn`` payload and return the result mapping."""
        raise NotImplementedError

    @abstractmethod
    def send_line_alarm_setting(self, payload: LineAlarmSettingIn) -> Dict[str, Any]:
        """Send a ``LineAlarmSettingIn`` payload and return the result mapping."""
        raise NotImplementedError

    @abstractmethod
    def send_ai_alarm_setting(self, payload: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Send a list of plain-dict settings and return the result mapping."""
        raise NotImplementedError

    @abstractmethod
    def send_system_config(self, payload: SystemConfigIn) -> Dict[str, Any]:
        """Send a ``SystemConfigIn`` payload and return the result mapping."""
        raise NotImplementedError

    @abstractmethod
    def send_switch_control(self, payload: SwitchControlIn) -> Dict[str, Any]:
        """Send a ``SwitchControlIn`` payload and return the result mapping."""
        raise NotImplementedError
class MockDeviceClient(DeviceClient):
    """Device client that fabricates its data instead of talking to hardware.

    The instance keeps a seeded ``Random`` and a tick counter. Only
    ``read_realtime_data`` advances the counter; the status and alarm
    readers derive their output from whatever the counter currently is.
    """

    def __init__(self) -> None:
        self._tick = 0
        # Fixed seed: the generated analog-input sequence is repeatable.
        self._random = Random(3568)

    def _value_group(self, base_u: float, base_i: float, base_p: float) -> ValueGroup:
        """Build one ``ValueGroup`` around the given base values.

        ``base_u`` feeds the ``U*`` fields, ``base_i`` the ``I*`` fields and
        ``base_p`` the ``P*``/``S*`` fields. An offset of ``tick % 5`` is
        added to most fields so consecutive polls differ.
        """
        drift = self._tick % 5
        current_drift = 0.1 * drift
        power_sum = base_p * 3
        # NOTE(review): Ubc uses 1.72 while Uab/Uca use 1.73 — kept as found;
        # confirm whether the difference is intentional.
        wide_factor_voltage = base_u * 1.73
        return ValueGroup(
            Ua=base_u + drift,
            Ub=base_u + 2 + drift,
            Uc=base_u - 1 + drift,
            Ia=base_i + current_drift,
            Ib=base_i - 0.1 + current_drift,
            Ic=base_i + 0.2 + current_drift,
            Pa=base_p + drift,
            Pb=base_p - 2 + drift,
            Pc=base_p + 3 + drift,
            Pt=power_sum + drift,
            Qa=12 + drift,
            Qb=11 + drift,
            Qc=13 + drift,
            Qt=36 + drift,
            Sa=base_p + 15 + drift,
            Sb=base_p + 10 + drift,
            Sc=base_p + 16 + drift,
            St=power_sum + 44 + drift,
            PFa=0.98,
            PFb=0.97,
            PFc=0.99,
            PFt=0.98,
            Uab=wide_factor_voltage,
            Ubc=base_u * 1.72,
            Uca=wide_factor_voltage,
            frq=50.0,
        )

    def read_realtime_data(self) -> RealtimeData:
        """Advance the tick and return a freshly generated ``RealtimeData``.

        The result holds four lines (each with a ``pri_val`` and ``sec_val``
        value group), a ``di1..di12`` / ``do1..do12`` switch map of 0/1
        values, and twelve ``ai1..ai12`` readings rounded to two decimals.
        """
        self._tick += 1
        tick = self._tick

        # (pri_val bases, sec_val bases) per line; line numbers start at 1.
        line_bases = (
            ((6000, 150, 820), (57.6, 1.2, 68)),
            ((5990, 148, 810), (57.2, 1.1, 66)),
            ((6010, 151, 830), (58.1, 1.3, 70)),
            ((6020, 149, 825), (58.4, 1.2, 69)),
        )
        line_list = [
            LineData(
                line_no=line_no,
                pri_val=self._value_group(*primary),
                sec_val=self._value_group(*secondary),
            )
            for line_no, (primary, secondary) in enumerate(line_bases, start=1)
        ]

        # All "di" keys are inserted before the "do" keys; the "do" parity is
        # shifted by one, so di<N> and do<N> always hold opposite values.
        switch = {}
        for prefix, shift in (("di", 0), ("do", 1)):
            for channel in range(1, 13):
                switch[f"{prefix}{channel}"] = int((channel + tick + shift) % 2 == 0)

        ripple = (tick % 3) * 0.1
        ai_collect = {}
        for channel in range(1, 13):
            # One draw from the seeded generator per channel, in channel order.
            ai_collect[f"ai{channel}"] = round(1 + self._random.random() * 5 + ripple, 2)

        return RealtimeData(line_list=line_list, switch=switch, ai_collect=ai_collect)

    def read_device_status(self) -> DeviceStatus:
        """Return a ``DeviceStatus`` in which only ``uart2`` ever varies.

        ``uart2`` carries the disconnected text whenever the tick is a
        multiple of four (which includes the initial tick of 0).
        """
        if self._tick % 4 == 0:
            uart2_state = "断开"
        else:
            uart2_state = "正常"
        return DeviceStatus(
            self_check="正常",
            net1="正常",
            net2="正常",
            uart1="正常",
            uart2=uart2_state,
        )

    def read_alarm_events(self) -> List[AlarmEvent]:
        """Return one simulated line alarm on every sixth tick, else nothing.

        The alarm's ``no`` cycles through "1".."4" as the tick grows. The
        tick itself is not advanced here.
        """
        cycle, phase = divmod(self._tick, 6)
        if phase:
            return []
        event = AlarmEvent(
            alarm_type="line_alarm",
            time=datetime.now(),
            no=str(cycle % 4 + 1),
            type="电压",
            content="模拟告警:线路电压越限",
            # NOTE(review): level is an empty string in the original —
            # confirm whether a value is expected here.
            level="",
        )
        return [event]

    @staticmethod
    def _ack(target: str, **extra: Any) -> Dict[str, Any]:
        """Compose the success reply shared by the config ``send_*`` methods."""
        reply: Dict[str, Any] = {"send_status": "成功", "target": target}
        reply.update(extra)
        return reply

    def send_device_config(self, payload: DeviceConfigIn) -> Dict[str, Any]:
        """Acknowledge a device config; ``items`` counts net plus uart entries."""
        item_count = len(payload.net) + len(payload.uart)
        return self._ack("device", items=item_count)

    def send_channel_config(self, payload: ChannelConfigIn) -> Dict[str, Any]:
        """Acknowledge a channel config; ``items`` counts AI plus AO channels."""
        item_count = len(payload.ai_channel) + len(payload.ao_channel)
        return self._ack("channel", items=item_count)

    def send_line_alarm_setting(self, payload: LineAlarmSettingIn) -> Dict[str, Any]:
        """Acknowledge a line alarm setting, echoing its ``line_no``."""
        return self._ack("line_alarm", line_no=payload.line_no)

    def send_ai_alarm_setting(self, payload: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Acknowledge AI alarm settings; ``items`` is the payload length."""
        return self._ack("ai_alarm", items=len(payload))

    def send_system_config(self, payload: SystemConfigIn) -> Dict[str, Any]:
        """Acknowledge a system config, echoing its ``brightness``."""
        return self._ack("system", brightness=payload.brightness)

    def send_switch_control(self, payload: SwitchControlIn) -> Dict[str, Any]:
        """Return a ``control_status`` message naming the switch channel."""
        # NOTE(review): both branches yield an empty string, so the requested
        # action never shows up in the message — confirm whether the action
        # labels went missing.
        if payload.action == 1:
            action_text = ""
        else:
            action_text = ""
        return {"control_status": f"执行成功: 开关{payload.ch}{action_text}"}
class CDeviceClient(MockDeviceClient):
    """Placeholder client for the real C program integration.

    Until the real C program is connected, this class reuses the mock
    behavior as-is; the actual protocol implementation is meant to replace
    it inside this class later.
    """