emcp/backend/app/tasks/polling.py

23 lines
551 B
Python
Raw Normal View History

2026-05-18 09:12:14 +08:00
from __future__ import annotations
import asyncio
import contextlib
from typing import Optional
from app.core.config import settings
from app.services.platform_service import platform_service
async def device_polling_loop() -> None:
while True:
await platform_service.poll_device_once()
await asyncio.sleep(settings.poll_interval_seconds)
async def stop_task(task: Optional[asyncio.Task]) -> None:
if task is None:
return
task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await task