Source code for gyoza.worker.execution_pipeline.steps.complete_oprun
import json
import os
from gyoza.client import gyoza_client
from gyoza.worker.execution_pipeline.types import ExecutionContext
[docs]
def complete_oprun(ctx: ExecutionContext) -> ExecutionContext:
"""Send COMPLETED or FAILED event to the master API based on exit code.
Loads outputs from the pickle file and sends them as payload on success.
Parameters
----------
ctx : ExecutionContext
Execution context with exit_code and env_vars set.
Returns
-------
ExecutionContext
Context with outputs populated on success.
"""
if ctx.exit_code == 0:
ctx.outputs = _load_outputs(ctx.env_vars.get("GYOZA_OUTPUT_PATH", ""))
gyoza_client.add_event(
run_id=ctx.oprun_id,
event_type="COMPLETED",
msg="Execution completed successfully",
payload={"outputs": ctx.outputs},
)
else:
tail = "\n".join(ctx.logs.splitlines()[-1000:]) if ctx.logs else ""
gyoza_client.add_event(
run_id=ctx.oprun_id,
event_type="FAILED",
msg=f"Execution failed with exit code {ctx.exit_code}:\n{tail}",
)
return ctx
def _load_outputs(output_path: str) -> dict:
"""Load outputs from the pickle file.
Parameters
----------
output_path : str
Path to the output pickle file.
Returns
-------
dict
Loaded outputs, or empty dict if file doesn't exist.
"""
print(f"Loading outputs from {output_path}")
if not output_path or not os.path.exists(output_path):
return {}
with open(output_path) as f:
return json.load(f)