Source code for gyoza.server.worker.pool

"""WorkerPool for in-memory worker management.

Workers register via heartbeats and are stored in memory.
No database persistence since workers are ephemeral.
"""

from datetime import UTC, datetime

from gyoza.server.worker.resources import Heartbeat
from gyoza.server.worker.worker import Worker


[docs] class WorkerPool: """In-memory pool of workers. Workers register themselves via heartbeats containing their full configuration. Each heartbeat creates or updates the worker entry. """ def __init__(self) -> None: self._workers: dict[str, Worker] = {}
[docs] def heartbeat(self, hb: Heartbeat) -> Worker: """Register or update a worker via heartbeat. If worker exists, updates resources, tags and last_heartbeat_at. If worker is new, creates it with current timestamp. Parameters ---------- hb : Heartbeat Heartbeat payload from the worker. Returns ------- Worker The created or updated worker. """ if hb.worker_id in self._workers: worker = self._workers[hb.worker_id] worker.update_heartbeat(hb.resources, hb.tags, hb.running_ops) else: now = datetime.now(UTC) worker = Worker( id=hb.worker_id, resources=hb.resources, tags=hb.tags, running_ops=hb.running_ops or [], created_at=now, last_heartbeat_at=now, ) self._workers[hb.worker_id] = worker return worker
[docs] def get(self, worker_id: str) -> Worker | None: """Get a worker by id. Parameters ---------- worker_id : str Worker name/identifier. Returns ------- Worker | None The worker if found, None otherwise. """ return self._workers.get(worker_id)
[docs] def list_active(self, timeout_seconds: int = 60) -> list[Worker]: """List workers with recent heartbeats. Parameters ---------- timeout_seconds : int Maximum seconds since last heartbeat to be considered active. Returns ------- list[Worker] Workers with heartbeat within timeout. """ return [ worker for worker in self._workers.values() if worker.is_active(timeout_seconds) ]
[docs] def list_all(self) -> list[Worker]: """List all workers regardless of status. Returns ------- list[Worker] All registered workers. """ return list(self._workers.values())
[docs] def remove(self, worker_id: str) -> Worker | None: """Remove a worker from the pool. Parameters ---------- worker_id : str Worker name/identifier. Returns ------- Worker | None The removed worker if found, None otherwise. """ return self._workers.pop(worker_id, None)
[docs] def clear(self) -> None: """Remove all workers from the pool.""" self._workers.clear()