Source code for gyoza.server.worker.pool
"""WorkerPool for in-memory worker management.
Workers register via heartbeats and are stored in memory.
No database persistence since workers are ephemeral.
"""
from datetime import UTC, datetime
from gyoza.server.worker.resources import Heartbeat
from gyoza.server.worker.worker import Worker
[docs]
class WorkerPool:
"""In-memory pool of workers.
Workers register themselves via heartbeats containing their full
configuration. Each heartbeat creates or updates the worker entry.
"""
def __init__(self) -> None:
self._workers: dict[str, Worker] = {}
[docs]
def heartbeat(self, hb: Heartbeat) -> Worker:
"""Register or update a worker via heartbeat.
If worker exists, updates resources, tags and last_heartbeat_at.
If worker is new, creates it with current timestamp.
Parameters
----------
hb : Heartbeat
Heartbeat payload from the worker.
Returns
-------
Worker
The created or updated worker.
"""
if hb.worker_id in self._workers:
worker = self._workers[hb.worker_id]
worker.update_heartbeat(hb.resources, hb.tags, hb.running_ops)
else:
now = datetime.now(UTC)
worker = Worker(
id=hb.worker_id,
resources=hb.resources,
tags=hb.tags,
running_ops=hb.running_ops or [],
created_at=now,
last_heartbeat_at=now,
)
self._workers[hb.worker_id] = worker
return worker
[docs]
def get(self, worker_id: str) -> Worker | None:
"""Get a worker by id.
Parameters
----------
worker_id : str
Worker name/identifier.
Returns
-------
Worker | None
The worker if found, None otherwise.
"""
return self._workers.get(worker_id)
[docs]
def list_active(self, timeout_seconds: int = 60) -> list[Worker]:
"""List workers with recent heartbeats.
Parameters
----------
timeout_seconds : int
Maximum seconds since last heartbeat to be considered active.
Returns
-------
list[Worker]
Workers with heartbeat within timeout.
"""
return [
worker
for worker in self._workers.values()
if worker.is_active(timeout_seconds)
]
[docs]
def list_all(self) -> list[Worker]:
"""List all workers regardless of status.
Returns
-------
list[Worker]
All registered workers.
"""
return list(self._workers.values())
[docs]
def remove(self, worker_id: str) -> Worker | None:
"""Remove a worker from the pool.
Parameters
----------
worker_id : str
Worker name/identifier.
Returns
-------
Worker | None
The removed worker if found, None otherwise.
"""
return self._workers.pop(worker_id, None)
[docs]
def clear(self) -> None:
"""Remove all workers from the pool."""
self._workers.clear()