Source code for gyoza.cli.commands.worker

"""Worker sub-commands for the gyoza CLI."""

import os
import uuid

import typer

# Typer application object for the worker command group; the commands
# defined in this module are registered on it.
app = typer.Typer(help="Manage gyoza workers.")


def start(
    worker_id: str | None = typer.Option(
        None,
        "--worker-id",
        envvar="GYOZA_WORKER_ID",
        help="Unique identifier for this worker. Defaults to worker-<uuid>.",
    ),
    working_directory: str = typer.Option(
        "/tmp/gyoza/",
        "--working-directory",
        envvar="GYOZA_WORKING_DIRECTORY",
        help="Directory used to stage op run data.",
    ),
    heartbeat_interval: int = typer.Option(
        10,
        "--heartbeat-interval",
        envvar="GYOZA_HEARTBEAT_INTERVAL",
        help="Seconds between heartbeat ticks.",
    ),
) -> None:
    """Start the worker heartbeat and execution loop."""
    # NOTE: the docstring above is left short on purpose; Typer displays a
    # command's docstring as its CLI help text, so extra detail lives here.
    #
    # Parameters (each can come from the CLI flag or the listed env var):
    #   worker_id          -- identifier for this worker; when missing or
    #                         empty a "worker-<uuid4>" value is generated.
    #   working_directory  -- directory exported as GYOZA_WORKING_DIRECTORY.
    #   heartbeat_interval -- integer exported (as text) as
    #                         GYOZA_HEARTBEAT_INTERVAL.

    # An explicit, non-empty id wins; otherwise mint a random one.
    if worker_id:
        resolved_worker_id = worker_id
    else:
        resolved_worker_id = f"worker-{uuid.uuid4()}"

    # Export the effective settings into the process environment, in the
    # same order as the options are declared. Environment values must be
    # strings, hence the str() conversion of the interval.
    os.environ.update(
        {
            "GYOZA_WORKER_ID": resolved_worker_id,
            "GYOZA_WORKING_DIRECTORY": working_directory,
            "GYOZA_HEARTBEAT_INTERVAL": str(heartbeat_interval),
        }
    )

    # Imported only after the environment has been populated; presumably
    # gyoza.worker reads these variables at import/start time -- confirm
    # before moving this import to module level.
    from gyoza.worker import runner  # noqa: PLC0415

    runner.start()
# Register ``start`` on ``app`` under the command name "start". This is the
# explicit call form of applying ``@app.command("start")`` as a decorator.
app.command("start")(start)