"""Repository for OpRun aggregate persistence.
This is the only place that knows about MongoDB. It handles serialization
and deserialization of the OpRun aggregate (including its attempts).
"""
from datetime import UTC, datetime
from typing import Any
from pymongo.collection import Collection
from gyoza.models import (
Constraints,
EventDelivery,
EventEntry,
ExecutionSummary,
OpAttempt,
OpRun,
OpRunState,
RetryPolicy,
)
class OpRunRepository:
    """Repository for OpRun aggregate persistence.

    Handles both OpRun and its attempts as a single aggregate.
    Attempts are stored in a separate collection but loaded together.

    Parameters
    ----------
    runs_collection : Collection
        MongoDB collection for OpRuns.
    attempts_collection : Collection
        MongoDB collection for OpAttempts.
    """

    def __init__(
        self, runs_collection: Collection, attempts_collection: Collection
    ) -> None:
        self._runs = runs_collection
        self._attempts = attempts_collection

    def get(self, run_id: str) -> OpRun | None:
        """Retrieve an OpRun aggregate by its ID.

        Loads the run and all its attempts, reconstructing the full aggregate.

        Parameters
        ----------
        run_id : str
            The unique identifier of the OpRun.

        Returns
        -------
        OpRun | None
            The complete OpRun aggregate if found, None otherwise.
        """
        run_doc = self._runs.find_one({"_id": run_id})
        if run_doc is None:
            return None
        return self._load_full_aggregate(run_doc)

    def save(self, run: OpRun) -> None:
        """Persist an OpRun aggregate (insert or update).

        Saves both the run and all its attempts. Each document is upserted
        with a separate ``replace_one`` call, so the write is not atomic
        across the two collections. Attempt documents whose attempts are no
        longer on the aggregate are left in place (nothing here deletes them).

        Parameters
        ----------
        run : OpRun
            The OpRun aggregate to persist.
        """
        self._runs.replace_one(
            {"_id": run.id}, self._op_run_to_doc(run), upsert=True
        )
        # The repository owns the aggregate's persistence, so it reads the
        # aggregate's private attempt list directly.
        for attempt in run._attempts:
            self._attempts.replace_one(
                {"_id": attempt.id}, self._attempt_to_doc(attempt), upsert=True
            )

    def delete(self, run_id: str) -> bool:
        """Delete an OpRun and all its attempts.

        Attempts referencing ``run_id`` are removed even when the run
        document itself does not exist.

        Parameters
        ----------
        run_id : str
            The unique identifier of the OpRun to delete.

        Returns
        -------
        bool
            True if the run was deleted, False if not found.
        """
        self._attempts.delete_many({"op_run_id": run_id})
        result = self._runs.delete_one({"_id": run_id})
        return result.deleted_count > 0

    def list_by_state(self, state: OpRunState) -> list[OpRun]:
        """List all OpRuns with a specific state.

        Parameters
        ----------
        state : OpRunState
            The state to filter by.

        Returns
        -------
        list[OpRun]
            List of OpRuns matching the state.
        """
        run_docs = list(self._runs.find({"state": state.value}))
        return self._load_full_aggregates(run_docs)

    def list_pending_ordered(self) -> list[OpRun]:
        """List pending OpRuns ordered by priority (descending).

        Returns
        -------
        list[OpRun]
            Pending OpRuns sorted by priority (highest first).
        """
        run_docs = list(
            self._runs.find({"state": OpRunState.PENDING.value}).sort("priority", -1)
        )
        return self._load_full_aggregates(run_docs)

    # -------------------------------------------------------------------------
    # Internal helpers
    # -------------------------------------------------------------------------

    def _load_full_aggregate(self, run_doc: dict[str, Any]) -> OpRun:
        """Load a full aggregate from a single run document."""
        attempt_docs = list(
            self._attempts.find({"op_run_id": run_doc["_id"]}).sort("attempt", 1)
        )
        return self._reconstruct_aggregate(run_doc, attempt_docs)

    def _load_full_aggregates(self, run_docs: list[dict[str, Any]]) -> list[OpRun]:
        """Load full aggregates for many run documents with one attempts query.

        Avoids issuing one attempts query per run (N+1). The result preserves
        the order of ``run_docs``, and each run's attempts are ordered by
        attempt number ascending.
        """
        if not run_docs:
            return []
        run_ids = [doc["_id"] for doc in run_docs]
        attempts_by_run: dict[Any, list[dict[str, Any]]] = {}
        # A global ascending sort on "attempt" keeps each run's group sorted,
        # because grouping preserves the cursor order.
        cursor = self._attempts.find({"op_run_id": {"$in": run_ids}}).sort(
            "attempt", 1
        )
        for attempt_doc in cursor:
            attempts_by_run.setdefault(attempt_doc["op_run_id"], []).append(
                attempt_doc
            )
        return [
            self._reconstruct_aggregate(doc, attempts_by_run.get(doc["_id"], []))
            for doc in run_docs
        ]

    @staticmethod
    def _as_utc(value: Any) -> Any:
        """Return ``value`` as a timezone-aware UTC datetime when it is naive.

        PyMongo returns BSON datetimes as naive values that represent UTC
        unless the client was created with ``tz_aware=True``. Tagging them
        with UTC keeps loaded timestamps comparable with aware values such as
        ``datetime.now(UTC)``. Aware datetimes, None, and non-datetime values
        are returned unchanged.
        """
        if isinstance(value, datetime) and value.tzinfo is None:
            return value.replace(tzinfo=UTC)
        return value

    def _reconstruct_aggregate(
        self, run_doc: dict[str, Any], attempt_docs: list[dict[str, Any]]
    ) -> OpRun:
        """Reconstruct OpRun aggregate from documents."""
        constraints_doc = run_doc.get("constraints", {})
        retry_doc = run_doc.get("retry_policy", {})
        event_delivery_doc = run_doc.get("event_delivery", {})
        # Single fallback timestamp so missing created_at/updated_at agree.
        now = datetime.now(UTC)
        run = OpRun(
            id=run_doc["_id"],
            state=OpRunState(run_doc["state"]),
            priority=run_doc.get("priority", 0),
            image=run_doc["image"],
            inputs=run_doc.get("inputs", {}),
            constraints=Constraints.from_dict(constraints_doc),
            retry_policy=RetryPolicy.from_dict(retry_doc),
            event_delivery=EventDelivery.from_dict(event_delivery_doc),
            op_definition=run_doc.get("op_definition"),
            created_at=self._as_utc(run_doc.get("created_at", now)),
            updated_at=self._as_utc(run_doc.get("updated_at", now)),
            _attempts=[],
            _current_attempt_index=-1,
        )
        # Reconstruct attempts in the order given (callers sort by attempt).
        run._attempts.extend(self._doc_to_attempt(doc) for doc in attempt_docs)
        # The current attempt is the last one; -1 is kept when there are none.
        if run._attempts:
            run._current_attempt_index = len(run._attempts) - 1
        return run

    def _op_run_to_doc(self, run: OpRun) -> dict[str, Any]:
        """Convert OpRun to MongoDB document."""
        return {
            "_id": run.id,
            "state": run.state.value,
            "priority": run.priority,
            "image": run.image,
            "inputs": run.inputs,
            "constraints": run.constraints.to_dict(),
            "retry_policy": run.retry_policy.to_dict(),
            "event_delivery": run.event_delivery.to_dict(),
            "op_definition": run.op_definition,
            "created_at": run.created_at,
            "updated_at": run.updated_at,
        }

    def _attempt_to_doc(self, attempt: OpAttempt) -> dict[str, Any]:
        """Convert attempt to MongoDB document."""
        return {
            "_id": attempt.id,
            "op_run_id": attempt.op_run_id,
            "attempt": attempt.attempt,
            "state": attempt.state.value,
            "progress": attempt.progress,
            "events": [e.to_dict() for e in attempt.events],
            "inputs": attempt.inputs,
            "outputs": attempt.outputs,
            "execution_summary": attempt.execution_summary.to_dict(),
            "constraints": attempt.constraints.to_dict(),
            "started_at": attempt.started_at,
            "finished_at": attempt.finished_at,
        }

    def _doc_to_attempt(self, doc: dict[str, Any]) -> OpAttempt:
        """Convert MongoDB document to attempt."""
        events = [EventEntry.from_dict(e) for e in doc.get("events", [])]
        return OpAttempt(
            id=doc["_id"],
            op_run_id=doc["op_run_id"],
            attempt=doc["attempt"],
            state=OpRunState(doc["state"]),
            progress=doc.get("progress", 0),
            events=events,
            inputs=doc.get("inputs", {}),
            outputs=doc.get("outputs", {}),
            execution_summary=ExecutionSummary.from_dict(
                doc.get("execution_summary", {})
            ),
            constraints=Constraints.from_dict(doc.get("constraints", {})),
            started_at=self._as_utc(doc.get("started_at")),
            finished_at=self._as_utc(doc.get("finished_at")),
        )