# Source code for gyoza.worker.runner
"""Worker main loop.
Entry point called by ``gyoza worker start``. Runs the heartbeat/claim
cycle in a blocking loop and dispatches op runs to a thread pool.
"""
import os
import time
import traceback
from concurrent.futures import ThreadPoolExecutor
from typing import Any

from gyoza.client import gyoza_client
from gyoza.worker.execution_pipeline import ExecutionContext, execution_pipeline
from gyoza.worker.worker_context import worker_context
# Seconds the worker sleeps between ticks; override via GYOZA_HEARTBEAT_INTERVAL.
_HEARTBEAT_INTERVAL = int(os.environ.get("GYOZA_HEARTBEAT_INTERVAL", "10"))
# [docs]
def start() -> None:
    """Start the worker heartbeat and execution loop.

    Blocks until interrupted with Ctrl-C / SIGINT (raised as
    ``KeyboardInterrupt``). On each tick:

    1. Sends a heartbeat to the server (registers/updates the worker).
    2. Claims pending op runs allocated to this worker.
    3. Dispatches each claimed run to a background thread.

    An exception raised during a tick is reported (with traceback) and the
    loop continues on the next tick. On shutdown, ``start`` waits for every
    dispatched op run, including ones still queued in the pool, to finish.
    """
    print(f"Starting worker: {worker_context.worker_id}")
    print(f"Resources: {worker_context.resources.to_dict()}")
    executor = ThreadPoolExecutor()
    try:
        while True:
            try:
                _tick(executor)
            except Exception as e:
                # Keep the worker alive across failed ticks, but keep the full
                # traceback so the cause is not reduced to a bare message.
                print(f"Error in main loop: {e}")
                traceback.print_exc()
            time.sleep(_HEARTBEAT_INTERVAL)
    except KeyboardInterrupt:
        print("\nShutting down worker...")
    finally:
        # wait=True blocks until all submitted runs (running or queued) finish.
        executor.shutdown(wait=True)
        print("Worker stopped")


def _tick(executor: ThreadPoolExecutor) -> None:
    """Run one heartbeat / claim / dispatch cycle of the worker loop."""
    payload = worker_context.to_heartbeat_payload()
    gyoza_client.heartbeat(**payload)
    # NOTE(review): runs queued in the executor but not yet picked up by a pool
    # thread are not counted here, because _run_task registers them only once
    # it starts executing. ThreadPoolExecutor() uses a bounded default thread count.
    print(f"Heartbeat sent. Running ops: {len(worker_context.running_ops)}")
    ops = gyoza_client.claim_ops(worker_context.worker_id)
    print(f"Claimed {len(ops)} new operations")
    for op in ops:
        # These runs were already claimed for this worker. A failure on one
        # (e.g. a payload without 'id') must not stop the rest of the batch
        # from being dispatched.
        try:
            print(f"Dispatching task: {op['id']}")
            executor.submit(_run_task, op)
        except Exception as e:
            print(f"Failed to dispatch operation: {e}")
            traceback.print_exc()
def _run_task(op: dict[str, Any]) -> None:
    """Execute a single op run in a worker thread.

    Registers the run with ``worker_context`` (``add_running_op``) for the
    duration of execution, builds an ``ExecutionContext``, hands it to
    ``execution_pipeline.run`` and finally unregisters the run
    (``remove_running_op``).

    Failures are printed rather than raised. This function runs inside a
    ``ThreadPoolExecutor`` whose returned Future is not inspected by the
    dispatcher, so an escaping exception would otherwise be lost silently.

    Parameters
    ----------
    op : dict[str, Any]
        Op run data from ``claim_ops`` (``id``, ``image``, ``inputs``,
        ``constraints``). ``id`` and ``image`` are required; ``inputs`` and
        ``constraints`` default to ``{}`` when the keys are absent.
    """
    try:
        oprun_id = op["id"]
        image = op["image"]
    except KeyError as e:
        print(f"Task {op.get('id')} rejected: missing field {e}")
        return
    inputs = op.get("inputs", {})
    constraints = op.get("constraints", {})
    try:
        worker_context.add_running_op(oprun_id, image, inputs)
    except Exception as e:
        # Not registered, so there is nothing to remove; report and bail out.
        print(f"Task {oprun_id} failed to register: {e}")
        traceback.print_exc()
        return
    try:
        ctx = ExecutionContext(
            oprun_id=oprun_id,
            image=image,
            constraints=constraints,
            inputs=inputs,
        )
        execution_pipeline.run(ctx)
        print(f"Task {oprun_id} finished with exit code {ctx.exit_code}")
    except Exception as e:
        print(f"Task {oprun_id} failed: {e}")
        traceback.print_exc()
    finally:
        # Always unregister, whether the pipeline succeeded or raised.
        worker_context.remove_running_op(oprun_id)