Source code for gyoza.models.resources

"""Shared value objects for the gyoza domain.

Contains hardware requirements, policy types, op run states, event types,
and execution metadata — all pure data structures with no dependencies on
other model files, making them safe to import from anywhere.
"""

from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any


@dataclass
class Constraints:
    """Hardware resources an execution asks for.

    All three fields are optional and default to ``None``.

    Parameters
    ----------
    cpu : int | None
        Number of CPU cores required.
    ram_mb : int | None
        RAM in megabytes.
    vram_mb : int | None
        VRAM in megabytes.
    """

    cpu: int | None = None
    ram_mb: int | None = None
    vram_mb: int | None = None

    def to_dict(self) -> dict[str, Any]:
        """Convert the constraints into a JSON-compatible dictionary.

        Returns
        -------
        dict[str, Any]
            Mapping with the keys ``cpu``, ``ram_mb`` and ``vram_mb``
            (in that order), each holding the current field value.
        """
        return {
            name: getattr(self, name)
            for name in ("cpu", "ram_mb", "vram_mb")
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> Constraints:
        """Build an instance from a dictionary.

        Parameters
        ----------
        data : dict[str, Any]
            Mapping that may contain ``cpu``, ``ram_mb`` and ``vram_mb``.
            Keys that are absent yield ``None`` for that field.

        Returns
        -------
        Constraints
            Instance populated from ``data``.
        """
        values = {
            name: data.get(name)
            for name in ("cpu", "ram_mb", "vram_mb")
        }
        return cls(**values)
@dataclass
class RetryPolicy:
    """Describes how a failed execution is retried.

    Parameters
    ----------
    max_attempts : int
        Maximum number of execution attempts (including the first).
    backoff_ms : int
        Milliseconds to wait between consecutive attempts.
    """

    max_attempts: int = 1
    backoff_ms: int = 0

    def to_dict(self) -> dict[str, Any]:
        """Convert the policy into a JSON-compatible dictionary.

        Returns
        -------
        dict[str, Any]
            Mapping with the keys ``max_attempts`` and ``backoff_ms``.
        """
        serialised: dict[str, Any] = {}
        serialised["max_attempts"] = self.max_attempts
        serialised["backoff_ms"] = self.backoff_ms
        return serialised

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> RetryPolicy:
        """Build an instance from a dictionary.

        Parameters
        ----------
        data : dict[str, Any]
            Mapping that may contain ``max_attempts`` and ``backoff_ms``.
            Missing keys fall back to ``1`` and ``0`` respectively.

        Returns
        -------
        RetryPolicy
            Instance populated from ``data``.
        """
        attempts = data.get("max_attempts", 1)
        delay_ms = data.get("backoff_ms", 0)
        return cls(max_attempts=attempts, backoff_ms=delay_ms)
@dataclass
class EventDelivery:
    """Event delivery configuration for op runs.

    Parameters
    ----------
    topic : str
        Topic name to publish events to.
    attributes : dict[str, Any]
        Additional metadata attached to each event.
    """

    topic: str = ""
    attributes: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Serialise to a JSON-compatible dictionary.

        Returns
        -------
        dict[str, Any]
            Dictionary with the keys ``topic`` and ``attributes``. The
            ``attributes`` value is a shallow copy, so mutating the result
            does not alter this instance.
        """
        # Copy so callers that post-process the serialised form cannot
        # accidentally mutate the instance's own state.
        return {"topic": self.topic, "attributes": dict(self.attributes or {})}

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> EventDelivery:
        """Deserialise from a dictionary.

        Parameters
        ----------
        data : dict[str, Any]
            Dictionary with optional keys ``topic`` and ``attributes``.
            A missing or ``None`` ``attributes`` value becomes an empty dict.

        Returns
        -------
        EventDelivery
            Populated instance. Its ``attributes`` dict is a shallow copy of
            the input, so later changes to ``data`` do not leak into it.
        """
        # ``or {}`` covers an explicit ``"attributes": None``; the ``dict()``
        # call detaches the instance from the caller-owned mapping.
        raw_attributes = data.get("attributes") or {}
        return cls(
            topic=data.get("topic", ""),
            attributes=dict(raw_attributes),
        )
class OpRunState(str, Enum):
    """States an op run can be in.

    The enum also derives from ``str``, so each member compares equal to
    its plain string value. Every member's value is identical to its name.
    """

    PENDING = "PENDING"
    ASSIGNED = "ASSIGNED"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"
class EventType(str, Enum):
    """Kinds of events emitted while an op run executes.

    The enum also derives from ``str``, so each member compares equal to
    its plain string value. Every member's value is identical to its name.
    """

    STARTED = "STARTED"
    INFO = "INFO"
    PROGRESS = "PROGRESS"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"
    ERROR = "ERROR"
@dataclass
class EventEntry:
    """A single event in the op run timeline.

    Parameters
    ----------
    t : datetime
        Timestamp when the event occurred.
    type : str
        Event type string (see :class:`EventType`).
    msg : str | int
        Human-readable message or progress value (0–100 for PROGRESS events).
    payload : dict[str, Any]
        Optional event-specific data.
    """

    t: datetime
    type: str
    msg: str | int
    payload: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Serialise to a JSON-compatible dictionary.

        Returns
        -------
        dict[str, Any]
            Dictionary with the keys ``t``, ``type`` and ``msg``. A
            ``payload`` key is added only when the payload is non-empty, and
            its value is a shallow copy so mutating the result does not
            alter this instance.
        """
        # ``t`` is rendered as ISO-8601 text when it is a datetime; any other
        # value (e.g. an already-formatted string) is passed through as-is.
        timestamp = self.t.isoformat() if isinstance(self.t, datetime) else self.t
        result: dict[str, Any] = {
            "t": timestamp,
            "type": self.type,
            "msg": self.msg,
        }
        if self.payload:
            # Copy so callers cannot mutate this entry through the output.
            result["payload"] = dict(self.payload)
        return result

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> EventEntry:
        """Deserialise from a dictionary.

        Parameters
        ----------
        data : dict[str, Any]
            Dictionary with keys ``t``, ``type``, ``msg``, optional
            ``payload``. ``t`` may be a ``datetime`` or an ISO-8601 string;
            a trailing ``Z`` is accepted as a UTC designator. A missing or
            ``None`` ``payload`` becomes an empty dict.

        Returns
        -------
        EventEntry
            Populated instance. Its ``payload`` is a shallow copy of the
            input, so later changes to ``data`` do not leak into it.

        Raises
        ------
        KeyError
            If ``t``, ``type`` or ``msg`` is missing from ``data``.
        ValueError
            If ``t`` is a string that is not a valid ISO-8601 timestamp.
        """
        t = data["t"]
        if isinstance(t, str):
            # Rewrite only a trailing "Z" so the string parses on Python
            # versions whose ``fromisoformat`` does not accept that suffix.
            if t.endswith("Z"):
                t = t[:-1] + "+00:00"
            t = datetime.fromisoformat(t)
        # ``or {}`` covers an explicit ``"payload": None``; the ``dict()``
        # call detaches the instance from the caller-owned mapping.
        raw_payload = data.get("payload") or {}
        return cls(
            t=t,
            type=data["type"],
            msg=data["msg"],
            payload=dict(raw_payload),
        )
@dataclass
class ExecutionSummary:
    """Top-level metadata describing how an op run executed.

    All fields are optional and default to ``None``.

    Parameters
    ----------
    duration_ms : int | None
        Total execution time in milliseconds.
    worker_id : str | None
        Identifier of the worker that ran the operation.
    error_message : str | None
        Error description when the run failed.
    gpu_id : int | None
        GPU identifier used during execution, if any.
    """

    duration_ms: int | None = None
    worker_id: str | None = None
    error_message: str | None = None
    gpu_id: int | None = None

    def to_dict(self) -> dict[str, Any]:
        """Convert the summary into a JSON-compatible dictionary.

        Returns
        -------
        dict[str, Any]
            Mapping with the keys ``duration_ms``, ``worker_id``,
            ``error_message`` and ``gpu_id`` (in that order).
        """
        summary: dict[str, Any] = {}
        summary["duration_ms"] = self.duration_ms
        summary["worker_id"] = self.worker_id
        summary["error_message"] = self.error_message
        summary["gpu_id"] = self.gpu_id
        return summary

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> ExecutionSummary:
        """Build an instance from a dictionary.

        Parameters
        ----------
        data : dict[str, Any]
            Mapping with optional keys matching the field names. Keys that
            are absent yield ``None`` for that field.

        Returns
        -------
        ExecutionSummary
            Instance populated from ``data``.
        """
        duration = data.get("duration_ms")
        worker = data.get("worker_id")
        error = data.get("error_message")
        gpu = data.get("gpu_id")
        return cls(
            duration_ms=duration,
            worker_id=worker,
            error_message=error,
            gpu_id=gpu,
        )