Source code for gyoza.models.op_definition

"""OpDefinition aggregate and input/output specification types."""

from __future__ import annotations

from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Any

from gyoza.models.resources import Constraints, EventDelivery, RetryPolicy


[docs] class InputValidationError(Exception): """Raised when input validation fails against an OpDefinition's specs. Parameters ---------- errors : list[str] List of validation error messages. """ def __init__(self, errors: list[str]) -> None: self.errors = errors super().__init__(f"Input validation failed: {', '.join(errors)}")
[docs] @dataclass class InputSpec: """Specification for a single input parameter. Parameters ---------- type : str Type name (e.g. ``"string"``, ``"float"``, ``"int"``, ``"boolean"``). required : bool Whether the input must be provided by the caller. default : Any | None Default value applied when the input is absent and not required. """ type: str required: bool = True default: Any | None = None
[docs] def to_dict(self) -> dict[str, Any]: """Serialise to a JSON-compatible dictionary. Returns ------- dict[str, Any] Dictionary representation of the input spec. """ result: dict[str, Any] = {"type": self.type, "required": self.required} if self.default is not None: result["default"] = self.default return result
[docs] @classmethod def from_dict(cls, data: dict[str, Any]) -> InputSpec: """Deserialise from a dictionary. Parameters ---------- data : dict[str, Any] Dictionary with keys ``type``, optional ``required`` and ``default``. Returns ------- InputSpec Populated instance. """ return cls( type=data["type"], required=data.get("required", True), default=data.get("default"), )
[docs] @dataclass class InputSpecs: """Collection of input specifications keyed by parameter name. Parameters ---------- specs : dict[str, InputSpec] Mapping of input names to their specifications. """ specs: dict[str, InputSpec] = field(default_factory=dict)
[docs] def to_dict(self) -> dict[str, Any]: """Serialise to a JSON-compatible dictionary. Returns ------- dict[str, Any] Mapping of input names to their serialised specs. """ return {key: spec.to_dict() for key, spec in self.specs.items()}
[docs] @classmethod def from_dict(cls, data: dict[str, Any]) -> InputSpecs: """Deserialise from a dictionary. Parameters ---------- data : dict[str, Any] Mapping of input names to raw spec dictionaries. Returns ------- InputSpecs Populated instance. """ return cls(specs={key: InputSpec.from_dict(val) for key, val in data.items()})
[docs] def validate(self, inputs: dict[str, Any]) -> list[str]: """Check that all required inputs are present. Parameters ---------- inputs : dict[str, Any] Caller-provided inputs to validate. Returns ------- list[str] Validation error messages; empty when inputs are valid. """ return [ f"Missing required input: {key}" for key, spec in self.specs.items() if spec.required and key not in inputs ]
[docs] def apply_defaults(self, inputs: dict[str, Any]) -> dict[str, Any]: """Return a copy of *inputs* with default values filled in. Parameters ---------- inputs : dict[str, Any] Caller-provided inputs. Returns ------- dict[str, Any] Inputs merged with defaults for absent optional parameters. """ result = dict(inputs) for key, spec in self.specs.items(): if key not in result and spec.default is not None: result[key] = spec.default return result
[docs] @dataclass class OutputSpec: """Specification for a single output field. Parameters ---------- type : str Type name of the output (e.g. ``"string"``, ``"float"``). """ type: str
[docs] def to_dict(self) -> dict[str, Any]: """Serialise to a JSON-compatible dictionary. Returns ------- dict[str, Any] Dictionary representation of the output spec. """ return {"type": self.type}
[docs] @classmethod def from_dict(cls, data: dict[str, Any]) -> OutputSpec: """Deserialise from a dictionary. Parameters ---------- data : dict[str, Any] Dictionary with key ``type``. Returns ------- OutputSpec Populated instance. """ return cls(type=data["type"])
[docs] @dataclass class OutputSpecs: """Collection of output specifications keyed by field name. Parameters ---------- specs : dict[str, OutputSpec] Mapping of output field names to their specifications. """ specs: dict[str, OutputSpec] = field(default_factory=dict)
[docs] def to_dict(self) -> dict[str, Any]: """Serialise to a JSON-compatible dictionary. Returns ------- dict[str, Any] Mapping of output names to their serialised specs. """ return {key: spec.to_dict() for key, spec in self.specs.items()}
[docs] @classmethod def from_dict(cls, data: dict[str, Any]) -> OutputSpecs: """Deserialise from a dictionary. Parameters ---------- data : dict[str, Any] Mapping of output names to raw spec dictionaries. Returns ------- OutputSpecs Populated instance. """ return cls(specs={key: OutputSpec.from_dict(val) for key, val in data.items()})
[docs] @dataclass class OpDefinition: """Aggregate root describing an operation and its execution requirements. Serves as the template from which op runs are created. Holds the full lifecycle of a definition: creation, input validation, defaults, and updates. Parameters ---------- id : str Unique human-readable identifier (e.g. ``"sugarcane_lines_product"``). version : str Semantic version string (e.g. ``"1.2.0"``). image : str Docker image reference including registry and tag. input_specs : InputSpecs Accepted input parameters. output_specs : OutputSpecs Output fields produced by the operation. constraints : Constraints Hardware requirements for execution. retry_policy : RetryPolicy Failure recovery rules. event_delivery : EventDelivery Event delivery configuration for runs of this definition. created_at : datetime Timestamp when the definition was registered. updated_at : datetime Timestamp of the most recent update. """ id: str version: str image: str input_specs: InputSpecs = field(default_factory=InputSpecs) output_specs: OutputSpecs = field(default_factory=OutputSpecs) constraints: Constraints = field(default_factory=Constraints) retry_policy: RetryPolicy = field(default_factory=RetryPolicy) event_delivery: EventDelivery = field(default_factory=EventDelivery) created_at: datetime = field(default_factory=lambda: datetime.now(UTC)) updated_at: datetime = field(default_factory=lambda: datetime.now(UTC))
[docs] @classmethod def create( cls, id: str, version: str, image: str, input_specs: InputSpecs | dict[str, Any] | None = None, output_specs: OutputSpecs | dict[str, Any] | None = None, constraints: Constraints | None = None, retry_policy: RetryPolicy | None = None, event_delivery: EventDelivery | None = None, ) -> OpDefinition: """Create a new OpDefinition with auto-set timestamps. Parameters ---------- id : str Unique human-readable identifier. version : str Semantic version of the definition. image : str Docker image reference. input_specs : InputSpecs | dict[str, Any] | None Accepted input parameters. output_specs : OutputSpecs | dict[str, Any] | None Output field definitions. constraints : Constraints | None Hardware requirements. retry_policy : RetryPolicy | None Failure recovery rules. event_delivery : EventDelivery | None Event delivery configuration. Returns ------- OpDefinition A new OpDefinition instance with timestamps set to now. """ now = datetime.now(UTC) if input_specs is None: resolved_input_specs = InputSpecs() elif isinstance(input_specs, dict): resolved_input_specs = InputSpecs.from_dict(input_specs) else: resolved_input_specs = input_specs if output_specs is None: resolved_output_specs = OutputSpecs() elif isinstance(output_specs, dict): resolved_output_specs = OutputSpecs.from_dict(output_specs) else: resolved_output_specs = output_specs return cls( id=id, version=version, image=image, input_specs=resolved_input_specs, output_specs=resolved_output_specs, constraints=constraints or Constraints(), retry_policy=retry_policy or RetryPolicy(), event_delivery=event_delivery or EventDelivery(), created_at=now, updated_at=now, )
[docs] def validate_inputs(self, inputs: dict[str, Any]) -> None: """Validate inputs against the definition's input specs. Parameters ---------- inputs : dict[str, Any] The inputs to validate. Raises ------ InputValidationError If required inputs are missing. """ errors = self.input_specs.validate(inputs) if errors: raise InputValidationError(errors)
[docs] def prepare_inputs(self, inputs: dict[str, Any]) -> dict[str, Any]: """Validate inputs and apply defaults. Parameters ---------- inputs : dict[str, Any] The inputs to prepare. Returns ------- dict[str, Any] Inputs with defaults applied. Raises ------ InputValidationError If required inputs are missing. """ self.validate_inputs(inputs) return self.input_specs.apply_defaults(inputs)
[docs] def update( self, version: str | None = None, image: str | None = None, input_specs: InputSpecs | dict[str, Any] | None = None, output_specs: OutputSpecs | dict[str, Any] | None = None, constraints: Constraints | None = None, retry_policy: RetryPolicy | None = None, ) -> None: """Update definition fields in place. Parameters ---------- version : str | None New version string. image : str | None New Docker image reference. input_specs : InputSpecs | dict[str, Any] | None New input specifications. output_specs : OutputSpecs | dict[str, Any] | None New output specifications. constraints : Constraints | None New hardware requirements. retry_policy : RetryPolicy | None New retry policy. """ if version is not None: self.version = version if image is not None: self.image = image if input_specs is not None: self.input_specs = ( InputSpecs.from_dict(input_specs) if isinstance(input_specs, dict) else input_specs ) if output_specs is not None: self.output_specs = ( OutputSpecs.from_dict(output_specs) if isinstance(output_specs, dict) else output_specs ) if constraints is not None: self.constraints = constraints if retry_policy is not None: self.retry_policy = retry_policy self.updated_at = datetime.now(UTC)
[docs] def to_dict(self) -> dict[str, Any]: """Serialise to a JSON-compatible dictionary. Returns ------- dict[str, Any] Dictionary representation including camelCase timestamp keys for API compatibility. """ return { "id": self.id, "version": self.version, "image": self.image, "input_specs": self.input_specs.to_dict(), "output_specs": self.output_specs.to_dict(), "constraints": self.constraints.to_dict(), "retry_policy": self.retry_policy.to_dict(), "event_delivery": self.event_delivery.to_dict(), "createdAt": self.created_at.isoformat(), "updatedAt": self.updated_at.isoformat(), }
[docs] @classmethod def from_dict(cls, data: dict[str, Any]) -> OpDefinition: """Deserialise from a dictionary. Parameters ---------- data : dict[str, Any] Dictionary as returned by the server API (camelCase or snake_case timestamp keys are both accepted). Returns ------- OpDefinition Populated instance. """ created_at = data.get("createdAt") or data.get("created_at") updated_at = data.get("updatedAt") or data.get("updated_at") if isinstance(created_at, str): created_at = datetime.fromisoformat(created_at.replace("Z", "+00:00")) if isinstance(updated_at, str): updated_at = datetime.fromisoformat(updated_at.replace("Z", "+00:00")) return cls( id=data["id"], version=data["version"], image=data["image"], input_specs=InputSpecs.from_dict(data.get("input_specs", {})), output_specs=OutputSpecs.from_dict(data.get("output_specs", {})), constraints=Constraints.from_dict(data.get("constraints", {})), retry_policy=RetryPolicy.from_dict(data.get("retry_policy", {})), event_delivery=EventDelivery.from_dict(data.get("event_delivery", {})), created_at=created_at or datetime.now(UTC), updated_at=updated_at or datetime.now(UTC), )