Source code for gyoza.models.op_run

"""OpRun aggregate root — execution run lifecycle management."""

from __future__ import annotations

from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Any
from uuid import uuid4

from gyoza.models.op_attempt import OpAttempt
from gyoza.models.resources import (
    Constraints,
    EventDelivery,
    EventEntry,
    ExecutionSummary,
    OpRunState,
    RetryPolicy,
)


class OpRunLockedException(Exception):
    """Signal an attempt to modify an OpRun that has reached a terminal state.

    A run counts as terminal (locked) once its state is COMPLETED, FAILED or
    CANCELLED. Mutating operations guarded by the lock check raise this
    exception instead of changing the run.
    """
@dataclass
class OpRun:
    """Aggregate root for execution runs.

    Encapsulates all attempts internally. Users interact only with OpRun;
    the current attempt's data (progress, events, outputs, execution_summary)
    is exposed directly via properties.

    A new OpRun always starts with its first attempt. Additional attempts are
    only created via :meth:`retry`.

    Parameters
    ----------
    id : str
        Unique identifier for the run.
    state : OpRunState
        Current state of the run.
    priority : int
        Execution queue priority.
    image : str
        Docker image reference.
    inputs : dict[str, Any]
        Input parameters for execution.
    constraints : Constraints
        Hardware requirements.
    retry_policy : RetryPolicy
        Failure recovery rules.
    event_delivery : EventDelivery
        Event delivery configuration.
    op_definition : str | None
        Parent OpDefinition name (``None`` for ad-hoc runs).
    created_at : datetime
        When the run was created.
    updated_at : datetime
        Last state update timestamp.
    """

    id: str
    state: OpRunState
    priority: int
    image: str
    inputs: dict[str, Any] = field(default_factory=dict)
    constraints: Constraints = field(default_factory=Constraints)
    retry_policy: RetryPolicy = field(default_factory=RetryPolicy)
    event_delivery: EventDelivery = field(default_factory=EventDelivery)
    op_definition: str | None = None
    created_at: datetime = field(default_factory=lambda: datetime.now(UTC))
    updated_at: datetime = field(default_factory=lambda: datetime.now(UTC))
    # Attempt history (oldest first) and the index of the active attempt;
    # -1 means no attempt has been created yet.
    _attempts: list[OpAttempt] = field(default_factory=list, repr=False)
    _current_attempt_index: int = field(default=-1, repr=False)

    # ------------------------------------------------------------------
    # Factory
    # ------------------------------------------------------------------

    @classmethod
    def create(
        cls,
        image: str,
        priority: int = 0,
        inputs: dict[str, Any] | None = None,
        constraints: Constraints | None = None,
        retry_policy: RetryPolicy | None = None,
        event_delivery: EventDelivery | None = None,
        op_definition: str | None = None,
    ) -> OpRun:
        """Create a new OpRun in PENDING state with its first attempt.

        Parameters
        ----------
        image : str
            Docker image reference.
        priority : int
            Scheduling priority (default 0).
        inputs : dict[str, Any] | None
            Input parameters. The mapping is copied, so later changes to the
            caller's dict do not affect the run.
        constraints : Constraints | None
            Hardware requirements.
        retry_policy : RetryPolicy | None
            Failure recovery rules.
        event_delivery : EventDelivery | None
            Event delivery configuration.
        op_definition : str | None
            Parent OpDefinition name.

        Returns
        -------
        OpRun
            A new OpRun in PENDING state with its first attempt created.
        """
        now = datetime.now(UTC)
        run = cls(
            id=f"run_{uuid4().hex[:12]}",
            state=OpRunState.PENDING,
            priority=priority,
            image=image,
            inputs=dict(inputs) if inputs else {},
            constraints=constraints or Constraints(),
            retry_policy=retry_policy or RetryPolicy(),
            event_delivery=event_delivery or EventDelivery(),
            op_definition=op_definition,
            created_at=now,
            updated_at=now,
            _attempts=[],
            _current_attempt_index=-1,
        )
        run._create_attempt()
        return run

    # ------------------------------------------------------------------
    # Properties — current attempt data exposed on the run
    # ------------------------------------------------------------------

    @property
    def current_attempt(self) -> int:
        """Current attempt number (1-indexed); 0 when no attempt exists."""
        attempt = self._active_attempt()
        return attempt.attempt if attempt is not None else 0

    @property
    def progress(self) -> int:
        """Current progress percentage (0–100); 0 when no attempt exists."""
        attempt = self._active_attempt()
        return attempt.progress if attempt is not None else 0

    @property
    def events(self) -> list[EventEntry]:
        """Event timeline from the current attempt (empty when none exists)."""
        attempt = self._active_attempt()
        return attempt.events if attempt is not None else []

    @property
    def outputs(self) -> dict[str, Any]:
        """Outputs from the current attempt (empty when none exists)."""
        attempt = self._active_attempt()
        return attempt.outputs if attempt is not None else {}

    @property
    def execution_summary(self) -> ExecutionSummary:
        """Execution metadata from the current attempt.

        A fresh, detached ``ExecutionSummary()`` is returned when no attempt
        exists.
        """
        attempt = self._active_attempt()
        return attempt.execution_summary if attempt is not None else ExecutionSummary()

    @property
    def attempts(self) -> list[OpAttempt]:
        """All attempts in order (the internal list, not a copy)."""
        return self._attempts

    @property
    def is_locked(self) -> bool:
        """True when the run is in a terminal state."""
        return self.state in (
            OpRunState.COMPLETED,
            OpRunState.FAILED,
            OpRunState.CANCELLED,
        )

    # ------------------------------------------------------------------
    # State management
    # ------------------------------------------------------------------

    def set_state(self, new_state: OpRunState) -> None:
        """Update the run state directly.

        No transition or lock validation is performed, and the current
        attempt's state is left untouched.

        Parameters
        ----------
        new_state : OpRunState
            New state to apply.
        """
        self.state = new_state
        self.updated_at = datetime.now(UTC)

    def assign(self, worker_id: str) -> None:
        """Transition the run from PENDING to ASSIGNED.

        Parameters
        ----------
        worker_id : str
            ID of the worker that will execute this run.

        Raises
        ------
        ValueError
            If the run is not in PENDING state, or it has no current attempt.
        OpRunLockedException
            If the run is in a terminal state.
        """
        self._ensure_not_locked()
        if self.state != OpRunState.PENDING:
            raise ValueError(
                f"Can only assign PENDING runs. Current state: {self.state.value}"
            )
        # Resolve the attempt before mutating anything so a missing attempt
        # cannot leave the run half-transitioned to ASSIGNED.
        attempt = self._require_attempt()
        self.state = OpRunState.ASSIGNED
        self.updated_at = datetime.now(UTC)
        attempt.state = OpRunState.ASSIGNED
        attempt.execution_summary.worker_id = worker_id

    def set_priority(self, priority: int) -> None:
        """Update the scheduling priority.

        Parameters
        ----------
        priority : int
            New priority value (must be ≥ 0).

        Raises
        ------
        ValueError
            If priority is negative.
        """
        if priority < 0:
            raise ValueError("Priority must be >= 0")
        self.priority = priority
        self.updated_at = datetime.now(UTC)

    def set_outputs(self, outputs: dict[str, Any]) -> None:
        """Set outputs on the current attempt.

        Parameters
        ----------
        outputs : dict[str, Any]
            Output values to attach (shallow-copied).

        Raises
        ------
        ValueError
            If no current attempt exists.
        """
        attempt = self._require_attempt()
        attempt.outputs = dict(outputs)
        self.updated_at = datetime.now(UTC)

    def set_execution_summary(
        self,
        duration_ms: int | None = None,
        worker_id: str | None = None,
        error_message: str | None = None,
        gpu_id: int | None = None,
    ) -> None:
        """Update execution metadata on the current attempt.

        Only arguments that are not ``None`` are applied; the others keep
        their current values.

        Parameters
        ----------
        duration_ms : int | None
            Execution duration in milliseconds.
        worker_id : str | None
            ID of the executing worker.
        error_message : str | None
            Error description on failure.
        gpu_id : int | None
            GPU identifier used.

        Raises
        ------
        ValueError
            If no current attempt exists.
        """
        summary = self._require_attempt().execution_summary
        if duration_ms is not None:
            summary.duration_ms = duration_ms
        if worker_id is not None:
            summary.worker_id = worker_id
        if error_message is not None:
            summary.error_message = error_message
        if gpu_id is not None:
            summary.gpu_id = gpu_id
        self.updated_at = datetime.now(UTC)

    # ------------------------------------------------------------------
    # Event handling
    # ------------------------------------------------------------------

    def add_event(
        self, event_type: str, msg: str | int, payload: dict[str, Any] | None = None
    ) -> None:
        """Append an event to the current attempt and react to its type.

        Special event types trigger state transitions:

        - ``STARTED``: requires ASSIGNED state → transitions to RUNNING
        - ``PROGRESS``: updates progress percentage (msg must be int 0–100)
        - ``COMPLETED``: locks run, sets state to COMPLETED
        - ``FAILED``: locks run, sets state to FAILED
        - ``CANCELLED``: locks run, sets state to CANCELLED
        - Other (``INFO``, ``ERROR``): recorded without state change

        An event that fails validation is rejected without being recorded.

        Parameters
        ----------
        event_type : str
            Type of event (see :class:`~gyoza.models.resources.EventType`).
        msg : str | int
            Human-readable message or progress value.
        payload : dict[str, Any] | None
            Optional event-specific data.

        Raises
        ------
        OpRunLockedException
            If the run is already in a terminal state.
        ValueError
            If no current attempt exists or the event is invalid in the
            current state.
        """
        self._ensure_not_locked()
        attempt = self._require_attempt()
        payload = payload or {}

        # Validate first: a rejected event must neither land in the timeline
        # nor bump ``updated_at``.
        if event_type == "STARTED" and self.state != OpRunState.ASSIGNED:
            raise ValueError(
                "STARTED event requires run to be in ASSIGNED state. "
                f"Current state: {self.state.value}"
            )
        if event_type == "PROGRESS" and (
            # bool is a subclass of int, so reject it explicitly.
            isinstance(msg, bool) or not isinstance(msg, int) or not 0 <= msg <= 100
        ):
            raise ValueError("PROGRESS event msg must be int between 0 and 100")

        now = datetime.now(UTC)
        attempt.events.append(
            EventEntry(t=now, type=event_type, msg=msg, payload=payload)
        )
        self.updated_at = now

        if event_type == "STARTED":
            self.state = OpRunState.RUNNING
            attempt.state = OpRunState.RUNNING
            if "worker_id" in payload:
                attempt.execution_summary.worker_id = payload["worker_id"]
            if "gpu_id" in payload:
                attempt.execution_summary.gpu_id = payload["gpu_id"]
        elif event_type == "PROGRESS":
            attempt.progress = msg
        elif event_type == "COMPLETED":
            attempt.state = OpRunState.COMPLETED
            attempt.finished_at = now
            self.state = OpRunState.COMPLETED
            if "outputs" in payload:
                attempt.outputs = dict(payload["outputs"])
        elif event_type == "FAILED":
            attempt.state = OpRunState.FAILED
            attempt.finished_at = now
            self.state = OpRunState.FAILED
            if "error_message" in payload:
                attempt.execution_summary.error_message = payload["error_message"]
        elif event_type == "CANCELLED":
            attempt.state = OpRunState.CANCELLED
            attempt.finished_at = now
            self.state = OpRunState.CANCELLED

    # ------------------------------------------------------------------
    # Retry
    # ------------------------------------------------------------------

    def can_retry(self) -> bool:
        """Return True if another attempt is allowed.

        Returns
        -------
        bool
            True when current attempt count is below max_attempts.
        """
        return self.current_attempt < self.retry_policy.max_attempts

    def retry(self, force: bool = False) -> None:
        """Create a new attempt and reset the run to PENDING.

        Parameters
        ----------
        force : bool
            Skip the max_attempts check when True.

        Raises
        ------
        ValueError
            If max attempts are exhausted and *force* is False.
        """
        if not force and not self.can_retry():
            raise ValueError(f"Max attempts ({self.retry_policy.max_attempts}) reached")
        self._create_attempt()
        self.state = OpRunState.PENDING
        self.updated_at = datetime.now(UTC)

    # ------------------------------------------------------------------
    # Serialisation
    # ------------------------------------------------------------------

    def to_dict(self) -> dict[str, Any]:
        """Serialise to a JSON-compatible dictionary.

        ``inputs`` and ``outputs`` are shallow-copied so callers cannot
        mutate the run's internal state through the returned dict.

        Returns
        -------
        dict[str, Any]
            Mixed view combining run-level and current-attempt fields.
        """
        return {
            "id": self.id,
            "state": self.state.value,
            "priority": self.priority,
            "progress": self.progress,
            "current_attempt": self.current_attempt,
            "image": self.image,
            "inputs": dict(self.inputs),
            "outputs": dict(self.outputs),
            "events": [e.to_dict() for e in self.events],
            "execution_summary": self.execution_summary.to_dict(),
            "constraints": self.constraints.to_dict(),
            "retry_policy": self.retry_policy.to_dict(),
            "event_delivery": self.event_delivery.to_dict(),
            "op_definition": self.op_definition,
            "createdAt": self.created_at.isoformat(),
            "updatedAt": self.updated_at.isoformat(),
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> OpRun:
        """Deserialise from a dictionary.

        Missing or ``None`` nested sections fall back to their defaults.
        Timestamps are accepted under either camelCase or snake_case keys.

        Parameters
        ----------
        data : dict[str, Any]
            Dictionary as returned by the server API.

        Returns
        -------
        OpRun
            Reconstructed instance holding at most the current attempt.
            For full attempt history use the repository.
        """
        now = datetime.now(UTC)
        created_at = cls._parse_timestamp(data.get("createdAt") or data.get("created_at"))
        updated_at = cls._parse_timestamp(data.get("updatedAt") or data.get("updated_at"))

        # Never store None here: the field is non-optional and to_dict()
        # dereferences it unconditionally.
        event_delivery = (
            EventDelivery.from_dict(data["event_delivery"])
            if data.get("event_delivery")
            else EventDelivery()
        )
        state = OpRunState(data["state"])

        run = cls(
            id=data["id"],
            state=state,
            priority=data.get("priority", 0),
            image=data["image"],
            inputs=dict(data.get("inputs") or {}),
            constraints=Constraints.from_dict(data.get("constraints") or {}),
            retry_policy=RetryPolicy.from_dict(data.get("retry_policy") or {}),
            event_delivery=event_delivery,
            op_definition=data.get("op_definition"),
            created_at=created_at or now,
            updated_at=updated_at or now,
            _attempts=[],
            _current_attempt_index=-1,
        )

        current_attempt_num = data.get("current_attempt")
        if current_attempt_num is None:
            current_attempt_num = 1
        if current_attempt_num > 0:
            attempt = OpAttempt(
                id=f"{run.id}_att_{current_attempt_num}",
                op_run_id=run.id,
                attempt=current_attempt_num,
                state=state,
                progress=data.get("progress", 0),
                events=[EventEntry.from_dict(e) for e in data.get("events") or []],
                inputs=dict(run.inputs),
                outputs=dict(data.get("outputs") or {}),
                execution_summary=ExecutionSummary.from_dict(
                    data.get("execution_summary") or {}
                ),
                constraints=run.constraints,
                started_at=None,
                finished_at=None,
            )
            run._attempts.append(attempt)
            run._current_attempt_index = 0
        return run

    # ------------------------------------------------------------------
    # Internal
    # ------------------------------------------------------------------

    @staticmethod
    def _parse_timestamp(value: Any) -> Any:
        """Coerce an ISO-8601 string into a timezone-aware datetime.

        Empty strings yield ``None``. Non-string, non-datetime values are
        returned unchanged.
        """
        if isinstance(value, str):
            # "Z" is rewritten for parsers that do not accept it natively.
            value = datetime.fromisoformat(value.replace("Z", "+00:00")) if value else None
        if isinstance(value, datetime) and value.tzinfo is None:
            # NOTE(review): naive timestamps are assumed to be UTC, matching the
            # UTC clock used everywhere else in this class — confirm with API.
            value = value.replace(tzinfo=UTC)
        return value

    def _active_attempt(self) -> OpAttempt | None:
        """Return the current attempt, or ``None`` if none has been created."""
        if self._current_attempt_index < 0:
            return None
        return self._attempts[self._current_attempt_index]

    def _require_attempt(self) -> OpAttempt:
        """Return the current attempt, raising ``ValueError`` if there is none."""
        attempt = self._active_attempt()
        if attempt is None:
            raise ValueError("No current attempt exists")
        return attempt

    def _ensure_not_locked(self) -> None:
        """Raise ``OpRunLockedException`` when the run is in a terminal state."""
        if self.is_locked:
            raise OpRunLockedException(
                f"OpRun {self.id} is in terminal state {self.state.value} "
                "and cannot receive new events."
            )

    def _create_attempt(self) -> None:
        """Append a fresh PENDING attempt and make it the current one.

        The attempt snapshots a copy of the run's inputs and shares its
        constraints object; ``started_at`` is stamped at creation time.
        """
        attempt_num = len(self._attempts) + 1
        now = datetime.now(UTC)
        self._attempts.append(
            OpAttempt(
                id=f"{self.id}_att_{attempt_num}",
                op_run_id=self.id,
                attempt=attempt_num,
                state=OpRunState.PENDING,
                progress=0,
                events=[],
                inputs=dict(self.inputs),
                outputs={},
                execution_summary=ExecutionSummary(),
                constraints=self.constraints,
                started_at=now,
                finished_at=None,
            )
        )
        self._current_attempt_index = len(self._attempts) - 1