emcp/backend/app/services/platform_service.py

449 lines
18 KiB
Python
Raw Permalink Normal View History

2026-05-18 09:12:14 +08:00
from __future__ import annotations
from datetime import datetime
from typing import Any, Dict, List, Optional
2026-05-18 09:12:14 +08:00
from app.adapters.device_client import CDeviceClient, MockDeviceClient
from app.cache.memory_store import memory_store
from app.core.config import settings
from app.core.security import hash_password
from app.repositories.alarm_repo import AlarmRepository
from app.repositories.json_config_repo import JsonConfigRepository
from app.schemas.platform import (
AiAlarmSettingIn,
ChannelConfigIn,
DeviceConfigIn,
2026-05-19 14:47:08 +08:00
DevicePasswordUpdateIn,
2026-05-18 09:12:14 +08:00
DeviceStatus,
LineAlarmSettingIn,
2026-05-19 09:26:51 +08:00
NetConfigItem,
2026-05-19 14:47:08 +08:00
LineData,
2026-05-18 09:12:14 +08:00
RealtimeData,
SwitchControlIn,
SystemConfigIn,
2026-05-19 14:47:08 +08:00
ValueGroup,
2026-05-19 09:26:51 +08:00
UartConfigItem,
2026-05-18 09:12:14 +08:00
)
from app.ws.manager import ws_manager
class PlatformService:
def __init__(self) -> None:
self.config_repo = JsonConfigRepository()
self.alarm_repo = AlarmRepository()
self.device_client = MockDeviceClient() if settings.use_mock_device else CDeviceClient()
2026-05-19 14:47:08 +08:00
def _safe_ratio(self, primary: float, secondary: float, default: float = 1.0) -> float:
if abs(float(secondary)) < 1e-9:
return default
return float(primary) / float(secondary)
def _get_line_transformer_ratios(self, line: LineData) -> Dict[str, float]:
pri = line.pri_val
sec = line.sec_val
pt_ratio_candidates = [
self._safe_ratio(pri.Ua, sec.Ua, 0.0),
self._safe_ratio(pri.Ub, sec.Ub, 0.0),
self._safe_ratio(pri.Uc, sec.Uc, 0.0),
self._safe_ratio(pri.Uab, sec.Uab, 0.0),
self._safe_ratio(pri.Ubc, sec.Ubc, 0.0),
self._safe_ratio(pri.Uca, sec.Uca, 0.0),
]
ct_ratio_candidates = [
self._safe_ratio(pri.Ia, sec.Ia, 0.0),
self._safe_ratio(pri.Ib, sec.Ib, 0.0),
self._safe_ratio(pri.Ic, sec.Ic, 0.0),
]
pt_ratio = next((ratio for ratio in pt_ratio_candidates if ratio > 0), 1.0)
ct_ratio = next((ratio for ratio in ct_ratio_candidates if ratio > 0), 1.0)
return {"pt_ratio": pt_ratio, "ct_ratio": ct_ratio}
def _build_primary_from_secondary(self, secondary: ValueGroup, pt_ratio: float, ct_ratio: float) -> ValueGroup:
power_ratio = pt_ratio * ct_ratio
return ValueGroup(
Ua=secondary.Ua * pt_ratio,
Ub=secondary.Ub * pt_ratio,
Uc=secondary.Uc * pt_ratio,
Ia=secondary.Ia * ct_ratio,
Ib=secondary.Ib * ct_ratio,
Ic=secondary.Ic * ct_ratio,
Pa=secondary.Pa * power_ratio,
Pb=secondary.Pb * power_ratio,
Pc=secondary.Pc * power_ratio,
Pt=secondary.Pt * power_ratio,
Qa=secondary.Qa * power_ratio,
Qb=secondary.Qb * power_ratio,
Qc=secondary.Qc * power_ratio,
Qt=secondary.Qt * power_ratio,
Sa=secondary.Sa * power_ratio,
Sb=secondary.Sb * power_ratio,
Sc=secondary.Sc * power_ratio,
St=secondary.St * power_ratio,
PFa=secondary.PFa,
PFb=secondary.PFb,
PFc=secondary.PFc,
PFt=secondary.PFt,
Uab=secondary.Uab * pt_ratio,
Ubc=secondary.Ubc * pt_ratio,
Uca=secondary.Uca * pt_ratio,
frq=secondary.frq,
)
def _round_value_group(self, group: ValueGroup) -> ValueGroup:
group_data = group.model_dump()
precision_map = {
"Ua": 3,
"Ub": 3,
"Uc": 3,
"Ia": 3,
"Ib": 3,
"Ic": 3,
"Pa": 2,
"Pb": 2,
"Pc": 2,
"Pt": 2,
"Qa": 2,
"Qb": 2,
"Qc": 2,
"Qt": 2,
"Sa": 2,
"Sb": 2,
"Sc": 2,
"St": 2,
"PFa": 3,
"PFb": 3,
"PFc": 3,
"PFt": 3,
"Uab": 3,
"Ubc": 3,
"Uca": 3,
"frq": 3,
}
for field_name, precision in precision_map.items():
value = group_data.get(field_name)
if isinstance(value, (int, float)):
group_data[field_name] = round(float(value), precision)
return ValueGroup(**group_data)
def _normalize_realtime_precision(self, realtime: RealtimeData) -> RealtimeData:
line_list = []
for line in realtime.line_list:
ratios = self._get_line_transformer_ratios(line)
line_list.append(
LineData(
line_no=line.line_no,
pri_val=self._round_value_group(
self._build_primary_from_secondary(
line.sec_val,
ratios["pt_ratio"],
ratios["ct_ratio"],
)
),
sec_val=self._round_value_group(line.sec_val),
)
)
ai_collect = {
key: round(float(value), 2) if isinstance(value, (int, float)) else value
for key, value in realtime.ai_collect.items()
}
return RealtimeData(line_list=line_list, switch=dict(realtime.switch), ai_collect=ai_collect)
2026-05-18 09:12:14 +08:00
def get_realtime_data(self) -> RealtimeData:
cached = memory_store.get_realtime()
if cached is not None:
2026-05-19 14:47:08 +08:00
normalized = self._normalize_realtime_precision(cached)
memory_store.set_realtime(normalized)
return normalized
2026-05-18 09:12:14 +08:00
realtime = self.device_client.read_realtime_data()
2026-05-19 14:47:08 +08:00
normalized = self._normalize_realtime_precision(realtime)
memory_store.set_realtime(normalized)
return normalized
2026-05-18 09:12:14 +08:00
def get_device_status(self) -> DeviceStatus:
cached = memory_store.get_status()
if cached is not None:
return cached
status = self.device_client.read_device_status()
memory_store.set_status(status)
return status
2026-05-19 09:26:51 +08:00
def _default_device_config(self) -> Dict[str, Any]:
return {
"password": "",
"hardware_version": {
"board_version": "B001.001.001",
"display_version": "S001.001.001",
"other_version": "Y001.001.001",
},
"software_version": {
"display_program": "001.001.001",
"communication_program": "001.001.001",
"measurement_program": "001.001.001",
},
"net": [
{"nic": "网卡一", "ip": "192.168.1.10", "mask": "255.255.255.0", "gateway": "192.168.1.1", "protocol": "Modbus TCP"},
{"nic": "网卡二", "ip": "192.168.1.56", "mask": "255.255.255.255", "gateway": "192.168.1.56", "protocol": "Modbus TCP"},
],
"uart": [
{"port": "COM1", "baud": 9600, "parity": "NONE", "data_bits": 8, "stop_bits": 1, "protocol": ""},
{"port": "COM2", "baud": 115200, "parity": "NONE", "data_bits": 8, "stop_bits": 1, "protocol": "Modbus RTU"},
],
}
2026-05-19 09:26:51 +08:00
def _merge_keyed_items(self, defaults: List[Dict[str, Any]], current: Any, key_field: str) -> List[Dict[str, Any]]:
merged = [dict(item) for item in defaults]
if not isinstance(current, list):
return merged
index_map = {
item.get(key_field): index
for index, item in enumerate(merged)
if isinstance(item, dict) and item.get(key_field) is not None
}
for item in current:
if not isinstance(item, dict):
continue
key = item.get(key_field)
if key in index_map:
merged[index_map[key]].update(item)
else:
merged.append(dict(item))
return merged
def _upsert_keyed_item(self, items: List[Dict[str, Any]], payload: Dict[str, Any], key_field: str) -> List[Dict[str, Any]]:
key = payload.get(key_field)
updated = False
for index, item in enumerate(items):
if item.get(key_field) == key:
items[index] = payload
updated = True
break
if not updated:
items.append(payload)
return items
def _find_keyed_item(self, items: List[Dict[str, Any]], key_field: str, key_value: str) -> Optional[Dict[str, Any]]:
for item in items:
if item.get(key_field) == key_value:
return item
return None
def _load_device_config_with_defaults(self) -> Dict[str, Any]:
current = self.config_repo.read_device_config()
defaults = self._default_device_config()
data = {
"password": current.get("password", "") if isinstance(current, dict) else "",
"hardware_version": dict(defaults["hardware_version"]),
"software_version": dict(defaults["software_version"]),
"net": [],
"uart": [],
}
if isinstance(current, dict):
if isinstance(current.get("hardware_version"), dict):
data["hardware_version"].update(current["hardware_version"])
if isinstance(current.get("software_version"), dict):
data["software_version"].update(current["software_version"])
data["net"] = self._merge_keyed_items(defaults["net"], current.get("net"), "nic")
data["uart"] = self._merge_keyed_items(defaults["uart"], current.get("uart"), "port")
else:
data["net"] = self._merge_keyed_items(defaults["net"], None, "nic")
data["uart"] = self._merge_keyed_items(defaults["uart"], None, "port")
return data
def save_device_config(self, payload: DeviceConfigIn) -> Dict[str, Any]:
data = self._load_device_config_with_defaults()
payload_data = payload.model_dump()
data["hardware_version"] = payload_data["hardware_version"]
data["software_version"] = payload_data["software_version"]
data["net"] = self._merge_keyed_items(data["net"], payload_data["net"], "nic")
data["uart"] = self._merge_keyed_items(data["uart"], payload_data["uart"], "port")
if payload.password.strip():
data["password"] = hash_password(payload.password)
path = self.config_repo.write_device_config(data)
device_result = self.device_client.send_device_config(payload)
return {"save_path": f"/config/{path.name}", **device_result}
2026-05-19 14:47:08 +08:00
def save_device_password(self, payload: DevicePasswordUpdateIn) -> Dict[str, Any]:
data = self._load_device_config_with_defaults()
data["password"] = hash_password(payload.password)
path = self.config_repo.write_device_config(data)
return {"save_path": f"/config/{path.name}", "target": "password", "send_status": "成功"}
2026-05-19 09:26:51 +08:00
def get_device_config(self) -> Dict[str, Any]:
data = self._load_device_config_with_defaults()
# 不返回已保存的哈希密码,避免前端把哈希串直接显示到设置界面
data["password"] = ""
return data
2026-05-19 09:26:51 +08:00
def get_net_config(self, nic: str) -> Dict[str, Any]:
device_config = self._load_device_config_with_defaults()
item = self._find_keyed_item(device_config["net"], "nic", nic)
if item is not None:
return item
return {"nic": nic, "ip": "", "mask": "", "gateway": "", "protocol": ""}
def save_net_config(self, payload: NetConfigItem) -> Dict[str, Any]:
device_config = self._load_device_config_with_defaults()
device_config["net"] = self._upsert_keyed_item(device_config["net"], payload.model_dump(), "nic")
path = self.config_repo.write_device_config(device_config)
return {"save_path": f"/config/{path.name}", "target": "net", "nic": payload.nic, "send_status": "成功"}
def get_uart_config(self, port: str) -> Dict[str, Any]:
device_config = self._load_device_config_with_defaults()
item = self._find_keyed_item(device_config["uart"], "port", port)
if item is not None:
return item
return {"port": port, "baud": 9600, "parity": "NONE", "data_bits": 8, "stop_bits": 1, "protocol": ""}
def save_uart_config(self, payload: UartConfigItem) -> Dict[str, Any]:
device_config = self._load_device_config_with_defaults()
device_config["uart"] = self._upsert_keyed_item(device_config["uart"], payload.model_dump(), "port")
path = self.config_repo.write_device_config(device_config)
return {"save_path": f"/config/{path.name}", "target": "uart", "port": payload.port, "send_status": "成功"}
2026-05-18 09:12:14 +08:00
def save_channel_config(self, payload: ChannelConfigIn) -> Dict[str, Any]:
path = self.config_repo.write_channel_config(payload.model_dump())
device_result = self.device_client.send_channel_config(payload)
return {"save_path": f"/config/{path.name}", **device_result}
def get_channel_config(self) -> Dict[str, Any]:
current = self.config_repo.read_channel_config()
data = {
"ai_channel": [{"ch": 1, "singal_type": "4-20mA", "line_no": 1, "type": "UA", "limit_low": 0, "limit_high": 20}],
"ao_channel": [{"ch": 1, "singal_type": "1-5v", "line_no": 2, "type": "UA", "limit_low": 0, "limit_high": 20}],
}
data.update(current)
return data
2026-05-18 17:05:23 +08:00
def _default_line_alarm_setting(self, line_no: int) -> Dict[str, Any]:
return {
2026-05-18 17:05:23 +08:00
"line_no": line_no,
"over_limit_alarm": [
{"category": "电压", "limit": 180, "delay": 180, "output_node": "开出1", "enabled": True},
{"category": "电流", "limit": 180, "delay": 180, "output_node": "开出1", "enabled": True},
{"category": "差流", "limit": 180, "delay": 180, "output_node": "开出1", "enabled": False},
{"category": "频率", "limit": 180, "delay": 180, "output_node": "开出1", "enabled": False},
],
"fault_alarm": [{"category": "PT断线", "delay": 180, "output_node": "开出1", "enabled": True}],
}
2026-05-18 17:05:23 +08:00
def _normalize_line_alarm_settings(self, raw: Any) -> List[Dict[str, Any]]:
if isinstance(raw, list):
return [item for item in raw if isinstance(item, dict)]
if isinstance(raw, dict):
return [raw]
return []
def save_line_alarm_setting(self, payload: LineAlarmSettingIn) -> Dict[str, Any]:
current = self.config_repo.read_json("setting.json")
if not isinstance(current, dict):
current = {}
line_alarm_list = self._normalize_line_alarm_settings(current.get("line_alarm_setting"))
payload_data = payload.model_dump()
updated = False
for index, item in enumerate(line_alarm_list):
if item.get("line_no") == payload.line_no:
line_alarm_list[index] = payload_data
updated = True
break
if not updated:
line_alarm_list.append(payload_data)
current["line_alarm_setting"] = line_alarm_list
path = self.config_repo.write_setting_config(current)
device_result = self.device_client.send_line_alarm_setting(payload)
return {"save_path": f"/config/{path.name}", **device_result}
def get_line_alarm_setting(self, line_no: int = 1) -> Dict[str, Any]:
current = self.config_repo.read_setting_section("line_alarm_setting")
line_alarm_list = self._normalize_line_alarm_settings(current)
for item in line_alarm_list:
if item.get("line_no") == line_no:
return item
return self._default_line_alarm_setting(line_no)
2026-05-18 09:12:14 +08:00
def save_ai_alarm_setting(self, payload: List[AiAlarmSettingIn]) -> Dict[str, Any]:
current = self.config_repo.read_json("setting.json")
if not isinstance(current, dict):
current = {}
current["ai_alarm_setting"] = [item.model_dump() for item in payload]
path = self.config_repo.write_setting_config(current)
device_result = self.device_client.send_ai_alarm_setting(current["ai_alarm_setting"])
return {"save_path": f"/config/{path.name}", **device_result}
def get_ai_alarm_setting(self) -> List[Dict[str, Any]]:
current = self.config_repo.read_setting_section("ai_alarm_setting")
if isinstance(current, list):
return current
return [
{
"channel_no": 1,
"singal_type": "4-20mA",
"limit_low": 0,
"limit_high": 20,
"delay": 180,
"output_node": "开出1",
"enabled": True,
}
]
2026-05-18 09:12:14 +08:00
def save_system_config(self, payload: SystemConfigIn) -> Dict[str, Any]:
current = self.config_repo.read_json("setting.json")
if not isinstance(current, dict):
current = {}
current["system_config"] = payload.model_dump()
self.config_repo.write_setting_config(current)
return self.device_client.send_system_config(payload)
def get_system_config(self) -> Dict[str, Any]:
current = self.config_repo.read_setting_section("system_config")
data = SystemConfigIn().model_dump()
if isinstance(current, dict):
data.update(current)
return data
def list_alarms(
self,
page: int,
size: int,
no: str = "",
alarm_type: str = "",
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
) -> List[Dict[str, Any]]:
return self.alarm_repo.list_alarms(
page=page,
size=size,
no=no,
alarm_type=alarm_type,
start_time=start_time,
end_time=end_time,
)
2026-05-18 09:12:14 +08:00
def switch_control(self, payload: SwitchControlIn) -> Dict[str, Any]:
return self.device_client.send_switch_control(payload)
async def poll_device_once(self) -> None:
2026-05-19 14:47:08 +08:00
realtime = self._normalize_realtime_precision(self.device_client.read_realtime_data())
2026-05-18 09:12:14 +08:00
status = self.device_client.read_device_status()
alarms = self.device_client.read_alarm_events()
memory_store.set_realtime(realtime)
memory_store.set_status(status)
await ws_manager.broadcast("real-time", {"type": "real_time", "data": realtime.model_dump()})
for alarm in alarms:
alarm.id = self.alarm_repo.save_alarm(alarm)
memory_store.push_alarm(alarm)
await ws_manager.broadcast("alarm", alarm.model_dump(mode="json"))
platform_service = PlatformService()