pondrs
Pipelines of Nodes & Datasets — a Rust pipeline execution library, heavily inspired by Kedro.
Example
Define your catalog and params as structs, with datasets backed by files or memory:
#[derive(Serialize, Deserialize)]
struct Catalog {
readings: PolarsCsvDataset,
summary: MemoryDataset<f64>,
report: JsonDataset,
}
#[derive(Serialize, Deserialize)]
struct Params {
threshold: Param<f64>,
}
Write a pipeline function that wires nodes together through shared datasets:
fn pipeline<'a>(cat: &'a Catalog, params: &'a Params) -> impl Steps<PondError> + 'a {
(
Node {
name: "summarize",
func: |df: DataFrame| {
let mean = df.column("value").unwrap().f64().unwrap().mean().unwrap();
(mean,)
},
input: (&cat.readings,),
output: (&cat.summary,),
},
Node {
name: "report",
func: |mean: f64, threshold: f64| {
(json!({ "mean": mean, "passed": mean >= threshold }),)
},
input: (&cat.summary, &params.threshold),
output: (&cat.report,),
},
)
}
Configure your catalog and params via YAML and run with the built-in CLI:
# catalog.yml
readings:
path: data/readings.csv
separator: ","
summary: {}
report:
path: data/report.json
# params.yml
threshold: 0.5
pondrs::app::App::from_yaml(
dir.join("catalog.yml").to_str().unwrap(),
dir.join("params.yml").to_str().unwrap(),
)?
.with_args(std::env::args_os())?
.dispatch(pipeline)
$ my_app run
$ my_app run --params threshold=0.8 # override params from CLI
$ my_app check # validate pipeline DAG
$ my_app viz # interactive pipeline visualization
Pipeline visualization
A minimal pipeline
This chapter walks through the core concepts of pondrs using a minimal example. The same example appears on the introduction page — here we break it apart and explain each piece in detail.
The example reads a CSV of sensor readings, computes the mean, compares it against a threshold parameter, and writes a JSON report. The following sections explain each concept:
- Parameters — read-only values loaded from YAML (Param<f64>)
- Datasets — the Dataset trait and the concrete types used here (PolarsCsvDataset, MemoryDataset, JsonDataset)
- Catalog — the struct that groups datasets together
- Nodes — the Node struct that connects a function to its input/output datasets
- Steps — how nodes are composed into a sequence that the runner can execute
- App — how App ties everything together and provides CLI dispatch
Parameters
Parameters are read-only configuration values that feed into pipeline nodes. They are defined using Param<T>, a thin wrapper that implements the Dataset trait. For more on Param, nested parameters, and struct parameters, see the Params & Catalog chapter.
In the minimal example
The Params struct holds a single threshold value:
#[derive(Serialize, Deserialize)]
struct Params {
threshold: Param<f64>,
}
Configured via YAML:
# params.yml
threshold: 0.5
The “report” node reads the threshold as one of its inputs. When Param appears in a node’s input tuple, .load() clones the inner value. Because Param::Error is Infallible, this can never fail.
Node {
name: "report",
func: |mean: f64, threshold: f64| {
(json!({ "mean": mean, "passed": mean >= threshold }),)
},
input: (&cat.summary, &params.threshold),
output: (&cat.report,),
},
Overriding from the CLI
When using the App framework, parameters can be overridden at runtime without editing YAML files:
$ my_app run --params threshold=0.8
Dot notation works for nested parameter structs:
$ my_app run --params model.learning_rate=0.01
See the YAML Configuration chapter for details.
Datasets
Datasets are the data abstraction in pondrs. Every piece of data flowing through a pipeline — whether it’s a CSV file, an in-memory value, or a hardware register — is a dataset.
The Dataset trait
pub trait Dataset: serde::Serialize {
type LoadItem;
type SaveItem;
type Error;
fn load(&self) -> Result<Self::LoadItem, Self::Error>;
fn save(&self, output: Self::SaveItem) -> Result<(), Self::Error>;
fn is_param(&self) -> bool { false }
}
- LoadItem — the type produced when loading (e.g. DataFrame, String, f64)
- SaveItem — the type accepted when saving (often the same as LoadItem)
- Error — the error type for I/O operations. Use core::convert::Infallible for datasets that never fail (like Param)
- is_param() — returns true for read-only parameter datasets. The pipeline validator uses this to prevent writing to params.
- Serialize supertrait — enables automatic YAML serialization of dataset configuration for the viz and catalog indexer.
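To make the trait shape concrete, here is a toy implementation sketch. The Serialize supertrait is dropped so it compiles without serde, and CellDataset and NotLoaded are made-up names for illustration, not pondrs types:

```rust
use std::cell::RefCell;

// Simplified version of the Dataset trait (Serialize supertrait omitted).
trait Dataset {
    type LoadItem;
    type SaveItem;
    type Error;
    fn load(&self) -> Result<Self::LoadItem, Self::Error>;
    fn save(&self, output: Self::SaveItem) -> Result<(), Self::Error>;
    fn is_param(&self) -> bool { false }
}

// Hypothetical single-value in-memory dataset.
struct CellDataset<T: Clone> {
    slot: RefCell<Option<T>>,
}

#[derive(Debug, PartialEq)]
struct NotLoaded;

impl<T: Clone> Dataset for CellDataset<T> {
    type LoadItem = T;
    type SaveItem = T;
    type Error = NotLoaded;

    // Loading clones the stored value; fails if nothing was saved yet.
    fn load(&self) -> Result<T, NotLoaded> {
        self.slot.borrow().as_ref().cloned().ok_or(NotLoaded)
    }
    fn save(&self, output: T) -> Result<(), NotLoaded> {
        *self.slot.borrow_mut() = Some(output);
        Ok(())
    }
}

fn main() {
    let ds = CellDataset { slot: RefCell::new(None) };
    assert_eq!(ds.load(), Err(NotLoaded));
    ds.save(42).unwrap();
    assert_eq!(ds.load(), Ok(42));
    assert!(!ds.is_param()); // default: not a parameter dataset
}
```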
Datasets in the minimal example
The catalog uses three dataset types:
#[derive(Serialize, Deserialize)]
struct Catalog {
readings: PolarsCsvDataset,
summary: MemoryDataset<f64>,
report: JsonDataset,
}
PolarsCsvDataset
Reads and writes CSV files as Polars DataFrames. Requires the polars feature. Configured with a file path and optional CSV options like separator:
readings:
path: data/readings.csv
separator: ","
MemoryDataset<T>
Thread-safe in-memory storage for intermediate values. Starts empty — loading before any save returns DatasetNotLoaded. Requires the std feature. Uses Arc<Mutex<Option<T>>> internally, so it works safely with the ParallelRunner.
summary: {}
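The storage pattern described above can be sketched in plain std Rust. SharedSlot is an illustrative stand-in, not the library's actual type:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Illustrative stand-in for the Arc<Mutex<Option<T>>> pattern:
// an initially-empty, thread-safe slot.
#[derive(Clone)]
struct SharedSlot<T>(Arc<Mutex<Option<T>>>);

impl<T: Clone> SharedSlot<T> {
    fn new() -> Self {
        SharedSlot(Arc::new(Mutex::new(None)))
    }
    fn save(&self, value: T) {
        *self.0.lock().unwrap() = Some(value);
    }
    // Loading before any save yields None (the real dataset reports
    // a DatasetNotLoaded error instead).
    fn load(&self) -> Option<T> {
        self.0.lock().unwrap().clone()
    }
}

fn main() {
    let slot = SharedSlot::new();
    assert_eq!(slot.load(), None::<f64>);

    // Clones share the same storage, so a value saved on one thread
    // is visible through any other handle.
    let writer = slot.clone();
    thread::spawn(move || writer.save(42.0)).join().unwrap();
    assert_eq!(slot.load(), Some(42.0));
}
```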
JsonDataset
Reads and writes JSON files as serde_json::Value. Requires the json feature.
report:
path: data/report.json
Further reading
- Custom Datasets — how to implement your own dataset type
- List of Datasets — all built-in dataset types and their feature flags
- Error handling — how dataset errors are handled
- no_std Datasets — datasets available without the standard library
Catalog
The catalog is a plain Rust struct that groups all datasets used by a pipeline. It is not a special type — any struct that derives Serialize and Deserialize and contains dataset fields works as a catalog.
In the minimal example
#[derive(Serialize, Deserialize)]
struct Catalog {
readings: PolarsCsvDataset,
summary: MemoryDataset<f64>,
report: JsonDataset,
}
Each field is a dataset. The field names become the dataset names used in logging, visualization, and error messages — the framework discovers them automatically via serde serialization.
YAML configuration
The catalog struct is deserialized from a YAML file. Each field maps to a YAML key, and the dataset type determines what configuration is needed:
# catalog.yml
readings:
path: data/readings.csv
separator: ","
summary: {}
report:
path: data/report.json
- File-backed datasets (like PolarsCsvDataset, JsonDataset) need at least a path.
- In-memory datasets (like MemoryDataset) use an empty mapping {} — they have no persistent configuration.
- Parameters live in a separate params struct and file, not in the catalog.
Loading the catalog
When using App::from_yaml or App::from_args, the catalog is loaded and deserialized automatically:
pondrs::app::App::from_yaml(
dir.join("catalog.yml").to_str().unwrap(),
dir.join("params.yml").to_str().unwrap(),
)?
.with_args(std::env::args_os())?
.dispatch(pipeline)
You can also load it manually:
let contents = std::fs::read_to_string("catalog.yml")?;
let catalog: Catalog = serde_yaml::from_str(&contents)?;
For nested catalogs, naming conventions, and catalog overrides, see the Params & Catalog chapter.
Nodes
A Node is a single computation unit in the pipeline. It connects a function to its input and output datasets. For a deeper look at NodeInput/NodeOutput and other details, see Pipelines & Nodes — Nodes.
Definition
pub struct Node<F, Input: NodeInput, Output: NodeOutput>
where
F: StableFn<Input::Args>,
F::Output: CompatibleOutput<Output::Output>,
{
pub name: &'static str,
pub func: F,
pub input: Input,
pub output: Output,
}
- name — a human-readable label used in logging, hooks, and visualization.
- func — the function to execute. Can be a closure or a named function.
- input — a tuple of dataset references to load before calling func.
- output — a tuple of dataset references to save the return value to.
In the minimal example
The “summarize” node reads a CSV file, computes the mean, and stores it in memory:
Node {
name: "summarize",
func: |df: DataFrame| {
let mean = df.column("value").unwrap().f64().unwrap().mean().unwrap();
(mean,)
},
input: (&cat.readings,),
output: (&cat.summary,),
},
The “report” node reads the mean and a parameter, then writes a JSON report:
Node {
name: "report",
func: |mean: f64, threshold: f64| {
(json!({ "mean": mean, "passed": mean >= threshold }),)
},
input: (&cat.summary, &params.threshold),
output: (&cat.report,),
},
Input and output tuples
Inputs and outputs are tuples of dataset references. The function’s arguments must match the LoadItem types of the input datasets, and its return value must match the SaveItem types of the output datasets.
// Single input, single output
input: (&cat.readings,),
output: (&cat.summary,),
// Multiple inputs, single output
input: (&cat.summary, &params.threshold),
output: (&cat.report,),
// No inputs (side-effect node)
input: (),
output: (&cat.result,),
Tuples of up to 10 elements are supported for both inputs and outputs.
Return values
Node functions return tuples matching the output datasets. A single-output node returns a 1-tuple:
func: |x: i32| (x * 2,), // note the trailing comma
Multi-output nodes return larger tuples:
func: |x: i32| (x + 1, x * 2),
output: (&cat.incremented, &cat.doubled),
Fallible nodes
Node functions can return Result to signal errors:
Node {
name: "parse",
func: |text: String| -> Result<(i32,), PondError> {
let n = text.trim().parse::<i32>().map_err(|e| PondError::Custom(e.to_string()))?;
Ok((n,))
},
input: (&cat.raw_text,),
output: (&cat.parsed_value,),
}
The CompatibleOutput trait allows both bare tuples and Result<tuple, E> as return types. Type mismatches between the function return and the output datasets are caught at compile time.
See Node Errors for more details.
Type safety
The Node struct uses compile-time checks to ensure:
- The function’s argument types match the input datasets’ LoadItem types
- The function’s return type matches the output datasets’ SaveItem types
- Mismatches produce compile errors at node construction, not at runtime
Steps
Steps are how nodes are composed into a sequence that the runner can execute. A pipeline function returns an impl Steps<E>, which is implemented for tuples of nodes (and pipelines). Tuples of up to 10 elements are supported.
In the minimal example
The pipeline function returns a tuple of two nodes. This tuple automatically implements Steps<PondError>:
fn pipeline<'a>(cat: &'a Catalog, params: &'a Params) -> impl Steps<PondError> + 'a {
(
Node {
name: "summarize",
func: |df: DataFrame| {
let mean = df.column("value").unwrap().f64().unwrap().mean().unwrap();
(mean,)
},
input: (&cat.readings,),
output: (&cat.summary,),
},
Node {
name: "report",
func: |mean: f64, threshold: f64| {
(json!({ "mean": mean, "passed": mean >= threshold }),)
},
input: (&cat.summary, &params.threshold),
output: (&cat.report,),
},
)
}
The tuple ordering defines the sequential execution order. The SequentialRunner executes nodes in this order; the ParallelRunner uses dependency analysis to run independent nodes concurrently, but still respects data dependencies.
Composing steps
Steps are just tuples, so you compose them by adding elements:
fn pipeline<'a>(cat: &'a Catalog, params: &'a Params) -> impl Steps<PondError> + 'a {
(
Node { name: "step1", func: |x| (x,), input: (&params.x,), output: (&cat.a,) },
Node { name: "step2", func: |a| (a + 1,), input: (&cat.a,), output: (&cat.b,) },
Node { name: "step3", func: |b| (b * 2,), input: (&cat.b,), output: (&cat.c,) },
)
}
For grouping related nodes with declared contracts, see Pipeline.
Validation
StepInfo::check() validates the pipeline structure without executing it:
- No node reads a dataset before it is produced by an earlier node
- No dataset is produced by more than one node
- Parameters are never written to
let steps = pipeline(&catalog, &params);
steps.check()?; // returns Result<(), CheckError>
See Check for details.
The pipeline function
The function that creates steps must be a named function with an explicit lifetime, not a closure:
// Correct: named function with tied lifetime
fn pipeline<'a>(cat: &'a Catalog, params: &'a Params) -> impl Steps<PondError> + 'a {
(/* nodes */)
}
// Wrong: closures desugar into two independent lifetimes
let pipeline = |cat: &Catalog, params: &Params| { /* ... */ };
This is because the PipelineFn trait uses a lifetime-on-trait pattern that requires both references to share the same lifetime 'a. Named functions with explicit <'a> satisfy this; closures do not.
App
The App struct ties everything together — catalog, params, hooks, and runners — and provides CLI dispatch for running, validating, and visualizing your pipeline.
In the minimal example
pondrs::app::App::from_yaml(
dir.join("catalog.yml").to_str().unwrap(),
dir.join("params.yml").to_str().unwrap(),
)?
.with_args(std::env::args_os())?
.dispatch(pipeline)
App::from_yaml loads the catalog and params from YAML files, then with_args parses CLI arguments for subcommand selection and parameter overrides. Finally, dispatch runs the appropriate subcommand (run, check, or viz).
Subcommands
$ my_app run # execute the pipeline
$ my_app run --params threshold=0.8 # override params from CLI
$ my_app check # validate pipeline DAG
$ my_app viz # interactive pipeline visualization
For the full details on the App struct, initialization options, YAML configuration, and subcommands, see the App chapter.
Params & Catalog
This chapter takes a deeper look at the two data-holding structs that every pipeline uses: the params struct for read-only configuration and the catalog struct for datasets.
For the basics of how these are used in a pipeline, see A minimal pipeline. For YAML file loading and CLI overrides, see YAML Configuration.
- Parameters — Param<T>, nested parameter structs, and struct parameters
- Catalog — organizing datasets, nested catalogs, and naming conventions
Parameters
Parameters are read-only configuration values that feed into pipeline nodes. They are defined using Param<T>, a thin wrapper that implements the Dataset trait.
How Param works
#[derive(Debug, Serialize, Deserialize)]
pub struct Param<T: Clone>(pub T);
Param implements Dataset with:
- LoadItem = T — returns a clone of the inner value
- SaveItem = () — writing is forbidden; the pipeline validator rejects any node that writes to a Param
- Error = Infallible — loading always succeeds
Because is_param() returns true, the viz dashboard and pipeline check treat parameters differently from regular datasets.
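As a standalone sketch of this behavior (the real Param also derives Serialize/Deserialize, omitted here so the example compiles without serde):

```rust
use core::convert::Infallible;

// A Param-like wrapper sketched standalone.
#[derive(Clone, Debug)]
struct Param<T: Clone>(pub T);

impl<T: Clone> Param<T> {
    // Loading clones the inner value and can never fail.
    fn load(&self) -> Result<T, Infallible> {
        Ok(self.0.clone())
    }
    // Marks the dataset as a read-only parameter.
    fn is_param(&self) -> bool {
        true
    }
}

fn main() {
    let threshold = Param(0.5_f64);
    assert_eq!(threshold.load(), Ok(0.5));
    assert!(threshold.is_param());
}
```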
Nested parameters
Parameters can be organized into nested structs for clarity. Each struct level must derive Serialize and Deserialize:
#[derive(Serialize, Deserialize)]
struct ModelParams {
learning_rate: Param<f64>,
epochs: Param<u32>,
}
#[derive(Serialize, Deserialize)]
struct Params {
model: ModelParams,
verbose: Param<bool>,
}
# params.yml
model:
learning_rate: 0.01
epochs: 100
verbose: true
Nested parameters can be overridden from the CLI with dot notation:
$ my_app run --params model.learning_rate=0.001
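The shape of such an override string can be illustrated with a small parser. This is not the library's actual parsing code, just a sketch of the dotted-path-plus-value format:

```rust
// Hypothetical parser for a "--params" override string: a dotted key
// path and a value, separated by '='.
fn parse_override(s: &str) -> Option<(Vec<&str>, &str)> {
    let (path, value) = s.split_once('=')?;
    Some((path.split('.').collect(), value))
}

fn main() {
    let (path, value) = parse_override("model.learning_rate=0.001").unwrap();
    assert_eq!(path, vec!["model", "learning_rate"]);
    assert_eq!(value, "0.001");

    // A string without '=' is not a valid override.
    assert_eq!(parse_override("model.learning_rate"), None);
}
```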
Struct parameters
Param<T> works with any T: Clone + Serialize + Deserialize, including custom structs:
#[derive(Clone, Debug, Serialize, Deserialize)]
struct BaselinePeriod {
start_month: u32,
end_month: u32,
}
#[derive(Serialize, Deserialize)]
struct Params {
baseline: Param<BaselinePeriod>,
}
baseline:
start_month: 1
end_month: 12
When a node loads this parameter, it receives the full BaselinePeriod struct as its argument.
no_std
Param is available in no_std environments — it requires no feature flags and uses no allocation. Only T: Clone + Serialize is needed.
Catalog
The catalog is a plain Rust struct that groups all datasets used by a pipeline. Any struct that derives Serialize and Deserialize and contains dataset fields works as a catalog — there is no special trait to implement.
Dataset fields
Each field in the catalog is a dataset. The field names become the dataset names used in logging, visualization, and error messages. The framework discovers them automatically via serde serialization.
#[derive(Serialize, Deserialize)]
struct Catalog {
readings: PolarsCsvDataset,
summary: MemoryDataset<f64>,
report: JsonDataset,
}
Nested catalogs
Catalogs can nest other structs for organization. This is useful when a pipeline has many datasets that fall into logical groups:
#[derive(Serialize, Deserialize)]
struct InputData {
raw_readings: PolarsCsvDataset,
reference: YamlDataset,
}
#[derive(Serialize, Deserialize)]
struct Catalog {
input: InputData,
output: JsonDataset,
intermediate: MemoryDataset<f64>,
}
The discovered dataset names use dot-separated paths: input.raw_readings, input.reference, output, and intermediate. These names appear in logs, hooks, and the viz dashboard.
The corresponding YAML mirrors the nesting:
input:
raw_readings:
path: data/raw.csv
reference:
path: data/ref.yml
output:
path: data/output.json
intermediate: {}
Naming conventions
The framework uses serde struct names to distinguish leaf datasets from container structs:
- Types whose serde name ends with "Dataset" are treated as leaf datasets — the indexer stops recursing
- Param is treated as a leaf by name
- All other struct names are treated as containers — the indexer recurses into their fields
This means custom dataset types should follow the *Dataset naming convention (e.g. TextDataset, MyCustomDataset). Container structs like nested catalogs or parameter groups must not end with “Dataset”.
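The suffix rule can be sketched as a simple predicate on serde type names (illustrative only, not the indexer's actual code):

```rust
// Sketch of the leaf-vs-container decision described above.
fn is_leaf(serde_name: &str) -> bool {
    serde_name.ends_with("Dataset") || serde_name == "Param"
}

fn main() {
    assert!(is_leaf("PolarsCsvDataset"));
    assert!(is_leaf("MyCustomDataset"));
    assert!(is_leaf("Param"));
    // A container struct: the indexer would recurse into its fields.
    assert!(!is_leaf("InputData"));
}
```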
Catalog overrides
Dataset configuration can be overridden from the CLI using dot notation:
$ my_app run --catalog output.path=/tmp/result.json
$ my_app run --catalog input.raw_readings.separator=";"
See YAML Configuration for the full details on overrides.
Pipelines & Nodes
This chapter covers nodes and pipelines in more depth — the NodeInput/NodeOutput traits, the Pipeline struct for grouping nodes, and pipeline validation.
- Nodes — NodeInput, NodeOutput, CompatibleOutput, and side-effect nodes
- Pipeline — the Pipeline struct for grouping nodes with input/output contracts
- Dynamic Pipelines — runtime step composition with StepVec
- Split & Join — fan-out/fan-in patterns with TemplatedCatalog
- Check — runtime validation of pipeline structure
Nodes
This page covers the Node struct in more depth. For the basics, see A minimal pipeline — Nodes.
NodeInput and NodeOutput traits
These traits handle the mechanics of loading from and saving to dataset tuples:
pub trait NodeInput {
type Args;
fn load_data(&self, on_event: ...) -> Result<Self::Args, PondError>;
}
pub trait NodeOutput {
type Output;
fn save_data(&self, output: Self::Output, on_event: ...) -> Result<(), PondError>;
}
They are implemented for tuples of dataset references (up to 10 elements) via macros. During execution, load_data fires BeforeLoad/AfterLoad events for each dataset, and save_data fires BeforeSave/AfterSave events — these drive the hook system.
CompatibleOutput
The CompatibleOutput trait is what allows node functions to return either bare tuples or Result<tuple, E>:
// Bare tuple — infallible node
func: |x: i32| (x * 2,),
// Result — fallible node
func: |x: i32| -> Result<(i32,), MyError> { Ok((x * 2,)) },
The bound F::Output: CompatibleOutput<Output::Output> on the Node struct catches type mismatches at node construction time, before the pipeline error type E is known. This means you get a compile error immediately if the function’s return type doesn’t match the output datasets.
IntoNodeResult
When a node is called at runtime, IntoNodeResult normalizes the function’s return value into Result<O, E>:
- A bare tuple O becomes Ok(O)
- A Result<O, E> is passed through as-is
This is what allows runners to handle both fallible and infallible nodes uniformly.
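A simplified sketch of this normalization follows. The real trait is implemented via macros for all tuple shapes; here only one concrete tuple is covered to avoid overlapping impls:

```rust
// Normalizes a node function's return value into Result<O, E>.
trait IntoNodeResult<O, E> {
    fn into_node_result(self) -> Result<O, E>;
}

// A Result is passed through as-is.
impl<O, E> IntoNodeResult<O, E> for Result<O, E> {
    fn into_node_result(self) -> Result<O, E> {
        self
    }
}

// A bare tuple becomes Ok(tuple). Implemented for one concrete shape
// only; a blanket impl would overlap with the Result impl above.
impl<E> IntoNodeResult<(i32,), E> for (i32,) {
    fn into_node_result(self) -> Result<(i32,), E> {
        Ok(self)
    }
}

// A runner can now treat fallible and infallible nodes uniformly.
fn run_node<R: IntoNodeResult<(i32,), String>>(ret: R) -> Result<(i32,), String> {
    ret.into_node_result()
}

fn main() {
    assert_eq!(run_node((5,)), Ok((5,)));
    assert_eq!(
        run_node(Err::<(i32,), String>("boom".to_string())),
        Err("boom".to_string())
    );
}
```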
Side-effect nodes
Nodes with no outputs are useful for logging, sending notifications, or other side effects:
Node {
name: "log_summary",
func: |summary: f64| {
println!("Summary: {summary}");
},
input: (&cat.summary,),
output: (),
}
A node with output: () does not save any datasets. The function’s return value (unit ()) is discarded.
Similarly, a node with input: () takes no arguments and produces outputs from scratch.
Pipeline
The Pipeline struct groups related steps into a named container with declared input/output dataset contracts.
Definition
pub struct Pipeline<S: StepInfo, Input: NodeInput, Output: NodeOutput> {
pub name: &'static str,
pub steps: S,
pub input: Input,
pub output: Output,
}
- name — label for logging, hooks, and visualization
- steps — a tuple of nodes (and/or nested pipelines)
- input — datasets this pipeline expects to be available when it runs
- output — datasets this pipeline guarantees to produce
Pipelines are containers — runners never call them directly. Instead, they recurse into the pipeline’s steps. The PipelineInfo::is_leaf() method returns false for pipelines, signaling the runner to descend into children.
Example
fn pipeline<'a>(cat: &'a Catalog, params: &'a Params) -> impl Steps<PondError> + 'a {
(
Node { name: "load", func: |v| (v,), input: (&params.input,), output: (&cat.raw,) },
Pipeline {
name: "processing",
steps: (
Node { name: "clean", func: |v| (v,), input: (&cat.raw,), output: (&cat.clean,) },
Node { name: "transform", func: |v| (v * 2,), input: (&cat.clean,), output: (&cat.result,) },
),
input: (&cat.raw,),
output: (&cat.result,),
},
)
}
Input/output contracts
The input and output declarations are validated by check():
- Every declared input must be consumed by at least one child node
- Every declared output must be produced by at least one child node
If these contracts are violated, check() returns CheckError::UnusedPipelineInput or CheckError::UnproducedPipelineOutput.
Nesting
Pipelines can be nested arbitrarily. The validator and runners recurse through the tree:
Pipeline {
name: "outer",
steps: (
Pipeline {
name: "inner",
steps: (/* nodes */),
input: (/* ... */),
output: (/* ... */),
},
Node { /* ... */ },
),
input: (/* ... */),
output: (/* ... */),
}
Hooks and visualization
Pipeline boundaries fire their own hook events (before_pipeline_run, after_pipeline_run, on_pipeline_error) distinct from node events. In the visualization, pipelines appear as expandable containers that group their child nodes.
When to use Pipeline vs flat tuples
Use a flat tuple of nodes when the pipeline is simple and linear. Use Pipeline when you want to:
- Name a group of related steps for logging and visualization
- Declare input/output contracts that are validated by check()
- Organize large pipelines into logical sections
Dynamic Pipelines
Sometimes the set of steps in a pipeline isn’t known at compile time. A config flag might enable or disable a step, or the number of steps might depend on runtime data. StepVec provides a type-erased, heap-allocated step container for these cases.
StepVec
pub type StepVec<'a, E = PondError> = Vec<Box<dyn RunnableStep<E> + Send + Sync + 'a>>;
It implements StepInfo and Steps<E>, so it works everywhere tuples do — as the return type of a pipeline function, as the steps field of a Pipeline, and with check(), runners, and visualization.
Use RunnableStep::boxed() to convert a Node or Pipeline into a boxed trait object:
let step: Box<dyn RunnableStep<PondError> + Send + Sync + 'a> =
Node { name: "n", func: |v| (v,), input: (&a,), output: (&b,) }.boxed();
Conditional nodes
The primary use case is including or excluding nodes based on runtime configuration:
pub fn pipeline<'a>(cat: &'a Catalog, params: &'a Params) -> StepVec<'a> {
let mut steps = vec![
Node {
name: "summarize",
func: |df: DataFrame| {
let mean = df.column("value").unwrap().f64().unwrap().mean().unwrap();
(mean,)
},
input: (&cat.readings,),
output: (&cat.summary,),
}
.boxed(),
];
if params.include_report.0 {
steps.push(
Node {
name: "report",
func: |mean: f64, threshold: f64| {
(json!({ "mean": mean, "passed": mean >= threshold }),)
},
input: (&cat.summary, &params.threshold),
output: (&cat.report,),
}
.boxed(),
);
}
steps
}
The pipeline function returns StepVec<'a> instead of impl Steps<PondError> + 'a. Each node is .boxed() before being added to the vec, and conditional nodes are pushed with if.
Nesting inside static pipelines
StepVec can be used as the steps of a Pipeline, which can itself be placed in a static tuple:
fn pipeline<'a>(cat: &'a Catalog, params: &'a Params) -> impl Steps<PondError> + 'a {
let dynamic_section = Pipeline {
name: "optional_reports",
steps: {
let mut s: StepVec<'a> = vec![
Node { name: "base_report", ... }.boxed(),
];
if params.detailed.0 {
s.push(Node { name: "detailed_report", ... }.boxed());
}
s
},
input: (&cat.summary,),
output: (&cat.report,),
};
(
Node { name: "summarize", ... },
dynamic_section,
)
}
This lets you keep type safety for the fixed parts of your pipeline and only pay for dynamic dispatch where you need it.
Validation
check() works identically for StepVec and tuple-based pipelines — it iterates the items and validates sequential ordering, duplicate outputs, and pipeline contracts. Since StepVec is built at runtime, validation applies to the constructed pipeline only, not hypothetical alternatives. An excluded conditional node won’t be checked.
If a StepVec is wrapped in a Pipeline with declared outputs, those outputs must be produced by the nodes that are actually present. An empty StepVec in a Pipeline that declares outputs will correctly fail with UnproducedPipelineOutput.
When to use StepVec vs tuples
Use tuples (the default) when all nodes are known at compile time. You get zero-cost dispatch and full type checking.
Use StepVec when you need:
- Conditional inclusion/exclusion of nodes based on config or params
- A variable number of nodes determined at runtime
- Nodes of heterogeneous types that can’t be expressed in a single tuple
Split & Join
The Dynamic Pipelines chapter showed how StepVec lets you include or exclude nodes based on params — a boolean flag controls whether a report node runs. The pipeline shape changes, but the datasets are still fixed at compile time.
Split & Join solve a different problem: the catalog determines the pipeline shape. When you have multiple items of the same kind — stores, regions, sensors — you want to run the same processing for each one, with each item getting its own datasets. The number of items comes from configuration, not code.
The pattern
A typical fan-out/fan-in pipeline looks like this:
[combined data]
│
┌───┴───┐ Split
▼ ▼
[item A] [item B] Per-item processing
│ │
└───┬───┘ Join
▼
[collected results]
- Split takes a HashMap<String, T> from a single dataset and distributes each value to a per-item dataset
- Per-item nodes process each item independently (and can run in parallel)
- Join collects a value from each per-item dataset back into a HashMap<String, T>
TemplatedCatalog
The per-item datasets live in a TemplatedCatalog<S> — a collection of identically-shaped catalog structs, one per item:
#[derive(Debug, Serialize, Deserialize)]
struct StoreCatalog {
inventory: PolarsCsvDataset,
total_value: MemoryDataset<f64>,
}
#[derive(Serialize, Deserialize)]
struct Catalog {
// ...
stores: TemplatedCatalog<StoreCatalog>,
// ...
}
In YAML, a TemplatedCatalog is defined with a template and a list of names. String values containing {placeholder} are expanded per entry:
stores:
placeholder: "store"
template:
inventory:
path: "data/{store}_inventory.csv"
total_value: {}
names: [north, south, east]
This produces three StoreCatalog instances — north, south, east — each with its own file path. The placeholder field is optional and defaults to "name".
TemplatedCatalog serializes as a map, so the catalog indexer produces meaningful names like stores.north.inventory.
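The placeholder expansion can be illustrated with a plain string replacement (a sketch, not the library's actual expansion code):

```rust
// Expands "{placeholder}" occurrences in a template string.
fn expand(template: &str, placeholder: &str, name: &str) -> String {
    template.replace(&format!("{{{placeholder}}}"), name)
}

fn main() {
    // One expansion per entry in the `names` list.
    let paths: Vec<String> = ["north", "south", "east"]
        .iter()
        .map(|name| expand("data/{store}_inventory.csv", "store", name))
        .collect();
    assert_eq!(paths[0], "data/north_inventory.csv");
    assert_eq!(paths[2], "data/east_inventory.csv");
}
```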
Split
Split is a leaf node (like Ident) that loads a HashMap<String, T> from an input dataset and saves each value to the corresponding entry in a TemplatedCatalog. A field accessor selects which dataset within each entry to write to:
Split {
name: "split_stores",
input: &cat.grouped, // MemoryDataset<HashMap<String, DataFrame>>
catalog: &cat.stores, // TemplatedCatalog<StoreCatalog>
field: |s: &StoreCatalog| &s.inventory, // which dataset to write to
}
At runtime, Split validates that the HashMap keys exactly match the catalog entry names. A mismatch produces a PondError::KeyMismatch error.
For check(), Split reports the single input dataset and all per-entry field datasets as outputs — so downstream nodes that read from those datasets are correctly validated.
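The key validation Split performs can be sketched as a set comparison (the error details here are illustrative):

```rust
use std::collections::{HashMap, HashSet};

// Sketch of the runtime validation: the HashMap's keys must exactly
// match the TemplatedCatalog's entry names.
fn validate_keys<T>(map: &HashMap<String, T>, names: &[&str]) -> Result<(), String> {
    let have: HashSet<&str> = map.keys().map(|k| k.as_str()).collect();
    let want: HashSet<&str> = names.iter().copied().collect();
    if have == want {
        Ok(())
    } else {
        Err(format!("key mismatch: have {have:?}, want {want:?}"))
    }
}

fn main() {
    let mut grouped = HashMap::new();
    grouped.insert("north".to_string(), 1);
    grouped.insert("south".to_string(), 2);

    assert!(validate_keys(&grouped, &["north", "south"]).is_ok());
    // A missing or extra entry is rejected.
    assert!(validate_keys(&grouped, &["north", "south", "east"]).is_err());
}
```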
Join
Join is the inverse: it loads a value from each entry’s dataset and collects them into a HashMap<String, T>:
Join {
name: "join_values",
catalog: &cat.stores, // TemplatedCatalog<StoreCatalog>
field: |s: &StoreCatalog| &s.total_value, // which dataset to read from
output: &cat.store_values, // MemoryDataset<HashMap<String, f64>>
}
For check(), Join reports all per-entry field datasets as inputs and the single output dataset as output.
Building per-item nodes
Between Split and Join, you need processing nodes for each item. Since the number of items is determined by YAML config, you build these dynamically with StepVec:
// ---------------------------------------------------------------------------
// Pipeline function
// ---------------------------------------------------------------------------
pub fn pipeline<'a>(cat: &'a Catalog, params: &'a Params) -> StepVec<'a> {
// Step 1: group the combined CSV into a HashMap by store.
let mut steps: StepVec<'a> = vec![
Node {
name: "group_by_store",
func: group_by_store,
input: (&cat.all_inventory,),
output: (&cat.grouped,),
}
.boxed(),
];
// Step 2: Split distributes per-store DataFrames to individual CSV files.
steps.push(
Split {
name: "split_stores",
input: &cat.grouped,
catalog: &cat.stores,
field: |s: &StoreCatalog| &s.inventory,
}
.boxed(),
);
// Step 3: per-store processing — dynamically build a node for each store.
for (_, store) in cat.stores.iter() {
steps.push(
Node {
name: "compute_store_value",
func: compute_store_value,
input: (&store.inventory, &params.low_stock_threshold),
output: (&store.total_value,),
}
.boxed(),
);
}
// Step 4: Join collects per-store totals back into a HashMap.
steps.push(
Join {
name: "join_values",
catalog: &cat.stores,
field: |s: &StoreCatalog| &s.total_value,
output: &cat.store_values,
}
.boxed(),
);
// Step 5: build a comparison report from the joined values.
steps.push(
Node {
name: "build_report",
func: build_report,
input: (&cat.store_values,),
output: (&cat.report,),
}
.boxed(),
);
steps
}
Each call to cat.stores.iter() yields (&str, &StoreCatalog) pairs in name-insertion order. The per-store nodes reference datasets owned by each StoreCatalog entry, so they are naturally wired into the correct fan-out/fan-in structure.
Comparison with PartitionedDataset
PartitionedDataset handles a similar concept — a directory of files keyed by name — but at the dataset level. A single node reads or writes all partitions at once as a HashMap. Split & Join operate at the pipeline level: they let you run separate nodes for each item, with each item having its own arbitrarily complex set of datasets.
Use PartitionedDataset when a single node can handle all items. Use Split & Join when each item needs its own processing sub-pipeline.
Nested templates
TemplatedCatalog supports nesting. An outer template can contain an inner TemplatedCatalog with a different placeholder:
regions:
placeholder: "region"
template:
metrics:
placeholder: "metric"
template:
raw:
path: "data/{region}/{metric}/raw.csv"
names: [temperature, humidity]
names: [north, south]
This produces paths like data/north/temperature/raw.csv. The outer placeholder is substituted first, so inner templates see the expanded value.
Check
Pipeline validation catches structural errors before execution. The check() method on StepInfo walks the pipeline and verifies several invariants.
Usage
let steps = pipeline(&catalog, &params);
steps.check()?; // Result<(), CheckError>
Or via the CLI:
$ my_app check
Pipeline is valid.
What check validates
Sequential ordering
A node must not read a dataset that is only produced by a later node. Datasets that no node produces are treated as external inputs (valid).
// Valid: n1 produces a, n2 reads a
(
Node { name: "n1", input: (&param,), output: (&a,), .. },
Node { name: "n2", input: (&a,), output: (&b,), .. },
)
// Invalid: n1 reads b, but b is produced by n2
(
Node { name: "n1", input: (&b,), output: (&a,), .. },
Node { name: "n2", input: (¶m,), output: (&b,), .. },
)
// → CheckError::InputNotProduced { node_name: "n1", .. }
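The ordering rule can be modeled as a standalone check over string dataset IDs. This is an illustrative sketch, not the pondrs implementation — `check_ordering` and its `(name, inputs, outputs)` tuples are assumptions for the example:

```rust
// Illustrative sketch (not the pondrs implementation): the sequential-ordering
// rule, modeling each node as (name, inputs, outputs) over string dataset IDs.
// Datasets that no node produces are treated as external inputs (always valid).
fn check_ordering(nodes: &[(&str, Vec<&str>, Vec<&str>)]) -> Result<(), String> {
    // Everything produced by *some* node in the pipeline.
    let all_outputs: Vec<&str> = nodes.iter().flat_map(|(_, _, o)| o.iter().copied()).collect();
    let mut produced: Vec<&str> = Vec::new(); // outputs of nodes seen so far
    for (name, inputs, outputs) in nodes {
        for input in inputs {
            // Reading a dataset that only a later node produces is an error.
            if all_outputs.contains(input) && !produced.contains(input) {
                return Err(format!("node {name} reads {input} before it is produced"));
            }
        }
        produced.extend(outputs.iter().copied());
    }
    Ok(())
}

fn main() {
    // Valid: n1 produces a, n2 reads a.
    let ok = [("n1", vec!["param"], vec!["a"]), ("n2", vec!["a"], vec!["b"])];
    assert!(check_ordering(&ok).is_ok());
    // Invalid: n1 reads b, but b is produced by the later node n2.
    let bad = [("n1", vec!["b"], vec!["a"]), ("n2", vec!["param"], vec!["b"])];
    assert!(check_ordering(&bad).is_err());
}
```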
No duplicate outputs
A dataset must not be produced by more than one node:
(
Node { name: "n1", input: (¶m,), output: (&a,), .. },
Node { name: "n2", input: (¶m,), output: (&a,), .. }, // same output!
)
// → CheckError::DuplicateOutput { node_name: "n2", .. }
Params are read-only
No node may write to a Param dataset:
(
Node { name: "n1", func: || ((),), input: (), output: (¶m,) },
)
// → CheckError::ParamWritten { node_name: "n1", .. }
Pipeline contracts
For Pipeline structs, the declared inputs and outputs must match what the children actually consume and produce. See Pipeline.
CheckError variants
| Variant | Meaning |
|---|---|
| InputNotProduced | A node reads a dataset that is only produced by a later node |
| DuplicateOutput | Two nodes produce the same dataset |
| ParamWritten | A node writes to a Param |
| UnusedPipelineInput | A Pipeline declares an input its children don’t consume |
| UnproducedPipelineOutput | A Pipeline declares an output its children don’t produce |
| CapacityExceeded | Internal dataset buffer overflow (see below) |
Capacity
check() uses a fixed-capacity buffer (default 20 datasets) for no_std compatibility. If your pipeline has more than 20 unique datasets, use check_with_capacity:
steps.check_with_capacity::<64>()?;
no_std compatibility
check() works in no_std environments. It uses no allocation — all dataset tracking is done in fixed-size arrays on the stack.
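Fixed-capacity, allocation-free tracking can be sketched with a const-generic array. This is an illustrative sketch of the pattern, not pondrs's internals — `FixedSet` and its error string are assumptions for the example:

```rust
// Illustrative sketch (not pondrs source): tracking dataset IDs in a
// fixed-size, stack-allocated array, as a no_std-friendly check might.
// Exceeding the capacity N corresponds to a CapacityExceeded-style error.
struct FixedSet<const N: usize> {
    items: [usize; N],
    len: usize,
}

impl<const N: usize> FixedSet<N> {
    const fn new() -> Self {
        Self { items: [0; N], len: 0 }
    }

    fn insert(&mut self, id: usize) -> Result<(), &'static str> {
        if self.items[..self.len].contains(&id) {
            return Ok(()); // already tracked, no extra slot needed
        }
        if self.len == N {
            return Err("capacity exceeded"); // analogue of CheckError::CapacityExceeded
        }
        self.items[self.len] = id;
        self.len += 1;
        Ok(())
    }
}

fn main() {
    let mut set = FixedSet::<2>::new();
    assert!(set.insert(10).is_ok());
    assert!(set.insert(10).is_ok()); // duplicate: no extra slot used
    assert!(set.insert(20).is_ok());
    assert!(set.insert(30).is_err()); // third unique ID overflows N = 2
}
```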
Error handling
pondrs uses Rust’s standard Result type for error propagation. The framework provides PondError for infrastructure errors, while letting you define your own error type for domain-specific failures.
- Error Type — PondError, custom error types, and the From<PondError> requirement
- Node Errors — how fallible node functions propagate errors
- Dataset Errors — the Dataset::Error associated type and adding custom error variants
Error Type
PondError
PondError is the framework-level error type. It covers infrastructure failures like I/O errors, serialization errors, and dataset-not-loaded conditions:
pub enum PondError {
#[cfg(feature = "std")] Io(std::io::Error),
#[cfg(feature = "polars")] Polars(polars::error::PolarsError),
#[cfg(feature = "yaml")] YamlScan(yaml_rust2::ScanError),
#[cfg(feature = "yaml")] YamlEmit(yaml_rust2::EmitError),
#[cfg(feature = "std")] SerdeYaml(serde_yaml::Error),
#[cfg(any(feature = "json", feature = "plotly", feature = "viz"))]
Json(serde_json::Error),
#[cfg(feature = "image")] Image(image::ImageError),
DatasetNotLoaded, // always available (no_std)
RunnerNotFound,
CheckFailed,
#[cfg(feature = "std")] LockPoisoned(String),
#[cfg(feature = "std")] Custom(String),
}
Variants are feature-gated — only DatasetNotLoaded, RunnerNotFound, and CheckFailed are available in no_std builds.
Using PondError directly
For simple pipelines, you can use PondError as your pipeline error type:
fn pipeline<'a>(cat: &'a Catalog, params: &'a Params) -> impl Steps<PondError> + 'a {
(/* nodes */)
}
This works because PondError trivially satisfies From<PondError>.
Custom error types
When you need domain-specific error variants, define your own error enum with a From<PondError> conversion:
#[derive(Debug, thiserror::Error)]
enum MyError {
#[error(transparent)]
Pond(#[from] PondError),
#[error("validation failed: {0}")]
Validation(String),
#[error("threshold exceeded: {value} > {max}")]
ThresholdExceeded { value: f64, max: f64 },
}
The #[from] attribute on the PondError variant provides the required From<PondError> implementation. Your pipeline function then uses MyError as its error type:
fn pipeline<'a>(cat: &'a Catalog, params: &'a Params) -> impl Steps<MyError> + 'a {
(/* nodes that can return MyError */)
}
The From<PondError> requirement
The pipeline error type E must satisfy E: From<PondError>. This is how the framework converts dataset I/O errors and infrastructure failures into your pipeline’s error type. Without this conversion, dataset load() and save() calls couldn’t propagate errors through your nodes.
Adding variants for custom datasets
If you implement a custom dataset whose Error type is not already covered by PondError, you have two choices:
- Use PondError::Custom (simplest): convert your error to a string.
  impl Dataset for MyDataset {
      type Error = PondError;
      fn load(&self) -> Result<Self::LoadItem, PondError> {
          do_something().map_err(|e| PondError::Custom(e.to_string()))
      }
  }
- Add a variant to your pipeline error type: keep the original error type in your custom dataset and convert it in your pipeline error enum.
  impl Dataset for MyDataset {
      type Error = MyDatasetError;
      // ...
  }
  #[derive(Debug, thiserror::Error)]
  enum MyError {
      #[error(transparent)]
      Pond(#[from] PondError),
      #[error(transparent)]
      MyDataset(#[from] MyDatasetError),
  }
  This requires PondError: From<MyDatasetError>, or using your custom pipeline error type E where E: From<PondError> + From<MyDatasetError>. See Dataset Errors for the full pattern.
Node Errors
Node functions can be either infallible (returning a bare tuple) or fallible (returning Result).
Infallible nodes
The simplest nodes return a bare tuple. They cannot fail:
Node {
name: "double",
func: |x: i32| (x * 2,),
input: (¶ms.x,),
output: (&cat.doubled,),
}
Fallible nodes
Nodes that can fail return Result<(outputs...), E> where E is the pipeline error type:
Node {
name: "parse",
func: |text: String| -> Result<(i32,), PondError> {
let n = text.trim().parse::<i32>()
.map_err(|e| PondError::Custom(e.to_string()))?;
Ok((n,))
},
input: (&cat.raw_text,),
output: (&cat.parsed,),
}
How it works: IntoNodeResult
The IntoNodeResult trait normalizes both bare tuples and Result returns into Result<O, E>:
pub trait IntoNodeResult<O, E> {
fn into_node_result(self) -> Result<O, E>;
}
- For bare tuples O: wraps in Ok(value)
- For Result<O, E>: passes through unchanged
This means the same Node struct works for both infallible and fallible functions.
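The normalization can be sketched standalone for single-element tuples (the real trait presumably covers tuples of every arity). The `run_func` caller below is an assumption for the example, standing in for what a Node does internally:

```rust
// Illustrative sketch: normalizing both bare tuples and Results into
// Result<O, E>, for one-element tuples only.
trait IntoNodeResult<O, E> {
    fn into_node_result(self) -> Result<O, E>;
}

// A bare tuple is wrapped in Ok(...).
impl<A, E> IntoNodeResult<(A,), E> for (A,) {
    fn into_node_result(self) -> Result<(A,), E> {
        Ok(self)
    }
}

// A Result passes through unchanged.
impl<A, E> IntoNodeResult<(A,), E> for Result<(A,), E> {
    fn into_node_result(self) -> Result<(A,), E> {
        self
    }
}

// A caller (as a node's call machinery might) accepts either return shape.
fn run_func<F, R>(f: F) -> Result<(i32,), String>
where
    F: FnOnce() -> R,
    R: IntoNodeResult<(i32,), String>,
{
    f().into_node_result()
}

fn main() {
    assert_eq!(run_func(|| (2,)), Ok((2,))); // infallible function
    assert!(run_func(|| Err::<(i32,), _>("boom".to_string())).is_err()); // fallible
}
```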
CompatibleOutput
The CompatibleOutput trait ensures at compile time that the function’s return type matches the output datasets. It accepts both:
- O (bare tuple) — direct match
- Result<O, E> — the Ok type must match
If the types don’t match, you get a compile error when constructing the Node, not at runtime.
Error propagation
When a fallible node returns Err(e):
- The runner catches the error
- The on_node_error hook fires with the error message
- If the node is inside a Pipeline, on_pipeline_error fires for each ancestor
- The runner stops executing and returns the error
Using custom error types
With a custom error type, nodes can return domain-specific errors:
#[derive(Debug, thiserror::Error)]
enum MyError {
#[error(transparent)]
Pond(#[from] PondError),
#[error("value {0} exceeds limit")]
TooLarge(f64),
}
Node {
name: "validate",
func: |value: f64| -> Result<(f64,), MyError> {
if value > 100.0 {
return Err(MyError::TooLarge(value));
}
Ok((value,))
},
input: (&cat.raw,),
output: (&cat.validated,),
}
Dataset Errors
Each dataset declares its own Error associated type. The framework needs to convert these errors into the pipeline’s error type E via PondError.
The conversion chain
When a node loads or saves a dataset, errors flow through this chain:
Dataset::Error → PondError → E (pipeline error type)
The first conversion (Dataset::Error → PondError) is required by the NodeInput and NodeOutput trait bounds: PondError: From<D::Error>. The second conversion (PondError → E) is required by the runner: E: From<PondError>.
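The two-hop chain can be sketched with stand-in types. Everything below is illustrative — the local `PondError`, `DatasetError`, and `AppError` are minimal stand-ins for the real types, not pondrs definitions:

```rust
// Illustrative sketch of the conversion chain, with stand-in types:
// Dataset::Error -> PondError -> E (the pipeline error type).
#[derive(Debug)]
struct DatasetError(String); // stands in for a Dataset::Error

#[derive(Debug)]
enum PondError { // stands in for the framework error
    Wrapped(String),
}

#[derive(Debug)]
enum AppError { // stands in for the pipeline error type E
    Pond(PondError),
}

impl From<DatasetError> for PondError {
    fn from(e: DatasetError) -> Self {
        PondError::Wrapped(e.0)
    }
}

impl From<PondError> for AppError {
    fn from(e: PondError) -> Self {
        AppError::Pond(e)
    }
}

// load() fails with the dataset's own error; the runner surfaces E.
fn load() -> Result<i32, DatasetError> {
    Err(DatasetError("missing file".into()))
}

fn run() -> Result<i32, AppError> {
    // First hop: DatasetError -> PondError; second hop via `?`: PondError -> AppError.
    let v: Result<i32, PondError> = load().map_err(PondError::from);
    Ok(v?)
}

fn main() {
    assert!(matches!(run(), Err(AppError::Pond(PondError::Wrapped(_)))));
}
```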
Built-in dataset errors
All built-in datasets use error types that already have From implementations into PondError:
| Dataset | Error type | PondError variant |
|---|---|---|
| Param<T> | Infallible | (never fails) |
| CellDataset<T> | PondError | direct |
| MemoryDataset<T> | PondError | direct |
| PolarsCsvDataset | PondError | Polars, Io |
| JsonDataset | PondError | Json, Io |
| TextDataset | PondError | Io |
| YamlDataset | PondError | YamlScan, Io |
| CacheDataset<D> | PondError | wraps inner |
Custom datasets with PondError
The simplest approach for custom datasets is to use PondError directly as the error type:
impl Dataset for MyDataset {
type LoadItem = MyData;
type SaveItem = MyData;
type Error = PondError;
fn load(&self) -> Result<MyData, PondError> {
let bytes = std::fs::read(&self.path)?; // Io variant via From
parse_my_format(&bytes)
.map_err(|e| PondError::Custom(e.to_string()))
}
fn save(&self, data: MyData) -> Result<(), PondError> {
let bytes = serialize_my_format(&data)
.map_err(|e| PondError::Custom(e.to_string()))?;
std::fs::write(&self.path, bytes)?;
Ok(())
}
}
Custom datasets with their own error type
If you want to preserve error type information, your dataset can use its own error type. You then need PondError: From<YourError>:
#[derive(Debug, thiserror::Error)]
pub enum MyDatasetError {
#[error("parse error at line {line}: {msg}")]
Parse { line: usize, msg: String },
#[error("io error: {0}")]
Io(#[from] std::io::Error),
}
impl Dataset for MyDataset {
type LoadItem = MyData;
type SaveItem = MyData;
type Error = MyDatasetError;
// ...
}
Since PondError doesn’t have a variant for every possible custom error, you typically handle this through your pipeline error type:
#[derive(Debug, thiserror::Error)]
enum PipelineError {
#[error(transparent)]
Pond(#[from] PondError),
#[error(transparent)]
MyDataset(#[from] MyDatasetError),
}
However, the NodeInput/NodeOutput bounds require PondError: From<MyDatasetError>. If this conversion doesn’t exist, the dataset won’t compile as a node input/output. The easiest solution is to use PondError as your dataset’s error type and use PondError::Custom for the domain-specific cases, or to implement the From conversion yourself.
Infallible datasets
Datasets that never fail (like Param) use core::convert::Infallible:
impl Dataset for Param<T> {
type Error = Infallible;
fn load(&self) -> Result<T, Infallible> { Ok(self.0.clone()) }
}
PondError implements From<Infallible> (via the match x {} pattern), so this works with any pipeline error type.
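The empty-match conversion works because Infallible has no values, so an arm-less match type-checks for any result type. A standalone sketch, with a hypothetical `MyError` standing in for the pipeline error type:

```rust
use core::convert::Infallible;

// Illustrative sketch: implementing From<Infallible> with an empty match.
#[derive(Debug, PartialEq)]
enum MyError {
    NotLoaded,
}

impl From<Infallible> for MyError {
    fn from(x: Infallible) -> Self {
        match x {} // Infallible has no variants, so this match is exhaustive
    }
}

// A load() returning Result<T, Infallible> can then use `?` inside a
// function that returns Result<T, MyError>.
fn load() -> Result<i32, Infallible> {
    Ok(42)
}

fn use_dataset() -> Result<i32, MyError> {
    let v = load()?; // Infallible converts via the From impl above
    Ok(v)
}

fn main() {
    assert_eq!(use_dataset(), Ok(42));
}
```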
Hooks
Hooks let you observe pipeline execution events without modifying the pipeline itself. They are used for logging, timing, visualization, and custom instrumentation.
The Hook trait
pub trait Hook: Sync {
// Pipeline lifecycle
fn before_pipeline_run(&self, p: &dyn PipelineInfo) {}
fn after_pipeline_run(&self, p: &dyn PipelineInfo) {}
fn on_pipeline_error(&self, p: &dyn PipelineInfo, error: &str) {}
// Node lifecycle
fn before_node_run(&self, n: &dyn PipelineInfo) {}
fn after_node_run(&self, n: &dyn PipelineInfo) {}
fn on_node_error(&self, n: &dyn PipelineInfo, error: &str) {}
// Dataset lifecycle (fired per-dataset during Node::call)
fn before_dataset_loaded(&self, n: &dyn PipelineInfo, ds: &DatasetRef) {}
fn after_dataset_loaded(&self, n: &dyn PipelineInfo, ds: &DatasetRef) {}
fn before_dataset_saved(&self, n: &dyn PipelineInfo, ds: &DatasetRef) {}
fn after_dataset_saved(&self, n: &dyn PipelineInfo, ds: &DatasetRef) {}
}
All methods have default no-op implementations, so you only override the ones you care about.
The Hooks trait
Multiple hooks are composed as a tuple:
pub trait Hooks: Sync {
fn for_each_hook(&self, f: &mut dyn FnMut(&dyn Hook));
}
Implemented for tuples of up to 10 hooks, plus () (no hooks):
// No hooks
App::new(catalog, params).execute(pipeline)
// One hook
App::new(catalog, params)
.with_hooks((LoggingHook::new(),))
.execute(pipeline)
// Multiple hooks
App::new(catalog, params)
.with_hooks((LoggingHook::new(), my_custom_hook))
.execute(pipeline)
Hook arguments
All hook methods receive &dyn PipelineInfo, which provides:
- name() — the node or pipeline name (&'static str)
- is_leaf() — true for nodes, false for pipelines
- type_string() — the Rust type name of the underlying function
- for_each_input() / for_each_output() — iterate over dataset references
Dataset hook methods additionally receive &DatasetRef, which provides:
- id — a unique identifier (pointer-based)
- name — an Option<&str> with the resolved dataset name (from the catalog indexer, available when using std)
- meta — &dyn DatasetMeta with is_param(), type_string(), and (with std) html() and yaml()
Sync requirement
Hooks must be Sync because the ParallelRunner calls hook methods from multiple threads. For hooks that need interior mutability (e.g. to track timing), use thread-safe types like Mutex or DashMap.
This chapter covers:
- Dataset Hooks — hooks for dataset load/save events
- Node Hooks — hooks for node execution events
- Pipeline Hooks — hooks for pipeline lifecycle events
- Built-in Hooks — LoggingHook and VizHook
Dataset Hooks
Dataset hooks fire during the load and save operations inside Node::call(). Each hook receives the owning node and the dataset reference.
Methods
fn before_dataset_loaded(&self, n: &dyn PipelineInfo, ds: &DatasetRef) {}
fn after_dataset_loaded(&self, n: &dyn PipelineInfo, ds: &DatasetRef) {}
fn before_dataset_saved(&self, n: &dyn PipelineInfo, ds: &DatasetRef) {}
fn after_dataset_saved(&self, n: &dyn PipelineInfo, ds: &DatasetRef) {}
Arguments
- n — the node that is loading/saving the dataset. Use n.name() to get the node name.
- ds — the dataset reference:
  - ds.id — unique pointer-based identifier
  - ds.name — resolved name from the catalog (e.g. Some("readings")), or None in no_std
  - ds.meta.is_param() — whether this is a parameter dataset
  - ds.meta.type_string() — the Rust type name (e.g. "pondrs::datasets::memory::MemoryDataset<f64>")
  - ds.meta.html() — (std only) returns an optional HTML snippet for the dataset’s current contents. Datasets like PlotlyDataset override this to produce rich visualizations; file-backed datasets render their contents as formatted text. Used by the viz dashboard.
  - ds.meta.yaml() — (std only) returns the dataset’s configuration serialized as YAML. This is produced automatically via the Serialize supertrait and is used by the viz dashboard to display dataset settings.
Firing order
For a node with two inputs and one output, hooks fire in this order:
- before_dataset_loaded(input 0)
- after_dataset_loaded(input 0)
- before_dataset_loaded(input 1)
- after_dataset_loaded(input 1)
- node function executes
- before_dataset_saved(output 0)
- after_dataset_saved(output 0)
Dataset hooks fire inside the before_node_run / after_node_run window. The sequence is always: before_node_run → dataset loads → function call → dataset saves → after_node_run.
Example: tracking I/O time
use std::sync::Mutex;
use std::collections::HashMap;
use std::time::Instant;
struct IoTimingHook {
starts: Mutex<HashMap<usize, Instant>>,
}
impl Hook for IoTimingHook {
fn before_dataset_loaded(&self, _n: &dyn PipelineInfo, ds: &DatasetRef) {
self.starts.lock().unwrap().insert(ds.id, Instant::now());
}
fn after_dataset_loaded(&self, _n: &dyn PipelineInfo, ds: &DatasetRef) {
if let Some(start) = self.starts.lock().unwrap().remove(&ds.id) {
let name = ds.name.unwrap_or("<unknown>");
println!(" loaded {} in {:.1}ms", name, start.elapsed().as_secs_f64() * 1000.0);
}
}
}
Name resolution
In std builds, the runner resolves dataset names from the catalog indexer before dispatching to hooks. This is why ds.name is Option<&str> — it’s Some("readings") when the catalog indexer can map the pointer to a field name, and None in no_std builds where the indexer isn’t available.
Node Hooks
Node hooks fire when a runner starts and finishes executing a node, or when a node encounters an error.
Methods
fn before_node_run(&self, n: &dyn PipelineInfo) {}
fn after_node_run(&self, n: &dyn PipelineInfo) {}
fn on_node_error(&self, n: &dyn PipelineInfo, error: &str) {}
Arguments
- n — the node being executed. Use n.name() for the node name, n.type_string() for the function’s type name.
- error (on on_node_error) — the stringified error message. In std builds this is e.to_string(); in no_std it’s the fixed string "node error".
Lifecycle
For a successful node execution:
before_node_run(n)
before_dataset_loaded(n, ds0)
after_dataset_loaded(n, ds0)
... (function executes) ...
before_dataset_saved(n, ds_out)
after_dataset_saved(n, ds_out)
after_node_run(n)
For a failed node:
before_node_run(n)
... (error occurs during load, function, or save) ...
on_node_error(n, "error message")
after_node_run and on_node_error are mutually exclusive — exactly one fires per node execution.
Example: counting nodes
use std::sync::atomic::{AtomicUsize, Ordering};
struct NodeCounter {
count: AtomicUsize,
}
impl Hook for NodeCounter {
fn after_node_run(&self, n: &dyn PipelineInfo) {
let i = self.count.fetch_add(1, Ordering::Relaxed) + 1;
println!("Completed node {} ({} completed so far)", n.name(), i);
}
fn on_node_error(&self, n: &dyn PipelineInfo, error: &str) {
eprintln!("Node {} failed: {}", n.name(), error);
}
}
Parallel runner behavior
With the ParallelRunner, node hooks may fire from different threads concurrently. This is why Hook: Sync is required. Use atomic types or Mutex for any shared state in your hook.
Pipeline Hooks
Pipeline hooks fire at the boundaries of Pipeline structs (not flat tuples). They are useful for tracking the lifecycle of logical groups of nodes.
Methods
fn before_pipeline_run(&self, p: &dyn PipelineInfo) {}
fn after_pipeline_run(&self, p: &dyn PipelineInfo) {}
fn on_pipeline_error(&self, p: &dyn PipelineInfo, error: &str) {}
Arguments
- p — the pipeline being executed. p.name() returns the pipeline’s name; p.is_leaf() returns false.
- error — the stringified error message from the failing node within the pipeline.
Sequential runner behavior
The SequentialRunner fires pipeline hooks as it enters and exits Pipeline structs:
before_pipeline_run("processing")
before_node_run("clean")
after_node_run("clean")
before_node_run("transform")
after_node_run("transform")
after_pipeline_run("processing")
If a child node fails, on_pipeline_error fires instead of after_pipeline_run.
Parallel runner behavior
The ParallelRunner fires pipeline hooks based on dataset availability:
- before_pipeline_run fires when all of the pipeline’s declared inputs are available
- after_pipeline_run fires when all of the pipeline’s declared outputs have been produced
- on_pipeline_error fires when a child node fails — it propagates up through all ancestor pipelines
This means pipeline hooks may fire at different times than in sequential execution, since nodes may run in a different order.
Flat tuples vs Pipeline structs
Pipeline hooks only fire for Pipeline structs. A flat tuple of nodes at the top level does not trigger before_pipeline_run / after_pipeline_run. If you want pipeline-level hooks for your entire pipeline, wrap the top-level steps in a Pipeline.
Example: timing pipelines
struct PipelineTimer {
timings: Mutex<HashMap<&'static str, Instant>>,
}
impl Hook for PipelineTimer {
fn before_pipeline_run(&self, p: &dyn PipelineInfo) {
self.timings.lock().unwrap().insert(p.name(), Instant::now());
}
fn after_pipeline_run(&self, p: &dyn PipelineInfo) {
if let Some(start) = self.timings.lock().unwrap().remove(p.name()) {
println!("[{}] completed in {:.1}ms", p.name(), start.elapsed().as_secs_f64() * 1000.0);
}
}
}
Built-in Hooks
pondrs provides two built-in hook implementations.
LoggingHook
Requires the std feature.
Logs pipeline and node lifecycle events using the log crate, with automatic timing:
use pondrs::hooks::LoggingHook;
App::new(catalog, params)
.with_hooks((LoggingHook::new(),))
.execute(pipeline)?;
Output (with env_logger at info level):
[INFO] [pipeline] processing - starting
[INFO] [node] clean - starting
[INFO] [node] clean - completed (12.3ms)
[INFO] [node] transform - starting
[INFO] [node] transform - completed (5.1ms)
[INFO] [pipeline] processing - completed (17.8ms)
At debug level, dataset load/save events are also logged:
[DEBUG] loading readings
[DEBUG] loaded readings (8.2ms)
[DEBUG] saving summary
[DEBUG] saved summary (0.1ms)
LoggingHook uses a TimingTracker internally to measure durations between before/after pairs.
VizHook
Requires the viz feature.
Posts live execution events to a running viz server via HTTP. This enables the interactive visualization to show real-time node status during app run:
use pondrs::viz::VizHook;
App::new(catalog, params)
.with_hooks((LoggingHook::new(), VizHook::new("http://localhost:8080".into())))
.execute(pipeline)?;
The typical workflow is:
- Start the viz server: my_app viz --port 8080
- In another terminal, run the pipeline with VizHook attached
VizHook is fire-and-forget — it silently ignores HTTP errors, so a missing viz server won’t crash your pipeline. It tracks:
- Node start/end/error events
- Dataset load/save durations
Each event is posted as a VizEvent to POST /api/status, which the viz server broadcasts to connected WebSocket clients.
Combining hooks
Hooks compose as tuples:
.with_hooks((
LoggingHook::new(),
VizHook::new("http://localhost:8080".into()),
my_custom_hook,
))
Each hook receives every event independently. Order in the tuple determines call order, but hooks should not depend on ordering.
Runners
Runners execute pipeline steps. pondrs provides two built-in runners and lets you select between them at runtime.
The Runner trait
pub trait Runner {
fn name(&self) -> &'static str;
fn run<E>(
&self,
pipe: &impl Steps<E>,
catalog: &impl Serialize,
params: &impl Serialize,
hooks: &impl Hooks,
) -> Result<(), E>
where
E: From<PondError> + Send + Sync + Display + Debug + 'static;
}
- name() — identifies the runner for CLI selection (--runner sequential)
- run() — executes the pipeline, calling hooks at each lifecycle point
The Runners trait
Multiple runners compose as tuples, enabling runtime selection:
pub trait Runners {
fn first_name(&self) -> &'static str;
fn run_by_name<E>(&self, name: &str, ...) -> Option<Result<(), E>>;
fn for_each_name(&self, f: &mut dyn FnMut(&str));
}
The default runners depend on the feature set:
- std — (SequentialRunner, ParallelRunner) — sequential is the default
- no_std — (SequentialRunner,) — only sequential is available
Selecting a runner
Via CLI:
$ my_app run # uses default (sequential)
$ my_app run --runner parallel # uses parallel runner
$ my_app run --runner sequential # explicit sequential
Via code:
App::new(catalog, params)
.with_runners((SequentialRunner, ParallelRunner))
.execute(pipeline)?;
Custom runners
You can implement Runner for your own types. Add them to the runners tuple:
App::new(catalog, params)
.with_runners((SequentialRunner, ParallelRunner, MyDistributedRunner))
.execute(pipeline)?;
$ my_app run --runner my_distributed
This chapter covers:
- Runner Trait — implementing a custom runner
- Sequential Runner — runs nodes in definition order
- Parallel Runner — runs independent nodes concurrently
Runner Trait
The Runner trait defines how a pipeline is executed. Implementing it lets you create custom execution strategies.
Definition
pub trait Runner {
fn name(&self) -> &'static str;
fn run<E>(
&self,
pipe: &impl Steps<E>,
catalog: &impl Serialize,
params: &impl Serialize,
hooks: &impl Hooks,
) -> Result<(), E>
where
E: From<PondError> + Send + Sync + Display + Debug + 'static;
}
Implementing a runner
A runner walks the pipeline steps and executes each node. The key types to work with are:
- Steps<E> — iterate steps via for_each_item()
- RunnableStep<E> — each step, which is either a leaf node or a pipeline container
  - is_leaf() — true for nodes, false for pipelines
  - call(on_event) — execute a leaf node, passing a callback for dataset events
  - for_each_child_step() — iterate children of a pipeline container
- Hooks — fire lifecycle events via for_each_hook()
See the SequentialRunner and ParallelRunner source code for concrete implementation examples.
Dataset event dispatch
When calling item.call(on_event), the on_event callback receives (&DatasetRef, DatasetEvent) for each dataset load/save. The built-in runners use the dispatch_dataset_event helper to resolve dataset names from the catalog index and forward to hooks. A custom runner can do the same or handle events differently.
Catalog indexing
The catalog and params arguments to run() are provided so the runner can build a dataset name index. The built-in runners use index_catalog_with_params(catalog, params) (available in std) to create a HashMap<usize, String> mapping pointer IDs to field names. This is optional — a no_std runner can ignore these arguments.
Sequential Runner
The SequentialRunner executes pipeline steps one at a time in definition order.
Overview
#[derive(Default)]
pub struct SequentialRunner;
impl Runner for SequentialRunner {
fn name(&self) -> &'static str { "sequential" }
// ...
}
- Available in both std and no_std environments
- Executes nodes in the exact order they appear in the steps tuple
- Recursively enters Pipeline containers, executing children in order
- Default runner when no --runner flag is specified
Behavior
Given this pipeline:
(
Node { name: "a", .. },
Pipeline {
name: "inner",
steps: (
Node { name: "b", .. },
Node { name: "c", .. },
),
..
},
Node { name: "d", .. },
)
Execution order is: a → b → c → d.
Hook events fire in this order:
before_node_run("a")
after_node_run("a")
before_pipeline_run("inner")
before_node_run("b")
after_node_run("b")
before_node_run("c")
after_node_run("c")
after_pipeline_run("inner")
before_node_run("d")
after_node_run("d")
Error handling
If any node fails, execution stops immediately. No subsequent nodes are executed. The error propagates up through any enclosing Pipeline containers, firing on_pipeline_error at each level.
no_std differences
In no_std builds:
- Dataset names are not resolved (no catalog indexer), so ds.name in hook callbacks is always None
- Error messages in on_node_error are the fixed string "node error" instead of the full error message
When to use
Use the sequential runner when:
- Your nodes have strict ordering requirements
- You’re in a no_std environment
- You want deterministic, predictable execution
- You’re debugging — sequential execution makes it easier to trace issues
Parallel Runner
The ParallelRunner executes independent pipeline nodes concurrently using scoped threads.
Requires the std feature.
Overview
#[derive(Default)]
pub struct ParallelRunner;
impl Runner for ParallelRunner {
fn name(&self) -> &'static str { "parallel" }
// ...
}
How it works
- Builds a dependency graph from the pipeline using build_pipeline_graph()
- Identifies source datasets — params and external inputs that are available immediately
- Schedules nodes as soon as all their input datasets have been produced
- Tracks produced datasets — when a node completes, its output datasets become available, potentially unblocking other nodes
- Uses std::thread::scope for safe scoped threads — no 'static bounds needed
Usage
$ my_app run --runner parallel
Or programmatically:
use pondrs::runners::ParallelRunner;
App::new(catalog, params)
.with_runners((SequentialRunner, ParallelRunner))
.execute(pipeline)?;
Dependency analysis
The parallel runner determines which nodes can run concurrently by analyzing dataset dependencies:
param
/ \
[a] [b] ← a and b can run in parallel (both read param)
\ /
[c] ← c waits for both a and b to complete
Only data dependencies matter — the tuple ordering in your pipeline function is irrelevant to the parallel runner.
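The "schedule when all inputs are available" rule can be sketched as a standalone readiness computation. This is illustrative, not the pondrs scheduler — `ready_nodes` and the string-based graph are assumptions for the example:

```rust
// Illustrative sketch (not the pondrs scheduler): computing which nodes are
// runnable given the datasets produced so far. Nodes are (name, inputs, outputs)
// over string dataset IDs.
use std::collections::HashSet;

fn ready_nodes<'a>(
    nodes: &'a [(&'a str, Vec<&'a str>, Vec<&'a str>)],
    available: &HashSet<&str>,
    done: &HashSet<&str>,
) -> Vec<&'a str> {
    let mut ready = Vec::new();
    for (name, inputs, _outputs) in nodes {
        // A node is schedulable once every input dataset exists and it
        // hasn't already run.
        if !done.contains(name) && inputs.iter().all(|i| available.contains(i)) {
            ready.push(*name);
        }
    }
    ready
}

fn main() {
    let nodes = [
        ("a", vec!["param"], vec!["x"]),
        ("b", vec!["param"], vec!["y"]),
        ("c", vec!["x", "y"], vec!["z"]),
    ];
    // Initially only `param` is available: a and b can run in parallel.
    let mut available: HashSet<&str> = HashSet::from(["param"]);
    let mut done: HashSet<&str> = HashSet::new();
    assert_eq!(ready_nodes(&nodes, &available, &done), vec!["a", "b"]);

    // Once a and b complete, their outputs unblock c.
    available.extend(["x", "y"]);
    done.extend(["a", "b"]);
    assert_eq!(ready_nodes(&nodes, &available, &done), vec!["c"]);
}
```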
Error handling
On the first node failure:
- on_node_error fires for the failed node
- on_pipeline_error fires for all ancestor pipelines
- No new nodes are scheduled
- Already-running nodes are allowed to complete (drain)
- The first error is returned
Pipeline hooks
Pipeline hooks behave differently than in the sequential runner:
- before_pipeline_run fires when all of the pipeline’s declared inputs are available
- after_pipeline_run fires when all of the pipeline’s declared outputs have been produced
This means pipeline hooks may fire at different points in wall-clock time compared to sequential execution.
Thread safety requirements
Because nodes run on different threads:
- Hooks must be Sync (use Mutex or atomics for mutable state)
- Datasets used concurrently must support concurrent access. MemoryDataset uses Arc<Mutex<_>> and is safe; CellDataset is not safe for parallel use.
When to use
Use the parallel runner when:
- Your pipeline has independent branches that can run concurrently
- Nodes involve I/O (file reads, network calls) where parallelism helps
- You’re in a std environment with thread support
Datasets
This chapter covers all built-in dataset types and how to implement your own.
The Dataset trait is introduced in A minimal pipeline — Datasets. Here we go deeper into each concrete implementation.
- Custom Datasets — implementing the Dataset trait for your own types
- Param — read-only parameter values
- Memory Dataset — thread-safe in-memory storage
- Cell Dataset — stack-friendly no_std storage
- Partitioned Dataset — directory of files loaded as a map
- Cache Dataset — caching wrapper for any dataset
- List of Datasets — quick reference for all built-in types
Custom Datasets
You can implement the Dataset trait for any type to integrate custom data sources into your pipeline.
The Dataset trait
pub trait Dataset: serde::Serialize {
type LoadItem;
type SaveItem;
type Error;
fn load(&self) -> Result<Self::LoadItem, Self::Error>;
fn save(&self, output: Self::SaveItem) -> Result<(), Self::Error>;
fn is_param(&self) -> bool { false }
#[cfg(feature = "std")]
fn html(&self) -> Option<String> { None }
}
Example: a plain text dataset
#[derive(Serialize, Deserialize, Clone)]
pub struct TextDataset {
path: String,
}
impl Dataset for TextDataset {
type LoadItem = String;
type SaveItem = String;
type Error = PondError;
fn load(&self) -> Result<String, PondError> {
Ok(std::fs::read_to_string(&self.path)?)
}
fn save(&self, text: String) -> Result<(), PondError> {
std::fs::write(&self.path, text)?;
Ok(())
}
}
Checklist
- Derive Serialize (required by the supertrait) — and usually Deserialize too, so the catalog can be loaded from YAML.
- Name the type with a Dataset suffix — the catalog indexer uses serde struct names ending in "Dataset" to identify leaf datasets. Without this suffix, nested struct fields may not be discovered correctly.
- Choose your error type — use PondError for simplicity, or a custom error type if you want to preserve error detail (see Dataset Errors).
- Implement html() (optional, std only) — return an HTML snippet for the viz dashboard. This is shown in the dataset detail panel.
The FileDataset trait
If your dataset is backed by a file, implement FileDataset to enable use with PartitionedDataset:
pub trait FileDataset: Dataset + Clone {
fn path(&self) -> &str;
fn set_path(&mut self, path: &str);
}
This lets PartitionedDataset clone your dataset template and point each partition at a different file.
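The clone-and-retarget pattern can be sketched with a minimal stand-in for the trait. Everything here is illustrative — `CsvTemplate` and the `partitions` helper are assumptions for the example, not pondrs types:

```rust
// Illustrative sketch of the clone-and-retarget pattern, with a minimal
// stand-in for the FileDataset trait.
trait FileDataset: Clone {
    fn path(&self) -> &str;
    fn set_path(&mut self, path: &str);
}

#[derive(Clone, Debug, PartialEq)]
struct CsvTemplate {
    path: String,
}

impl FileDataset for CsvTemplate {
    fn path(&self) -> &str {
        &self.path
    }
    fn set_path(&mut self, path: &str) {
        self.path = path.to_string();
    }
}

// What a partitioned wrapper might do: one clone per partition file.
fn partitions<D: FileDataset>(template: &D, files: &[&str]) -> Vec<D> {
    files
        .iter()
        .map(|f| {
            let mut ds = template.clone();
            ds.set_path(f);
            ds
        })
        .collect()
}

fn main() {
    let template = CsvTemplate { path: String::new() };
    let parts = partitions(&template, &["data/a.csv", "data/b.csv"]);
    assert_eq!(parts[0].path(), "data/a.csv");
    assert_eq!(parts[1].path(), "data/b.csv");
}
```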
Param
Param<T> is a read-only dataset wrapper for configuration values. It is the primary way to pass parameters into pipeline nodes.
Definition
#[derive(Debug, Serialize, Deserialize)]
pub struct Param<T: Clone>(pub T);
impl<T: Clone + Serialize> Dataset for Param<T> {
type LoadItem = T;
type SaveItem = ();
type Error = Infallible;
fn load(&self) -> Result<T, Infallible> { Ok(self.0.clone()) }
fn save(&self, _: ()) -> Result<(), Infallible> { unreachable!() }
fn is_param(&self) -> bool { true }
}
Key properties:
- Loading always succeeds — Error = Infallible
- Writing is forbidden — save() is unreachable; the pipeline validator (check()) rejects any node that writes to a Param
- is_param() returns true — used by the validator and visualization to distinguish params from data
Usage
#[derive(Serialize, Deserialize)]
struct Params {
threshold: Param<f64>,
max_retries: Param<u32>,
}
Node {
name: "filter",
func: |value: f64, threshold: f64| {
(value >= threshold,)
},
input: (&cat.value, ¶ms.threshold),
output: (&cat.passed,),
}
YAML
threshold: 0.5
max_retries: 3
Param<T> deserializes directly from the YAML value — no wrapping object needed.
Visualization
In the viz dashboard, parameters appear as distinct node shapes, separate from datasets. They are also shown in the left panel’s “Parameters” section.
no_std
Param is available in no_std — it requires no feature flags and uses no allocation.
Memory Dataset
MemoryDataset<T> is a thread-safe in-memory dataset for intermediate pipeline values.
Requires the std feature.
Definition
#[derive(Debug, Serialize, Deserialize)]
pub struct MemoryDataset<T: Clone> {
#[serde(skip)]
value: Arc<Mutex<Option<T>>>,
}
- Starts empty (None)
- Loading before any save returns PondError::DatasetNotLoaded
- Thread-safe via Arc<Mutex<_>> — works with both SequentialRunner and ParallelRunner
Usage
#[derive(Serialize, Deserialize)]
struct Catalog {
intermediate: MemoryDataset<f64>,
}
intermediate: {}
MemoryDataset has no persistent configuration — it always starts empty. In YAML, use an empty mapping {}.
When to use
Use MemoryDataset for intermediate values that are computed by one node and consumed by another, without needing to persist to disk:
(
Node {
name: "compute",
func: |input: DataFrame| {
let mean = input.column("value").unwrap().mean().unwrap();
(mean,)
},
input: (&cat.raw_data,),
output: (&cat.mean_value,), // MemoryDataset<f64>
},
Node {
name: "use_result",
func: |mean: f64| (format!("Mean: {mean}"),),
input: (&cat.mean_value,),
output: (&cat.report,),
},
)
Parallel safety
MemoryDataset is safe to use with the ParallelRunner: the Mutex ensures that concurrent reads and writes are properly synchronized. In addition, the parallel runner’s dependency analysis guarantees that a node never tries to read a MemoryDataset until the node that writes it has completed.
no_std alternative
In no_std environments, use CellDataset instead. It uses Cell instead of Arc<Mutex<_>>, but is limited to Copy types and single-threaded use.
Cell Dataset
CellDataset<T> is a stack-friendly dataset using Cell for no_std / single-threaded pipelines.
Definition
pub struct CellDataset<T: Copy> {
value: Cell<Option<T>>,
}
- Works only with Copy types (e.g. i32, f64, bool, u8)
- Uses Cell for interior mutability — no heap allocation, no locking
- Starts empty; loading before any save returns PondError::DatasetNotLoaded
- const fn new() — can be used in static or const contexts
Usage
let a = CellDataset::<i32>::new();
let b = CellDataset::<i32>::new();
let pipe = (
Node { name: "n1", func: |v| (v * 2,), input: (¶ms.x,), output: (&a,) },
Node { name: "n2", func: |v| (v + 1,), input: (&a,), output: (&b,) },
);
Thread safety
CellDataset implements Sync via an unsafe impl because the RunnableStep trait requires Send + Sync. This is safe only for single-threaded runners like SequentialRunner.
Do not use CellDataset with ParallelRunner. Use MemoryDataset instead for parallel pipelines.
no_std
CellDataset is the primary intermediate dataset for no_std environments. It requires no feature flags, no allocator, and no standard library. Combined with Param and the SequentialRunner, it forms the foundation of a no_std pipeline.
Limitations
- Only works with Copy types — cannot hold String, Vec, DataFrame, etc.
- Not safe for concurrent access — single-threaded use only
- No serialization of stored values — the Serialize impl serializes as unit ()
Partitioned Dataset
PartitionedDataset and LazyPartitionedDataset represent a directory of files where each file is treated as a separate partition.
Requires the polars feature.
PartitionedDataset
Eagerly loads all files in a directory into a HashMap<String, D::LoadItem>:
pub struct PartitionedDataset<D: FileDataset> {
pub path: String,
pub ext: String,
pub dataset: D,
}
- path — the directory to read from / write to
- ext — file extension to filter by (e.g. "csv", "parquet")
- dataset — a template dataset that is cloned and pointed at each file
Loading
Returns HashMap<String, D::LoadItem> where keys are filename stems:
data/partitions/
january.csv
february.csv
march.csv
// loads as HashMap { "january" => DataFrame, "february" => DataFrame, "march" => DataFrame }
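The stem-as-key naming convention can be mimicked with std::path alone — this is a sketch of the convention, not the library's actual loader (partition_key is a hypothetical helper):

```rust
use std::collections::HashMap;
use std::path::Path;

// Sketch: the partition key is the file name without its extension (the stem).
fn partition_key(file_name: &str) -> Option<String> {
    Path::new(file_name)
        .file_stem()
        .map(|s| s.to_string_lossy().into_owned())
}

fn main() {
    let files = ["january.csv", "february.csv", "march.csv"];
    // Map each stem to its source file, as an eager partitioned load would.
    let map: HashMap<String, &str> = files
        .iter()
        .filter_map(|f| partition_key(f).map(|k| (k, *f)))
        .collect();
    assert_eq!(map["january"], "january.csv");
    assert_eq!(map.len(), 3);
}
```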
Saving
Accepts HashMap<String, D::SaveItem> and writes each entry as {name}.{ext}:
Node {
name: "split_by_month",
func: |df: DataFrame| -> (HashMap<String, DataFrame>,) {
// split DataFrame into partitions...
},
input: (&cat.all_data,),
output: (&cat.monthly,), // PartitionedDataset<PolarsCsvDataset>
}
LazyPartitionedDataset
Same as PartitionedDataset but returns HashMap<String, Lazy<D::LoadItem>> — each partition is loaded on demand:
Node {
name: "process",
func: |partitions: HashMap<String, Lazy<DataFrame>>| {
// only load the partitions you need
let jan = partitions["january"].load().unwrap();
// ...
},
input: (&cat.monthly,),
output: (&cat.result,),
}
Lazy<T> wraps a closure that calls dataset.load() when .load() is called on it.
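A minimal sketch of that idea (a hypothetical Lazy standing in for pondrs's type) shows why nothing is read until .load() is called:

```rust
use std::cell::Cell;
use std::rc::Rc;

// Hypothetical stand-in for pondrs's Lazy<T>: store a load closure and
// defer the underlying read until .load() is invoked.
struct Lazy<T> {
    loader: Box<dyn Fn() -> Result<T, String>>,
}

impl<T> Lazy<T> {
    fn new(loader: impl Fn() -> Result<T, String> + 'static) -> Self {
        Lazy { loader: Box::new(loader) }
    }
    fn load(&self) -> Result<T, String> {
        (self.loader)() // the expensive read happens here, on demand
    }
}

fn main() {
    let loads = Rc::new(Cell::new(0u32));
    let counter = loads.clone();
    let lazy = Lazy::new(move || {
        counter.set(counter.get() + 1); // simulate hitting the file
        Ok(42i32)
    });
    assert_eq!(loads.get(), 0); // constructing the Lazy reads nothing
    assert_eq!(lazy.load().unwrap(), 42);
    assert_eq!(loads.get(), 1); // one read, triggered by load()
}
```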
YAML configuration
monthly:
path: data/partitions
ext: csv
dataset:
separator: ","
has_header: true
The dataset field configures the template dataset that is cloned for each partition file.
FileDataset requirement
The inner dataset type must implement FileDataset:
pub trait FileDataset: Dataset + Clone {
fn path(&self) -> &str;
fn set_path(&mut self, path: &str);
}
Built-in types that implement FileDataset: PolarsCsvDataset, PolarsParquetDataset, TextDataset, JsonDataset.
Cache Dataset
CacheDataset<D> wraps any dataset and caches the loaded/saved value in memory. Subsequent loads return the cached value without hitting the underlying dataset.
Requires the std feature.
Definition
pub struct CacheDataset<D: Dataset> {
pub dataset: D,
cache: Arc<Mutex<Option<D::LoadItem>>>,
}
Usage
Wrap any dataset to add caching:
#[derive(Serialize, Deserialize)]
struct Catalog {
readings: CacheDataset<PolarsCsvDataset>,
}
readings:
dataset:
path: data/readings.csv
separator: ","
Behavior
- First load() — delegates to the inner dataset, caches the result, returns it
- Subsequent load() calls — return the cached value without re-reading the file
- save() — writes to the inner dataset and updates the cache
- html() — delegates to the inner dataset
When to use
Use CacheDataset when a dataset is read by multiple nodes and the underlying I/O is expensive:
(
Node { name: "analyze", input: (&cat.readings,), .. },
Node { name: "validate", input: (&cat.readings,), .. },
Node { name: "summarize", input: (&cat.readings,), .. },
)
Without caching, readings would be loaded from disk three times. With CacheDataset<PolarsCsvDataset>, it’s loaded once and served from memory for the remaining reads.
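The load-once behavior can be sketched with a plain Arc<Mutex<Option<T>>> wrapper — CachedLoader below is illustrative, not the pondrs type:

```rust
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex};

// Sketch: a thread-safe load-once wrapper around an expensive loader.
struct CachedLoader<T: Clone> {
    load_inner: Box<dyn Fn() -> T + Send>,
    cache: Arc<Mutex<Option<T>>>,
}

impl<T: Clone> CachedLoader<T> {
    fn new(load_inner: impl Fn() -> T + Send + 'static) -> Self {
        CachedLoader {
            load_inner: Box::new(load_inner),
            cache: Arc::new(Mutex::new(None)),
        }
    }
    fn load(&self) -> T {
        let mut guard = self.cache.lock().unwrap();
        if let Some(v) = guard.as_ref() {
            return v.clone(); // served from memory, inner loader not called
        }
        let v = (self.load_inner)();
        *guard = Some(v.clone());
        v
    }
}

fn main() {
    static CALLS: AtomicU32 = AtomicU32::new(0);
    let ds = CachedLoader::new(|| {
        CALLS.fetch_add(1, Ordering::SeqCst); // simulate expensive I/O
        String::from("rows")
    });
    ds.load();
    ds.load();
    ds.load();
    assert_eq!(CALLS.load(Ordering::SeqCst), 1); // inner I/O happened once
}
```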
Constraints
The inner dataset must satisfy:
- D::LoadItem: Clone — so the cached value can be cloned on each load
- D::SaveItem: Clone + Into<D::LoadItem> — so saves can update the cache
- PondError: From<D::Error> — for error conversion
List of Datasets
All built-in dataset types, their feature flags, and typical use cases.
Core datasets (no feature flags)
| Type | Load/Save types | Description |
|---|---|---|
Param<T> | T / () | Read-only parameter. Error: Infallible. |
CellDataset<T> | T / T | no_std intermediate storage. T: Copy only. |
std datasets
| Type | Feature | Load/Save types | Description |
|---|---|---|---|
MemoryDataset<T> | std | T / T | Thread-safe in-memory storage via Arc<Mutex<_>>. |
TextDataset | std | String / String | Reads/writes plain text files. |
CacheDataset<D> | std | D::LoadItem / D::SaveItem | Caching wrapper for any dataset. |
File format datasets
| Type | Feature | Load/Save types | Description |
|---|---|---|---|
PolarsCsvDataset | polars | DataFrame / DataFrame | CSV files via Polars. Configurable separator, header, etc. |
PolarsParquetDataset | polars | DataFrame / DataFrame | Parquet files via Polars. |
PolarsExcelDataset | polars | DataFrame / — | Excel files via fastexcel. Read-only. |
JsonDataset | json | serde_json::Value / serde_json::Value | JSON files. |
YamlDataset | yaml | Vec<Yaml> / Vec<Yaml> | YAML files using yaml_rust2. |
PlotlyDataset | plotly | serde_json::Value / serde_json::Value | Plotly charts. Saves .json + .html. Custom html() for viz. |
ImageDataset | image | DynamicImage / DynamicImage | Image files via the image crate. |
Partitioned datasets
| Type | Feature | Load/Save types | Description |
|---|---|---|---|
PartitionedDataset<D> | polars | HashMap<String, D::LoadItem> / HashMap<String, D::SaveItem> | Directory of files, eagerly loaded. |
LazyPartitionedDataset<D> | polars | HashMap<String, Lazy<D::LoadItem>> / HashMap<String, D::SaveItem> | Directory of files, lazily loaded on demand. |
Hardware datasets (no_std)
| Type | Feature | Load/Save types | Description |
|---|---|---|---|
RegisterDataset<T> | — | T / T | Volatile memory-mapped register. T: Copy. |
GpioDataset | — | bool / bool | Single GPIO pin within a memory-mapped register. |
Common traits
All file-backed datasets that support PartitionedDataset implement FileDataset:
pub trait FileDataset: Dataset + Clone {
fn path(&self) -> &str;
fn set_path(&mut self, path: &str);
}
Implementors: PolarsCsvDataset, PolarsParquetDataset, PolarsExcelDataset, TextDataset, JsonDataset.
App
The App struct bundles catalog, params, hooks, and runners together and provides methods for pipeline execution and CLI dispatch.
pub struct App<C, P, H = (), R = DefaultRunners> {
catalog: C,
params: P,
hooks: H,
runners: R,
command: Command,
// ...
}
App is generic over catalog, params, hooks, and runners. Builder methods return a new App with different type parameters, so the type system tracks what has been configured.
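The builder-with-changing-type-parameters pattern can be sketched with a toy two-parameter App (hooks and runners only; the real struct carries more parameters, and these names are illustrative):

```rust
// Sketch: each with_* method consumes self and returns a value with
// different type parameters, so configuration is tracked in the type.
struct App<H, R> {
    hooks: H,
    runners: R,
}

impl App<(), ()> {
    fn new() -> Self {
        App { hooks: (), runners: () }
    }
}

impl<H, R> App<H, R> {
    fn with_hooks<H2>(self, hooks: H2) -> App<H2, R> {
        App { hooks, runners: self.runners }
    }
    fn with_runners<R2>(self, runners: R2) -> App<H, R2> {
        App { hooks: self.hooks, runners }
    }
}

fn main() {
    // Starts as App<(), ()>, ends as App<(&str,), (&str, &str)>.
    let app = App::new()
        .with_hooks(("logging",))
        .with_runners(("sequential", "parallel"));
    assert_eq!(app.hooks.0, "logging");
    assert_eq!(app.runners.1, "parallel");
}
```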
Subcommands
App::dispatch() selects behavior based on the stored Command:
| Command | CLI | Description |
|---|---|---|
Command::Run | my_app run | Execute the pipeline |
Command::Check | my_app check | Validate pipeline structure |
Command::Viz | my_app viz | Build graph and serve visualization |
Chapter overview
- Initialization — constructing an App (from_yaml, from_args, new)
- YAML Configuration — catalog and params files, dot-notation overrides
- Run — executing the pipeline
- Check — validating pipeline structure via CLI
- Viz — interactive pipeline visualization
Initialization
There are several ways to construct an App, depending on whether your catalog comes from YAML files or is built programmatically.
App::from_yaml + with_args
The most common pattern for file-based pipelines. Loads catalog and params from YAML, then applies CLI arguments:
App::from_yaml("conf/catalog.yml", "conf/params.yml")?
.with_args(std::env::args_os())?
.dispatch(pipeline)
from_yaml loads and deserializes both files. with_args parses CLI arguments for subcommand selection and parameter overrides.
Requires std. Catalog and params types must implement DeserializeOwned.
App::from_args
Parses CLI arguments including --catalog-path and --params-path flags:
App::from_args(std::env::args_os())?
.dispatch(pipeline)
Default paths if not specified: conf/base/catalog.yml and conf/base/parameters.yml.
Requires std.
App::from_cli
Same as from_args but takes a pre-parsed CliArgs struct:
use pondrs::app::cli::CliArgs;
use clap::Parser;
let cli = CliArgs::parse();
App::from_cli(cli)?
.dispatch(pipeline)
App::new
Construct with catalog and params directly — no YAML, no CLI:
let catalog = Catalog { /* ... */ };
let params = Params { /* ... */ };
App::new(catalog, params)
.execute(pipeline)?;
This is the only constructor available in no_std. It defaults to Command::Run with no hooks and the default runners.
Use App::new when:
- The catalog is built programmatically (e.g. RegisterDataset, GpioDataset)
- You’re writing tests and want to inspect datasets after execution
- You’re in a no_std environment
You can still add CLI support to a programmatic catalog via with_args:
App::new(catalog, params)
.with_args(std::env::args_os())? // adds subcommand + param overrides
.dispatch(pipeline)?;
Builder methods
After construction, configure the app with builder methods:
App::from_yaml("catalog.yml", "params.yml")?
.with_args(std::env::args_os())?
.with_hooks((LoggingHook::new(),))
.with_runners((SequentialRunner, ParallelRunner))
.dispatch(pipeline)?;
| Method | Description |
|---|---|
with_hooks(h) | Set the hooks tuple |
with_runners(r) | Set the runners tuple |
with_command(cmd) | Set the command directly |
with_args(iter) | Parse CLI args, apply command + param overrides |
with_cli(cli) | Apply pre-parsed CliArgs |
Execution methods
| Method | Description |
|---|---|
execute(f) | Run the pipeline directly (always Command::Run behavior) |
dispatch(f) | Choose behavior based on stored Command (run/check/viz) |
catalog() | Borrow the catalog (useful in tests) |
params() | Borrow the params |
YAML Configuration
pondrs uses YAML files for catalog and parameter configuration. The App framework loads, patches, and deserializes these files automatically.
Catalog YAML
The catalog file maps dataset field names to their configuration:
# catalog.yml
readings:
path: data/readings.csv
separator: ","
summary: {}
report:
path: data/report.json
Each top-level key corresponds to a field in your catalog struct. The configuration under each key is deserialized into the dataset type for that field.
- File-backed datasets need at least a path
- In-memory datasets use {}
- Nested catalog structs create nested YAML sections
Nested catalogs
#[derive(Serialize, Deserialize)]
struct InputData {
raw: PolarsCsvDataset,
reference: YamlDataset,
}
#[derive(Serialize, Deserialize)]
struct Catalog {
input: InputData,
output: JsonDataset,
}
input:
raw:
path: data/raw.csv
separator: ","
reference:
path: data/ref.yml
output:
path: data/output.json
Parameters YAML
# params.yml
threshold: 0.5
model:
learning_rate: 0.01
epochs: 100
Each key maps to a Param<T> field. Nested structs create nested YAML sections.
CLI overrides
Both catalog and parameter values can be overridden from the command line using dot notation:
# Override parameters
$ my_app run --params threshold=0.8
$ my_app run --params model.learning_rate=0.001
# Override catalog configuration
$ my_app run --catalog output.path=/tmp/result.json
$ my_app run --catalog 'readings.separator=;'   # quote values the shell would interpret
# Multiple overrides
$ my_app run --params threshold=0.8 --params model.epochs=200
How overrides work
- The YAML file is loaded into a serde_yaml::Value tree
- Each KEY=VALUE override is parsed and applied to the tree using dot notation
- Values are parsed as YAML scalars (auto-detecting numbers, bools, strings, null)
- The patched tree is deserialized into the target struct
Overrides create missing intermediate keys if needed — you can override deeply nested values even if the parent keys don’t exist in the file.
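The patching step can be sketched on a toy value tree standing in for serde_yaml::Value (set_path is an illustrative helper, not the library's function) — note how the override path creates the intermediate map when it is missing:

```rust
use std::collections::BTreeMap;

// Sketch of a nested config tree (stand-in for serde_yaml::Value).
#[derive(Debug, PartialEq)]
enum Value {
    Str(String),
    Map(BTreeMap<String, Value>),
}

// Apply a dot-notation override, creating intermediate maps as needed.
fn set_path(map: &mut BTreeMap<String, Value>, path: &str, value: &str) {
    match path.split_once('.') {
        None => {
            map.insert(path.to_string(), Value::Str(value.to_string()));
        }
        Some((head, rest)) => {
            let child = map
                .entry(head.to_string())
                .or_insert_with(|| Value::Map(BTreeMap::new()));
            if !matches!(child, Value::Map(_)) {
                // The path goes deeper than an existing scalar: replace it.
                *child = Value::Map(BTreeMap::new());
            }
            if let Value::Map(m) = child {
                set_path(m, rest, value);
            }
        }
    }
}

fn main() {
    let mut root: BTreeMap<String, Value> = BTreeMap::new();
    set_path(&mut root, "threshold", "0.8");
    // "model" does not exist yet; the intermediate map is created.
    set_path(&mut root, "model.learning_rate", "0.001");
    let model = match &root["model"] {
        Value::Map(m) => m,
        _ => unreachable!(),
    };
    assert_eq!(model["learning_rate"], Value::Str("0.001".into()));
}
```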
File paths
Default paths (when using App::from_args or App::from_cli):
- Catalog: conf/base/catalog.yml
- Params: conf/base/parameters.yml
Override with CLI flags:
$ my_app --catalog-path my/catalog.yml --params-path my/params.yml run
Or specify paths directly with App::from_yaml:
App::from_yaml("conf/catalog.yml", "conf/params.yml")?
Run
The run subcommand executes the pipeline.
CLI usage
$ my_app run # default runner (sequential)
$ my_app run --runner parallel # use parallel runner
$ my_app run --params threshold=0.8 # override parameters
$ my_app run --catalog output.path=/tmp/out.json # override catalog
How it works
- The pipeline function is called with (&catalog, &params) to construct the steps
- The selected runner (chosen by the --runner flag, or the default) is looked up by name
- The runner executes the steps, firing hooks at each lifecycle point
- If any node fails, execution stops and the error is returned
execute vs dispatch
- app.execute(pipeline_fn) — always runs the pipeline, regardless of the stored command
- app.dispatch(pipeline_fn) — checks the stored command and dispatches to run, check, or viz
Use execute in tests or when you want to run the pipeline directly. Use dispatch in CLI apps to support all subcommands.
Runner selection
The --runner flag selects a runner by name. Available runners depend on what’s configured:
// Default: sequential + parallel (std), sequential only (no_std)
App::from_yaml(..)?
.dispatch(pipeline)?;
// Custom runners
App::from_yaml(..)?
.with_runners((SequentialRunner, ParallelRunner, MyRunner))
.dispatch(pipeline)?;
If the named runner isn’t found, PondError::RunnerNotFound is returned.
Post-run inspection
When using App::new + execute, you can inspect datasets after the pipeline runs:
let app = App::new(catalog, params);
app.execute(pipeline)?;
// Access results
let summary: f64 = app.catalog().summary.load()?;
assert!(summary > 0.0);
This pattern is useful for integration tests.
Check
The check subcommand validates the pipeline structure without executing it.
CLI usage
$ my_app check
Pipeline is valid.
If validation fails:
$ my_app check
Error: Pipeline validation failed:
- Node 'process' requires dataset 0x7f8a..., which is produced by a later node
What it validates
check calls StepInfo::check() on the pipeline, which verifies:
- Sequential ordering — no node reads a dataset produced by a later node
- No duplicate outputs — each dataset is produced by at most one node
- Params are read-only — no node writes to a Param
- Pipeline contracts — a Pipeline’s declared inputs/outputs match its children
See Check for the full list of CheckError variants.
Programmatic use
You can call check() directly on any StepInfo:
let steps = pipeline(&catalog, &params);
match steps.check() {
Ok(()) => println!("Valid!"),
Err(e) => eprintln!("Error: {e}"),
}
Or via App::dispatch, which calls check() when the command is Command::Check:
App::new(catalog, params)
.with_command(Command::Check)
.dispatch(pipeline)?;
Viz
The viz subcommand builds the pipeline graph and serves an interactive visualization.
Requires the viz feature.
CLI usage
# Start the interactive web server (default port 8080)
$ my_app viz
# Custom port
$ my_app viz --port 3000
# Export graph as JSON
$ my_app viz --output pipeline.json
# Export self-contained HTML file
$ my_app viz --export pipeline.html
Interactive server
my_app viz starts an axum web server with:
| Endpoint | Description |
|---|---|
GET /api/graph | Full pipeline graph as JSON |
GET /api/dataset/{id}/html | HTML snapshot for a dataset |
GET /api/status | Current node/dataset execution status |
POST /api/status | Receives live events from VizHook |
GET /ws | WebSocket broadcast of live events |
GET / | Embedded React frontend |
The frontend uses React + ReactFlow + dagre for a left-to-right DAG layout with:
- Node, dataset, and pipeline visual types
- Click-to-inspect dataset details (HTML preview, YAML config)
- Left sidebar listing all nodes, datasets, and parameters
- Dark/light theme support
Live execution status
To see real-time node status during pipeline execution:
- Start the viz server: my_app viz --port 8080
- In another terminal, run the pipeline with VizHook:
use pondrs::viz::VizHook;
App::from_yaml(..)?
.with_args(std::env::args_os())?
.with_hooks((LoggingHook::new(), VizHook::new("http://localhost:8080".into())))
.dispatch(pipeline)?;
The frontend shows live node status (idle/running/completed/error) and updates automatically via WebSocket.
Static HTML export
--export produces a self-contained HTML file with the full graph and dataset snapshots embedded. No server needed — just open the file in a browser:
$ my_app viz --export pipeline.html
$ open pipeline.html
The export uses vite-plugin-singlefile to inline all JS/CSS into a single HTML file, and injects graph data via window.__STATIC_DATA__.
JSON export
--output writes the VizGraph as JSON, useful for custom tooling:
$ my_app viz --output graph.json
Graph name
The graph name shown in the frontend header is derived from the program name (first CLI argument, file stem only). For example, running cargo run --example weather_app sets the name to weather_app.
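That derivation is essentially a file-stem extraction over the first CLI argument, which can be sketched as (graph_name and its fallback are illustrative):

```rust
use std::path::Path;

// Sketch: derive a display name from argv[0] by taking the file stem.
fn graph_name(argv0: &str) -> String {
    Path::new(argv0)
        .file_stem()
        .map(|s| s.to_string_lossy().into_owned())
        .unwrap_or_else(|| "pipeline".to_string()) // fallback is an assumption
}

fn main() {
    assert_eq!(graph_name("target/debug/examples/weather_app"), "weather_app");
    assert_eq!(graph_name("my_app.exe"), "my_app");
}
```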
no_std Pipelines
pondrs is a #![no_std] crate at its core. The std feature adds filesystem I/O, threading, logging, CLI parsing, and additional dataset types — but the fundamental pipeline model works without any of it.
What’s available without std
| Component | no_std | std |
|---|---|---|
Node, Pipeline, Steps, StepInfo | yes | yes |
PipelineInfo, RunnableStep | yes | yes |
check() validation | yes | yes |
Param<T> | yes | yes |
CellDataset<T> | yes | yes |
RegisterDataset<T> | yes | yes |
GpioDataset | yes | yes |
SequentialRunner | yes | yes |
Hook / Hooks traits | yes | yes |
App::new() | yes | yes |
PondError (limited variants) | yes | yes |
MemoryDataset<T> | — | yes |
ParallelRunner | — | yes |
LoggingHook | — | yes |
| CLI parsing / YAML loading | — | yes |
| Catalog indexer (dataset names) | — | yes |
| File-backed datasets | — | yes |
Building for no_std
cargo build --no-default-features --lib
No allocator is required. All dataset tracking in check() uses fixed-size stack arrays.
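The allocation-free tracking idea can be sketched as a fixed-capacity set of dataset ids on the stack (FixedSet is illustrative only, not the library's internal type):

```rust
// Sketch: a fixed-capacity, allocation-free set of pointer-sized ids.
// N is the compile-time capacity, analogous to check_with_capacity::<N>().
struct FixedSet<const N: usize> {
    ids: [usize; N],
    len: usize,
}

impl<const N: usize> FixedSet<N> {
    const fn new() -> Self {
        FixedSet { ids: [0; N], len: 0 }
    }
    fn contains(&self, id: usize) -> bool {
        self.ids[..self.len].contains(&id)
    }
    /// Returns false when the set is full (capacity exceeded).
    fn insert(&mut self, id: usize) -> bool {
        if self.contains(id) {
            return true;
        }
        if self.len == N {
            return false;
        }
        self.ids[self.len] = id;
        self.len += 1;
        true
    }
}

fn main() {
    let mut seen = FixedSet::<4>::new();
    assert!(seen.insert(0xA0));
    assert!(seen.insert(0xB0));
    assert!(seen.contains(0xA0));
    assert!(!seen.contains(0xC0));
}
```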
Typical embedded pattern
use pondrs::datasets::{CellDataset, Param, RegisterDataset, GpioDataset};
use pondrs::error::PondError;
use pondrs::{App, Node, Steps};
static SENSOR: RegisterDataset<u16> = unsafe { RegisterDataset::new(0x4000_0000) };
static LED: GpioDataset = unsafe { GpioDataset::new(0x4002_0000, 5, "LED") };
fn pipeline<'a>(
cat: &'a Catalog,
params: &'a Params,
) -> impl Steps<PondError> + 'a {
(
Node {
name: "read_sensor",
func: |raw: u16| (raw,),
input: (&cat.sensor,),
output: (&cat.reading,),
},
Node {
name: "check_threshold",
func: |value: u16, threshold: u16| (value > threshold,),
input: (&cat.reading, &params.threshold),
output: (&cat.led,),
},
)
}
See Datasets for details on the no_std dataset types and App & Debugging for the no_std App pattern.
no_std Datasets
Four dataset types are available without the standard library.
Param<T>
Read-only parameter values. Works identically in no_std and std. Requires T: Clone + Serialize.
let threshold = Param(500u16);
See Param.
CellDataset<T>
Stack-friendly intermediate storage using Cell. Requires T: Copy.
let reading = CellDataset::<u16>::new();
let result = CellDataset::<bool>::new();
- const fn new() — usable in static contexts
- No heap allocation, no locking
- Single-threaded only (SequentialRunner)
See Cell Dataset.
RegisterDataset<T>
Volatile memory-mapped register access. Reads and writes at a raw memory address using read_volatile / write_volatile.
// SAFETY: address must point to a valid, aligned register
static SENSOR: RegisterDataset<u16> = unsafe { RegisterDataset::new(0x4000_0000) };
- T is typically u8, u16, or u32
- const unsafe fn new(address: usize) — suitable for static declarations
- load() reads the register, save() writes it
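The volatile access pattern can be sketched against a stack variable standing in for a hardware address (real register addresses must come from the device datasheet):

```rust
use core::ptr;

fn main() {
    // Stand-in for a memory-mapped register; on hardware this would be a
    // fixed address like 0x4000_0000 cast to *mut u16.
    let mut fake_register: u16 = 0;
    let addr = &mut fake_register as *mut u16;

    // save(): write the register without the compiler eliding the access.
    unsafe { ptr::write_volatile(addr, 0x1234) };
    // load(): read it back, again without elision or reordering.
    let value = unsafe { ptr::read_volatile(addr) };
    assert_eq!(value, 0x1234);
}
```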
GpioDataset
A single GPIO pin within a memory-mapped register. Reads and writes a single bit at a given position.
// SAFETY: address must point to a valid GPIO port register
static LED: GpioDataset = unsafe { GpioDataset::new(0x4002_0000, 5, "LED1") };
- load() returns bool — whether the bit is set
- save(true) sets the bit, save(false) clears it
- Preserves other bits in the register (read-modify-write)
- The label field is used in visualization
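The read-modify-write a pin save performs can be sketched with plain bit operations (set_bit and get_bit are illustrative helpers, not the pondrs API):

```rust
// Sketch: change only the target bit, preserving the rest of the register.
fn set_bit(register: u32, bit: u32, on: bool) -> u32 {
    if on {
        register | (1 << bit)
    } else {
        register & !(1 << bit)
    }
}

fn get_bit(register: u32, bit: u32) -> bool {
    register & (1 << bit) != 0
}

fn main() {
    let mut reg = 0b0100_0001u32;
    reg = set_bit(reg, 5, true); // save(true) on pin 5
    assert!(get_bit(reg, 5));
    assert_eq!(reg, 0b0110_0001); // other bits preserved
    reg = set_bit(reg, 0, false); // save(false) on pin 0
    assert_eq!(reg, 0b0110_0000);
}
```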
Hardware dataset safety
Both RegisterDataset and GpioDataset use unsafe constructors because they access raw memory addresses. They implement Send + Sync via unsafe impls — this is safe in single-threaded embedded contexts but the caller is responsible for preventing data races in multi-threaded environments.
Example: embedded pipeline
struct Catalog {
sensor: RegisterDataset<u16>,
reading: CellDataset<u16>,
alert: GpioDataset,
}
struct Params {
threshold: Param<u16>,
}
fn pipeline<'a>(cat: &'a Catalog, params: &'a Params) -> impl Steps<PondError> + 'a {
(
Node {
name: "read",
func: |raw: u16| (raw,),
input: (&cat.sensor,),
output: (&cat.reading,),
},
Node {
name: "check",
func: |value: u16, thresh: u16| (value > thresh,),
input: (&cat.reading, &params.threshold),
output: (&cat.alert,),
},
)
}
App & Debugging in no_std
Compiling a no_std pipeline with std
One of the strengths of pondrs is that a pipeline written for no_std can also be compiled with std enabled. This lets you use the full App interface — including YAML configuration, CLI argument parsing, parameter overrides, check, and viz — for development and debugging, while deploying the same pipeline code to a no_std target.
Simply add pondrs as a dev-dependency with the all feature, and use App::from_args or App::from_yaml in a host-side binary or test:
// tests/run_on_host.rs — compiles with std
fn main() -> Result<(), PondError> {
App::from_args(std::env::args_os())?
.dispatch(my_no_std_pipeline)
}
This gives you visualization, validation, logging hooks, and parallel execution for free during development, without changing the pipeline itself.
App::new
In no_std environments, App::new is the only available constructor:
let app = App::new(catalog, params);
app.execute(pipeline)?;
There is no YAML loading, no CLI parsing, and no dispatch — you construct the catalog and params directly and call execute.
Default runner
The default runner tuple in no_std is (SequentialRunner,). The ParallelRunner requires std (threads).
Hooks in no_std
The Hook and Hooks traits work in no_std, but there are no built-in hook implementations (LoggingHook requires std). You can implement your own:
struct UartLogger;
impl Hook for UartLogger {
fn before_node_run(&self, n: &dyn PipelineInfo) {
// write to UART, toggle debug pin, etc.
uart_print(n.name());
}
fn on_node_error(&self, n: &dyn PipelineInfo, _error: &str) {
uart_print("ERROR: ");
uart_print(n.name());
}
}
App::new(catalog, params)
.with_hooks((UartLogger,))
.execute(pipeline)?;
Note that in no_std, the error string in on_node_error is always "node error" (not the full error message), since Display formatting requires allocation.
Dataset names
In no_std, the catalog indexer is not available, so DatasetRef::name in hook callbacks is always None. Hooks can still use ds.id (pointer-based) or ds.meta.type_string() for identification.
PondError in no_std
Only three PondError variants are available without std:
- DatasetNotLoaded — a dataset was read before being written
- RunnerNotFound — the specified runner name doesn’t match any configured runner
- CheckFailed — pipeline validation failed (used by dispatch with Command::Check)
All other variants (Io, Polars, SerdeYaml, etc.) are feature-gated.
Validation
check() works fully in no_std. It uses fixed-size stack arrays instead of HashMap:
let steps = pipeline(&catalog, &params);
steps.check()?;
// For pipelines with more than 20 datasets:
steps.check_with_capacity::<64>()?;
Examples
Each example includes the full source code and an interactive pipeline visualization.
Weather Pipeline
Demonstrates subpipelines, struct params, nested catalog/params, PartitionedDataset, MemoryDataset, YamlDataset, PlotlyDataset, parallel execution, and an intentional error node.
Usage
cargo run --example weather_app -- run --runner parallel
cargo run --example weather_app -- check
cargo run --example weather_app -- viz
Types
// ---------------------------------------------------------------------------
// Error type
// ---------------------------------------------------------------------------
#[derive(Debug)]
pub enum WeatherError {
Pond(PondError),
Polars(PolarsError),
Validation(String),
}
impl From<PondError> for WeatherError {
fn from(e: PondError) -> Self {
WeatherError::Pond(e)
}
}
impl From<PolarsError> for WeatherError {
fn from(e: PolarsError) -> Self {
WeatherError::Polars(e)
}
}
impl std::fmt::Display for WeatherError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
WeatherError::Pond(e) => write!(f, "{e}"),
WeatherError::Polars(e) => write!(f, "{e}"),
WeatherError::Validation(msg) => write!(f, "Validation failed: {msg}"),
}
}
}
impl std::error::Error for WeatherError {}
// ---------------------------------------------------------------------------
// Domain types
// ---------------------------------------------------------------------------
/// Baseline period for anomaly detection (struct param).
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub struct BaselinePeriod {
pub start_month: u32,
pub end_month: u32,
}
/// Aggregated weather statistics (must be Clone for MemoryDataset).
#[derive(Clone, Copy, Debug, Default)]
pub struct WeatherSummary {
pub avg_temp: f64,
pub max_temp: f64,
pub min_temp: f64,
pub total_rainfall: f64,
pub station_count: u32,
pub reading_count: u32,
}
// ---------------------------------------------------------------------------
// Params (nested containers with struct param)
// ---------------------------------------------------------------------------
#[derive(Serialize, Deserialize)]
pub struct WeatherParams {
pub analysis: AnalysisConfig,
pub display: DisplayConfig,
}
#[derive(Serialize, Deserialize)]
pub struct AnalysisConfig {
pub anomaly_threshold: Param<f64>,
pub baseline: Param<BaselinePeriod>,
}
#[derive(Serialize, Deserialize)]
pub struct DisplayConfig {
pub chart_title: Param<String>,
pub top_n: Param<usize>,
}
// ---------------------------------------------------------------------------
// Catalog (nested containers)
// ---------------------------------------------------------------------------
#[derive(Serialize, Deserialize)]
pub struct WeatherCatalog {
pub sources: SourcesCatalog,
pub analysis: AnalysisCatalog,
pub reports: ReportsCatalog,
}
#[derive(Serialize, Deserialize)]
pub struct SourcesCatalog {
pub station_readings: PartitionedDataset<PolarsCsvDataset>,
pub station_metadata: YamlDataset,
}
#[derive(Serialize, Deserialize)]
pub struct AnalysisCatalog {
pub combined_readings: PolarsCsvDataset,
pub weather_summary: MemoryDataset<WeatherSummary>,
pub anomalies: PolarsCsvDataset,
}
#[derive(Serialize, Deserialize)]
pub struct ReportsCatalog {
pub temperature_chart: PlotlyDataset,
pub rainfall_chart: PlotlyDataset,
pub validation_passed: MemoryDataset<bool>,
}
Node functions
// ---------------------------------------------------------------------------
// Node functions
// ---------------------------------------------------------------------------
fn merge_stations(
partitions: HashMap<String, DataFrame>,
) -> Result<(DataFrame,), PolarsError> {
let mut combined: Option<DataFrame> = None;
for (station_name, mut df) in partitions {
let n = df.height();
let station_col = Column::new("station".into(), vec![station_name.as_str(); n]);
df.with_column(station_col)?;
match &mut combined {
None => combined = Some(df),
Some(c) => { c.vstack_mut(&df)?; }
}
}
Ok((combined.unwrap_or_default(),))
}
fn load_metadata(meta: Yaml) {
if let Yaml::Hash(ref map) = meta {
for (key, _) in map {
if let Yaml::String(name) = key {
log::info!("Loaded metadata for station: {name}");
}
}
}
}
fn compute_summary(
df: DataFrame,
baseline: BaselinePeriod,
) -> (WeatherSummary,) {
let _ = baseline;
let temp = df.column("temperature").unwrap().f64().unwrap();
let rain = df.column("rainfall").unwrap().f64().unwrap();
let stations: Vec<String> = df
.column("station")
.unwrap()
.str()
.unwrap()
.into_no_null_iter()
.map(|s| s.to_string())
.collect();
let unique_stations: std::collections::HashSet<&str> =
stations.iter().map(|s| s.as_str()).collect();
let summary = WeatherSummary {
avg_temp: temp.mean().unwrap_or(0.0),
max_temp: temp.max().unwrap_or(0.0),
min_temp: temp.min().unwrap_or(0.0),
total_rainfall: rain.sum().unwrap_or(0.0),
station_count: unique_stations.len() as u32,
reading_count: df.height() as u32,
};
(summary,)
}
fn detect_anomalies(
df: DataFrame,
threshold: f64,
) -> Result<(DataFrame,), PolarsError> {
let temp = df.column("temperature")?.f64()?;
let mean = temp.mean().unwrap_or(0.0);
let std_dev = temp.std(1).unwrap_or(1.0);
let lower = mean - threshold * std_dev;
let upper = mean + threshold * std_dev;
let mask = temp.lt(lower) | temp.gt(upper);
Ok((df.filter(&mask)?,))
}
fn plot_temperatures(summary: WeatherSummary, title: String) -> (Plot,) {
let labels = vec!["Min", "Avg", "Max"];
let values = vec![summary.min_temp, summary.avg_temp, summary.max_temp];
let mut plot = Plot::new();
plot.add_trace(
Bar::new(labels, values).name("Temperature"),
);
plot.set_layout(
Layout::new()
.title(format!("{title} - Temperature Summary"))
.y_axis(plotly::layout::Axis::new().title("Temperature (C)")),
);
(plot,)
}
fn plot_rainfall(summary: WeatherSummary, title: String) -> (Plot,) {
let labels = vec![
format!("{} stations", summary.station_count),
format!("{} readings", summary.reading_count),
];
let values = vec![
summary.total_rainfall / summary.station_count as f64,
summary.total_rainfall,
];
let mut plot = Plot::new();
plot.add_trace(
Bar::new(labels, values).name("Rainfall"),
);
plot.set_layout(
Layout::new()
.title(format!("{title} - Rainfall Summary"))
.y_axis(plotly::layout::Axis::new().title("Rainfall (mm)")),
);
(plot,)
}
fn validate_reports(
_temp_chart: serde_json::Value,
_rain_chart: serde_json::Value,
) -> Result<(bool,), WeatherError> {
Err(WeatherError::Validation(
"Data quality check failed: east station has a suspicious 50C reading in July".into(),
))
}
Pipeline definition
// ---------------------------------------------------------------------------
// Pipeline function
// ---------------------------------------------------------------------------
pub fn weather_pipeline<'a>(
cat: &'a WeatherCatalog,
params: &'a WeatherParams,
) -> impl Steps<WeatherError> + 'a {
(
// Subpipeline: data preparation
Pipeline {
name: "data_prep",
steps: (
Node {
name: "merge_stations",
func: merge_stations,
input: (&cat.sources.station_readings,),
output: (&cat.analysis.combined_readings,),
},
Node {
name: "load_metadata",
func: load_metadata,
input: (&cat.sources.station_metadata,),
output: (),
},
),
input: (
&cat.sources.station_readings,
&cat.sources.station_metadata,
),
output: (&cat.analysis.combined_readings,),
},
// These two nodes can run in parallel (both read combined_readings)
Node {
name: "compute_summary",
func: compute_summary,
input: (
&cat.analysis.combined_readings,
¶ms.analysis.baseline,
),
output: (&cat.analysis.weather_summary,),
},
Node {
name: "detect_anomalies",
func: detect_anomalies,
input: (
&cat.analysis.combined_readings,
¶ms.analysis.anomaly_threshold,
),
output: (&cat.analysis.anomalies,),
},
// Subpipeline: reporting (inner nodes can run in parallel)
Pipeline {
name: "reporting",
steps: (
Node {
name: "plot_temperatures",
func: plot_temperatures,
input: (
&cat.analysis.weather_summary,
¶ms.display.chart_title,
),
output: (&cat.reports.temperature_chart,),
},
Node {
name: "plot_rainfall",
func: plot_rainfall,
input: (
&cat.analysis.weather_summary,
¶ms.display.chart_title,
),
output: (&cat.reports.rainfall_chart,),
},
),
input: (
&cat.analysis.weather_summary,
¶ms.display.chart_title,
),
output: (
&cat.reports.temperature_chart,
&cat.reports.rainfall_chart,
),
},
// Validation node: intentionally fails
Node {
name: "validate_reports",
func: validate_reports,
input: (
&cat.reports.temperature_chart,
&cat.reports.rainfall_chart,
),
output: (&cat.reports.validation_passed,),
},
)
}
App entry point
pondrs::app::App::from_yaml(
dir.join("catalog.yml").to_str().unwrap(),
dir.join("params.yml").to_str().unwrap(),
)?
.with_hooks((
LoggingHook::new(),
VizHook::new("http://localhost:8080".to_string()),
))
.with_args(std::env::args_os())?
.dispatch(weather_pipeline)
Pipeline visualization
Sales Pipeline
Monthly sales CSV processing: filter by minimum sales, compute totals, and produce a Plotly bar chart.
Usage
cargo run --example sales_app -- run
cargo run --example sales_app -- check
cargo run --example sales_app -- viz
Types
// ---------------------------------------------------------------------------
// Catalog and params
// ---------------------------------------------------------------------------
#[derive(Serialize, Deserialize)]
pub struct SalesCatalog {
pub raw_sales: PolarsCsvDataset,
pub filtered_sales: PolarsCsvDataset,
pub total_sales: MemoryDataset<i64>,
pub chart: PlotlyDataset,
}
#[derive(Serialize, Deserialize)]
pub struct SalesParams {
/// Minimum sales to include a month in the chart.
pub min_sales: Param<i64>,
}
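For reference, a catalog.yml and params.yml matching these structs might look like the following. The CSV fields mirror the PolarsCsvDataset config shown in the introduction; the concrete paths and the PlotlyDataset `path` key are assumptions for illustration:

```yaml
# catalog.yml
raw_sales:
  path: data/sales.csv
  separator: ","
filtered_sales:
  path: data/filtered_sales.csv
  separator: ","
total_sales: {}
chart:
  path: data/sales_chart.html

# params.yml
min_sales: 1000
```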
Node functions
// ---------------------------------------------------------------------------
// Node functions
// ---------------------------------------------------------------------------
fn build_chart(df: DataFrame, total: i64) -> (Plot,) {
let months: Vec<String> = df
.column("month").unwrap()
.str().unwrap()
.into_no_null_iter()
.map(|s| s.to_string())
.collect();
let sales: Vec<i64> = df
.column("sales").unwrap()
.i64().unwrap()
.into_no_null_iter()
.collect();
let mut plot = Plot::new();
plot.add_trace(Bar::new(months, sales).name("Monthly Sales"));
plot.set_layout(
Layout::new()
.title(format!("Months with sales ≥ 1000 (total: {total})"))
.y_axis(plotly::layout::Axis::new().title("Sales")),
);
(plot,)
}
Pipeline definition
// ---------------------------------------------------------------------------
// Pipeline function
// ---------------------------------------------------------------------------
pub fn sales_pipeline<'a>(
cat: &'a SalesCatalog,
params: &'a SalesParams,
) -> impl Steps<PondError> + 'a {
(
// Node 1: filter out months below the threshold
Node {
name: "filter_months",
func: |df: DataFrame, min_sales: i64| -> Result<(DataFrame,), PolarsError> {
let mask = df.column("sales")?.i64()?.gt_eq(min_sales);
Ok((df.filter(&mask)?,))
},
input: (&cat.raw_sales, ¶ms.min_sales),
output: (&cat.filtered_sales,),
},
// Node 2: sum the filtered sales
Node {
name: "compute_total",
func: |df: DataFrame| {
let total =
df.column("sales").unwrap().i64().unwrap().sum().unwrap_or(0);
(total,)
},
input: (&cat.filtered_sales,),
output: (&cat.total_sales,),
},
// Node 3: build a bar chart of the filtered monthly sales
Node {
name: "build_chart",
func: build_chart,
input: (&cat.filtered_sales, &cat.total_sales),
output: (&cat.chart,),
},
)
}
App entry point
pondrs::app::App::from_yaml(
dir.join("catalog.yml").to_str().unwrap(),
dir.join("params.yml").to_str().unwrap(),
)?
.with_hooks((
LoggingHook::new(),
VizHook::new("http://localhost:8080".to_string()),
))
.with_args(std::env::args_os())?
.dispatch(sales_pipeline)
Pipeline visualization
Identity Pipeline
Demonstrates the Ident node: write CSV as plain text, read it back as a Polars DataFrame via Ident, and produce a Plotly bar chart.
Usage
cargo run --example ident_app -- \
--catalog-path examples/ident_data/catalog.yml \
--params-path examples/ident_data/params.yml run
Types
// ---------------------------------------------------------------------------
// Catalog
// ---------------------------------------------------------------------------
#[derive(Serialize, Deserialize)]
struct IdentCatalog {
csv_text: TextDataset,
csv_data: PolarsCsvDataset,
chart: PlotlyDataset,
}
Node functions
// ---------------------------------------------------------------------------
// Node functions
// ---------------------------------------------------------------------------
fn generate_csv() -> (String,) {
let csv = "\
fruit,count
Apples,35
Bananas,22
Cherries,48
Dates,15
Elderberries,31
Figs,9
Grapes,42";
(csv.to_string(),)
}
fn build_chart(df: DataFrame) -> (Plot,) {
let fruits: Vec<String> = df
.column("fruit")
.unwrap()
.str()
.unwrap()
.into_no_null_iter()
.map(|s| s.to_string())
.collect();
let counts: Vec<i64> = df
.column("count")
.unwrap()
.i64()
.unwrap()
.into_no_null_iter()
.collect();
let mut plot = Plot::new();
plot.add_trace(Bar::new(fruits, counts).name("Fruit Count"));
plot.set_layout(
Layout::new()
.title("Fruit Inventory")
.y_axis(plotly::layout::Axis::new().title("Count")),
);
(plot,)
}
Pipeline definition
// ---------------------------------------------------------------------------
// Pipeline function
// ---------------------------------------------------------------------------
fn ident_pipeline<'a>(
cat: &'a IdentCatalog,
_params: &'a (),
) -> impl Steps<PondError> + 'a {
(
Node {
name: "generate_csv",
func: generate_csv,
input: (),
output: (&cat.csv_text,),
},
Ident {
name: "text_to_csv",
input: &cat.csv_text,
output: &cat.csv_data,
},
Node {
name: "build_chart",
func: build_chart,
input: (&cat.csv_data,),
output: (&cat.chart,),
},
)
}
Pipeline visualization
Register Pipeline
Simulates hardware register access: reads a sensor register, processes the value against thresholds, and sets GPIO pins accordingly. Demonstrates RegisterDataset, GpioDataset, and Param with a programmatically constructed catalog.
Usage
cargo run --example register_example -- run
cargo run --example register_example -- viz
Types
// ---------------------------------------------------------------------------
// Simulated hardware: heap-allocated "registers"
// ---------------------------------------------------------------------------
struct SimulatedHardware {
sensor_reg: Box<u16>,
status_reg: Box<u32>,
}
impl SimulatedHardware {
fn new() -> Self {
Self {
sensor_reg: Box::new(0),
status_reg: Box::new(0),
}
}
fn sensor_address(&self) -> usize {
&*self.sensor_reg as *const u16 as usize
}
fn status_address(&self) -> usize {
&*self.status_reg as *const u32 as usize
}
}
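The address methods matter because the catalog stores only a raw address, and RegisterDataset presumably reads and writes the value through it. A minimal standalone sketch of that mechanism (sound only while the Box backing the address is alive):

```rust
/// Read a u16 through a raw address, as a register-backed dataset might.
fn read_reg(addr: usize) -> u16 {
    unsafe { std::ptr::read_volatile(addr as *const u16) }
}

fn main() {
    let sensor: Box<u16> = Box::new(0x2a);
    // The catalog would hold only this address, not a reference.
    let addr = &*sensor as *const u16 as usize;
    println!("0x{:04x}", read_reg(addr)); // prints 0x002a
}
```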
// ---------------------------------------------------------------------------
// Catalog and params
// ---------------------------------------------------------------------------
#[derive(Serialize)]
struct Catalog {
sensor: RegisterDataset<u16>,
status: RegisterDataset<u32>,
led_ok: GpioDataset,
led_warn: GpioDataset,
led_crit: GpioDataset,
}
#[derive(Serialize, Deserialize)]
struct Params {
warn_threshold: Param<u16>,
crit_threshold: Param<u16>,
}
Pipeline definition
// ---------------------------------------------------------------------------
// Pipeline
// ---------------------------------------------------------------------------
fn register_pipeline<'a>(cat: &'a Catalog, params: &'a Params) -> impl Steps<PondError> + 'a {
(
Node {
name: "read_sensor",
func: |raw: u16| -> (u32,) {
println!(" Sensor reading: 0x{raw:04x} ({raw})");
(raw as u32,)
},
input: (&cat.sensor,),
output: (&cat.status,),
},
Node {
name: "set_ok_led",
func: |reading: u16, warn: u16| {
let ok = reading < warn;
println!(" OK LED: {ok} (reading {reading} < warn {warn})");
(ok,)
},
input: (&cat.sensor, ¶ms.warn_threshold),
output: (&cat.led_ok,),
},
Node {
name: "set_warn_led",
func: |reading: u16, warn: u16, crit: u16| {
let warning = reading >= warn && reading < crit;
println!(" WARN LED: {warning}");
(warning,)
},
input: (&cat.sensor, ¶ms.warn_threshold, ¶ms.crit_threshold),
output: (&cat.led_warn,),
},
Node {
name: "set_crit_led",
func: |reading: u16, crit: u16| {
let critical = reading >= crit;
println!(" CRIT LED: {critical}");
(critical,)
},
input: (&cat.sensor, ¶ms.crit_threshold),
output: (&cat.led_crit,),
},
)
}
Pipeline visualization
Dynamic Pipeline
Demonstrates runtime pipeline composition using StepVec: the report
node is only included when the include_report param is true.
Usage
cargo run --example dyn_steps_app -- run
cargo run --example dyn_steps_app -- check
cargo run --example dyn_steps_app -- viz
Types
#[derive(Serialize, Deserialize)]
pub struct Catalog {
pub readings: PolarsCsvDataset,
pub summary: MemoryDataset<f64>,
pub report: JsonDataset,
}
#[derive(Serialize, Deserialize)]
pub struct Params {
pub threshold: Param<f64>,
pub include_report: Param<bool>,
}
Pipeline definition
pub fn pipeline<'a>(cat: &'a Catalog, params: &'a Params) -> StepVec<'a> {
let mut steps = vec![
Node {
name: "summarize",
func: |df: DataFrame| {
let mean = df.column("value").unwrap().f64().unwrap().mean().unwrap();
(mean,)
},
input: (&cat.readings,),
output: (&cat.summary,),
}
.boxed(),
];
if params.include_report.0 {
steps.push(
Node {
name: "report",
func: |mean: f64, threshold: f64| {
(json!({ "mean": mean, "passed": mean >= threshold }),)
},
input: (&cat.summary, ¶ms.threshold),
output: (&cat.report,),
}
.boxed(),
);
}
steps
}
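Under the hood, StepVec is presumably a Vec of boxed step trait objects, which is what makes the conditional push possible. The same pattern works with any object-safe trait; Step here is a hypothetical stand-in, not pondrs' actual trait:

```rust
// Hypothetical stand-in for pondrs' step abstraction.
trait Step {
    fn name(&self) -> &str;
}

struct Named(&'static str);

impl Step for Named {
    fn name(&self) -> &str {
        self.0
    }
}

/// Build the step list, conditionally including the report step.
fn build_steps(include_report: bool) -> Vec<Box<dyn Step>> {
    let mut steps: Vec<Box<dyn Step>> = vec![Box::new(Named("summarize"))];
    if include_report {
        steps.push(Box::new(Named("report")));
    }
    steps
}

fn main() {
    for step in build_steps(true) {
        println!("{}", step.name());
    }
}
```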
App entry point
fn main() -> Result<(), PondError> {
let dir = {
let manifest = std::path::Path::new(env!("CARGO_MANIFEST_DIR"));
manifest.join("examples").join("dyn_steps_data")
};
write_fixtures(&dir);
pondrs::app::App::from_yaml(
dir.join("catalog.yml").to_str().unwrap(),
dir.join("params.yml").to_str().unwrap(),
)?
.with_args(std::env::args_os())?
.dispatch(pipeline)
}
Pipeline visualization
Split/Join Pipeline
Demonstrates fan-out/fan-in patterns using TemplatedCatalog, Split, Join,
and StepVec. A combined inventory CSV is split by store into per-store files,
processed independently, then joined back into a comparison report.
Usage
cargo run --example split_join_app -- run
cargo run --example split_join_app -- check
cargo run --example split_join_app -- viz
Types
// ---------------------------------------------------------------------------
// Per-store catalog entry (expanded from YAML template)
// ---------------------------------------------------------------------------
#[derive(Debug, Serialize, Deserialize)]
pub struct StoreCatalog {
pub inventory: PolarsCsvDataset,
pub total_value: MemoryDataset<f64>,
}
// ---------------------------------------------------------------------------
// Catalog and params
// ---------------------------------------------------------------------------
#[derive(Serialize, Deserialize)]
pub struct Catalog {
pub all_inventory: PolarsCsvDataset,
pub grouped: MemoryDataset<HashMap<String, DataFrame>>,
pub stores: TemplatedCatalog<StoreCatalog>,
pub store_values: MemoryDataset<HashMap<String, f64>>,
pub report: JsonDataset,
}
#[derive(Serialize, Deserialize)]
pub struct Params {
pub low_stock_threshold: Param<i64>,
}
The StoreCatalog struct is the per-entry template. Each store gets its own
inventory CSV dataset and total_value memory dataset, with file paths
expanded from a YAML template:
stores:
placeholder: "store"
template:
inventory:
path: "data/{store}_inventory.csv"
total_value: {}
names: [north, south, east]
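Expansion is presumably a straightforward placeholder substitution, applied once per entry in `names`; a standalone sketch of that substitution (the actual expansion API is internal to pondrs):

```rust
/// Expand a templated path for each name by substituting the placeholder.
fn expand(template: &str, placeholder: &str, names: &[&str]) -> Vec<String> {
    let token = format!("{{{placeholder}}}"); // e.g. "{store}"
    names.iter().map(|n| template.replace(&token, n)).collect()
}

fn main() {
    let paths = expand("data/{store}_inventory.csv", "store", &["north", "south", "east"]);
    for p in &paths {
        println!("{p}");
    }
}
```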
Node functions
// ---------------------------------------------------------------------------
// Node functions
// ---------------------------------------------------------------------------
/// Read the combined CSV and group rows by the "store" column.
fn group_by_store(df: DataFrame) -> Result<(HashMap<String, DataFrame>,), PolarsError> {
let store_col = df.column("store")?.str()?;
let unique: Vec<String> = store_col
.into_no_null_iter()
.map(|s| s.to_string())
.collect::<std::collections::HashSet<_>>()
.into_iter()
.collect();
let mut map = HashMap::new();
for store in &unique {
let mask = store_col.equal(store.as_str());
map.insert(store.clone(), df.filter(&mask)?);
}
Ok((map,))
}
/// Compute the total stock value for a single store.
///
/// For each row: value = quantity * unit_price.
/// Items below `low_stock_threshold` are flagged but still counted.
fn compute_store_value(df: DataFrame, threshold: i64) -> (f64,) {
let qty = df.column("quantity").unwrap().i64().unwrap();
let price = df.column("unit_price").unwrap().f64().unwrap();
let mut total = 0.0;
let mut low_stock_items = 0;
for i in 0..df.height() {
let q = qty.get(i).unwrap_or(0);
let p = price.get(i).unwrap_or(0.0);
total += q as f64 * p;
if q < threshold {
low_stock_items += 1;
}
}
if low_stock_items > 0 {
log::warn!(
"{low_stock_items} item(s) below stock threshold of {threshold}"
);
}
(total,)
}
/// Build a JSON comparison report from per-store totals.
fn build_report(store_values: HashMap<String, f64>) -> (Value,) {
let grand_total: f64 = store_values.values().sum();
let mut stores: Vec<Value> = store_values
.iter()
.map(|(name, &value)| {
json!({
"store": name,
"total_value": (value * 100.0).round() / 100.0,
"share_pct": ((value / grand_total * 10000.0).round() / 100.0),
})
})
.collect();
stores.sort_by(|a, b| {
b["total_value"]
.as_f64()
.unwrap()
.partial_cmp(&a["total_value"].as_f64().unwrap())
.unwrap()
});
let report = json!({
"grand_total": (grand_total * 100.0).round() / 100.0,
"stores": stores,
});
(report,)
}
Pipeline definition
// ---------------------------------------------------------------------------
// Pipeline function
// ---------------------------------------------------------------------------
pub fn pipeline<'a>(cat: &'a Catalog, params: &'a Params) -> StepVec<'a> {
// Step 1: group the combined CSV into a HashMap by store.
let mut steps: StepVec<'a> = vec![
Node {
name: "group_by_store",
func: group_by_store,
input: (&cat.all_inventory,),
output: (&cat.grouped,),
}
.boxed(),
];
// Step 2: Split distributes per-store DataFrames to individual CSV files.
steps.push(
Split {
name: "split_stores",
input: &cat.grouped,
catalog: &cat.stores,
field: |s: &StoreCatalog| &s.inventory,
}
.boxed(),
);
// Step 3: per-store processing — dynamically build a node for each store.
for (_, store) in cat.stores.iter() {
steps.push(
Node {
name: "compute_store_value",
func: compute_store_value,
input: (&store.inventory, ¶ms.low_stock_threshold),
output: (&store.total_value,),
}
.boxed(),
);
}
// Step 4: Join collects per-store totals back into a HashMap.
steps.push(
Join {
name: "join_values",
catalog: &cat.stores,
field: |s: &StoreCatalog| &s.total_value,
output: &cat.store_values,
}
.boxed(),
);
// Step 5: build a comparison report from the joined values.
steps.push(
Node {
name: "build_report",
func: build_report,
input: (&cat.store_values,),
output: (&cat.report,),
}
.boxed(),
);
steps
}
The pipeline uses StepVec because the per-store processing nodes are built
dynamically from the TemplatedCatalog entries. The flow is:
- group_by_store — reads the combined CSV and groups rows into a HashMap<String, DataFrame>
- split_stores — distributes each store’s DataFrame to its per-store CSV file
- compute_store_value (one per store) — computes total stock value from each store’s CSV
- join_values — collects per-store totals back into a HashMap<String, f64>
- build_report — produces a JSON comparison report
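The share and rounding arithmetic in build_report is easy to check in isolation. This sketch reproduces the (x * 100.0).round() / 100.0 two-decimal trick and the share_pct formula on a plain HashMap:

```rust
use std::collections::HashMap;

/// Round to two decimal places, as build_report does.
fn round2(x: f64) -> f64 {
    (x * 100.0).round() / 100.0
}

/// Per-store share of the grand total, as a percentage rounded to 2 decimals.
fn shares(store_values: &HashMap<String, f64>) -> HashMap<String, f64> {
    let grand_total: f64 = store_values.values().sum();
    store_values
        .iter()
        .map(|(name, &v)| (name.clone(), round2(v / grand_total * 100.0)))
        .collect()
}

fn main() {
    let values = HashMap::from([("north".to_string(), 750.0), ("south".to_string(), 250.0)]);
    println!("{:?}", shares(&values)); // north: 75.0, south: 25.0
}
```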
App entry point
fn main() -> Result<(), PondError> {
let dir = data_dir();
write_fixtures(&dir);
pondrs::app::App::from_yaml(
dir.join("catalog.yml").to_str().unwrap(),
dir.join("params.yml").to_str().unwrap(),
)?
.with_hooks((LoggingHook::new(),))
.with_args(std::env::args_os())?
.dispatch(pipeline)
}