Source code for gyoza.server.api.routers.runs

"""OpRun router - execution run management endpoints."""

from datetime import UTC, datetime
from typing import Any

from fastapi import APIRouter, HTTPException, Query, status

from gyoza.server.api.models import (
    AddEventRequest,
    ErrorResponse,
    EventsResponse,
    OpAttemptResponse,
    OpRunCreate,
    OpRunListResponse,
    OpRunResponse,
    OpRunUpdate,
)
from gyoza.server.op_run import (
    Constraints,
    EventDelivery,
    OpRun,
    OpRunState,
    RetryPolicy,
    op_run_repo,
)

# Router holding every OpRun endpoint defined below. Paths are relative
# ("" and "/{id}/..."); presumably the application mounts it under a "/runs"
# prefix, since list_op_runs reports ``"url": "/runs"`` — TODO confirm at the
# include_router call site.
router = APIRouter()


@router.get(
    "",
    response_model=OpRunListResponse,
    responses={
        200: {"description": "Paginated list of OpRuns"},
        400: {"model": ErrorResponse, "description": "Invalid pagination parameters"},
    },
)
def list_op_runs(
    limit: int = Query(default=10, ge=1, le=100),
    starting_after: str | None = Query(default=None),
    ending_before: str | None = Query(default=None),
    state: str | None = Query(default=None),
    status_filter: str | None = Query(default=None, alias="status"),
    op_definition: str | None = Query(default=None),
    image: str | None = Query(default=None),
    created_gte: int | None = Query(default=None, alias="created[gte]"),
    created_lte: int | None = Query(default=None, alias="created[lte]"),
    created_gt: int | None = Query(default=None, alias="created[gt]"),
    created_lt: int | None = Query(default=None, alias="created[lt]"),
) -> dict[str, Any]:
    """List OpRuns with Stripe-style cursor pagination.

    Results are ordered by creation date, newest first.

    Parameters
    ----------
    limit : int
        Page size, between 1 and 100 (default 10).
    starting_after : str | None
        Cursor for the next page: return the runs that follow this run ID in
        list order. The exact cursor semantics live in
        ``op_run_repo.list_cursor``.
    ending_before : str | None
        Cursor for the previous page: return the runs that precede this run
        ID in list order.
    state : str | None
        Only return runs in this state; must be a valid ``OpRunState`` value.
    status_filter : str | None
        The same filter as ``state``, exposed as the ``status`` query
        parameter. When both are supplied they must be identical.
    op_definition : str | None
        Only return runs with this op definition identifier.
    image : str | None
        Only return runs using this image.
    created_gte, created_lte, created_gt, created_lt : int | None
        UNIX-timestamp bounds on creation time, exposed as ``created[gte]``,
        ``created[lte]``, ``created[gt]`` and ``created[lt]``. Timestamps are
        converted to timezone-aware UTC datetimes.

    Returns
    -------
    dict[str, Any]
        A list object with ``object`` (always ``"list"``), ``data`` (the
        serialized runs), ``has_more`` and ``url``.

    Raises
    ------
    HTTPException
        400 if ``state`` and ``status`` disagree, the state is unknown, a
        timestamp cannot be converted, or the repository rejects the cursor
        arguments with ``ValueError``.
    """
    if state and status_filter and state != status_filter:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="state and status must match when both are provided",
        )
    requested_state = state or status_filter

    filters: dict[str, Any] = {}
    if requested_state:
        try:
            filters["state"] = OpRunState(requested_state).value
        except ValueError as exc:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Invalid state '{requested_state}'",
            ) from exc
    if op_definition:
        filters["op_definition"] = op_definition
    if image:
        filters["image"] = image

    # Comparison keys use MongoDB query-operator spelling; presumably the
    # repository forwards them as a range filter on created_at.
    created_bounds = (
        ("$gte", created_gte),
        ("$lte", created_lte),
        ("$gt", created_gt),
        ("$lt", created_lt),
    )
    try:
        created_range: dict[str, datetime] = {
            op_key: datetime.fromtimestamp(timestamp, tz=UTC)
            for op_key, timestamp in created_bounds
            if timestamp is not None
        }
    except (OverflowError, OSError, ValueError) as exc:
        # Out-of-range timestamps surface as any of these, depending on platform.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid created timestamp",
        ) from exc
    if created_range:
        filters["created_at"] = created_range

    try:
        runs = op_run_repo.list_cursor(
            limit=limit,
            starting_after=starting_after,
            ending_before=ending_before,
            filters=filters,
        )
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(exc),
        ) from exc

    has_more = False
    if runs:
        # Paging backwards probes beyond the first item of the page;
        # paging forwards probes beyond the last one.
        paging_backwards = bool(ending_before)
        boundary = runs[0] if paging_backwards else runs[-1]
        has_more = op_run_repo.has_more(
            direction="before" if paging_backwards else "after",
            cursor_id=boundary.id,
            filters=filters,
        )

    return {
        "object": "list",
        "data": [run.to_dict() for run in runs],
        "has_more": has_more,
        "url": "/runs",
    }
