Split/Join Pipeline
Demonstrates fan-out/fan-in patterns using TemplatedCatalog, Split, Join,
and StepVec. A combined inventory CSV is split by store into per-store files,
processed independently, then joined back into a comparison report.
Usage
cargo run --example split_join_app -- run
cargo run --example split_join_app -- check
cargo run --example split_join_app -- viz
Types
// ---------------------------------------------------------------------------
// Per-store catalog entry (expanded from YAML template)
// ---------------------------------------------------------------------------
/// Datasets that exist once per store.
///
/// One instance is created for every store name listed under `stores` in the
/// catalog YAML, with the template placeholder substituted into the paths.
#[derive(Debug, Serialize, Deserialize)]
pub struct StoreCatalog {
/// This store's inventory CSV; written by the `split_stores` step and read
/// by the store's `compute_store_value` node.
pub inventory: PolarsCsvDataset,
/// This store's total stock value; written by `compute_store_value` and
/// collected by the `join_values` step.
pub total_value: MemoryDataset<f64>,
}
// ---------------------------------------------------------------------------
// Catalog and params
// ---------------------------------------------------------------------------
/// Top-level data catalog for the split/join pipeline.
#[derive(Serialize, Deserialize)]
pub struct Catalog {
/// Combined inventory CSV covering all stores; input of `group_by_store`.
pub all_inventory: PolarsCsvDataset,
/// Rows of `all_inventory` grouped by store name; output of
/// `group_by_store` and input of the `split_stores` step.
pub grouped: MemoryDataset<HashMap<String, DataFrame>>,
/// Per-store datasets, one `StoreCatalog` per store.
pub stores: TemplatedCatalog<StoreCatalog>,
/// Per-store totals keyed by store name; output of the `join_values` step
/// and input of `build_report`.
pub store_values: MemoryDataset<HashMap<String, f64>>,
/// Final JSON comparison report; output of `build_report`.
pub report: JsonDataset,
}
/// Pipeline parameters.
#[derive(Serialize, Deserialize)]
pub struct Params {
/// Quantity below which an item is counted as low stock by
/// `compute_store_value` (such items are only reported in a warning).
pub low_stock_threshold: Param<i64>,
}
The StoreCatalog struct is the per-entry template. Each store gets its own
inventory CSV dataset and total_value memory dataset, with file paths
expanded from a YAML template:
stores:
  placeholder: "store"
  template:
    inventory:
      path: "data/{store}_inventory.csv"
    total_value: {}
  names: [north, south, east]
