Pipeline Hooks
Pipeline hooks fire at the boundaries of Pipeline structs (not flat tuples). They are useful for tracking the lifecycle of logical groups of nodes.
Methods
fn before_pipeline_run(&self, p: &dyn PipelineInfo) {}
fn after_pipeline_run(&self, p: &dyn PipelineInfo) {}
fn on_pipeline_error(&self, p: &dyn PipelineInfo, error: &str) {}
Arguments
p: the pipeline being executed. p.name() returns the pipeline’s name; p.is_leaf() returns false.
error: the stringified error message from the failing node within the pipeline.
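As a rough illustration of how these arguments might be consumed, the sketch below records each pipeline failure together with the pipeline's name. The Hook and PipelineInfo traits are simplified stand-ins declared locally so the snippet compiles in isolation; the library's real definitions may differ, and name() returning &'static str is inferred from the timing example on this page. ErrorLog, FakePipeline, and record_failure are hypothetical names.

```rust
use std::sync::Mutex;

// Simplified stand-ins for the library's traits (assumption: the real
// definitions may carry more methods or different signatures).
trait PipelineInfo {
    fn name(&self) -> &'static str;
    fn is_leaf(&self) -> bool;
}

trait Hook {
    fn before_pipeline_run(&self, _p: &dyn PipelineInfo) {}
    fn after_pipeline_run(&self, _p: &dyn PipelineInfo) {}
    fn on_pipeline_error(&self, _p: &dyn PipelineInfo, _error: &str) {}
}

// Hypothetical hook that remembers which pipeline failed and why.
#[derive(Default)]
struct ErrorLog {
    entries: Mutex<Vec<(String, String)>>,
}

impl Hook for ErrorLog {
    fn on_pipeline_error(&self, p: &dyn PipelineInfo, error: &str) {
        self.entries
            .lock()
            .unwrap()
            .push((p.name().to_string(), error.to_string()));
    }
}

// Hypothetical pipeline used only to exercise the hook.
struct FakePipeline;

impl PipelineInfo for FakePipeline {
    fn name(&self) -> &'static str {
        "processing"
    }
    fn is_leaf(&self) -> bool {
        false
    }
}

// Simulates a runner reporting one failure and returns what the hook recorded.
fn record_failure() -> Vec<(String, String)> {
    let log = ErrorLog::default();
    log.on_pipeline_error(&FakePipeline, "clean: missing column");
    let entries = log.entries.lock().unwrap().clone();
    entries
}
```

Only on_pipeline_error is overridden here; the other two methods fall back to their empty default bodies, which is why a hook can implement just the callbacks it cares about.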
Sequential runner behavior
The SequentialRunner fires pipeline hooks as it enters and exits Pipeline structs:
before_pipeline_run("processing")
  before_node_run("clean")
  after_node_run("clean")
  before_node_run("transform")
  after_node_run("transform")
after_pipeline_run("processing")
If a child node fails, on_pipeline_error fires instead of after_pipeline_run.
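The ordering described above can be sketched with a small simulation. This is not the SequentialRunner's actual implementation: Trace, Node, run_pipeline, and trace_of are hypothetical names, hook calls are recorded as strings rather than dispatched to real hooks, and any node-level error hook is left out for brevity.

```rust
use std::cell::RefCell;

// Hypothetical recorder: stores hook calls in order instead of acting on them.
#[derive(Default)]
struct Trace {
    events: RefCell<Vec<String>>,
}

impl Trace {
    fn record(&self, hook: &str, name: &str) {
        self.events.borrow_mut().push(format!("{hook}(\"{name}\")"));
    }
}

// Hypothetical node: a name plus a function that may fail.
struct Node {
    name: &'static str,
    run: fn() -> Result<(), String>,
}

// Sketch of a sequential runner walking one pipeline's child nodes.
fn run_pipeline(name: &'static str, nodes: &[Node], trace: &Trace) -> Result<(), String> {
    trace.record("before_pipeline_run", name);
    for node in nodes {
        trace.record("before_node_run", node.name);
        if let Err(e) = (node.run)() {
            // A failing child ends the pipeline: the error hook fires
            // in place of after_pipeline_run.
            trace.record("on_pipeline_error", name);
            return Err(e);
        }
        trace.record("after_node_run", node.name);
    }
    trace.record("after_pipeline_run", name);
    Ok(())
}

fn ok() -> Result<(), String> {
    Ok(())
}

fn fail() -> Result<(), String> {
    Err("boom".to_string())
}

// Runs a "processing" pipeline over the given nodes and returns the hook trace.
fn trace_of(nodes: &[Node]) -> Vec<String> {
    let trace = Trace::default();
    let _ = run_pipeline("processing", nodes, &trace);
    trace.events.into_inner()
}
```

Calling trace_of with two succeeding nodes named "clean" and "transform" yields the six calls listed above; swapping the second node's function for fail ends the trace with on_pipeline_error("processing") and no after_pipeline_run entry.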
Parallel runner behavior
The ParallelRunner fires pipeline hooks based on dataset availability:
before_pipeline_run fires when all of the pipeline’s declared inputs are available.
after_pipeline_run fires when all of the pipeline’s declared outputs have been produced.
on_pipeline_error fires when a child node fails, and it propagates up through all ancestor pipelines.
This means pipeline hooks may fire at different times than in sequential execution, since nodes may run in a different order.
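One way to picture the availability rule is as a pair of set checks. The sketch below is an assumption about the shape of that logic, not the ParallelRunner's code: PipelineSpec, can_fire_before, and can_fire_after are hypothetical names, and datasets are identified by plain string names.

```rust
use std::collections::HashSet;

// Hypothetical description of the datasets a pipeline declares.
struct PipelineSpec {
    inputs: Vec<&'static str>,
    outputs: Vec<&'static str>,
}

// before_pipeline_run becomes eligible once every declared input is available.
fn can_fire_before(spec: &PipelineSpec, available: &HashSet<&str>) -> bool {
    spec.inputs.iter().all(|d| available.contains(*d))
}

// after_pipeline_run becomes eligible once every declared output has been produced.
fn can_fire_after(spec: &PipelineSpec, available: &HashSet<&str>) -> bool {
    spec.outputs.iter().all(|d| available.contains(*d))
}
```

Under this model, a pipeline with inputs "raw" and "params" would not trigger before_pipeline_run while only "raw" is available, regardless of which nodes elsewhere in the graph have already run.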
Flat tuples vs Pipeline structs
Pipeline hooks only fire for Pipeline structs. A flat tuple of nodes at the top level does not trigger before_pipeline_run / after_pipeline_run. If you want pipeline-level hooks for your entire pipeline, wrap the top-level steps in a Pipeline.
Example: timing pipelines
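use std::collections::HashMap;
use std::sync::Mutex;
use std::time::Instant;

// Hook and PipelineInfo come from the library; import them alongside these.
struct PipelineTimer {
    // Start time of each pipeline currently running, keyed by pipeline name.
    timings: Mutex<HashMap<&'static str, Instant>>,
}

impl Hook for PipelineTimer {
    fn before_pipeline_run(&self, p: &dyn PipelineInfo) {
        self.timings.lock().unwrap().insert(p.name(), Instant::now());
    }

    fn after_pipeline_run(&self, p: &dyn PipelineInfo) {
        // Remove the entry so a later run of the same pipeline starts fresh.
        if let Some(start) = self.timings.lock().unwrap().remove(p.name()) {
            println!("[{}] completed in {:.1}ms", p.name(), start.elapsed().as_secs_f64() * 1000.0);
        }
    }
}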