"""OpRun aggregate root — execution run lifecycle management."""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Any
from uuid import uuid4
from gyoza.models.op_attempt import OpAttempt
from gyoza.models.resources import (
Constraints,
EventDelivery,
EventEntry,
ExecutionSummary,
OpRunState,
RetryPolicy,
)
class OpRunLockedException(Exception):
    """Raised when trying to modify a terminal-state OpRun.

    A run is terminal (locked) once its state is COMPLETED, FAILED or
    CANCELLED; operations guarded by the lock check raise this instead of
    mutating the run.
    """
@dataclass
class OpRun:
    """Aggregate root for execution runs.

    Encapsulates all attempts internally. Users interact only with OpRun;
    the current attempt's data (progress, events, outputs, execution_summary)
    is exposed directly via properties.

    A new OpRun always starts with its first attempt. Additional attempts
    are only created via :meth:`retry`.

    Parameters
    ----------
    id : str
        Unique identifier for the run.
    state : OpRunState
        Current state of the run.
    priority : int
        Execution queue priority.
    image : str
        Docker image reference.
    inputs : dict[str, Any]
        Input parameters for execution.
    constraints : Constraints
        Hardware requirements.
    retry_policy : RetryPolicy
        Failure recovery rules.
    event_delivery : EventDelivery
        Event delivery configuration.
    op_definition : str | None
        Parent OpDefinition name (``None`` for ad-hoc runs).
    created_at : datetime
        When the run was created.
    updated_at : datetime
        Last state update timestamp.
    """

    id: str
    state: OpRunState
    priority: int
    image: str
    inputs: dict[str, Any] = field(default_factory=dict)
    constraints: Constraints = field(default_factory=Constraints)
    retry_policy: RetryPolicy = field(default_factory=RetryPolicy)
    event_delivery: EventDelivery = field(default_factory=EventDelivery)
    op_definition: str | None = None
    created_at: datetime = field(default_factory=lambda: datetime.now(UTC))
    updated_at: datetime = field(default_factory=lambda: datetime.now(UTC))
    # Attempt history in creation order; only the current one is exposed.
    _attempts: list[OpAttempt] = field(default_factory=list, repr=False)
    # Index into ``_attempts`` of the current attempt; -1 means "none yet".
    _current_attempt_index: int = field(default=-1, repr=False)

    # ------------------------------------------------------------------
    # Factory
    # ------------------------------------------------------------------

    @classmethod
    def create(
        cls,
        image: str,
        priority: int = 0,
        inputs: dict[str, Any] | None = None,
        constraints: Constraints | None = None,
        retry_policy: RetryPolicy | None = None,
        event_delivery: EventDelivery | None = None,
        op_definition: str | None = None,
    ) -> OpRun:
        """Create a new OpRun in PENDING state with its first attempt.

        Parameters
        ----------
        image : str
            Docker image reference.
        priority : int
            Scheduling priority (default 0).
        inputs : dict[str, Any] | None
            Input parameters.
        constraints : Constraints | None
            Hardware requirements.
        retry_policy : RetryPolicy | None
            Failure recovery rules.
        event_delivery : EventDelivery | None
            Event delivery configuration.
        op_definition : str | None
            Parent OpDefinition name.

        Returns
        -------
        OpRun
            A new OpRun in PENDING state with its first attempt created.
        """
        now = datetime.now(UTC)
        run = cls(
            id=f"run_{uuid4().hex[:12]}",
            state=OpRunState.PENDING,
            priority=priority,
            image=image,
            inputs=inputs or {},
            constraints=constraints or Constraints(),
            retry_policy=retry_policy or RetryPolicy(),
            event_delivery=event_delivery or EventDelivery(),
            op_definition=op_definition,
            created_at=now,
            updated_at=now,
            _attempts=[],
            _current_attempt_index=-1,
        )
        run._create_attempt()
        return run

    # ------------------------------------------------------------------
    # Properties — current attempt data exposed on the run
    # ------------------------------------------------------------------

    @property
    def current_attempt(self) -> int:
        """Current attempt number (1-indexed), or 0 when no attempt exists."""
        attempt = self._active_attempt()
        return 0 if attempt is None else attempt.attempt

    @property
    def progress(self) -> int:
        """Current progress percentage (0–100); 0 when no attempt exists."""
        attempt = self._active_attempt()
        return 0 if attempt is None else attempt.progress

    @property
    def events(self) -> list[EventEntry]:
        """Event timeline from the current attempt.

        Returns the attempt's own list (not a copy); an empty list when no
        attempt exists.
        """
        attempt = self._active_attempt()
        return [] if attempt is None else attempt.events

    @property
    def outputs(self) -> dict[str, Any]:
        """Outputs from the current attempt; ``{}`` when no attempt exists."""
        attempt = self._active_attempt()
        return {} if attempt is None else attempt.outputs

    @property
    def execution_summary(self) -> ExecutionSummary:
        """Execution metadata from the current attempt.

        When no attempt exists a fresh, detached ``ExecutionSummary`` is
        returned, so mutating it has no effect on the run.
        """
        attempt = self._active_attempt()
        return ExecutionSummary() if attempt is None else attempt.execution_summary

    @property
    def attempts(self) -> list[OpAttempt]:
        """All attempts in creation order (the internal list, not a copy)."""
        return self._attempts

    @property
    def is_locked(self) -> bool:
        """True when the run is in a terminal state."""
        return self.state in (
            OpRunState.COMPLETED,
            OpRunState.FAILED,
            OpRunState.CANCELLED,
        )

    # ------------------------------------------------------------------
    # State management
    # ------------------------------------------------------------------

    def set_state(self, new_state: OpRunState) -> None:
        """Update the run state directly.

        This bypasses the terminal-state lock and does not touch the current
        attempt's state; prefer :meth:`assign` / :meth:`add_event` for
        lifecycle transitions.

        Parameters
        ----------
        new_state : OpRunState
            New state to apply.
        """
        self.state = new_state
        self.updated_at = datetime.now(UTC)

    def assign(self, worker_id: str) -> None:
        """Transition the run from PENDING to ASSIGNED.

        Parameters
        ----------
        worker_id : str
            ID of the worker that will execute this run.

        Raises
        ------
        ValueError
            If the run is not in PENDING state, or it has no current attempt.
        OpRunLockedException
            If the run is in a terminal state.
        """
        self._ensure_not_locked()
        if self.state != OpRunState.PENDING:
            raise ValueError(
                f"Can only assign PENDING runs. Current state: {self.state.value}"
            )
        # Resolve the attempt before mutating anything so a missing attempt
        # cannot leave the run half-assigned.
        attempt = self._require_attempt()
        self.state = OpRunState.ASSIGNED
        self.updated_at = datetime.now(UTC)
        attempt.state = OpRunState.ASSIGNED
        attempt.execution_summary.worker_id = worker_id

    def set_priority(self, priority: int) -> None:
        """Update the scheduling priority.

        Parameters
        ----------
        priority : int
            New priority value (must be ≥ 0).

        Raises
        ------
        ValueError
            If priority is negative.
        """
        if priority < 0:
            raise ValueError("Priority must be >= 0")
        self.priority = priority
        self.updated_at = datetime.now(UTC)

    def set_outputs(self, outputs: dict[str, Any]) -> None:
        """Set outputs on the current attempt.

        Parameters
        ----------
        outputs : dict[str, Any]
            Output values to attach (shallow-copied).

        Raises
        ------
        ValueError
            If there is no current attempt.
        """
        attempt = self._require_attempt()
        attempt.outputs = dict(outputs)
        self.updated_at = datetime.now(UTC)

    def set_execution_summary(
        self,
        duration_ms: int | None = None,
        worker_id: str | None = None,
        error_message: str | None = None,
        gpu_id: int | None = None,
    ) -> None:
        """Update execution metadata on the current attempt.

        Only arguments that are not ``None`` are applied; the rest of the
        summary is left untouched.

        Parameters
        ----------
        duration_ms : int | None
            Execution duration in milliseconds.
        worker_id : str | None
            ID of the executing worker.
        error_message : str | None
            Error description on failure.
        gpu_id : int | None
            GPU identifier used.

        Raises
        ------
        ValueError
            If there is no current attempt.
        """
        summary = self._require_attempt().execution_summary
        if duration_ms is not None:
            summary.duration_ms = duration_ms
        if worker_id is not None:
            summary.worker_id = worker_id
        if error_message is not None:
            summary.error_message = error_message
        if gpu_id is not None:
            summary.gpu_id = gpu_id
        self.updated_at = datetime.now(UTC)

    # ------------------------------------------------------------------
    # Event handling
    # ------------------------------------------------------------------

    def add_event(
        self, event_type: str, msg: str | int, payload: dict[str, Any] | None = None
    ) -> None:
        """Append an event to the current attempt and react to its type.

        Special event types trigger state transitions:

        - ``STARTED``: requires ASSIGNED state → transitions to RUNNING
        - ``PROGRESS``: updates progress percentage (msg must be 0–100)
        - ``COMPLETED``: locks run, sets state to COMPLETED
        - ``FAILED``: locks run, sets state to FAILED
        - ``CANCELLED``: locks run, sets state to CANCELLED
        - Other (``INFO``, ``ERROR``): recorded without state change

        The event is validated before it is recorded: a rejected event is
        not appended to the timeline and does not bump ``updated_at``.

        Parameters
        ----------
        event_type : str
            Type of event (see :class:`~gyoza.models.resources.EventType`).
        msg : str | int
            Human-readable message or progress value.
        payload : dict[str, Any] | None
            Optional event-specific data.

        Raises
        ------
        OpRunLockedException
            If the run is already in a terminal state.
        ValueError
            If there is no current attempt or the event is invalid in the
            current state.
        """
        self._ensure_not_locked()
        attempt = self._require_attempt()
        payload = payload or {}

        # --- validation (no side effects yet) ---
        if event_type == "STARTED" and self.state != OpRunState.ASSIGNED:
            raise ValueError(
                f"STARTED event requires run to be in ASSIGNED state. "
                f"Current state: {self.state.value}"
            )
        if event_type == "PROGRESS":
            # bool is a subclass of int; reject it so True/False cannot be
            # recorded as 1% / 0%.
            if (
                isinstance(msg, bool)
                or not isinstance(msg, int)
                or not 0 <= msg <= 100
            ):
                raise ValueError("PROGRESS event msg must be int between 0 and 100")

        # --- record ---
        now = datetime.now(UTC)
        attempt.events.append(
            EventEntry(t=now, type=event_type, msg=msg, payload=payload)
        )
        self.updated_at = now

        # --- react ---
        if event_type == "STARTED":
            self.state = OpRunState.RUNNING
            attempt.state = OpRunState.RUNNING
            if "worker_id" in payload:
                attempt.execution_summary.worker_id = payload["worker_id"]
            if "gpu_id" in payload:
                attempt.execution_summary.gpu_id = payload["gpu_id"]
        elif event_type == "PROGRESS":
            attempt.progress = msg
        elif event_type == "COMPLETED":
            self._finish_attempt(attempt, OpRunState.COMPLETED, now)
            if "outputs" in payload:
                attempt.outputs = dict(payload["outputs"])
        elif event_type == "FAILED":
            self._finish_attempt(attempt, OpRunState.FAILED, now)
            if "error_message" in payload:
                attempt.execution_summary.error_message = payload["error_message"]
        elif event_type == "CANCELLED":
            self._finish_attempt(attempt, OpRunState.CANCELLED, now)

    # ------------------------------------------------------------------
    # Retry
    # ------------------------------------------------------------------

    def can_retry(self) -> bool:
        """Return True if another attempt is allowed.

        Returns
        -------
        bool
            True when current attempt count is below max_attempts.
        """
        return self.current_attempt < self.retry_policy.max_attempts

    def retry(self, force: bool = False) -> None:
        """Create a new attempt and reset the run to PENDING.

        The run's current state is not checked, and the previous attempt's
        state is left as it was.

        Parameters
        ----------
        force : bool
            Skip the max_attempts check when True.

        Raises
        ------
        ValueError
            If max attempts are exhausted and *force* is False.
        """
        if not force and not self.can_retry():
            raise ValueError(f"Max attempts ({self.retry_policy.max_attempts}) reached")
        self._create_attempt()
        self.state = OpRunState.PENDING
        self.updated_at = datetime.now(UTC)

    # ------------------------------------------------------------------
    # Serialisation
    # ------------------------------------------------------------------

    def to_dict(self) -> dict[str, Any]:
        """Serialise to a JSON-compatible dictionary.

        ``inputs`` and ``outputs`` are shallow copies so callers cannot
        mutate the run through the returned mapping.

        Returns
        -------
        dict[str, Any]
            Mixed view combining run-level and current-attempt fields.
        """
        return {
            "id": self.id,
            "state": self.state.value,
            "priority": self.priority,
            "progress": self.progress,
            "current_attempt": self.current_attempt,
            "image": self.image,
            "inputs": dict(self.inputs),
            "outputs": dict(self.outputs),
            "events": [e.to_dict() for e in self.events],
            "execution_summary": self.execution_summary.to_dict(),
            "constraints": self.constraints.to_dict(),
            "retry_policy": self.retry_policy.to_dict(),
            "event_delivery": self.event_delivery.to_dict(),
            "op_definition": self.op_definition,
            "createdAt": self.created_at.isoformat(),
            "updatedAt": self.updated_at.isoformat(),
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> OpRun:
        """Deserialise from a dictionary.

        Only the current attempt is reconstructed (its ``started_at`` and
        ``finished_at`` are unknown and set to ``None``). Missing or ``null``
        nested sections fall back to their defaults.

        Parameters
        ----------
        data : dict[str, Any]
            Dictionary as returned by the server API.

        Returns
        -------
        OpRun
            Reconstructed instance. For full attempt history use the
            repository.
        """
        created_at = data.get("createdAt") or data.get("created_at")
        updated_at = data.get("updatedAt") or data.get("updated_at")
        if isinstance(created_at, str):
            created_at = datetime.fromisoformat(created_at.replace("Z", "+00:00"))
        if isinstance(updated_at, str):
            updated_at = datetime.fromisoformat(updated_at.replace("Z", "+00:00"))

        # Absent delivery config must still yield an EventDelivery instance,
        # otherwise to_dict() would fail on None.to_dict().
        event_delivery_data = data.get("event_delivery")
        event_delivery = (
            EventDelivery.from_dict(event_delivery_data)
            if event_delivery_data
            else EventDelivery()
        )
        state = OpRunState(data["state"])
        run = cls(
            id=data["id"],
            state=state,
            priority=data.get("priority", 0),
            image=data["image"],
            inputs=dict(data.get("inputs") or {}),
            constraints=Constraints.from_dict(data.get("constraints") or {}),
            retry_policy=RetryPolicy.from_dict(data.get("retry_policy") or {}),
            event_delivery=event_delivery,
            op_definition=data.get("op_definition"),
            created_at=created_at or datetime.now(UTC),
            updated_at=updated_at or datetime.now(UTC),
            _attempts=[],
            _current_attempt_index=-1,
        )

        current_attempt_num = data.get("current_attempt", 1)
        if current_attempt_num > 0:
            events = [EventEntry.from_dict(e) for e in data.get("events") or []]
            attempt = OpAttempt(
                id=f"{run.id}_att_{current_attempt_num}",
                op_run_id=run.id,
                attempt=current_attempt_num,
                state=state,
                progress=data.get("progress", 0),
                events=events,
                inputs=dict(run.inputs),
                outputs=dict(data.get("outputs") or {}),
                execution_summary=ExecutionSummary.from_dict(
                    data.get("execution_summary") or {}
                ),
                constraints=run.constraints,
                started_at=None,
                finished_at=None,
            )
            run._attempts.append(attempt)
            run._current_attempt_index = 0
        return run

    # ------------------------------------------------------------------
    # Internal
    # ------------------------------------------------------------------

    def _active_attempt(self) -> OpAttempt | None:
        """Return the current attempt, or ``None`` if none has been created."""
        if self._current_attempt_index < 0:
            return None
        return self._attempts[self._current_attempt_index]

    def _require_attempt(self) -> OpAttempt:
        """Return the current attempt or raise ``ValueError`` if there is none."""
        attempt = self._active_attempt()
        if attempt is None:
            raise ValueError("No current attempt exists")
        return attempt

    def _finish_attempt(
        self, attempt: OpAttempt, final_state: OpRunState, now: datetime
    ) -> None:
        """Move both the attempt and the run into the terminal *final_state*."""
        attempt.state = final_state
        attempt.finished_at = now
        self.state = final_state

    def _ensure_not_locked(self) -> None:
        """Raise ``OpRunLockedException`` if the run is in a terminal state."""
        if self.is_locked:
            raise OpRunLockedException(
                f"OpRun {self.id} is in terminal state {self.state.value} "
                "and cannot receive new events."
            )

    def _create_attempt(self) -> None:
        """Append a fresh PENDING attempt and make it the current one."""
        # Continue numbering from the highest known attempt. A run rebuilt by
        # from_dict holds only its latest attempt (e.g. #3), so len()+1 would
        # reuse an earlier number and id.
        attempt_num = max((a.attempt for a in self._attempts), default=0) + 1
        now = datetime.now(UTC)
        self._attempts.append(
            OpAttempt(
                id=f"{self.id}_att_{attempt_num}",
                op_run_id=self.id,
                attempt=attempt_num,
                state=OpRunState.PENDING,
                progress=0,
                events=[],
                inputs=dict(self.inputs),
                outputs={},
                execution_summary=ExecutionSummary(),
                constraints=self.constraints,
                started_at=now,
                finished_at=None,
            )
        )
        self._current_attempt_index = len(self._attempts) - 1