158 lines
5.5 KiB
Python
158 lines
5.5 KiB
Python
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
from abc import ABC, abstractmethod
|
||
|
|
from datetime import datetime
|
||
|
|
from random import Random
|
||
|
|
from typing import Any, Dict, List
|
||
|
|
|
||
|
|
from app.schemas.platform import (
|
||
|
|
AlarmEvent,
|
||
|
|
ChannelConfigIn,
|
||
|
|
DeviceConfigIn,
|
||
|
|
DeviceStatus,
|
||
|
|
LineAlarmSettingIn,
|
||
|
|
LineData,
|
||
|
|
RealtimeData,
|
||
|
|
SystemConfigIn,
|
||
|
|
SwitchControlIn,
|
||
|
|
ValueGroup,
|
||
|
|
)
|
||
|
|
|
||
|
|
|
||
|
|
class DeviceClient(ABC):
|
||
|
|
@abstractmethod
|
||
|
|
def read_realtime_data(self) -> RealtimeData:
|
||
|
|
raise NotImplementedError
|
||
|
|
|
||
|
|
@abstractmethod
|
||
|
|
def read_device_status(self) -> DeviceStatus:
|
||
|
|
raise NotImplementedError
|
||
|
|
|
||
|
|
@abstractmethod
|
||
|
|
def read_alarm_events(self) -> List[AlarmEvent]:
|
||
|
|
raise NotImplementedError
|
||
|
|
|
||
|
|
@abstractmethod
|
||
|
|
def send_device_config(self, payload: DeviceConfigIn) -> Dict[str, Any]:
|
||
|
|
raise NotImplementedError
|
||
|
|
|
||
|
|
@abstractmethod
|
||
|
|
def send_channel_config(self, payload: ChannelConfigIn) -> Dict[str, Any]:
|
||
|
|
raise NotImplementedError
|
||
|
|
|
||
|
|
@abstractmethod
|
||
|
|
def send_line_alarm_setting(self, payload: LineAlarmSettingIn) -> Dict[str, Any]:
|
||
|
|
raise NotImplementedError
|
||
|
|
|
||
|
|
@abstractmethod
|
||
|
|
def send_ai_alarm_setting(self, payload: List[Dict[str, Any]]) -> Dict[str, Any]:
|
||
|
|
raise NotImplementedError
|
||
|
|
|
||
|
|
@abstractmethod
|
||
|
|
def send_system_config(self, payload: SystemConfigIn) -> Dict[str, Any]:
|
||
|
|
raise NotImplementedError
|
||
|
|
|
||
|
|
@abstractmethod
|
||
|
|
def send_switch_control(self, payload: SwitchControlIn) -> Dict[str, Any]:
|
||
|
|
raise NotImplementedError
|
||
|
|
|
||
|
|
|
||
|
|
class MockDeviceClient(DeviceClient):
|
||
|
|
def __init__(self) -> None:
|
||
|
|
self._random = Random(3568)
|
||
|
|
self._tick = 0
|
||
|
|
|
||
|
|
def _value_group(self, base_u: float, base_i: float, base_p: float) -> ValueGroup:
|
||
|
|
delta = self._tick % 5
|
||
|
|
return ValueGroup(
|
||
|
|
Ua=base_u + delta,
|
||
|
|
Ub=base_u + 2 + delta,
|
||
|
|
Uc=base_u - 1 + delta,
|
||
|
|
Ia=base_i + 0.1 * delta,
|
||
|
|
Ib=base_i - 0.1 + 0.1 * delta,
|
||
|
|
Ic=base_i + 0.2 + 0.1 * delta,
|
||
|
|
Pa=base_p + delta,
|
||
|
|
Pb=base_p - 2 + delta,
|
||
|
|
Pc=base_p + 3 + delta,
|
||
|
|
Pt=base_p * 3 + delta,
|
||
|
|
Qa=12 + delta,
|
||
|
|
Qb=11 + delta,
|
||
|
|
Qc=13 + delta,
|
||
|
|
Qt=36 + delta,
|
||
|
|
Sa=base_p + 15 + delta,
|
||
|
|
Sb=base_p + 10 + delta,
|
||
|
|
Sc=base_p + 16 + delta,
|
||
|
|
St=base_p * 3 + 44 + delta,
|
||
|
|
PFa=0.98,
|
||
|
|
PFb=0.97,
|
||
|
|
PFc=0.99,
|
||
|
|
PFt=0.98,
|
||
|
|
Uab=base_u * 1.73,
|
||
|
|
Ubc=base_u * 1.72,
|
||
|
|
Uca=base_u * 1.73,
|
||
|
|
frq=50.0,
|
||
|
|
)
|
||
|
|
|
||
|
|
def read_realtime_data(self) -> RealtimeData:
|
||
|
|
self._tick += 1
|
||
|
|
line_list = [
|
||
|
|
LineData(line_no=1, pri_val=self._value_group(6000, 150, 820), sec_val=self._value_group(57.6, 1.2, 68)),
|
||
|
|
LineData(line_no=2, pri_val=self._value_group(5990, 148, 810), sec_val=self._value_group(57.2, 1.1, 66)),
|
||
|
|
LineData(line_no=3, pri_val=self._value_group(6010, 151, 830), sec_val=self._value_group(58.1, 1.3, 70)),
|
||
|
|
LineData(line_no=4, pri_val=self._value_group(6020, 149, 825), sec_val=self._value_group(58.4, 1.2, 69)),
|
||
|
|
]
|
||
|
|
switch = {f"di{i}": int((i + self._tick) % 2 == 0) for i in range(1, 13)}
|
||
|
|
switch.update({f"do{i}": int((i + self._tick + 1) % 2 == 0) for i in range(1, 13)})
|
||
|
|
ai_collect = {f"ai{i}": round(1 + self._random.random() * 5 + (self._tick % 3) * 0.1, 2) for i in range(1, 13)}
|
||
|
|
return RealtimeData(line_list=line_list, switch=switch, ai_collect=ai_collect)
|
||
|
|
|
||
|
|
def read_device_status(self) -> DeviceStatus:
|
||
|
|
return DeviceStatus(
|
||
|
|
self_check="正常",
|
||
|
|
net1="正常",
|
||
|
|
net2="正常",
|
||
|
|
uart1="正常",
|
||
|
|
uart2="正常" if self._tick % 4 else "断开",
|
||
|
|
)
|
||
|
|
|
||
|
|
def read_alarm_events(self) -> List[AlarmEvent]:
|
||
|
|
if self._tick % 6 != 0:
|
||
|
|
return []
|
||
|
|
return [
|
||
|
|
AlarmEvent(
|
||
|
|
alarm_type="line_alarm",
|
||
|
|
time=datetime.now(),
|
||
|
|
no=str((self._tick // 6) % 4 + 1),
|
||
|
|
type="电压",
|
||
|
|
content="模拟告警:线路电压越限",
|
||
|
|
level="中",
|
||
|
|
)
|
||
|
|
]
|
||
|
|
|
||
|
|
def send_device_config(self, payload: DeviceConfigIn) -> Dict[str, Any]:
|
||
|
|
return {"send_status": "成功", "target": "device", "items": len(payload.net) + len(payload.uart)}
|
||
|
|
|
||
|
|
def send_channel_config(self, payload: ChannelConfigIn) -> Dict[str, Any]:
|
||
|
|
return {
|
||
|
|
"send_status": "成功",
|
||
|
|
"target": "channel",
|
||
|
|
"items": len(payload.ai_channel) + len(payload.ao_channel),
|
||
|
|
}
|
||
|
|
|
||
|
|
def send_line_alarm_setting(self, payload: LineAlarmSettingIn) -> Dict[str, Any]:
|
||
|
|
return {"send_status": "成功", "target": "line_alarm", "line_no": payload.line_no}
|
||
|
|
|
||
|
|
def send_ai_alarm_setting(self, payload: List[Dict[str, Any]]) -> Dict[str, Any]:
|
||
|
|
return {"send_status": "成功", "target": "ai_alarm", "items": len(payload)}
|
||
|
|
|
||
|
|
def send_system_config(self, payload: SystemConfigIn) -> Dict[str, Any]:
|
||
|
|
return {"send_status": "成功", "target": "system", "brightness": payload.brightness}
|
||
|
|
|
||
|
|
def send_switch_control(self, payload: SwitchControlIn) -> Dict[str, Any]:
|
||
|
|
action_text = "合" if payload.action == 1 else "分"
|
||
|
|
return {"control_status": f"执行成功: 开关{payload.ch}{action_text}"}
|
||
|
|
|
||
|
|
|
||
|
|
class CDeviceClient(MockDeviceClient):
|
||
|
|
"""真实 C 程序接入前先复用 mock 行为,后续在此类中替换协议实现。"""
|