Source code for gyoza.server.api.routers.runs

"""OpRun router - execution run management endpoints."""

from typing import Any

from fastapi import APIRouter, HTTPException, status

from gyoza.server.api.models import (
    AddEventRequest,
    ErrorResponse,
    EventsResponse,
    OpAttemptResponse,
    OpRunCreate,
    OpRunResponse,
    OpRunUpdate,
)
from gyoza.server.op_run import (
    Constraints,
    EventDelivery,
    OpRun,
    OpRunState,
    RetryPolicy,
    op_run_repo,
)

# APIRouter collecting the OpRun endpoints defined in this module. Route paths
# are relative to the prefix used where this router is included (presumably
# "/runs", judging by the create_op_run docstring example -- confirm).
router = APIRouter()


@router.post(
    "",
    response_model=OpRunResponse,
    status_code=status.HTTP_201_CREATED,
    responses={
        201: {"description": "OpRun created"},
        400: {"model": ErrorResponse, "description": "Invalid input"},
    },
)
def create_op_run(request: OpRunCreate) -> dict[str, Any]:
    """Create an ad-hoc OpRun that has no parent OpDefinition.

    Intended for ad-hoc usage and development testing: the caller supplies
    the container image, inputs and optional execution settings directly
    instead of deriving them from an OpDefinition.

    Parameters
    ----------
    request : OpRunCreate
        Creation payload: image, priority, inputs and the optional
        ``constraints``, ``retry_policy`` and ``event_delivery`` sections.

    Returns
    -------
    dict[str, Any]
        Serialized form (``to_dict()``) of the newly created and saved OpRun.

    Examples
    --------
    >>> POST /runs
    >>> {
    ...     "image": "geoiahub/my_product:latest",
    ...     "priority": 4,
    ...     "inputs": {"param": "value"},
    ...     "constraints": {"ram_mb": 4096}
    ... }
    """
    spec_constraints = request.constraints
    spec_retry = request.retry_policy
    spec_delivery = request.event_delivery

    # Omitted sections fall back to the default-constructed domain objects.
    constraints = (
        Constraints(
            ram_mb=spec_constraints.ram_mb,
            vram_mb=spec_constraints.vram_mb,
            cpu=spec_constraints.cpu,
        )
        if spec_constraints
        else Constraints()
    )
    retry_policy = (
        RetryPolicy(
            max_attempts=spec_retry.max_attempts,
            backoff_ms=spec_retry.backoff_ms,
        )
        if spec_retry
        else RetryPolicy()
    )
    # Event delivery is opt-in: no section means no EventDelivery at all.
    event_delivery = (
        EventDelivery(
            topic=spec_delivery.topic,
            attributes=spec_delivery.attributes or {},
        )
        if spec_delivery
        else None
    )

    new_run = OpRun.create(
        image=request.image,
        priority=request.priority,
        inputs=request.inputs,
        constraints=constraints,
        retry_policy=retry_policy,
        event_delivery=event_delivery,
        op_definition=None,  # ad-hoc run: not derived from an OpDefinition
    )
    op_run_repo.save(new_run)
    return new_run.to_dict()
@router.get(
    "/{id}",
    response_model=OpRunResponse,
    responses={
        200: {"description": "OpRun retrieved"},
        404: {"model": ErrorResponse, "description": "OpRun not found"},
    },
)
def get_op_run(id: str) -> dict[str, Any]:
    """Fetch a single OpRun by its identifier.

    Parameters
    ----------
    id : str
        Identifier of the OpRun to look up.

    Returns
    -------
    dict[str, Any]
        Serialized form (``to_dict()``) of the stored OpRun.

    Raises
    ------
    HTTPException
        404 when the repository has no OpRun with this identifier.
    """
    found = op_run_repo.get(id)
    if found:
        return found.to_dict()
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"OpRun '{id}' not found",
    )
@router.patch(
    "/{id}",
    response_model=OpRunResponse,
    responses={
        200: {"description": "OpRun updated"},
        404: {"model": ErrorResponse, "description": "OpRun not found"},
        400: {"model": ErrorResponse, "description": "Invalid update"},
    },
)
def update_op_run(id: str, request: OpRunUpdate) -> dict[str, Any]:
    """Update an OpRun.

    Allows updating state, outputs, and priority of a run. Only fields that
    are not ``None`` in the request are applied.

    Parameters
    ----------
    id : str
        The unique identifier of the OpRun.
    request : OpRunUpdate
        Fields to update.

    Returns
    -------
    dict[str, Any]
        The updated OpRun object.

    Raises
    ------
    HTTPException
        404 if OpRun not found.
        400 if the state value is unknown or an update is rejected with
        ``ValueError`` by the run.
    """
    run = op_run_repo.get(id)
    if not run:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"OpRun '{id}' not found",
        )

    try:
        # OpRunState(...) raises ValueError for an unknown state value; without
        # this handler that surfaced as a 500 instead of the documented 400.
        if request.state is not None:
            new_state = OpRunState(request.state)
            run.set_state(new_state)

        if request.outputs is not None:
            run.set_outputs(request.outputs)

        if request.priority is not None:
            run.set_priority(request.priority)
    except ValueError as e:
        # NOTE(review): a failure after an earlier setter succeeded leaves the
        # in-memory run partially modified (it is not saved here) -- confirm
        # whether op_run_repo.get returns a shared object.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e),
        ) from e

    op_run_repo.save(run)
    return run.to_dict()
