"""Value objects for the Worker subdomain.
These are simple data structures representing hardware resources and execution tasks.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
from gyoza.models import Constraints
[docs]
@dataclass
class GPU:
"""Representation of a single GPU.
Parameters
----------
id : int
Numeric GPU identifier (0, 1, 2...).
vram_mb : int
VRAM in megabytes.
tags : list[str]
GPU tags (e.g., "cuda", "nvidia-a100", "tensor-cores").
"""
id: int
vram_mb: int
tags: list[str] = field(default_factory=list)
[docs]
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary representation."""
return {
"id": self.id,
"vram_mb": self.vram_mb,
"tags": self.tags,
}
[docs]
@classmethod
def from_dict(cls, data: dict[str, Any]) -> GPU:
"""Create from dictionary representation."""
return cls(
id=data["id"],
vram_mb=data["vram_mb"],
tags=data.get("tags", []),
)
[docs]
@dataclass
class Resources:
"""Hardware resources available on a worker.
Parameters
----------
cpu_cores : int
Number of CPU cores.
ram_mb : int
RAM in megabytes.
gpus : list[GPU]
List of available GPUs.
"""
cpu_cores: int
ram_mb: int
gpus: list[GPU] = field(default_factory=list)
[docs]
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary representation."""
return {
"cpu_cores": self.cpu_cores,
"ram_mb": self.ram_mb,
"gpus": [gpu.to_dict() for gpu in self.gpus],
}
[docs]
@classmethod
def from_dict(cls, data: dict[str, Any]) -> Resources:
"""Create from dictionary representation."""
return cls(
cpu_cores=data["cpu_cores"],
ram_mb=data["ram_mb"],
gpus=[GPU.from_dict(g) for g in data.get("gpus", [])],
)
[docs]
@dataclass
class WorkerOpRun:
"""Simplified OpRun representation for worker execution.
Contains only the essential fields needed for a worker to execute a task.
Parameters
----------
id : str
Unique identifier for the run.
image : str
Docker image identifier.
inputs : dict[str, Any]
Input parameters for execution.
constraints : Constraints
Hardware requirements.
"""
id: str
image: str
inputs: dict[str, Any] = field(default_factory=dict)
constraints: Constraints = field(default_factory=Constraints)
[docs]
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary representation."""
return {
"id": self.id,
"image": self.image,
"inputs": self.inputs,
"constraints": self.constraints.to_dict(),
}
[docs]
@classmethod
def from_dict(cls, data: dict[str, Any]) -> WorkerOpRun:
"""Create from dictionary representation."""
return cls(
id=data["id"],
image=data["image"],
inputs=data.get("inputs", {}),
constraints=Constraints.from_dict(data.get("constraints", {})),
)
[docs]
@classmethod
def from_op_run(cls, op_run) -> WorkerOpRun:
"""Create WorkerOpRun from OpRun.
Extracts only the essential fields needed for worker execution.
Parameters
----------
op_run : OpRun
The OpRun to convert.
Returns
-------
WorkerOpRun
Simplified OpRun representation for execution.
"""
return cls(
id=op_run.id,
image=op_run.image,
inputs=op_run.inputs,
constraints=op_run.constraints,
)
[docs]
@dataclass
class Heartbeat:
"""Heartbeat sent by a worker to register or update itself.
This is the payload workers send via REST to announce their presence.
Parameters
----------
worker_id : str
Worker name/identifier.
resources : Resources
Hardware resources available.
tags : list[str]
Worker-level tags.
running_ops : list[WorkerOpRun] | None
Running Op runs reported at heartbeat time.
"""
worker_id: str
resources: Resources
tags: list[str] = field(default_factory=list)
running_ops: list[WorkerOpRun] | None = None
[docs]
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary representation."""
result = {
"worker_id": self.worker_id,
"resources": self.resources.to_dict(),
"tags": self.tags,
}
if self.running_ops is not None:
result["running_ops"] = [task.to_dict() for task in self.running_ops]
return result
[docs]
@classmethod
def from_dict(cls, data: dict[str, Any]) -> Heartbeat:
"""Create from dictionary representation."""
running_ops = data.get("running_ops")
if running_ops is not None:
running_ops = [WorkerOpRun.from_dict(item) for item in running_ops]
return cls(
worker_id=data["worker_id"],
resources=Resources.from_dict(data["resources"]),
tags=data.get("tags", []),
running_ops=running_ops,
)