# Source code for gyoza.worker.execution_pipeline.steps.run_container

"""Run container step - executes container and streams logs."""

import codecs

import docker
from gyoza.client import gyoza_client
from gyoza.worker.execution_pipeline.types import ExecutionContext

# Line prefixes that mark a container log line as a gyoza event.
# GyozaLogger emits such lines in the form "[GYOZA] [<TYPE>] <content>".
_GYOZA_EVENT_PREFIX = "[GYOZA]"
_INFO_EVENT_PREFIX = _GYOZA_EVENT_PREFIX + " [INFO]"
_PROGRESS_EVENT_PREFIX = _GYOZA_EVENT_PREFIX + " [PROGRESS]"


class RunContainer:
    """Run the container and stream logs.

    Runs the container attached, streaming logs in real-time. Logs
    compatible with the gyoza events are parsed and sent to master.
    """

    def __init__(self) -> None:
        self._client = docker.from_env()

    def __call__(self, ctx: ExecutionContext) -> ExecutionContext:
        """Execute the step.

        Parameters
        ----------
        ctx : ExecutionContext
            Execution context with image, env_vars, and volumes configured.

        Returns
        -------
        ExecutionContext
            Context with logs and exit_code populated.
        """
        container = self._create_container(ctx)
        ctx.logs, ctx.exit_code = self._run_and_stream(container, ctx.oprun_id)
        return ctx

    def _create_container(
        self, ctx: ExecutionContext
    ) -> docker.models.containers.Container:
        """Create the docker container.

        Parameters
        ----------
        ctx : ExecutionContext
            Execution context with image, env_vars, and volumes.

        Returns
        -------
        Container
            Created container ready to start.
        """
        # Every host path is bind-mounted read-write at its container path.
        volumes = {
            host_path: {"bind": container_path, "mode": "rw"}
            for host_path, container_path in ctx.volumes.items()
        }
        return self._client.containers.create(
            image=ctx.image,
            environment=ctx.env_vars,
            volumes=volumes,
            detach=True,
        )

    def _run_and_stream(
        self, container: docker.models.containers.Container, oprun_id: str
    ) -> tuple[str, int | None]:
        """Run the container and stream logs.

        The container is always removed before this method returns or
        raises, so a failure while starting, streaming, or forwarding
        events does not leave the container behind.

        Parameters
        ----------
        container : Container
            Docker container to run.
        oprun_id : str
            OpRun ID for sending events.

        Returns
        -------
        tuple[str, int | None]
            Tuple of (logs, exit_code). ``logs`` is every received line
            joined with newlines, including a final line that was not
            newline-terminated.
        """
        logs_lines: list[str] = []
        pending = ""
        # Incremental decoding keeps multi-byte UTF-8 sequences intact when
        # they are split across two chunks.
        decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")

        try:
            container.start()
            for chunk in container.logs(stream=True, follow=True):
                pending += decoder.decode(chunk)
                if "\n" not in pending:
                    continue
                # One split per chunk; the last piece is the (possibly
                # empty) start of the next, still incomplete, line.
                *complete_lines, pending = pending.split("\n")
                for line in complete_lines:
                    self._handle_line(oprun_id, line, logs_lines)

            # Flush bytes still held by the decoder and keep the final line
            # even when the output did not end with a newline.
            pending += decoder.decode(b"", final=True)
            if pending:
                self._handle_line(oprun_id, pending, logs_lines)

            result = container.wait()
            exit_code = result.get("StatusCode")
        finally:
            # force=True so cleanup also works when an error interrupted the
            # run while the container was still running.
            container.remove(force=True)

        return "\n".join(logs_lines), exit_code

    def _handle_line(
        self, oprun_id: str, line: str, logs_lines: list[str]
    ) -> None:
        """Record one complete log line and forward it if it is an event.

        Parameters
        ----------
        oprun_id : str
            The OpRun identifier.
        line : str
            Decoded log line without its trailing newline.
        logs_lines : list[str]
            Accumulator the line is appended to.
        """
        logs_lines.append(line)
        if line.startswith(_GYOZA_EVENT_PREFIX):
            self._send_log_event(oprun_id, line)

    def _send_log_event(self, oprun_id: str, line: str) -> None:
        """Parse a [GYOZA] log line and forward it as the correct event type.

        PROGRESS lines are sent with a float payload (an optional trailing
        ``%`` is ignored) and are dropped when the value is not a number.
        INFO lines are sent with the prefix stripped. Any other line is
        sent unchanged as an INFO event.

        Parameters
        ----------
        oprun_id : str
            The OpRun identifier.
        line : str
            Raw log line emitted by GyozaLogger.
        """
        print(f"Sending event: {line}")
        if line.startswith(_PROGRESS_EVENT_PREFIX):
            raw = line.removeprefix(_PROGRESS_EVENT_PREFIX).strip().rstrip("%")
            try:
                value = float(raw)
            except ValueError:
                # Malformed progress value: skip the event, keep streaming.
                return
            gyoza_client.add_event(
                run_id=oprun_id, event_type="PROGRESS", msg=value
            )
        elif line.startswith(_INFO_EVENT_PREFIX):
            msg = line.removeprefix(_INFO_EVENT_PREFIX).strip()
            gyoza_client.add_event(run_id=oprun_id, event_type="INFO", msg=msg)
        else:
            # Unrecognized [GYOZA] event type: forward the raw line as INFO.
            gyoza_client.add_event(run_id=oprun_id, event_type="INFO", msg=line)
# Module-level step instance. Instantiating RunContainer calls
# docker.from_env(), so that call happens when this module is imported.
run_container: RunContainer = RunContainer()