# Source code for gyoza.worker.execution_pipeline.pipeline
"""Pipeline implementation."""
import logging
import traceback

from gyoza.worker.execution_pipeline.types import ExecutionContext, Step
# [docs]
class Pipeline:
    """A sequence of steps executed in order.

    Parameters
    ----------
    steps : list[Step] | None
        Ordered list of steps to execute. ``None`` (or an empty list) yields
        a pipeline with no steps. The list is copied, so mutating the
        caller's list afterwards does not change the pipeline.
    """

    # Upper bound on the length of the failure message sent with the FAILED
    # event; longer messages are truncated to this many characters.
    _MAX_MSG_LEN = 10000

    def __init__(self, steps: list[Step] | None = None) -> None:
        self._steps: list[Step] = list(steps) if steps else []

    def run(self, ctx: ExecutionContext | None = None) -> ExecutionContext:
        """Execute all steps in sequence.

        Each step is called with the current context and its return value
        becomes the context passed to the next step.

        If any step raises an exception, a FAILED event is reported via the
        server API on a best-effort basis and the step's exception is
        re-raised so the caller can log it. A failure while reporting is
        logged and never replaces the step's exception.

        Parameters
        ----------
        ctx : ExecutionContext | None
            Shared context. Created if not provided.

        Returns
        -------
        ExecutionContext
            The context after all steps have been applied.

        Raises
        ------
        Exception
            Whatever exception the failing step raised.
        """
        if ctx is None:
            ctx = ExecutionContext()
        try:
            for step in self._steps:
                ctx = step(ctx)
        except Exception as exc:
            # ``ctx`` is the context returned by the last successful step
            # (or the initial one if the first step failed).
            self._report_failure(ctx, exc)
            # Bare ``raise`` re-raises the step's exception with its
            # original traceback.
            raise
        return ctx

    def _report_failure(self, ctx: ExecutionContext, exc: Exception) -> None:
        """Best-effort: send a FAILED event describing *exc* for this run.

        Must be called from inside the ``except`` block handling *exc*,
        because ``traceback.format_exc()`` formats the exception currently
        being handled. Any error raised while building or sending the event
        is logged and swallowed so it cannot mask the step's exception.
        """
        try:
            # Kept as a function-scope import, as in the original code
            # (presumably to avoid an import cycle or import-time cost --
            # TODO confirm).
            from gyoza.client import gyoza_client  # noqa: PLC0415

            msg = f"{type(exc).__name__}: {exc}\n{traceback.format_exc()}"
            gyoza_client.add_event(
                run_id=ctx.oprun_id,
                event_type="FAILED",
                msg=msg[: self._MAX_MSG_LEN],
            )
        except Exception:
            logging.getLogger(__name__).exception(
                "Could not report FAILED event for pipeline run"
            )