@router.post(
    "",
    response_model=OpRunResponse,
    status_code=status.HTTP_201_CREATED,
    responses={
        201: {"description": "OpRun created"},
        400: {"model": ErrorResponse, "description": "Invalid input"},
    },
)
def create_op_run(request: OpRunCreate) -> dict[str, Any]:
    """Create an ad-hoc OpRun that is not attached to any OpDefinition.

    Intended for ad-hoc usage and development testing. Optional request
    sections fall back as follows:

    - omitted constraints: a default ``Constraints()``;
    - omitted retry policy: a default ``RetryPolicy()``;
    - omitted event delivery: no event delivery (``None``).

    Parameters
    ----------
    request : OpRunCreate
        Image, priority, inputs and the optional constraints, retry policy
        and event delivery settings.

    Returns
    -------
    dict[str, Any]
        The saved OpRun, serialized with ``to_dict()``.

    Examples
    --------
    Request body for ``POST /runs``::

        {
            "image": "geoiahub/my_product:latest",
            "priority": 4,
            "inputs": {"param": "value"},
            "constraints": {"ram_mb": 4096}
        }
    """
    requested_constraints = request.constraints
    constraints = (
        Constraints(
            ram_mb=requested_constraints.ram_mb,
            vram_mb=requested_constraints.vram_mb,
            cpu=requested_constraints.cpu,
        )
        if requested_constraints
        else Constraints()
    )

    requested_retry = request.retry_policy
    retry_policy = (
        RetryPolicy(max_attempts=requested_retry.max_attempts)
        if requested_retry
        else RetryPolicy()
    )

    requested_delivery = request.event_delivery
    event_delivery = (
        EventDelivery(
            topic=requested_delivery.topic,
            attributes=requested_delivery.attributes or {},
        )
        if requested_delivery
        else None
    )

    run = OpRun.create(
        image=request.image,
        priority=request.priority,
        inputs=request.inputs,
        constraints=constraints,
        retry_policy=retry_policy,
        event_delivery=event_delivery,
        op_definition=None,  # ad-hoc: no parent OpDefinition
    )
    op_run_repo.save(run)
    return run.to_dict()
@router.get(
    "/{id}",
    response_model=OpRunResponse,
    responses={
        200: {"description": "OpRun retrieved"},
        404: {"model": ErrorResponse, "description": "OpRun not found"},
    },
)
def get_op_run(id: str) -> dict[str, Any]:
    """Fetch a single OpRun.

    Parameters
    ----------
    id : str
        Identifier of the OpRun to look up.

    Returns
    -------
    dict[str, Any]
        The OpRun serialized with ``to_dict()``.

    Raises
    ------
    HTTPException
        404 if no OpRun with this ID exists.
    """
    found = op_run_repo.get(id)
    if found:
        return found.to_dict()
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"OpRun '{id}' not found",
    )
