"""Shared value objects for the gyoza domain.
Contains hardware requirements, policy types, op run states, event types,
and execution metadata — all pure data structures with no dependencies on
other model files, making them safe to import from anywhere.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any
@dataclass
class Constraints:
    """Hardware resources an execution asks for.

    All three fields are optional and default to ``None``.

    Parameters
    ----------
    cpu : int | None
        Number of CPU cores required.
    ram_mb : int | None
        RAM in megabytes.
    vram_mb : int | None
        VRAM in megabytes.
    """

    cpu: int | None = None
    ram_mb: int | None = None
    vram_mb: int | None = None

    def to_dict(self) -> dict[str, Any]:
        """Convert the constraints into a plain dictionary.

        Returns
        -------
        dict[str, Any]
            Mapping with the keys ``cpu``, ``ram_mb`` and ``vram_mb``
            (in that order), each holding the current field value.
        """
        return {
            name: getattr(self, name)
            for name in ("cpu", "ram_mb", "vram_mb")
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> Constraints:
        """Build an instance from a dictionary.

        Parameters
        ----------
        data : dict[str, Any]
            Mapping that may contain ``cpu``, ``ram_mb`` and ``vram_mb``.
            Keys that are absent become ``None``.

        Returns
        -------
        Constraints
            Instance populated from ``data``.
        """
        names = ("cpu", "ram_mb", "vram_mb")
        return cls(**{name: data.get(name) for name in names})
@dataclass
class RetryPolicy:
    """Failure-recovery rules for an execution.

    Parameters
    ----------
    max_attempts : int
        Maximum number of execution attempts (including the first).
    backoff_ms : int
        Milliseconds to wait between consecutive attempts.
    """

    max_attempts: int = 1
    backoff_ms: int = 0

    def to_dict(self) -> dict[str, Any]:
        """Convert the policy into a plain dictionary.

        Returns
        -------
        dict[str, Any]
            Mapping with the keys ``max_attempts`` and ``backoff_ms``.
        """
        serialised: dict[str, Any] = {}
        serialised["max_attempts"] = self.max_attempts
        serialised["backoff_ms"] = self.backoff_ms
        return serialised

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> RetryPolicy:
        """Build an instance from a dictionary.

        Parameters
        ----------
        data : dict[str, Any]
            Mapping that may contain ``max_attempts`` and ``backoff_ms``.
            A missing ``max_attempts`` falls back to ``1`` and a missing
            ``backoff_ms`` falls back to ``0``.

        Returns
        -------
        RetryPolicy
            Instance populated from ``data``.
        """
        attempts = data.get("max_attempts", 1)
        backoff = data.get("backoff_ms", 0)
        return cls(max_attempts=attempts, backoff_ms=backoff)
@dataclass
class EventDelivery:
    """Event delivery configuration for op runs.

    Parameters
    ----------
    topic : str
        Topic name to publish events to.
    attributes : dict[str, Any]
        Additional metadata attached to each event.
    """

    topic: str = ""
    attributes: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Serialise to a JSON-compatible dictionary.

        Returns
        -------
        dict[str, Any]
            Mapping with the keys ``topic`` and ``attributes``. The
            ``attributes`` value is a shallow copy, so editing the result
            does not alter this instance.
        """
        # Copy so callers cannot mutate the instance through the result;
        # ``or {}`` keeps an instance built with ``attributes=None`` usable.
        return {"topic": self.topic, "attributes": dict(self.attributes or {})}

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> EventDelivery:
        """Deserialise from a dictionary.

        Parameters
        ----------
        data : dict[str, Any]
            Dictionary with optional keys ``topic`` and ``attributes``.
            A missing ``topic`` becomes ``""``; a missing or ``None``
            ``attributes`` becomes an empty dictionary.

        Returns
        -------
        EventDelivery
            Populated instance holding its own shallow copy of
            ``attributes``.
        """
        # ``or {}`` also covers an explicit ``None`` (e.g. a JSON null), which
        # ``data.get(key, {})`` would pass straight through.
        raw_attributes = data.get("attributes") or {}
        return cls(
            topic=data.get("topic", ""),
            # Copy so later edits to the instance do not leak back into the
            # caller's input dictionary (and vice versa).
            attributes=dict(raw_attributes),
        )
class OpRunState(str, Enum):
    """States an op run can be in over its lifecycle.

    The ``str`` mixin makes every member compare equal to its plain string
    value, and each value is spelled exactly like the member name.
    """

    PENDING = "PENDING"
    ASSIGNED = "ASSIGNED"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"
