"""Unified HTTP client for the gyoza server API.
Used by the worker (heartbeat, claim ops, send events) and the CLI
deployment pipeline (upsert definitions, create runs).
"""
from __future__ import annotations
from typing import Any
import httpx
class GyozaClient:
    """HTTP client covering all gyoza server API endpoints.

    Every endpoint method issues one request through a shared
    ``httpx.Client``, raises on a non-2xx status, and returns the decoded
    JSON body. The client can be used as a context manager so the
    connection pool is released on exit.

    Parameters
    ----------
    base_url : str
        Base URL of the gyoza server (e.g. ``"http://localhost:5555"``).
    api_key : str | None
        API key sent as the ``X-API-Key`` header on every request.
        When falsy, no API-key header is sent.
    timeout : float
        Per-request timeout in seconds.
    """

    def __init__(
        self,
        base_url: str,
        api_key: str | None = None,
        timeout: float = 30.0,
    ) -> None:
        # Only attach the header when a key was actually supplied.
        headers = {"X-API-Key": api_key} if api_key else {}
        self._http = httpx.Client(
            base_url=base_url,
            headers=headers,
            timeout=timeout,
        )

    @staticmethod
    def _json_or_raise(response: httpx.Response) -> Any:
        """Return the decoded JSON body of *response*.

        Raises
        ------
        httpx.HTTPStatusError
            On non-2xx responses.
        """
        response.raise_for_status()
        return response.json()

    # ------------------------------------------------------------------
    # Definitions
    # ------------------------------------------------------------------

    def upsert_definition(self, op_dict: dict[str, Any]) -> dict[str, Any]:
        """Create or replace an OpDefinition on the server.

        Parameters
        ----------
        op_dict : dict[str, Any]
            Serialised OpDefinition payload (from ``OpDefinition.to_dict()``).

        Returns
        -------
        dict[str, Any]
            The created or updated OpDefinition as returned by the server.

        Raises
        ------
        httpx.HTTPStatusError
            On non-2xx responses.
        """
        return self._json_or_raise(self._http.post("/definitions", json=op_dict))

    def get_definition(self, name: str, version: str | None = None) -> dict[str, Any]:
        """Fetch an OpDefinition by name and optional version.

        Parameters
        ----------
        name : str
            OpDefinition identifier.
        version : str | None
            Specific version to retrieve; omit to get the latest.

        Returns
        -------
        dict[str, Any]
            The OpDefinition data.

        Raises
        ------
        httpx.HTTPStatusError
            404 if the definition does not exist.
        """
        # A falsy version (None or "") is sent as "no version filter".
        params = {"version": version} if version else {}
        return self._json_or_raise(
            self._http.get(f"/definitions/{name}", params=params)
        )

    def list_definitions(self) -> list[dict[str, Any]]:
        """Fetch all registered OpDefinitions (all versions).

        Returns
        -------
        list[dict[str, Any]]
            All OpDefinitions stored on the server.

        Raises
        ------
        httpx.HTTPStatusError
            On non-2xx responses.
        """
        return self._json_or_raise(self._http.get("/definitions"))

    # ------------------------------------------------------------------
    # Runs
    # ------------------------------------------------------------------

    def create_run(
        self,
        image: str,
        inputs: dict[str, Any],
        *,
        priority: int = 5,
        constraints: dict[str, Any] | None = None,
        retry_policy: dict[str, Any] | None = None,
        event_delivery: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Create an ad-hoc OpRun without an OpDefinition.

        Parameters
        ----------
        image : str
            Docker image reference to execute.
        inputs : dict[str, Any]
            Input parameters for the run.
        priority : int
            Scheduling priority (higher value = higher priority).
        constraints : dict[str, Any] | None
            Hardware requirements override.
        retry_policy : dict[str, Any] | None
            Retry behaviour override.
        event_delivery : dict[str, Any] | None
            Event delivery configuration override.

        Returns
        -------
        dict[str, Any]
            The created OpRun.

        Raises
        ------
        httpx.HTTPStatusError
            On non-2xx responses.
        """
        payload: dict[str, Any] = {
            "image": image,
            "inputs": inputs,
            "priority": priority,
        }
        # Optional overrides are left out of the body entirely when unset.
        if constraints is not None:
            payload["constraints"] = constraints
        if retry_policy is not None:
            payload["retry_policy"] = retry_policy
        if event_delivery is not None:
            payload["event_delivery"] = event_delivery
        return self._json_or_raise(self._http.post("/runs", json=payload))

    def create_run_from_definition(
        self,
        name: str,
        inputs: dict[str, Any],
        *,
        version: str | None = None,
        priority: int = 5,
    ) -> dict[str, Any]:
        """Create an OpRun from a registered OpDefinition.

        Parameters
        ----------
        name : str
            OpDefinition identifier.
        inputs : dict[str, Any]
            Input parameters for the run.
        version : str | None
            Specific definition version to use; omit for latest.
        priority : int
            Scheduling priority.

        Returns
        -------
        dict[str, Any]
            The created OpRun.

        Raises
        ------
        httpx.HTTPStatusError
            404 if the definition does not exist, 400 on invalid inputs.
        """
        params = {"version": version} if version else {}
        payload: dict[str, Any] = {"inputs": inputs, "priority": priority}
        return self._json_or_raise(
            self._http.post(f"/definitions/{name}/runs", json=payload, params=params)
        )

    def get_run(self, run_id: str) -> dict[str, Any]:
        """Fetch an OpRun by ID.

        Parameters
        ----------
        run_id : str
            Unique identifier of the OpRun.

        Returns
        -------
        dict[str, Any]
            Full OpRun data including state, events, and attempts.

        Raises
        ------
        httpx.HTTPStatusError
            404 if the run does not exist.
        """
        return self._json_or_raise(self._http.get(f"/runs/{run_id}"))

    def update_run(
        self,
        run_id: str,
        *,
        state: str | None = None,
        outputs: dict[str, Any] | None = None,
        priority: int | None = None,
    ) -> dict[str, Any]:
        """Partially update an OpRun's state, outputs, or priority.

        Only the fields that are not ``None`` are included in the request
        body.

        Parameters
        ----------
        run_id : str
            Unique identifier of the OpRun.
        state : str | None
            New state value (see ``OpRunState``).
        outputs : dict[str, Any] | None
            Output data to attach to the run.
        priority : int | None
            New scheduling priority.

        Returns
        -------
        dict[str, Any]
            The updated OpRun.

        Raises
        ------
        httpx.HTTPStatusError
            404 if not found, 400 on invalid state transition.
        """
        payload: dict[str, Any] = {}
        if state is not None:
            payload["state"] = state
        if outputs is not None:
            payload["outputs"] = outputs
        if priority is not None:
            payload["priority"] = priority
        return self._json_or_raise(self._http.patch(f"/runs/{run_id}", json=payload))

    def retry_run(self, run_id: str) -> dict[str, Any]:
        """Trigger a retry for a failed OpRun.

        Parameters
        ----------
        run_id : str
            Unique identifier of the OpRun.

        Returns
        -------
        dict[str, Any]
            The OpRun reset to PENDING with an incremented attempt counter.

        Raises
        ------
        httpx.HTTPStatusError
            404 if not found, 400 if max attempts reached.
        """
        return self._json_or_raise(self._http.post(f"/runs/{run_id}/retry"))

    def add_event(
        self,
        run_id: str,
        event_type: str,
        msg: str | int,
        payload: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Append an event to the current attempt of an OpRun.

        Called by the worker during execution to report progress, completion,
        or failure. Special event types (``STARTED``, ``COMPLETED``,
        ``FAILED``, ``CANCELLED``) trigger state transitions on the run.

        Parameters
        ----------
        run_id : str
            Unique identifier of the OpRun.
        event_type : str
            Event type string (see ``EventType``).
        msg : str | int
            Human-readable message or progress value (0–100 for PROGRESS).
        payload : dict[str, Any] | None
            Optional event-specific data (e.g. ``{"outputs": {...}}`` for
            COMPLETED, ``{"error_message": "..."}`` for FAILED).

        Returns
        -------
        dict[str, Any]
            The updated OpRun.

        Raises
        ------
        httpx.HTTPStatusError
            404 if not found, 400 if the run is locked or event is invalid.
        """
        body: dict[str, Any] = {"type": event_type, "msg": msg}
        if payload is not None:
            body["payload"] = payload
        return self._json_or_raise(
            self._http.post(f"/runs/{run_id}/events", json=body)
        )

    def poll_events(self, run_id: str, after: int | None = None) -> dict[str, Any]:
        """Poll events for an OpRun, optionally from a given index.

        Parameters
        ----------
        run_id : str
            Unique identifier of the OpRun.
        after : int | None
            Return only events with index greater than this value.
            Omit to retrieve all events from the beginning.

        Returns
        -------
        dict[str, Any]
            ``{"events": [...]}`` where each entry has ``id``, ``type``,
            ``t``, ``msg``, and ``state``.

        Raises
        ------
        httpx.HTTPStatusError
            404 if the run does not exist.
        """
        # Explicit None check: ``after=0`` is a meaningful cursor value.
        params = {"after": after} if after is not None else {}
        return self._json_or_raise(
            self._http.get(f"/runs/{run_id}/events", params=params)
        )

    def get_attempts(self, run_id: str) -> list[dict[str, Any]]:
        """Fetch all execution attempts for an OpRun.

        Parameters
        ----------
        run_id : str
            Unique identifier of the OpRun.

        Returns
        -------
        list[dict[str, Any]]
            Ordered list of all OpAttempts for this run.

        Raises
        ------
        httpx.HTTPStatusError
            404 if the run does not exist.
        """
        return self._json_or_raise(self._http.get(f"/runs/{run_id}/attempts"))

    # ------------------------------------------------------------------
    # Workers
    # ------------------------------------------------------------------

    def list_workers(self) -> list[dict[str, Any]]:
        """Fetch all workers registered with the server.

        Returns
        -------
        list[dict[str, Any]]
            All workers with their resources, tags, and status.

        Raises
        ------
        httpx.HTTPStatusError
            On non-2xx responses.
        """
        return self._json_or_raise(self._http.get("/workers"))

    def heartbeat(
        self,
        worker_id: str,
        resources: dict[str, Any],
        tags: list[str] | None = None,
        running_ops: list[dict[str, Any]] | None = None,
    ) -> dict[str, Any]:
        """Register or update a worker via heartbeat.

        Parameters
        ----------
        worker_id : str
            Unique identifier for the worker.
        resources : dict[str, Any]
            Worker resources with keys ``cpu_cores``, ``ram_mb``, and
            ``gpus`` (list of ``{"id": int, "vram_mb": int, "tags": [...]}``).
        tags : list[str] | None
            Worker capability tags (e.g. ``["gpu", "high-mem"]``).
        running_ops : list[dict[str, Any]] | None
            Currently executing ops as ``WorkerOpRun`` dicts.

        Returns
        -------
        dict[str, Any]
            The created or updated Worker object.

        Raises
        ------
        httpx.HTTPStatusError
            On non-2xx responses.
        """
        payload: dict[str, Any] = {
            "worker_id": worker_id,
            "resources": resources,
            # ``tags`` is always sent; a missing value becomes an empty list.
            "tags": tags or [],
        }
        # ``running_ops`` is sent only when given, so an explicit empty list
        # stays distinguishable from "not reported".
        if running_ops is not None:
            payload["running_ops"] = running_ops
        return self._json_or_raise(
            self._http.post("/workers/heartbeat", json=payload)
        )

    def claim_ops(self, worker_id: str) -> list[dict[str, Any]]:
        """Request work allocation for a worker.

        Parameters
        ----------
        worker_id : str
            Unique identifier of the worker requesting ops.

        Returns
        -------
        list[dict[str, Any]]
            List of claimed ``WorkerOpRun`` objects (may be empty). The list
            is read from the ``"ops"`` key of the response body; a missing
            key yields an empty list.

        Raises
        ------
        httpx.HTTPStatusError
            On non-2xx responses.
        """
        body = self._json_or_raise(self._http.post(f"/workers/{worker_id}/claim"))
        return body.get("ops", [])

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    def close(self) -> None:
        """Close the underlying HTTP connection pool."""
        self._http.close()

    def __enter__(self) -> GyozaClient:
        """Return the client itself for use in a ``with`` block."""
        return self

    def __exit__(self, *args: object) -> None:
        """Close the client when the ``with`` block exits."""
        self.close()