@router.patch(
    "/{id}",
    response_model=OpRunResponse,
    responses={
        200: {"description": "OpRun updated"},
        404: {"model": ErrorResponse, "description": "OpRun not found"},
        400: {"model": ErrorResponse, "description": "Invalid update"},
    },
)
def update_op_run(id: str, request: OpRunUpdate) -> dict[str, Any]:
    """Update an OpRun.

    Allows updating the state, outputs, and priority of a run. Only fields
    that are not ``None`` in the request are applied.

    Parameters
    ----------
    id : str
        The unique identifier of the OpRun.
    request : OpRunUpdate
        Fields to update.

    Returns
    -------
    dict[str, Any]
        The updated OpRun object.

    Raises
    ------
    HTTPException
        404 if the OpRun is not found.
        400 if the requested state is not a valid ``OpRunState`` or the run
        refuses the state change.
    """
    run = op_run_repo.get(id)
    if not run:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"OpRun '{id}' not found",
        )

    now = datetime.now(UTC)

    if request.state is not None:
        # An unknown state used to escape as an unhandled ValueError (HTTP
        # 500); report it as a client error, matching list_op_runs' message.
        try:
            new_state = OpRunState(request.state)
        except ValueError as e:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Invalid state '{request.state}'",
            ) from e
        # Rejected transitions are client errors too; this mirrors how the
        # other mutating endpoints wrap domain-model calls.
        try:
            run.set_state(new_state)
        except Exception as e:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=str(e),
            ) from e

    # Emergency-only direct mutation (bypasses the event timeline).
    if request.outputs is not None:
        run.outputs.clear()
        run.outputs.update(request.outputs)
        run.updated_at = now

    if request.priority is not None:
        run.priority = request.priority
        run.updated_at = now

    op_run_repo.save(run)
    return run.to_dict()
@router.post(
    "/{id}/retry",
    response_model=OpRunResponse,
    responses={
        200: {"description": "OpRun retry initiated"},
        404: {"model": ErrorResponse, "description": "OpRun not found"},
        400: {"model": ErrorResponse, "description": "Cannot retry"},
    },
)
def retry_op_run(id: str) -> dict[str, Any]:
    """Manually retry an OpRun.

    Appends a ``RETRIED`` event (message ``"Manual retry"``, payload
    ``{"force": True}``) to the run and saves it. The run model is expected
    to handle that event by opening a new OpAttempt, incrementing
    ``current_attempt``, and returning the state to PENDING.

    Parameters
    ----------
    id : str
        Identifier of the OpRun to retry.

    Returns
    -------
    dict[str, Any]
        The OpRun after the retry event has been applied.

    Raises
    ------
    HTTPException
        404 if no OpRun with this ID exists.
        400 if the run rejects the retry event.
    """
    target = op_run_repo.get(id)
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"OpRun '{id}' not found",
        )

    try:
        target.add_event("RETRIED", "Manual retry", {"force": True})
    except Exception as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(exc),
        ) from exc
    else:
        op_run_repo.save(target)

    return target.to_dict()
@router.post(
    "/{id}/cancel",
    response_model=OpRunResponse,
    responses={
        200: {"description": "OpRun cancelled"},
        404: {"model": ErrorResponse, "description": "OpRun not found"},
        400: {"model": ErrorResponse, "description": "Run already terminal"},
    },
)
def cancel_op_run(id: str) -> dict[str, Any]:
    """Cancel an OpRun that has not reached a terminal state.

    Appends a ``CANCELLED`` event to the run and saves it. Workers that poll
    the run state are expected to notice the change and stop the container.

    Parameters
    ----------
    id : str
        Identifier of the OpRun to cancel.

    Returns
    -------
    dict[str, Any]
        The OpRun after the cancellation event has been applied.

    Raises
    ------
    HTTPException
        404 if no OpRun with this ID exists.
        400 if the run is locked (already terminal) or rejects the event.
    """
    target = op_run_repo.get(id)
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"OpRun '{id}' not found",
        )

    # A locked run is one that has already finished; refuse before mutating.
    if target.is_locked:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Run is already in terminal state {target.state.value}",
        )

    try:
        target.add_event("CANCELLED", "Cancelled by user", None)
    except Exception as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(exc),
        ) from exc
    else:
        op_run_repo.save(target)

    return target.to_dict()
