Source code for gyoza.server.op_run.repository

"""Repository for OpRun aggregate persistence.

This is the only place that knows about MongoDB. It handles serialization
and deserialization of the OpRun aggregate (including its attempts).
"""

from datetime import UTC, datetime
from typing import Any

from pymongo.collection import Collection

from gyoza.models import (
    Constraints,
    EventDelivery,
    EventEntry,
    ExecutionSummary,
    OpAttempt,
    OpRun,
    OpRunState,
    RetryPolicy,
)


[docs] class OpRunRepository: """Repository for OpRun aggregate persistence. Handles both OpRun and its attempts as a single aggregate. Attempts are stored in a separate collection but loaded together. Parameters ---------- runs_collection : Collection MongoDB collection for OpRuns. attempts_collection : Collection MongoDB collection for OpAttempts. """ def __init__( self, runs_collection: Collection, attempts_collection: Collection ) -> None: self._runs = runs_collection self._attempts = attempts_collection
[docs] def get(self, run_id: str) -> OpRun | None: """Retrieve an OpRun aggregate by its ID. Loads the run and all its attempts, reconstructing the full aggregate. Parameters ---------- run_id : str The unique identifier of the OpRun. Returns ------- OpRun | None The complete OpRun aggregate if found, None otherwise. """ run_doc = self._runs.find_one({"_id": run_id}) if not run_doc: return None # Load all attempts for this run attempt_docs = list( self._attempts.find({"op_run_id": run_id}).sort("attempt", 1) ) return self._reconstruct_aggregate(run_doc, attempt_docs)
[docs] def save(self, run: OpRun) -> None: """Persist an OpRun aggregate (insert or update). Saves both the run and all its attempts. Parameters ---------- run : OpRun The OpRun aggregate to persist. """ # Save the run document run_doc = self._op_run_to_doc(run) self._runs.replace_one({"_id": run.id}, run_doc, upsert=True) # Save all attempts for attempt in run._attempts: attempt_doc = self._attempt_to_doc(attempt) self._attempts.replace_one({"_id": attempt.id}, attempt_doc, upsert=True)
[docs] def delete(self, run_id: str) -> bool: """Delete an OpRun and all its attempts. Parameters ---------- run_id : str The unique identifier of the OpRun to delete. Returns ------- bool True if the run was deleted, False if not found. """ self._attempts.delete_many({"op_run_id": run_id}) result = self._runs.delete_one({"_id": run_id}) return result.deleted_count > 0
[docs] def list_by_state(self, state: OpRunState) -> list[OpRun]: """List all OpRuns with a specific state. Parameters ---------- state : OpRunState The state to filter by. Returns ------- list[OpRun] List of OpRuns matching the state. """ run_docs = list(self._runs.find({"state": state.value})) return [self._load_full_aggregate(doc) for doc in run_docs]
[docs] def list_pending_ordered(self) -> list[OpRun]: """List pending OpRuns ordered by priority (descending). Returns ------- list[OpRun] Pending OpRuns sorted by priority (highest first). """ run_docs = list( self._runs.find({"state": OpRunState.PENDING.value}).sort("priority", -1) ) return [self._load_full_aggregate(doc) for doc in run_docs]
# ------------------------------------------------------------------------- # Internal helpers # ------------------------------------------------------------------------- def _load_full_aggregate(self, run_doc: dict[str, Any]) -> OpRun: """Load a full aggregate from a run document.""" attempt_docs = list( self._attempts.find({"op_run_id": run_doc["_id"]}).sort("attempt", 1) ) return self._reconstruct_aggregate(run_doc, attempt_docs) def _reconstruct_aggregate( self, run_doc: dict[str, Any], attempt_docs: list[dict[str, Any]] ) -> OpRun: """Reconstruct OpRun aggregate from documents.""" constraints_doc = run_doc.get("constraints", {}) retry_doc = run_doc.get("retry_policy", {}) event_delivery_doc = run_doc.get("event_delivery", {}) run = OpRun( id=run_doc["_id"], state=OpRunState(run_doc["state"]), priority=run_doc.get("priority", 0), image=run_doc["image"], inputs=run_doc.get("inputs", {}), constraints=Constraints.from_dict(constraints_doc), retry_policy=RetryPolicy.from_dict(retry_doc), event_delivery=EventDelivery.from_dict(event_delivery_doc), op_definition=run_doc.get("op_definition"), created_at=run_doc.get("created_at", datetime.now(UTC)), updated_at=run_doc.get("updated_at", datetime.now(UTC)), _attempts=[], _current_attempt_index=-1, ) # Reconstruct attempts for attempt_doc in attempt_docs: attempt = self._doc_to_attempt(attempt_doc) run._attempts.append(attempt) # Set current attempt index if run._attempts: run._current_attempt_index = len(run._attempts) - 1 return run def _op_run_to_doc(self, run: OpRun) -> dict[str, Any]: """Convert OpRun to MongoDB document.""" doc: dict[str, Any] = { "_id": run.id, "state": run.state.value, "priority": run.priority, "image": run.image, "inputs": run.inputs, "constraints": run.constraints.to_dict(), "retry_policy": run.retry_policy.to_dict(), "event_delivery": run.event_delivery.to_dict(), "op_definition": run.op_definition, "created_at": run.created_at, "updated_at": run.updated_at, } return doc def _attempt_to_doc(self, attempt: OpAttempt) -> dict[str, Any]: """Convert attempt to MongoDB document.""" return { "_id": attempt.id, "op_run_id": attempt.op_run_id, "attempt": attempt.attempt, "state": attempt.state.value, "progress": attempt.progress, "events": [e.to_dict() for e in attempt.events], "inputs": attempt.inputs, "outputs": attempt.outputs, "execution_summary": attempt.execution_summary.to_dict(), "constraints": attempt.constraints.to_dict(), "started_at": attempt.started_at, "finished_at": attempt.finished_at, } def _doc_to_attempt(self, doc: dict[str, Any]) -> OpAttempt: """Convert MongoDB document to attempt.""" events = [EventEntry.from_dict(e) for e in doc.get("events", [])] return OpAttempt( id=doc["_id"], op_run_id=doc["op_run_id"], attempt=doc["attempt"], state=OpRunState(doc["state"]), progress=doc.get("progress", 0), events=events, inputs=doc.get("inputs", {}), outputs=doc.get("outputs", {}), execution_summary=ExecutionSummary.from_dict( doc.get("execution_summary", {}) ), constraints=Constraints.from_dict(doc.get("constraints", {})), started_at=doc.get("started_at"), finished_at=doc.get("finished_at"), )