"""OpRun router - execution run management endpoints."""
from datetime import UTC, datetime
from typing import Any
from fastapi import APIRouter, HTTPException, Query, status
from gyoza.server.api.models import (
AddEventRequest,
ErrorResponse,
EventsResponse,
OpAttemptResponse,
OpRunCreate,
OpRunListResponse,
OpRunResponse,
OpRunUpdate,
)
from gyoza.server.op_run import (
Constraints,
EventDelivery,
OpRun,
OpRunState,
RetryPolicy,
op_run_repo,
)
router = APIRouter()
[docs]
@router.get(
    "",
    response_model=OpRunListResponse,
    responses={
        200: {"description": "Paginated list of OpRuns"},
        400: {"model": ErrorResponse, "description": "Invalid pagination parameters"},
    },
)
def list_op_runs(
    limit: int = Query(default=10, ge=1, le=100),
    starting_after: str | None = Query(default=None),
    ending_before: str | None = Query(default=None),
    state: str | None = Query(default=None),
    status_filter: str | None = Query(default=None, alias="status"),
    op_definition: str | None = Query(default=None),
    image: str | None = Query(default=None),
    created_gte: int | None = Query(default=None, alias="created[gte]"),
    created_lte: int | None = Query(default=None, alias="created[lte]"),
    created_gt: int | None = Query(default=None, alias="created[gt]"),
    created_lt: int | None = Query(default=None, alias="created[lt]"),
) -> dict[str, Any]:
    """Return one page of OpRuns as a Stripe-style list object.

    Query parameters are translated into a ``filters`` mapping which is
    handed, together with the cursors, to ``op_run_repo.list_cursor``.
    Ordering is delegated to the repository (documented as newest first).

    Parameters
    ----------
    limit : int
        Page size, between 1 and 100 (default 10).
    starting_after : str | None
        Run ID cursor forwarded to the repository (forward paging).
    ending_before : str | None
        Run ID cursor forwarded to the repository (backward paging).
    state : str | None
        Restrict results to this run state.
    status_filter : str | None
        Same as ``state``, exposed as the ``status`` query parameter.
    op_definition : str | None
        Restrict results to this op definition identifier.
    image : str | None
        Restrict results to this image.
    created_gte : int | None
        UNIX timestamp; keep runs created at or after it.
    created_lte : int | None
        UNIX timestamp; keep runs created at or before it.
    created_gt : int | None
        UNIX timestamp; keep runs created strictly after it.
    created_lt : int | None
        UNIX timestamp; keep runs created strictly before it.

    Returns
    -------
    dict[str, Any]
        Mapping with ``object``, ``data``, ``has_more`` and ``url`` keys.

    Raises
    ------
    HTTPException
        400 when ``state`` and ``status`` disagree, the state is unknown,
        a timestamp cannot be converted, or the repository rejects the
        pagination arguments with a ``ValueError``.
    """
    # Both spellings of the state filter may be sent, but only if they agree.
    if state and status_filter and state != status_filter:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="state and status must match when both are provided",
        )
    selected_state = state or status_filter

    filters: dict[str, Any] = {}
    if selected_state:
        try:
            state_value = OpRunState(selected_state).value
        except ValueError as exc:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Invalid state '{selected_state}'",
            ) from exc
        filters["state"] = state_value

    # Plain equality filters: only added when a non-empty value was supplied.
    for field_name, field_value in (("op_definition", op_definition), ("image", image)):
        if field_value:
            filters[field_name] = field_value

    # Map each created[...] bound onto its range operator, in a fixed order.
    bounds = (
        ("$gte", created_gte),
        ("$lte", created_lte),
        ("$gt", created_gt),
        ("$lt", created_lt),
    )
    created_range: dict[str, datetime] = {}
    try:
        for operator, timestamp in bounds:
            if timestamp is not None:
                created_range[operator] = datetime.fromtimestamp(timestamp, tz=UTC)
    except (OverflowError, OSError, ValueError) as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid created timestamp",
        ) from exc
    if created_range:
        filters["created_at"] = created_range

    try:
        runs = op_run_repo.list_cursor(
            limit=limit,
            starting_after=starting_after,
            ending_before=ending_before,
            filters=filters,
        )
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(exc),
        ) from exc

    # Probe past the edge of the page in the direction the client is paging.
    if not runs:
        has_more = False
    elif ending_before:
        has_more = op_run_repo.has_more(
            direction="before",
            cursor_id=runs[0].id,
            filters=filters,
        )
    else:
        has_more = op_run_repo.has_more(
            direction="after",
            cursor_id=runs[-1].id,
            filters=filters,
        )

    data = [item.to_dict() for item in runs]
    return {
        "object": "list",
        "data": data,
        "has_more": has_more,
        "url": "/runs",
    }
