23 lines
551 B
Python
23 lines
551 B
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import contextlib
|
|
from typing import Optional
|
|
|
|
from app.core.config import settings
|
|
from app.services.platform_service import platform_service
|
|
|
|
|
|
async def device_polling_loop() -> None:
|
|
while True:
|
|
await platform_service.poll_device_once()
|
|
await asyncio.sleep(settings.poll_interval_seconds)
|
|
|
|
|
|
async def stop_task(task: Optional[asyncio.Task]) -> None:
|
|
if task is None:
|
|
return
|
|
task.cancel()
|
|
with contextlib.suppress(asyncio.CancelledError):
|
|
await task
|