@router.get(
    "/{id}/events",
    response_model=EventsResponse,
    responses={
        200: {"description": "Events retrieved"},
        404: {"model": ErrorResponse, "description": "OpRun not found"},
    },
)
def poll_op_run_events(id: str, after: int | None = None) -> dict[str, Any]:
    """Poll OpRun events.

    Returns the run's events with their positional IDs. A client passes the
    ID of the last event it has seen as ``after`` to receive only newer ones.

    Parameters
    ----------
    id : str
        The unique identifier of the OpRun.
    after : int | None
        Optional index of the last event already seen. Only events with an
        index greater than ``after`` are returned. ``None`` or any negative
        value returns every event.

    Returns
    -------
    dict[str, Any]
        ``{"events": [...]}``. Each entry carries:

        - ``id``: the event's position in the run's event list;
        - ``type``, ``msg``, and ``payload``: the event's own fields;
        - ``t``: the event time, ISO-8601 formatted;
        - ``state``: the run's *current* state, not the state at the time
          of the event.

    Raises
    ------
    HTTPException
        404 if OpRun not found.
    """
    run = op_run_repo.get(id)
    if not run:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"OpRun '{id}' not found",
        )

    # Clamp at 0: a cursor below -1 would otherwise become a negative slice
    # start, returning the tail of the list labelled with negative IDs.
    start_idx = 0 if after is None else max(after + 1, 0)
    current_state = run.state.value

    events_response = [
        {
            "id": idx,
            "type": event.type,
            "t": event.t.isoformat(),
            "msg": event.msg,
            "state": current_state,
            # Events can be recorded with a None payload (e.g. cancellation),
            # and dict(None) would raise TypeError.
            "payload": dict(event.payload or {}),
        }
        for idx, event in enumerate(run.events[start_idx:], start=start_idx)
    ]
    return {"events": events_response}
# ----------------------------------------------------------------------------- # Get OpAttempts # -----------------------------------------------------------------------------
@router.get(
    "/{id}/attempts",
    response_model=list[OpAttemptResponse],
    responses={
        200: {"description": "List of OpAttempts"},
        404: {"model": ErrorResponse, "description": "OpRun not found"},
    },
)
def get_op_attempts(id: str) -> list[dict[str, Any]]:
    """Return every execution attempt recorded for an OpRun.

    Parameters
    ----------
    id : str
        Identifier of the OpRun whose attempts are requested.

    Returns
    -------
    list[dict[str, Any]]
        The run's attempts, each serialized with ``to_dict()``, in the order
        stored on the run.

    Raises
    ------
    HTTPException
        404 if no OpRun with this ID exists.
    """
    found = op_run_repo.get(id)
    if found:
        return [item.to_dict() for item in found.attempts]
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"OpRun '{id}' not found",
    )
@router.post(
    "/{id}/events",
    response_model=OpRunResponse,
    responses={
        200: {"description": "Event added successfully"},
        404: {"model": ErrorResponse, "description": "OpRun not found"},
        400: {"model": ErrorResponse, "description": "Invalid event or run locked"},
    },
)
def add_event_to_run(id: str, request: AddEventRequest) -> dict[str, Any]:
    """Append an event to the current attempt of an OpRun.

    Typically called by the gyoza worker while a run executes. The event is
    handed to ``OpRun.add_event``, and the run is saved afterwards.

    Event types with special handling:

    - ``STARTED``: payload ``{"worker_id": str, "gpu_id": int}``; ``gpu_id``
      is optional.
    - ``PROGRESS``: msg is an int (0-100) and updates the progress field.
    - ``COMPLETED``: optional payload ``{"outputs": dict}``.
    - ``FAILED``: optional payload ``{"error_message": str}``. If retries
      remain, the run is retried automatically and returns to PENDING.
    - ``CANCELLED``: no payload.

    Parameters
    ----------
    id : str
        Identifier of the OpRun.
    request : AddEventRequest
        The event type, message and optional payload.

    Returns
    -------
    dict[str, Any]
        The OpRun after the event has been applied.

    Raises
    ------
    HTTPException
        404 if no OpRun with this ID exists.
        400 if the run is locked or rejects the event.
    """
    target = op_run_repo.get(id)
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"OpRun '{id}' not found",
        )

    try:
        target.add_event(request.type, request.msg, request.payload)
    except Exception as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(exc),
        ) from exc
    else:
        op_run_repo.save(target)

    return target.to_dict()