"""OpDefinition router - template management endpoints."""
from typing import Any
from fastapi import APIRouter, HTTPException, status
from gyoza.server.api.models import (
ErrorResponse,
OpDefinitionCreate,
OpDefinitionResponse,
OpRunCreateFromDefinition,
OpRunResponse,
)
from gyoza.server.op_definition import (
Constraints,
EventDelivery,
InputValidationError,
OpDefinition,
RetryPolicy,
op_definition_repo,
)
from gyoza.server.op_run import OpRun, op_run_repo
# Router on which the endpoint functions defined in this module are registered.
router = APIRouter()
[docs]
@router.post(
    "",
    response_model=OpDefinitionResponse,
    status_code=status.HTTP_201_CREATED,
    responses={
        201: {"description": "OpDefinition created or updated"},
        400: {"model": ErrorResponse, "description": "Invalid input"},
    },
)
def upsert_op_definition(request: OpDefinitionCreate) -> dict[str, Any]:
    """Build an OpDefinition from the request payload and upsert it.

    The optional sections of the request (input specs, constraints, retry
    policy, event delivery) are translated into their domain counterparts;
    a section that is absent or falsy is forwarded as ``None``. The
    resulting definition is handed to ``op_definition_repo.upsert`` and the
    stored object is returned in dictionary form.

    Parameters
    ----------
    request : OpDefinitionCreate
        OpDefinition data.

    Returns
    -------
    dict[str, Any]
        Created or updated OpDefinition.
    """
    # Input specs: one plain dict per declared input, or None when none given.
    raw_specs = request.input_specs
    specs_payload = (
        {
            spec_name: {
                "type": spec_model.type,
                "required": spec_model.required,
                "default": spec_model.default,
            }
            for spec_name, spec_model in raw_specs.items()
        }
        if raw_specs
        else None
    )

    # Resource constraints.
    limits = request.constraints
    constraints_obj = (
        Constraints(
            ram_mb=limits.ram_mb,
            vram_mb=limits.vram_mb,
            cpu=limits.cpu,
        )
        if limits
        else None
    )

    # Retry behaviour.
    retry_cfg = request.retry_policy
    retry_obj = (
        RetryPolicy(
            max_attempts=retry_cfg.max_attempts,
            backoff_ms=retry_cfg.backoff_ms,
        )
        if retry_cfg
        else None
    )

    # Event delivery; missing attributes fall back to an empty mapping.
    delivery_cfg = request.event_delivery
    delivery_obj = (
        EventDelivery(
            topic=delivery_cfg.topic,
            attributes=delivery_cfg.attributes or {},
        )
        if delivery_cfg
        else None
    )

    stored = op_definition_repo.upsert(
        OpDefinition.create(
            id=request.id,
            version=request.version,
            image=request.image,
            input_specs=specs_payload,
            constraints=constraints_obj,
            retry_policy=retry_obj,
            event_delivery=delivery_obj,
        )
    )
    return stored.to_dict()
[docs]
@router.get(
    "/{name}",
    response_model=OpDefinitionResponse,
    responses={
        200: {"description": "OpDefinition found"},
        404: {"model": ErrorResponse, "description": "OpDefinition not found"},
    },
)
def get_op_definition(name: str, version: str | None = None) -> dict[str, Any]:
    """Look up a single OpDefinition and return it as a dictionary.

    Parameters
    ----------
    name : str
        OpDefinition identifier.
    version : str | None
        Version to retrieve; ``None`` is passed through to the repository,
        which is documented to return the latest version in that case.

    Returns
    -------
    dict[str, Any]
        OpDefinition data.

    Raises
    ------
    HTTPException
        With status 404 when the repository lookup yields nothing.
    """
    found = op_definition_repo.get(name, version)
    if found:
        return found.to_dict()

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"OpDefinition '{name}' not found",
    )
[docs]
@router.get(
    "",
    response_model=list[OpDefinitionResponse],
    responses={
        200: {"description": "List of all OpDefinitions"},
    },
)
def list_op_definitions() -> list[dict[str, Any]]:
    """Return every stored OpDefinition (all versions) in dictionary form.

    Returns
    -------
    list[dict[str, Any]]
        One dictionary per OpDefinition returned by the repository.
    """
    return [stored.to_dict() for stored in op_definition_repo.list_all()]
[docs]
@router.post(
    "/{name}/runs",
    response_model=OpRunResponse,
    status_code=status.HTTP_201_CREATED,
    responses={
        201: {"description": "OpRun created"},
        404: {"model": ErrorResponse, "description": "OpDefinition not found"},
        400: {"model": ErrorResponse, "description": "Invalid inputs"},
    },
)
def create_op_run_from_definition(
    name: str, request: OpRunCreateFromDefinition, version: str | None = None
) -> dict[str, Any]:
    """Create an OpRun from an OpDefinition.

    The definition is looked up by name (and optional version), the
    caller-supplied inputs are validated and prepared by the definition,
    and a new OpRun is created from the definition's image, constraints,
    retry policy and event delivery settings, then saved.

    Parameters
    ----------
    name : str
        OpDefinition identifier.
    request : OpRunCreateFromDefinition
        OpRun creation parameters (inputs and priority).
    version : str | None
        Specific version to use. If None, uses latest version.

    Returns
    -------
    dict[str, Any]
        Created OpRun data.

    Raises
    ------
    HTTPException
        404 if the OpDefinition is not found; 400 if the inputs are
        rejected by the definition's validation.
    """
    definition = op_definition_repo.get(name, version)
    if not definition:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"OpDefinition '{name}' not found",
        )

    # Validate and prepare inputs. Only the call that can raise is guarded.
    try:
        prepared_inputs = definition.prepare_inputs(request.inputs)
    except InputValidationError as e:
        # Surface the validation message so the client can tell which input
        # was rejected; keep the generic text when the error has no message.
        reason = str(e)
        detail = "Input validation failed"
        if reason:
            detail = f"{detail}: {reason}"
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=detail,
        ) from e

    # Create OpRun using definition's configuration
    run = OpRun.create(
        image=definition.image,
        priority=request.priority,
        inputs=prepared_inputs,
        constraints=definition.constraints,
        retry_policy=definition.retry_policy,
        event_delivery=definition.event_delivery,
        op_definition=definition.id,
    )
    op_run_repo.save(run)
    return run.to_dict()