Source code for gyoza.worker.execution_pipeline.steps.run_container
"""Run container step - executes container and streams logs."""
import codecs
import docker
from gyoza.client import gyoza_client
from gyoza.worker.execution_pipeline.types import ExecutionContext
# GyozaLogger writes event lines shaped like "[GYOZA] [<TYPE>] <content>".
# The constants below are the prefixes used to recognise those lines.
_GYOZA_EVENT_PREFIX = "[GYOZA]"
_INFO_EVENT_PREFIX = _GYOZA_EVENT_PREFIX + " [INFO]"
_PROGRESS_EVENT_PREFIX = _GYOZA_EVENT_PREFIX + " [PROGRESS]"
[docs]
class RunContainer:
    """Run the container and stream logs.

    Runs the container attached, streaming logs in real-time.
    Logs compatible with the gyoza events are parsed and sent to master.
    """

    def __init__(self) -> None:
        """Create the Docker client from the process environment."""
        self._client = docker.from_env()

    def __call__(self, ctx: ExecutionContext) -> ExecutionContext:
        """Execute the step.

        Parameters
        ----------
        ctx : ExecutionContext
            Execution context with image, env_vars, and volumes configured.

        Returns
        -------
        ExecutionContext
            Context with logs and exit_code populated.
        """
        container = self._create_container(ctx)
        ctx.logs, ctx.exit_code = self._run_and_stream(container, ctx.oprun_id)
        return ctx

    def _create_container(
        self, ctx: ExecutionContext
    ) -> docker.models.containers.Container:
        """Create the docker container.

        Parameters
        ----------
        ctx : ExecutionContext
            Execution context with image, env_vars, and volumes.

        Returns
        -------
        Container
            Created container ready to start.
        """
        # Every host path in ctx.volumes is bind-mounted read-write.
        volumes = {
            host_path: {"bind": container_path, "mode": "rw"}
            for host_path, container_path in ctx.volumes.items()
        }
        return self._client.containers.create(
            image=ctx.image,
            environment=ctx.env_vars,
            volumes=volumes,
            detach=True,
        )

    def _run_and_stream(
        self, container: docker.models.containers.Container, oprun_id: str
    ) -> tuple[str, int | None]:
        """Run the container and stream logs.

        The log stream is decoded incrementally as UTF-8 (invalid bytes are
        replaced) and split on newlines. Every line is collected, and lines
        starting with the gyoza event prefix are forwarded as events. A final
        line that is not newline-terminated is handled like any other line
        once the stream ends.

        The container is always removed before this method returns or
        raises, including when starting, streaming, event forwarding or
        waiting fails.

        Parameters
        ----------
        container : Container
            Docker container to run.
        oprun_id : str
            OpRun ID for sending events.

        Returns
        -------
        tuple[str, int | None]
            Tuple of (logs, exit_code).
        """
        logs_lines: list[str] = []
        pending = ""
        decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
        try:
            container.start()
            for chunk in container.logs(stream=True, follow=True):
                pending += decoder.decode(chunk)
                # Everything before the last "\n" is a complete line; the
                # remainder stays buffered until more data arrives.
                *complete_lines, pending = pending.split("\n")
                for line in complete_lines:
                    self._handle_log_line(oprun_id, line, logs_lines)
            # Flush bytes still held by the decoder and the unterminated
            # tail, so output without a trailing newline is not lost.
            pending += decoder.decode(b"", final=True)
            if pending:
                self._handle_log_line(oprun_id, pending, logs_lines)
            result = container.wait()
            exit_code = result.get("StatusCode")
        finally:
            # force=True lets removal succeed even if a failure above left
            # the container running; for an exited container it is a plain
            # remove.
            container.remove(force=True)
        return "\n".join(logs_lines), exit_code

    def _handle_log_line(
        self, oprun_id: str, line: str, logs_lines: list[str]
    ) -> None:
        """Record one log line and forward it if it is a gyoza event.

        Parameters
        ----------
        oprun_id : str
            The OpRun identifier.
        line : str
            Decoded log line without its trailing newline.
        logs_lines : list[str]
            Accumulator the line is appended to.
        """
        logs_lines.append(line)
        if line.startswith(_GYOZA_EVENT_PREFIX):
            self._send_log_event(oprun_id, line)

    def _send_log_event(self, oprun_id: str, line: str) -> None:
        """Parse a [GYOZA] log line and forward it as the correct event type.

        PROGRESS lines are sent with a float payload; a PROGRESS line whose
        payload is not a number is dropped. INFO lines are sent with the
        prefix stripped. Any other line is sent unmodified as INFO.

        Parameters
        ----------
        oprun_id : str
            The OpRun identifier.
        line : str
            Raw log line emitted by GyozaLogger.
        """
        print(f"Sending event: {line}")
        if line.startswith(_PROGRESS_EVENT_PREFIX):
            # Accept both "42" and "42%".
            raw = line[len(_PROGRESS_EVENT_PREFIX) :].strip().rstrip("%")
            try:
                value = float(raw)
            except ValueError:
                return
            gyoza_client.add_event(run_id=oprun_id, event_type="PROGRESS", msg=value)
        elif line.startswith(_INFO_EVENT_PREFIX):
            msg = line[len(_INFO_EVENT_PREFIX) :].strip()
            gyoza_client.add_event(run_id=oprun_id, event_type="INFO", msg=msg)
        else:
            gyoza_client.add_event(run_id=oprun_id, event_type="INFO", msg=line)
# Module-level step instance. Constructing RunContainer calls
# docker.from_env(), so this happens when the module is imported.
run_container = RunContainer()