Steps
Steps are how nodes are composed into a sequence that the runner can execute. A pipeline function returns an impl Steps<E>, which is implemented for tuples of nodes (and pipelines). Tuples of up to 10 elements are supported.
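To make the "implemented for tuples" idea concrete, the following is a minimal sketch of how a trait can be implemented for tuples of several arities with a macro. The names `Step`, `ToyNode`, `ToySteps`, and `impl_toy_steps!` are hypothetical stand-ins invented for this illustration; the library's real `Steps<E>` trait and its implementation may look quite different.

```rust
// Toy stand-ins for illustration only; these are NOT the library's types.
trait Step {
    fn name(&self) -> &'static str;
}

struct ToyNode {
    name: &'static str,
}

impl Step for ToyNode {
    fn name(&self) -> &'static str {
        self.name
    }
}

/// A sequence of steps that can list its node names in tuple order.
trait ToySteps {
    fn names(&self) -> Vec<&'static str>;
}

// Generate one impl per tuple arity. Each type parameter is paired with
// its tuple index so the body can access `self.0`, `self.1`, and so on.
macro_rules! impl_toy_steps {
    ($( ($($T:ident $idx:tt),+) )+) => {
        $(
            impl<$($T: Step),+> ToySteps for ($($T,)+) {
                fn names(&self) -> Vec<&'static str> {
                    vec![$(self.$idx.name()),+]
                }
            }
        )+
    };
}

// Only arities 1 to 3 are generated here; a real implementation would
// continue the list up to its supported maximum.
impl_toy_steps! {
    (A 0)
    (A 0, B 1)
    (A 0, B 1, C 2)
}
```

With these impls in place, a plain tuple such as `(ToyNode { name: "summarize" }, ToyNode { name: "report" })` can be passed anywhere a `ToySteps` is expected, without any wrapper type. A fixed upper arity is the usual cost of this approach, since each tuple size needs its own generated impl.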
In the minimal example
The pipeline function returns a tuple of two nodes. This tuple automatically implements Steps<PondError>:
fn pipeline<'a>(cat: &'a Catalog, params: &'a Params) -> impl Steps<PondError> + 'a {
    (
        Node {
            name: "summarize",
            func: |df: DataFrame| {
                let mean = df.column("value").unwrap().f64().unwrap().mean().unwrap();
                (mean,)
            },
            input: (&cat.readings,),
            output: (&cat.summary,),
        },
        Node {
            name: "report",
            func: |mean: f64, threshold: f64| {
                (json!({ "mean": mean, "passed": mean >= threshold }),)
            },
            input: (&cat.summary, &params.threshold),
            output: (&cat.report,),
        },
    )
}
The tuple ordering defines the sequential execution order. The SequentialRunner executes nodes in this order; the ParallelRunner uses dependency analysis to run independent nodes concurrently, but still respects data dependencies.
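One way to picture dependency analysis is to group nodes into "waves", where every node in a wave depends only on outputs from earlier waves. The sketch below is an illustration of that general idea, not the ParallelRunner's actual algorithm; `ToyNodeIo` and `waves` are hypothetical names, and datasets are identified by plain strings for simplicity.

```rust
use std::collections::HashMap;

/// Hypothetical descriptor: a node's name plus the dataset names it reads and writes.
struct ToyNodeIo {
    name: &'static str,
    inputs: Vec<&'static str>,
    outputs: Vec<&'static str>,
}

/// Groups nodes into waves. A node lands one wave after the latest
/// producer of any of its inputs, so nodes sharing a wave are independent.
/// Assumes `nodes` is already listed in a valid sequential order.
fn waves(nodes: &[ToyNodeIo]) -> Vec<Vec<&'static str>> {
    // Maps each dataset to the wave in which it is produced.
    let mut produced_in: HashMap<&'static str, usize> = HashMap::new();
    let mut result: Vec<Vec<&'static str>> = Vec::new();

    for node in nodes {
        // Inputs without a producer (parameters, pre-existing data)
        // impose no ordering constraint.
        let wave = node
            .inputs
            .iter()
            .filter_map(|input| produced_in.get(input))
            .map(|w| w + 1)
            .max()
            .unwrap_or(0);

        if result.len() <= wave {
            result.resize_with(wave + 1, Vec::new);
        }
        result[wave].push(node.name);

        for output in &node.outputs {
            produced_in.insert(*output, wave);
        }
    }
    result
}
```

Applied to the two-node example above, `report` reads the output of `summarize`, so the two nodes would fall into separate waves and run one after the other under either runner. Two nodes that read only from parameters or pre-existing datasets would share the first wave.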
Composing steps
Steps are just tuples, so you compose them by adding elements:
fn pipeline<'a>(cat: &'a Catalog, params: &'a Params) -> impl Steps<PondError> + 'a {
    (
        Node { name: "step1", func: |x| (x,), input: (&params.x,), output: (&cat.a,) },
        Node { name: "step2", func: |a| (a + 1,), input: (&cat.a,), output: (&cat.b,) },
        Node { name: "step3", func: |b| (b * 2,), input: (&cat.b,), output: (&cat.c,) },
    )
}
For grouping related nodes with declared contracts, see Pipeline.
Validation
StepInfo::check() validates the pipeline structure without executing it:
- No node reads a dataset before it is produced by an earlier node
- No dataset is produced by more than one node
- Parameters are never written to
let steps = pipeline(&catalog, &params);
steps.check()?; // returns Result<(), CheckError>
See Check for details.
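The three rules listed above can be sketched as a two-pass scan over node descriptors. This is a simplified illustration under stated assumptions, not the library's implementation: `ToyNodeIo`, `ToyCheckError`, and `toy_check` are hypothetical names, datasets and parameters are identified by strings, and a node that reads its own output is treated as reading before production.

```rust
use std::collections::HashMap;

/// Hypothetical error type mirroring the three rules.
#[derive(Debug, PartialEq)]
enum ToyCheckError {
    ReadBeforeProduced { node: &'static str, dataset: &'static str },
    ProducedTwice { dataset: &'static str },
    ParamWritten { node: &'static str, param: &'static str },
}

/// Hypothetical descriptor: a node's name plus the names it reads and writes.
struct ToyNodeIo {
    name: &'static str,
    inputs: Vec<&'static str>,
    outputs: Vec<&'static str>,
}

/// Validates structure only; no node function is ever called.
fn toy_check(nodes: &[ToyNodeIo], params: &[&'static str]) -> Result<(), ToyCheckError> {
    // Pass 1: record which node index produces each dataset, rejecting
    // writes to parameters and duplicate producers along the way.
    let mut producer: HashMap<&'static str, usize> = HashMap::new();
    for (i, node) in nodes.iter().enumerate() {
        for &out in &node.outputs {
            if params.contains(&out) {
                return Err(ToyCheckError::ParamWritten { node: node.name, param: out });
            }
            if producer.insert(out, i).is_some() {
                return Err(ToyCheckError::ProducedTwice { dataset: out });
            }
        }
    }

    // Pass 2: every input that has a producer must be produced by a
    // strictly earlier node. Inputs nobody produces are external data.
    for (i, node) in nodes.iter().enumerate() {
        for &input in &node.inputs {
            if let Some(&j) = producer.get(input) {
                if j >= i {
                    return Err(ToyCheckError::ReadBeforeProduced {
                        node: node.name,
                        dataset: input,
                    });
                }
            }
        }
    }
    Ok(())
}
```

Swapping the two nodes of the minimal example would make `report` read `summary` before `summarize` writes it, which this sketch reports as `ReadBeforeProduced`. Because the scan looks only at names, it can run before any data is loaded.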
The pipeline function
The function that creates steps must be a named function with an explicit lifetime, not a closure:
// Correct: named function with both references tied to the same lifetime
fn pipeline<'a>(cat: &'a Catalog, params: &'a Params) -> impl Steps<PondError> + 'a {
    (/* nodes */)
}
// Wrong: the closure's two reference parameters get independent lifetimes
let pipeline = |cat: &Catalog, params: &Params| { /* ... */ };
The reason is that the PipelineFn trait uses a lifetime-on-trait pattern: the lifetime 'a is a parameter of the trait itself, and both references must share it. A named function with an explicit <'a> satisfies this requirement; a closure does not.
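The lifetime-on-trait pattern can be sketched in a few lines. Everything below is a hypothetical reconstruction for illustration: `ToyPipelineFn`, `ToySteps`, `MeanCheck`, and `run` are invented names, and the `Catalog` and `Params` structs are simplified stand-ins. The real PipelineFn trait may differ in its details.

```rust
// Simplified stand-ins for illustration only.
struct Catalog {
    readings: Vec<f64>,
}

struct Params {
    threshold: f64,
}

trait ToySteps {
    fn passed(&self) -> bool;
}

/// A value that borrows from both the catalog and the params.
struct MeanCheck<'a> {
    readings: &'a [f64],
    threshold: &'a f64,
}

impl<'a> ToySteps for MeanCheck<'a> {
    fn passed(&self) -> bool {
        let mean = self.readings.iter().sum::<f64>() / self.readings.len() as f64;
        mean >= *self.threshold
    }
}

/// Lifetime-on-trait: `'a` is a parameter of the trait itself, so both
/// references and the returned steps are tied to one shared lifetime.
trait ToyPipelineFn<'a> {
    type Output: ToySteps + 'a;
    fn build(&self, cat: &'a Catalog, params: &'a Params) -> Self::Output;
}

impl<'a, F, S> ToyPipelineFn<'a> for F
where
    F: Fn(&'a Catalog, &'a Params) -> S,
    S: ToySteps + 'a,
{
    type Output = S;
    fn build(&self, cat: &'a Catalog, params: &'a Params) -> S {
        self(cat, params)
    }
}

/// The caller owns the data and borrows it locally, so the pipeline
/// function must work for every possible `'a`.
fn run<F>(f: F, threshold: f64) -> bool
where
    F: for<'a> ToyPipelineFn<'a>,
{
    let cat = Catalog { readings: vec![1.0, 2.0, 3.0] };
    let params = Params { threshold };
    let steps = f.build(&cat, &params);
    steps.passed()
}

// A named function with an explicit `<'a>` ties both inputs and the
// output to the same lifetime, which is what the bound on `run` needs.
fn pipeline<'a>(cat: &'a Catalog, params: &'a Params) -> impl ToySteps + 'a {
    MeanCheck { readings: &cat.readings, threshold: &params.threshold }
}
```

Here `run(pipeline, 1.5)` type-checks because `pipeline` implements `Fn(&'a Catalog, &'a Params)` for every `'a`. A closure with two elided reference parameters has no way to name a single shared lifetime for its inputs and its return value, which is why the named-function form is required.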