gyoza.worker

gyoza worker runtime.

Provides the heartbeat loop, execution pipeline, and worker context used by the worker process to claim and execute op runs.

class gyoza.worker.ExecutionContext(oprun_id='', image='', constraints=<factory>, inputs=<factory>, outputs=<factory>, env_vars=<factory>, volumes=<factory>, logs='', exit_code=None, extras=<factory>)[source]#

Bases: object

Mutable context shared across all pipeline steps.

Contains OpRun metadata and container execution state.

Parameters:
  • oprun_id (str) – Unique identifier for the operation run.

  • image (str) – Docker image identifier for the container.

  • constraints (dict[str, Any]) – Hardware constraints (ram_mb, vram_mb, cpu).

  • inputs (dict[str, Any]) – Input parameters for the operation.

  • outputs (dict[str, Any]) – Output results from the operation.

  • env_vars (dict[str, str]) – Environment variables for the container.

  • volumes (dict[str, str]) – Volume mounts for the container (host:container).

  • logs (str) – Container execution logs.

  • exit_code (int | None) – Exit code from the container process.

  • extras (dict[str, Any]) – Arbitrary key-value store for step communication.

oprun_id: str = ''#
image: str = ''#
constraints: dict[str, Any]#
inputs: dict[str, Any]#
outputs: dict[str, Any]#
env_vars: dict[str, str]#
volumes: dict[str, str]#
logs: str = ''#
exit_code: int | None = None#
extras: dict[str, Any]#
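
The `<factory>` defaults in the signature suggest that each dict field is created fresh per instance. A minimal sketch of that behaviour follows. `ContextSketch` is a hypothetical stand-in that only mirrors the documented fields; it is not the gyoza source.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class ContextSketch:
    """Hypothetical stand-in mirroring the documented ExecutionContext fields."""

    oprun_id: str = ""
    image: str = ""
    constraints: Dict[str, Any] = field(default_factory=dict)
    inputs: Dict[str, Any] = field(default_factory=dict)
    outputs: Dict[str, Any] = field(default_factory=dict)
    env_vars: Dict[str, str] = field(default_factory=dict)
    volumes: Dict[str, str] = field(default_factory=dict)
    logs: str = ""
    exit_code: Optional[int] = None
    extras: Dict[str, Any] = field(default_factory=dict)


a = ContextSketch(oprun_id="run_a")
b = ContextSketch(oprun_id="run_b")

# Each instance owns its dicts, so a step mutating one context
# does not leak state into another run's context.
a.extras["scratch"] = "/tmp/run_a"
```

With a shared mutable default instead of a factory, the write to `a.extras` would also show up in `b.extras`.
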
class gyoza.worker.WorkerContext(worker_id='', resources=<factory>, tags=<factory>, running_ops=<factory>)[source]#

Bases: object

Singleton context storing worker state for heartbeats.

Holds running ops (RunningOp) with last known server state, refreshed from the main loop so the executor can check e.g. cancellation.

Parameters:
  • worker_id (str) – Worker name/identifier.

  • resources (Resources) – Hardware resources available.

  • tags (list[str]) – Worker-level tags.

  • running_ops (list[RunningOp]) – Running OpRuns with last known server state.

worker_id: str = ''#
resources: Resources#
tags: list[str]#
running_ops: list[RunningOp]#
add_running_op(oprun_id, image, inputs, constraints=None)[source]#

Add an OpRun to the running operations list.

Parameters:
  • oprun_id (str) – Unique identifier for the OpRun.

  • image (str) – Docker image identifier.

  • inputs (dict[str, Any]) – Input parameters for execution.

  • constraints (dict[str, Any] | None) – Hardware constraints (for heartbeat payload).

Return type:

None

remove_running_op(oprun_id)[source]#

Remove an OpRun from the running operations list.

Parameters:

oprun_id (str) – Unique identifier for the OpRun to remove.

Return type:

None

set_run_state(run_id, state)[source]#

Update the last known server state for a running op.

Parameters:
  • run_id (str) – OpRun identifier.

  • state (str) – Current state from server (e.g. RUNNING, CANCELLED).

Return type:

None

get_run_state(run_id)[source]#

Return the last known server state for a run, or None.

Parameters:

run_id (str) – OpRun identifier.

Returns:

State string or None if not found.

Return type:

str | None

is_run_cancelled(run_id)[source]#

Return True if the run is known to be CANCELLED on the server.

Parameters:

run_id (str) – OpRun identifier.

Returns:

True if state is CANCELLED.

Return type:

bool

clear_running_ops()[source]#

Clear the running operations list.

Return type:

None

to_heartbeat_payload()[source]#

Convert to heartbeat request payload.

Returns:

Payload ready for gyoza_client.heartbeat().

Return type:

dict[str, Any]
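
The run-state helpers above can be pictured as lookups over `running_ops`. The sketch below uses hypothetical stand-ins (`RunningOpSketch`, `WorkerContextSketch`) that only mirror the documented method names. The real implementation may differ, for example in locking or in how unknown run ids are handled.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class RunningOpSketch:
    id: str
    image: str
    inputs: Dict[str, Any] = field(default_factory=dict)
    constraints: Dict[str, Any] = field(default_factory=dict)
    state: Optional[str] = None  # None until refreshed from the server


@dataclass
class WorkerContextSketch:
    running_ops: List[RunningOpSketch] = field(default_factory=list)

    def add_running_op(self, oprun_id, image, inputs, constraints=None):
        self.running_ops.append(
            RunningOpSketch(oprun_id, image, inputs, constraints or {})
        )

    def remove_running_op(self, oprun_id):
        self.running_ops = [op for op in self.running_ops if op.id != oprun_id]

    def set_run_state(self, run_id, state):
        for op in self.running_ops:
            if op.id == run_id:
                op.state = state

    def get_run_state(self, run_id):
        for op in self.running_ops:
            if op.id == run_id:
                return op.state
        return None  # run is not tracked by this worker

    def is_run_cancelled(self, run_id):
        return self.get_run_state(run_id) == "CANCELLED"


wc = WorkerContextSketch()
wc.add_running_op("run_abc123", "geoiahub/product:latest", {"param": "value"})
wc.set_run_state("run_abc123", "CANCELLED")
cancelled = wc.is_run_cancelled("run_abc123")
unknown = wc.get_run_state("missing")
```
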

gyoza.worker.start()[source]#

Start the worker heartbeat and execution loop.

Blocks until interrupted with Ctrl-C or a signal. On each tick:

  1. Sends a heartbeat to the server (registers/updates the worker).

  2. Claims pending op runs allocated to this worker.

  3. Dispatches each claimed run to a background thread.

Return type:

None
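
The three steps of each tick can be sketched as follows. Everything here is hypothetical: `send_heartbeat`, `claim_runs`, `execute_run`, the tick interval, and the `max_ticks` bound are stand-ins chosen so the sketch terminates, not names taken from gyoza.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def run_loop(send_heartbeat, claim_runs, execute_run, interval_s=0.0, max_ticks=3):
    """Hypothetical tick loop: heartbeat, claim, dispatch to threads."""
    futures = []
    with ThreadPoolExecutor(max_workers=4) as pool:
        for _ in range(max_ticks):  # a real worker would loop until interrupted
            send_heartbeat()                 # 1. register/update the worker
            for run in claim_runs():         # 2. claim runs allocated to this worker
                futures.append(pool.submit(execute_run, run))  # 3. dispatch
            time.sleep(interval_s)
    # Leaving the `with` block waits for all dispatched runs to finish.
    return [f.result() for f in futures]


# Stand-in callables so the sketch runs without a server.
beats = []
pending = [["run_1", "run_2"], [], ["run_3"]]
results = run_loop(
    send_heartbeat=lambda: beats.append("beat"),
    claim_runs=lambda: pending.pop(0) if pending else [],
    execute_run=lambda run_id: f"{run_id}:done",
)
```

Dispatching to background threads keeps the heartbeat going while long-running containers execute.
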

Worker main loop.

Entry point called by gyoza worker start. Runs the heartbeat/claim cycle in a blocking loop and dispatches op runs to a thread pool.

gyoza.worker.runner.start()[source]#

Start the worker heartbeat and execution loop.

Blocks until interrupted with Ctrl-C or a signal. On each tick:

  1. Sends a heartbeat to the server (registers/updates the worker).

  2. Claims pending op runs allocated to this worker.

  3. Dispatches each claimed run to a background thread.

Return type:

None

Execution Pipeline

The execution pipeline module provides a pre-configured pipeline, execution_pipeline, that runs an OpRun from start to finish.

Example

>>> from gyoza.worker.execution_pipeline import execution_pipeline, ExecutionContext
>>>
>>> ctx = ExecutionContext(
...     oprun_id="run_abc123",
...     image="geoiahub/product:latest",
...     inputs={"param": "value"},
... )
>>> result = execution_pipeline.run(ctx)
>>> print(result.exit_code)
class gyoza.worker.execution_pipeline.ExecutionContext(oprun_id='', image='', constraints=<factory>, inputs=<factory>, outputs=<factory>, env_vars=<factory>, volumes=<factory>, logs='', exit_code=None, extras=<factory>)[source]

Bases: object

Mutable context shared across all pipeline steps.

Contains OpRun metadata and container execution state.

Parameters:
  • oprun_id (str) – Unique identifier for the operation run.

  • image (str) – Docker image identifier for the container.

  • constraints (dict[str, Any]) – Hardware constraints (ram_mb, vram_mb, cpu).

  • inputs (dict[str, Any]) – Input parameters for the operation.

  • outputs (dict[str, Any]) – Output results from the operation.

  • env_vars (dict[str, str]) – Environment variables for the container.

  • volumes (dict[str, str]) – Volume mounts for the container (host:container).

  • logs (str) – Container execution logs.

  • exit_code (int | None) – Exit code from the container process.

  • extras (dict[str, Any]) – Arbitrary key-value store for step communication.

oprun_id: str = ''
image: str = ''
constraints: dict[str, Any]
inputs: dict[str, Any]
outputs: dict[str, Any]
env_vars: dict[str, str]
volumes: dict[str, str]
logs: str = ''
exit_code: int | None = None
extras: dict[str, Any]
class gyoza.worker.execution_pipeline.Step(*args, **kwargs)[source]

Bases: Protocol

A single step in the pipeline.

Steps receive the context, process it, and return the (possibly mutated) context.

Parameters:

ctx (ExecutionContext) – Mutable execution context shared across all steps.

Returns:

The context (same or mutated) for the next step.

Return type:

ExecutionContext
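
Because `Step` is a protocol, any callable that accepts and returns the context should qualify, whether it is a plain function or an instance of a class with `__call__`. A minimal sketch, assuming the protocol is defined through `__call__` (this reference does not show its body) and using a stand-in `Ctx` class in place of `ExecutionContext`:

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Protocol


@dataclass
class Ctx:
    """Stand-in for ExecutionContext with only the fields used here."""

    env_vars: Dict[str, str] = field(default_factory=dict)
    extras: Dict[str, Any] = field(default_factory=dict)


class StepSketch(Protocol):
    """Assumed shape of the Step protocol."""

    def __call__(self, ctx: Ctx) -> Ctx: ...


def set_locale(ctx: Ctx) -> Ctx:
    # Function-style step: mutate the shared context and hand it on.
    ctx.env_vars["LANG"] = "C.UTF-8"
    return ctx


class Tag:
    # Class-style step: configuration lives on the instance.
    def __init__(self, label: str) -> None:
        self.label = label

    def __call__(self, ctx: Ctx) -> Ctx:
        ctx.extras["label"] = self.label
        return ctx


steps: List[StepSketch] = [set_locale, Tag("nightly")]
ctx = Ctx()
for step in steps:
    ctx = step(ctx)
```
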

class gyoza.worker.execution_pipeline.Pipeline(steps=None)[source]

Bases: object

A sequence of steps executed in order.

Parameters:

steps (list[Step] | None) – Ordered list of steps to execute.

run(ctx=None)[source]

Execute all steps in sequence.

If any step raises an exception, the run is marked as FAILED via the server API and the exception is re-raised so that the caller can log it.

Parameters:

ctx (ExecutionContext | None) – Shared context. Created if not provided.

Returns:

The context after all steps have been applied.

Return type:

ExecutionContext

Raises:

Exception – Whatever exception the failing step raised.
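
The failure behaviour of `run` can be sketched as a try/except around the step loop. `PipelineSketch` and its `mark_failed` callback are hypothetical. The callback stands in for whatever server API call gyoza uses to set the run to FAILED, and a plain dict stands in for the context.

```python
class PipelineSketch:
    """Hypothetical pipeline: run steps in order, report failure, re-raise."""

    def __init__(self, steps=None, mark_failed=lambda ctx, exc: None):
        self.steps = list(steps or [])
        self.mark_failed = mark_failed  # stand-in for the server API call

    def run(self, ctx=None):
        ctx = {} if ctx is None else ctx  # created if not provided
        try:
            for step in self.steps:
                ctx = step(ctx)
        except Exception as exc:
            self.mark_failed(ctx, exc)
            raise  # the caller still sees the original exception
        return ctx


def ok(ctx):
    ctx["ran"] = ctx.get("ran", 0) + 1
    return ctx


def boom(ctx):
    raise RuntimeError("container failed to start")


failed = []
pipeline = PipelineSketch(
    [ok, boom, ok], mark_failed=lambda c, e: failed.append(str(e))
)

try:
    pipeline.run()
except RuntimeError as exc:
    caught = str(exc)
```

Steps after the failing one never run, and the failure is reported exactly once before the exception propagates.
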

gyoza.worker.execution_pipeline.pull_latest_image(ctx)[source]

Pull the latest version of the Docker image before execution.

Parameters:

ctx (ExecutionContext) – Execution context with image name configured.

Returns:

Unchanged context after pulling the image.

Return type:

ExecutionContext

gyoza.worker.execution_pipeline.start_oprun(ctx)[source]

Send STARTED event to the master API.

Parameters:

ctx (ExecutionContext) – Execution context with oprun_id.

Returns:

Unchanged context.

Return type:

ExecutionContext

Pipeline implementation.

class gyoza.worker.execution_pipeline.pipeline.Pipeline(steps=None)[source]#

Bases: object

A sequence of steps executed in order.

Parameters:

steps (list[Step] | None) – Ordered list of steps to execute.

run(ctx=None)[source]#

Execute all steps in sequence.

If any step raises an exception, the run is marked as FAILED via the server API and the exception is re-raised so that the caller can log it.

Parameters:

ctx (ExecutionContext | None) – Shared context. Created if not provided.

Returns:

The context after all steps have been applied.

Return type:

ExecutionContext

Raises:

Exception – Whatever exception the failing step raised.

Execution context for the pipeline.

class gyoza.worker.execution_pipeline.types.context.ExecutionContext(oprun_id='', image='', constraints=<factory>, inputs=<factory>, outputs=<factory>, env_vars=<factory>, volumes=<factory>, logs='', exit_code=None, extras=<factory>)[source]#

Bases: object

Mutable context shared across all pipeline steps.

Contains OpRun metadata and container execution state.

Parameters:
  • oprun_id (str) – Unique identifier for the operation run.

  • image (str) – Docker image identifier for the container.

  • constraints (dict[str, Any]) – Hardware constraints (ram_mb, vram_mb, cpu).

  • inputs (dict[str, Any]) – Input parameters for the operation.

  • outputs (dict[str, Any]) – Output results from the operation.

  • env_vars (dict[str, str]) – Environment variables for the container.

  • volumes (dict[str, str]) – Volume mounts for the container (host:container).

  • logs (str) – Container execution logs.

  • exit_code (int | None) – Exit code from the container process.

  • extras (dict[str, Any]) – Arbitrary key-value store for step communication.

oprun_id: str = ''#
image: str = ''#
constraints: dict[str, Any]#
inputs: dict[str, Any]#
outputs: dict[str, Any]#
env_vars: dict[str, str]#
volumes: dict[str, str]#
logs: str = ''#
exit_code: int | None = None#
extras: dict[str, Any]#

Step protocol for the execution pipeline.

class gyoza.worker.execution_pipeline.types.step.Step(*args, **kwargs)[source]#

Bases: Protocol

A single step in the pipeline.

Steps receive the context, process it, and return the (possibly mutated) context.

Parameters:

ctx (ExecutionContext) – Mutable execution context shared across all steps.

Returns:

The context (same or mutated) for the next step.

Return type:

ExecutionContext

Steps

class gyoza.worker.execution_pipeline.steps.prepare_inputs.PrepareInputs[source]#

Bases: object

Prepare the inputs for the container execution.

Creates a unique gyoza directory for the run, saves inputs as a pickle file, and sets GYOZA_INPUT_PATH and GYOZA_OUTPUT_PATH environment variables.
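
A rough sketch of what such a step might do. The directory prefix and the file names `inputs.pkl` and `outputs.pkl` are assumptions for illustration; only the two environment variable names and the `gyoza_dir` key in `extras` come from this reference. A plain dict stands in for `ExecutionContext`, and volume mapping between host and container paths is ignored.

```python
import pickle
import tempfile
from pathlib import Path


def prepare_inputs_sketch(ctx: dict) -> dict:
    """Hypothetical step; ctx is a plain dict standing in for ExecutionContext."""
    # A unique directory per run keeps concurrent runs from colliding.
    run_dir = Path(tempfile.mkdtemp(prefix=f"gyoza_{ctx['oprun_id']}_"))

    input_path = run_dir / "inputs.pkl"    # assumed file name
    output_path = run_dir / "outputs.pkl"  # assumed file name
    with input_path.open("wb") as fh:
        pickle.dump(ctx["inputs"], fh)

    ctx["env_vars"]["GYOZA_INPUT_PATH"] = str(input_path)
    ctx["env_vars"]["GYOZA_OUTPUT_PATH"] = str(output_path)
    ctx["extras"]["gyoza_dir"] = str(run_dir)
    return ctx


ctx = prepare_inputs_sketch(
    {"oprun_id": "run_abc123", "inputs": {"param": "value"}, "env_vars": {}, "extras": {}}
)
with open(ctx["env_vars"]["GYOZA_INPUT_PATH"], "rb") as fh:
    round_tripped = pickle.load(fh)
```
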

Pull latest image step - ensures the latest version of the image is available.

gyoza.worker.execution_pipeline.steps.pull_latest_image.pull_latest_image(ctx)[source]#

Pull the latest version of the Docker image before execution.

Parameters:

ctx (ExecutionContext) – Execution context with image name configured.

Returns:

Unchanged context after pulling the image.

Return type:

ExecutionContext

Run container step - executes container and streams logs.

class gyoza.worker.execution_pipeline.steps.run_container.RunContainer[source]#

Bases: object

Run the container and stream logs.

Runs the container in attached mode and streams its logs in real time. Log lines that match the gyoza event format are parsed and sent to the master.

Start OpRun step - sends STARTED event to master.

gyoza.worker.execution_pipeline.steps.start_oprun.start_oprun(ctx)[source]#

Send STARTED event to the master API.

Parameters:

ctx (ExecutionContext) – Execution context with oprun_id.

Returns:

Unchanged context.

Return type:

ExecutionContext

gyoza.worker.execution_pipeline.steps.complete_oprun.complete_oprun(ctx)[source]#

Send COMPLETED or FAILED event to the master API based on exit code.

Loads outputs from the pickle file and sends them as the payload on success. If the server returns 400 or 409 (e.g. the run is already terminal or not assigned to this worker), the update is skipped and the context is returned without raising, so the worker does not treat a reclaimed run as a local failure.

Parameters:

ctx (ExecutionContext) – Execution context with exit_code and env_vars set.

Returns:

Context with outputs populated on success.

Return type:

ExecutionContext
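
The decision logic described above can be sketched as follows. `ApiError` and `send_event` are hypothetical stand-ins for the gyoza client, and treating exit code 0 as success is an assumption. Only the COMPLETED/FAILED split and the tolerated 400/409 responses come from the description.

```python
class ApiError(Exception):
    """Hypothetical client error carrying the HTTP status code."""

    def __init__(self, status: int) -> None:
        super().__init__(f"HTTP {status}")
        self.status = status


def complete_sketch(exit_code, outputs, send_event):
    """Return the event that was accepted, or None if the update was skipped."""
    # Assumption: exit code 0 means the container succeeded.
    event = "COMPLETED" if exit_code == 0 else "FAILED"
    payload = outputs if event == "COMPLETED" else None
    try:
        send_event(event, payload)
    except ApiError as err:
        if err.status in (400, 409):
            # Run is already terminal or belongs to another worker: skip quietly.
            return None
        raise  # any other status is a genuine error
    return event


sent = []


def accept(event, payload):
    sent.append((event, payload))


def reject_conflict(event, payload):
    raise ApiError(409)


first = complete_sketch(0, {"y": 1}, accept)
second = complete_sketch(1, {}, accept)
skipped = complete_sketch(0, {}, reject_conflict)
```
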

gyoza.worker.execution_pipeline.steps.cleanup_op_dir.cleanup_op_dir(ctx)[source]#

Delete the op directory created by prepare_inputs.

Removes the gyoza_dir generated by the pipeline, which contains inputs, outputs, and any other files produced during execution.

Parameters:

ctx (ExecutionContext) – Execution context with gyoza_dir in extras.

Returns:

Context unchanged (directory removed from disk).

Return type:

ExecutionContext
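
A minimal sketch of such a cleanup step, assuming the directory path is stored as a string under `extras["gyoza_dir"]` and using a plain dict in place of `ExecutionContext`. Ignoring removal errors is a choice made for this sketch, not documented behaviour.

```python
import shutil
import tempfile
from pathlib import Path


def cleanup_sketch(ctx: dict) -> dict:
    """Hypothetical step: remove the run directory, leave the context as-is."""
    gyoza_dir = ctx["extras"].get("gyoza_dir")
    if gyoza_dir:
        # ignore_errors avoids failing the run over an already-missing directory.
        shutil.rmtree(gyoza_dir, ignore_errors=True)
    return ctx


run_dir = Path(tempfile.mkdtemp(prefix="gyoza_"))
(run_dir / "outputs.pkl").write_bytes(b"")
ctx = cleanup_sketch({"extras": {"gyoza_dir": str(run_dir)}})
```
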

Worker Context

Worker context.

Provides an easy-to-use worker context for tracking worker identity, resources, and running operations.

Example

>>> from gyoza.worker.worker_context import worker_context
>>>
>>> # Resources and worker_id are auto-detected at import time
>>> print(worker_context.worker_id)  # from GYOZA_WORKER_ID env var
>>> print(worker_context.resources.cpu_cores)
>>> print(worker_context.resources.ram_mb)
>>>
>>> # Use for heartbeat
>>> from gyoza.client import gyoza_client
>>> payload = worker_context.to_heartbeat_payload()
>>> gyoza_client.heartbeat(**payload)
class gyoza.worker.worker_context.GPU(id, vram_mb, tags=<factory>)[source]

Bases: object

Representation of a single GPU.

Parameters:
  • id (int) – Numeric GPU identifier (0, 1, 2…).

  • vram_mb (int) – VRAM in megabytes.

  • tags (list[str]) – GPU tags (e.g., “cuda”, “nvidia-a100”).

id: int
vram_mb: int
tags: list[str]
to_dict()[source]

Convert to dictionary representation.

Return type:

dict[str, Any]

class gyoza.worker.worker_context.Resources(cpu_cores=0, ram_mb=0, gpus=<factory>)[source]

Bases: object

Hardware resources available on the worker.

Parameters:
  • cpu_cores (int) – Number of CPU cores.

  • ram_mb (int) – RAM in megabytes.

  • gpus (list[GPU]) – List of available GPUs.

cpu_cores: int = 0
ram_mb: int = 0
gpus: list[GPU]
to_dict()[source]

Convert to dictionary representation.

Return type:

dict[str, Any]
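
One plausible way to implement `to_dict` for nested dataclasses is `dataclasses.asdict`, which recurses into the GPU list. `GPUSketch` and `ResourcesSketch` are hypothetical stand-ins mirroring the documented fields; the actual output shape of gyoza's `to_dict` is not shown in this reference.

```python
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List


@dataclass
class GPUSketch:
    id: int
    vram_mb: int
    tags: List[str] = field(default_factory=list)


@dataclass
class ResourcesSketch:
    cpu_cores: int = 0
    ram_mb: int = 0
    gpus: List[GPUSketch] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        # asdict recurses into nested dataclasses and lists.
        return asdict(self)


res = ResourcesSketch(
    cpu_cores=8,
    ram_mb=32768,
    gpus=[GPUSketch(0, 40960, ["cuda", "nvidia-a100"])],
)
as_dict = res.to_dict()
```
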

class gyoza.worker.worker_context.RunningOp(id, image, inputs=<factory>, constraints=<factory>, state=None)[source]

Bases: object

Simplified run representation with last known server state.

Stored in worker context for each op the worker is executing. Server state is refreshed from the main loop via GET /runs/{id}.

Parameters:
  • id (str) – OpRun identifier.

  • image (str) – Docker image.

  • inputs (dict[str, Any]) – Input parameters.

  • constraints (dict[str, Any]) – Hardware constraints (for heartbeat payload).

  • state (str | None) – Last known server state (PENDING, ASSIGNED, RUNNING, CANCELLED, COMPLETED, FAILED). None until refreshed from server.

id: str
image: str
inputs: dict[str, Any]
constraints: dict[str, Any]
state: str | None = None
to_heartbeat_dict()[source]

Return the dictionary sent for this op in the heartbeat running_ops field, containing id, image, inputs, and constraints.

Return type:

dict[str, Any]
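
The documented key list omits `state`, which suggests the cached server state is not echoed back in heartbeats. A sketch under that reading, with `RunningOpSketch` as a hypothetical stand-in:

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class RunningOpSketch:
    id: str
    image: str
    inputs: Dict[str, Any] = field(default_factory=dict)
    constraints: Dict[str, Any] = field(default_factory=dict)
    state: Optional[str] = None

    def to_heartbeat_dict(self) -> Dict[str, Any]:
        # Only the four documented keys; `state` is the worker's cached
        # view of the server and is left out of the payload.
        return {
            "id": self.id,
            "image": self.image,
            "inputs": self.inputs,
            "constraints": self.constraints,
        }


op = RunningOpSketch(
    "run_abc123", "geoiahub/product:latest", {"param": "value"}, {"ram_mb": 2048}
)
op.state = "RUNNING"
heartbeat_entry = op.to_heartbeat_dict()
```
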

class gyoza.worker.worker_context.WorkerContext(worker_id='', resources=<factory>, tags=<factory>, running_ops=<factory>)[source]

Bases: object

Singleton context storing worker state for heartbeats.

Holds running ops (RunningOp) with last known server state, refreshed from the main loop so the executor can check e.g. cancellation.

Parameters:
  • worker_id (str) – Worker name/identifier.

  • resources (Resources) – Hardware resources available.

  • tags (list[str]) – Worker-level tags.

  • running_ops (list[RunningOp]) – Running OpRuns with last known server state.

worker_id: str = ''
resources: Resources
tags: list[str]
running_ops: list[RunningOp]
add_running_op(oprun_id, image, inputs, constraints=None)[source]

Add an OpRun to the running operations list.

Parameters:
  • oprun_id (str) – Unique identifier for the OpRun.

  • image (str) – Docker image identifier.

  • inputs (dict[str, Any]) – Input parameters for execution.

  • constraints (dict[str, Any] | None) – Hardware constraints (for heartbeat payload).

Return type:

None

remove_running_op(oprun_id)[source]

Remove an OpRun from the running operations list.

Parameters:

oprun_id (str) – Unique identifier for the OpRun to remove.

Return type:

None

set_run_state(run_id, state)[source]

Update the last known server state for a running op.

Parameters:
  • run_id (str) – OpRun identifier.

  • state (str) – Current state from server (e.g. RUNNING, CANCELLED).

Return type:

None

get_run_state(run_id)[source]

Return the last known server state for a run, or None.

Parameters:

run_id (str) – OpRun identifier.

Returns:

State string or None if not found.

Return type:

str | None

is_run_cancelled(run_id)[source]

Return True if the run is known to be CANCELLED on the server.

Parameters:

run_id (str) – OpRun identifier.

Returns:

True if state is CANCELLED.

Return type:

bool

clear_running_ops()[source]

Clear the running operations list.

Return type:

None

to_heartbeat_payload()[source]

Convert to heartbeat request payload.

Returns:

Payload ready for gyoza_client.heartbeat().

Return type:

dict[str, Any]

gyoza.worker.worker_context.detect_resources()[source]

Auto-detect hardware resources available on this machine.

Returns:

Detected CPU cores, RAM, and GPUs.

Return type:

Resources
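
How gyoza performs detection is not described here. As an illustration only, CPU cores and RAM can be read with the standard library as sketched below. `detect_cpu_and_ram` is a hypothetical name, the `os.sysconf` route works on Linux and most Unix systems but not on Windows, and GPU detection is left out because it depends on vendor tooling.

```python
import os


def detect_cpu_and_ram() -> dict:
    """Hypothetical detection of CPU cores and RAM (in MB)."""
    cpu_cores = os.cpu_count() or 0  # cpu_count may return None
    try:
        # Total physical memory = page size * number of physical pages.
        ram_bytes = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")
    except (AttributeError, ValueError, OSError):
        ram_bytes = 0  # platform without sysconf or without these names
    return {
        "cpu_cores": cpu_cores,
        "ram_mb": ram_bytes // (1024 * 1024),
        "gpus": [],  # GPU detection omitted in this sketch
    }


detected = detect_cpu_and_ram()
```
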

Types for the worker context module.

class gyoza.worker.worker_context.types.RunningOp(id, image, inputs=<factory>, constraints=<factory>, state=None)[source]#

Bases: object

Simplified run representation with last known server state.

Stored in worker context for each op the worker is executing. Server state is refreshed from the main loop via GET /runs/{id}.

Parameters:
  • id (str) – OpRun identifier.

  • image (str) – Docker image.

  • inputs (dict[str, Any]) – Input parameters.

  • constraints (dict[str, Any]) – Hardware constraints (for heartbeat payload).

  • state (str | None) – Last known server state (PENDING, ASSIGNED, RUNNING, CANCELLED, COMPLETED, FAILED). None until refreshed from server.

id: str#
image: str#
inputs: dict[str, Any]#
constraints: dict[str, Any]#
state: str | None = None#
to_heartbeat_dict()[source]#

Return the dictionary sent for this op in the heartbeat running_ops field, containing id, image, inputs, and constraints.

Return type:

dict[str, Any]

class gyoza.worker.worker_context.types.GPU(id, vram_mb, tags=<factory>)[source]#

Bases: object

Representation of a single GPU.

Parameters:
  • id (int) – Numeric GPU identifier (0, 1, 2…).

  • vram_mb (int) – VRAM in megabytes.

  • tags (list[str]) – GPU tags (e.g., “cuda”, “nvidia-a100”).

id: int#
vram_mb: int#
tags: list[str]#
to_dict()[source]#

Convert to dictionary representation.

Return type:

dict[str, Any]

class gyoza.worker.worker_context.types.Resources(cpu_cores=0, ram_mb=0, gpus=<factory>)[source]#

Bases: object

Hardware resources available on the worker.

Parameters:
  • cpu_cores (int) – Number of CPU cores.

  • ram_mb (int) – RAM in megabytes.

  • gpus (list[GPU]) – List of available GPUs.

cpu_cores: int = 0#
ram_mb: int = 0#
gpus: list[GPU]#
to_dict()[source]#

Convert to dictionary representation.

Return type:

dict[str, Any]

class gyoza.worker.worker_context.types.WorkerContext(worker_id='', resources=<factory>, tags=<factory>, running_ops=<factory>)[source]#

Bases: object

Singleton context storing worker state for heartbeats.

Holds running ops (RunningOp) with last known server state, refreshed from the main loop so the executor can check e.g. cancellation.

Parameters:
  • worker_id (str) – Worker name/identifier.

  • resources (Resources) – Hardware resources available.

  • tags (list[str]) – Worker-level tags.

  • running_ops (list[RunningOp]) – Running OpRuns with last known server state.

worker_id: str = ''#
resources: Resources#
tags: list[str]#
running_ops: list[RunningOp]#
add_running_op(oprun_id, image, inputs, constraints=None)[source]#

Add an OpRun to the running operations list.

Parameters:
  • oprun_id (str) – Unique identifier for the OpRun.

  • image (str) – Docker image identifier.

  • inputs (dict[str, Any]) – Input parameters for execution.

  • constraints (dict[str, Any] | None) – Hardware constraints (for heartbeat payload).

Return type:

None

remove_running_op(oprun_id)[source]#

Remove an OpRun from the running operations list.

Parameters:

oprun_id (str) – Unique identifier for the OpRun to remove.

Return type:

None

set_run_state(run_id, state)[source]#

Update the last known server state for a running op.

Parameters:
  • run_id (str) – OpRun identifier.

  • state (str) – Current state from server (e.g. RUNNING, CANCELLED).

Return type:

None

get_run_state(run_id)[source]#

Return the last known server state for a run, or None.

Parameters:

run_id (str) – OpRun identifier.

Returns:

State string or None if not found.

Return type:

str | None

is_run_cancelled(run_id)[source]#

Return True if the run is known to be CANCELLED on the server.

Parameters:

run_id (str) – OpRun identifier.

Returns:

True if state is CANCELLED.

Return type:

bool

clear_running_ops()[source]#

Clear the running operations list.

Return type:

None

to_heartbeat_payload()[source]#

Convert to heartbeat request payload.

Returns:

Payload ready for gyoza_client.heartbeat().

Return type:

dict[str, Any]

Utilities for auto-detecting worker resources.

gyoza.worker.worker_context.utils.detect_resources()[source]#

Auto-detect hardware resources available on this machine.

Returns:

Detected CPU cores, RAM, and GPUs.

Return type:

Resources