"""OpRun router - execution run management endpoints."""
from typing import Any
from fastapi import APIRouter, HTTPException, status
from gyoza.server.api.models import (
AddEventRequest,
ErrorResponse,
EventsResponse,
OpAttemptResponse,
OpRunCreate,
OpRunResponse,
OpRunUpdate,
)
from gyoza.server.op_run import (
Constraints,
EventDelivery,
OpRun,
OpRunState,
RetryPolicy,
op_run_repo,
)
router = APIRouter()
[docs]
@router.post(
    "",
    response_model=OpRunResponse,
    status_code=status.HTTP_201_CREATED,
    responses={
        201: {"description": "OpRun created"},
        400: {"model": ErrorResponse, "description": "Invalid input"},
    },
)
def create_op_run(request: OpRunCreate) -> dict[str, Any]:
    """Create a standalone (ad-hoc) OpRun.

    The run is created without a parent OpDefinition
    (``op_definition=None``), which makes this endpoint convenient for
    ad-hoc usage and for testing during development.

    Parameters
    ----------
    request : OpRunCreate
        Creation parameters: image, priority, inputs and, optionally,
        constraints, retry policy and event delivery settings.

    Returns
    -------
    dict[str, Any]
        The newly created OpRun, serialized with ``to_dict()``.

    Examples
    --------
    Request body for ``POST /runs``::

        {
            "image": "geoiahub/my_product:latest",
            "priority": 4,
            "inputs": {"param": "value"},
            "constraints": {"ram_mb": 4096}
        }
    """
    # Resource constraints: mirror the request when given, else use defaults.
    requested_constraints = request.constraints
    if requested_constraints:
        constraints = Constraints(
            ram_mb=requested_constraints.ram_mb,
            vram_mb=requested_constraints.vram_mb,
            cpu=requested_constraints.cpu,
        )
    else:
        constraints = Constraints()

    # Retry policy: same pattern as the constraints.
    requested_policy = request.retry_policy
    if requested_policy:
        retry_policy = RetryPolicy(
            max_attempts=requested_policy.max_attempts,
            backoff_ms=requested_policy.backoff_ms,
        )
    else:
        retry_policy = RetryPolicy()

    # Event delivery is optional; it stays None when not requested.
    delivery_spec = request.event_delivery
    event_delivery = (
        EventDelivery(
            topic=delivery_spec.topic,
            attributes=delivery_spec.attributes or {},
        )
        if delivery_spec
        else None
    )

    new_run = OpRun.create(
        image=request.image,
        priority=request.priority,
        inputs=request.inputs,
        constraints=constraints,
        retry_policy=retry_policy,
        event_delivery=event_delivery,
        op_definition=None,  # ad-hoc run: no parent OpDefinition
    )
    op_run_repo.save(new_run)
    return new_run.to_dict()
[docs]
@router.get(
    "/{id}",
    response_model=OpRunResponse,
    responses={
        200: {"description": "OpRun retrieved"},
        404: {"model": ErrorResponse, "description": "OpRun not found"},
    },
)
def get_op_run(id: str) -> dict[str, Any]:
    """Fetch a single OpRun by its identifier.

    Parameters
    ----------
    id : str
        The unique identifier of the OpRun.

    Returns
    -------
    dict[str, Any]
        The OpRun serialized with ``to_dict()``.

    Raises
    ------
    HTTPException
        404 if the repository has no OpRun for ``id``.
    """
    op_run = op_run_repo.get(id)
    if op_run:
        return op_run.to_dict()

    # Falling through means the repository lookup came back empty.
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"OpRun '{id}' not found",
    )
[docs]
@router.patch(
    "/{id}",
    response_model=OpRunResponse,
    responses={
        200: {"description": "OpRun updated"},
        404: {"model": ErrorResponse, "description": "OpRun not found"},
        400: {"model": ErrorResponse, "description": "Invalid update"},
    },
)
def update_op_run(id: str, request: OpRunUpdate) -> dict[str, Any]:
    """Update an OpRun.

    Allows updating the state, outputs, and priority of a run. Only the
    fields that are not ``None`` in the request are applied.

    Parameters
    ----------
    id : str
        The unique identifier of the OpRun.
    request : OpRunUpdate
        Fields to update.

    Returns
    -------
    dict[str, Any]
        The updated OpRun serialized with ``to_dict()``.

    Raises
    ------
    HTTPException
        404 if the OpRun is not found.
        400 if the state conversion or one of the setters raises
        ``ValueError`` (invalid state or update).
    """
    run = op_run_repo.get(id)
    if not run:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"OpRun '{id}' not found",
        )

    # Map ValueError to the documented 400 response instead of letting it
    # surface as a 500; this mirrors the handling in the retry endpoint.
    try:
        if request.state is not None:
            run.set_state(OpRunState(request.state))
        if request.outputs is not None:
            run.set_outputs(request.outputs)
        if request.priority is not None:
            run.set_priority(request.priority)
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e),
        ) from e

    op_run_repo.save(run)
    return run.to_dict()
