"""Pydantic models for API request/response validation."""
from typing import Any
from pydantic import BaseModel, Field
[docs]
class ConstraintsModel(BaseModel):
    """Hardware constraints for an operation."""

    # All three fields are optional; the defaults below apply when a field
    # is omitted from the payload.
    ram_mb: int = Field(default=4096)
    vram_mb: int = Field(default=0)
    cpu: int = Field(default=1)
[docs]
class RetryPolicyModel(BaseModel):
    """Retry policy settings."""

    # Defaults: 3 attempts, 1000 ms backoff.
    max_attempts: int = Field(default=3)
    backoff_ms: int = Field(default=1000)
[docs]
class EventDeliveryModel(BaseModel):
    """Configuration describing where events are delivered."""

    # Defaults to the empty string when no topic is supplied.
    topic: str = Field(default="")
    # Optional string-to-string mapping; None when not provided.
    attributes: dict[str, str] | None = Field(default=None)
[docs]
class OpDefinitionCreate(BaseModel):
    """Request body used to create or update an OpDefinition."""

    # Required fields (no default).
    id: str = Field(default=..., description="Unique identifier for the operation")
    version: str = Field(default=..., description="Semantic version (e.g., '1.0.0')")
    image: str = Field(default=..., description="Docker image identifier")

    # Optional sections; each one is None when omitted.
    # NOTE(review): InputSpecModel is not defined or imported in the visible
    # part of this module -- confirm it is in scope where this class is built.
    input_specs: dict[str, InputSpecModel] | None = Field(default=None)
    constraints: ConstraintsModel | None = Field(default=None)
    retry_policy: RetryPolicyModel | None = Field(default=None)
    event_delivery: EventDeliveryModel | None = Field(default=None)
[docs]
class OpDefinitionResponse(BaseModel):
    """Response body describing an OpDefinition."""

    # Every field is required.
    id: str
    version: str
    image: str

    # Nested sections are returned as plain dictionaries rather than as the
    # typed request models.
    input_specs: dict[str, Any]
    constraints: dict[str, Any]
    retry_policy: dict[str, Any]
    event_delivery: dict[str, Any]

    # Timestamps are carried as strings; these two names are camelCase.
    createdAt: str
    updatedAt: str
[docs]
class OpRunCreateFromDefinition(BaseModel):
    """Request body for creating an OpRun from an OpDefinition template.

    To create a run from scratch without a template, use OpRunCreate.
    """

    # A fresh empty dict is created per instance when inputs are omitted.
    inputs: dict[str, Any] = Field(default_factory=dict)
    # Defaults to 0; validated to lie in the inclusive range 0..10.
    priority: int = Field(0, ge=0, le=10)
[docs]
class OpRunResponse(BaseModel):
    """Response body describing an OpRun."""

    # Identity and scheduling state.
    id: str
    state: str
    priority: int
    progress: int
    current_attempt: int
    image: str

    # Run data, returned as plain dictionaries / lists of dictionaries.
    inputs: dict[str, Any]
    outputs: dict[str, Any]
    events: list[dict[str, Any]]
    execution_summary: dict[str, Any]

    # Configuration sections, returned as plain dictionaries.
    constraints: dict[str, Any]
    retry_policy: dict[str, Any]
    event_delivery: dict[str, Any]

    # Must be present in the payload but may be None (no default is declared).
    op_definition: str | None

    # Timestamps are carried as strings; these two names are camelCase.
    createdAt: str
    updatedAt: str
[docs]
class OpRunCreate(BaseModel):
    """Request body for creating an ad-hoc OpRun directly, without a template.

    To create a run from an OpDefinition, use OpRunCreateFromDefinition.
    """

    # The only required field.
    image: str = Field(default=..., description="Docker image identifier")
    # Defaults to 0; validated to lie in the inclusive range 0..10.
    priority: int = Field(0, ge=0, le=10)
    # A fresh empty dict is created per instance when inputs are omitted.
    inputs: dict[str, Any] = Field(default_factory=dict)

    # Optional configuration sections; each one is None when omitted.
    constraints: ConstraintsModel | None = Field(default=None)
    retry_policy: RetryPolicyModel | None = Field(default=None)
    event_delivery: EventDeliveryModel | None = Field(default=None)