[docs]
@router.post(
    "",
    response_model=OpRunResponse,
    status_code=status.HTTP_201_CREATED,
    responses={
        201: {"description": "OpRun created"},
        400: {"model": ErrorResponse, "description": "Invalid input"},
    },
)
def create_op_run(request: OpRunCreate) -> dict[str, Any]:
    """Create and persist an ad-hoc OpRun (one without an OpDefinition).

    The optional ``constraints``, ``retry_policy`` and ``event_delivery``
    sections of the request are converted into their domain objects;
    absent sections fall back to ``Constraints()``, ``RetryPolicy()`` and
    ``None`` respectively.

    Parameters
    ----------
    request : OpRunCreate
        Creation payload: image, priority, inputs and the optional
        sections described above.

    Returns
    -------
    dict[str, Any]
        ``to_dict()`` of the newly saved OpRun.

    Examples
    --------
    Request body for ``POST /runs``::

        {
            "image": "geoiahub/my_product:latest",
            "priority": 4,
            "inputs": {"param": "value"},
            "constraints": {"ram_mb": 4096}
        }
    """
    # Resource constraints: copy the three fields when the section is present.
    requested_constraints = request.constraints
    if requested_constraints:
        constraints = Constraints(
            ram_mb=requested_constraints.ram_mb,
            vram_mb=requested_constraints.vram_mb,
            cpu=requested_constraints.cpu,
        )
    else:
        constraints = Constraints()

    # Retry policy: only max_attempts is taken from the request.
    requested_retry = request.retry_policy
    if requested_retry:
        retry_policy = RetryPolicy(max_attempts=requested_retry.max_attempts)
    else:
        retry_policy = RetryPolicy()

    # Event delivery is optional; missing attributes become an empty dict.
    requested_delivery = request.event_delivery
    if requested_delivery:
        event_delivery = EventDelivery(
            topic=requested_delivery.topic,
            attributes=requested_delivery.attributes or {},
        )
    else:
        event_delivery = None

    new_run = OpRun.create(
        image=request.image,
        priority=request.priority,
        inputs=request.inputs,
        constraints=constraints,
        retry_policy=retry_policy,
        event_delivery=event_delivery,
        op_definition=None,  # ad-hoc run: no parent definition
    )
    op_run_repo.save(new_run)
    return new_run.to_dict()
[docs]
@router.get(
    "/{id}",
    response_model=OpRunResponse,
    responses={
        200: {"description": "OpRun retrieved"},
        404: {"model": ErrorResponse, "description": "OpRun not found"},
    },
)
def get_op_run(id: str) -> dict[str, Any]:
    """Fetch a single OpRun by its identifier.

    Parameters
    ----------
    id : str
        Identifier of the OpRun to look up in the repository.

    Returns
    -------
    dict[str, Any]
        ``to_dict()`` of the stored OpRun.

    Raises
    ------
    HTTPException
        404 when the repository has no run for ``id``.
    """
    found = op_run_repo.get(id)
    if found:
        return found.to_dict()
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"OpRun '{id}' not found",
    )
