Source code for gyoza.server.api.models

"""Pydantic models for API request/response validation."""

from typing import Any

from pydantic import BaseModel, Field


[docs] class InputSpecModel(BaseModel): """Input specification model.""" type: str required: bool = True default: Any | None = None
[docs] class ConstraintsModel(BaseModel): """Hardware constraints model.""" ram_mb: int = 4096 vram_mb: int = 0 cpu: int = 1
[docs] class RetryPolicyModel(BaseModel): """Retry policy model.""" max_attempts: int = 3 backoff_ms: int = 1000
[docs] class EventDeliveryModel(BaseModel): """Event delivery configuration model.""" topic: str = "" attributes: dict[str, str] | None = None
[docs] class OpDefinitionCreate(BaseModel): """Request model for creating/updating OpDefinition.""" id: str = Field(..., description="Unique identifier for the operation") version: str = Field(..., description="Semantic version (e.g., '1.0.0')") image: str = Field(..., description="Docker image identifier") input_specs: dict[str, InputSpecModel] | None = None constraints: ConstraintsModel | None = None retry_policy: RetryPolicyModel | None = None event_delivery: EventDeliveryModel | None = None
[docs] class OpDefinitionResponse(BaseModel): """Response model for OpDefinition.""" id: str version: str image: str input_specs: dict[str, Any] constraints: dict[str, Any] retry_policy: dict[str, Any] event_delivery: dict[str, Any] createdAt: str updatedAt: str
[docs] class OpRunCreateFromDefinition(BaseModel): """Request model for creating OpRun from OpDefinition. This is used when creating a run from a template (OpDefinition). For creating runs from scratch, use OpRunCreate (to be implemented). """ inputs: dict[str, Any] = Field(default_factory=dict) priority: int = Field(default=0, ge=0, le=10)
[docs] class OpRunResponse(BaseModel): """Response model for OpRun.""" id: str state: str priority: int progress: int current_attempt: int image: str inputs: dict[str, Any] outputs: dict[str, Any] events: list[dict[str, Any]] execution_summary: dict[str, Any] constraints: dict[str, Any] retry_policy: dict[str, Any] event_delivery: dict[str, Any] op_definition: str | None createdAt: str updatedAt: str
[docs] class OpRunCreate(BaseModel): """Request model for creating OpRun directly (ad-hoc). This is used when creating a run from scratch without a template. For creating runs from OpDefinition, use OpRunCreateFromDefinition. """ image: str = Field(..., description="Docker image identifier") priority: int = Field(default=0, ge=0, le=10) inputs: dict[str, Any] = Field(default_factory=dict) constraints: ConstraintsModel | None = None retry_policy: RetryPolicyModel | None = None event_delivery: EventDeliveryModel | None = None
[docs] class OpRunUpdate(BaseModel): """Request model for updating OpRun via PATCH.""" state: str | None = None outputs: dict[str, Any] | None = None priority: int | None = Field(default=None, ge=0, le=10)
[docs] class OpAttemptResponse(BaseModel): """Response model for OpAttempt.""" id: str op_run_id: str attempt: int state: str progress: int events: list[dict[str, Any]] inputs: dict[str, Any] outputs: dict[str, Any] execution_summary: dict[str, Any] constraints: dict[str, Any] started_at: str | None finished_at: str | None
[docs] class AddEventRequest(BaseModel): """Request model for adding an event to an attempt.""" type: str = Field( ..., description="Event type (STARTED, INFO, PROGRESS, COMPLETED, FAILED, etc.)" ) msg: str | int = Field(..., description="Event message or progress value") payload: dict[str, Any] | None = Field( default=None, description="Optional event-specific data" )
[docs] class EventEntryResponse(BaseModel): """Response model for a single event entry.""" id: int type: str t: str msg: str | int state: str
[docs] class EventsResponse(BaseModel): """Response model for polling events.""" events: list[EventEntryResponse]
[docs] class ErrorResponse(BaseModel): """Error response model.""" detail: str errors: list[str] | None = None
[docs] class GPUModel(BaseModel): """GPU resource model.""" id: int vram_mb: int tags: list[str] = Field(default_factory=list)
[docs] class ResourcesModel(BaseModel): """Hardware resources model.""" cpu_cores: int ram_mb: int gpus: list[GPUModel] = Field(default_factory=list)
[docs] class HeartbeatRequest(BaseModel): """Request model for worker heartbeat.""" worker_id: str = Field(..., description="Worker identifier") resources: ResourcesModel tags: list[str] = Field(default_factory=list) running_ops: list[dict[str, Any]] | None = Field( default=None, description="List of running OpRuns" )
[docs] class WorkerResponse(BaseModel): """Response model for Worker.""" id: str resources: dict[str, Any] tags: list[str] running_ops: list[dict[str, Any]] created_at: str last_heartbeat_at: str
[docs] class WorkerOpRunResponse(BaseModel): """Response model for WorkerOpRun.""" id: str image: str inputs: dict[str, Any] constraints: dict[str, Any]
[docs] class ClaimOpsResponse(BaseModel): """Response model for claim ops endpoint.""" ops: list[WorkerOpRunResponse]