[docs]
class OpRunUpdate(BaseModel):
    """Request body for a PATCH update of an OpRun."""

    # Every field is optional and defaults to None.
    state: str | None = Field(default=None)
    outputs: dict[str, Any] | None = Field(default=None)
    # When supplied, validated to lie in the inclusive range 0..10.
    priority: int | None = Field(None, ge=0, le=10)
[docs]
class OpAttemptResponse(BaseModel):
    """Response body describing an OpAttempt."""

    # Identity of the attempt and of the run it refers to.
    id: str
    op_run_id: str
    attempt: int

    # Current status.
    state: str
    progress: int

    # Attempt data, returned as plain dictionaries / lists of dictionaries.
    events: list[dict[str, Any]]
    inputs: dict[str, Any]
    outputs: dict[str, Any]
    execution_summary: dict[str, Any]
    constraints: dict[str, Any]

    # Must be present in the payload but may be None (no default is declared).
    started_at: str | None
    finished_at: str | None
[docs]
class AddEventRequest(BaseModel):
    """Request body for appending an event to an attempt."""

    # Required. The field name is part of the API payload, so it keeps the
    # name `type` even though that shadows the builtin inside the class body.
    type: str = Field(
        default=...,
        description="Event type (STARTED, INFO, PROGRESS, COMPLETED, FAILED, etc.)",
    )
    # Required. Accepts either a string or an integer.
    msg: str | int = Field(default=..., description="Event message or progress value")
    # Optional; None when omitted.
    payload: dict[str, Any] | None = Field(
        None,
        description="Optional event-specific data",
    )
[docs]
class EventEntryResponse(BaseModel):
    """Response body for one event entry."""

    # Every field is required.
    id: int
    type: str
    # NOTE(review): presumably the event timestamp, carried as a string --
    # confirm against the producer.
    t: str
    # Either a string or an integer, mirroring AddEventRequest.msg.
    msg: str | int
    state: str
[docs]
class EventsResponse(BaseModel):
    """Response body returned when polling for events."""

    # Required list of event entries.
    events: list[EventEntryResponse]
[docs]
class ErrorResponse(BaseModel):
    """Response body for an error."""

    # Required error description.
    detail: str
    # Optional list of error strings; None when omitted.
    errors: list[str] | None = Field(default=None)
[docs]
class GPUModel(BaseModel):
    """A single GPU resource."""

    # Required fields.
    id: int
    vram_mb: int
    # A fresh empty list is created per instance when tags are omitted.
    tags: list[str] = Field(default_factory=list)
[docs]
class ResourcesModel(BaseModel):
    """Hardware resources: CPU cores, RAM and GPUs."""

    # Required fields.
    cpu_cores: int
    ram_mb: int
    # A fresh empty list is created per instance when gpus are omitted.
    gpus: list[GPUModel] = Field(default_factory=list)
[docs]
class HeartbeatRequest(BaseModel):
    """Request body for a worker heartbeat."""

    # Required fields.
    worker_id: str = Field(default=..., description="Worker identifier")
    resources: ResourcesModel
    # A fresh empty list is created per instance when tags are omitted.
    tags: list[str] = Field(default_factory=list)
    # Optional; None (not an empty list) when omitted.
    running_ops: list[dict[str, Any]] | None = Field(
        None,
        description="List of running OpRuns",
    )
[docs]
class WorkerResponse(BaseModel):
    """Response body describing a Worker."""

    # Every field is required.
    id: str
    # Returned as a plain dictionary rather than as ResourcesModel.
    resources: dict[str, Any]
    tags: list[str]
    running_ops: list[dict[str, Any]]

    # Timestamps are carried as strings; unlike the OpDefinition/OpRun
    # responses, these names are snake_case.
    created_at: str
    last_heartbeat_at: str
[docs]
class WorkerOpRunResponse(BaseModel):
    """Response body describing a WorkerOpRun."""

    # Every field is required.
    id: str
    image: str
    inputs: dict[str, Any]
    constraints: dict[str, Any]
[docs]
class ClaimOpsResponse(BaseModel):
    """Response body for the claim ops endpoint."""

    # Required list of WorkerOpRunResponse entries.
    ops: list[WorkerOpRunResponse]