Source code for gyoza.server.op_definition.repository

"""Repository for OpDefinition persistence.

This is the only place that knows about MongoDB. It handles serialization
and deserialization of OpDefinition entities. OpDefinitions are identified
by a composite key (id + version). Multiple versions of the same definition
can coexist.
"""

from datetime import UTC, datetime
from typing import Any

from pymongo.collection import Collection

from gyoza.models import (
    Constraints,
    EventDelivery,
    InputSpecs,
    OpDefinition,
    RetryPolicy,
)


[docs] class OpDefinitionRepository: """Repository for OpDefinition persistence. Supports versioning - definitions are identified by (id, version). Same id + version = replace. Same id + different version = new entry. Parameters ---------- collection : Collection MongoDB collection for OpDefinitions. """ def __init__(self, collection: Collection) -> None: self._collection = collection def _make_doc_id(self, definition_id: str, version: str) -> str: """Create composite document ID from id and version.""" return f"{definition_id}:{version}"
[docs] def get( self, definition_id: str, version: str | None = None ) -> OpDefinition | None: """Retrieve an OpDefinition by its ID and optionally version. Parameters ---------- definition_id : str The unique identifier (name) of the OpDefinition. version : str | None The version of the definition. If None, returns the latest version. Returns ------- OpDefinition | None The OpDefinition if found, None otherwise. """ if version: doc_id = self._make_doc_id(definition_id, version) doc = self._collection.find_one({"_id": doc_id}) else: # Get latest by created_at doc = self._collection.find_one( {"definition_id": definition_id}, sort=[("created_at", -1)], ) if not doc: return None return self._doc_to_definition(doc)
[docs] def upsert(self, definition: OpDefinition) -> OpDefinition: """Create or update an OpDefinition. If an OpDefinition with the same id AND version exists, it will be replaced. If the version is different, a new entry is created (preserving old versions). Parameters ---------- definition : OpDefinition The OpDefinition to upsert. Returns ------- OpDefinition The upserted OpDefinition with updated timestamps. """ doc_id = self._make_doc_id(definition.id, definition.version) existing = self._collection.find_one({"_id": doc_id}) if existing: # Preserve original created_at when updating same version definition.created_at = existing.get("created_at", definition.created_at) definition.updated_at = datetime.now(UTC) doc = self._definition_to_doc(definition) self._collection.replace_one({"_id": doc_id}, doc, upsert=True) return definition
[docs] def save(self, definition: OpDefinition) -> None: """Persist an OpDefinition (insert or update). Parameters ---------- definition : OpDefinition The OpDefinition to persist. """ doc_id = self._make_doc_id(definition.id, definition.version) doc = self._definition_to_doc(definition) self._collection.replace_one({"_id": doc_id}, doc, upsert=True)
[docs] def delete(self, definition_id: str, version: str | None = None) -> int: """Delete an OpDefinition by its ID and optionally version. Parameters ---------- definition_id : str The unique identifier of the OpDefinition. version : str | None The version to delete. If None, deletes all versions. Returns ------- int Number of definitions deleted. """ if version: doc_id = self._make_doc_id(definition_id, version) result = self._collection.delete_one({"_id": doc_id}) return result.deleted_count result = self._collection.delete_many({"definition_id": definition_id}) return result.deleted_count
[docs] def list_all(self) -> list[OpDefinition]: """List all OpDefinitions (all versions). Returns ------- list[OpDefinition] All OpDefinitions in the collection. """ docs = list(self._collection.find()) return [self._doc_to_definition(doc) for doc in docs]
[docs] def list_versions(self, definition_id: str) -> list[OpDefinition]: """List all versions of an OpDefinition. Parameters ---------- definition_id : str The unique identifier of the OpDefinition. Returns ------- list[OpDefinition] All versions of the definition, sorted by created_at descending. """ docs = list( self._collection.find({"definition_id": definition_id}).sort( "created_at", -1 ) ) return [self._doc_to_definition(doc) for doc in docs]
[docs] def exists(self, definition_id: str, version: str | None = None) -> bool: """Check if an OpDefinition exists. Parameters ---------- definition_id : str The unique identifier of the OpDefinition. version : str | None The version to check. If None, checks if any version exists. Returns ------- bool True if the definition exists, False otherwise. """ if version: doc_id = self._make_doc_id(definition_id, version) return self._collection.count_documents({"_id": doc_id}, limit=1) > 0 return ( self._collection.count_documents({"definition_id": definition_id}, limit=1) > 0 )
# ------------------------------------------------------------------------- # Internal helpers # ------------------------------------------------------------------------- def _definition_to_doc(self, definition: OpDefinition) -> dict[str, Any]: """Convert OpDefinition to MongoDB document.""" return { "_id": self._make_doc_id(definition.id, definition.version), "definition_id": definition.id, "version": definition.version, "image": definition.image, "input_specs": definition.input_specs.to_dict(), "constraints": definition.constraints.to_dict(), "retry_policy": definition.retry_policy.to_dict(), "event_delivery": definition.event_delivery.to_dict(), "created_at": definition.created_at, "updated_at": definition.updated_at, } def _doc_to_definition(self, doc: dict[str, Any]) -> OpDefinition: """Convert MongoDB document to OpDefinition.""" return OpDefinition( id=doc["definition_id"], version=doc["version"], image=doc["image"], input_specs=InputSpecs.from_dict(doc.get("input_specs", {})), constraints=Constraints.from_dict(doc.get("constraints", {})), retry_policy=RetryPolicy.from_dict(doc.get("retry_policy", {})), event_delivery=EventDelivery.from_dict(doc.get("event_delivery", {})), created_at=doc.get("created_at", datetime.now(UTC)), updated_at=doc.get("updated_at", datetime.now(UTC)), )