Keyboard shortcuts

Press ← or → to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Sales Pipeline

Monthly sales CSV processing: filter by minimum sales, compute totals, and produce a Plotly bar chart.

Usage

cargo run --example sales_app -- run
cargo run --example sales_app -- check
cargo run --example sales_app -- viz

Types

// ---------------------------------------------------------------------------
// Catalog and params
// ---------------------------------------------------------------------------

/// Dataset catalog for the sales pipeline: the raw input, each intermediate result,
/// and the final chart.
#[derive(Deserialize, Serialize)]
pub struct SalesCatalog {
    /// Monthly sales read from CSV; the nodes read its `month` and `sales` columns.
    pub raw_sales: PolarsCsvDataset,
    /// Rows of `raw_sales` whose `sales` value is at least `min_sales`.
    pub filtered_sales: PolarsCsvDataset,
    /// Sum of the `sales` column over `filtered_sales`.
    pub total_sales: MemoryDataset<i64>,
    /// Plotly bar chart of the filtered months, titled with the total.
    pub chart: PlotlyDataset,
}

/// Tunable parameters for the sales pipeline.
#[derive(Deserialize, Serialize)]
pub struct SalesParams {
    /// Inclusive lower bound on a month's `sales` value. Months below it are dropped
    /// before the total is computed and before the chart is drawn.
    pub min_sales: Param<i64>,
}

Node functions

// ---------------------------------------------------------------------------
// Node functions
// ---------------------------------------------------------------------------

/// Builds a bar chart with one bar per row of the filtered sales table.
///
/// Bars are labelled by the `month` column and sized by the `sales` column. The title
/// reports the grand `total` computed upstream.
///
/// The title refers to the `min_sales` parameter by name rather than embedding a number.
/// This node does not receive the threshold, so a hard-coded value could contradict
/// `params.yml`.
///
/// # Panics
///
/// Panics if `df` has no `month` column of string type or no `sales` column of
/// `i64` type.
fn build_chart(df: DataFrame, total: i64) -> (Plot,) {
    // NOTE(review): `into_no_null_iter` assumes neither column contains nulls. `sales`
    // nulls are dropped by the upstream `gt_eq` filter; `month` nulls are not — confirm
    // the input CSV never has missing month labels.
    let months: Vec<String> = df
        .column("month")
        .expect("sales table must have a `month` column")
        .str()
        .expect("`month` column must hold strings")
        .into_no_null_iter()
        .map(String::from)
        .collect();
    let sales: Vec<i64> = df
        .column("sales")
        .expect("sales table must have a `sales` column")
        .i64()
        .expect("`sales` column must hold i64 values")
        .into_no_null_iter()
        .collect();

    let mut plot = Plot::new();
    plot.add_trace(Bar::new(months, sales).name("Monthly Sales"));
    plot.set_layout(
        Layout::new()
            .title(format!("Months with sales ≥ min_sales  (total: {total})"))
            .y_axis(plotly::layout::Axis::new().title("Sales")),
    );
    (plot,)
}

Pipeline definition

// ---------------------------------------------------------------------------
// Pipeline function
// ---------------------------------------------------------------------------

/// Assembles the three-node sales pipeline over `cat`, parameterised by `params`.
///
/// 1. `filter_months` keeps the rows of `raw_sales` whose `sales` value is
///    `>= min_sales` and writes them to `filtered_sales`.
/// 2. `compute_total` sums `sales` over `filtered_sales` into `total_sales`. It yields
///    0 when no rows survive the filter.
/// 3. `build_chart` renders `filtered_sales` and `total_sales` into `chart`.
///
/// The two inline nodes report a missing or mistyped column as a `PolarsError`
/// instead of panicking. Presumably the framework converts this into `PondError`;
/// confirm against `pondrs`.
pub fn sales_pipeline<'a>(
    cat: &'a SalesCatalog,
    params: &'a SalesParams,
) -> impl Steps<PondError> + 'a {
    (
        // Node 1: filter out months below the threshold
        Node {
            name: "filter_months",
            func: |df: DataFrame, min_sales: i64| -> Result<(DataFrame,), PolarsError> {
                let mask = df.column("sales")?.i64()?.gt_eq(min_sales);
                Ok((df.filter(&mask)?,))
            },
            input: (&cat.raw_sales, &params.min_sales),
            output: (&cat.filtered_sales,),
        },
        // Node 2: sum the filtered sales. Errors propagate like node 1 instead of
        // panicking on a missing or non-i64 `sales` column.
        Node {
            name: "compute_total",
            func: |df: DataFrame| -> Result<(i64,), PolarsError> {
                // `sum` is `None` when there is nothing to add up; report 0 instead.
                let total = df.column("sales")?.i64()?.sum().unwrap_or(0);
                Ok((total,))
            },
            input: (&cat.filtered_sales,),
            output: (&cat.total_sales,),
        },
        // Node 3: build a bar chart of the filtered monthly sales
        Node {
            name: "build_chart",
            func: build_chart,
            input: (&cat.filtered_sales, &cat.total_sales),
            output: (&cat.chart,),
        },
    )
}

App entry point

    // Build the app from the catalog and parameter YAML files inside `dir`
    // (`dir` is defined earlier in the enclosing function, outside this excerpt).
    // NOTE(review): `to_str().unwrap()` panics if either path is not valid UTF-8.
    pondrs::app::App::from_yaml(
        dir.join("catalog.yml").to_str().unwrap(),
        dir.join("params.yml").to_str().unwrap(),
    )?
    // Register hooks. Judging by their names, these log pipeline activity and feed a
    // visualization server at localhost:8080 — confirm against `pondrs` docs.
    .with_hooks((
        LoggingHook::new(),
        VizHook::new("http://localhost:8080".to_string()),
    ))
    // Configure the app from the process's command-line arguments (OS-encoded).
    .with_args(std::env::args_os())?
    // Run whatever the parsed arguments selected, using `sales_pipeline` to build the steps.
    .dispatch(sales_pipeline)

Pipeline visualization

Open fullscreen