Source code for gyoza.models.op_run

"""OpRun aggregate root — execution run lifecycle management."""

from __future__ import annotations

from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Any
from uuid import uuid4

from gyoza.models.op_attempt import OpAttempt
from gyoza.models.resources import (
    Constraints,
    EventDelivery,
    EventEntry,
    ExecutionSummary,
    OpRunState,
    RetryPolicy,
)


class OpRunLockedException(Exception):
    """Error signalling that a terminal-state OpRun cannot be modified.

    An OpRun counts as terminal once it is COMPLETED, FAILED or CANCELLED.
    """
@dataclass
class OpRun:
    """Aggregate root for execution runs.

    Encapsulates all attempts internally. Users interact only with OpRun;
    the current attempt's data (progress, events, outputs,
    execution_summary) is exposed directly via properties.

    A new OpRun always starts with its first attempt. Additional attempts
    are created by ``RETRIED`` (including after :meth:`release_assignment`)
    or automatic retry after ``FAILED`` when policy allows.

    Parameters
    ----------
    id : str
        Unique identifier for the run.
    state : OpRunState
        Current state of the run.
    priority : int
        Execution queue priority.
    image : str
        Docker image reference.
    inputs : dict[str, Any]
        Input parameters for execution.
    constraints : Constraints
        Hardware requirements.
    retry_policy : RetryPolicy
        Failure recovery rules.
    event_delivery : EventDelivery
        Event delivery configuration.
    op_definition : str | None
        Parent OpDefinition name (``None`` for ad-hoc runs).
    created_at : datetime
        When the run was created.
    updated_at : datetime
        Last state update timestamp.

    Notes
    -----
    Constructing ``OpRun(...)`` directly leaves the attempt list empty, so
    the attempt-backed properties raise ``IndexError`` until an attempt is
    added. Use :meth:`create` or :meth:`from_dict` to obtain a usable run.
    """

    id: str
    state: OpRunState
    priority: int
    image: str
    inputs: dict[str, Any] = field(default_factory=dict)
    constraints: Constraints = field(default_factory=Constraints)
    retry_policy: RetryPolicy = field(default_factory=RetryPolicy)
    event_delivery: EventDelivery = field(default_factory=EventDelivery)
    op_definition: str | None = None
    created_at: datetime = field(default_factory=lambda: datetime.now(UTC))
    updated_at: datetime = field(default_factory=lambda: datetime.now(UTC))

    # Attempt history, oldest first; hidden from repr to keep it readable.
    _attempts: list[OpAttempt] = field(default_factory=list, repr=False)
    # Index into ``_attempts`` of the active attempt. The default of -1
    # resolves to the last element of a non-empty list.
    _current_attempt_index: int = field(default=-1, repr=False)

    # ------------------------------------------------------------------
    # Factory
    # ------------------------------------------------------------------

    @classmethod
    def create(
        cls,
        image: str,
        priority: int = 0,
        inputs: dict[str, Any] | None = None,
        constraints: Constraints | None = None,
        retry_policy: RetryPolicy | None = None,
        event_delivery: EventDelivery | None = None,
        op_definition: str | None = None,
    ) -> OpRun:
        """Create a new OpRun in PENDING state with its first attempt.

        Parameters
        ----------
        image : str
            Docker image reference.
        priority : int
            Scheduling priority (default 0).
        inputs : dict[str, Any] | None
            Input parameters.
        constraints : Constraints | None
            Hardware requirements.
        retry_policy : RetryPolicy | None
            Failure recovery rules.
        event_delivery : EventDelivery | None
            Event delivery configuration.
        op_definition : str | None
            Parent OpDefinition name.

        Returns
        -------
        OpRun
            A new OpRun in PENDING state with its first attempt created.
        """
        # One timestamp so created_at and updated_at start out identical.
        now = datetime.now(UTC)
        run = cls(
            id=f"run_{uuid4().hex[:12]}",
            state=OpRunState.PENDING,
            priority=priority,
            image=image,
            inputs=inputs or {},
            constraints=constraints or Constraints(),
            retry_policy=retry_policy or RetryPolicy(),
            event_delivery=event_delivery or EventDelivery(),
            op_definition=op_definition,
            created_at=now,
            updated_at=now,
            _attempts=[],
            _current_attempt_index=-1,
        )
        run._create_attempt()
        return run

    # ------------------------------------------------------------------
    # Properties — current attempt data exposed on the run
    # ------------------------------------------------------------------

    @property
    def active_attempt(self) -> OpAttempt:
        """Current attempt (always present for a valid run)."""
        return self._attempts[self._current_attempt_index]

    @property
    def current_attempt(self) -> int:
        """Current attempt number (1-indexed)."""
        return self.active_attempt.attempt

    @property
    def progress(self) -> int:
        """Current progress percentage (0–100)."""
        return self.active_attempt.progress

    @property
    def events(self) -> list[EventEntry]:
        """Event timeline from the current attempt."""
        return self.active_attempt.events

    @property
    def outputs(self) -> dict[str, Any]:
        """Outputs from the current attempt."""
        return self.active_attempt.outputs

    @property
    def execution_summary(self) -> ExecutionSummary:
        """Execution metadata from the current attempt."""
        return self.active_attempt.execution_summary

    @property
    def attempts(self) -> list[OpAttempt]:
        """All attempts in order (the internal list itself, not a copy)."""
        return self._attempts

    @property
    def is_locked(self) -> bool:
        """True when the run is in a terminal state."""
        return self.state in (
            OpRunState.COMPLETED,
            OpRunState.FAILED,
            OpRunState.CANCELLED,
        )

    # ------------------------------------------------------------------
    # State management
    # ------------------------------------------------------------------

    def set_state(self, new_state: OpRunState) -> None:
        """Update the run state directly.

        No transition or lock validation is performed here; the state is
        overwritten and ``updated_at`` is refreshed.

        Parameters
        ----------
        new_state : OpRunState
            New state to apply.
        """
        self.state = new_state
        self.updated_at = datetime.now(UTC)

    def assign(self, worker_id: str, gpu_id: int | None = None) -> None:
        """Allocate this run to a worker (scheduler / internal).

        Emits an ``ASSIGNED`` event through :meth:`add_event`, which is
        documented to transition PENDING → ASSIGNED.

        Parameters
        ----------
        worker_id : str
            Worker that will execute this run.
        gpu_id : int | None
            Selected GPU, if any. Only included in the event payload when
            it is not ``None``.

        Raises
        ------
        ValueError
            If the run is not PENDING.
        OpRunLockedException
            If the run is terminal.

        Notes
        -----
        This method performs no checks itself; the exceptions above are
        those documented for :meth:`add_event`, to which validation is
        delegated.
        """
        payload: dict[str, Any] = {"worker_id": worker_id}
        if gpu_id is not None:
            payload["gpu_id"] = gpu_id
        self.add_event("ASSIGNED", f"Assigned to {worker_id}", payload)

    def release_assignment(self) -> None:
        """Reclaim a run whose worker is gone (reaper / internal).

        Validates ASSIGNED or RUNNING, then emits ``RETRIED`` with
        ``force`` so a fresh attempt is started without enforcing
        ``max_attempts``.

        Raises
        ------
        ValueError
            If the run is not ASSIGNED or RUNNING.
        OpRunLockedException
            If the run is terminal.
        """
        self._ensure_not_locked()
        if self.state not in (OpRunState.ASSIGNED, OpRunState.RUNNING):
            raise ValueError(
                f"Can only release ASSIGNED or RUNNING runs. "
                f"Current state: {self.state.value}"
            )
        self.add_event(
            "RETRIED",
            "Assignment released (worker inactive)",
            {"force": True},
        )

    # ------------------------------------------------------------------
    # Event handling
    # ------------------------------------------------------------------

    def add_event(
        self, event_type: str, msg: str | int, payload: dict[str, Any] | None = None
    ) -> None:
        """Append an event to the current attempt and react to its type.

        Lifecycle transitions are implemented in
        :mod:`gyoza.models.op_run_events`. Notable types:

        - ``ASSIGNED``: PENDING → ASSIGNED (requires ``worker_id`` in payload)
        - ``STARTED``: ASSIGNED → RUNNING
        - ``PROGRESS``: updates progress (msg must be 0–100)
        - ``COMPLETED``: terminal COMPLETED
        - ``FAILED``: terminal FAILED; if retries remain, appends
          ``RETRIED`` and starts a new attempt (PENDING)
        - ``CANCELLED``: terminal CANCELLED
        - ``RETRIED``: records retry on the current attempt, and creates a
          new attempt

        Parameters
        ----------
        event_type : str
            Type of event (see :class:`~gyoza.models.resources.EventType`).
        msg : str | int
            Human-readable message or progress value.
        payload : dict[str, Any] | None
            Optional event-specific data.

        Raises
        ------
        OpRunLockedException
            If the run is already in a terminal state (except for
            ``RETRIED``).
        ValueError
            If a required payload is missing or the event is invalid in
            the current state.
        """
        # NOTE(review): imported at call time rather than module level —
        # presumably to avoid a circular import with op_run_events; confirm.
        from gyoza.models.op_run_events import apply_op_run_event

        apply_op_run_event(self, event_type, msg, payload)

    # ------------------------------------------------------------------
    # Retry policy
    # ------------------------------------------------------------------

    def can_retry(self) -> bool:
        """Return True if another attempt is allowed.

        Returns
        -------
        bool
            True when current attempt count is below max_attempts.
        """
        return self.current_attempt < self.retry_policy.max_attempts

    # ------------------------------------------------------------------
    # Serialisation
    # ------------------------------------------------------------------

    def to_dict(self) -> dict[str, Any]:
        """Serialise to a JSON-compatible dictionary.

        Returns
        -------
        dict[str, Any]
            Mixed view combining run-level and current-attempt fields.
            Timestamps are emitted under the camelCase keys ``createdAt``
            and ``updatedAt`` as ISO-8601 strings.
        """
        return {
            "id": self.id,
            "state": self.state.value,
            "priority": self.priority,
            "progress": self.progress,
            "current_attempt": self.current_attempt,
            "image": self.image,
            "inputs": self.inputs,
            "outputs": self.outputs,
            "events": [e.to_dict() for e in self.events],
            "execution_summary": self.execution_summary.to_dict(),
            "constraints": self.constraints.to_dict(),
            "retry_policy": self.retry_policy.to_dict(),
            "event_delivery": self.event_delivery.to_dict(),
            "op_definition": self.op_definition,
            "createdAt": self.created_at.isoformat(),
            "updatedAt": self.updated_at.isoformat(),
        }

    @staticmethod
    def _parse_timestamp(value: Any) -> Any:
        """Parse an ISO-8601 string into a datetime; pass other values through.

        A trailing ``Z`` is rewritten to ``+00:00`` before parsing.
        """
        if isinstance(value, str):
            return datetime.fromisoformat(value.replace("Z", "+00:00"))
        return value

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> OpRun:
        """Deserialise from a dictionary.

        Only the current attempt is reconstructed. Optional fields that
        are missing *or* explicitly ``None`` fall back to their defaults.

        Parameters
        ----------
        data : dict[str, Any]
            Dictionary as returned by the server API. ``id``, ``state``
            and ``image`` are required; timestamps are accepted under
            either camelCase or snake_case keys.

        Returns
        -------
        OpRun
            Reconstructed instance. For full attempt history use the
            repository.

        Raises
        ------
        KeyError
            If ``id``, ``state`` or ``image`` is missing.
        ValueError
            If ``state`` is not a valid :class:`OpRunState` value or a
            timestamp string is not valid ISO-8601.
        """
        now = datetime.now(UTC)
        created_at = cls._parse_timestamp(
            data.get("createdAt") or data.get("created_at")
        )
        updated_at = cls._parse_timestamp(
            data.get("updatedAt") or data.get("updated_at")
        )

        # Always yield a real EventDelivery: the field is non-optional and
        # to_dict() calls ``self.event_delivery.to_dict()`` unconditionally.
        raw_delivery = data.get("event_delivery")
        event_delivery = (
            EventDelivery.from_dict(raw_delivery) if raw_delivery else EventDelivery()
        )

        state = OpRunState(data["state"])
        run = cls(
            id=data["id"],
            state=state,
            priority=data.get("priority", 0),
            image=data["image"],
            inputs=data.get("inputs") or {},
            constraints=Constraints.from_dict(data.get("constraints") or {}),
            retry_policy=RetryPolicy.from_dict(data.get("retry_policy") or {}),
            event_delivery=event_delivery,
            op_definition=data.get("op_definition"),
            created_at=created_at or now,
            updated_at=updated_at or now,
            _attempts=[],
            _current_attempt_index=-1,
        )

        # A missing or null attempt number means "first attempt".
        raw_attempt = data.get("current_attempt")
        current_attempt_num = 1 if raw_attempt is None else raw_attempt
        if current_attempt_num > 0:
            events = [EventEntry.from_dict(e) for e in data.get("events") or []]
            attempt = OpAttempt(
                id=f"{run.id}_att_{current_attempt_num}",
                op_run_id=run.id,
                attempt=current_attempt_num,
                state=state,
                progress=data.get("progress", 0),
                events=events,
                inputs=dict(run.inputs),
                outputs=data.get("outputs") or {},
                execution_summary=ExecutionSummary.from_dict(
                    data.get("execution_summary") or {}
                ),
                constraints=run.constraints,
                started_at=None,
                finished_at=None,
            )
            run._attempts.append(attempt)
            run._current_attempt_index = 0

        # A non-positive attempt number yields no attempt above; make sure
        # the run still has one so the attempt-backed properties work.
        if not run._attempts:
            run._create_attempt()
        return run

    # ------------------------------------------------------------------
    # Internal
    # ------------------------------------------------------------------

    def _ensure_not_locked(self) -> None:
        """Raise OpRunLockedException if the run is in a terminal state."""
        if self.is_locked:
            raise OpRunLockedException(
                f"OpRun {self.id} is in terminal state {self.state.value} "
                "and cannot receive new events."
            )

    def _create_attempt(self) -> None:
        """Append a fresh PENDING attempt and make it the active one.

        The attempt number is one more than the number of attempts held,
        and the attempt receives a shallow copy of the run's inputs.
        """
        attempt_num = len(self._attempts) + 1
        now = datetime.now(UTC)
        self._attempts.append(
            OpAttempt(
                id=f"{self.id}_att_{attempt_num}",
                op_run_id=self.id,
                attempt=attempt_num,
                state=OpRunState.PENDING,
                progress=0,
                events=[],
                inputs=dict(self.inputs),
                outputs={},
                execution_summary=ExecutionSummary(),
                constraints=self.constraints,
                started_at=now,
                finished_at=None,
            )
        )
        self._current_attempt_index = len(self._attempts) - 1