Source code for gyoza.worker.execution_pipeline.steps.complete_oprun

import json
import os

from gyoza.client import gyoza_client
from gyoza.worker.execution_pipeline.types import ExecutionContext


def complete_oprun(ctx: ExecutionContext) -> ExecutionContext:
    """Report the final state of an op run through ``gyoza_client.add_event``.

    An exit code of 0 produces a ``COMPLETED`` event whose payload carries the
    outputs read from the file named by the ``GYOZA_OUTPUT_PATH`` entry of
    ``ctx.env_vars``. Any other exit code produces a ``FAILED`` event whose
    message embeds the exit code and the last 1000 lines of ``ctx.logs``.

    Parameters
    ----------
    ctx : ExecutionContext
        Execution context; ``exit_code``, ``env_vars``, ``logs`` and
        ``oprun_id`` are read from it.

    Returns
    -------
    ExecutionContext
        The same context object, with ``outputs`` assigned on success.
    """
    # Failure path first: report the exit code plus the tail of the logs.
    if ctx.exit_code != 0:
        log_tail = ""
        if ctx.logs:
            # Only the last 1000 lines are included in the event message.
            log_tail = "\n".join(ctx.logs.splitlines()[-1000:])
        gyoza_client.add_event(
            run_id=ctx.oprun_id,
            event_type="FAILED",
            msg=f"Execution failed with exit code {ctx.exit_code}:\n{log_tail}",
        )
        return ctx

    # Success path: load the outputs and attach them to the event payload.
    output_path = ctx.env_vars.get("GYOZA_OUTPUT_PATH", "")
    ctx.outputs = _load_outputs(output_path)
    gyoza_client.add_event(
        run_id=ctx.oprun_id,
        event_type="COMPLETED",
        msg="Execution completed successfully",
        payload={"outputs": ctx.outputs},
    )
    return ctx
def _load_outputs(output_path: str) -> dict: """Load outputs from the pickle file. Parameters ---------- output_path : str Path to the output pickle file. Returns ------- dict Loaded outputs, or empty dict if file doesn't exist. """ print(f"Loading outputs from {output_path}") if not output_path or not os.path.exists(output_path): return {} with open(output_path) as f: return json.load(f)