"""Repository for OpDefinition persistence.
This is the only place that knows about MongoDB. It handles serialization
and deserialization of OpDefinition entities. OpDefinitions are identified
by a composite key (id + version). Multiple versions of the same definition
can coexist.
"""
from datetime import UTC, datetime
from typing import Any
from pymongo.collection import Collection
from gyoza.models import (
Constraints,
EventDelivery,
InputSpecs,
OpDefinition,
RetryPolicy,
)
[docs]
class OpDefinitionRepository:
"""Repository for OpDefinition persistence.
Supports versioning - definitions are identified by (id, version).
Same id + version = replace. Same id + different version = new entry.
Parameters
----------
collection : Collection
MongoDB collection for OpDefinitions.
"""
def __init__(self, collection: Collection) -> None:
self._collection = collection
def _make_doc_id(self, definition_id: str, version: str) -> str:
"""Create composite document ID from id and version."""
return f"{definition_id}:{version}"
[docs]
def get(
self, definition_id: str, version: str | None = None
) -> OpDefinition | None:
"""Retrieve an OpDefinition by its ID and optionally version.
Parameters
----------
definition_id : str
The unique identifier (name) of the OpDefinition.
version : str | None
The version of the definition. If None, returns the latest version.
Returns
-------
OpDefinition | None
The OpDefinition if found, None otherwise.
"""
if version:
doc_id = self._make_doc_id(definition_id, version)
doc = self._collection.find_one({"_id": doc_id})
else:
# Get latest by created_at
doc = self._collection.find_one(
{"definition_id": definition_id},
sort=[("created_at", -1)],
)
if not doc:
return None
return self._doc_to_definition(doc)
[docs]
def upsert(self, definition: OpDefinition) -> OpDefinition:
"""Create or update an OpDefinition.
If an OpDefinition with the same id AND version exists, it will be replaced.
If the version is different, a new entry is created (preserving old versions).
Parameters
----------
definition : OpDefinition
The OpDefinition to upsert.
Returns
-------
OpDefinition
The upserted OpDefinition with updated timestamps.
"""
doc_id = self._make_doc_id(definition.id, definition.version)
existing = self._collection.find_one({"_id": doc_id})
if existing:
# Preserve original created_at when updating same version
definition.created_at = existing.get("created_at", definition.created_at)
definition.updated_at = datetime.now(UTC)
doc = self._definition_to_doc(definition)
self._collection.replace_one({"_id": doc_id}, doc, upsert=True)
return definition
[docs]
def save(self, definition: OpDefinition) -> None:
"""Persist an OpDefinition (insert or update).
Parameters
----------
definition : OpDefinition
The OpDefinition to persist.
"""
doc_id = self._make_doc_id(definition.id, definition.version)
doc = self._definition_to_doc(definition)
self._collection.replace_one({"_id": doc_id}, doc, upsert=True)
[docs]
def delete(self, definition_id: str, version: str | None = None) -> int:
"""Delete an OpDefinition by its ID and optionally version.
Parameters
----------
definition_id : str
The unique identifier of the OpDefinition.
version : str | None
The version to delete. If None, deletes all versions.
Returns
-------
int
Number of definitions deleted.
"""
if version:
doc_id = self._make_doc_id(definition_id, version)
result = self._collection.delete_one({"_id": doc_id})
return result.deleted_count
result = self._collection.delete_many({"definition_id": definition_id})
return result.deleted_count
[docs]
def list_all(self) -> list[OpDefinition]:
"""List all OpDefinitions (all versions).
Returns
-------
list[OpDefinition]
All OpDefinitions in the collection.
"""
docs = list(self._collection.find())
return [self._doc_to_definition(doc) for doc in docs]
[docs]
def list_versions(self, definition_id: str) -> list[OpDefinition]:
"""List all versions of an OpDefinition.
Parameters
----------
definition_id : str
The unique identifier of the OpDefinition.
Returns
-------
list[OpDefinition]
All versions of the definition, sorted by created_at descending.
"""
docs = list(
self._collection.find({"definition_id": definition_id}).sort(
"created_at", -1
)
)
return [self._doc_to_definition(doc) for doc in docs]
[docs]
def exists(self, definition_id: str, version: str | None = None) -> bool:
"""Check if an OpDefinition exists.
Parameters
----------
definition_id : str
The unique identifier of the OpDefinition.
version : str | None
The version to check. If None, checks if any version exists.
Returns
-------
bool
True if the definition exists, False otherwise.
"""
if version:
doc_id = self._make_doc_id(definition_id, version)
return self._collection.count_documents({"_id": doc_id}, limit=1) > 0
return (
self._collection.count_documents({"definition_id": definition_id}, limit=1)
> 0
)
# -------------------------------------------------------------------------
# Internal helpers
# -------------------------------------------------------------------------
def _definition_to_doc(self, definition: OpDefinition) -> dict[str, Any]:
"""Convert OpDefinition to MongoDB document."""
return {
"_id": self._make_doc_id(definition.id, definition.version),
"definition_id": definition.id,
"version": definition.version,
"image": definition.image,
"input_specs": definition.input_specs.to_dict(),
"constraints": definition.constraints.to_dict(),
"retry_policy": definition.retry_policy.to_dict(),
"event_delivery": definition.event_delivery.to_dict(),
"created_at": definition.created_at,
"updated_at": definition.updated_at,
}
def _doc_to_definition(self, doc: dict[str, Any]) -> OpDefinition:
"""Convert MongoDB document to OpDefinition."""
return OpDefinition(
id=doc["definition_id"],
version=doc["version"],
image=doc["image"],
input_specs=InputSpecs.from_dict(doc.get("input_specs", {})),
constraints=Constraints.from_dict(doc.get("constraints", {})),
retry_policy=RetryPolicy.from_dict(doc.get("retry_policy", {})),
event_delivery=EventDelivery.from_dict(doc.get("event_delivery", {})),
created_at=doc.get("created_at", datetime.now(UTC)),
updated_at=doc.get("updated_at", datetime.now(UTC)),
)