221 lines
9.0 KiB
Python
221 lines
9.0 KiB
Python
from __future__ import annotations
|
|
|
|
from datetime import datetime
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
from app.adapters.device_client import CDeviceClient, MockDeviceClient
|
|
from app.cache.memory_store import memory_store
|
|
from app.core.config import settings
|
|
from app.core.security import hash_password
|
|
from app.repositories.alarm_repo import AlarmRepository
|
|
from app.repositories.json_config_repo import JsonConfigRepository
|
|
from app.schemas.platform import (
|
|
AiAlarmSettingIn,
|
|
ChannelConfigIn,
|
|
DeviceConfigIn,
|
|
DeviceStatus,
|
|
LineAlarmSettingIn,
|
|
RealtimeData,
|
|
SwitchControlIn,
|
|
SystemConfigIn,
|
|
)
|
|
from app.ws.manager import ws_manager
|
|
|
|
|
|
class PlatformService:
|
|
def __init__(self) -> None:
|
|
self.config_repo = JsonConfigRepository()
|
|
self.alarm_repo = AlarmRepository()
|
|
self.device_client = MockDeviceClient() if settings.use_mock_device else CDeviceClient()
|
|
|
|
def get_realtime_data(self) -> RealtimeData:
|
|
cached = memory_store.get_realtime()
|
|
if cached is not None:
|
|
return cached
|
|
realtime = self.device_client.read_realtime_data()
|
|
memory_store.set_realtime(realtime)
|
|
return realtime
|
|
|
|
def get_device_status(self) -> DeviceStatus:
|
|
cached = memory_store.get_status()
|
|
if cached is not None:
|
|
return cached
|
|
status = self.device_client.read_device_status()
|
|
memory_store.set_status(status)
|
|
return status
|
|
|
|
def save_device_config(self, payload: DeviceConfigIn) -> Dict[str, Any]:
|
|
data = payload.model_dump()
|
|
data["password"] = hash_password(payload.password)
|
|
path = self.config_repo.write_device_config(data)
|
|
device_result = self.device_client.send_device_config(payload)
|
|
return {"save_path": f"/config/{path.name}", **device_result}
|
|
|
|
def get_device_config(self) -> Dict[str, Any]:
|
|
current = self.config_repo.read_device_config()
|
|
data = {
|
|
"password": "",
|
|
"hardware_version": {
|
|
"board_version": "B001.001.001",
|
|
"display_version": "S001.001.001",
|
|
"other_version": "Y001.001.001",
|
|
},
|
|
"software_version": {
|
|
"display_program": "001.001.001",
|
|
"communication_program": "001.001.001",
|
|
"measurement_program": "001.001.001",
|
|
},
|
|
"net": [
|
|
{"nic": "网卡一", "ip": "192.168.1.10", "mask": "255.255.255.0", "gateway": "192.168.1.1", "protocol": "Modbus TCP"},
|
|
{"nic": "网卡二", "ip": "192.168.1.56", "mask": "255.255.255.255", "gateway": "192.168.1.56", "protocol": "Modbus TCP"},
|
|
],
|
|
"uart": [
|
|
{"port": "COM1", "baud": 9600, "parity": "NONE", "data_bits": 8, "stop_bits": 1, "protocol": ""},
|
|
{"port": "COM2", "baud": 115200, "parity": "NONE", "data_bits": 8, "stop_bits": 1, "protocol": "Modbus RTU"},
|
|
],
|
|
}
|
|
data.update(current)
|
|
# 不返回已保存的哈希密码,避免前端把哈希串直接显示到设置界面
|
|
data["password"] = ""
|
|
return data
|
|
|
|
def save_channel_config(self, payload: ChannelConfigIn) -> Dict[str, Any]:
|
|
path = self.config_repo.write_channel_config(payload.model_dump())
|
|
device_result = self.device_client.send_channel_config(payload)
|
|
return {"save_path": f"/config/{path.name}", **device_result}
|
|
|
|
def get_channel_config(self) -> Dict[str, Any]:
|
|
current = self.config_repo.read_channel_config()
|
|
data = {
|
|
"ai_channel": [{"ch": 1, "singal_type": "4-20mA", "line_no": 1, "type": "UA", "limit_low": 0, "limit_high": 20}],
|
|
"ao_channel": [{"ch": 1, "singal_type": "1-5v", "line_no": 2, "type": "UA", "limit_low": 0, "limit_high": 20}],
|
|
}
|
|
data.update(current)
|
|
return data
|
|
|
|
def _default_line_alarm_setting(self, line_no: int) -> Dict[str, Any]:
|
|
return {
|
|
"line_no": line_no,
|
|
"over_limit_alarm": [
|
|
{"category": "电压", "limit": 180, "delay": 180, "output_node": "开出1", "enabled": True},
|
|
{"category": "电流", "limit": 180, "delay": 180, "output_node": "开出1", "enabled": True},
|
|
{"category": "差流", "limit": 180, "delay": 180, "output_node": "开出1", "enabled": False},
|
|
{"category": "频率", "limit": 180, "delay": 180, "output_node": "开出1", "enabled": False},
|
|
],
|
|
"fault_alarm": [{"category": "PT断线", "delay": 180, "output_node": "开出1", "enabled": True}],
|
|
}
|
|
|
|
def _normalize_line_alarm_settings(self, raw: Any) -> List[Dict[str, Any]]:
|
|
if isinstance(raw, list):
|
|
return [item for item in raw if isinstance(item, dict)]
|
|
if isinstance(raw, dict):
|
|
return [raw]
|
|
return []
|
|
|
|
def save_line_alarm_setting(self, payload: LineAlarmSettingIn) -> Dict[str, Any]:
|
|
current = self.config_repo.read_json("setting.json")
|
|
if not isinstance(current, dict):
|
|
current = {}
|
|
line_alarm_list = self._normalize_line_alarm_settings(current.get("line_alarm_setting"))
|
|
payload_data = payload.model_dump()
|
|
|
|
updated = False
|
|
for index, item in enumerate(line_alarm_list):
|
|
if item.get("line_no") == payload.line_no:
|
|
line_alarm_list[index] = payload_data
|
|
updated = True
|
|
break
|
|
if not updated:
|
|
line_alarm_list.append(payload_data)
|
|
|
|
current["line_alarm_setting"] = line_alarm_list
|
|
path = self.config_repo.write_setting_config(current)
|
|
device_result = self.device_client.send_line_alarm_setting(payload)
|
|
return {"save_path": f"/config/{path.name}", **device_result}
|
|
|
|
def get_line_alarm_setting(self, line_no: int = 1) -> Dict[str, Any]:
|
|
current = self.config_repo.read_setting_section("line_alarm_setting")
|
|
line_alarm_list = self._normalize_line_alarm_settings(current)
|
|
for item in line_alarm_list:
|
|
if item.get("line_no") == line_no:
|
|
return item
|
|
return self._default_line_alarm_setting(line_no)
|
|
|
|
def save_ai_alarm_setting(self, payload: List[AiAlarmSettingIn]) -> Dict[str, Any]:
|
|
current = self.config_repo.read_json("setting.json")
|
|
if not isinstance(current, dict):
|
|
current = {}
|
|
current["ai_alarm_setting"] = [item.model_dump() for item in payload]
|
|
path = self.config_repo.write_setting_config(current)
|
|
device_result = self.device_client.send_ai_alarm_setting(current["ai_alarm_setting"])
|
|
return {"save_path": f"/config/{path.name}", **device_result}
|
|
|
|
def get_ai_alarm_setting(self) -> List[Dict[str, Any]]:
|
|
current = self.config_repo.read_setting_section("ai_alarm_setting")
|
|
if isinstance(current, list):
|
|
return current
|
|
return [
|
|
{
|
|
"channel_no": 1,
|
|
"singal_type": "4-20mA",
|
|
"limit_low": 0,
|
|
"limit_high": 20,
|
|
"delay": 180,
|
|
"output_node": "开出1",
|
|
"enabled": True,
|
|
}
|
|
]
|
|
|
|
def save_system_config(self, payload: SystemConfigIn) -> Dict[str, Any]:
|
|
current = self.config_repo.read_json("setting.json")
|
|
if not isinstance(current, dict):
|
|
current = {}
|
|
current["system_config"] = payload.model_dump()
|
|
self.config_repo.write_setting_config(current)
|
|
return self.device_client.send_system_config(payload)
|
|
|
|
def get_system_config(self) -> Dict[str, Any]:
|
|
current = self.config_repo.read_setting_section("system_config")
|
|
data = SystemConfigIn().model_dump()
|
|
if isinstance(current, dict):
|
|
data.update(current)
|
|
return data
|
|
|
|
def list_alarms(
|
|
self,
|
|
page: int,
|
|
size: int,
|
|
no: str = "",
|
|
alarm_type: str = "",
|
|
start_time: Optional[datetime] = None,
|
|
end_time: Optional[datetime] = None,
|
|
) -> List[Dict[str, Any]]:
|
|
return self.alarm_repo.list_alarms(
|
|
page=page,
|
|
size=size,
|
|
no=no,
|
|
alarm_type=alarm_type,
|
|
start_time=start_time,
|
|
end_time=end_time,
|
|
)
|
|
|
|
def switch_control(self, payload: SwitchControlIn) -> Dict[str, Any]:
|
|
return self.device_client.send_switch_control(payload)
|
|
|
|
async def poll_device_once(self) -> None:
|
|
realtime = self.device_client.read_realtime_data()
|
|
status = self.device_client.read_device_status()
|
|
alarms = self.device_client.read_alarm_events()
|
|
|
|
memory_store.set_realtime(realtime)
|
|
memory_store.set_status(status)
|
|
await ws_manager.broadcast("real-time", {"type": "real_time", "data": realtime.model_dump()})
|
|
|
|
for alarm in alarms:
|
|
alarm.id = self.alarm_repo.save_alarm(alarm)
|
|
memory_store.push_alarm(alarm)
|
|
await ws_manager.broadcast("alarm", alarm.model_dump(mode="json"))
|
|
|
|
|
|
platform_service = PlatformService()
|