Sales Pipeline
Monthly sales CSV processing: filter by minimum sales, compute totals, and produce a Plotly bar chart.
Usage
cargo run --example sales_app -- run
cargo run --example sales_app -- check
cargo run --example sales_app -- viz
Types
// ---------------------------------------------------------------------------
// Catalog and params
// ---------------------------------------------------------------------------
/// Datasets read and written by the sales pipeline.
///
/// The role of each field below is taken from how `sales_pipeline` wires its
/// nodes' `input` / `output` tuples.
#[derive(Serialize, Deserialize)]
pub struct SalesCatalog {
/// Input of the `filter_months` node; must expose an `i64` `sales` column.
// NOTE(review): presumably the monthly sales CSV mentioned in the README intro — confirm against catalog.yml.
pub raw_sales: PolarsCsvDataset,
/// Output of `filter_months`; input of both `compute_total` and `build_chart`.
pub filtered_sales: PolarsCsvDataset,
/// Output of `compute_total` (sum of the filtered `sales` column); input of `build_chart`.
pub total_sales: MemoryDataset<i64>,
/// Output of `build_chart`: the bar chart of the filtered months.
pub chart: PlotlyDataset,
}
/// Tunable parameters consumed by the sales pipeline.
#[derive(Serialize, Deserialize)]
pub struct SalesParams {
/// Minimum sales to include a month in the chart.
///
/// The `filter_months` node compares with `>=`, so a month whose sales equal
/// this value is kept.
pub min_sales: Param<i64>,
}
Node functions
// ---------------------------------------------------------------------------
// Node functions
// ---------------------------------------------------------------------------
/// Renders the filtered monthly sales as a Plotly bar chart.
///
/// * `df` — frame with a string `month` column and an `i64` `sales` column.
/// * `total` — pre-computed sales total, embedded in the chart title.
///
/// Returns the plot wrapped in a one-element tuple.
///
/// # Panics
///
/// Panics if either column is missing or has a different dtype than the one
/// named above (every lookup and cast is unwrapped).
fn build_chart(df: DataFrame, total: i64) -> (Plot,) {
    // X axis: month labels, copied out of the frame as owned strings.
    let month_col = df.column("month").unwrap().str().unwrap();
    let labels: Vec<String> = month_col.into_no_null_iter().map(String::from).collect();

    // Y axis: the sales figure for each month.
    let sales_col = df.column("sales").unwrap().i64().unwrap();
    let values: Vec<i64> = sales_col.into_no_null_iter().collect();

    // NOTE(review): the title hard-codes "1000" while the filter threshold is
    // configurable through `SalesParams::min_sales`; the two can drift apart.
    let heading = format!("Months with sales ≥ 1000 (total: {total})");
    let value_axis = plotly::layout::Axis::new().title("Sales");
    let layout = Layout::new().title(heading).y_axis(value_axis);

    let bars = Bar::new(labels, values).name("Monthly Sales");

    let mut figure = Plot::new();
    figure.add_trace(bars);
    figure.set_layout(layout);
    (figure,)
}
Pipeline definition
// ---------------------------------------------------------------------------
// Pipeline function
// ---------------------------------------------------------------------------
/// Builds the three-node sales pipeline over the given catalog and params.
///
/// Nodes, in declaration order:
///
/// 1. `filter_months` — reads `cat.raw_sales` and `params.min_sales`, keeps the
///    rows whose `sales` value is `>= min_sales`, and writes `cat.filtered_sales`.
/// 2. `compute_total` — sums the `sales` column of `cat.filtered_sales` into
///    `cat.total_sales`, using `0` when the sum yields no value.
/// 3. `build_chart` — turns `cat.filtered_sales` and `cat.total_sales` into the
///    bar chart stored in `cat.chart`.
///
/// The returned steps borrow both `cat` and `params` for `'a`.
///
/// Nodes 1 and 2 report a missing or non-`i64` `sales` column as a
/// `PolarsError` instead of panicking. `build_chart` still unwraps its column
/// lookups and will panic on a malformed frame.
pub fn sales_pipeline<'a>(
    cat: &'a SalesCatalog,
    params: &'a SalesParams,
) -> impl Steps<PondError> + 'a {
    (
        // Node 1: filter out months below the threshold
        Node {
            name: "filter_months",
            func: |df: DataFrame, min_sales: i64| -> Result<(DataFrame,), PolarsError> {
                // Boolean mask: true where sales >= min_sales (inclusive bound).
                let mask = df.column("sales")?.i64()?.gt_eq(min_sales);
                Ok((df.filter(&mask)?,))
            },
            input: (&cat.raw_sales, &params.min_sales),
            output: (&cat.filtered_sales,),
        },
        // Node 2: sum the filtered sales
        Node {
            name: "compute_total",
            func: |df: DataFrame| -> Result<(i64,), PolarsError> {
                // Propagate schema problems like node 1 does; fall back to 0
                // when the sum has no value.
                let total = df.column("sales")?.i64()?.sum().unwrap_or(0);
                Ok((total,))
            },
            input: (&cat.filtered_sales,),
            output: (&cat.total_sales,),
        },
        // Node 3: build a bar chart of the filtered monthly sales
        Node {
            name: "build_chart",
            func: build_chart,
            input: (&cat.filtered_sales, &cat.total_sales),
            output: (&cat.chart,),
        },
    )
}
App entry point
// Build the app from the catalog and params YAML files located under `dir`
// (`dir` is defined outside this excerpt).
// NOTE(review): `.to_str().unwrap()` presumably panics on a non-UTF-8 path,
// assuming `dir` is a `Path`/`PathBuf` — confirm.
pondrs::app::App::from_yaml(
dir.join("catalog.yml").to_str().unwrap(),
dir.join("params.yml").to_str().unwrap(),
)?
// Register two hooks: a logging hook and a viz hook pointed at localhost:8080.
.with_hooks((
LoggingHook::new(),
VizHook::new("http://localhost:8080".to_string()),
))
// Hand over the raw process arguments.
// NOTE(review): presumably this selects the `run` / `check` / `viz`
// subcommand shown in the usage section — confirm against the pondrs docs.
.with_args(std::env::args_os())?
// Dispatch with `sales_pipeline` as the pipeline builder.
.dispatch(sales_pipeline)