[docs]
@router.post(
    "/{id}/retry",
    response_model=OpRunResponse,
    responses={
        200: {"description": "OpRun retry initiated"},
        404: {"model": ErrorResponse, "description": "OpRun not found"},
        400: {"model": ErrorResponse, "description": "Cannot retry"},
    },
)
def retry_op_run(id: str) -> dict[str, Any]:
    """Request a retry of an OpRun.

    The work is delegated to ``OpRun.retry()``. According to the endpoint
    contract this creates a new OpAttempt, increments the current attempt
    counter and puts the run back into the PENDING state.

    Parameters
    ----------
    id : str
        The unique identifier of the OpRun.

    Returns
    -------
    dict[str, Any]
        The OpRun after the retry, serialized with ``to_dict()``.

    Raises
    ------
    HTTPException
        404 if the OpRun is not found.
        400 if ``OpRun.retry()`` raises ``ValueError`` (e.g. max attempts
        reached).
    """
    op_run = op_run_repo.get(id)
    if not op_run:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"OpRun '{id}' not found",
        )

    try:
        op_run.retry()
    except ValueError as exc:
        # The domain object refused the retry; report it as a client error.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(exc),
        ) from exc
    else:
        op_run_repo.save(op_run)
        return op_run.to_dict()
[docs]
@router.get(
    "/{id}/events",
    response_model=EventsResponse,
    responses={
        200: {"description": "Events retrieved"},
        404: {"model": ErrorResponse, "description": "OpRun not found"},
    },
)
def poll_op_run_events(id: str, after: int | None = None) -> dict[str, Any]:
    """Poll OpRun events.

    Returns events using a polling-based mechanism. Each event is
    identified by its index in the run's event list; pass the last index
    you have seen as ``after`` to receive only newer events.

    Parameters
    ----------
    id : str
        The unique identifier of the OpRun.
    after : int | None
        Optional. Only events with an index greater than this value are
        returned. ``None`` or any negative value returns all events.

    Returns
    -------
    dict[str, Any]
        ``{"events": [...]}`` where each entry carries the event ``id``
        (its index), ``type``, ``t`` (ISO-8601 timestamp), ``msg`` and
        ``state``. ``state`` is the run's state at poll time, not the
        state the run had when the event was recorded.

    Raises
    ------
    HTTPException
        404 if the OpRun is not found.
    """
    run = op_run_repo.get(id)
    if not run:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"OpRun '{id}' not found",
        )

    # Clamp to 0: a negative start would make the slice count from the end
    # of the list and enumerate() would then hand out negative, wrong IDs.
    start_idx = 0 if after is None else max(after + 1, 0)

    return {
        "events": [
            {
                "id": idx,
                "type": event.type,
                "t": event.t.isoformat(),
                "msg": event.msg,
                "state": run.state.value,
            }
            for idx, event in enumerate(run.events[start_idx:], start=start_idx)
        ]
    }
# -----------------------------------------------------------------------------
# Get OpAttempts
# -----------------------------------------------------------------------------
[docs]
@router.get(
    "/{id}/attempts",
    response_model=list[OpAttemptResponse],
    responses={
        200: {"description": "List of OpAttempts"},
        404: {"model": ErrorResponse, "description": "OpRun not found"},
    },
)
def get_op_attempts(id: str) -> list[dict[str, Any]]:
    """List every execution attempt recorded for an OpRun.

    Parameters
    ----------
    id : str
        The unique identifier of the OpRun.

    Returns
    -------
    list[dict[str, Any]]
        One ``to_dict()`` serialization per entry in ``run.attempts``,
        in the order the run stores them.

    Raises
    ------
    HTTPException
        404 if the OpRun is not found.
    """
    op_run = op_run_repo.get(id)
    if op_run:
        serialized: list[dict[str, Any]] = []
        for attempt in op_run.attempts:
            serialized.append(attempt.to_dict())
        return serialized

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"OpRun '{id}' not found",
    )
[docs]
@router.post(
    "/{id}/events",
    response_model=OpRunResponse,
    responses={
        200: {"description": "Event added successfully"},
        404: {"model": ErrorResponse, "description": "OpRun not found"},
        400: {"model": ErrorResponse, "description": "Invalid event or run locked"},
    },
)
def add_event_to_run(id: str, request: AddEventRequest) -> dict[str, Any]:
    """Record an event on an OpRun.

    Typically called by the gyoza worker while it executes a run. The
    event is forwarded to ``OpRun.add_event()`` and the run is persisted
    afterwards.

    Event types that carry special meaning (as documented for this API):

    - ``STARTED`` — payload: ``{"worker_id": str, "gpu_id": int}``
      (gpu_id optional).
    - ``PROGRESS`` — msg: int (0-100), updates the progress field
      automatically.
    - ``COMPLETED`` — payload: ``{"outputs": dict}`` (optional).
    - ``FAILED`` — payload: ``{"error_message": str}`` (optional).
    - ``CANCELLED`` — no payload.

    Parameters
    ----------
    id : str
        The unique identifier of the OpRun.
    request : AddEventRequest
        Event details (type, message, and optional payload).

    Returns
    -------
    dict[str, Any]
        The updated OpRun serialized with ``to_dict()``.

    Raises
    ------
    HTTPException
        404 if the OpRun is not found.
        400 if ``OpRun.add_event()`` raises for any reason (e.g. the run
        is locked or the event is invalid).
    """
    op_run = op_run_repo.get(id)
    if not op_run:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"OpRun '{id}' not found",
        )

    try:
        op_run.add_event(request.type, request.msg, request.payload)
    except Exception as exc:
        # NOTE(review): deliberately broad — every failure from add_event is
        # reported as a 400. Narrow this once the exception types raised by
        # OpRun.add_event are confirmed.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(exc),
        ) from exc
    else:
        op_run_repo.save(op_run)
        return op_run.to_dict()