Source code for gyoza.worker.execution_pipeline.pipeline

"""Pipeline implementation."""

import traceback

from gyoza.worker.execution_pipeline.types import ExecutionContext, Step


[docs] class Pipeline: """A sequence of steps executed in order. Parameters ---------- steps : list[Step] Ordered list of steps to execute. """ def __init__(self, steps: list[Step] | None = None) -> None: self._steps: list[Step] = list(steps) if steps else []
[docs] def run(self, ctx: ExecutionContext | None = None) -> ExecutionContext: """Execute all steps in sequence. If any step raises an exception the run is marked as FAILED via the server API and the exception is re-raised so the caller can log it. Parameters ---------- ctx : ExecutionContext | None Shared context. Created if not provided. Returns ------- ExecutionContext The context after all steps have been applied. Raises ------ Exception Whatever exception the failing step raised. """ if ctx is None: ctx = ExecutionContext() try: for step in self._steps: ctx = step(ctx) except Exception as exc: from gyoza.client import gyoza_client # noqa: PLC0415 _MAX_MSG_LEN = 10000 msg = f"{type(exc).__name__}: {exc}\n{traceback.format_exc()}" gyoza_client.add_event( run_id=ctx.oprun_id, event_type="FAILED", msg=msg[:_MAX_MSG_LEN], ) raise return ctx