[docs]
@router.patch(
    "/{id}",
    response_model=OpRunResponse,
    responses={
        200: {"description": "OpRun updated"},
        404: {"model": ErrorResponse, "description": "OpRun not found"},
        400: {"model": ErrorResponse, "description": "Invalid update"},
    },
)
def update_op_run(id: str, request: OpRunUpdate) -> dict[str, Any]:
    """Update the state, outputs and/or priority of an OpRun.

    Only the fields that are not ``None`` in the request are applied.
    The run is saved once, after all requested changes were made.

    Parameters
    ----------
    id : str
        The unique identifier of the OpRun.
    request : OpRunUpdate
        Fields to update (``state``, ``outputs``, ``priority``).

    Returns
    -------
    dict[str, Any]
        The updated OpRun object.

    Raises
    ------
    HTTPException
        404 if OpRun not found.
        400 if ``state`` is not a valid ``OpRunState`` value.
    """
    run = op_run_repo.get(id)
    if not run:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"OpRun '{id}' not found",
        )

    now = datetime.now(UTC)

    # Update state if provided
    if request.state is not None:
        # An unknown state must be reported as a client error (400), matching
        # the state validation done by the list endpoint, instead of letting
        # the ValueError escape as a 500.
        try:
            new_state = OpRunState(request.state)
        except ValueError as e:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Invalid state '{request.state}'",
            ) from e
        run.set_state(new_state)

    # Emergency-only direct mutation (bypasses event timeline)
    if request.outputs is not None:
        # Replace the contents in place so the existing mapping object is kept.
        run.outputs.clear()
        run.outputs.update(request.outputs)
        run.updated_at = now

    if request.priority is not None:
        run.priority = request.priority
        run.updated_at = now

    op_run_repo.save(run)
    return run.to_dict()
[docs]
@router.post(
    "/{id}/retry",
    response_model=OpRunResponse,
    responses={
        200: {"description": "OpRun retry initiated"},
        404: {"model": ErrorResponse, "description": "OpRun not found"},
        400: {"model": ErrorResponse, "description": "Cannot retry"},
    },
)
def retry_op_run(id: str) -> dict[str, Any]:
    """Manually retry an OpRun.

    Records a ``RETRIED`` event (message ``"Manual retry"``, payload
    ``{"force": True}``) on the run and saves it. The effect of that event
    on attempts and state is implemented by ``OpRun.add_event``; the
    endpoint is documented as resetting the run to PENDING with an
    incremented attempt counter.

    Parameters
    ----------
    id : str
        Identifier of the OpRun to retry.

    Returns
    -------
    dict[str, Any]
        ``to_dict()`` of the run after the retry event was applied.

    Raises
    ------
    HTTPException
        404 when the run does not exist; 400 when ``add_event`` raises,
        with the exception text as detail.
    """
    op_run = op_run_repo.get(id)
    if not op_run:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"OpRun '{id}' not found",
        )

    try:
        op_run.add_event("RETRIED", "Manual retry", {"force": True})
    except Exception as exc:
        # Any rejection by the domain object is surfaced as a client error.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(exc),
        ) from exc
    else:
        op_run_repo.save(op_run)

    return op_run.to_dict()
[docs]
@router.post(
    "/{id}/cancel",
    response_model=OpRunResponse,
    responses={
        200: {"description": "OpRun cancelled"},
        404: {"model": ErrorResponse, "description": "OpRun not found"},
        400: {"model": ErrorResponse, "description": "Run already terminal"},
    },
)
def cancel_op_run(id: str) -> dict[str, Any]:
    """Cancel an OpRun that is not locked in a terminal state.

    Records a ``CANCELLED`` event (message ``"Cancelled by user"``, no
    payload) on the run and saves it. Workers are expected to notice the
    state change when polling and stop the container.

    Parameters
    ----------
    id : str
        Identifier of the OpRun to cancel.

    Returns
    -------
    dict[str, Any]
        ``to_dict()`` of the run after the cancel event was applied.

    Raises
    ------
    HTTPException
        404 when the run does not exist; 400 when ``run.is_locked`` is
        true or ``add_event`` raises.
    """
    op_run = op_run_repo.get(id)
    if not op_run:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"OpRun '{id}' not found",
        )

    # A locked run is reported as already terminal and left untouched.
    if op_run.is_locked:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Run is already in terminal state {op_run.state.value}",
        )

    try:
        op_run.add_event("CANCELLED", "Cancelled by user", None)
    except Exception as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(exc),
        ) from exc
    else:
        op_run_repo.save(op_run)

    return op_run.to_dict()
