"""OpRun aggregate root — execution run lifecycle management."""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Any
from uuid import uuid4
from gyoza.models.op_attempt import OpAttempt
from gyoza.models.resources import (
Constraints,
EventDelivery,
EventEntry,
ExecutionSummary,
OpRunState,
RetryPolicy,
)
class OpRunLockedException(Exception):
    """Raised when trying to modify a terminal-state OpRun.

    An OpRun is terminal (locked) once its state is COMPLETED, FAILED or
    CANCELLED; further events or reassignment are rejected with this error.
    """
@dataclass
class OpRun:
    """Aggregate root for execution runs.

    Encapsulates all attempts internally. Users interact only with OpRun;
    the current attempt's data (progress, events, outputs, execution_summary)
    is exposed directly via properties.

    A new OpRun always starts with its first attempt. Additional attempts
    are created by ``RETRIED`` (including after :meth:`release_assignment`) or
    automatic retry after ``FAILED`` when policy allows.

    Parameters
    ----------
    id : str
        Unique identifier for the run.
    state : OpRunState
        Current state of the run.
    priority : int
        Execution queue priority.
    image : str
        Docker image reference.
    inputs : dict[str, Any]
        Input parameters for execution.
    constraints : Constraints
        Hardware requirements.
    retry_policy : RetryPolicy
        Failure recovery rules.
    event_delivery : EventDelivery
        Event delivery configuration.
    op_definition : str | None
        Parent OpDefinition name (``None`` for ad-hoc runs).
    created_at : datetime
        When the run was created.
    updated_at : datetime
        Last state update timestamp.
    """

    id: str
    state: OpRunState
    priority: int
    image: str
    inputs: dict[str, Any] = field(default_factory=dict)
    constraints: Constraints = field(default_factory=Constraints)
    retry_policy: RetryPolicy = field(default_factory=RetryPolicy)
    event_delivery: EventDelivery = field(default_factory=EventDelivery)
    op_definition: str | None = None
    created_at: datetime = field(default_factory=lambda: datetime.now(UTC))
    updated_at: datetime = field(default_factory=lambda: datetime.now(UTC))
    # Attempt history, oldest first. Constructing the dataclass directly
    # leaves this empty; ``create`` / ``from_dict`` populate it.
    _attempts: list[OpAttempt] = field(default_factory=list, repr=False)
    # Index into ``_attempts`` of the attempt the run-level properties
    # delegate to (-1 means "last element").
    _current_attempt_index: int = field(default=-1, repr=False)

    # ------------------------------------------------------------------
    # Factory
    # ------------------------------------------------------------------

    @classmethod
    def create(
        cls,
        image: str,
        priority: int = 0,
        inputs: dict[str, Any] | None = None,
        constraints: Constraints | None = None,
        retry_policy: RetryPolicy | None = None,
        event_delivery: EventDelivery | None = None,
        op_definition: str | None = None,
    ) -> OpRun:
        """Create a new OpRun in PENDING state with its first attempt.

        Parameters
        ----------
        image : str
            Docker image reference.
        priority : int
            Scheduling priority (default 0).
        inputs : dict[str, Any] | None
            Input parameters.
        constraints : Constraints | None
            Hardware requirements.
        retry_policy : RetryPolicy | None
            Failure recovery rules.
        event_delivery : EventDelivery | None
            Event delivery configuration.
        op_definition : str | None
            Parent OpDefinition name.

        Returns
        -------
        OpRun
            A new OpRun in PENDING state with its first attempt created.
        """
        now = datetime.now(UTC)
        run = cls(
            id=f"run_{uuid4().hex[:12]}",
            state=OpRunState.PENDING,
            priority=priority,
            image=image,
            inputs=inputs or {},
            constraints=constraints or Constraints(),
            retry_policy=retry_policy or RetryPolicy(),
            event_delivery=event_delivery or EventDelivery(),
            op_definition=op_definition,
            created_at=now,
            updated_at=now,
            _attempts=[],
            _current_attempt_index=-1,
        )
        run._create_attempt()
        return run

    # ------------------------------------------------------------------
    # Properties — current attempt data exposed on the run
    # ------------------------------------------------------------------

    @property
    def active_attempt(self) -> OpAttempt:
        """Current attempt.

        Always present for runs built by :meth:`create` or :meth:`from_dict`;
        a run constructed directly with no attempts raises ``IndexError``.
        """
        return self._attempts[self._current_attempt_index]

    @property
    def current_attempt(self) -> int:
        """Current attempt number (1-indexed)."""
        return self.active_attempt.attempt

    @property
    def progress(self) -> int:
        """Current progress percentage (0–100)."""
        return self.active_attempt.progress

    @property
    def events(self) -> list[EventEntry]:
        """Event timeline from the current attempt (the live list, not a copy)."""
        return self.active_attempt.events

    @property
    def outputs(self) -> dict[str, Any]:
        """Outputs from the current attempt (the live dict, not a copy)."""
        return self.active_attempt.outputs

    @property
    def execution_summary(self) -> ExecutionSummary:
        """Execution metadata from the current attempt."""
        return self.active_attempt.execution_summary

    @property
    def attempts(self) -> list[OpAttempt]:
        """All attempts in order (the internal list, not a copy)."""
        return self._attempts

    @property
    def is_locked(self) -> bool:
        """True when the run is in a terminal state."""
        return self.state in (
            OpRunState.COMPLETED,
            OpRunState.FAILED,
            OpRunState.CANCELLED,
        )

    # ------------------------------------------------------------------
    # State management
    # ------------------------------------------------------------------

    def set_state(self, new_state: OpRunState) -> None:
        """Update the run state directly.

        No transition validation or lock check is performed, and the current
        attempt is not touched; only ``state`` and ``updated_at`` change.

        Parameters
        ----------
        new_state : OpRunState
            New state to apply.
        """
        self.state = new_state
        self.updated_at = datetime.now(UTC)

    def assign(self, worker_id: str, gpu_id: int | None = None) -> None:
        """Allocate this run to a worker (scheduler / internal).

        Emits an ``ASSIGNED`` event and transitions PENDING → ASSIGNED.

        Parameters
        ----------
        worker_id : str
            Worker that will execute this run.
        gpu_id : int | None
            Selected GPU, if any.

        Raises
        ------
        ValueError
            If the run is not PENDING (raised by the ``ASSIGNED`` handler).
        OpRunLockedException
            If the run is terminal (raised by the ``ASSIGNED`` handler).
        """
        payload: dict[str, Any] = {"worker_id": worker_id}
        if gpu_id is not None:
            payload["gpu_id"] = gpu_id
        self.add_event("ASSIGNED", f"Assigned to {worker_id}", payload)

    def release_assignment(self) -> None:
        """Reclaim a run whose worker is gone (reaper / internal).

        Validates ASSIGNED or RUNNING, then emits ``RETRIED`` with
        ``force`` so a fresh attempt is started without enforcing
        ``max_attempts``.

        Raises
        ------
        ValueError
            If the run is not ASSIGNED or RUNNING.
        OpRunLockedException
            If the run is terminal.
        """
        self._ensure_not_locked()
        if self.state not in (OpRunState.ASSIGNED, OpRunState.RUNNING):
            raise ValueError(
                f"Can only release ASSIGNED or RUNNING runs. "
                f"Current state: {self.state.value}"
            )
        self.add_event(
            "RETRIED",
            "Assignment released (worker inactive)",
            {"force": True},
        )

    # ------------------------------------------------------------------
    # Event handling
    # ------------------------------------------------------------------

    def add_event(
        self, event_type: str, msg: str | int, payload: dict[str, Any] | None = None
    ) -> None:
        """Append an event to the current attempt and react to its type.

        Lifecycle transitions are implemented in
        :mod:`gyoza.models.op_run_events`. Notable types:

        - ``ASSIGNED``: PENDING → ASSIGNED (requires ``worker_id`` in payload)
        - ``STARTED``: ASSIGNED → RUNNING
        - ``PROGRESS``: updates progress (msg must be 0–100)
        - ``COMPLETED``: terminal COMPLETED
        - ``FAILED``: terminal FAILED; if retries remain, appends ``RETRIED``
          and starts a new attempt (PENDING)
        - ``CANCELLED``: terminal CANCELLED
        - ``RETRIED``: records the retry on the current attempt and creates
          a new attempt

        Parameters
        ----------
        event_type : str
            Type of event (see :class:`~gyoza.models.resources.EventType`).
        msg : str | int
            Human-readable message or progress value.
        payload : dict[str, Any] | None
            Optional event-specific data.

        Raises
        ------
        OpRunLockedException
            If the run is already in a terminal state (except for ``RETRIED``).
        ValueError
            If a required payload is missing or the event is invalid in the
            current state.
        """
        # Imported lazily: op_run_events operates on OpRun, so a module-level
        # import would presumably be circular.
        from gyoza.models.op_run_events import apply_op_run_event

        apply_op_run_event(self, event_type, msg, payload)

    # ------------------------------------------------------------------
    # Retry policy
    # ------------------------------------------------------------------

    def can_retry(self) -> bool:
        """Return True if another attempt is allowed.

        Returns
        -------
        bool
            True when current attempt count is below max_attempts.
        """
        return self.current_attempt < self.retry_policy.max_attempts

    # ------------------------------------------------------------------
    # Serialisation
    # ------------------------------------------------------------------

    def to_dict(self) -> dict[str, Any]:
        """Serialise to a JSON-compatible dictionary.

        Returns
        -------
        dict[str, Any]
            Mixed view combining run-level and current-attempt fields.
        """
        return {
            "id": self.id,
            "state": self.state.value,
            "priority": self.priority,
            "progress": self.progress,
            "current_attempt": self.current_attempt,
            "image": self.image,
            "inputs": self.inputs,
            "outputs": self.outputs,
            "events": [e.to_dict() for e in self.events],
            "execution_summary": self.execution_summary.to_dict(),
            "constraints": self.constraints.to_dict(),
            "retry_policy": self.retry_policy.to_dict(),
            "event_delivery": self.event_delivery.to_dict(),
            "op_definition": self.op_definition,
            "createdAt": self.created_at.isoformat(),
            "updatedAt": self.updated_at.isoformat(),
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> OpRun:
        """Deserialise from a dictionary.

        Keys whose value is ``None`` are treated like missing keys, so
        JSON ``null`` fields fall back to the same defaults.

        Parameters
        ----------
        data : dict[str, Any]
            Dictionary as returned by the server API.

        Returns
        -------
        OpRun
            Reconstructed instance holding only the current attempt. For full
            attempt history use the repository.
        """
        created_at = cls._parse_timestamp(
            data.get("createdAt") or data.get("created_at")
        )
        updated_at = cls._parse_timestamp(
            data.get("updatedAt") or data.get("updated_at")
        )
        # Default like ``create`` does: leaving ``None`` here would make
        # ``to_dict`` fail on ``self.event_delivery.to_dict()``.
        event_delivery_data = data.get("event_delivery")
        event_delivery = (
            EventDelivery.from_dict(event_delivery_data)
            if event_delivery_data
            else EventDelivery()
        )
        state = OpRunState(data["state"])
        run = cls(
            id=data["id"],
            state=state,
            priority=data.get("priority") or 0,
            image=data["image"],
            inputs=data.get("inputs") or {},
            constraints=Constraints.from_dict(data.get("constraints") or {}),
            retry_policy=RetryPolicy.from_dict(data.get("retry_policy") or {}),
            event_delivery=event_delivery,
            op_definition=data.get("op_definition"),
            created_at=created_at or datetime.now(UTC),
            updated_at=updated_at or datetime.now(UTC),
            _attempts=[],
            _current_attempt_index=-1,
        )

        current_attempt_num = data.get("current_attempt")
        if current_attempt_num is None:
            current_attempt_num = 1

        if current_attempt_num > 0:
            # Rebuild only the current attempt from the flattened view.
            events = [EventEntry.from_dict(e) for e in data.get("events") or []]
            run._attempts.append(
                OpAttempt(
                    id=f"{run.id}_att_{current_attempt_num}",
                    op_run_id=run.id,
                    attempt=current_attempt_num,
                    state=state,
                    progress=data.get("progress") or 0,
                    events=events,
                    inputs=dict(run.inputs),
                    outputs=data.get("outputs") or {},
                    execution_summary=ExecutionSummary.from_dict(
                        data.get("execution_summary") or {}
                    ),
                    constraints=run.constraints,
                    started_at=None,
                    finished_at=None,
                )
            )
            run._current_attempt_index = 0
        else:
            # Non-positive attempt number: start over with a fresh attempt.
            run._create_attempt()
        return run

    # ------------------------------------------------------------------
    # Internal
    # ------------------------------------------------------------------

    @staticmethod
    def _parse_timestamp(value: Any) -> Any:
        """Parse an ISO-8601 string (``Z`` suffix accepted); pass other values through."""
        if isinstance(value, str):
            return datetime.fromisoformat(value.replace("Z", "+00:00"))
        return value

    def _ensure_not_locked(self) -> None:
        """Raise :class:`OpRunLockedException` if the run is terminal."""
        if self.is_locked:
            raise OpRunLockedException(
                f"OpRun {self.id} is in terminal state {self.state.value} "
                "and cannot receive new events."
            )

    def _create_attempt(self) -> None:
        """Append a fresh PENDING attempt and make it the current one."""
        # Number from the latest attempt rather than ``len(self._attempts)``:
        # a run restored by ``from_dict`` holds only its current attempt,
        # whose number may be greater than 1.
        attempt_num = self._attempts[-1].attempt + 1 if self._attempts else 1
        now = datetime.now(UTC)
        self._attempts.append(
            OpAttempt(
                id=f"{self.id}_att_{attempt_num}",
                op_run_id=self.id,
                attempt=attempt_num,
                state=OpRunState.PENDING,
                progress=0,
                events=[],
                inputs=dict(self.inputs),
                outputs={},
                execution_summary=ExecutionSummary(),
                constraints=self.constraints,
                # Stamped at creation, even though the attempt starts PENDING.
                started_at=now,
                finished_at=None,
            )
        )
        self._current_attempt_index = len(self._attempts) - 1