Node functions
// ---------------------------------------------------------------------------
// Node functions
// ---------------------------------------------------------------------------
/// Partition the combined inventory frame into one frame per store.
///
/// The distinct values of the `"store"` column become the keys of the
/// returned map; each value holds exactly the rows whose `"store"` equals
/// that key.
///
/// # Errors
///
/// Returns the underlying `PolarsError` if the `"store"` column cannot be
/// looked up or read as strings, or if filtering the frame fails.
fn group_by_store(df: DataFrame) -> Result<(HashMap<String, DataFrame>,), PolarsError> {
    let stores = df.column("store")?.str()?;

    // Deduplicate the store names before filtering once per store.
    let distinct_names: std::collections::HashSet<String> =
        stores.into_no_null_iter().map(str::to_owned).collect();

    let by_store = distinct_names
        .into_iter()
        .map(|name| {
            let rows_of_store = stores.equal(name.as_str());
            df.filter(&rows_of_store).map(|rows| (name, rows))
        })
        .collect::<Result<HashMap<_, _>, _>>()?;

    Ok((by_store,))
}
/// Sum the stock value (`quantity * unit_price`) over every row of one store.
///
/// Rows whose quantity is below `threshold` still contribute to the total;
/// they are only counted so that a single warning can be logged. Entries for
/// which the column returns no value are treated as `0` / `0.0`.
///
/// # Panics
///
/// Panics if looking up `quantity` / `unit_price` fails, or if they cannot be
/// accessed as `i64` / `f64` columns respectively.
fn compute_store_value(df: DataFrame, threshold: i64) -> (f64,) {
    let quantities = df.column("quantity").unwrap().i64().unwrap();
    let unit_prices = df.column("unit_price").unwrap().f64().unwrap();

    // Single pass: accumulate the running value and the low-stock count.
    let (total, low_stock_items) = (0..df.height())
        .map(|row| {
            (
                quantities.get(row).unwrap_or(0),
                unit_prices.get(row).unwrap_or(0.0),
            )
        })
        .fold((0.0, 0), |(value, flagged), (units, unit_price)| {
            (
                value + units as f64 * unit_price,
                flagged + i32::from(units < threshold),
            )
        });

    if low_stock_items > 0 {
        log::warn!(
            "{low_stock_items} item(s) below stock threshold of {threshold}"
        );
    }

    (total,)
}
/// Build a JSON comparison report from per-store totals.
///
/// The report has the shape
/// `{"grand_total": .., "stores": [{"store", "total_value", "share_pct"}, ..]}`.
/// Monetary values and percentages are rounded to two decimals. Stores are
/// ordered by descending `total_value`, with ties broken by store name so the
/// output does not depend on `HashMap` iteration order.
///
/// When the grand total is zero (no stores, or nothing in stock anywhere)
/// every `share_pct` is reported as `0.0` instead of dividing by zero.
fn build_report(store_values: HashMap<String, f64>) -> (Value,) {
    let grand_total: f64 = store_values.values().sum();

    // (store name, rounded total, share of the grand total in percent)
    let mut rows: Vec<(&str, f64, f64)> = store_values
        .iter()
        .map(|(name, &value)| {
            // Dividing by a zero grand total yields NaN/inf, which would end
            // up as `null` in the JSON output.
            let share_pct = if grand_total == 0.0 {
                0.0
            } else {
                (value / grand_total * 10000.0).round() / 100.0
            };
            (name.as_str(), (value * 100.0).round() / 100.0, share_pct)
        })
        .collect();

    // Sort on the plain floats rather than on the JSON values: `total_cmp` is
    // a total order, so a non-finite total cannot panic the comparison.
    rows.sort_by(|a, b| b.1.total_cmp(&a.1).then_with(|| a.0.cmp(b.0)));

    let stores: Vec<Value> = rows
        .into_iter()
        .map(|(store, total_value, share_pct)| {
            json!({
                "store": store,
                "total_value": total_value,
                "share_pct": share_pct,
            })
        })
        .collect();

    let report = json!({
        "grand_total": (grand_total * 100.0).round() / 100.0,
        "stores": stores,
    });
    (report,)
}
Pipeline definition
// ---------------------------------------------------------------------------
// Pipeline function
// ---------------------------------------------------------------------------
/// Assemble the fan-out/fan-in pipeline over the catalog's stores.
///
/// The returned steps are, in order:
/// 1. `group_by_store`: `all_inventory` -> `grouped`
/// 2. `split_stores`: `grouped` -> each store's `inventory`
/// 3. one `compute_store_value` node per entry of `cat.stores`:
///    the store's `inventory` + `low_stock_threshold` -> its `total_value`
/// 4. `join_values`: each store's `total_value` -> `store_values`
/// 5. `build_report`: `store_values` -> `report`
///
/// A `StepVec` is used because the number of per-store nodes is only known
/// once the templated catalog has been expanded.
pub fn pipeline<'a>(cat: &'a Catalog, params: &'a Params) -> StepVec<'a> {
    // Step 1: group the combined CSV into a HashMap by store.
    let mut steps: StepVec<'a> = vec![
        Node {
            name: "group_by_store",
            func: group_by_store,
            input: (&cat.all_inventory,),
            output: (&cat.grouped,),
        }
        .boxed(),
    ];

    // Step 2: Split distributes per-store DataFrames to individual CSV files.
    steps.push(
        Split {
            name: "split_stores",
            input: &cat.grouped,
            catalog: &cat.stores,
            field: |s: &StoreCatalog| &s.inventory,
        }
        .boxed(),
    );

    // Step 3: per-store processing — dynamically build a node for each store.
    for (_, store) in cat.stores.iter() {
        steps.push(
            Node {
                name: "compute_store_value",
                func: compute_store_value,
                input: (&store.inventory, &params.low_stock_threshold),
                output: (&store.total_value,),
            }
            .boxed(),
        );
    }

    // Step 4: Join collects per-store totals back into a HashMap.
    steps.push(
        Join {
            name: "join_values",
            catalog: &cat.stores,
            field: |s: &StoreCatalog| &s.total_value,
            output: &cat.store_values,
        }
        .boxed(),
    );

    // Step 5: build a comparison report from the joined values.
    steps.push(
        Node {
            name: "build_report",
            func: build_report,
            input: (&cat.store_values,),
            output: (&cat.report,),
        }
        .boxed(),
    );

    steps
}
The pipeline uses StepVec because the per-store processing nodes are built
dynamically from the TemplatedCatalog entries. The flow is:
- group_by_store — reads the combined CSV and groups rows into a
  HashMap<String, DataFrame>
- split_stores — distributes each store’s DataFrame to its per-store CSV file
- compute_store_value (one per store) — computes total stock value from each store’s CSV
- join_values — collects per-store totals back into a
  HashMap<String, f64>
- build_report — produces a JSON comparison report
App entry point
/// Entry point: prepare the example data, load the app from its YAML
/// configuration and dispatch on the command-line arguments.
///
/// # Panics
///
/// Panics if either configuration path is not valid UTF-8.
fn main() -> Result<(), PondError> {
    // NOTE(review): presumably `write_fixtures` creates the example CSV/YAML
    // files inside the data directory — confirm against its definition.
    let fixtures_dir = data_dir();
    write_fixtures(&fixtures_dir);

    let catalog_path = fixtures_dir.join("catalog.yml");
    let params_path = fixtures_dir.join("params.yml");

    pondrs::app::App::from_yaml(
        catalog_path.to_str().unwrap(),
        params_path.to_str().unwrap(),
    )?
    .with_hooks((LoggingHook::new(),))
    .with_args(std::env::args_os())?
    .dispatch(pipeline)
}