Source code for gyoza.worker.execution_pipeline.steps.run_container
"""Run container step - executes container and streams logs."""
import codecs
import docker
from gyoza.client import gyoza_client
from gyoza.worker.execution_pipeline.types import ExecutionContext
from gyoza.worker.worker_context import worker_context
# Prefixes emitted by GyozaLogger: "[GYOZA] [<TYPE>] <content>"
_GYOZA_EVENT_PREFIX = "[GYOZA]"
_INFO_EVENT_PREFIX = f"{_GYOZA_EVENT_PREFIX} [INFO]"
_PROGRESS_EVENT_PREFIX = f"{_GYOZA_EVENT_PREFIX} [PROGRESS]"
[docs]
class RunContainer:
"""Run the container and stream logs.
Runs the container attached, streaming logs in real-time.
Logs compatible with the gyoza events are parsed and sent to master.
"""
def __init__(self) -> None:
self._client = docker.from_env()
def __call__(self, ctx: ExecutionContext) -> ExecutionContext:
"""Execute the step.
Parameters
----------
ctx : ExecutionContext
Execution context with image, env_vars, and volumes configured.
Returns
-------
ExecutionContext
Context with logs and exit_code populated.
"""
container = self._create_container(ctx)
ctx.logs, ctx.exit_code = self._run_and_stream(container, ctx.oprun_id)
return ctx
def _create_container(
self, ctx: ExecutionContext
) -> docker.models.containers.Container:
"""Create the docker container.
Parameters
----------
ctx : ExecutionContext
Execution context with image, env_vars, and volumes.
Returns
-------
Container
Created container ready to start.
"""
volumes = {
host_path: {"bind": container_path, "mode": "rw"}
for host_path, container_path in ctx.volumes.items()
}
create_kwargs = {
"image": ctx.image,
"environment": ctx.env_vars,
"volumes": volumes,
"detach": True,
}
gpu_id = ctx.constraints.get("gpu_id")
if gpu_id is not None:
create_kwargs["device_requests"] = [
docker.types.DeviceRequest(
capabilities=[["gpu"]],
device_ids=[str(gpu_id)],
)
]
container = self._client.containers.create(**create_kwargs)
return container
def _run_and_stream(
self, container: docker.models.containers.Container, oprun_id: str
) -> tuple[str, int | None]:
"""Run the container and stream logs.
Parameters
----------
container : Container
Docker container to run.
oprun_id : str
OpRun ID for sending events.
Returns
-------
tuple[str, int | None]
Tuple of (logs, exit_code).
"""
logs_lines: list[str] = []
log_line_buffer = ""
decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
container.start()
for chunk in container.logs(stream=True, follow=True):
if worker_context.is_run_cancelled(oprun_id):
container.stop()
break
log_line_buffer += decoder.decode(chunk)
while "\n" in log_line_buffer:
line, log_line_buffer = log_line_buffer.split("\n", 1)
logs_lines.append(line)
if line.startswith(_GYOZA_EVENT_PREFIX):
self._send_log_event(oprun_id, line)
result = container.wait()
exit_code = result.get("StatusCode")
container.remove()
return "\n".join(logs_lines), exit_code
def _send_log_event(self, oprun_id: str, line: str) -> None:
"""Parse a [GYOZA] log line and forward it as the correct event type.
Parameters
----------
oprun_id : str
The OpRun identifier.
line : str
Raw log line emitted by GyozaLogger.
"""
print(f"Sending event: {line}")
if line.startswith(_PROGRESS_EVENT_PREFIX):
raw = line[len(_PROGRESS_EVENT_PREFIX) :].strip().rstrip("%")
try:
value = float(raw)
except ValueError:
return
gyoza_client.add_event(run_id=oprun_id, event_type="PROGRESS", msg=value)
elif line.startswith(_INFO_EVENT_PREFIX):
msg = line[len(_INFO_EVENT_PREFIX) :].strip()
gyoza_client.add_event(run_id=oprun_id, event_type="INFO", msg=msg)
else:
gyoza_client.add_event(run_id=oprun_id, event_type="INFO", msg=line)
run_container = RunContainer()