@router.post(
    "/{id}/retry",
    response_model=OpRunResponse,
    responses={
        200: {"description": "OpRun retry initiated"},
        404: {"model": ErrorResponse, "description": "OpRun not found"},
        400: {"model": ErrorResponse, "description": "Cannot retry"},
    },
)
def retry_op_run(id: str) -> dict[str, Any]:
    """Start another execution attempt for an OpRun.

    Delegates to ``OpRun.retry()``, which is expected to create a new
    OpAttempt, bump ``current_attempt`` and put the run back into PENDING.
    The run is saved only when the retry succeeds.

    Parameters
    ----------
    id : str
        Identifier of the OpRun to retry.

    Returns
    -------
    dict[str, Any]
        Serialized form (``to_dict()``) of the retried OpRun.

    Raises
    ------
    HTTPException
        404 when the OpRun does not exist; 400 when ``retry()`` raises
        ``ValueError`` (e.g. the maximum number of attempts was reached).
    """
    target = op_run_repo.get(id)
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"OpRun '{id}' not found",
        )

    try:
        target.retry()
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(exc),
        ) from exc
    else:
        op_run_repo.save(target)
        return target.to_dict()
@router.get(
    "/{id}/events",
    response_model=EventsResponse,
    responses={
        200: {"description": "Events retrieved"},
        404: {"model": ErrorResponse, "description": "OpRun not found"},
    },
)
def poll_op_run_events(id: str, after: int | None = None) -> dict[str, Any]:
    """Poll OpRun events.

    Returns events using a polling-based mechanism. An event's ID is its
    position in the run's event list; pass the last ID you have seen as
    ``after`` to receive only newer events.

    Parameters
    ----------
    id : str
        The unique identifier of the OpRun.
    after : int | None
        Optional: only events with an ID greater than this are returned.
        ``None`` or any negative value returns all events.

    Returns
    -------
    dict[str, Any]
        ``{"events": [...]}``. Each event carries its ID, type, ISO-formatted
        timestamp, message, and the run's *current* state.

    Raises
    ------
    HTTPException
        404 if OpRun not found.
    """
    run = op_run_repo.get(id)
    if not run:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"OpRun '{id}' not found",
        )

    # Clamp at 0: a negative start index would trigger Python's from-the-end
    # slicing and return tail events labelled with negative IDs.
    start_idx = 0 if after is None else max(after + 1, 0)

    events_response = [
        {
            "id": idx,
            "type": event.type,
            "t": event.t.isoformat(),
            "msg": event.msg,
            # Current run state, not the state at the time of the event.
            "state": run.state.value,
        }
        for idx, event in enumerate(run.events[start_idx:], start=start_idx)
    ]
    return {"events": events_response}
# ----------------------------------------------------------------------------- # Get OpAttempts # -----------------------------------------------------------------------------
@router.get(
    "/{id}/attempts",
    response_model=list[OpAttemptResponse],
    responses={
        200: {"description": "List of OpAttempts"},
        404: {"model": ErrorResponse, "description": "OpRun not found"},
    },
)
def get_op_attempts(id: str) -> list[dict[str, Any]]:
    """List every execution attempt recorded for an OpRun.

    Parameters
    ----------
    id : str
        Identifier of the OpRun whose attempt history is requested.

    Returns
    -------
    list[dict[str, Any]]
        One serialized entry (``to_dict()``) per element of ``run.attempts``,
        in the order the run stores them.

    Raises
    ------
    HTTPException
        404 when the OpRun does not exist.
    """
    op_run = op_run_repo.get(id)
    if op_run:
        history = op_run.attempts
        return [entry.to_dict() for entry in history]
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"OpRun '{id}' not found",
    )
@router.post(
    "/{id}/events",
    response_model=OpRunResponse,
    responses={
        200: {"description": "Event added successfully"},
        404: {"model": ErrorResponse, "description": "OpRun not found"},
        400: {"model": ErrorResponse, "description": "Invalid event or run locked"},
    },
)
def add_event_to_run(id: str, request: AddEventRequest) -> dict[str, Any]:
    """Append an event to the current attempt of an OpRun.

    Typically called by the gyoza worker while it executes a run. The event
    is forwarded to ``OpRun.add_event`` and the run is saved afterwards.

    Special event types with payloads:

    - ``STARTED`` — payload: ``{"worker_id": str, "gpu_id": int}`` (gpu_id optional).
    - ``PROGRESS`` — msg: int (0-100), updates the progress field automatically.
    - ``COMPLETED`` — payload: ``{"outputs": dict}`` (optional).
    - ``FAILED`` — payload: ``{"error_message": str}`` (optional).
    - ``CANCELLED`` — no payload.

    Parameters
    ----------
    id : str
        Identifier of the OpRun receiving the event.
    request : AddEventRequest
        Event type, message and optional payload.

    Returns
    -------
    dict[str, Any]
        Serialized form (``to_dict()``) of the updated OpRun.

    Raises
    ------
    HTTPException
        404 when the OpRun does not exist; 400 when ``add_event`` raises
        (e.g. the run is locked or the event is invalid).
    """
    run_obj = op_run_repo.get(id)
    if not run_obj:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"OpRun '{id}' not found",
        )

    try:
        run_obj.add_event(request.type, request.msg, request.payload)
    except Exception as err:
        # Every failure is reported as a client error (400).
        # NOTE(review): this also turns genuine server bugs into 400s --
        # consider narrowing once add_event's exception types are known.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(err),
        ) from err
    else:
        op_run_repo.save(run_obj)
        return run_obj.to_dict()