Split & Join
The Dynamic Pipelines chapter showed how StepVec lets you include or exclude nodes based on params: a boolean flag, for example, controls whether a report node runs. The pipeline shape changes, but the datasets are still fixed at compile time.
Split & Join solve a different problem: the catalog determines the pipeline shape. When you have multiple items of the same kind (stores, regions, sensors), you want to run the same processing for each one, with each item getting its own datasets. The number of items comes from configuration, not code.
The pattern
A typical fan-out/fan-in pipeline looks like this:
           [combined data]
                 │
            ┌────┴────┐         Split
            ▼         ▼
        [item A]   [item B]     Per-item processing
            │         │
            └────┬────┘         Join
                 ▼
         [collected results]
- Split takes a HashMap<String, T> from a single dataset and distributes each value to a per-item dataset
- Per-item nodes process each item independently (and can run in parallel)
- Join collects a value from each per-item dataset back into a HashMap<String, T>
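The same fan-out/fan-in shape can be sketched with nothing but the standard library. This is a minimal in-memory model, not the library's implementation: `split_items`, `total_value`, `join_items` and `fan_out_fan_in` are hypothetical names, a per-item "dataset" is just a `Vec<f64>` of prices, and scoped threads stand in for whatever parallel runner you actually use.

```rust
use std::collections::HashMap;
use std::thread;

/// Fan-out (hypothetical): turn the grouped map into one (name, data) pair per item.
/// Sorting by name only makes the sketch deterministic.
fn split_items(grouped: HashMap<String, Vec<f64>>) -> Vec<(String, Vec<f64>)> {
    let mut items: Vec<(String, Vec<f64>)> = grouped.into_iter().collect();
    items.sort_by(|a, b| a.0.cmp(&b.0));
    items
}

/// Per-item processing: sees only its own item's data.
fn total_value(prices: &[f64]) -> f64 {
    prices.iter().sum()
}

/// Fan-in (hypothetical): collect one result per item back into a map keyed by name.
fn join_items(results: Vec<(String, f64)>) -> HashMap<String, f64> {
    results.into_iter().collect()
}

fn fan_out_fan_in(grouped: HashMap<String, Vec<f64>>) -> HashMap<String, f64> {
    let items = split_items(grouped);
    // Items are independent, so each one can run on its own scoped thread.
    let results: Vec<(String, f64)> = thread::scope(|s| {
        let handles: Vec<_> = items
            .iter()
            .map(|(name, prices)| s.spawn(move || (name.clone(), total_value(prices))))
            .collect();
        handles.into_iter().map(|h| h.join().unwrap()).collect()
    });
    join_items(results)
}
```

Because no per-item step reads another item's data, the order in which the threads finish does not matter; the join only needs every name to report exactly one value.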
TemplatedCatalog
The per-item datasets live in a TemplatedCatalog<S>, a collection of identically shaped catalog structs with one struct per item:
#[derive(Debug, Serialize, Deserialize)]
struct StoreCatalog {
    inventory: PolarsCsvDataset,
    total_value: MemoryDataset<f64>,
}

#[derive(Serialize, Deserialize)]
struct Catalog {
    // ...
    stores: TemplatedCatalog<StoreCatalog>,
    // ...
}
In YAML, a TemplatedCatalog is defined with a template and a list of names. String values containing {placeholder} are expanded per entry:
stores:
  placeholder: "store"
  template:
    inventory:
      path: "data/{store}_inventory.csv"
    total_value: {}
  names: [north, south, east]
This produces three StoreCatalog instances (north, south, and east), each with its own inventory file path. The placeholder field is optional and defaults to "name".
TemplatedCatalog serializes as a map, so the catalog indexer produces meaningful names like stores.north.inventory.
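The string expansion step can be modeled in a few lines. This sketch assumes a simplified template (a map from field name to path string) and a hypothetical `expand` function; the real TemplatedCatalog deserializes into typed structs and may handle non-string fields and escaping differently. It does show the two documented behaviors: `{placeholder}` substitution per name, with `"name"` as the default placeholder, and entries kept in the order the names are listed. The hypothetical `dotted_names` helper shows how the map layout yields names like stores.north.inventory.

```rust
use std::collections::BTreeMap;

/// Hypothetical model of one template entry: field name -> path string.
/// The real TemplatedCatalog works on typed catalog structs instead.
type Template = BTreeMap<&'static str, &'static str>;

/// Expand `{placeholder}` in every string value of `template`, once per name.
/// `None` falls back to the documented default placeholder, "name".
fn expand(
    template: &Template,
    placeholder: Option<&str>,
    names: &[&str],
) -> Vec<(String, BTreeMap<String, String>)> {
    let needle = format!("{{{}}}", placeholder.unwrap_or("name")); // e.g. "{store}"
    names
        .iter()
        .map(|name| {
            let entry: BTreeMap<String, String> = template
                .iter()
                .map(|(field, value)| (field.to_string(), value.replace(&needle, name)))
                .collect();
            (name.to_string(), entry)
        })
        .collect()
}

/// Hypothetical: flatten entries into dotted names, entry by entry in list order.
fn dotted_names(prefix: &str, entries: &[(String, BTreeMap<String, String>)]) -> Vec<String> {
    entries
        .iter()
        .flat_map(|(name, fields)| fields.keys().map(move |f| format!("{prefix}.{name}.{f}")))
        .collect()
}
```

Keeping the entries in a `Vec` rather than a hash map is what preserves the order of the YAML `names` list.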
Split
Split is a leaf node (like Ident) that loads a HashMap<String, T> from an input dataset and saves each value to the corresponding entry in a TemplatedCatalog. A field accessor selects which dataset within each entry to write to:
Split {
    name: "split_stores",
    input: &cat.grouped,                    // MemoryDataset<HashMap<String, DataFrame>>
    catalog: &cat.stores,                   // TemplatedCatalog<StoreCatalog>
    field: |s: &StoreCatalog| &s.inventory, // which dataset to write to
}
At runtime, Split validates that the HashMap keys exactly match the catalog entry names. A mismatch produces a PondError::KeyMismatch error.
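The key check can be sketched as a set comparison. `validate_keys` and the `KeyMismatch` struct here are hypothetical stand-ins; the real `PondError::KeyMismatch` variant may carry different fields. Reporting both missing and unexpected keys makes a typo in the YAML `names` list easy to spot.

```rust
use std::collections::{BTreeSet, HashMap};

/// Hypothetical error mirroring the idea of PondError::KeyMismatch.
#[derive(Debug, PartialEq)]
struct KeyMismatch {
    missing: Vec<String>,    // catalog entries with no value in the map
    unexpected: Vec<String>, // map keys with no catalog entry
}

/// Succeed only if the map's keys are exactly the catalog entry names.
fn validate_keys<T>(values: &HashMap<String, T>, entry_names: &[&str]) -> Result<(), KeyMismatch> {
    let keys: BTreeSet<&str> = values.keys().map(String::as_str).collect();
    let names: BTreeSet<&str> = entry_names.iter().copied().collect();
    if keys == names {
        return Ok(());
    }
    // BTreeSet keeps both lists sorted, so the error is deterministic.
    Err(KeyMismatch {
        missing: names.difference(&keys).map(|s| s.to_string()).collect(),
        unexpected: keys.difference(&names).map(|s| s.to_string()).collect(),
    })
}
```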
For check(), Split reports the single input dataset as its input and every per-entry field dataset as an output, so downstream nodes that read from those datasets are correctly validated.
Join
Join is the inverse: it loads a value from each entry's dataset and collects them into a HashMap<String, T>:
Join {
    name: "join_values",
    catalog: &cat.stores,                     // TemplatedCatalog<StoreCatalog>
    field: |s: &StoreCatalog| &s.total_value, // which dataset to read from
    output: &cat.store_values,                // MemoryDataset<HashMap<String, f64>>
}
For check(), Join reports all per-entry field datasets as inputs and the single output dataset as output.
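One way to picture what check() does with these declarations is to walk the steps in order and require every input to exist before it is read. The sketch below assumes that rule and models each step only by the dataset names it reports (`StepIo`, `split_io`, `join_io` and `check` are all hypothetical); the library's own validation may check more than this.

```rust
use std::collections::HashSet;

/// Hypothetical, simplified view of what a step reports to check().
struct StepIo {
    name: &'static str,
    inputs: Vec<String>,
    outputs: Vec<String>,
}

/// Split: one input dataset, one output per entry's field dataset.
fn split_io(input: &str, catalog: &str, field: &str, entries: &[&str]) -> StepIo {
    StepIo {
        name: "split",
        inputs: vec![input.to_string()],
        outputs: entries.iter().map(|e| format!("{catalog}.{e}.{field}")).collect(),
    }
}

/// Join: every entry's field dataset in, one dataset out.
fn join_io(catalog: &str, field: &str, entries: &[&str], output: &str) -> StepIo {
    StepIo {
        name: "join",
        inputs: entries.iter().map(|e| format!("{catalog}.{e}.{field}")).collect(),
        outputs: vec![output.to_string()],
    }
}

/// Return the first (step, dataset) read before any earlier step wrote it,
/// unless the dataset is listed in `sources` (data that already exists).
fn check(steps: &[StepIo], sources: &[&str]) -> Result<(), (String, String)> {
    let mut available: HashSet<String> = sources.iter().map(|s| s.to_string()).collect();
    for step in steps {
        for input in &step.inputs {
            if !available.contains(input) {
                return Err((step.name.to_string(), input.clone()));
            }
        }
        available.extend(step.outputs.iter().cloned());
    }
    Ok(())
}
```

Under this rule, a Join placed before the per-item nodes that write its field datasets fails immediately, naming the first missing per-entry dataset.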
Building per-item nodes
Between Split and Join, you need processing nodes for each item. Since the number of items is determined by YAML config, you build these dynamically with StepVec:
// ---------------------------------------------------------------------------
// Pipeline function
// ---------------------------------------------------------------------------
pub fn pipeline<'a>(cat: &'a Catalog, params: &'a Params) -> StepVec<'a> {
    // Step 1: group the combined CSV into a HashMap by store.
    let mut steps: StepVec<'a> = vec![
        Node {
            name: "group_by_store",
            func: group_by_store,
            input: (&cat.all_inventory,),
            output: (&cat.grouped,),
        }
        .boxed(),
    ];

    // Step 2: Split distributes per-store DataFrames to individual CSV files.
    steps.push(
        Split {
            name: "split_stores",
            input: &cat.grouped,
            catalog: &cat.stores,
            field: |s: &StoreCatalog| &s.inventory,
        }
        .boxed(),
    );

    // Step 3: per-store processing, with one dynamically built node per store.
    for (_, store) in cat.stores.iter() {
        steps.push(
            Node {
                name: "compute_store_value",
                func: compute_store_value,
                input: (&store.inventory, &params.low_stock_threshold),
                output: (&store.total_value,),
            }
            .boxed(),
        );
    }

    // Step 4: Join collects per-store totals back into a HashMap.
    steps.push(
        Join {
            name: "join_values",
            catalog: &cat.stores,
            field: |s: &StoreCatalog| &s.total_value,
            output: &cat.store_values,
        }
        .boxed(),
    );

    // Step 5: build a comparison report from the joined values.
    steps.push(
        Node {
            name: "build_report",
            func: build_report,
            input: (&cat.store_values,),
            output: (&cat.report,),
        }
        .boxed(),
    );

    steps
}
Each call to cat.stores.iter() yields (&str, &StoreCatalog) pairs in name-insertion order. The per-store nodes reference datasets owned by each StoreCatalog entry, so they are naturally wired into the correct fan-out/fan-in structure.
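The ownership pattern in the loop can be reproduced without the library. In this sketch `StoreData`, the `Step` trait, and `ComputeStoreValue` are hypothetical stand-ins for the catalog entry, the library's step trait, and a `Node`; a `Cell<f64>` plays the role of the in-memory total_value dataset. Each boxed step borrows one entry for `'a`, which is why the builder can return a `Vec<Box<dyn Step + 'a>>` that outlives the loop.

```rust
use std::cell::Cell;

/// Hypothetical stand-in for one store's datasets.
struct StoreData {
    inventory: Vec<(u32, f64)>, // (quantity, unit price) rows
    total_value: Cell<f64>,     // written by the per-store step
}

/// Hypothetical minimal step trait; the library's own trait differs.
trait Step {
    fn name(&self) -> String;
    fn run(&self);
}

/// One per-store step borrowing that store's datasets for lifetime 'a.
struct ComputeStoreValue<'a> {
    store: &'a str,
    data: &'a StoreData,
}

impl Step for ComputeStoreValue<'_> {
    fn name(&self) -> String {
        format!("compute_store_value[{}]", self.store)
    }
    fn run(&self) {
        let total: f64 = self.data.inventory.iter().map(|(qty, price)| *qty as f64 * price).sum();
        self.data.total_value.set(total);
    }
}

/// Build one boxed step per store, in the order the stores are listed.
fn per_store_steps<'a>(stores: &'a [(String, StoreData)]) -> Vec<Box<dyn Step + 'a>> {
    let mut steps: Vec<Box<dyn Step + 'a>> = Vec::new();
    for (name, data) in stores {
        steps.push(Box::new(ComputeStoreValue { store: name, data }));
    }
    steps
}
```

Deriving each step's name from its entry, as the hypothetical `name()` does here, is one way to tell per-store runs apart in logs; the pipeline above gives every per-store node the same static name.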
Comparison with PartitionedDataset
PartitionedDataset handles a similar concept (a directory of files keyed by name), but at the dataset level. A single node reads or writes all partitions at once as a HashMap. Split & Join operate at the pipeline level: they let you run separate nodes for each item, with each item having its own arbitrarily complex set of datasets.
Use PartitionedDataset when a single node can handle all items. Use Split & Join when each item needs its own processing sub-pipeline.
Nested templates
TemplatedCatalog supports nesting. An outer template can contain an inner TemplatedCatalog with a different placeholder:
regions:
  placeholder: "region"
  template:
    metrics:
      placeholder: "metric"
      template:
        raw:
          path: "data/{region}/{metric}/raw.csv"
      names: [temperature, humidity]
  names: [north, south]
This produces paths like data/north/temperature/raw.csv. The outer placeholder is substituted first, so inner templates see the expanded value.
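The substitution order can be sketched for a single path string. `substitute` and `expand_nested` are hypothetical helpers that assume each level replaces only its own `{placeholder}`; they produce the region/metric paths from the YAML above in declaration order.

```rust
/// Hypothetical helper: replace `{placeholder}` with `value` in `s`.
fn substitute(s: &str, placeholder: &str, value: &str) -> String {
    s.replace(&format!("{{{placeholder}}}"), value)
}

/// Expand a two-level template path, outer names first, then inner names.
/// Returns (outer, inner, path) triples in declaration order.
fn expand_nested(
    path_template: &str,
    outer_placeholder: &str,
    outer_names: &[&str],
    inner_placeholder: &str,
    inner_names: &[&str],
) -> Vec<(String, String, String)> {
    let mut out = Vec::new();
    for o in outer_names {
        // The outer substitution runs first, so the inner template
        // already contains the concrete outer name.
        let partially = substitute(path_template, outer_placeholder, o);
        for i in inner_names {
            out.push((o.to_string(), i.to_string(), substitute(&partially, inner_placeholder, i)));
        }
    }
    out
}
```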