# Source code for gyoza.worker.execution_pipeline.steps.prepare_inputs
import json
import os
from gyoza.worker.execution_pipeline.types import ExecutionContext
# Base directory under which the per-run gyoza directories are created.
# Read once at import time from the GYOZA_WORKING_DIRECTORY environment
# variable; falls back to "/tmp" when the variable is not set.
GYOZA_WORKING_DIRECTORY = os.environ.get("GYOZA_WORKING_DIRECTORY", "/tmp")
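# [docs] -- appears to be a leftover link marker from the rendered documentation page; not part of the module.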
class PrepareInputs:
    """Prepare the inputs for the container execution.

    Creates a per-run gyoza directory named after the OpRun id, writes the
    inputs there as ``input.json``, registers the directory as a volume
    mount, and sets the ``GYOZA_INPUT_PATH`` and ``GYOZA_OUTPUT_PATH``
    environment variables.
    """

    # File names used inside the gyoza directory. Kept in one place so the
    # file written by ``_save_input_pickle`` and the paths exported by
    # ``_setup_env_vars`` cannot drift apart.
    _INPUT_FILENAME = "input.json"
    _OUTPUT_FILENAME = "output.json"

    def __call__(self, ctx: ExecutionContext) -> ExecutionContext:
        """Execute the step.

        Parameters
        ----------
        ctx : ExecutionContext
            Execution context; ``oprun_id`` and ``inputs`` are read from it.

        Returns
        -------
        ExecutionContext
            The same context object, updated in place: ``volumes`` and
            ``env_vars`` gain entries for the gyoza directory, and
            ``extras["gyoza_dir"]`` holds its path.
        """
        gyoza_dir = self._generate_op_dir(ctx.oprun_id)
        self._save_input_pickle(ctx.inputs, gyoza_dir)
        self._setup_volumes(ctx, gyoza_dir)
        self._setup_env_vars(ctx, gyoza_dir)
        # Store gyoza_dir in extras for cleanup later
        ctx.extras["gyoza_dir"] = gyoza_dir
        return ctx

    def _generate_op_dir(self, oprun_id: str) -> str:
        """Generate the operation directory path using the oprun_id.

        Parameters
        ----------
        oprun_id : str
            OpRun identifier used as directory name.

        Returns
        -------
        str
            ``oprun_id`` joined onto ``GYOZA_WORKING_DIRECTORY``. The
            directory itself is not created here.
        """
        # NOTE(review): oprun_id is joined as-is; an absolute path or ".."
        # components would escape the working directory. Presumably ids are
        # generated internally -- confirm against the caller.
        return os.path.join(GYOZA_WORKING_DIRECTORY, oprun_id)

    def _save_input_pickle(self, inputs: dict, gyoza_dir: str) -> None:
        """Save inputs as a JSON file in the gyoza directory.

        The method name still says "pickle", but the payload is written as
        JSON to ``input.json``.

        Parameters
        ----------
        inputs : dict
            Input parameters to save. Must be JSON-serializable.
        gyoza_dir : str
            Directory where the JSON file will be saved. Created if it does
            not exist yet.

        Raises
        ------
        OSError
            If the directory or file cannot be created or written.
        TypeError
            If ``inputs`` contains values ``json.dump`` cannot serialize.
        """
        os.makedirs(gyoza_dir, exist_ok=True)
        # The mode passed to makedirs is subject to the process umask, so the
        # permissions are set explicitly afterwards.
        # NOTE(review): 0o777 is presumably so the container's user can read
        # the input and write the output -- confirm.
        os.chmod(gyoza_dir, 0o777)
        json_path = os.path.join(gyoza_dir, self._INPUT_FILENAME)
        # Explicit encoding so the file does not depend on the locale default.
        with open(json_path, "w", encoding="utf-8") as f:
            json.dump(inputs, f)

    def _setup_volumes(self, ctx: ExecutionContext, gyoza_dir: str) -> None:
        """Add the gyoza directory as a volume mount.

        The directory is mapped to the identical path, so the paths exported
        by ``_setup_env_vars`` are the same on both sides of the mount.

        Parameters
        ----------
        ctx : ExecutionContext
            Execution context to update.
        gyoza_dir : str
            Path to mount inside the container.
        """
        ctx.volumes[gyoza_dir] = gyoza_dir

    def _setup_env_vars(self, ctx: ExecutionContext, gyoza_dir: str) -> None:
        """Set GYOZA_INPUT_PATH and GYOZA_OUTPUT_PATH environment variables.

        Parameters
        ----------
        ctx : ExecutionContext
            Execution context to update.
        gyoza_dir : str
            Base directory for input/output files.
        """
        ctx.env_vars["GYOZA_INPUT_PATH"] = os.path.join(
            gyoza_dir, self._INPUT_FILENAME
        )
        ctx.env_vars["GYOZA_OUTPUT_PATH"] = os.path.join(
            gyoza_dir, self._OUTPUT_FILENAME
        )
# Module-level instance of the step. PrepareInputs defines __call__, so this
# object can be invoked directly with an ExecutionContext.
prepare_inputs = PrepareInputs()