gyoza.worker

gyoza worker runtime.

Provides the heartbeat loop, execution pipeline, and worker context used by the worker process to claim and execute op runs.

class gyoza.worker.ExecutionContext(oprun_id='', image='', constraints=<factory>, inputs=<factory>, outputs=<factory>, env_vars=<factory>, volumes=<factory>, logs='', exit_code=None, extras=<factory>)[source]

Bases: object

Mutable context shared across all pipeline steps.

Contains OpRun metadata and container execution state.

Parameters:
  • oprun_id (str) – Unique identifier for the operation run.

  • image (str) – Docker image identifier for the container.

  • constraints (dict[str, Any]) – Hardware constraints (ram_mb, vram_mb, cpu).

  • inputs (dict[str, Any]) – Input parameters for the operation.

  • outputs (dict[str, Any]) – Output results from the operation.

  • env_vars (dict[str, str]) – Environment variables for the container.

  • volumes (dict[str, str]) – Volume mounts for the container (host:container).

  • logs (str) – Container execution logs.

  • exit_code (int | None) – Exit code from the container process.

  • extras (dict[str, Any]) – Arbitrary key-value store for step communication.

oprun_id: str = ''
image: str = ''
constraints: dict[str, Any]
inputs: dict[str, Any]
outputs: dict[str, Any]
env_vars: dict[str, str]
volumes: dict[str, str]
logs: str = ''
exit_code: int | None = None
extras: dict[str, Any]
class gyoza.worker.WorkerContext(worker_id='', resources=<factory>, tags=<factory>, running_ops=<factory>)[source]

Bases: object

Singleton context storing worker state for heartbeats.

Parameters:
  • worker_id (str) – Worker name/identifier.

  • resources (Resources) – Hardware resources available.

  • tags (list[str]) – Worker-level tags.

  • running_ops (list[dict[str, Any]]) – Running OpRuns reported at heartbeat time.

worker_id: str = ''
resources: Resources
tags: list[str]
running_ops: list[dict[str, Any]]
add_running_op(oprun_id, image, inputs)[source]

Add an OpRun to the running operations list.

Parameters:
  • oprun_id (str) – Unique identifier for the OpRun.

  • image (str) – Docker image identifier.

  • inputs (dict[str, Any]) – Input parameters for execution.

Return type:

None

remove_running_op(oprun_id)[source]

Remove an OpRun from the running operations list.

Parameters:

oprun_id (str) – Unique identifier for the OpRun to remove.

Return type:

None

clear_running_ops()[source]

Clear the running operations list.

Return type:

None

to_heartbeat_payload()[source]

Convert to heartbeat request payload.

Returns:

Payload ready for gyoza_client.heartbeat().

Return type:

dict[str, Any]
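
The bookkeeping methods above can be pictured with a small stand-in class. This is a minimal sketch, not the gyoza implementation: the class name WorkerContextSketch, the shape of each running_ops entry, and the payload keys are assumptions, and the resources field is left out for brevity.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class WorkerContextSketch:
    """Hypothetical stand-in; the real WorkerContext also carries resources."""

    worker_id: str = ""
    tags: list[str] = field(default_factory=list)
    running_ops: list[dict[str, Any]] = field(default_factory=list)

    def add_running_op(self, oprun_id: str, image: str, inputs: dict[str, Any]) -> None:
        # Entry layout is assumed; the docs only say it is a dict.
        self.running_ops.append({"oprun_id": oprun_id, "image": image, "inputs": inputs})

    def remove_running_op(self, oprun_id: str) -> None:
        # Keep every entry except the one with the matching id.
        self.running_ops = [op for op in self.running_ops if op["oprun_id"] != oprun_id]

    def clear_running_ops(self) -> None:
        self.running_ops.clear()

    def to_heartbeat_payload(self) -> dict[str, Any]:
        # Payload keys are assumed; copies keep later mutations out of the payload.
        return {
            "worker_id": self.worker_id,
            "tags": list(self.tags),
            "running_ops": list(self.running_ops),
        }


ctx = WorkerContextSketch(worker_id="worker-1", tags=["gpu"])
ctx.add_running_op("run_a", "example/image:latest", {"param": "value"})
ctx.add_running_op("run_b", "example/image:latest", {})
ctx.remove_running_op("run_a")
payload = ctx.to_heartbeat_payload()
```

Building the payload from copies means a run that finishes between payload creation and the HTTP call does not change what is reported for that tick.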

gyoza.worker.start()[source]

Start the worker heartbeat and execution loop.

Blocks until interrupted with Ctrl-C or a signal. On each tick:

  1. Sends a heartbeat to the server (registers/updates the worker).

  2. Claims pending op runs allocated to this worker.

  3. Dispatches each claimed run to a background thread.

Return type:

None
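
The three-step tick described above can be sketched with a thread pool and a stop flag. This is an illustration under stated assumptions: FakeClient, its claim method, run_loop, and the interval parameter are hypothetical names, and a threading.Event stands in for the Ctrl-C or signal handling of the real loop.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class FakeClient:
    """Hypothetical stand-in for the server client; not the gyoza API."""

    def __init__(self, pending):
        self.pending = list(pending)
        self.heartbeats = 0

    def heartbeat(self, **payload):
        self.heartbeats += 1

    def claim(self, worker_id):
        # Hand out everything that is pending, once.
        claimed, self.pending = self.pending, []
        return claimed


def run_loop(client, execute, worker_id, stop, interval=0.01, max_workers=4):
    """Heartbeat, claim, and dispatch until the stop event is set."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while not stop.is_set():
            client.heartbeat(worker_id=worker_id)   # 1. register/update the worker
            for run in client.claim(worker_id):     # 2. claim pending op runs
                pool.submit(execute, run)           # 3. dispatch to a background thread
            stop.wait(interval)                     # sleep, but wake early on stop


done = []
stop = threading.Event()
client = FakeClient(pending=["run_1", "run_2"])


def execute(run):
    done.append(run)
    if len(done) == 2:
        stop.set()  # end the demo once both runs were handled


run_loop(client, execute, "worker-1", stop)
```

Waiting on the event instead of calling time.sleep lets the loop exit promptly when a shutdown is requested, and leaving the with block waits for in-flight runs to finish.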

Worker main loop.

Entry point called by gyoza worker start. Runs the heartbeat/claim cycle in a blocking loop and dispatches op runs to a thread pool.

gyoza.worker.runner.start()[source]

Start the worker heartbeat and execution loop.

Blocks until interrupted with Ctrl-C or a signal. On each tick:

  1. Sends a heartbeat to the server (registers/updates the worker).

  2. Claims pending op runs allocated to this worker.

  3. Dispatches each claimed run to a background thread.

Return type:

None

Execution Pipeline

The execution pipeline module provides a pre-configured pipeline, execution_pipeline, that runs an OpRun from start to finish.

Example

>>> from gyoza.worker.execution_pipeline import execution_pipeline, ExecutionContext
>>>
>>> ctx = ExecutionContext(
...     oprun_id="run_abc123",
...     image="geoiahub/product:latest",
...     inputs={"param": "value"},
... )
>>> result = execution_pipeline.run(ctx)
>>> print(result.exit_code)

class gyoza.worker.execution_pipeline.ExecutionContext(oprun_id='', image='', constraints=<factory>, inputs=<factory>, outputs=<factory>, env_vars=<factory>, volumes=<factory>, logs='', exit_code=None, extras=<factory>)[source]

Bases: object

Mutable context shared across all pipeline steps.

Contains OpRun metadata and container execution state.

Parameters:
  • oprun_id (str) – Unique identifier for the operation run.

  • image (str) – Docker image identifier for the container.

  • constraints (dict[str, Any]) – Hardware constraints (ram_mb, vram_mb, cpu).

  • inputs (dict[str, Any]) – Input parameters for the operation.

  • outputs (dict[str, Any]) – Output results from the operation.

  • env_vars (dict[str, str]) – Environment variables for the container.

  • volumes (dict[str, str]) – Volume mounts for the container (host:container).

  • logs (str) – Container execution logs.

  • exit_code (int | None) – Exit code from the container process.

  • extras (dict[str, Any]) – Arbitrary key-value store for step communication.

oprun_id: str = ''
image: str = ''
constraints: dict[str, Any]
inputs: dict[str, Any]
outputs: dict[str, Any]
env_vars: dict[str, str]
volumes: dict[str, str]
logs: str = ''
exit_code: int | None = None
extras: dict[str, Any]
class gyoza.worker.execution_pipeline.Step(*args, **kwargs)[source]

Bases: Protocol

A single step in the pipeline.

Steps receive the context, process it, and return the (possibly mutated) context.

Parameters:

ctx (ExecutionContext) – Mutable execution context shared across all steps.

Returns:

The context (same or mutated) for the next step.

Return type:

ExecutionContext
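
Since the documented steps include both plain functions and classes, one plausible reading is that Step is a callable protocol. The sketch below assumes a `__call__(ctx) -> ctx` shape, which the reference does not spell out; Ctx, set_marker, and AddEnv are hypothetical names used only for illustration.

```python
from dataclasses import dataclass, field
from typing import Any, Protocol


@dataclass
class Ctx:
    """Minimal stand-in for ExecutionContext with two of its fields."""

    env_vars: dict[str, str] = field(default_factory=dict)
    extras: dict[str, Any] = field(default_factory=dict)


class Step(Protocol):
    # Assumed shape: any callable that takes and returns the context.
    def __call__(self, ctx: Ctx) -> Ctx: ...


def set_marker(ctx: Ctx) -> Ctx:
    """Function-style step: leaves a note for later steps in extras."""
    ctx.extras["marker"] = True
    return ctx


class AddEnv:
    """Class-style step: configured once, then called with the context."""

    def __init__(self, key: str, value: str) -> None:
        self.key = key
        self.value = value

    def __call__(self, ctx: Ctx) -> Ctx:
        ctx.env_vars[self.key] = self.value
        return ctx


steps: list[Step] = [set_marker, AddEnv("EXAMPLE_FLAG", "1")]
ctx = Ctx()
for step in steps:
    ctx = step(ctx)
```

Because Protocol typing is structural, neither set_marker nor AddEnv needs to inherit from Step.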

class gyoza.worker.execution_pipeline.Pipeline(steps=None)[source]

Bases: object

A sequence of steps executed in order.

Parameters:

steps (list[Step]) – Ordered list of steps to execute.

run(ctx=None)[source]

Execute all steps in sequence.

If any step raises an exception the run is marked as FAILED via the server API and the exception is re-raised so the caller can log it.

Parameters:

ctx (ExecutionContext | None) – Shared context. Created if not provided.

Returns:

The context after all steps have been applied.

Return type:

ExecutionContext

Raises:

Exception – Whatever exception the failing step raised.
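
The failure handling described for run() can be sketched as a try/except around the step loop. This is an illustrative approximation: PipelineSketch and its on_failure callback are hypothetical, with the callback standing in for the server call that marks the run as FAILED, and a plain dict standing in for ExecutionContext.

```python
class PipelineSketch:
    """Hypothetical pipeline: runs steps in order, reports and re-raises failures."""

    def __init__(self, steps=None, on_failure=None):
        self.steps = list(steps or [])
        # on_failure stands in for "mark the run as FAILED via the server API".
        self.on_failure = on_failure or (lambda ctx, exc: None)

    def run(self, ctx=None):
        ctx = {} if ctx is None else ctx  # create a context if none was given
        try:
            for step in self.steps:
                ctx = step(ctx)
        except Exception as exc:
            self.on_failure(ctx, exc)
            raise  # re-raise so the caller can log the original exception
        return ctx


def ok(ctx):
    ctx["ran"] = True
    return ctx


def boom(ctx):
    raise RuntimeError("step failed")


failures = []
pipeline = PipelineSketch([ok, boom], on_failure=lambda ctx, exc: failures.append(str(exc)))
try:
    pipeline.run()
except RuntimeError as exc:
    reraised = str(exc)
```

A bare raise preserves the original traceback, which is what makes the re-raised exception useful to the caller's logger.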

gyoza.worker.execution_pipeline.pull_latest_image(ctx)[source]

Pull the latest version of the Docker image before execution.

Parameters:

ctx (ExecutionContext) – Execution context with image name configured.

Returns:

Unchanged context after pulling the image.

Return type:

ExecutionContext
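
One way to picture this step is a thin wrapper around the docker pull CLI command. Whether gyoza shells out to the CLI or uses a Docker client library is not stated here, so treat the helper below (pull_image, with an injectable run parameter for testing) as a hypothetical sketch.

```python
import subprocess


def pull_image(image, run=subprocess.run):
    """Pull an image with the Docker CLI; `run` is injectable for testing."""
    if not image:
        raise ValueError("image must be set before pulling")
    # check=True raises CalledProcessError when the pull fails.
    run(["docker", "pull", image], check=True)
    return image


# Demonstration with a fake runner, so no Docker daemon is needed.
calls = []


def fake_run(cmd, check):
    calls.append((cmd, check))


pulled = pull_image("geoiahub/product:latest", run=fake_run)
```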

gyoza.worker.execution_pipeline.start_oprun(ctx)[source]

Send STARTED event to the master API.

Parameters:

ctx (ExecutionContext) – Execution context with oprun_id.

Returns:

Unchanged context.

Return type:

ExecutionContext

Pipeline implementation.

class gyoza.worker.execution_pipeline.pipeline.Pipeline(steps=None)[source]

Bases: object

A sequence of steps executed in order.

Parameters:

steps (list[Step]) – Ordered list of steps to execute.

run(ctx=None)[source]

Execute all steps in sequence.

If any step raises an exception the run is marked as FAILED via the server API and the exception is re-raised so the caller can log it.

Parameters:

ctx (ExecutionContext | None) – Shared context. Created if not provided.

Returns:

The context after all steps have been applied.

Return type:

ExecutionContext

Raises:

Exception – Whatever exception the failing step raised.

Execution context for the pipeline.

class gyoza.worker.execution_pipeline.types.context.ExecutionContext(oprun_id='', image='', constraints=<factory>, inputs=<factory>, outputs=<factory>, env_vars=<factory>, volumes=<factory>, logs='', exit_code=None, extras=<factory>)[source]

Bases: object

Mutable context shared across all pipeline steps.

Contains OpRun metadata and container execution state.

Parameters:
  • oprun_id (str) – Unique identifier for the operation run.

  • image (str) – Docker image identifier for the container.

  • constraints (dict[str, Any]) – Hardware constraints (ram_mb, vram_mb, cpu).

  • inputs (dict[str, Any]) – Input parameters for the operation.

  • outputs (dict[str, Any]) – Output results from the operation.

  • env_vars (dict[str, str]) – Environment variables for the container.

  • volumes (dict[str, str]) – Volume mounts for the container (host:container).

  • logs (str) – Container execution logs.

  • exit_code (int | None) – Exit code from the container process.

  • extras (dict[str, Any]) – Arbitrary key-value store for step communication.

oprun_id: str = ''
image: str = ''
constraints: dict[str, Any]
inputs: dict[str, Any]
outputs: dict[str, Any]
env_vars: dict[str, str]
volumes: dict[str, str]
logs: str = ''
exit_code: int | None = None
extras: dict[str, Any]

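The `<factory>` defaults in the signature are consistent with a dataclass whose container fields use default_factory. The stand-in below mirrors the documented fields to show why that matters: each instance gets its own dicts. It is an assumption-based sketch, not the gyoza source, and the gyoza_dir value is a made-up path.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class ExecutionContext:
    """Illustrative stand-in mirroring the documented fields."""

    oprun_id: str = ""
    image: str = ""
    # default_factory builds a fresh dict per instance, so contexts
    # never share mutable state by accident.
    constraints: dict[str, Any] = field(default_factory=dict)
    inputs: dict[str, Any] = field(default_factory=dict)
    outputs: dict[str, Any] = field(default_factory=dict)
    env_vars: dict[str, str] = field(default_factory=dict)
    volumes: dict[str, str] = field(default_factory=dict)
    logs: str = ""
    exit_code: Optional[int] = None  # documented as int | None
    extras: dict[str, Any] = field(default_factory=dict)


a = ExecutionContext(oprun_id="run_a")
b = ExecutionContext(oprun_id="run_b")
a.extras["gyoza_dir"] = "/tmp/example"  # hypothetical path, for illustration only
```

Mutating a.extras leaves b.extras empty, which is the property that makes one context per run safe when several runs execute in parallel threads.
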
Step protocol for the execution pipeline.

class gyoza.worker.execution_pipeline.types.step.Step(*args, **kwargs)[source]

Bases: Protocol

A single step in the pipeline.

Steps receive the context, process it, and return the (possibly mutated) context.

Parameters:

ctx (ExecutionContext) – Mutable execution context shared across all steps.

Returns:

The context (same or mutated) for the next step.

Return type:

ExecutionContext

Steps

class gyoza.worker.execution_pipeline.steps.prepare_inputs.PrepareInputs[source]

Bases: object

Prepare the inputs for the container execution.

Creates a unique gyoza directory for the run, saves inputs as a pickle file, and sets GYOZA_INPUT_PATH and GYOZA_OUTPUT_PATH environment variables.
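
A rough sketch of what this step describes follows. The file names inputs.pkl and outputs.pkl, the gyoza_ directory prefix, and the prepare_inputs helper are assumptions; the sketch also stores host paths in the environment variables, whereas the real step may use paths as seen from inside the container.

```python
import pickle
import shutil
import tempfile
from pathlib import Path


def prepare_inputs(inputs, env_vars, extras, base_dir=None):
    """Create a per-run directory, pickle the inputs, and expose the paths."""
    gyoza_dir = Path(tempfile.mkdtemp(prefix="gyoza_", dir=base_dir))  # unique per run
    input_path = gyoza_dir / "inputs.pkl"     # hypothetical file name
    output_path = gyoza_dir / "outputs.pkl"   # hypothetical file name
    with input_path.open("wb") as fh:
        pickle.dump(inputs, fh)
    env_vars["GYOZA_INPUT_PATH"] = str(input_path)
    env_vars["GYOZA_OUTPUT_PATH"] = str(output_path)
    extras["gyoza_dir"] = str(gyoza_dir)      # lets a later cleanup step find it
    return gyoza_dir


env_vars, extras = {}, {}
run_dir = prepare_inputs({"param": "value"}, env_vars, extras)
with open(env_vars["GYOZA_INPUT_PATH"], "rb") as fh:
    restored = pickle.load(fh)
shutil.rmtree(run_dir)  # tidy up the demo directory
```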

Pull latest image step - ensures the latest version of the image is available.

gyoza.worker.execution_pipeline.steps.pull_latest_image.pull_latest_image(ctx)[source]

Pull the latest version of the Docker image before execution.

Parameters:

ctx (ExecutionContext) – Execution context with image name configured.

Returns:

Unchanged context after pulling the image.

Return type:

ExecutionContext

Run container step - executes container and streams logs.

class gyoza.worker.execution_pipeline.steps.run_container.RunContainer[source]

Bases: object

Run the container and stream logs.

Runs the container attached, streaming logs in real time. Log lines that match the gyoza event format are parsed and sent to the master.

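The log handling can be sketched as a loop over streamed lines that keeps every line and forwards the ones that look like events. The gyoza event format is not documented in this section, so the GYOZA_EVENT prefix, the JSON encoding, and the stream_logs and send_event names are all hypothetical.

```python
import json

EVENT_PREFIX = "GYOZA_EVENT "  # hypothetical marker, not taken from gyoza


def stream_logs(lines, send_event):
    """Collect all log lines; forward parsed event lines via send_event."""
    collected = []
    for raw in lines:
        line = raw.rstrip("\n")
        collected.append(line)
        if line.startswith(EVENT_PREFIX):
            try:
                event = json.loads(line[len(EVENT_PREFIX):])
            except json.JSONDecodeError:
                continue  # malformed event lines stay in the logs only
            send_event(event)
    return "\n".join(collected)


events = []
logs = stream_logs(
    [
        "loading model\n",
        'GYOZA_EVENT {"type": "PROGRESS", "value": 50}\n',
        "GYOZA_EVENT not-json\n",
        "done\n",
    ],
    events.append,
)
```

Keeping malformed event lines in the plain logs, rather than raising, means a container that prints something unexpected cannot break log streaming.
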
Start OpRun step - sends STARTED event to master.

gyoza.worker.execution_pipeline.steps.start_oprun.start_oprun(ctx)[source]

Send STARTED event to the master API.

Parameters:

ctx (ExecutionContext) – Execution context with oprun_id.

Returns:

Unchanged context.

Return type:

ExecutionContext

gyoza.worker.execution_pipeline.steps.complete_oprun.complete_oprun(ctx)[source]

Send COMPLETED or FAILED event to the master API based on exit code.

Loads outputs from the pickle file and sends them as payload on success.

Parameters:

ctx (ExecutionContext) – Execution context with exit_code and env_vars set.

Returns:

Context with outputs populated on success.

Return type:

ExecutionContext
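
The branch on the exit code can be sketched as follows. The sketch assumes that exit code 0 means success, that the outputs file lives at GYOZA_OUTPUT_PATH, and that the FAILED payload carries the exit code; the complete helper and send_event callback are hypothetical stand-ins for the real step and the API call.

```python
import pickle
import tempfile
from pathlib import Path


def complete(exit_code, env_vars, send_event):
    """Send COMPLETED with the unpickled outputs, or FAILED otherwise."""
    if exit_code != 0:  # assumption: any non-zero exit code is a failure
        send_event("FAILED", {"exit_code": exit_code})
        return {}
    output_path = Path(env_vars["GYOZA_OUTPUT_PATH"])
    with output_path.open("rb") as fh:
        outputs = pickle.load(fh)  # only unpickle files written by trusted code
    send_event("COMPLETED", outputs)
    return outputs


events = []
record = lambda kind, payload: events.append((kind, payload))
with tempfile.TemporaryDirectory() as tmp:
    out = Path(tmp) / "outputs.pkl"
    out.write_bytes(pickle.dumps({"score": 0.9}))
    env = {"GYOZA_OUTPUT_PATH": str(out)}
    ok_outputs = complete(0, env, record)
    failed_outputs = complete(1, env, record)
```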

gyoza.worker.execution_pipeline.steps.cleanup_op_dir.cleanup_op_dir(ctx)[source]

Delete the op directory created by prepare_inputs.

Removes the gyoza_dir generated by the pipeline, which contains inputs, outputs, and any other files produced during execution.

Parameters:

ctx (ExecutionContext) – Execution context with gyoza_dir in extras.

Returns:

Context unchanged (directory removed from disk).

Return type:

ExecutionContext
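
A minimal sketch of the cleanup, assuming the directory path is stored under the gyoza_dir key of extras as described above. The remove_op_dir helper is hypothetical, and ignoring removal errors is a design choice of this sketch rather than documented behaviour.

```python
import shutil
import tempfile
from pathlib import Path


def remove_op_dir(extras):
    """Delete the per-run directory recorded in extras, if there is one."""
    gyoza_dir = extras.get("gyoza_dir")
    if gyoza_dir:
        # ignore_errors keeps cleanup from masking the run's real outcome
        shutil.rmtree(gyoza_dir, ignore_errors=True)
    return extras


tmp_dir = Path(tempfile.mkdtemp(prefix="gyoza_"))
(tmp_dir / "inputs.pkl").write_bytes(b"demo")
extras = remove_op_dir({"gyoza_dir": str(tmp_dir)})
untouched = remove_op_dir({})  # no directory recorded: nothing to do
```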

Worker Context

Worker context.

Provides an easy-to-use worker context that tracks worker identity, resources, and running operations.

Example

>>> from gyoza.worker.worker_context import worker_context
>>>
>>> # Resources and worker_id are auto-detected at import time
>>> print(worker_context.worker_id)  # from GYOZA_WORKER_ID env var
>>> print(worker_context.resources.cpu_cores)
>>> print(worker_context.resources.ram_mb)
>>>
>>> # Use for heartbeat
>>> from gyoza.client import gyoza_client
>>> payload = worker_context.to_heartbeat_payload()
>>> gyoza_client.heartbeat(**payload)

class gyoza.worker.worker_context.GPU(id, vram_mb, tags=<factory>)[source]

Bases: object

Representation of a single GPU.

Parameters:
  • id (int) – Numeric GPU identifier (0, 1, 2…).

  • vram_mb (int) – VRAM in megabytes.

  • tags (list[str]) – GPU tags (e.g., “cuda”, “nvidia-a100”).

id: int
vram_mb: int
tags: list[str]
to_dict()[source]

Convert to dictionary representation.

Return type:

dict[str, Any]

class gyoza.worker.worker_context.Resources(cpu_cores=0, ram_mb=0, gpus=<factory>)[source]

Bases: object

Hardware resources available on the worker.

Parameters:
  • cpu_cores (int) – Number of CPU cores.

  • ram_mb (int) – RAM in megabytes.

  • gpus (list[GPU]) – List of available GPUs.

cpu_cores: int = 0
ram_mb: int = 0
gpus: list[GPU]
to_dict()[source]

Convert to dictionary representation.

Return type:

dict[str, Any]
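
The two to_dict() methods can be pictured as nested conversions, with Resources delegating to each GPU. The field names come from the reference above; the dictionary keys are assumed to match the field names, which this page does not confirm.

```python
import json
from dataclasses import dataclass, field
from typing import Any


@dataclass
class GPU:
    id: int
    vram_mb: int
    tags: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        return {"id": self.id, "vram_mb": self.vram_mb, "tags": list(self.tags)}


@dataclass
class Resources:
    cpu_cores: int = 0
    ram_mb: int = 0
    gpus: list[GPU] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        # Nested GPUs are converted too, so the result is JSON-serialisable.
        return {
            "cpu_cores": self.cpu_cores,
            "ram_mb": self.ram_mb,
            "gpus": [gpu.to_dict() for gpu in self.gpus],
        }


resources = Resources(cpu_cores=8, ram_mb=16384, gpus=[GPU(id=0, vram_mb=24576, tags=["cuda"])])
payload = resources.to_dict()
encoded = json.dumps(payload)
```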

class gyoza.worker.worker_context.WorkerContext(worker_id='', resources=<factory>, tags=<factory>, running_ops=<factory>)[source]

Bases: object

Singleton context storing worker state for heartbeats.

Parameters:
  • worker_id (str) – Worker name/identifier.

  • resources (Resources) – Hardware resources available.

  • tags (list[str]) – Worker-level tags.

  • running_ops (list[dict[str, Any]]) – Running OpRuns reported at heartbeat time.

worker_id: str = ''
resources: Resources
tags: list[str]
running_ops: list[dict[str, Any]]
add_running_op(oprun_id, image, inputs)[source]

Add an OpRun to the running operations list.

Parameters:
  • oprun_id (str) – Unique identifier for the OpRun.

  • image (str) – Docker image identifier.

  • inputs (dict[str, Any]) – Input parameters for execution.

Return type:

None

remove_running_op(oprun_id)[source]

Remove an OpRun from the running operations list.

Parameters:

oprun_id (str) – Unique identifier for the OpRun to remove.

Return type:

None

clear_running_ops()[source]

Clear the running operations list.

Return type:

None

to_heartbeat_payload()[source]

Convert to heartbeat request payload.

Returns:

Payload ready for gyoza_client.heartbeat().

Return type:

dict[str, Any]

gyoza.worker.worker_context.detect_resources()[source]

Auto-detect hardware resources available on this machine.

Returns:

Detected CPU cores, RAM, and GPUs.

Return type:

Resources
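
How detection works is not described here, so the following is only one possible approach using the standard library: os.cpu_count() for cores and os.sysconf for physical memory on POSIX systems. GPU detection is left out, and the detect_cpu_and_ram helper and its dict return value are hypothetical.

```python
import os


def detect_cpu_and_ram():
    """Best-effort CPU and RAM detection; GPUs are not probed in this sketch."""
    cpu_cores = os.cpu_count() or 0
    ram_mb = 0
    try:
        page_size = os.sysconf("SC_PAGE_SIZE")
        phys_pages = os.sysconf("SC_PHYS_PAGES")
        if page_size > 0 and phys_pages > 0:
            ram_mb = (page_size * phys_pages) // (1024 * 1024)
    except (AttributeError, ValueError, OSError):
        # os.sysconf is missing on some platforms, or the names are unsupported.
        pass
    return {"cpu_cores": cpu_cores, "ram_mb": ram_mb, "gpus": []}


detected = detect_cpu_and_ram()
```

Falling back to zero instead of raising matches the documented defaults of Resources (cpu_cores=0, ram_mb=0), so a worker on an unsupported platform would still start.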

Types for the worker context module.

class gyoza.worker.worker_context.types.GPU(id, vram_mb, tags=<factory>)[source]

Bases: object

Representation of a single GPU.

Parameters:
  • id (int) – Numeric GPU identifier (0, 1, 2…).

  • vram_mb (int) – VRAM in megabytes.

  • tags (list[str]) – GPU tags (e.g., “cuda”, “nvidia-a100”).

id: int
vram_mb: int
tags: list[str]
to_dict()[source]

Convert to dictionary representation.

Return type:

dict[str, Any]

class gyoza.worker.worker_context.types.Resources(cpu_cores=0, ram_mb=0, gpus=<factory>)[source]

Bases: object

Hardware resources available on the worker.

Parameters:
  • cpu_cores (int) – Number of CPU cores.

  • ram_mb (int) – RAM in megabytes.

  • gpus (list[GPU]) – List of available GPUs.

cpu_cores: int = 0
ram_mb: int = 0
gpus: list[GPU]
to_dict()[source]

Convert to dictionary representation.

Return type:

dict[str, Any]

class gyoza.worker.worker_context.types.WorkerContext(worker_id='', resources=<factory>, tags=<factory>, running_ops=<factory>)[source]

Bases: object

Singleton context storing worker state for heartbeats.

Parameters:
  • worker_id (str) – Worker name/identifier.

  • resources (Resources) – Hardware resources available.

  • tags (list[str]) – Worker-level tags.

  • running_ops (list[dict[str, Any]]) – Running OpRuns reported at heartbeat time.

worker_id: str = ''
resources: Resources
tags: list[str]
running_ops: list[dict[str, Any]]
add_running_op(oprun_id, image, inputs)[source]

Add an OpRun to the running operations list.

Parameters:
  • oprun_id (str) – Unique identifier for the OpRun.

  • image (str) – Docker image identifier.

  • inputs (dict[str, Any]) – Input parameters for execution.

Return type:

None

remove_running_op(oprun_id)[source]

Remove an OpRun from the running operations list.

Parameters:

oprun_id (str) – Unique identifier for the OpRun to remove.

Return type:

None

clear_running_ops()[source]

Clear the running operations list.

Return type:

None

to_heartbeat_payload()[source]

Convert to heartbeat request payload.

Returns:

Payload ready for gyoza_client.heartbeat().

Return type:

dict[str, Any]

Utilities for auto-detecting worker resources.

gyoza.worker.worker_context.utils.detect_resources()[source]

Auto-detect hardware resources available on this machine.

Returns:

Detected CPU cores, RAM, and GPUs.

Return type:

Resources