Source code for gyoza.server.worker.worker

"""Worker entity.

Workers are ephemeral resources that execute tasks and send heartbeats.
"""

from __future__ import annotations

from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Any

from gyoza.server.worker.resources import Resources, WorkerOpRun


[docs] @dataclass class Worker: """Representation of a worker that can execute tasks. Workers are identified by their id (name) and register themselves via heartbeats containing their resources and tags. Parameters ---------- id : str Worker name/identifier. resources : Resources Hardware resources available. tags : list[str] Worker-level tags. running_ops : list[WorkerOpRun] Current running Op runs on the worker. created_at : datetime First heartbeat time. last_heartbeat_at : datetime Most recent heartbeat time. """ id: str resources: Resources tags: list[str] = field(default_factory=list) running_ops: list[WorkerOpRun] = field(default_factory=list) created_at: datetime = field(default_factory=lambda: datetime.now(UTC)) last_heartbeat_at: datetime = field(default_factory=lambda: datetime.now(UTC))
[docs] def is_active(self, timeout_seconds: int = 30) -> bool: """Check if worker is active (heartbeat within timeout). Parameters ---------- timeout_seconds : int Maximum seconds since last heartbeat to be considered active. Returns ------- bool True if last heartbeat is within timeout. """ now = datetime.now(UTC) elapsed = (now - self.last_heartbeat_at).total_seconds() return elapsed <= timeout_seconds
[docs] def update_heartbeat( self, resources: Resources, tags: list[str], running_ops: list[WorkerOpRun] | None = None, ) -> None: """Update worker with new heartbeat data. Parameters ---------- resources : Resources Updated hardware resources. tags : list[str] Updated worker tags. running_ops : list[WorkerOpRun] | None Running Op runs, if provided by the heartbeat. """ self.resources = resources self.tags = tags if running_ops is not None: self.running_ops = running_ops self.last_heartbeat_at = datetime.now(UTC)
[docs] def to_dict(self) -> dict[str, Any]: """Convert to dictionary representation.""" return { "id": self.id, "resources": self.resources.to_dict(), "tags": self.tags, "running_ops": [task.to_dict() for task in self.running_ops], "created_at": self.created_at.isoformat(), "last_heartbeat_at": self.last_heartbeat_at.isoformat(), }
[docs] @classmethod def from_dict(cls, data: dict[str, Any]) -> Worker: """Create from dictionary representation.""" created_at = data.get("created_at") last_heartbeat_at = data.get("last_heartbeat_at") if isinstance(created_at, str): created_at = datetime.fromisoformat(created_at.replace("Z", "+00:00")) if isinstance(last_heartbeat_at, str): last_heartbeat_at = datetime.fromisoformat( last_heartbeat_at.replace("Z", "+00:00") ) running_ops_data = data.get("running_ops") or [] return cls( id=data["id"], resources=Resources.from_dict(data["resources"]), tags=data.get("tags", []), running_ops=[WorkerOpRun.from_dict(item) for item in running_ops_data], created_at=created_at or datetime.now(UTC), last_heartbeat_at=last_heartbeat_at or datetime.now(UTC), )