Source code for gyoza.worker.execution_pipeline.steps.cleanup_op_dir

import os
import shutil

from gyoza.worker.execution_pipeline.types import ExecutionContext


def cleanup_op_dir(ctx: ExecutionContext) -> ExecutionContext:
    """Delete the op directory created by prepare_inputs.

    Removes the ``gyoza_dir`` generated by the pipeline, which contains
    inputs, outputs, and any other files produced during execution.

    Parameters
    ----------
    ctx : ExecutionContext
        Execution context with ``gyoza_dir`` in ``extras``. If the key is
        missing or its value is falsy, nothing is removed.

    Returns
    -------
    ExecutionContext
        The same context object, unchanged (directory removed from disk).

    Raises
    ------
    OSError
        If the directory exists but cannot be removed (for example, on a
        permission error). A directory that is already gone is not an error.
    """
    gyoza_dir = ctx.extras.get("gyoza_dir")
    if not gyoza_dir:
        return ctx

    # Attempt the removal directly instead of checking for existence first:
    # an exists()-then-rmtree() sequence can still fail if the directory
    # disappears between the two calls.
    try:
        shutil.rmtree(gyoza_dir)
    except FileNotFoundError:
        # Already removed (or never created): the desired end state holds.
        pass
    return ctx