gyoza.worker
gyoza worker runtime.
Provides the heartbeat loop, execution pipeline, and worker context used by the worker process to claim and execute op runs.
- class gyoza.worker.ExecutionContext(oprun_id='', image='', constraints=<factory>, inputs=<factory>, outputs=<factory>, env_vars=<factory>, volumes=<factory>, logs='', exit_code=None, extras=<factory>)
Bases: object
Mutable context shared across all pipeline steps.
Contains OpRun metadata and container execution state.
- Parameters:
oprun_id (str) – Unique identifier for the operation run.
image (str) – Docker image identifier for the container.
constraints (dict[str, Any]) – Hardware constraints (ram_mb, vram_mb, cpu).
inputs (dict[str, Any]) – Input parameters for the operation.
outputs (dict[str, Any]) – Output results from the operation.
env_vars (dict[str, str]) – Environment variables for the container.
volumes (dict[str, str]) – Volume mounts for the container (host:container).
logs (str) – Container execution logs.
exit_code (int | None) – Exit code from the container process.
extras (dict[str, Any]) – Arbitrary key-value store for step communication.
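The `<factory>` defaults in the signature suggest a dataclass whose mutable fields use `default_factory`. The sketch below works under that assumption; `ExecutionContextSketch` is a hypothetical stand-in that mirrors the documented fields, not the gyoza class itself. It shows that every context gets its own dictionaries, so steps can write to `extras` without leaking state between runs.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class ExecutionContextSketch:
    """Hypothetical stand-in mirroring the documented fields."""

    oprun_id: str = ""
    image: str = ""
    constraints: dict[str, Any] = field(default_factory=dict)
    inputs: dict[str, Any] = field(default_factory=dict)
    outputs: dict[str, Any] = field(default_factory=dict)
    env_vars: dict[str, str] = field(default_factory=dict)
    volumes: dict[str, str] = field(default_factory=dict)
    logs: str = ""
    exit_code: int | None = None
    extras: dict[str, Any] = field(default_factory=dict)


first = ExecutionContextSketch(oprun_id="run_a")
second = ExecutionContextSketch(oprun_id="run_b")

# Writing to one context's extras does not touch the other's.
first.extras["note"] = "only on the first run"  # illustrative key
```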
- class gyoza.worker.WorkerContext(worker_id='', resources=<factory>, tags=<factory>, running_ops=<factory>)
Bases: object
Singleton context storing worker state for heartbeats.
- Parameters:
worker_id (str) – Worker name/identifier.
resources (Resources) – Hardware resources available.
tags (list[str]) – Worker-level tags.
running_ops (list[dict[str, Any]]) – Running OpRuns reported at heartbeat time.
- gyoza.worker.start()
Start the worker heartbeat and execution loop.
Blocks until interrupted with Ctrl-C or a signal. On each tick:
Sends a heartbeat to the server (registers/updates the worker).
Claims pending op runs allocated to this worker.
Dispatches each claimed run to a background thread.
- Return type:
Worker main loop.
Entry point called by gyoza worker start. Runs the heartbeat/claim
cycle in a blocking loop and dispatches op runs to a thread pool.
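The heartbeat/claim/dispatch cycle can be pictured roughly as follows. This is only a sketch of the general pattern, not the gyoza implementation: `run_loop`, its callback parameters, the polling interval, and the pool size are all hypothetical names and values chosen for illustration.

```python
from __future__ import annotations

import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable


def run_loop(
    send_heartbeat: Callable[[], None],
    claim_runs: Callable[[], list[dict[str, Any]]],
    execute: Callable[[dict[str, Any]], Any],
    stop: threading.Event,
    interval_s: float = 5.0,
    max_workers: int = 4,
) -> None:
    """Heartbeat, claim, dispatch; repeat until `stop` is set."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while not stop.is_set():
            send_heartbeat()                # 1. register/update the worker
            for run in claim_runs():        # 2. claim runs allocated to us
                pool.submit(execute, run)   # 3. hand each run to a thread
            stop.wait(interval_s)           # sleep, but wake early on stop
    # Leaving the `with` block waits for in-flight runs to finish.


# Demo with in-memory stand-ins for the server calls.
stop = threading.Event()
beats: list[int] = []
executed: list[dict[str, Any]] = []
batches = [[{"id": "run_1"}, {"id": "run_2"}]]


def claim() -> list[dict[str, Any]]:
    if batches:
        return batches.pop()
    stop.set()  # nothing left to claim: end the demo
    return []


run_loop(lambda: beats.append(1), claim, executed.append, stop, interval_s=0.01)
```

A real worker would more likely set the stop event from a signal handler or a `KeyboardInterrupt` handler than from the claim callback.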
- gyoza.worker.runner.start()
Start the worker heartbeat and execution loop.
Blocks until interrupted with Ctrl-C or a signal. On each tick:
Sends a heartbeat to the server (registers/updates the worker).
Claims pending op runs allocated to this worker.
Dispatches each claimed run to a background thread.
- Return type:
Execution Pipeline
The execution pipeline module provides a pre-configured pipeline, execution_pipeline, that runs an OpRun from start to finish.
Example
>>> from gyoza.worker.execution_pipeline import execution_pipeline, ExecutionContext
>>>
>>> ctx = ExecutionContext(
...     oprun_id="run_abc123",
...     image="geoiahub/product:latest",
...     inputs={"param": "value"},
... )
>>> result = execution_pipeline.run(ctx)
>>> print(result.exit_code)
- class gyoza.worker.execution_pipeline.ExecutionContext(oprun_id='', image='', constraints=<factory>, inputs=<factory>, outputs=<factory>, env_vars=<factory>, volumes=<factory>, logs='', exit_code=None, extras=<factory>)
Bases: object
Mutable context shared across all pipeline steps.
Contains OpRun metadata and container execution state.
- Parameters:
oprun_id (str) – Unique identifier for the operation run.
image (str) – Docker image identifier for the container.
constraints (dict[str, Any]) – Hardware constraints (ram_mb, vram_mb, cpu).
inputs (dict[str, Any]) – Input parameters for the operation.
outputs (dict[str, Any]) – Output results from the operation.
env_vars (dict[str, str]) – Environment variables for the container.
volumes (dict[str, str]) – Volume mounts for the container (host:container).
logs (str) – Container execution logs.
exit_code (int | None) – Exit code from the container process.
extras (dict[str, Any]) – Arbitrary key-value store for step communication.
- oprun_id: str = ''
- image: str = ''
- logs: str = ''
- class gyoza.worker.execution_pipeline.Step(*args, **kwargs)
Bases: Protocol
A single step in the pipeline.
Steps receive the context, process it, and return the (possibly mutated) context.
- Parameters:
ctx (ExecutionContext) – Mutable execution context shared across all steps.
- Returns:
The context (same or mutated) for the next step.
- Return type:
ExecutionContext
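Because the documented steps include both plain functions (such as pull_latest_image) and classes (such as PrepareInputs), the protocol is presumably a callable taking and returning the context. The sketch below assumes exactly that; `Ctx`, `StepSketch`, `tag_run`, and `AddNote` are hypothetical names used only for illustration.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Protocol


@dataclass
class Ctx:
    """Hypothetical, trimmed-down stand-in for ExecutionContext."""

    oprun_id: str = ""
    extras: dict[str, Any] = field(default_factory=dict)


class StepSketch(Protocol):
    """Assumed shape of a step: any callable mapping context to context."""

    def __call__(self, ctx: Ctx) -> Ctx: ...


def tag_run(ctx: Ctx) -> Ctx:
    """Function-style step: mutate the context and hand it back."""
    ctx.extras["tagged"] = True
    return ctx


class AddNote:
    """Class-style step: configuration lives on the instance."""

    def __init__(self, note: str) -> None:
        self.note = note

    def __call__(self, ctx: Ctx) -> Ctx:
        ctx.extras["note"] = self.note
        return ctx


steps: list[StepSketch] = [tag_run, AddNote("hello")]
ctx = Ctx(oprun_id="run_abc123")
for step in steps:
    ctx = step(ctx)
```

Since `Protocol` relies on structural typing, neither `tag_run` nor `AddNote` has to inherit from anything to count as a step.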
- class gyoza.worker.execution_pipeline.Pipeline(steps=None)
Bases: object
A sequence of steps executed in order.
- Parameters:
steps (list[Step]) – Ordered list of steps to execute.
- run(ctx=None)
Execute all steps in sequence.
If any step raises an exception, the run is marked as FAILED via the server API and the exception is re-raised so the caller can log it.
- Parameters:
ctx (ExecutionContext | None) – Shared context. Created if not provided.
- Returns:
The context after all steps have been applied.
- Return type:
ExecutionContext
- Raises:
Exception – Whatever exception the failing step raised.
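The run-then-report behaviour described for run() can be sketched as below. This is an illustration under stated assumptions, not the library's code: `PipelineSketch` uses a plain dict as the context, and the `on_failure` callback is a hypothetical stand-in for the server call that marks the run as FAILED.

```python
from __future__ import annotations

from typing import Any, Callable

Context = dict[str, Any]  # stand-in for ExecutionContext
StepFn = Callable[[Context], Context]


class PipelineSketch:
    """Runs steps in order; reports the failure, then re-raises."""

    def __init__(
        self,
        steps: list[StepFn] | None = None,
        on_failure: Callable[[Context, Exception], None] | None = None,
    ) -> None:
        self.steps = list(steps or [])
        # Hypothetical hook replacing the "mark as FAILED" API call.
        self.on_failure = on_failure or (lambda ctx, exc: None)

    def run(self, ctx: Context | None = None) -> Context:
        ctx = {} if ctx is None else ctx  # created if not provided
        for step in self.steps:
            try:
                ctx = step(ctx)
            except Exception as exc:
                self.on_failure(ctx, exc)
                raise  # the caller still sees the original exception
        return ctx


def add_flag(ctx: Context) -> Context:
    return {**ctx, "flag": True}


def boom(ctx: Context) -> Context:
    raise RuntimeError("step failed")


failures: list[tuple[Context, str]] = []
pipeline = PipelineSketch(
    [add_flag, boom],
    on_failure=lambda ctx, exc: failures.append((ctx, str(exc))),
)

try:
    pipeline.run({"oprun_id": "run_1"})
    raised = False
except RuntimeError:
    raised = True

clean_result = PipelineSketch([add_flag]).run()
```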
- gyoza.worker.execution_pipeline.pull_latest_image(ctx)
Pull the latest version of the Docker image before execution.
- Parameters:
ctx (ExecutionContext) – Execution context with image name configured.
- Returns:
Unchanged context after pulling the image.
- Return type:
ExecutionContext
- gyoza.worker.execution_pipeline.start_oprun(ctx)
Send STARTED event to the master API.
- Parameters:
ctx (ExecutionContext) – Execution context with oprun_id.
- Returns:
Unchanged context.
- Return type:
ExecutionContext
Pipeline implementation.
- class gyoza.worker.execution_pipeline.pipeline.Pipeline(steps=None)
Bases: object
A sequence of steps executed in order.
- Parameters:
steps (list[Step]) – Ordered list of steps to execute.
- run(ctx=None)
Execute all steps in sequence.
If any step raises an exception, the run is marked as FAILED via the server API and the exception is re-raised so the caller can log it.
- Parameters:
ctx (ExecutionContext | None) – Shared context. Created if not provided.
- Returns:
The context after all steps have been applied.
- Return type:
ExecutionContext
- Raises:
Exception – Whatever exception the failing step raised.
Execution context for the pipeline.
- class gyoza.worker.execution_pipeline.types.context.ExecutionContext(oprun_id='', image='', constraints=<factory>, inputs=<factory>, outputs=<factory>, env_vars=<factory>, volumes=<factory>, logs='', exit_code=None, extras=<factory>)
Bases: object
Mutable context shared across all pipeline steps.
Contains OpRun metadata and container execution state.
- Parameters:
oprun_id (str) – Unique identifier for the operation run.
image (str) – Docker image identifier for the container.
constraints (dict[str, Any]) – Hardware constraints (ram_mb, vram_mb, cpu).
inputs (dict[str, Any]) – Input parameters for the operation.
outputs (dict[str, Any]) – Output results from the operation.
env_vars (dict[str, str]) – Environment variables for the container.
volumes (dict[str, str]) – Volume mounts for the container (host:container).
logs (str) – Container execution logs.
exit_code (int | None) – Exit code from the container process.
extras (dict[str, Any]) – Arbitrary key-value store for step communication.
Step protocol for the execution pipeline.
- class gyoza.worker.execution_pipeline.types.step.Step(*args, **kwargs)
Bases: Protocol
A single step in the pipeline.
Steps receive the context, process it, and return the (possibly mutated) context.
- Parameters:
ctx (ExecutionContext) – Mutable execution context shared across all steps.
- Returns:
The context (same or mutated) for the next step.
- Return type:
ExecutionContext
Steps
- class gyoza.worker.execution_pipeline.steps.prepare_inputs.PrepareInputs
Bases: object
Prepare the inputs for the container execution.
Creates a unique gyoza directory for the run, saves inputs as a pickle file, and sets GYOZA_INPUT_PATH and GYOZA_OUTPUT_PATH environment variables.
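A rough sketch of what this preparation could look like is shown below. The function name, the `inputs.pkl` and `outputs.pkl` file names, and the use of `tempfile.mkdtemp` are assumptions made for illustration; only the two environment variable names and the `gyoza_dir` key in extras come from this reference.

```python
import pickle
import shutil
import tempfile
from pathlib import Path
from typing import Any


def prepare_inputs_sketch(
    inputs: dict[str, Any],
    env_vars: dict[str, str],
    extras: dict[str, Any],
) -> None:
    """Write inputs to a fresh per-run directory and record the paths."""
    run_dir = Path(tempfile.mkdtemp(prefix="gyoza_"))  # unique per call
    input_path = run_dir / "inputs.pkl"    # hypothetical file name
    output_path = run_dir / "outputs.pkl"  # hypothetical file name

    with input_path.open("wb") as fh:
        pickle.dump(inputs, fh)

    env_vars["GYOZA_INPUT_PATH"] = str(input_path)
    env_vars["GYOZA_OUTPUT_PATH"] = str(output_path)
    extras["gyoza_dir"] = str(run_dir)  # later read by the cleanup step


env_vars: dict[str, str] = {}
extras: dict[str, Any] = {}
prepare_inputs_sketch({"param": "value"}, env_vars, extras)

# Round-trip check: the pickle file holds exactly what was passed in.
with open(env_vars["GYOZA_INPUT_PATH"], "rb") as fh:
    restored = pickle.load(fh)

shutil.rmtree(extras["gyoza_dir"])  # tidy up the demo directory
```

The sketch stores host paths in the environment variables. A real worker that mounts the directory into the container would probably need to set the paths as seen from inside the container instead.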
Pull latest image step - ensures the latest version of the image is available.
- gyoza.worker.execution_pipeline.steps.pull_latest_image.pull_latest_image(ctx)
Pull the latest version of the Docker image before execution.
- Parameters:
ctx (ExecutionContext) – Execution context with image name configured.
- Returns:
Unchanged context after pulling the image.
- Return type:
ExecutionContext
Run container step - executes container and streams logs.
- class gyoza.worker.execution_pipeline.steps.run_container.RunContainer
Bases: object
Run the container and stream logs.
Runs the container attached, streaming logs in real time. Log lines that match the gyoza event format are parsed and sent to the master.
Start OpRun step - sends STARTED event to master.
- gyoza.worker.execution_pipeline.steps.start_oprun.start_oprun(ctx)
Send STARTED event to the master API.
- Parameters:
ctx (ExecutionContext) – Execution context with oprun_id.
- Returns:
Unchanged context.
- Return type:
ExecutionContext
- gyoza.worker.execution_pipeline.steps.complete_oprun.complete_oprun(ctx)
Send COMPLETED or FAILED event to the master API based on exit code.
Loads outputs from the pickle file and sends them as payload on success.
- Parameters:
ctx (ExecutionContext) – Execution context with exit_code and env_vars set.
- Returns:
Context with outputs populated on success.
- Return type:
ExecutionContext
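The completion logic might look something like the sketch below. Several details are assumptions: that an exit code of 0 means success, that the outputs file is the one named by GYOZA_OUTPUT_PATH, and the shape of the FAILED payload. `complete_sketch` and its `send_event` callback are hypothetical stand-ins for the real step and the master API call.

```python
import pickle
import tempfile
from pathlib import Path
from typing import Any, Callable, Optional


def complete_sketch(
    exit_code: Optional[int],
    env_vars: dict[str, str],
    send_event: Callable[[str, dict[str, Any]], None],
) -> dict[str, Any]:
    """Report COMPLETED with outputs on success, FAILED otherwise."""
    if exit_code == 0:  # assumption: 0 is the only success code
        with open(env_vars["GYOZA_OUTPUT_PATH"], "rb") as fh:
            outputs = pickle.load(fh)
        send_event("COMPLETED", outputs)
        return outputs
    send_event("FAILED", {"exit_code": exit_code})  # payload shape is a guess
    return {}


events: list[tuple[str, dict[str, Any]]] = []


def record(kind: str, payload: dict[str, Any]) -> None:
    events.append((kind, payload))


with tempfile.TemporaryDirectory() as tmp:
    out_path = Path(tmp) / "outputs.pkl"  # hypothetical file name
    out_path.write_bytes(pickle.dumps({"score": 0.9}))
    env = {"GYOZA_OUTPUT_PATH": str(out_path)}

    ok_outputs = complete_sketch(0, env, record)
    failed_outputs = complete_sketch(1, env, record)
```

Unpickling executes arbitrary code embedded in the file, so this pattern is only appropriate when the container that wrote the outputs is trusted.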
- gyoza.worker.execution_pipeline.steps.cleanup_op_dir.cleanup_op_dir(ctx)
Delete the op directory created by prepare_inputs.
Removes the gyoza_dir generated by the pipeline, which contains inputs, outputs, and any other files produced during execution.
- Parameters:
ctx (ExecutionContext) – Execution context with gyoza_dir in extras.
- Returns:
Context unchanged (directory removed from disk).
- Return type:
ExecutionContext
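A cleanup of this kind is typically a thin wrapper around `shutil.rmtree`. The sketch below assumes the directory path is stored as a string under the `gyoza_dir` key and that a missing key should be ignored silently; `cleanup_sketch` is a hypothetical name, and the real step may handle errors differently.

```python
import shutil
import tempfile
from pathlib import Path
from typing import Any


def cleanup_sketch(extras: dict[str, Any]) -> None:
    """Remove the per-run directory recorded in extras, if any."""
    gyoza_dir = extras.get("gyoza_dir")
    if gyoza_dir:
        # ignore_errors keeps cleanup from masking an earlier failure.
        shutil.rmtree(gyoza_dir, ignore_errors=True)


demo_dir = Path(tempfile.mkdtemp(prefix="gyoza_"))
(demo_dir / "inputs.pkl").write_bytes(b"")  # hypothetical leftover file

cleanup_sketch({"gyoza_dir": str(demo_dir)})
cleanup_sketch({})  # no directory recorded: nothing to do
```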
Worker Context
Worker context.
Provides an easy-to-use worker context for tracking worker identity, resources, and running operations.
Example
>>> from gyoza.worker.worker_context import worker_context
>>>
>>> # Resources and worker_id are auto-detected at import time
>>> print(worker_context.worker_id) # from GYOZA_WORKER_ID env var
>>> print(worker_context.resources.cpu_cores)
>>> print(worker_context.resources.ram_mb)
>>>
>>> # Use for heartbeat
>>> from gyoza.client import gyoza_client
>>> payload = worker_context.to_heartbeat_payload()
>>> gyoza_client.heartbeat(**payload)
- class gyoza.worker.worker_context.GPU(id, vram_mb, tags=<factory>)
Bases: object
Representation of a single GPU.
- Parameters:
- id: int
- vram_mb: int
- class gyoza.worker.worker_context.Resources(cpu_cores=0, ram_mb=0, gpus=<factory>)
Bases: object
Hardware resources available on the worker.
- Parameters:
- cpu_cores: int = 0
- ram_mb: int = 0
- class gyoza.worker.worker_context.WorkerContext(worker_id='', resources=<factory>, tags=<factory>, running_ops=<factory>)
Bases: object
Singleton context storing worker state for heartbeats.
- Parameters:
worker_id (str) – Worker name/identifier.
resources (Resources) – Hardware resources available.
tags (list[str]) – Worker-level tags.
running_ops (list[dict[str, Any]]) – Running OpRuns reported at heartbeat time.
- worker_id: str = ''
- resources: Resources
- add_running_op(oprun_id, image, inputs)
Add an OpRun to the running operations list.
- remove_running_op(oprun_id)
Remove an OpRun from the running operations list.
- to_heartbeat_payload()
Convert to heartbeat request payload.
- Returns:
Payload ready for gyoza_client.heartbeat().
- Return type:
dict[str, Any]
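How the three methods fit together can be sketched with plain dataclasses. The classes below are hypothetical stand-ins that mirror the documented signatures; the keys stored for each running op and the exact payload layout (here simply `dataclasses.asdict`) are assumptions, not the documented wire format.

```python
from __future__ import annotations

from dataclasses import asdict, dataclass, field
from typing import Any


@dataclass
class GPUSketch:
    id: int
    vram_mb: int
    tags: list[str] = field(default_factory=list)


@dataclass
class ResourcesSketch:
    cpu_cores: int = 0
    ram_mb: int = 0
    gpus: list[GPUSketch] = field(default_factory=list)


@dataclass
class WorkerContextSketch:
    worker_id: str = ""
    resources: ResourcesSketch = field(default_factory=ResourcesSketch)
    tags: list[str] = field(default_factory=list)
    running_ops: list[dict[str, Any]] = field(default_factory=list)

    def add_running_op(self, oprun_id: str, image: str, inputs: dict[str, Any]) -> None:
        # Entry keys are assumed; the real structure may differ.
        self.running_ops.append({"oprun_id": oprun_id, "image": image, "inputs": inputs})

    def remove_running_op(self, oprun_id: str) -> None:
        self.running_ops = [op for op in self.running_ops if op["oprun_id"] != oprun_id]

    def to_heartbeat_payload(self) -> dict[str, Any]:
        # asdict recurses into nested dataclasses, lists, and dicts.
        return asdict(self)


worker = WorkerContextSketch(
    worker_id="worker-1",
    resources=ResourcesSketch(cpu_cores=4, ram_mb=8192),
)
worker.add_running_op("run_1", "example/image:latest", {"x": 1})
worker.add_running_op("run_2", "example/image:latest", {})
worker.remove_running_op("run_1")
payload = worker.to_heartbeat_payload()
```

Since op runs are dispatched to background threads, a shared context like this would likely also need a lock around `running_ops`; the sketch leaves that out for brevity.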
- gyoza.worker.worker_context.detect_resources()
Auto-detect hardware resources available on this machine.
- Returns:
Detected CPU cores, RAM, and GPUs.
- Return type:
Resources
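This reference does not say how detection is performed. One plausible approach, sketched below purely as an assumption, uses `os.cpu_count()` for cores, POSIX `sysconf` for RAM, and the `nvidia-smi` command line tool for GPUs when it is installed. All function names here are hypothetical, and the result is a plain dict rather than the Resources class.

```python
from __future__ import annotations

import os
import shutil
import subprocess
from typing import Any


def detect_ram_mb() -> int:
    """Total physical RAM in MB via POSIX sysconf; 0 when unavailable."""
    try:
        total = os.sysconf("SC_PAGE_SIZE") * os.sysconf("SC_PHYS_PAGES")
    except (AttributeError, ValueError, OSError):
        return 0  # e.g. on platforms without os.sysconf
    return max(0, total // (1024 * 1024))


def detect_gpus() -> list[dict[str, int]]:
    """List NVIDIA GPUs via nvidia-smi; empty when the tool is missing."""
    if shutil.which("nvidia-smi") is None:
        return []
    try:
        proc = subprocess.run(
            ["nvidia-smi", "--query-gpu=index,memory.total",
             "--format=csv,noheader,nounits"],
            capture_output=True, text=True, timeout=10, check=False,
        )
    except (OSError, subprocess.SubprocessError):
        return []
    if proc.returncode != 0:
        return []
    gpus = []
    for line in proc.stdout.splitlines():
        parts = [p.strip() for p in line.split(",")]
        # Expect "index, memory_in_MiB"; skip anything else.
        if len(parts) == 2 and parts[0].isdigit() and parts[1].isdigit():
            gpus.append({"id": int(parts[0]), "vram_mb": int(parts[1])})
    return gpus


def detect_resources_sketch() -> dict[str, Any]:
    return {
        "cpu_cores": os.cpu_count() or 0,
        "ram_mb": detect_ram_mb(),
        "gpus": detect_gpus(),
    }


detected = detect_resources_sketch()
```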
Types for the worker context module.
- class gyoza.worker.worker_context.types.GPU(id, vram_mb, tags=<factory>)
Bases: object
Representation of a single GPU.
- Parameters:
- class gyoza.worker.worker_context.types.Resources(cpu_cores=0, ram_mb=0, gpus=<factory>)
Bases: object
Hardware resources available on the worker.
- Parameters:
- class gyoza.worker.worker_context.types.WorkerContext(worker_id='', resources=<factory>, tags=<factory>, running_ops=<factory>)
Bases: object
Singleton context storing worker state for heartbeats.
- Parameters:
Utilities for auto-detecting worker resources.