[docs]
@router.get(
    "/{id}/events",
    response_model=EventsResponse,
    responses={
        200: {"description": "Events retrieved"},
        404: {"model": ErrorResponse, "description": "OpRun not found"},
    },
)
def poll_op_run_events(id: str, after: int | None = None) -> dict[str, Any]:
    """Poll OpRun events.

    Returns the run's events, each tagged with its index in ``run.events``
    as ``id``. Pass the last index you have seen as ``after`` to receive
    only newer events.

    Parameters
    ----------
    id : str
        The unique identifier of the OpRun.
    after : int | None
        Optional: only events with an index greater than this value are
        returned. ``None`` or any negative value returns all events.

    Returns
    -------
    dict[str, Any]
        ``{"events": [...]}`` where every entry carries ``id``, ``type``,
        ``t`` (ISO-8601), ``msg``, ``state`` (the run's *current* state,
        not the state at event time) and ``payload``.

    Raises
    ------
    HTTPException
        404 if OpRun not found.
    """
    run = op_run_repo.get(id)
    if not run:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"OpRun '{id}' not found",
        )

    # Clamp to 0: a negative start would make the slice count from the end of
    # the list and produce negative event IDs that match no real index.
    start_idx = max(after + 1, 0) if after is not None else 0

    current_state = run.state.value
    events_response = [
        {
            "id": idx,
            "type": event.type,
            "t": event.t.isoformat(),
            "msg": event.msg,
            "state": current_state,
            "payload": dict(event.payload),
        }
        for idx, event in enumerate(run.events[start_idx:], start=start_idx)
    ]
    return {"events": events_response}
# -----------------------------------------------------------------------------
# Get OpAttempts
# -----------------------------------------------------------------------------
[docs]
@router.get(
    "/{id}/attempts",
    response_model=list[OpAttemptResponse],
    responses={
        200: {"description": "List of OpAttempts"},
        404: {"model": ErrorResponse, "description": "OpRun not found"},
    },
)
def get_op_attempts(id: str) -> list[dict[str, Any]]:
    """List every execution attempt recorded on an OpRun.

    Parameters
    ----------
    id : str
        Identifier of the OpRun whose attempts are requested.

    Returns
    -------
    list[dict[str, Any]]
        ``to_dict()`` of each entry in ``run.attempts``, in stored order.

    Raises
    ------
    HTTPException
        404 when the repository has no run for ``id``.
    """
    op_run = op_run_repo.get(id)
    if op_run:
        serialized: list[dict[str, Any]] = []
        for attempt in op_run.attempts:
            serialized.append(attempt.to_dict())
        return serialized
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"OpRun '{id}' not found",
    )
[docs]
@router.post(
    "/{id}/events",
    response_model=OpRunResponse,
    responses={
        200: {"description": "Event added successfully"},
        404: {"model": ErrorResponse, "description": "OpRun not found"},
        400: {"model": ErrorResponse, "description": "Invalid event or run locked"},
    },
)
def add_event_to_run(id: str, request: AddEventRequest) -> dict[str, Any]:
    """Append an event to an OpRun and persist the result.

    Typically invoked by the gyoza worker while a run executes. The
    request's ``type``, ``msg`` and ``payload`` are forwarded unchanged to
    ``OpRun.add_event``, which owns all event semantics.

    Documented event types and their payloads:

    - ``STARTED`` — payload: ``{"worker_id": str, "gpu_id": int}``
      (gpu_id optional).
    - ``PROGRESS`` — msg: int (0-100), updates the progress field
      automatically.
    - ``COMPLETED`` — payload: ``{"outputs": dict}`` (optional).
    - ``FAILED`` — payload: ``{"error_message": str}`` (optional). If
      retries remain, the run is automatically retried (state set back to
      PENDING).
    - ``CANCELLED`` — no payload.

    Parameters
    ----------
    id : str
        Identifier of the OpRun receiving the event.
    request : AddEventRequest
        Event type, message and optional payload.

    Returns
    -------
    dict[str, Any]
        ``to_dict()`` of the run after the event was applied.

    Raises
    ------
    HTTPException
        404 when the run does not exist; 400 when ``add_event`` raises,
        with the exception text as detail.
    """
    op_run = op_run_repo.get(id)
    if not op_run:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"OpRun '{id}' not found",
        )

    event_type, event_msg, event_payload = request.type, request.msg, request.payload
    try:
        op_run.add_event(event_type, event_msg, event_payload)
    except Exception as exc:
        # Rejections from the domain object are reported as a client error.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(exc),
        ) from exc
    else:
        op_run_repo.save(op_run)

    return op_run.to_dict()