# Source code for gyoza.worker.runner

"""Worker main loop.

Entry point called by ``gyoza worker start``. Runs the heartbeat/claim
cycle in a blocking loop and dispatches op runs to a thread pool.
"""

import os
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any

from gyoza.client import gyoza_client
from gyoza.worker.execution_pipeline import ExecutionContext, execution_pipeline
from gyoza.worker.worker_context import worker_context

_HEARTBEAT_INTERVAL = int(os.getenv("GYOZA_HEARTBEAT_INTERVAL", "10"))


def start() -> None:
    """Run the worker's heartbeat/claim loop until interrupted.

    Blocks indefinitely. Every ``_HEARTBEAT_INTERVAL`` seconds it performs
    one tick:

    1. Send this worker's heartbeat payload to the server.
    2. Claim the op runs the server has allocated to this worker.
    3. Submit each claimed run to a thread pool, where ``_run_task``
       executes it.

    Errors raised during a tick are printed and the loop continues. A
    ``KeyboardInterrupt`` (e.g. Ctrl-C) ends the loop. The pool is then shut
    down with ``wait=True``, so runs that were already submitted finish
    before this function returns.
    """
    print(f"Starting worker: {worker_context.worker_id}")
    print(f"Resources: {worker_context.resources.to_dict()}")

    pool = ThreadPoolExecutor()
    try:
        while True:
            _heartbeat_and_dispatch(pool)
            time.sleep(_HEARTBEAT_INTERVAL)
    except KeyboardInterrupt:
        print("\nShutting down worker...")
    finally:
        # Drain the pool: queued and in-flight runs complete before exit.
        pool.shutdown(wait=True)
        print("Worker stopped")


def _heartbeat_and_dispatch(pool: ThreadPoolExecutor) -> None:
    """Do one heartbeat + claim + dispatch tick, printing instead of raising."""
    try:
        heartbeat_kwargs = worker_context.to_heartbeat_payload()
        gyoza_client.heartbeat(**heartbeat_kwargs)
        print(f"Heartbeat sent. Running ops: {len(worker_context.running_ops)}")

        claimed = gyoza_client.claim_ops(worker_context.worker_id)
        print(f"Claimed {len(claimed)} new operations")
        for oprun in claimed:
            print(f"Dispatching task: {oprun['id']}")
            pool.submit(_run_task, oprun)
    except Exception as err:
        # Top-level loop boundary: report and keep the worker alive.
        print(f"Error in main loop: {err}")


def _run_task(op: dict[str, Any]) -> None:
    """Execute a single op run in a worker thread.

    Registers the run with ``worker_context`` and builds an
    ``ExecutionContext`` from the op data. It then runs that context
    through ``execution_pipeline`` and unregisters the run afterwards,
    whether or not execution succeeded.

    This is the thread-pool entry point. The ``Future`` that ``start`` gets
    back from ``executor.submit`` is discarded, so any exception escaping
    this function would be stored on that future and never reported.
    Every failure is therefore caught and printed here instead.

    Parameters
    ----------
    op : dict[str, Any]
        Op run data from ``claim_ops``. ``id`` and ``image`` are required.
        ``inputs`` and ``constraints`` default to ``{}`` when absent.
    """
    try:
        oprun_id = op["id"]
        image = op["image"]
        inputs = op.get("inputs", {})
        constraints = op.get("constraints", {})
        worker_context.add_running_op(oprun_id, image, inputs)
    except Exception as e:
        # Registration did not complete, so there is no running-op entry to
        # remove. Report instead of letting the error vanish into the Future.
        print(f"Task {op.get('id')} failed to start: {e}")
        return

    try:
        ctx = ExecutionContext(
            oprun_id=oprun_id,
            image=image,
            constraints=constraints,
            inputs=inputs,
        )
        execution_pipeline.run(ctx)
        print(f"Task {oprun_id} finished with exit code {ctx.exit_code}")
    except Exception as e:
        print(f"Task {oprun_id} failed: {e}")
    finally:
        # Guarded for the same reason: an error here would otherwise be
        # silently stored on the unobserved Future.
        try:
            worker_context.remove_running_op(oprun_id)
        except Exception as e:
            print(f"Task {oprun_id}: failed to unregister running op: {e}")