# SmartEDT/backend/services/server_monitor.py
"""服务器监控采集服务。
以较高频率采样系统指标CPU/内存并以较低频率进行下采样后
- 通过 WebSocket 广播给前端
- 写入 TimescaleDBserver_metrics hypertable
"""
import asyncio
import time
import logging
import psutil
import socket
import platform
from datetime import datetime, timezone
from sqlalchemy import insert
from sqlalchemy.ext.asyncio import async_sessionmaker
from backend.database.schema import server_metrics
from backend.services.broadcaster import Broadcaster
logger = logging.getLogger("backend.monitor")
class ServerMonitorService:
    """Server resource monitoring service (sampling + downsampling + broadcast + persistence).

    Samples CPU and memory at ~50 Hz, averages the window down to ~10 Hz
    reports, then broadcasts each report over WebSocket and inserts one row
    into the TimescaleDB ``server_metrics`` hypertable.
    """

    def __init__(self, session_factory: "async_sessionmaker", broadcaster: "Broadcaster"):
        """Store collaborators and initialise sampling state.

        Args:
            session_factory: Async SQLAlchemy session factory used for inserts.
            broadcaster: WebSocket broadcaster that 10 Hz reports are pushed to.
        """
        self._session_factory = session_factory
        self._broadcaster = broadcaster
        self._host_name = socket.gethostname()
        self._running = False
        self._task = None
        self._sample_interval = 1.0 / 50.0  # 50 Hz sampling (20 ms)
        self._report_interval = 1.0 / 10.0  # 10 Hz reporting (100 ms)
        self._last_report_time = 0.0
        # Downsampling buffers; cleared after every report.
        self._buffer_cpu: list[float] = []
        self._buffer_mem: list = []

    async def start(self):
        """Start the monitoring loop. Idempotent: a second call is a no-op."""
        if self._running:
            return
        self._running = True
        self._task = asyncio.create_task(self._run_loop())
        logger.info("ServerMonitorService started")

    async def stop(self):
        """Signal the loop to stop and wait for the sampling task to finish."""
        self._running = False
        if self._task:
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            # Drop the reference so a later start() begins from a clean state.
            self._task = None
        logger.info("ServerMonitorService stopped")

    async def _run_loop(self):
        """Sampling loop: ~50 Hz sampling, ~10 Hz downsampled reporting."""
        loop = asyncio.get_running_loop()
        # Per psutil docs, the first cpu_percent(interval=None) call returns a
        # meaningless 0.0 (it only sets the comparison baseline) — prime it so
        # the first report's average is not skewed.
        psutil.cpu_percent(interval=None)
        next_time = loop.time()
        while self._running:
            # High-frequency sampling (50 Hz).
            # psutil.cpu_percent(interval=None) is non-blocking.
            self._buffer_cpu.append(psutil.cpu_percent(interval=None))
            self._buffer_mem.append(psutil.virtual_memory())
            current_time = loop.time()
            # Time to emit a downsampled report (10 Hz)?
            if current_time - self._last_report_time >= self._report_interval:
                try:
                    await self._process_and_report()
                except Exception as e:
                    # A transient broadcast/DB failure must never kill the
                    # monitoring task; log and carry on.
                    logger.warning("Metrics report failed: %s", e)
                self._last_report_time = current_time
                self._buffer_cpu.clear()
                self._buffer_mem.clear()
            # Absolute-deadline scheduling keeps the long-run rate at 50 Hz.
            next_time += self._sample_interval
            sleep_time = next_time - loop.time()
            if sleep_time > 0:
                await asyncio.sleep(sleep_time)
            else:
                # Lagging: resync the deadline instead of busy-spinning to
                # replay every missed tick, and yield to the event loop.
                next_time = loop.time()
                await asyncio.sleep(0)

    async def _process_and_report(self):
        """Downsample the sample buffers, then broadcast and persist one report."""
        if not self._buffer_cpu:
            return
        # Downsampling: CPU is the mean of all samples in the window.
        avg_cpu = sum(self._buffer_cpu) / len(self._buffer_cpu)
        # Memory changes slowly compared to CPU; the latest reading suffices.
        last_mem = self._buffer_mem[-1]
        # Single timestamp object reused for both payload and DB row (the
        # original re-parsed its own isoformat() output).
        now_utc = datetime.now(timezone.utc)
        payload = {
            "ts": now_utc.isoformat(),
            "host_name": self._host_name,
            "cpu_usage_percent": {
                "total": round(avg_cpu, 2),
                # Note: per-core usage is expensive to query at 50Hz, so we only track total here
                # or we could sample per-core at lower frequency
            },
            "memory_usage_bytes": {
                "total": last_mem.total,
                "available": last_mem.available,
                "used": last_mem.used,
                "percent": last_mem.percent
            },
            "disk_usage_bytes": {}  # Optional: disk usage changes slowly, maybe check every 1s
        }
        # 1. Broadcast via WebSocket (10Hz)
        await self._broadcaster.broadcast_json({
            "type": "server.metrics",
            "payload": payload
        })
        # 2. Persist to Database (10Hz)
        # Note: In production, consider batching inserts further (e.g., every 1s)
        # to reduce DB load, but 10Hz single insert is manageable for TimescaleDB.
        # Session creation sits INSIDE the try: a factory/connection failure is
        # also best-effort and must not stop the monitor.
        try:
            async with self._session_factory() as session:
                stmt = insert(server_metrics).values(
                    ts=now_utc,
                    host_name=payload["host_name"],
                    cpu_usage_percent=payload["cpu_usage_percent"],
                    memory_usage_bytes=payload["memory_usage_bytes"],
                    disk_usage_bytes=payload["disk_usage_bytes"],
                )
                await session.execute(stmt)
                await session.commit()
        except Exception as e:
            # Don't raise; keep monitoring running.
            logger.warning("Failed to persist server metrics: %s", e)