Dynamic Pipeline
Demonstrates runtime pipeline composition using StepVec: the report
node is only included when the include_report param is true.
Usage
cargo run --example dyn_steps_app -- run
cargo run --example dyn_steps_app -- check
cargo run --example dyn_steps_app -- viz
Types
/// Datasets the example pipeline reads from and writes to.
#[derive(Serialize, Deserialize)]
pub struct Catalog {
    /// CSV input; the `summarize` node receives it as a `DataFrame`.
    pub readings: PolarsCsvDataset,
    /// Holds the `f64` mean produced by `summarize` and consumed by `report`.
    pub summary: MemoryDataset<f64>,
    /// JSON output of the optional `report` node.
    pub report: JsonDataset,
}
/// Parameters that steer the example pipeline.
#[derive(Serialize, Deserialize)]
pub struct Params {
    /// Value the mean is compared against (`mean >= threshold`) in the
    /// `report` node.
    pub threshold: Param<f64>,
    /// When `true`, `pipeline` appends the `report` node.
    pub include_report: Param<bool>,
}
Pipeline definition
/// Assembles the pipeline steps at runtime.
///
/// The `summarize` node is always included: it takes the `readings` dataset as
/// a `DataFrame`, computes the mean of its `"value"` column and stores it in
/// `summary`.
///
/// The `report` node is appended only when `params.include_report` is `true`.
/// It combines the stored mean with `params.threshold` into a JSON object
/// `{ "mean": .., "passed": mean >= threshold }` written to `report`.
///
/// # Panics
///
/// Nothing panics while the steps are being assembled, but the `summarize`
/// closure unwraps every lookup, so running that node panics if the `"value"`
/// column is missing, is not an `f64` column, or yields no mean.
pub fn pipeline<'a>(cat: &'a Catalog, params: &'a Params) -> StepVec<'a> {
    let mut steps = vec![
        Node {
            name: "summarize",
            func: |df: DataFrame| {
                let mean = df.column("value").unwrap().f64().unwrap().mean().unwrap();
                (mean,)
            },
            input: (&cat.readings,),
            output: (&cat.summary,),
        }
        .boxed(),
    ];

    // Runtime composition: the report step exists only when requested.
    if params.include_report.0 {
        steps.push(
            Node {
                name: "report",
                func: |mean: f64, threshold: f64| {
                    (json!({ "mean": mean, "passed": mean >= threshold }),)
                },
                // Second input is the threshold parameter, borrowed from `params`.
                input: (&cat.summary, &params.threshold),
                output: (&cat.report,),
            }
            .boxed(),
        );
    }

    steps
}
App entry point
/// Runs the example application.
///
/// Resolves the example data directory under the crate root, calls
/// `write_fixtures` on it, then builds the app from `catalog.yml` and
/// `params.yml`, applies the process arguments and dispatches to `pipeline`.
///
/// # Panics
///
/// Panics if either YAML path is not valid UTF-8 (`to_str()` returns `None`).
fn main() -> Result<(), PondError> {
    // <crate root>/examples/dyn_steps_data
    let data_dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
        .join("examples")
        .join("dyn_steps_data");
    write_fixtures(&data_dir);

    let catalog_yaml = data_dir.join("catalog.yml");
    let params_yaml = data_dir.join("params.yml");

    pondrs::app::App::from_yaml(
        catalog_yaml.to_str().unwrap(),
        params_yaml.to_str().unwrap(),
    )?
    .with_args(std::env::args_os())?
    .dispatch(pipeline)
}