Source code for gyoza.worker.execution_pipeline.steps.complete_oprun
import json
import os
import httpx
from gyoza.client import gyoza_client
from gyoza.worker.execution_pipeline.types import ExecutionContext
[docs]
def complete_oprun(ctx: ExecutionContext) -> ExecutionContext:
"""Send COMPLETED or FAILED event to the master API based on exit code.
Loads outputs from the pickle file and sends them as payload on success.
If the server returns 400 or 409 (e.g. run already terminal or not
assigned to this worker), the update is skipped and context is returned
without raising, so the worker does not treat a reclaimed run as a
local failure.
Parameters
----------
ctx : ExecutionContext
Execution context with exit_code and env_vars set.
Returns
-------
ExecutionContext
Context with outputs populated on success.
"""
try:
if ctx.exit_code == 0:
ctx.outputs = _load_outputs(ctx.env_vars.get("GYOZA_OUTPUT_PATH", ""))
gyoza_client.add_event(
run_id=ctx.oprun_id,
event_type="COMPLETED",
msg="Execution completed successfully",
payload={"outputs": ctx.outputs},
)
else:
tail = "\n".join(ctx.logs.splitlines()[-1000:]) if ctx.logs else ""
gyoza_client.add_event(
run_id=ctx.oprun_id,
event_type="FAILED",
msg=f"Execution failed with exit code {ctx.exit_code}:\n{tail}",
)
except httpx.HTTPStatusError as e:
if e.response.status_code in (400, 409):
print(
f"Run {ctx.oprun_id} already finalized on server "
f"(status {e.response.status_code}), skipping event"
)
else:
raise
return ctx
def _load_outputs(output_path: str) -> dict:
"""Load outputs from the pickle file.
Parameters
----------
output_path : str
Path to the output pickle file.
Returns
-------
dict
Loaded outputs, or empty dict if file doesn't exist.
"""
print(f"Loading outputs from {output_path}")
if not output_path or not os.path.exists(output_path):
return {}
with open(output_path) as f:
return json.load(f)