Source code for gyoza.server.worker.worker
"""Worker entity.
Workers are ephemeral resources that execute tasks and send heartbeats.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Any
from gyoza.server.worker.resources import Resources, WorkerOpRun
[docs]
@dataclass
class Worker:
"""Representation of a worker that can execute tasks.
Workers are identified by their id (name) and register themselves
via heartbeats containing their resources and tags.
Parameters
----------
id : str
Worker name/identifier.
resources : Resources
Hardware resources available.
tags : list[str]
Worker-level tags.
running_ops : list[WorkerOpRun]
Current running Op runs on the worker.
created_at : datetime
First heartbeat time.
last_heartbeat_at : datetime
Most recent heartbeat time.
"""
id: str
resources: Resources
tags: list[str] = field(default_factory=list)
running_ops: list[WorkerOpRun] = field(default_factory=list)
created_at: datetime = field(default_factory=lambda: datetime.now(UTC))
last_heartbeat_at: datetime = field(default_factory=lambda: datetime.now(UTC))
[docs]
def is_active(self, timeout_seconds: int = 30) -> bool:
"""Check if worker is active (heartbeat within timeout).
Parameters
----------
timeout_seconds : int
Maximum seconds since last heartbeat to be considered active.
Returns
-------
bool
True if last heartbeat is within timeout.
"""
now = datetime.now(UTC)
elapsed = (now - self.last_heartbeat_at).total_seconds()
return elapsed <= timeout_seconds
[docs]
def update_heartbeat(
self,
resources: Resources,
tags: list[str],
running_ops: list[WorkerOpRun] | None = None,
) -> None:
"""Update worker with new heartbeat data.
Parameters
----------
resources : Resources
Updated hardware resources.
tags : list[str]
Updated worker tags.
running_ops : list[WorkerOpRun] | None
Running Op runs, if provided by the heartbeat.
"""
self.resources = resources
self.tags = tags
if running_ops is not None:
self.running_ops = running_ops
self.last_heartbeat_at = datetime.now(UTC)
[docs]
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary representation."""
return {
"id": self.id,
"resources": self.resources.to_dict(),
"tags": self.tags,
"running_ops": [task.to_dict() for task in self.running_ops],
"created_at": self.created_at.isoformat(),
"last_heartbeat_at": self.last_heartbeat_at.isoformat(),
}
[docs]
@classmethod
def from_dict(cls, data: dict[str, Any]) -> Worker:
"""Create from dictionary representation."""
created_at = data.get("created_at")
last_heartbeat_at = data.get("last_heartbeat_at")
if isinstance(created_at, str):
created_at = datetime.fromisoformat(created_at.replace("Z", "+00:00"))
if isinstance(last_heartbeat_at, str):
last_heartbeat_at = datetime.fromisoformat(
last_heartbeat_at.replace("Z", "+00:00")
)
running_ops_data = data.get("running_ops") or []
return cls(
id=data["id"],
resources=Resources.from_dict(data["resources"]),
tags=data.get("tags", []),
running_ops=[WorkerOpRun.from_dict(item) for item in running_ops_data],
created_at=created_at or datetime.now(UTC),
last_heartbeat_at=last_heartbeat_at or datetime.now(UTC),
)