class EventType(str, Enum):
    """Kinds of event emitted while an op run executes.

    The ``str`` mixin makes every member compare equal to its plain string
    value, and each value is spelled exactly like the member name.
    """

    STARTED = "STARTED"
    INFO = "INFO"
    PROGRESS = "PROGRESS"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"
    ERROR = "ERROR"
@dataclass
class EventEntry:
    """A single event in the op run timeline.

    Parameters
    ----------
    t : datetime
        Timestamp when the event occurred.
    type : str
        Event type string (see :class:`EventType`).
    msg : str | int
        Human-readable message or progress value (0–100 for PROGRESS events).
    payload : dict[str, Any]
        Optional event-specific data.
    """

    t: datetime
    type: str
    msg: str | int
    payload: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Serialise to a JSON-compatible dictionary.

        Returns
        -------
        dict[str, Any]
            Mapping with the keys ``t``, ``type`` and ``msg``. A ``datetime``
            timestamp is rendered with ``isoformat()``; any other value is
            passed through unchanged. ``payload`` is included only when it
            is non-empty, and then as a shallow copy.
        """
        result: dict[str, Any] = {
            "t": self.t.isoformat() if isinstance(self.t, datetime) else self.t,
            "type": self.type,
            "msg": self.msg,
        }
        if self.payload:
            # Copy so callers cannot mutate the instance through the result.
            result["payload"] = dict(self.payload)
        return result

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> EventEntry:
        """Deserialise from a dictionary.

        Parameters
        ----------
        data : dict[str, Any]
            Dictionary with keys ``t``, ``type``, ``msg`` and an optional
            ``payload``. A string ``t`` is parsed as an ISO 8601 timestamp;
            any other value is stored as given. A missing or ``None``
            ``payload`` becomes an empty dictionary.

        Returns
        -------
        EventEntry
            Populated instance holding its own shallow copy of ``payload``.

        Raises
        ------
        KeyError
            If ``t``, ``type`` or ``msg`` is missing from ``data``.
        ValueError
            If ``t`` is a string that ``datetime.fromisoformat`` rejects.
        """
        t = data["t"]
        if isinstance(t, str):
            # Rewrite "Z" to an explicit offset before parsing, so a UTC
            # suffix is accepted where ``fromisoformat`` does not take "Z".
            t = datetime.fromisoformat(t.replace("Z", "+00:00"))
        # ``or {}`` also covers an explicit ``None`` (e.g. a JSON null), which
        # ``data.get(key, {})`` would pass straight through.
        raw_payload = data.get("payload") or {}
        return cls(
            t=t,
            type=data["type"],
            msg=data["msg"],
            # Copy so later edits to the instance do not leak back into the
            # caller's input dictionary (and vice versa).
            payload=dict(raw_payload),
        )
@dataclass
class ExecutionSummary:
    """Top-level execution metadata recorded for a completed op run.

    All fields are optional and default to ``None``.

    Parameters
    ----------
    duration_ms : int | None
        Total execution time in milliseconds.
    worker_id : str | None
        Identifier of the worker that ran the operation.
    error_message : str | None
        Error description when the run failed.
    gpu_id : int | None
        GPU identifier used during execution, if any.
    """

    duration_ms: int | None = None
    worker_id: str | None = None
    error_message: str | None = None
    gpu_id: int | None = None

    def to_dict(self) -> dict[str, Any]:
        """Convert the summary into a plain dictionary.

        Returns
        -------
        dict[str, Any]
            Mapping with the keys ``duration_ms``, ``worker_id``,
            ``error_message`` and ``gpu_id`` (in that order).
        """
        summary: dict[str, Any] = dict(
            duration_ms=self.duration_ms,
            worker_id=self.worker_id,
            error_message=self.error_message,
            gpu_id=self.gpu_id,
        )
        return summary

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> ExecutionSummary:
        """Build an instance from a dictionary.

        Parameters
        ----------
        data : dict[str, Any]
            Mapping whose optional keys match the field names. Keys that
            are absent become ``None``.

        Returns
        -------
        ExecutionSummary
            Instance populated from ``data``.
        """
        lookup = data.get
        duration = lookup("duration_ms")
        worker = lookup("worker_id")
        error = lookup("error_message")
        gpu = lookup("gpu_id")
        return cls(
            duration_ms=duration,
            worker_id=worker,
            error_message=error,
            gpu_id=gpu,
        )