Source code for gyoza.worker.execution_pipeline.steps.prepare_inputs

import json
import os

from gyoza.worker.execution_pipeline.types import ExecutionContext

# Base directory under which the per-run directories are created; taken from
# the GYOZA_WORKING_DIRECTORY environment variable, falling back to "/tmp".
GYOZA_WORKING_DIRECTORY = os.environ.get("GYOZA_WORKING_DIRECTORY", "/tmp")


class PrepareInputs:
    """Prepare the inputs for the container execution.

    Creates the gyoza directory for the run (named after the run's
    ``oprun_id``), saves the inputs there as a JSON file, registers the
    directory as a volume mount, and sets the ``GYOZA_INPUT_PATH`` and
    ``GYOZA_OUTPUT_PATH`` environment variables.
    """

    # File names inside the gyoza directory. Kept in one place so the file
    # written by ``_save_input_pickle`` and the paths advertised by
    # ``_setup_env_vars`` cannot drift apart.
    _INPUT_FILENAME = "input.json"
    _OUTPUT_FILENAME = "output.json"

    def __call__(self, ctx: ExecutionContext) -> ExecutionContext:
        """Execute the step.

        Parameters
        ----------
        ctx : ExecutionContext
            Execution context providing ``oprun_id`` and ``inputs``.

        Returns
        -------
        ExecutionContext
            The same context, with ``volumes``, ``env_vars`` and
            ``extras["gyoza_dir"]`` updated.
        """
        gyoza_dir = self._generate_op_dir(ctx.oprun_id)

        self._save_input_pickle(ctx.inputs, gyoza_dir)
        self._setup_volumes(ctx, gyoza_dir)
        self._setup_env_vars(ctx, gyoza_dir)

        # Store gyoza_dir in extras for cleanup later
        ctx.extras["gyoza_dir"] = gyoza_dir

        return ctx

    def _generate_op_dir(self, oprun_id: str) -> str:
        """Generate the operation directory path using the oprun_id.

        Parameters
        ----------
        oprun_id : str
            OpRun identifier used as directory name.

        Returns
        -------
        str
            ``GYOZA_WORKING_DIRECTORY`` joined with ``oprun_id``.
        """
        return os.path.join(GYOZA_WORKING_DIRECTORY, oprun_id)

    def _save_input_pickle(self, inputs: dict, gyoza_dir: str) -> None:
        """Save inputs as a JSON file in the gyoza directory.

        Despite the historical method name, the inputs are serialized with
        ``json.dump`` into ``input.json``; no pickle file is produced.

        Parameters
        ----------
        inputs : dict
            Input parameters to save. Must be JSON-serializable.
        gyoza_dir : str
            Directory where the JSON file will be saved. Created if missing.

        Raises
        ------
        TypeError
            If ``inputs`` contains a value ``json.dump`` cannot serialize.
        OSError
            If the directory or file cannot be created or written.
        """
        os.makedirs(gyoza_dir, exist_ok=True)
        # The mode passed to makedirs would be filtered by the umask, so the
        # permissions are set explicitly afterwards.
        # NOTE(review): 0o777 is presumably needed so the container's user
        # can write its output here -- confirm before tightening.
        os.chmod(gyoza_dir, 0o777)

        json_path = os.path.join(gyoza_dir, self._INPUT_FILENAME)
        # Explicit encoding so the file does not depend on the worker's locale.
        with open(json_path, "w", encoding="utf-8") as f:
            json.dump(inputs, f)

    def _setup_volumes(self, ctx: ExecutionContext, gyoza_dir: str) -> None:
        """Add the gyoza directory as a volume mount.

        The directory is registered under the same path on both sides of the
        mapping (``ctx.volumes[gyoza_dir] = gyoza_dir``).

        Parameters
        ----------
        ctx : ExecutionContext
            Execution context to update.
        gyoza_dir : str
            Path to mount inside the container.
        """
        ctx.volumes[gyoza_dir] = gyoza_dir

    def _setup_env_vars(self, ctx: ExecutionContext, gyoza_dir: str) -> None:
        """Set GYOZA_INPUT_PATH and GYOZA_OUTPUT_PATH environment variables.

        Parameters
        ----------
        ctx : ExecutionContext
            Execution context to update.
        gyoza_dir : str
            Base directory for input/output files.
        """
        ctx.env_vars["GYOZA_INPUT_PATH"] = os.path.join(
            gyoza_dir, self._INPUT_FILENAME
        )
        ctx.env_vars["GYOZA_OUTPUT_PATH"] = os.path.join(
            gyoza_dir, self._OUTPUT_FILENAME
        )
# Module-level instance of the step; PrepareInputs defines __call__, so this
# object can be invoked like a function: prepare_inputs(ctx).
prepare_inputs = PrepareInputs()