"""OpDefinition aggregate and input/output specification types."""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Any
from gyoza.models.resources import Constraints, EventDelivery, RetryPolicy
[docs]
@dataclass
class OutputSpec:
    """Describe one output field produced by an operation.

    Parameters
    ----------
    type : str
        Name of the output's type (e.g. ``"string"``, ``"float"``).
    """

    type: str

    def to_dict(self) -> dict[str, Any]:
        """Convert this spec into a JSON-compatible dictionary.

        Returns
        -------
        dict[str, Any]
            Dictionary holding the single key ``type``.
        """
        payload: dict[str, Any] = {}
        payload["type"] = self.type
        return payload

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> OutputSpec:
        """Build a spec from its dictionary form.

        Parameters
        ----------
        data : dict[str, Any]
            Dictionary that must contain the key ``type``.

        Returns
        -------
        OutputSpec
            Instance whose ``type`` is taken from ``data["type"]``.
        """
        # Plain subscript on purpose: the key is required, not optional.
        type_name = data["type"]
        return cls(type=type_name)
[docs]
@dataclass
class OutputSpecs:
    """Group of output specifications indexed by output field name.

    Parameters
    ----------
    specs : dict[str, OutputSpec]
        Output field names mapped to the specification of each field.
        Defaults to a fresh empty dictionary per instance.
    """

    specs: dict[str, OutputSpec] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Convert the collection into a JSON-compatible dictionary.

        Returns
        -------
        dict[str, Any]
            Output names mapped to the ``to_dict()`` result of each spec.
        """
        serialised: dict[str, Any] = {}
        for name, spec in self.specs.items():
            serialised[name] = spec.to_dict()
        return serialised

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> OutputSpecs:
        """Build a collection from its dictionary form.

        Parameters
        ----------
        data : dict[str, Any]
            Output names mapped to raw spec dictionaries.

        Returns
        -------
        OutputSpecs
            Instance with every value parsed via ``OutputSpec.from_dict``.
        """
        parsed: dict[str, OutputSpec] = {}
        for name, raw_spec in data.items():
            parsed[name] = OutputSpec.from_dict(raw_spec)
        return cls(specs=parsed)
[docs]
@dataclass
class OpDefinition:
    """Aggregate root describing an operation and its execution requirements.

    Serves as the template from which op runs are created, and supports
    creation, in-place updates and dictionary (de)serialisation.

    Parameters
    ----------
    id : str
        Unique human-readable identifier (e.g. ``"sugarcane_lines_product"``).
    version : str
        Semantic version string (e.g. ``"1.2.0"``).
    image : str
        Docker image reference including registry and tag.
    input_specs : InputSpecs
        Accepted input parameters.
    output_specs : OutputSpecs
        Output fields produced by the operation.
    constraints : Constraints
        Hardware requirements for execution.
    retry_policy : RetryPolicy
        Failure recovery rules.
    event_delivery : EventDelivery
        Event delivery configuration for runs of this definition.
    created_at : datetime
        Timestamp when the definition was registered (defaults to now, UTC).
    updated_at : datetime
        Timestamp of the most recent update (defaults to now, UTC).
    """

    id: str
    version: str
    image: str
    input_specs: InputSpecs = field(default_factory=InputSpecs)
    output_specs: OutputSpecs = field(default_factory=OutputSpecs)
    constraints: Constraints = field(default_factory=Constraints)
    retry_policy: RetryPolicy = field(default_factory=RetryPolicy)
    event_delivery: EventDelivery = field(default_factory=EventDelivery)
    created_at: datetime = field(default_factory=lambda: datetime.now(UTC))
    updated_at: datetime = field(default_factory=lambda: datetime.now(UTC))

    @staticmethod
    def _resolve_specs(value: Any, specs_cls: type[Any]) -> Any:
        """Return *value* as a ``specs_cls`` instance.

        ``None`` becomes an empty ``specs_cls()``, a dictionary is parsed with
        ``specs_cls.from_dict``, and anything else is returned unchanged.
        """
        if value is None:
            return specs_cls()
        if isinstance(value, dict):
            return specs_cls.from_dict(value)
        return value

    @staticmethod
    def _parse_timestamp(value: Any) -> Any:
        """Parse an ISO-8601 string into a ``datetime``; pass other values through."""
        if isinstance(value, str):
            # Map a trailing "Z" to an explicit UTC offset before parsing.
            # NOTE(review): strings without any offset parse to naive
            # datetimes, whereas the defaults in this class are UTC-aware --
            # confirm the API always sends an offset.
            return datetime.fromisoformat(value.replace("Z", "+00:00"))
        return value

    @classmethod
    def create(
        cls,
        id: str,
        version: str,
        image: str,
        input_specs: InputSpecs | dict[str, Any] | None = None,
        output_specs: OutputSpecs | dict[str, Any] | None = None,
        constraints: Constraints | None = None,
        retry_policy: RetryPolicy | None = None,
        event_delivery: EventDelivery | None = None,
    ) -> OpDefinition:
        """Create a new OpDefinition with auto-set timestamps.

        Parameters
        ----------
        id : str
            Unique human-readable identifier.
        version : str
            Semantic version of the definition.
        image : str
            Docker image reference.
        input_specs : InputSpecs | dict[str, Any] | None
            Accepted input parameters. A dictionary is parsed with
            ``InputSpecs.from_dict``; ``None`` yields an empty ``InputSpecs()``.
        output_specs : OutputSpecs | dict[str, Any] | None
            Output field definitions. A dictionary is parsed with
            ``OutputSpecs.from_dict``; ``None`` yields an empty
            ``OutputSpecs()``.
        constraints : Constraints | None
            Hardware requirements; ``None`` yields ``Constraints()``.
        retry_policy : RetryPolicy | None
            Failure recovery rules; ``None`` yields ``RetryPolicy()``.
        event_delivery : EventDelivery | None
            Event delivery configuration; ``None`` yields ``EventDelivery()``.

        Returns
        -------
        OpDefinition
            A new instance whose ``created_at`` and ``updated_at`` are the
            same "now" instant in UTC.
        """
        # One instant for both timestamps so a fresh definition never looks
        # as if it had already been updated.
        now = datetime.now(UTC)
        return cls(
            id=id,
            version=version,
            image=image,
            input_specs=cls._resolve_specs(input_specs, InputSpecs),
            output_specs=cls._resolve_specs(output_specs, OutputSpecs),
            # Explicit ``is None`` checks: a caller-supplied object is kept
            # even if it happens to evaluate as falsy.
            constraints=constraints if constraints is not None else Constraints(),
            retry_policy=(
                retry_policy if retry_policy is not None else RetryPolicy()
            ),
            event_delivery=(
                event_delivery if event_delivery is not None else EventDelivery()
            ),
            created_at=now,
            updated_at=now,
        )

    def update(
        self,
        version: str | None = None,
        image: str | None = None,
        input_specs: InputSpecs | dict[str, Any] | None = None,
        output_specs: OutputSpecs | dict[str, Any] | None = None,
        constraints: Constraints | None = None,
        retry_policy: RetryPolicy | None = None,
        event_delivery: EventDelivery | None = None,
    ) -> None:
        """Update definition fields in place.

        Every argument left as ``None`` keeps the current value of the
        corresponding field. ``updated_at`` is refreshed on every call,
        whether or not any field changed.

        Parameters
        ----------
        version : str | None
            New version string.
        image : str | None
            New Docker image reference.
        input_specs : InputSpecs | dict[str, Any] | None
            New input specifications (a dictionary is parsed with
            ``InputSpecs.from_dict``).
        output_specs : OutputSpecs | dict[str, Any] | None
            New output specifications (a dictionary is parsed with
            ``OutputSpecs.from_dict``).
        constraints : Constraints | None
            New hardware requirements.
        retry_policy : RetryPolicy | None
            New retry policy.
        event_delivery : EventDelivery | None
            New event delivery configuration.
        """
        if version is not None:
            self.version = version
        if image is not None:
            self.image = image
        if input_specs is not None:
            self.input_specs = self._resolve_specs(input_specs, InputSpecs)
        if output_specs is not None:
            self.output_specs = self._resolve_specs(output_specs, OutputSpecs)
        if constraints is not None:
            self.constraints = constraints
        if retry_policy is not None:
            self.retry_policy = retry_policy
        if event_delivery is not None:
            self.event_delivery = event_delivery
        self.updated_at = datetime.now(UTC)

    def to_dict(self) -> dict[str, Any]:
        """Serialise to a JSON-compatible dictionary.

        Returns
        -------
        dict[str, Any]
            Dictionary representation. Nested objects are serialised through
            their own ``to_dict()``; timestamps are ISO-8601 strings stored
            under the camelCase keys ``createdAt`` and ``updatedAt`` for API
            compatibility.
        """
        return {
            "id": self.id,
            "version": self.version,
            "image": self.image,
            "input_specs": self.input_specs.to_dict(),
            "output_specs": self.output_specs.to_dict(),
            "constraints": self.constraints.to_dict(),
            "retry_policy": self.retry_policy.to_dict(),
            "event_delivery": self.event_delivery.to_dict(),
            "createdAt": self.created_at.isoformat(),
            "updatedAt": self.updated_at.isoformat(),
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> OpDefinition:
        """Deserialise from a dictionary.

        Parameters
        ----------
        data : dict[str, Any]
            Dictionary as returned by the server API. ``id``, ``version`` and
            ``image`` are required. Timestamps are read from the camelCase
            keys first, then from the snake_case ones, and may be ISO-8601
            strings or ``datetime`` objects. Nested sections that are missing
            or ``None`` are parsed from an empty dictionary.

        Returns
        -------
        OpDefinition
            Populated instance. Timestamps that are absent fall back to a
            single shared "now" instant in UTC.

        Raises
        ------
        KeyError
            If ``id``, ``version`` or ``image`` is missing from ``data``.
        """
        created_at = cls._parse_timestamp(
            data.get("createdAt") or data.get("created_at")
        )
        updated_at = cls._parse_timestamp(
            data.get("updatedAt") or data.get("updated_at")
        )
        # Shared fallback so two missing timestamps do not differ by the few
        # microseconds between two separate now() calls.
        now = datetime.now(UTC)
        # ``get(key) or {}`` rather than ``get(key, {})``: a key that is
        # present but null must also fall back to an empty mapping.
        return cls(
            id=data["id"],
            version=data["version"],
            image=data["image"],
            input_specs=InputSpecs.from_dict(data.get("input_specs") or {}),
            output_specs=OutputSpecs.from_dict(data.get("output_specs") or {}),
            constraints=Constraints.from_dict(data.get("constraints") or {}),
            retry_policy=RetryPolicy.from_dict(data.get("retry_policy") or {}),
            event_delivery=EventDelivery.from_dict(
                data.get("event_delivery") or {}
            ),
            created_at=created_at or now,
            updated_at=updated_at or now,
        )