Source code for gyoza.worker.execution_pipeline.steps.complete_oprun

import json
import os

import httpx

from gyoza.client import gyoza_client
from gyoza.worker.execution_pipeline.types import ExecutionContext


[docs] def complete_oprun(ctx: ExecutionContext) -> ExecutionContext: """Send COMPLETED or FAILED event to the master API based on exit code. Loads outputs from the pickle file and sends them as payload on success. If the server returns 400 or 409 (e.g. run already terminal or not assigned to this worker), the update is skipped and context is returned without raising, so the worker does not treat a reclaimed run as a local failure. Parameters ---------- ctx : ExecutionContext Execution context with exit_code and env_vars set. Returns ------- ExecutionContext Context with outputs populated on success. """ try: if ctx.exit_code == 0: ctx.outputs = _load_outputs(ctx.env_vars.get("GYOZA_OUTPUT_PATH", "")) gyoza_client.add_event( run_id=ctx.oprun_id, event_type="COMPLETED", msg="Execution completed successfully", payload={"outputs": ctx.outputs}, ) else: tail = "\n".join(ctx.logs.splitlines()[-1000:]) if ctx.logs else "" gyoza_client.add_event( run_id=ctx.oprun_id, event_type="FAILED", msg=f"Execution failed with exit code {ctx.exit_code}:\n{tail}", ) except httpx.HTTPStatusError as e: if e.response.status_code in (400, 409): print( f"Run {ctx.oprun_id} already finalized on server " f"(status {e.response.status_code}), skipping event" ) else: raise return ctx
def _load_outputs(output_path: str) -> dict: """Load outputs from the pickle file. Parameters ---------- output_path : str Path to the output pickle file. Returns ------- dict Loaded outputs, or empty dict if file doesn't exist. """ print(f"Loading outputs from {output_path}") if not output_path or not os.path.exists(output_path): return {} with open(output_path) as f: return json.load(f)