# Source code for gyoza.worker.execution_pipeline.steps.cleanup_op_dir
import os
import shutil
from gyoza.worker.execution_pipeline.types import ExecutionContext
# [docs]
def cleanup_op_dir(ctx: ExecutionContext) -> ExecutionContext:
    """Delete the op directory recorded in the execution context.

    Looks up ``"gyoza_dir"`` in ``ctx.extras`` and, when it names an
    existing path, removes that directory tree from disk. Per the pipeline
    convention this is the directory created by ``prepare_inputs``, holding
    inputs, outputs, and any other files produced during execution.

    Nothing is done when the key is missing, its value is falsy, or the
    path does not exist. A directory that disappears between the existence
    check and the removal is treated as already cleaned up.

    Parameters
    ----------
    ctx : ExecutionContext
        Execution context with ``gyoza_dir`` in ``extras``.

    Returns
    -------
    ExecutionContext
        The same context object, unchanged (only the directory on disk is
        removed).

    Raises
    ------
    OSError
        Propagated from ``shutil.rmtree`` for failures other than the path
        no longer existing (e.g. permission errors, or the path not being
        a directory).
    """
    gyoza_dir = ctx.extras.get("gyoza_dir")

    # Keep the existence guard so paths that do not resolve (missing,
    # broken symlink) are skipped exactly as before.
    if not gyoza_dir or not os.path.exists(gyoza_dir):
        return ctx

    try:
        shutil.rmtree(gyoza_dir)
    except FileNotFoundError:
        # The directory vanished after the check above (e.g. a concurrent
        # cleanup). The goal -- directory gone -- is already met, so do
        # not fail the pipeline step over it.
        pass

    return ctx