Source code for gyoza.server.worker.resources

"""Value objects for the Worker subdomain.

These are simple data structures representing hardware resources and execution tasks.
"""

from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any

from gyoza.models import Constraints


@dataclass
class GPU:
    """Representation of a single GPU.

    Parameters
    ----------
    id : int
        Numeric GPU identifier (0, 1, 2...).
    vram_mb : int
        VRAM in megabytes.
    tags : list[str]
        GPU tags (e.g., "cuda", "nvidia-a100", "tensor-cores").
    """

    id: int
    vram_mb: int
    tags: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary representation.

        Returns
        -------
        dict[str, Any]
            Mapping with the keys ``"id"``, ``"vram_mb"`` and ``"tags"``.
            The tag list is a copy, so mutating the returned dictionary
            does not affect this instance.
        """
        return {
            "id": self.id,
            "vram_mb": self.vram_mb,
            # Copy: handing out the internal list would let callers mutate
            # this instance through the serialized form.
            "tags": list(self.tags),
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> GPU:
        """Create from dictionary representation.

        Parameters
        ----------
        data : dict[str, Any]
            Mapping with the required keys ``"id"`` and ``"vram_mb"`` and
            an optional ``"tags"`` list. A missing or null ``"tags"``
            entry yields an empty tag list.

        Returns
        -------
        GPU
            New instance owning its own copy of the tag list.

        Raises
        ------
        KeyError
            If ``"id"`` or ``"vram_mb"`` is missing from ``data``.
        """
        return cls(
            id=data["id"],
            vram_mb=data["vram_mb"],
            # ``or []`` covers an explicit null; ``list(...)`` detaches the
            # new instance from the caller's payload.
            tags=list(data.get("tags") or []),
        )
@dataclass
class Resources:
    """Hardware inventory of a single worker.

    Parameters
    ----------
    cpu_cores : int
        Number of CPU cores.
    ram_mb : int
        RAM in megabytes.
    gpus : list[GPU]
        GPUs available on the worker; empty when none are given.
    """

    cpu_cores: int
    ram_mb: int
    gpus: list[GPU] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        """Serialize the resources, including every GPU, to a dictionary.

        Returns
        -------
        dict[str, Any]
            Mapping with the keys ``"cpu_cores"``, ``"ram_mb"`` and
            ``"gpus"``; the latter holds one ``GPU.to_dict()`` result per
            GPU.
        """
        payload: dict[str, Any] = {
            "cpu_cores": self.cpu_cores,
            "ram_mb": self.ram_mb,
        }
        payload["gpus"] = [device.to_dict() for device in self.gpus]
        return payload

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> Resources:
        """Rebuild a ``Resources`` instance from its dictionary form.

        Parameters
        ----------
        data : dict[str, Any]
            Mapping with the required keys ``"cpu_cores"`` and
            ``"ram_mb"``. The optional ``"gpus"`` key holds a list of GPU
            dictionaries and defaults to an empty list.

        Returns
        -------
        Resources
            New instance with each GPU entry parsed via ``GPU.from_dict``.
        """
        cores = data["cpu_cores"]
        ram = data["ram_mb"]
        devices: list[GPU] = []
        for entry in data.get("gpus", []):
            devices.append(GPU.from_dict(entry))
        return cls(cpu_cores=cores, ram_mb=ram, gpus=devices)
@dataclass
class WorkerOpRun:
    """Trimmed-down view of an OpRun, as handed to a worker.

    Carries only the fields a worker needs in order to execute a task.

    Parameters
    ----------
    id : str
        Unique identifier for the run.
    image : str
        Docker image identifier.
    inputs : dict[str, Any]
        Input parameters for execution; empty when not given.
    constraints : Constraints
        Hardware requirements; a default ``Constraints()`` when not given.
    """

    id: str
    image: str
    inputs: dict[str, Any] = field(default_factory=dict)
    constraints: Constraints = field(default_factory=Constraints)

    def to_dict(self) -> dict[str, Any]:
        """Serialize the run to a dictionary.

        Returns
        -------
        dict[str, Any]
            Mapping with the keys ``"id"``, ``"image"``, ``"inputs"`` and
            ``"constraints"``; the constraints are serialized through
            their own ``to_dict()``.
        """
        payload: dict[str, Any] = {
            "id": self.id,
            "image": self.image,
            "inputs": self.inputs,
        }
        payload["constraints"] = self.constraints.to_dict()
        return payload

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> WorkerOpRun:
        """Rebuild a ``WorkerOpRun`` from its dictionary form.

        Parameters
        ----------
        data : dict[str, Any]
            Mapping with the required keys ``"id"`` and ``"image"``. The
            optional ``"inputs"`` and ``"constraints"`` keys both default
            to an empty dictionary.

        Returns
        -------
        WorkerOpRun
            New instance whose constraints are parsed with
            ``Constraints.from_dict``.
        """
        run_id = data["id"]
        image_ref = data["image"]
        run_inputs = data.get("inputs", {})
        raw_constraints = data.get("constraints", {})
        return cls(
            id=run_id,
            image=image_ref,
            inputs=run_inputs,
            constraints=Constraints.from_dict(raw_constraints),
        )

    @classmethod
    def from_op_run(cls, op_run) -> WorkerOpRun:
        """Build a ``WorkerOpRun`` from an OpRun.

        Only the attributes required for worker execution are read.

        Parameters
        ----------
        op_run : OpRun
            The OpRun to convert. Its ``id``, ``image``, ``inputs`` and
            ``constraints`` attributes are passed through as-is.

        Returns
        -------
        WorkerOpRun
            Simplified OpRun representation for execution.
        """
        wanted = ("id", "image", "inputs", "constraints")
        picked = {name: getattr(op_run, name) for name in wanted}
        return cls(**picked)
@dataclass
class Heartbeat:
    """Payload a worker sends to register itself or refresh its state.

    Workers send this via REST to announce their presence.

    Parameters
    ----------
    worker_id : str
        Worker name/identifier.
    resources : Resources
        Hardware resources available.
    tags : list[str]
        Worker-level tags; empty when not given.
    running_ops : list[WorkerOpRun] | None
        Running Op runs reported at heartbeat time, or ``None`` when the
        heartbeat carries no such report.
    """

    worker_id: str
    resources: Resources
    tags: list[str] = field(default_factory=list)
    running_ops: list[WorkerOpRun] | None = None

    def to_dict(self) -> dict[str, Any]:
        """Serialize the heartbeat to a dictionary.

        Returns
        -------
        dict[str, Any]
            Mapping with the keys ``"worker_id"``, ``"resources"`` and
            ``"tags"``. A ``"running_ops"`` key is added only when
            ``running_ops`` is not ``None``.
        """
        payload: dict[str, Any] = {
            "worker_id": self.worker_id,
            "resources": self.resources.to_dict(),
            "tags": self.tags,
        }
        if self.running_ops is None:
            # No report: leave the key out rather than emitting a null.
            return payload
        payload["running_ops"] = [op.to_dict() for op in self.running_ops]
        return payload

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> Heartbeat:
        """Rebuild a ``Heartbeat`` from its dictionary form.

        Parameters
        ----------
        data : dict[str, Any]
            Mapping with the required keys ``"worker_id"`` and
            ``"resources"``. ``"tags"`` is optional and defaults to an
            empty list. ``"running_ops"`` is optional and stays ``None``
            when missing or null.

        Returns
        -------
        Heartbeat
            New instance with the resources parsed via
            ``Resources.from_dict`` and each running op parsed via
            ``WorkerOpRun.from_dict``.
        """
        raw_ops = data.get("running_ops")
        parsed_ops = (
            None
            if raw_ops is None
            else [WorkerOpRun.from_dict(entry) for entry in raw_ops]
        )
        return cls(
            worker_id=data["worker_id"],
            resources=Resources.from_dict(data["resources"]),
            tags=data.get("tags", []),
            running_ops=parsed_ops,
        )