# Source code for gyoza.client._client

"""Unified HTTP client for the gyoza server API.

Used by the worker (heartbeat, claim ops, send events) and the CLI
deployment pipeline (upsert definitions, create runs).
"""

from __future__ import annotations

from typing import Any
from urllib.parse import quote

import httpx


class GyozaClient:
    """HTTP client covering all gyoza server API endpoints.

    Parameters
    ----------
    base_url : str
        Base URL of the gyoza server (e.g. ``"http://localhost:5555"``).
    api_key : str | None
        API key sent as the ``X-API-Key`` header on every request. An empty
        string is treated like ``None`` (no header is sent).
    timeout : float
        Per-request timeout in seconds.

    Notes
    -----
    Identifiers interpolated into URL paths (definition names, run IDs,
    worker IDs) are percent-encoded as a single path segment. Characters
    such as ``/``, ``?`` or ``#`` are therefore sent as part of the
    identifier instead of being interpreted as path, query or fragment
    delimiters.

    The instance owns an ``httpx.Client`` connection pool. Call
    :meth:`close`, or use the instance as a context manager, to release it.
    """

    def __init__(
        self,
        base_url: str,
        api_key: str | None = None,
        timeout: float = 30.0,
    ) -> None:
        headers = {"X-API-Key": api_key} if api_key else {}
        self._http = httpx.Client(
            base_url=base_url,
            headers=headers,
            timeout=timeout,
        )

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _segment(value: object) -> str:
        """Percent-encode ``value`` for use as one URL path segment.

        ``safe=""`` makes ``quote`` escape ``/`` too, and ``?``/``#`` are
        always escaped. This prevents an identifier from adding segments or
        starting a query string or fragment. ``str()`` keeps non-string
        identifiers (e.g. ``uuid.UUID``) working as they did with plain
        f-string interpolation.
        """
        return quote(str(value), safe="")

    def _request(self, method: str, path: str, **kwargs: Any) -> Any:
        """Send a request and return the decoded JSON body.

        Parameters
        ----------
        method : str
            HTTP method (``"GET"``, ``"POST"``, ``"PATCH"``, ...).
        path : str
            Request path relative to ``base_url``. Dynamic segments must
            already be escaped with :meth:`_segment`.
        **kwargs
            Forwarded to ``httpx.Client.request`` (``json``, ``params``, ...).

        Raises
        ------
        httpx.HTTPStatusError
            On non-2xx responses.
        """
        response = self._http.request(method, path, **kwargs)
        response.raise_for_status()
        return response.json()

    # ------------------------------------------------------------------
    # Definitions
    # ------------------------------------------------------------------

    def upsert_definition(self, op_dict: dict[str, Any]) -> dict[str, Any]:
        """Create or replace an OpDefinition on the server.

        Parameters
        ----------
        op_dict : dict[str, Any]
            Serialised OpDefinition payload (from ``OpDefinition.to_dict()``).

        Returns
        -------
        dict[str, Any]
            The created or updated OpDefinition as returned by the server.

        Raises
        ------
        httpx.HTTPStatusError
            On non-2xx responses.
        """
        return self._request("POST", "/definitions", json=op_dict)

    def get_definition(self, name: str, version: str | None = None) -> dict[str, Any]:
        """Fetch an OpDefinition by name and optional version.

        Parameters
        ----------
        name : str
            OpDefinition identifier (percent-encoded into the URL path).
        version : str | None
            Specific version to retrieve; omit (or pass an empty string) to
            get the latest.

        Returns
        -------
        dict[str, Any]
            The OpDefinition data.

        Raises
        ------
        httpx.HTTPStatusError
            404 if the definition does not exist.
        """
        params = {"version": version} if version else {}
        return self._request(
            "GET", f"/definitions/{self._segment(name)}", params=params
        )

    def list_definitions(self) -> list[dict[str, Any]]:
        """Fetch all registered OpDefinitions (all versions).

        Returns
        -------
        list[dict[str, Any]]
            All OpDefinitions stored on the server.

        Raises
        ------
        httpx.HTTPStatusError
            On non-2xx responses.
        """
        return self._request("GET", "/definitions")

    # ------------------------------------------------------------------
    # Runs
    # ------------------------------------------------------------------

    def create_run(
        self,
        image: str,
        inputs: dict[str, Any],
        *,
        priority: int = 5,
        constraints: dict[str, Any] | None = None,
        retry_policy: dict[str, Any] | None = None,
        event_delivery: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Create an ad-hoc OpRun without an OpDefinition.

        Parameters
        ----------
        image : str
            Docker image reference to execute.
        inputs : dict[str, Any]
            Input parameters for the run.
        priority : int
            Scheduling priority (higher value = higher priority).
        constraints : dict[str, Any] | None
            Hardware requirements override; omitted from the payload if None.
        retry_policy : dict[str, Any] | None
            Retry behaviour override; omitted from the payload if None.
        event_delivery : dict[str, Any] | None
            Event delivery configuration override; omitted if None.

        Returns
        -------
        dict[str, Any]
            The created OpRun.

        Raises
        ------
        httpx.HTTPStatusError
            On non-2xx responses.
        """
        payload: dict[str, Any] = {
            "image": image,
            "inputs": inputs,
            "priority": priority,
        }
        if constraints is not None:
            payload["constraints"] = constraints
        if retry_policy is not None:
            payload["retry_policy"] = retry_policy
        if event_delivery is not None:
            payload["event_delivery"] = event_delivery
        return self._request("POST", "/runs", json=payload)

    def create_run_from_definition(
        self,
        name: str,
        inputs: dict[str, Any],
        *,
        version: str | None = None,
        priority: int = 5,
    ) -> dict[str, Any]:
        """Create an OpRun from a registered OpDefinition.

        Parameters
        ----------
        name : str
            OpDefinition identifier (percent-encoded into the URL path).
        inputs : dict[str, Any]
            Input parameters for the run.
        version : str | None
            Specific definition version to use; omit for latest.
        priority : int
            Scheduling priority.

        Returns
        -------
        dict[str, Any]
            The created OpRun.

        Raises
        ------
        httpx.HTTPStatusError
            404 if the definition does not exist, 400 on invalid inputs.
        """
        params = {"version": version} if version else {}
        payload: dict[str, Any] = {"inputs": inputs, "priority": priority}
        return self._request(
            "POST",
            f"/definitions/{self._segment(name)}/runs",
            json=payload,
            params=params,
        )

    def get_run(self, run_id: str) -> dict[str, Any]:
        """Fetch an OpRun by ID.

        Parameters
        ----------
        run_id : str
            Unique identifier of the OpRun.

        Returns
        -------
        dict[str, Any]
            Full OpRun data including state, events, and attempts.

        Raises
        ------
        httpx.HTTPStatusError
            404 if the run does not exist.
        """
        return self._request("GET", f"/runs/{self._segment(run_id)}")

    def update_run(
        self,
        run_id: str,
        *,
        state: str | None = None,
        outputs: dict[str, Any] | None = None,
        priority: int | None = None,
    ) -> dict[str, Any]:
        """Partially update an OpRun's state, outputs, or priority.

        Only the fields that are not None are included in the PATCH body.

        Parameters
        ----------
        run_id : str
            Unique identifier of the OpRun.
        state : str | None
            New state value (see ``OpRunState``).
        outputs : dict[str, Any] | None
            Output data to attach to the run.
        priority : int | None
            New scheduling priority.

        Returns
        -------
        dict[str, Any]
            The updated OpRun.

        Raises
        ------
        httpx.HTTPStatusError
            404 if not found, 400 on invalid state transition.
        """
        payload: dict[str, Any] = {}
        if state is not None:
            payload["state"] = state
        if outputs is not None:
            payload["outputs"] = outputs
        if priority is not None:
            payload["priority"] = priority
        return self._request("PATCH", f"/runs/{self._segment(run_id)}", json=payload)

    def retry_run(self, run_id: str) -> dict[str, Any]:
        """Trigger a retry for a failed OpRun.

        Parameters
        ----------
        run_id : str
            Unique identifier of the OpRun.

        Returns
        -------
        dict[str, Any]
            The OpRun reset to PENDING with an incremented attempt counter.

        Raises
        ------
        httpx.HTTPStatusError
            404 if not found, 400 if max attempts reached.
        """
        return self._request("POST", f"/runs/{self._segment(run_id)}/retry")

    def add_event(
        self,
        run_id: str,
        event_type: str,
        msg: str | int,
        payload: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Append an event to the current attempt of an OpRun.

        Called by the worker during execution to report progress, completion,
        or failure. Special event types (``STARTED``, ``COMPLETED``,
        ``FAILED``, ``CANCELLED``) trigger state transitions on the run.

        Parameters
        ----------
        run_id : str
            Unique identifier of the OpRun.
        event_type : str
            Event type string (see ``EventType``).
        msg : str | int
            Human-readable message or progress value (0-100 for PROGRESS).
        payload : dict[str, Any] | None
            Optional event-specific data (e.g. ``{"outputs": {...}}`` for
            COMPLETED, ``{"error_message": "..."}`` for FAILED).

        Returns
        -------
        dict[str, Any]
            The updated OpRun.

        Raises
        ------
        httpx.HTTPStatusError
            404 if not found, 400 if the run is locked or event is invalid.
        """
        body: dict[str, Any] = {"type": event_type, "msg": msg}
        if payload is not None:
            body["payload"] = payload
        return self._request("POST", f"/runs/{self._segment(run_id)}/events", json=body)

    def poll_events(self, run_id: str, after: int | None = None) -> dict[str, Any]:
        """Poll events for an OpRun, optionally from a given index.

        Parameters
        ----------
        run_id : str
            Unique identifier of the OpRun.
        after : int | None
            Return only events with index greater than this value (``0`` is
            sent as-is). Omit to retrieve all events from the beginning.

        Returns
        -------
        dict[str, Any]
            ``{"events": [...]}`` where each entry has ``id``, ``type``,
            ``t``, ``msg``, and ``state``.

        Raises
        ------
        httpx.HTTPStatusError
            404 if the run does not exist.
        """
        params = {"after": after} if after is not None else {}
        return self._request(
            "GET", f"/runs/{self._segment(run_id)}/events", params=params
        )

    def get_attempts(self, run_id: str) -> list[dict[str, Any]]:
        """Fetch all execution attempts for an OpRun.

        Parameters
        ----------
        run_id : str
            Unique identifier of the OpRun.

        Returns
        -------
        list[dict[str, Any]]
            Ordered list of all OpAttempts for this run.

        Raises
        ------
        httpx.HTTPStatusError
            404 if the run does not exist.
        """
        return self._request("GET", f"/runs/{self._segment(run_id)}/attempts")

    # ------------------------------------------------------------------
    # Workers
    # ------------------------------------------------------------------

    def list_workers(self) -> list[dict[str, Any]]:
        """Fetch all workers registered with the server.

        Returns
        -------
        list[dict[str, Any]]
            All workers with their resources, tags, and status.

        Raises
        ------
        httpx.HTTPStatusError
            On non-2xx responses.
        """
        return self._request("GET", "/workers")

    def heartbeat(
        self,
        worker_id: str,
        resources: dict[str, Any],
        tags: list[str] | None = None,
        running_ops: list[dict[str, Any]] | None = None,
    ) -> dict[str, Any]:
        """Register or update a worker via heartbeat.

        Parameters
        ----------
        worker_id : str
            Unique identifier for the worker (sent in the JSON body).
        resources : dict[str, Any]
            Worker resources with keys ``cpu_cores``, ``ram_mb``, and ``gpus``
            (list of ``{"id": int, "vram_mb": int, "tags": [...]}``).
        tags : list[str] | None
            Worker capability tags (e.g. ``["gpu", "high-mem"]``); always
            sent, as an empty list when None.
        running_ops : list[dict[str, Any]] | None
            Currently executing ops as ``WorkerOpRun`` dicts; omitted from
            the payload when None.

        Returns
        -------
        dict[str, Any]
            The created or updated Worker object.

        Raises
        ------
        httpx.HTTPStatusError
            On non-2xx responses.
        """
        payload: dict[str, Any] = {
            "worker_id": worker_id,
            "resources": resources,
            "tags": tags or [],
        }
        if running_ops is not None:
            payload["running_ops"] = running_ops
        return self._request("POST", "/workers/heartbeat", json=payload)

    def claim_ops(self, worker_id: str) -> list[dict[str, Any]]:
        """Request work allocation for a worker.

        Parameters
        ----------
        worker_id : str
            Unique identifier of the worker requesting ops.

        Returns
        -------
        list[dict[str, Any]]
            The ``"ops"`` list from the response body (claimed
            ``WorkerOpRun`` objects); empty if the key is absent.

        Raises
        ------
        httpx.HTTPStatusError
            On non-2xx responses.
        """
        data = self._request("POST", f"/workers/{self._segment(worker_id)}/claim")
        return data.get("ops", [])

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    def close(self) -> None:
        """Close the underlying HTTP connection pool."""
        self._http.close()

    def __enter__(self) -> GyozaClient:
        return self

    def __exit__(self, *args: object) -> None:
        # Always release the connection pool; exceptions are not suppressed.
        self.close()