Source code for gyoza.server.api.routers.definitions

"""OpDefinition router - template management endpoints."""

from typing import Any

from fastapi import APIRouter, HTTPException, status

from gyoza.server.api.models import (
    ErrorResponse,
    OpDefinitionCreate,
    OpDefinitionResponse,
    OpRunCreateFromDefinition,
    OpRunResponse,
)
from gyoza.server.op_definition import (
    Constraints,
    EventDelivery,
    InputValidationError,
    OpDefinition,
    RetryPolicy,
    op_definition_repo,
)
from gyoza.server.op_run import OpRun, op_run_repo

router = APIRouter()


[docs] @router.post( "", response_model=OpDefinitionResponse, status_code=status.HTTP_201_CREATED, responses={ 201: {"description": "OpDefinition created or updated"}, 400: {"model": ErrorResponse, "description": "Invalid input"}, }, ) def upsert_op_definition(request: OpDefinitionCreate) -> dict[str, Any]: """Create or update an OpDefinition. If an OpDefinition with the same id and version exists, it will be replaced. If the version is different, a new entry is created. Parameters ---------- request : OpDefinitionCreate OpDefinition data. Returns ------- dict[str, Any] Created or updated OpDefinition. """ # Convert request models to domain objects input_specs = None if request.input_specs: input_specs = { key: { "type": spec.type, "required": spec.required, "default": spec.default, } for key, spec in request.input_specs.items() } constraints = None if request.constraints: constraints = Constraints( ram_mb=request.constraints.ram_mb, vram_mb=request.constraints.vram_mb, cpu=request.constraints.cpu, ) retry_policy = None if request.retry_policy: retry_policy = RetryPolicy( max_attempts=request.retry_policy.max_attempts, backoff_ms=request.retry_policy.backoff_ms, ) event_delivery = None if request.event_delivery: event_delivery = EventDelivery( topic=request.event_delivery.topic, attributes=request.event_delivery.attributes or {}, ) definition = OpDefinition.create( id=request.id, version=request.version, image=request.image, input_specs=input_specs, constraints=constraints, retry_policy=retry_policy, event_delivery=event_delivery, ) result = op_definition_repo.upsert(definition) return result.to_dict()
[docs] @router.get( "/{name}", response_model=OpDefinitionResponse, responses={ 200: {"description": "OpDefinition found"}, 404: {"model": ErrorResponse, "description": "OpDefinition not found"}, }, ) def get_op_definition(name: str, version: str | None = None) -> dict[str, Any]: """Get an OpDefinition by name and optionally version. Parameters ---------- name : str OpDefinition identifier. version : str | None Specific version to retrieve. If None, returns latest version. Returns ------- dict[str, Any] OpDefinition data. Raises ------ HTTPException If OpDefinition not found. """ definition = op_definition_repo.get(name, version) if not definition: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"OpDefinition '{name}' not found", ) return definition.to_dict()
[docs] @router.get( "", response_model=list[OpDefinitionResponse], responses={ 200: {"description": "List of all OpDefinitions"}, }, ) def list_op_definitions() -> list[dict[str, Any]]: """List all OpDefinitions (all versions). Returns ------- list[dict[str, Any]] List of all OpDefinitions. """ definitions = op_definition_repo.list_all() return [d.to_dict() for d in definitions]
[docs] @router.post( "/{name}/runs", response_model=OpRunResponse, status_code=status.HTTP_201_CREATED, responses={ 201: {"description": "OpRun created"}, 404: {"model": ErrorResponse, "description": "OpDefinition not found"}, 400: {"model": ErrorResponse, "description": "Invalid inputs"}, }, ) def create_op_run_from_definition( name: str, request: OpRunCreateFromDefinition, version: str | None = None ) -> dict[str, Any]: """Create an OpRun from an OpDefinition. Parameters ---------- name : str OpDefinition identifier. request : OpRunCreate OpRun creation parameters. version : str | None Specific version to use. If None, uses latest version. Returns ------- dict[str, Any] Created OpRun data. Raises ------ HTTPException If OpDefinition not found or inputs are invalid. """ definition = op_definition_repo.get(name, version) if not definition: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"OpDefinition '{name}' not found", ) # Validate and prepare inputs try: prepared_inputs = definition.prepare_inputs(request.inputs) except InputValidationError as e: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Input validation failed", ) from e # Create OpRun using definition's configuration run = OpRun.create( image=definition.image, priority=request.priority, inputs=prepared_inputs, constraints=definition.constraints, retry_policy=definition.retry_policy, event_delivery=definition.event_delivery, op_definition=definition.id, ) op_run_repo.save(run) return run.to_dict()