gyoza.worker
gyoza worker runtime.
Provides the heartbeat loop, execution pipeline, and worker context used by the worker process to claim and execute op runs.
- class gyoza.worker.ExecutionContext(oprun_id='', image='', constraints=<factory>, inputs=<factory>, outputs=<factory>, env_vars=<factory>, volumes=<factory>, logs='', exit_code=None, extras=<factory>)[source]#
Bases: object
Mutable context shared across all pipeline steps.
Contains OpRun metadata and container execution state.
- Parameters:
oprun_id (str) – Unique identifier for the operation run.
image (str) – Docker image identifier for the container.
constraints (dict[str, Any]) – Hardware constraints (ram_mb, vram_mb, cpu).
inputs (dict[str, Any]) – Input parameters for the operation.
outputs (dict[str, Any]) – Output results from the operation.
env_vars (dict[str, str]) – Environment variables for the container.
volumes (dict[str, str]) – Volume mounts for the container (host:container).
logs (str) – Container execution logs.
exit_code (int | None) – Exit code from the container process.
extras (dict[str, Any]) – Arbitrary key-value store for step communication.
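The `<factory>` defaults in the signature suggest a dataclass whose mutable fields use `default_factory`. The following is a minimal stand-in sketch under that assumption, not the gyoza source: `ExecutionContextSketch` is a hypothetical name, and it mirrors only the documented field names and types so the defaults can be seen in isolation.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class ExecutionContextSketch:
    """Hypothetical stand-in mirroring the documented fields."""

    oprun_id: str = ""
    image: str = ""
    constraints: dict[str, Any] = field(default_factory=dict)
    inputs: dict[str, Any] = field(default_factory=dict)
    outputs: dict[str, Any] = field(default_factory=dict)
    env_vars: dict[str, str] = field(default_factory=dict)
    volumes: dict[str, str] = field(default_factory=dict)
    logs: str = ""
    exit_code: Optional[int] = None
    extras: dict[str, Any] = field(default_factory=dict)


a = ExecutionContextSketch(oprun_id="run_abc123", image="example/image:latest")
b = ExecutionContextSketch()
# Steps can pass values to later steps through extras; each instance
# gets its own dict because of default_factory.
a.extras["gyoza_dir"] = "/tmp/example"
```

With `default_factory`, mutating `a.extras` leaves `b.extras` untouched, which matters for a context that every step is allowed to mutate.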
- class gyoza.worker.WorkerContext(worker_id='', resources=<factory>, tags=<factory>, running_ops=<factory>)[source]#
Bases: object
Singleton context storing worker state for heartbeats.
Holds running ops (RunningOp) with last known server state, refreshed from the main loop so the executor can check e.g. cancellation.
- Parameters:
- add_running_op(oprun_id, image, inputs, constraints=None)[source]#
Add an OpRun to the running operations list.
- gyoza.worker.start()[source]#
Start the worker heartbeat and execution loop.
Blocks until interrupted with Ctrl-C or a signal. On each tick:
1. Sends a heartbeat to the server (registers/updates the worker).
2. Claims pending op runs allocated to this worker.
3. Dispatches each claimed run to a background thread.
- Return type:
Worker main loop.
Entry point called by gyoza worker start. Runs the heartbeat/claim
cycle in a blocking loop and dispatches op runs to a thread pool.
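The heartbeat/claim/dispatch cycle described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the gyoza implementation: `run_worker_loop`, its three callables, and the `threading.Event` used for shutdown are all hypothetical (the real loop is documented as stopping on Ctrl-C or a signal).

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable


def run_worker_loop(
    send_heartbeat: Callable[[], None],
    claim_runs: Callable[[], Iterable[dict]],
    execute_run: Callable[[dict], None],
    stop: threading.Event,
    interval_s: float = 5.0,
    max_workers: int = 4,
) -> None:
    """Heartbeat, claim, and dispatch until `stop` is set.

    All callables are hypothetical stand-ins for the server client
    and the execution pipeline.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while not stop.is_set():
            send_heartbeat()                   # register or refresh the worker
            for run in claim_runs():           # runs allocated to this worker
                pool.submit(execute_run, run)  # execute off the main loop
            stop.wait(interval_s)              # sleep, but wake early on shutdown
    # Leaving the `with` block waits for in-flight runs to finish.
```

Using `stop.wait(interval_s)` instead of `time.sleep` lets the loop exit promptly when shutdown is requested, while the thread pool keeps long-running op runs from delaying the next heartbeat.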
- gyoza.worker.runner.start()[source]#
Start the worker heartbeat and execution loop.
Blocks until interrupted with Ctrl-C or a signal. On each tick:
1. Sends a heartbeat to the server (registers/updates the worker).
2. Claims pending op runs allocated to this worker.
3. Dispatches each claimed run to a background thread.
- Return type:
Execution Pipeline
The execution pipeline module provides a pre-configured pipeline, execution_pipeline, that runs an OpRun from start to finish.
Example
>>> from gyoza.worker.execution_pipeline import execution_pipeline, ExecutionContext
>>>
>>> ctx = ExecutionContext(
... oprun_id="run_abc123",
... image="geoiahub/product:latest",
... inputs={"param": "value"},
... )
>>> result = execution_pipeline.run(ctx)
>>> print(result.exit_code)
- class gyoza.worker.execution_pipeline.ExecutionContext(oprun_id='', image='', constraints=<factory>, inputs=<factory>, outputs=<factory>, env_vars=<factory>, volumes=<factory>, logs='', exit_code=None, extras=<factory>)[source]
Bases: object
Mutable context shared across all pipeline steps.
Contains OpRun metadata and container execution state.
- Parameters:
oprun_id (str) – Unique identifier for the operation run.
image (str) – Docker image identifier for the container.
constraints (dict[str, Any]) – Hardware constraints (ram_mb, vram_mb, cpu).
inputs (dict[str, Any]) – Input parameters for the operation.
outputs (dict[str, Any]) – Output results from the operation.
env_vars (dict[str, str]) – Environment variables for the container.
volumes (dict[str, str]) – Volume mounts for the container (host:container).
logs (str) – Container execution logs.
exit_code (int | None) – Exit code from the container process.
extras (dict[str, Any]) – Arbitrary key-value store for step communication.
- oprun_id: str = ''
- image: str = ''
- logs: str = ''
- class gyoza.worker.execution_pipeline.Step(*args, **kwargs)[source]
Bases: Protocol
A single step in the pipeline.
Steps receive the context, process it, and return the (possibly mutated) context.
- Parameters:
ctx (ExecutionContext) – Mutable execution context shared across all steps.
- Returns:
The context (same or mutated) for the next step.
- Return type:
ExecutionContext
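Because the documented steps include both plain functions (such as pull_latest_image) and classes (such as PrepareInputs), a step is presumably any callable that takes the context and returns it. The sketch below assumes the protocol is defined through `__call__`; `StepSketch`, `tag_image`, and `AddEnvVar` are hypothetical names, and a plain dict stands in for ExecutionContext.

```python
from typing import Any, Protocol


class StepSketch(Protocol):
    """Hypothetical protocol: any callable taking and returning the context."""

    def __call__(self, ctx: Any) -> Any: ...


def tag_image(ctx: dict) -> dict:
    """A function step: mutates the context and returns it."""
    ctx["image"] = ctx["image"] + ":latest"
    return ctx


class AddEnvVar:
    """A class step: configured at construction, applied via __call__."""

    def __init__(self, key: str, value: str) -> None:
        self.key, self.value = key, value

    def __call__(self, ctx: dict) -> dict:
        ctx.setdefault("env_vars", {})[self.key] = self.value
        return ctx


# Both forms satisfy the protocol structurally; no inheritance is needed.
steps: list[StepSketch] = [tag_image, AddEnvVar("MODE", "test")]
ctx = {"image": "example/image"}
for step in steps:
    ctx = step(ctx)
```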
- class gyoza.worker.execution_pipeline.Pipeline(steps=None)[source]
Bases: object
A sequence of steps executed in order.
- run(ctx=None)[source]
Execute all steps in sequence.
If any step raises an exception, the run is marked as FAILED via the server API and the exception is re-raised so the caller can log it.
- Parameters:
ctx (ExecutionContext | None) – Shared context. Created if not provided.
- Returns:
The context after all steps have been applied.
- Return type:
ExecutionContext
- Raises:
Exception – Whatever exception the failing step raised.
- gyoza.worker.execution_pipeline.pull_latest_image(ctx)[source]
Pull the latest version of the Docker image before execution.
- Parameters:
ctx (ExecutionContext) – Execution context with image name configured.
- Returns:
Unchanged context after pulling the image.
- Return type:
ExecutionContext
- gyoza.worker.execution_pipeline.start_oprun(ctx)[source]
Send STARTED event to the master API.
- Parameters:
ctx (ExecutionContext) – Execution context with oprun_id.
- Returns:
Unchanged context.
- Return type:
ExecutionContext
Pipeline implementation.
- class gyoza.worker.execution_pipeline.pipeline.Pipeline(steps=None)[source]#
Bases: object
A sequence of steps executed in order.
- run(ctx=None)[source]#
Execute all steps in sequence.
If any step raises an exception, the run is marked as FAILED via the server API and the exception is re-raised so the caller can log it.
- Parameters:
ctx (ExecutionContext | None) – Shared context. Created if not provided.
- Returns:
The context after all steps have been applied.
- Return type:
ExecutionContext
- Raises:
Exception – Whatever exception the failing step raised.
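The run behaviour described above (apply steps in order, report failure, re-raise) can be sketched as follows. This is a simplified stand-in rather than the gyoza source: `PipelineSketch` and the `mark_failed` callback are hypothetical, with `mark_failed` taking the place of the server API call, and a dict taking the place of ExecutionContext.

```python
from typing import Any, Callable, Optional


class PipelineSketch:
    """Hypothetical stand-in: runs steps in order, reports failure, re-raises."""

    def __init__(
        self,
        steps=None,
        mark_failed: Optional[Callable[[Any, Exception], None]] = None,
    ) -> None:
        self.steps = list(steps or [])
        # mark_failed stands in for the server API call; its name is assumed.
        self.mark_failed = mark_failed or (lambda ctx, exc: None)

    def run(self, ctx: Optional[dict] = None) -> dict:
        ctx = {} if ctx is None else ctx  # create a context if none was given
        for step in self.steps:
            try:
                ctx = step(ctx)
            except Exception as exc:
                self.mark_failed(ctx, exc)  # report FAILED, then propagate
                raise
        return ctx
```

Re-raising with a bare `raise` preserves the original traceback, so the caller logs the failing step's exception rather than a wrapper.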
Execution context for the pipeline.
- class gyoza.worker.execution_pipeline.types.context.ExecutionContext(oprun_id='', image='', constraints=<factory>, inputs=<factory>, outputs=<factory>, env_vars=<factory>, volumes=<factory>, logs='', exit_code=None, extras=<factory>)[source]#
Bases: object
Mutable context shared across all pipeline steps.
Contains OpRun metadata and container execution state.
- Parameters:
oprun_id (str) – Unique identifier for the operation run.
image (str) – Docker image identifier for the container.
constraints (dict[str, Any]) – Hardware constraints (ram_mb, vram_mb, cpu).
inputs (dict[str, Any]) – Input parameters for the operation.
outputs (dict[str, Any]) – Output results from the operation.
env_vars (dict[str, str]) – Environment variables for the container.
volumes (dict[str, str]) – Volume mounts for the container (host:container).
logs (str) – Container execution logs.
exit_code (int | None) – Exit code from the container process.
extras (dict[str, Any]) – Arbitrary key-value store for step communication.
Step protocol for the execution pipeline.
- class gyoza.worker.execution_pipeline.types.step.Step(*args, **kwargs)[source]#
Bases: Protocol
A single step in the pipeline.
Steps receive the context, process it, and return the (possibly mutated) context.
- Parameters:
ctx (ExecutionContext) – Mutable execution context shared across all steps.
- Returns:
The context (same or mutated) for the next step.
- Return type:
ExecutionContext
Steps
- class gyoza.worker.execution_pipeline.steps.prepare_inputs.PrepareInputs[source]#
Bases: object
Prepare the inputs for the container execution.
Creates a unique gyoza directory for the run, saves inputs as a pickle file, and sets the GYOZA_INPUT_PATH and GYOZA_OUTPUT_PATH environment variables.
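As a rough illustration of the three actions listed above, the sketch below creates a unique directory, pickles the inputs, and records both paths. The function name, the `gyoza_` prefix, and the file names `inputs.pkl` and `outputs.pkl` are assumptions, and it uses host paths directly; the real step may instead expose container-side paths through a volume mount.

```python
import os
import pickle
import tempfile


def prepare_inputs_sketch(inputs: dict, env_vars: dict, extras: dict) -> str:
    """Create a per-run directory, pickle the inputs, and export both paths."""
    # mkdtemp gives a unique directory; the prefix and file names are assumed.
    gyoza_dir = tempfile.mkdtemp(prefix="gyoza_")
    input_path = os.path.join(gyoza_dir, "inputs.pkl")
    output_path = os.path.join(gyoza_dir, "outputs.pkl")
    with open(input_path, "wb") as fh:
        pickle.dump(inputs, fh)
    env_vars["GYOZA_INPUT_PATH"] = input_path
    env_vars["GYOZA_OUTPUT_PATH"] = output_path
    extras["gyoza_dir"] = gyoza_dir  # later steps can find and remove it
    return gyoza_dir
```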
Pull latest image step - ensures the latest version of the image is available.
- gyoza.worker.execution_pipeline.steps.pull_latest_image.pull_latest_image(ctx)[source]#
Pull the latest version of the Docker image before execution.
- Parameters:
ctx (ExecutionContext) – Execution context with image name configured.
- Returns:
Unchanged context after pulling the image.
- Return type:
ExecutionContext
Run container step - executes container and streams logs.
- class gyoza.worker.execution_pipeline.steps.run_container.RunContainer[source]#
Bases: object
Run the container and stream logs.
Runs the container in attached mode and streams its logs in real time. Log lines that match the gyoza event format are parsed and sent to the master.
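The event format itself is not documented here, so the following sketch only illustrates the general pattern of scanning streamed log lines for events. It assumes a made-up convention in which an event line starts with `GYOZA_EVENT ` followed by JSON; the prefix, `stream_logs`, and `send_event` are all hypothetical, and an ordinary iterable of strings stands in for the container's log stream.

```python
import json
from typing import Callable, Iterable

# Hypothetical marker; the real gyoza event format is not documented here.
EVENT_PREFIX = "GYOZA_EVENT "


def stream_logs(lines: Iterable[str], send_event: Callable[[dict], None]) -> str:
    """Collect every log line and forward lines that parse as events."""
    collected = []
    for line in lines:
        collected.append(line)
        if line.startswith(EVENT_PREFIX):
            try:
                event = json.loads(line[len(EVENT_PREFIX):])
            except json.JSONDecodeError:
                continue  # malformed event lines remain ordinary log lines
            send_event(event)
    return "\n".join(collected)
```

Processing lines one at a time, rather than waiting for the container to exit, is what allows events to reach the server while the run is still in progress.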
Start OpRun step - sends STARTED event to master.
- gyoza.worker.execution_pipeline.steps.start_oprun.start_oprun(ctx)[source]#
Send STARTED event to the master API.
- Parameters:
ctx (ExecutionContext) – Execution context with oprun_id.
- Returns:
Unchanged context.
- Return type:
ExecutionContext
- gyoza.worker.execution_pipeline.steps.complete_oprun.complete_oprun(ctx)[source]#
Send COMPLETED or FAILED event to the master API based on exit code.
Loads outputs from the pickle file and sends them as payload on success. If the server returns 400 or 409 (e.g. run already terminal or not assigned to this worker), the update is skipped and context is returned without raising, so the worker does not treat a reclaimed run as a local failure.
- Parameters:
ctx (ExecutionContext) – Execution context with exit_code and env_vars set.
- Returns:
Context with outputs populated on success.
- Return type:
ExecutionContext
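The decision logic described for complete_oprun can be sketched as below. This is an approximation under assumptions: `ApiError`, `send_event`, and the FAILED payload shape are hypothetical, a dict stands in for ExecutionContext, and an exit code of 0 is assumed to mean success.

```python
import pickle
from typing import Callable


class ApiError(Exception):
    """Hypothetical client error carrying the HTTP status code."""

    def __init__(self, status: int) -> None:
        super().__init__(f"HTTP {status}")
        self.status = status


def complete_oprun_sketch(ctx: dict, send_event: Callable[[str, dict], None]) -> dict:
    """Report COMPLETED with outputs on exit code 0, otherwise FAILED."""
    try:
        if ctx["exit_code"] == 0:
            # Outputs are read from the path exported to the container.
            with open(ctx["env_vars"]["GYOZA_OUTPUT_PATH"], "rb") as fh:
                ctx["outputs"] = pickle.load(fh)
            send_event("COMPLETED", ctx["outputs"])
        else:
            send_event("FAILED", {"exit_code": ctx["exit_code"]})
    except ApiError as err:
        if err.status not in (400, 409):
            raise  # unexpected server errors still propagate
        # 400/409: run already terminal or reassigned, so skip the update.
    return ctx
```

Swallowing only 400 and 409 keeps a reclaimed or already-finished run from being reported as a local failure, while other errors remain visible to the caller.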
- gyoza.worker.execution_pipeline.steps.cleanup_op_dir.cleanup_op_dir(ctx)[source]#
Delete the op directory created by prepare_inputs.
Removes the gyoza_dir generated by the pipeline, which contains inputs, outputs, and any other files produced during execution.
- Parameters:
ctx (ExecutionContext) – Execution context with gyoza_dir in extras.
- Returns:
Context unchanged (directory removed from disk).
- Return type:
ExecutionContext
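A cleanup step of this kind might look like the sketch below, which assumes the directory path is stored under `extras["gyoza_dir"]` as described above. The function name is hypothetical and a dict stands in for ExecutionContext.

```python
import shutil


def cleanup_op_dir_sketch(ctx: dict) -> dict:
    """Remove the directory recorded under extras['gyoza_dir'], if any."""
    gyoza_dir = ctx.get("extras", {}).get("gyoza_dir")
    if gyoza_dir:
        # ignore_errors keeps cleanup from failing when the directory is already gone.
        shutil.rmtree(gyoza_dir, ignore_errors=True)
    return ctx
```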
Worker Context
Worker context.
Provides an easy-to-use worker context for tracking worker identity, resources, and running operations.
Example
>>> from gyoza.worker.worker_context import worker_context
>>>
>>> # Resources and worker_id are auto-detected at import time
>>> print(worker_context.worker_id) # from GYOZA_WORKER_ID env var
>>> print(worker_context.resources.cpu_cores)
>>> print(worker_context.resources.ram_mb)
>>>
>>> # Use for heartbeat
>>> from gyoza.client import gyoza_client
>>> payload = worker_context.to_heartbeat_payload()
>>> gyoza_client.heartbeat(**payload)
- class gyoza.worker.worker_context.GPU(id, vram_mb, tags=<factory>)[source]
Bases: object
Representation of a single GPU.
- Parameters:
- id: int
- vram_mb: int
- class gyoza.worker.worker_context.Resources(cpu_cores=0, ram_mb=0, gpus=<factory>)[source]
Bases: object
Hardware resources available on the worker.
- Parameters:
- cpu_cores: int = 0
- ram_mb: int = 0
- class gyoza.worker.worker_context.RunningOp(id, image, inputs=<factory>, constraints=<factory>, state=None)[source]
Bases: object
Simplified run representation with last known server state.
Stored in worker context for each op the worker is executing. Server state is refreshed from the main loop via GET /runs/{id}.
- Parameters:
- id: str
- image: str
- class gyoza.worker.worker_context.WorkerContext(worker_id='', resources=<factory>, tags=<factory>, running_ops=<factory>)[source]
Bases: object
Singleton context storing worker state for heartbeats.
Holds running ops (RunningOp) with last known server state, refreshed from the main loop so the executor can check e.g. cancellation.
- Parameters:
- worker_id: str = ''
- resources: Resources
- add_running_op(oprun_id, image, inputs, constraints=None)[source]
Add an OpRun to the running operations list.
- remove_running_op(oprun_id)[source]
Remove an OpRun from the running operations list.
- set_run_state(run_id, state)[source]
Update the last known server state for a running op.
- get_run_state(run_id)[source]
Return the last known server state for a run, or None.
- is_run_cancelled(run_id)[source]
Return True if the run is known to be CANCELLED on the server.
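The run-tracking methods listed above can be illustrated with a small stand-in. This sketch is not the gyoza class: the names ending in `Sketch` are hypothetical, running ops are kept in a dict keyed by run id (the documentation calls it a list), and no locking is shown even though the real context is shared between the main loop and executor threads.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class RunningOpSketch:
    id: str
    image: str
    inputs: dict[str, Any] = field(default_factory=dict)
    constraints: dict[str, Any] = field(default_factory=dict)
    state: Optional[str] = None  # last state reported by the server


@dataclass
class WorkerContextSketch:
    worker_id: str = ""
    running_ops: dict[str, RunningOpSketch] = field(default_factory=dict)

    def add_running_op(self, oprun_id, image, inputs, constraints=None):
        self.running_ops[oprun_id] = RunningOpSketch(
            oprun_id, image, inputs, constraints or {}
        )

    def remove_running_op(self, oprun_id):
        self.running_ops.pop(oprun_id, None)

    def set_run_state(self, run_id, state):
        if run_id in self.running_ops:
            self.running_ops[run_id].state = state

    def get_run_state(self, run_id):
        op = self.running_ops.get(run_id)
        return op.state if op else None

    def is_run_cancelled(self, run_id):
        return self.get_run_state(run_id) == "CANCELLED"
```

In this arrangement the main loop would call `set_run_state` after refreshing a run from the server, and the executor would poll `is_run_cancelled` to decide whether to stop the container.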
- gyoza.worker.worker_context.detect_resources()[source]
Auto-detect hardware resources available on this machine.
- Returns:
Detected CPU cores, RAM, and GPUs.
- Return type:
Resources
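How gyoza performs detection is not documented here. As one possible approach, the sketch below reads CPU and RAM figures from the Python standard library and leaves the GPU list empty, since GPU discovery would need a vendor tool or library. `ResourcesSketch` and `detect_resources_sketch` are hypothetical names.

```python
import os
from dataclasses import dataclass, field


@dataclass
class ResourcesSketch:
    cpu_cores: int = 0
    ram_mb: int = 0
    gpus: list = field(default_factory=list)


def detect_resources_sketch() -> ResourcesSketch:
    """Detect CPU cores and RAM with the standard library; GPUs are left empty."""
    cpu_cores = os.cpu_count() or 0
    ram_mb = 0
    try:
        # POSIX only: physical pages times page size gives total RAM in bytes.
        ram_bytes = os.sysconf("SC_PHYS_PAGES") * os.sysconf("SC_PAGE_SIZE")
        ram_mb = ram_bytes // (1024 * 1024)
    except (AttributeError, ValueError, OSError):
        pass  # platform without sysconf support: report 0 rather than guess
    return ResourcesSketch(cpu_cores=cpu_cores, ram_mb=ram_mb)
```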
Types for the worker context module.
- class gyoza.worker.worker_context.types.RunningOp(id, image, inputs=<factory>, constraints=<factory>, state=None)[source]#
Bases: object
Simplified run representation with last known server state.
Stored in worker context for each op the worker is executing. Server state is refreshed from the main loop via GET /runs/{id}.
- class gyoza.worker.worker_context.types.GPU(id, vram_mb, tags=<factory>)[source]#
Bases: object
Representation of a single GPU.
- Parameters:
- class gyoza.worker.worker_context.types.Resources(cpu_cores=0, ram_mb=0, gpus=<factory>)[source]#
Bases: object
Hardware resources available on the worker.
- Parameters:
- class gyoza.worker.worker_context.types.WorkerContext(worker_id='', resources=<factory>, tags=<factory>, running_ops=<factory>)[source]#
Bases: object
Singleton context storing worker state for heartbeats.
Holds running ops (RunningOp) with last known server state, refreshed from the main loop so the executor can check e.g. cancellation.
- Parameters:
- add_running_op(oprun_id, image, inputs, constraints=None)[source]#
Add an OpRun to the running operations list.
Utilities for auto-detecting worker resources.