
pondrs

Pipelines of Nodes & Datasets — a Rust pipeline execution library, heavily inspired by Kedro.

Example

Define your catalog and params as structs, with datasets backed by files or memory:

#[derive(Serialize, Deserialize)]
struct Catalog {
    readings: PolarsCsvDataset,
    summary: MemoryDataset<f64>,
    report: JsonDataset,
}

#[derive(Serialize, Deserialize)]
struct Params {
    threshold: Param<f64>,
}

Write a pipeline function that wires nodes together through shared datasets:

fn pipeline<'a>(cat: &'a Catalog, params: &'a Params) -> impl Steps<PondError> + 'a {
    (
        Node {
            name: "summarize",
            func: |df: DataFrame| {
                let mean = df.column("value").unwrap().f64().unwrap().mean().unwrap();
                (mean,)
            },
            input: (&cat.readings,),
            output: (&cat.summary,),
        },
        Node {
            name: "report",
            func: |mean: f64, threshold: f64| {
                (json!({ "mean": mean, "passed": mean >= threshold }),)
            },
            input: (&cat.summary, &params.threshold),
            output: (&cat.report,),
        },
    )
}

Configure your catalog and params via YAML and run with the built-in CLI:

# catalog.yml
readings:
  path: data/readings.csv
  separator: ","
summary: {}
report:
  path: data/report.json

# params.yml
threshold: 0.5

    pondrs::app::App::from_yaml(
        dir.join("catalog.yml").to_str().unwrap(),
        dir.join("params.yml").to_str().unwrap(),
    )?
    .with_args(std::env::args_os())?
    .dispatch(pipeline)

$ my_app run
$ my_app run --params threshold=0.8   # override params from CLI
$ my_app check                        # validate pipeline DAG
$ my_app viz                          # interactive pipeline visualization

Pipeline visualization


A minimal pipeline

This chapter walks through the core concepts of pondrs using a minimal example. The same example appears on the introduction page — here we break it apart and explain each piece in detail.

The example reads a CSV of sensor readings, computes the mean, compares it against a threshold parameter, and writes a JSON report. The following sections explain each concept:

  • Parameters — read-only values loaded from YAML (Param<f64>)
  • Datasets — the Dataset trait and the concrete types used here (PolarsCsvDataset, MemoryDataset, JsonDataset)
  • Catalog — the struct that groups datasets together
  • Nodes — the Node struct that connects a function to its input/output datasets
  • Steps — how nodes are composed into a sequence that the runner can execute
  • App — how App ties everything together and provides CLI dispatch

Parameters

Parameters are read-only configuration values that feed into pipeline nodes. They are defined using Param<T>, a thin wrapper that implements the Dataset trait. For more on Param, nested parameters, and struct parameters, see the Params & Catalog chapter.

In the minimal example

The Params struct holds a single threshold value:

#[derive(Serialize, Deserialize)]
struct Params {
    threshold: Param<f64>,
}

Configured via YAML:

# params.yml
threshold: 0.5

The “report” node reads the threshold as one of its inputs. When Param appears in a node’s input tuple, .load() clones the inner value. Because Param::Error is Infallible, this can never fail.

        Node {
            name: "report",
            func: |mean: f64, threshold: f64| {
                (json!({ "mean": mean, "passed": mean >= threshold }),)
            },
            input: (&cat.summary, &params.threshold),
            output: (&cat.report,),
        },

Overriding from the CLI

When using the App framework, parameters can be overridden at runtime without editing YAML files:

$ my_app run --params threshold=0.8

Dot notation works for nested parameter structs:

$ my_app run --params model.learning_rate=0.01

See the YAML Configuration chapter for details.

Datasets

Datasets are the data abstraction in pondrs. Every piece of data flowing through a pipeline — whether it’s a CSV file, an in-memory value, or a hardware register — is a dataset.

The Dataset trait

pub trait Dataset: serde::Serialize {
    type LoadItem;
    type SaveItem;
    type Error;

    fn load(&self) -> Result<Self::LoadItem, Self::Error>;
    fn save(&self, output: Self::SaveItem) -> Result<(), Self::Error>;
    fn is_param(&self) -> bool { false }
}

  • LoadItem — the type produced when loading (e.g. DataFrame, String, f64)
  • SaveItem — the type accepted when saving (often the same as LoadItem)
  • Error — the error type for I/O operations. Use core::convert::Infallible for datasets that never fail (like Param)
  • is_param() — returns true for read-only parameter datasets. The pipeline validator uses this to prevent writing to params.
  • Serialize supertrait — enables automatic YAML serialization of dataset configuration for the viz and catalog indexer.
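
To make the trait concrete, here is a self-contained sketch of a custom single-slot in-memory dataset in the spirit of MemoryDataset<T>. The trait is redeclared locally without the Serialize supertrait so the example compiles on its own; the SlotDataset name and the String error type are illustrative, not part of pondrs.

```rust
use std::cell::RefCell;

// Local stand-in for the pondrs Dataset trait (serde supertrait
// omitted so this sketch compiles without external crates).
trait Dataset {
    type LoadItem;
    type SaveItem;
    type Error;
    fn load(&self) -> Result<Self::LoadItem, Self::Error>;
    fn save(&self, output: Self::SaveItem) -> Result<(), Self::Error>;
    fn is_param(&self) -> bool {
        false
    }
}

// Hypothetical single-slot in-memory dataset: starts empty, and
// loading before any save fails (mirroring DatasetNotLoaded).
struct SlotDataset<T: Clone> {
    slot: RefCell<Option<T>>,
}

impl<T: Clone> Dataset for SlotDataset<T> {
    type LoadItem = T;
    type SaveItem = T;
    type Error = String; // stand-in for a real error enum

    fn load(&self) -> Result<T, String> {
        self.slot
            .borrow()
            .clone()
            .ok_or_else(|| "dataset not loaded".to_string())
    }

    fn save(&self, output: T) -> Result<(), String> {
        *self.slot.borrow_mut() = Some(output);
        Ok(())
    }
}

fn main() {
    let summary: SlotDataset<f64> = SlotDataset { slot: RefCell::new(None) };
    assert!(summary.load().is_err()); // empty until something saves to it
    summary.save(0.42).unwrap();
    assert_eq!(summary.load().unwrap(), 0.42);
    println!("ok");
}
```

The real MemoryDataset uses Arc<Mutex<Option<T>>> instead of RefCell so it is thread-safe, but the load/save shape is the same.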

Datasets in the minimal example

The catalog uses three dataset types:

#[derive(Serialize, Deserialize)]
struct Catalog {
    readings: PolarsCsvDataset,
    summary: MemoryDataset<f64>,
    report: JsonDataset,
}

PolarsCsvDataset

Reads and writes CSV files as Polars DataFrames. Requires the polars feature. Configured with a file path and optional CSV options like separator:

readings:
  path: data/readings.csv
  separator: ","

MemoryDataset<T>

Thread-safe in-memory storage for intermediate values. Starts empty — loading before any save returns DatasetNotLoaded. Requires the std feature. Uses Arc<Mutex<Option<T>>> internally, so it works safely with the ParallelRunner.

summary: {}

JsonDataset

Reads and writes JSON files as serde_json::Value. Requires the json feature.

report:
  path: data/report.json

Catalog

The catalog is a plain Rust struct that groups all datasets used by a pipeline. It is not a special type — any struct that derives Serialize and Deserialize and contains dataset fields works as a catalog.

In the minimal example

#[derive(Serialize, Deserialize)]
struct Catalog {
    readings: PolarsCsvDataset,
    summary: MemoryDataset<f64>,
    report: JsonDataset,
}

Each field is a dataset. The field names become the dataset names used in logging, visualization, and error messages — the framework discovers them automatically via serde serialization.

YAML configuration

The catalog struct is deserialized from a YAML file. Each field maps to a YAML key, and the dataset type determines what configuration is needed:

# catalog.yml
readings:
  path: data/readings.csv
  separator: ","
summary: {}
report:
  path: data/report.json

  • File-backed datasets (like PolarsCsvDataset, JsonDataset) need at least a path.
  • In-memory datasets (like MemoryDataset) use an empty mapping {} — they have no persistent configuration.
  • Parameters live in a separate params struct and file, not in the catalog.

Loading the catalog

When using App::from_yaml or App::from_args, the catalog is loaded and deserialized automatically:

    pondrs::app::App::from_yaml(
        dir.join("catalog.yml").to_str().unwrap(),
        dir.join("params.yml").to_str().unwrap(),
    )?
    .with_args(std::env::args_os())?
    .dispatch(pipeline)

You can also load it manually:

let contents = std::fs::read_to_string("catalog.yml")?;
let catalog: Catalog = serde_yaml::from_str(&contents)?;

For nested catalogs, naming conventions, and catalog overrides, see the Params & Catalog chapter.

Nodes

A Node is a single computation unit in the pipeline. It connects a function to its input and output datasets. For a deeper look at NodeInput/NodeOutput and other details, see Pipelines & Nodes — Nodes.

Definition

pub struct Node<F, Input: NodeInput, Output: NodeOutput>
where
    F: StableFn<Input::Args>,
    F::Output: CompatibleOutput<Output::Output>,
{
    pub name: &'static str,
    pub func: F,
    pub input: Input,
    pub output: Output,
}

  • name — a human-readable label used in logging, hooks, and visualization.
  • func — the function to execute. Can be a closure or a named function.
  • input — a tuple of dataset references to load before calling func.
  • output — a tuple of dataset references to save the return value to.

In the minimal example

The “summarize” node reads a CSV file, computes the mean, and stores it in memory:

        Node {
            name: "summarize",
            func: |df: DataFrame| {
                let mean = df.column("value").unwrap().f64().unwrap().mean().unwrap();
                (mean,)
            },
            input: (&cat.readings,),
            output: (&cat.summary,),
        },

The “report” node reads the mean and a parameter, then writes a JSON report:

        Node {
            name: "report",
            func: |mean: f64, threshold: f64| {
                (json!({ "mean": mean, "passed": mean >= threshold }),)
            },
            input: (&cat.summary, &params.threshold),
            output: (&cat.report,),
        },

Input and output tuples

Inputs and outputs are tuples of dataset references. The function’s arguments must match the LoadItem types of the input datasets, and its return value must match the SaveItem types of the output datasets.

// Single input, single output
input: (&cat.readings,),
output: (&cat.summary,),

// Multiple inputs, single output
input: (&cat.summary, &params.threshold),
output: (&cat.report,),

// No inputs (side-effect node)
input: (),
output: (&cat.result,),

Tuples of up to 10 elements are supported for both inputs and outputs.

Return values

Node functions return tuples matching the output datasets. A single-output node returns a 1-tuple:

func: |x: i32| (x * 2,),  // note the trailing comma

Multi-output nodes return larger tuples:

func: |x: i32| (x + 1, x * 2),
output: (&cat.incremented, &cat.doubled),

Fallible nodes

Node functions can return Result to signal errors:

Node {
    name: "parse",
    func: |text: String| -> Result<(i32,), PondError> {
        let n = text.trim().parse::<i32>().map_err(|e| PondError::Custom(e.to_string()))?;
        Ok((n,))
    },
    input: (&cat.raw_text,),
    output: (&cat.parsed_value,),
}

The CompatibleOutput trait allows both bare tuples and Result<tuple, E> as return types. Type mismatches between the function return and the output datasets are caught at compile time.

See Node Errors for more details.

Type safety

The Node struct uses compile-time checks to ensure:

  1. The function’s argument types match the input datasets’ LoadItem types
  2. The function’s return type matches the output datasets’ SaveItem types
  3. Mismatches produce compile errors at node construction, not at runtime

Steps

Steps are how nodes are composed into a sequence that the runner can execute. A pipeline function returns an impl Steps<E>, which is implemented for tuples of nodes (and pipelines). Tuples of up to 10 elements are supported.

In the minimal example

The pipeline function returns a tuple of two nodes. This tuple automatically implements Steps<PondError>:

fn pipeline<'a>(cat: &'a Catalog, params: &'a Params) -> impl Steps<PondError> + 'a {
    (
        Node {
            name: "summarize",
            func: |df: DataFrame| {
                let mean = df.column("value").unwrap().f64().unwrap().mean().unwrap();
                (mean,)
            },
            input: (&cat.readings,),
            output: (&cat.summary,),
        },
        Node {
            name: "report",
            func: |mean: f64, threshold: f64| {
                (json!({ "mean": mean, "passed": mean >= threshold }),)
            },
            input: (&cat.summary, &params.threshold),
            output: (&cat.report,),
        },
    )
}

The tuple ordering defines the sequential execution order. The SequentialRunner executes nodes in this order; the ParallelRunner uses dependency analysis to run independent nodes concurrently, but still respects data dependencies.

Composing steps

Steps are just tuples, so you compose them by adding elements:

fn pipeline<'a>(cat: &'a Catalog, params: &'a Params) -> impl Steps<PondError> + 'a {
    (
        Node { name: "step1", func: |x| (x,), input: (&params.x,), output: (&cat.a,) },
        Node { name: "step2", func: |a| (a + 1,), input: (&cat.a,), output: (&cat.b,) },
        Node { name: "step3", func: |b| (b * 2,), input: (&cat.b,), output: (&cat.c,) },
    )
}

For grouping related nodes with declared contracts, see Pipeline.

Validation

StepInfo::check() validates the pipeline structure without executing it:

  • No node reads a dataset before it is produced by an earlier node
  • No dataset is produced by more than one node
  • Parameters are never written to

let steps = pipeline(&catalog, &params);
steps.check()?;  // returns Result<(), CheckError>

See Check for details.

The pipeline function

The function that creates steps must be a named function with an explicit lifetime, not a closure:

// Correct: named function with tied lifetime
fn pipeline<'a>(cat: &'a Catalog, params: &'a Params) -> impl Steps<PondError> + 'a {
    (/* nodes */)
}

// Wrong: closures desugar into two independent lifetimes
let pipeline = |cat: &Catalog, params: &Params| { /* ... */ };

This is because the PipelineFn trait uses a lifetime-on-trait pattern that requires both references to share the same lifetime 'a. Named functions with explicit <'a> satisfy this; closures do not.

App

The App struct ties everything together — catalog, params, hooks, and runners — and provides CLI dispatch for running, validating, and visualizing your pipeline.

In the minimal example

    pondrs::app::App::from_yaml(
        dir.join("catalog.yml").to_str().unwrap(),
        dir.join("params.yml").to_str().unwrap(),
    )?
    .with_args(std::env::args_os())?
    .dispatch(pipeline)

App::from_yaml loads the catalog and params from YAML files, then with_args parses CLI arguments for subcommand selection and parameter overrides. Finally, dispatch runs the appropriate subcommand (run, check, or viz).

Subcommands

$ my_app run                              # execute the pipeline
$ my_app run --params threshold=0.8       # override params from CLI
$ my_app check                            # validate pipeline DAG
$ my_app viz                              # interactive pipeline visualization

For the full details on the App struct, initialization options, YAML configuration, and subcommands, see the App chapter.

Params & Catalog

This chapter takes a deeper look at the two data-holding structs that every pipeline uses: the params struct for read-only configuration and the catalog struct for datasets.

For the basics of how these are used in a pipeline, see A minimal pipeline. For YAML file loading and CLI overrides, see YAML Configuration.

  • Parameters — Param<T>, nested parameter structs, and struct parameters
  • Catalog — organizing datasets, nested catalogs, and naming conventions

Parameters

Parameters are read-only configuration values that feed into pipeline nodes. They are defined using Param<T>, a thin wrapper that implements the Dataset trait.

How Param works

#[derive(Debug, Serialize, Deserialize)]
pub struct Param<T: Clone>(pub T);

Param implements Dataset with:

  • LoadItem = T — returns a clone of the inner value
  • SaveItem = () — writing is forbidden; the pipeline validator rejects any node that writes to a Param
  • Error = Infallible — loading always succeeds

Because is_param() returns true, the viz dashboard and pipeline check treat parameters differently from regular datasets.
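
Based on the description above, the Dataset impl for Param can be sketched as follows. The trait is redeclared locally without the serde supertrait so the snippet compiles stand-alone; the real pondrs impl may differ in detail.

```rust
use core::convert::Infallible;

// Local stand-in for the pondrs Dataset trait (serde supertrait omitted).
trait Dataset {
    type LoadItem;
    type SaveItem;
    type Error;
    fn load(&self) -> Result<Self::LoadItem, Self::Error>;
    fn save(&self, output: Self::SaveItem) -> Result<(), Self::Error>;
    fn is_param(&self) -> bool {
        false
    }
}

struct Param<T: Clone>(pub T);

impl<T: Clone> Dataset for Param<T> {
    type LoadItem = T;       // loading clones the inner value
    type SaveItem = ();      // nothing meaningful can be saved
    type Error = Infallible; // loading can never fail

    fn load(&self) -> Result<T, Infallible> {
        Ok(self.0.clone())
    }

    // Never reached in practice: the pipeline validator rejects any
    // node that lists a Param as an output.
    fn save(&self, _output: ()) -> Result<(), Infallible> {
        Ok(())
    }

    fn is_param(&self) -> bool {
        true
    }
}

fn main() {
    let threshold = Param(0.5_f64);
    let v: f64 = threshold.load().unwrap(); // Infallible: unwrap never panics
    assert_eq!(v, 0.5);
    assert!(threshold.is_param());
    println!("ok");
}
```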

Nested parameters

Parameters can be organized into nested structs for clarity. Each struct level must derive Serialize and Deserialize:

#[derive(Serialize, Deserialize)]
struct ModelParams {
    learning_rate: Param<f64>,
    epochs: Param<u32>,
}

#[derive(Serialize, Deserialize)]
struct Params {
    model: ModelParams,
    verbose: Param<bool>,
}

# params.yml
model:
  learning_rate: 0.01
  epochs: 100
verbose: true

Nested parameters can be overridden from the CLI with dot notation:

$ my_app run --params model.learning_rate=0.001

Struct parameters

Param<T> works with any T: Clone + Serialize + Deserialize, including custom structs:

#[derive(Clone, Debug, Serialize, Deserialize)]
struct BaselinePeriod {
    start_month: u32,
    end_month: u32,
}

#[derive(Serialize, Deserialize)]
struct Params {
    baseline: Param<BaselinePeriod>,
}

baseline:
  start_month: 1
  end_month: 12

When a node loads this parameter, it receives the full BaselinePeriod struct as its argument.

no_std

Param is available in no_std environments — it requires no feature flags and uses no allocation. Only T: Clone + Serialize is needed.

Catalog

The catalog is a plain Rust struct that groups all datasets used by a pipeline. Any struct that derives Serialize and Deserialize and contains dataset fields works as a catalog — there is no special trait to implement.

Dataset fields

Each field in the catalog is a dataset. The field names become the dataset names used in logging, visualization, and error messages. The framework discovers them automatically via serde serialization.

#[derive(Serialize, Deserialize)]
struct Catalog {
    readings: PolarsCsvDataset,
    summary: MemoryDataset<f64>,
    report: JsonDataset,
}

Nested catalogs

Catalogs can nest other structs for organization. This is useful when a pipeline has many datasets that fall into logical groups:

#[derive(Serialize, Deserialize)]
struct InputData {
    raw_readings: PolarsCsvDataset,
    reference: YamlDataset,
}

#[derive(Serialize, Deserialize)]
struct Catalog {
    input: InputData,
    output: JsonDataset,
    intermediate: MemoryDataset<f64>,
}

The discovered dataset names use dot-separated paths: input.raw_readings, input.reference, output, and intermediate. These names appear in logs, hooks, and the viz dashboard.

The corresponding YAML mirrors the nesting:

input:
  raw_readings:
    path: data/raw.csv
  reference:
    path: data/ref.yml
output:
  path: data/output.json
intermediate: {}

Naming conventions

The framework uses serde struct names to distinguish leaf datasets from container structs:

  • Types whose serde name ends with "Dataset" are treated as leaf datasets — the indexer stops recursing
  • Param is treated as a leaf by name
  • All other struct names are treated as containers — the indexer recurses into their fields

This means custom dataset types should follow the *Dataset naming convention (e.g. TextDataset, MyCustomDataset). Container structs like nested catalogs or parameter groups must not end with “Dataset”.
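
The rule can be summarized with a tiny predicate (a hypothetical helper for illustration, not the actual indexer API):

```rust
// Illustrative leaf-detection rule from the naming convention above:
// recursion stops at types whose serde name ends in "Dataset", and at
// Param; every other struct is a container to recurse into.
fn is_leaf(serde_name: &str) -> bool {
    serde_name.ends_with("Dataset") || serde_name == "Param"
}

fn main() {
    assert!(is_leaf("PolarsCsvDataset"));
    assert!(is_leaf("TextDataset"));
    assert!(is_leaf("Param"));
    assert!(!is_leaf("ModelParams")); // parameter group: recurse into fields
    assert!(!is_leaf("InputData"));   // nested catalog: recurse into fields
    println!("ok");
}
```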

Catalog overrides

Dataset configuration can be overridden from the CLI using dot notation:

$ my_app run --catalog output.path=/tmp/result.json
$ my_app run --catalog input.raw_readings.separator=";"

See YAML Configuration for the full details on overrides.

Pipelines & Nodes

This chapter covers nodes and pipelines in more depth — the NodeInput/NodeOutput traits, the Pipeline struct for grouping nodes, and pipeline validation.

  • Nodes — NodeInput, NodeOutput, CompatibleOutput, and side-effect nodes
  • Pipeline — the Pipeline struct for grouping nodes with input/output contracts
  • Dynamic Pipelines — runtime step composition with StepVec
  • Split & Join — fan-out/fan-in patterns with TemplatedCatalog
  • Check — runtime validation of pipeline structure

Nodes

This page covers the Node struct in more depth. For the basics, see A minimal pipeline — Nodes.

NodeInput and NodeOutput traits

These traits handle the mechanics of loading from and saving to dataset tuples:

pub trait NodeInput {
    type Args;
    fn load_data(&self, on_event: ...) -> Result<Self::Args, PondError>;
}

pub trait NodeOutput {
    type Output;
    fn save_data(&self, output: Self::Output, on_event: ...) -> Result<(), PondError>;
}

They are implemented for tuples of dataset references (up to 10 elements) via macros. During execution, load_data fires BeforeLoad/AfterLoad events for each dataset, and save_data fires BeforeSave/AfterSave events — these drive the hook system.

CompatibleOutput

The CompatibleOutput trait is what allows node functions to return either bare tuples or Result<tuple, E>:

// Bare tuple — infallible node
func: |x: i32| (x * 2,),

// Result — fallible node
func: |x: i32| -> Result<(i32,), MyError> { Ok((x * 2,)) },

The bound F::Output: CompatibleOutput<Output::Output> on the Node struct catches type mismatches at node construction time, before the pipeline error type E is known. This means you get a compile error immediately if the function’s return type doesn’t match the output datasets.

IntoNodeResult

When a node is called at runtime, IntoNodeResult normalizes the function’s return value into Result<O, E>:

  • A bare tuple O becomes Ok(O)
  • A Result<O, E> is passed through as-is

This is what allows runners to handle both fallible and infallible nodes uniformly.
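
The normalization can be sketched stand-alone. The trait and impls below are redeclared locally for illustration; the real pondrs trait covers tuples of up to 10 elements via macros.

```rust
// Normalizes a node function's return value into Result<O, E>.
trait IntoNodeResult<O, E> {
    fn into_node_result(self) -> Result<O, E>;
}

// A bare 1-tuple is always Ok (macro-generated impls handle larger
// tuples the same way in the real library).
impl<A, E> IntoNodeResult<(A,), E> for (A,) {
    fn into_node_result(self) -> Result<(A,), E> {
        Ok(self)
    }
}

// A Result is passed through unchanged.
impl<A, E> IntoNodeResult<(A,), E> for Result<(A,), E> {
    fn into_node_result(self) -> Result<(A,), E> {
        self
    }
}

fn main() {
    // Infallible node: the bare tuple becomes Ok.
    let a: Result<(i32,), String> = (42,).into_node_result();
    assert_eq!(a, Ok((42,)));

    // Fallible node: the Result is passed through as-is.
    let err: Result<(i32,), String> = Err("boom".to_string());
    let b = err.into_node_result();
    assert!(b.is_err());
    println!("ok");
}
```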

Side-effect nodes

Nodes with no outputs are useful for logging, sending notifications, or other side effects:

Node {
    name: "log_summary",
    func: |summary: f64| {
        println!("Summary: {summary}");
    },
    input: (&cat.summary,),
    output: (),
}

A node with output: () does not save any datasets. The function’s return value (unit ()) is discarded.

Similarly, a node with input: () takes no arguments and produces outputs from scratch.

Pipeline

The Pipeline struct groups related steps into a named container with declared input/output dataset contracts.

Definition

pub struct Pipeline<S: StepInfo, Input: NodeInput, Output: NodeOutput> {
    pub name: &'static str,
    pub steps: S,
    pub input: Input,
    pub output: Output,
}
  • name — label for logging, hooks, and visualization
  • steps — a tuple of nodes (and/or nested pipelines)
  • input — datasets this pipeline expects to be available when it runs
  • output — datasets this pipeline guarantees to produce

Pipelines are containers — runners never call them directly. Instead, they recurse into the pipeline’s steps. The PipelineInfo::is_leaf() method returns false for pipelines, signaling the runner to descend into children.

Example

fn pipeline<'a>(cat: &'a Catalog, params: &'a Params) -> impl Steps<PondError> + 'a {
    (
        Node { name: "load", func: |v| (v,), input: (&params.input,), output: (&cat.raw,) },
        Pipeline {
            name: "processing",
            steps: (
                Node { name: "clean", func: |v| (v,), input: (&cat.raw,), output: (&cat.clean,) },
                Node { name: "transform", func: |v| (v * 2,), input: (&cat.clean,), output: (&cat.result,) },
            ),
            input: (&cat.raw,),
            output: (&cat.result,),
        },
    )
}

Input/output contracts

The input and output declarations are validated by check():

  • Every declared input must be consumed by at least one child node
  • Every declared output must be produced by at least one child node

If these contracts are violated, check() returns CheckError::UnusedPipelineInput or CheckError::UnproducedPipelineOutput.

Nesting

Pipelines can be nested arbitrarily. The validator and runners recurse through the tree:

Pipeline {
    name: "outer",
    steps: (
        Pipeline {
            name: "inner",
            steps: (/* nodes */),
            input: (/* ... */),
            output: (/* ... */),
        },
        Node { /* ... */ },
    ),
    input: (/* ... */),
    output: (/* ... */),
}

Hooks and visualization

Pipeline boundaries fire their own hook events (before_pipeline_run, after_pipeline_run, on_pipeline_error) distinct from node events. In the visualization, pipelines appear as expandable containers that group their child nodes.

When to use Pipeline vs flat tuples

Use a flat tuple of nodes when the pipeline is simple and linear. Use Pipeline when you want to:

  • Name a group of related steps for logging and visualization
  • Declare input/output contracts that are validated by check()
  • Organize large pipelines into logical sections

Dynamic Pipelines

Sometimes the set of steps in a pipeline isn’t known at compile time. A config flag might enable or disable a step, or the number of steps might depend on runtime data. StepVec provides a type-erased, heap-allocated step container for these cases.

StepVec

pub type StepVec<'a, E = PondError> = Vec<Box<dyn RunnableStep<E> + Send + Sync + 'a>>;

It implements StepInfo and Steps<E>, so it works everywhere tuples do — as the return type of a pipeline function, as the steps field of a Pipeline, and with check(), runners, and visualization.

Use RunnableStep::boxed() to convert a Node or Pipeline into a boxed trait object:

let step: Box<dyn RunnableStep<PondError> + Send + Sync + 'a> =
    Node { name: "n", func: |v| (v,), input: (&a,), output: (&b,) }.boxed();

Conditional nodes

The primary use case is including or excluding nodes based on runtime configuration:

pub fn pipeline<'a>(cat: &'a Catalog, params: &'a Params) -> StepVec<'a> {
    let mut steps = vec![
        Node {
            name: "summarize",
            func: |df: DataFrame| {
                let mean = df.column("value").unwrap().f64().unwrap().mean().unwrap();
                (mean,)
            },
            input: (&cat.readings,),
            output: (&cat.summary,),
        }
        .boxed(),
    ];

    if params.include_report.0 {
        steps.push(
            Node {
                name: "report",
                func: |mean: f64, threshold: f64| {
                    (json!({ "mean": mean, "passed": mean >= threshold }),)
                },
                input: (&cat.summary, &params.threshold),
                output: (&cat.report,),
            }
            .boxed(),
        );
    }

    steps
}

The pipeline function returns StepVec<'a> instead of impl Steps<PondError> + 'a. Each node is .boxed() before being added to the vec, and conditional nodes are pushed with if.

Nesting inside static pipelines

StepVec can be used as the steps of a Pipeline, which can itself be placed in a static tuple:

fn pipeline<'a>(cat: &'a Catalog, params: &'a Params) -> impl Steps<PondError> + 'a {
    let dynamic_section = Pipeline {
        name: "optional_reports",
        steps: {
            let mut s: StepVec<'a> = vec![
                Node { name: "base_report", ... }.boxed(),
            ];
            if params.detailed.0 {
                s.push(Node { name: "detailed_report", ... }.boxed());
            }
            s
        },
        input: (&cat.summary,),
        output: (&cat.report,),
    };

    (
        Node { name: "summarize", ... },
        dynamic_section,
    )
}

This lets you keep type safety for the fixed parts of your pipeline and only pay for dynamic dispatch where you need it.

Validation

check() works identically for StepVec and tuple-based pipelines — it iterates the items and validates sequential ordering, duplicate outputs, and pipeline contracts. Since StepVec is built at runtime, validation applies to the constructed pipeline only, not hypothetical alternatives. An excluded conditional node won’t be checked.

If a StepVec is wrapped in a Pipeline with declared outputs, those outputs must be produced by the nodes that are actually present. An empty StepVec in a Pipeline that declares outputs will correctly fail with UnproducedPipelineOutput.

When to use StepVec vs tuples

Use tuples (the default) when all nodes are known at compile time. You get zero-cost dispatch and full type checking.

Use StepVec when you need:

  • Conditional inclusion/exclusion of nodes based on config or params
  • A variable number of nodes determined at runtime
  • Nodes of heterogeneous types that can’t be expressed in a single tuple

Split & Join

The Dynamic Pipelines chapter showed how StepVec lets you include or exclude nodes based on params — a boolean flag controls whether a report node runs. The pipeline shape changes, but the datasets are still fixed at compile time.

Split & Join solve a different problem: the catalog determines the pipeline shape. When you have multiple items of the same kind — stores, regions, sensors — you want to run the same processing for each one, with each item getting its own datasets. The number of items comes from configuration, not code.

The pattern

A typical fan-out/fan-in pipeline looks like this:

 [combined data]
       │
   ┌───┴───┐        Split
   ▼       ▼
[item A] [item B]    Per-item processing
   │       │
   └───┬───┘        Join
       ▼
  [collected results]

  1. Split takes a HashMap<String, T> from a single dataset and distributes each value to a per-item dataset
  2. Per-item nodes process each item independently (and can run in parallel)
  3. Join collects a value from each per-item dataset back into a HashMap<String, T>

TemplatedCatalog

The per-item datasets live in a TemplatedCatalog<S> — a collection of identically-shaped catalog structs, one per item:

#[derive(Debug, Serialize, Deserialize)]
struct StoreCatalog {
    inventory: PolarsCsvDataset,
    total_value: MemoryDataset<f64>,
}

#[derive(Serialize, Deserialize)]
struct Catalog {
    // ...
    stores: TemplatedCatalog<StoreCatalog>,
    // ...
}

In YAML, a TemplatedCatalog is defined with a template and a list of names. String values containing {placeholder} are expanded per entry:

stores:
  placeholder: "store"
  template:
    inventory:
      path: "data/{store}_inventory.csv"
    total_value: {}
  names: [north, south, east]

This produces three StoreCatalog instances — north, south, east — each with its own file path. The placeholder field is optional and defaults to "name".
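
The per-entry expansion can be approximated with a one-line helper (hypothetical, for illustration only — the real implementation walks the serialized template):

```rust
// Illustrative expansion rule: every "{placeholder}" occurrence in a
// string value is replaced with the entry name.
fn expand(value: &str, placeholder: &str, name: &str) -> String {
    value.replace(&format!("{{{placeholder}}}"), name)
}

fn main() {
    assert_eq!(
        expand("data/{store}_inventory.csv", "store", "north"),
        "data/north_inventory.csv"
    );
    assert_eq!(
        expand("data/{store}_inventory.csv", "store", "south"),
        "data/south_inventory.csv"
    );
    println!("ok");
}
```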

TemplatedCatalog serializes as a map, so the catalog indexer produces meaningful names like stores.north.inventory.

Split

Split is a leaf node (like Ident) that loads a HashMap<String, T> from an input dataset and saves each value to the corresponding entry in a TemplatedCatalog. A field accessor selects which dataset within each entry to write to:

Split {
    name: "split_stores",
    input: &cat.grouped,                     // MemoryDataset<HashMap<String, DataFrame>>
    catalog: &cat.stores,                     // TemplatedCatalog<StoreCatalog>
    field: |s: &StoreCatalog| &s.inventory,   // which dataset to write to
}

At runtime, Split validates that the HashMap keys exactly match the catalog entry names. A mismatch produces a PondError::KeyMismatch error.
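
The key check amounts to a set comparison, sketched below with a hypothetical helper (the actual PondError::KeyMismatch carries more context):

```rust
use std::collections::BTreeSet;

// Illustrative version of the Split key validation: the HashMap keys
// must exactly match the TemplatedCatalog entry names.
fn check_keys(map_keys: &[&str], entry_names: &[&str]) -> Result<(), String> {
    let have: BTreeSet<&str> = map_keys.iter().copied().collect();
    let want: BTreeSet<&str> = entry_names.iter().copied().collect();
    if have == want {
        Ok(())
    } else {
        Err(format!("key mismatch: got {:?}, expected {:?}", have, want))
    }
}

fn main() {
    assert!(check_keys(&["north", "south", "east"], &["east", "north", "south"]).is_ok());
    assert!(check_keys(&["north"], &["north", "south"]).is_err()); // missing "south"
    println!("ok");
}
```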

For check(), Split reports the single input dataset and all per-entry field datasets as outputs — so downstream nodes that read from those datasets are correctly validated.

Join

Join is the inverse: it loads a value from each entry’s dataset and collects them into a HashMap<String, T>:

Join {
    name: "join_values",
    catalog: &cat.stores,                       // TemplatedCatalog<StoreCatalog>
    field: |s: &StoreCatalog| &s.total_value,   // which dataset to read from
    output: &cat.store_values,                   // MemoryDataset<HashMap<String, f64>>
}

For check(), Join reports all per-entry field datasets as inputs and the single output dataset as output.

Building per-item nodes

Between Split and Join, you need processing nodes for each item. Since the number of items is determined by YAML config, you build these dynamically with StepVec:

// ---------------------------------------------------------------------------
// Pipeline function
// ---------------------------------------------------------------------------

pub fn pipeline<'a>(cat: &'a Catalog, params: &'a Params) -> StepVec<'a> {
    // Step 1: group the combined CSV into a HashMap by store.
    let mut steps: StepVec<'a> = vec![
        Node {
            name: "group_by_store",
            func: group_by_store,
            input: (&cat.all_inventory,),
            output: (&cat.grouped,),
        }
        .boxed(),
    ];

    // Step 2: Split distributes per-store DataFrames to individual CSV files.
    steps.push(
        Split {
            name: "split_stores",
            input: &cat.grouped,
            catalog: &cat.stores,
            field: |s: &StoreCatalog| &s.inventory,
        }
        .boxed(),
    );

    // Step 3: per-store processing — dynamically build a node for each store.
    for (_, store) in cat.stores.iter() {
        steps.push(
            Node {
                name: "compute_store_value",
                func: compute_store_value,
                input: (&store.inventory, &params.low_stock_threshold),
                output: (&store.total_value,),
            }
            .boxed(),
        );
    }

    // Step 4: Join collects per-store totals back into a HashMap.
    steps.push(
        Join {
            name: "join_values",
            catalog: &cat.stores,
            field: |s: &StoreCatalog| &s.total_value,
            output: &cat.store_values,
        }
        .boxed(),
    );

    // Step 5: build a comparison report from the joined values.
    steps.push(
        Node {
            name: "build_report",
            func: build_report,
            input: (&cat.store_values,),
            output: (&cat.report,),
        }
        .boxed(),
    );

    steps
}

cat.stores.iter() yields (&str, &StoreCatalog) pairs in the order the entry names were inserted. The per-store nodes reference datasets owned by each StoreCatalog entry, so they are naturally wired into the correct fan-out/fan-in structure.

Comparison with PartitionedDataset

PartitionedDataset handles a similar concept — a directory of files keyed by name — but at the dataset level. A single node reads or writes all partitions at once as a HashMap. Split & Join operate at the pipeline level: they let you run separate nodes for each item, with each item having its own arbitrarily complex set of datasets.

Use PartitionedDataset when a single node can handle all items. Use Split & Join when each item needs its own processing sub-pipeline.

Nested templates

TemplatedCatalog supports nesting. An outer template can contain an inner TemplatedCatalog with a different placeholder:

regions:
  placeholder: "region"
  template:
    metrics:
      placeholder: "metric"
      template:
        raw:
          path: "data/{region}/{metric}/raw.csv"
      names: [temperature, humidity]
  names: [north, south]

This produces paths like data/north/temperature/raw.csv. The outer placeholder is substituted first, so inner templates see the expanded value.
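The substitution itself can be sketched as repeated placeholder replacement (assumed behavior; the real TemplatedCatalog performs this while instantiating entries):

```rust
// Expand "{placeholder}" markers in a path template. With nesting, the
// outer placeholder ("region") is substituted before the inner one
// ("metric"), as described above.
fn expand(template: &str, subs: &[(&str, &str)]) -> String {
    let mut out = template.to_string();
    for (placeholder, value) in subs {
        out = out.replace(&format!("{{{placeholder}}}"), value);
    }
    out
}
```

For example, expanding `data/{region}/{metric}/raw.csv` with `region = north` and `metric = temperature` yields `data/north/temperature/raw.csv`.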

Check

Pipeline validation catches structural errors before execution. The check() method on StepInfo walks the pipeline and verifies several invariants.

Usage

let steps = pipeline(&catalog, &params);
steps.check()?;  // Result<(), CheckError>

Or via the CLI:

$ my_app check
Pipeline is valid.

What check validates

Sequential ordering

A node must not read a dataset that is only produced by a later node. Datasets that no node produces are treated as external inputs (valid).

// Valid: n1 produces a, n2 reads a
(
    Node { name: "n1", input: (&param,), output: (&a,), .. },
    Node { name: "n2", input: (&a,),     output: (&b,), .. },
)

// Invalid: n1 reads b, but b is produced by n2
(
    Node { name: "n1", input: (&b,),     output: (&a,), .. },
    Node { name: "n2", input: (&param,), output: (&b,), .. },
)
// → CheckError::InputNotProduced { node_name: "n1", .. }

No duplicate outputs

A dataset must not be produced by more than one node:

(
    Node { name: "n1", input: (&param,), output: (&a,), .. },
    Node { name: "n2", input: (&param,), output: (&a,), .. },  // same output!
)
// → CheckError::DuplicateOutput { node_name: "n2", .. }

Params are read-only

No node may write to a Param dataset:

(
    Node { name: "n1", func: || ((),), input: (), output: (&param,) },
)
// → CheckError::ParamWritten { node_name: "n1", .. }

Pipeline contracts

For Pipeline structs, the declared inputs and outputs must match what the children actually consume and produce. See Pipeline.

CheckError variants

| Variant | Meaning |
|---|---|
| InputNotProduced | Node reads a dataset produced by a later node |
| DuplicateOutput | Two nodes produce the same dataset |
| ParamWritten | A node writes to a Param |
| UnusedPipelineInput | Pipeline declares an input its children don't consume |
| UnproducedPipelineOutput | Pipeline declares an output its children don't produce |
| CapacityExceeded | Internal dataset buffer overflow (see below) |

Capacity

check() uses a fixed-capacity buffer (default 20 datasets) for no_std compatibility. If your pipeline has more than 20 unique datasets, use check_with_capacity:

steps.check_with_capacity::<64>()?;

no_std compatibility

check() works in no_std environments. It uses no allocation — all dataset tracking is done in fixed-size arrays on the stack.
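The ordering check described above can be sketched with const-generic fixed buffers (a simplified model under stated assumptions: datasets reduced to integer ids, only the InputNotProduced rule shown):

```rust
// nodes: (inputs, outputs) as dataset ids, in pipeline order.
fn check_order<const CAP: usize>(
    nodes: &[(&[u32], &[u32])],
) -> Result<(), &'static str> {
    // Pass 1: record every dataset that *some* node produces. Anything
    // else is an external input and always valid.
    let mut produced = [0u32; CAP];
    let mut p = 0;
    for (_, outs) in nodes {
        for &d in *outs {
            if p == CAP {
                return Err("capacity exceeded");
            }
            produced[p] = d;
            p += 1;
        }
    }
    // Pass 2: walk in order; reading a dataset that is produced somewhere
    // but not by an earlier node is the InputNotProduced case.
    let mut seen = [0u32; CAP];
    let mut s = 0;
    for (ins, outs) in nodes {
        for &d in *ins {
            if produced[..p].contains(&d) && !seen[..s].contains(&d) {
                return Err("input not produced by an earlier node");
            }
        }
        for &d in *outs {
            seen[s] = d;
            s += 1;
        }
    }
    Ok(())
}
```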

Error handling

pondrs uses Rust’s standard Result type for error propagation. The framework provides PondError for infrastructure errors, while letting you define your own error type for domain-specific failures.

  • Error Type — PondError, custom error types, and the From<PondError> requirement
  • Node Errors — how fallible node functions propagate errors
  • Dataset Errors — the Dataset::Error associated type and adding custom error variants

Error Type

PondError

PondError is the framework-level error type. It covers infrastructure failures like I/O errors, serialization errors, and dataset-not-loaded conditions:

pub enum PondError {
    #[cfg(feature = "std")]    Io(std::io::Error),
    #[cfg(feature = "polars")] Polars(polars::error::PolarsError),
    #[cfg(feature = "yaml")]   YamlScan(yaml_rust2::ScanError),
    #[cfg(feature = "yaml")]   YamlEmit(yaml_rust2::EmitError),
    #[cfg(feature = "std")]    SerdeYaml(serde_yaml::Error),
    #[cfg(any(feature = "json", feature = "plotly", feature = "viz"))]
                               Json(serde_json::Error),
    #[cfg(feature = "image")]  Image(image::ImageError),

    DatasetNotLoaded,          // always available (no_std)
    RunnerNotFound,
    CheckFailed,
    #[cfg(feature = "std")]    LockPoisoned(String),
    #[cfg(feature = "std")]    Custom(String),
}

Variants are feature-gated — only DatasetNotLoaded, RunnerNotFound, and CheckFailed are available in no_std builds.

Using PondError directly

For simple pipelines, you can use PondError as your pipeline error type:

fn pipeline<'a>(cat: &'a Catalog, params: &'a Params) -> impl Steps<PondError> + 'a {
    (/* nodes */)
}

This works because PondError trivially satisfies From<PondError>.

Custom error types

When you need domain-specific error variants, define your own error enum with a From<PondError> conversion:

#[derive(Debug, thiserror::Error)]
enum MyError {
    #[error(transparent)]
    Pond(#[from] PondError),

    #[error("validation failed: {0}")]
    Validation(String),

    #[error("threshold exceeded: {value} > {max}")]
    ThresholdExceeded { value: f64, max: f64 },
}

The #[from] attribute on the PondError variant provides the required From<PondError> implementation. Your pipeline function then uses MyError as its error type:

fn pipeline<'a>(cat: &'a Catalog, params: &'a Params) -> impl Steps<MyError> + 'a {
    (/* nodes that can return MyError */)
}

The From<PondError> requirement

The pipeline error type E must satisfy E: From<PondError>. This is how the framework converts dataset I/O errors and infrastructure failures into your pipeline’s error type. Without this conversion, dataset load() and save() calls couldn’t propagate errors through your nodes.

Adding variants for custom datasets

If you implement a custom dataset whose Error type is not already covered by PondError, you have two choices:

  1. Use PondError::Custom (simplest): convert your error to a string.

    impl Dataset for MyDataset {
        type Error = PondError;
        fn load(&self) -> Result<Self::LoadItem, PondError> {
            do_something().map_err(|e| PondError::Custom(e.to_string()))
        }
    }
  2. Add a variant to your pipeline error type: keep the original error type in your custom dataset and convert it in your pipeline error enum.

    impl Dataset for MyDataset {
        type Error = MyDatasetError;
        // ...
    }
    
    #[derive(Debug, thiserror::Error)]
    enum MyError {
        #[error(transparent)]
        Pond(#[from] PondError),
        #[error(transparent)]
        MyDataset(#[from] MyDatasetError),
    }

    This requires PondError: From<MyDatasetError> or using your custom pipeline error type E where E: From<PondError> + From<MyDatasetError>. See Dataset Errors for the full pattern.

Node Errors

Node functions can be either infallible (returning a bare tuple) or fallible (returning Result).

Infallible nodes

The simplest nodes return a bare tuple. They cannot fail:

Node {
    name: "double",
    func: |x: i32| (x * 2,),
    input: (&params.x,),
    output: (&cat.doubled,),
}

Fallible nodes

Nodes that can fail return Result<(outputs...), E> where E is the pipeline error type:

Node {
    name: "parse",
    func: |text: String| -> Result<(i32,), PondError> {
        let n = text.trim().parse::<i32>()
            .map_err(|e| PondError::Custom(e.to_string()))?;
        Ok((n,))
    },
    input: (&cat.raw_text,),
    output: (&cat.parsed,),
}

How it works: IntoNodeResult

The IntoNodeResult trait normalizes both bare tuples and Result returns into Result<O, E>:

pub trait IntoNodeResult<O, E> {
    fn into_node_result(self) -> Result<O, E>;
}
  • For bare tuples O: wraps in Ok(value)
  • For Result<O, E>: passes through unchanged

This means the same Node struct works for both infallible and fallible functions.
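A simplified per-arity sketch of how such a trait can be implemented (assumed; the real trait presumably covers more tuple sizes):

```rust
pub trait IntoNodeResult<O, E> {
    fn into_node_result(self) -> Result<O, E>;
}

// A bare 1-tuple is wrapped in Ok. Per-arity impls avoid overlapping
// with the Result impl below, since Result is not a tuple.
impl<A, E> IntoNodeResult<(A,), E> for (A,) {
    fn into_node_result(self) -> Result<(A,), E> {
        Ok(self)
    }
}

// A Result passes through unchanged.
impl<O, E> IntoNodeResult<O, E> for Result<O, E> {
    fn into_node_result(self) -> Result<O, E> {
        self
    }
}
```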

CompatibleOutput

The CompatibleOutput trait ensures at compile time that the function’s return type matches the output datasets. It accepts both:

  • O (bare tuple) — direct match
  • Result<O, E> — the Ok type must match

If the types don’t match, you get a compile error when constructing the Node, not at runtime.

Error propagation

When a fallible node returns Err(e):

  1. The runner catches the error
  2. The on_node_error hook fires with the error message
  3. If the node is inside a Pipeline, on_pipeline_error fires for each ancestor
  4. The runner stops executing and returns the error

Using custom error types

With a custom error type, nodes can return domain-specific errors:

#[derive(Debug, thiserror::Error)]
enum MyError {
    #[error(transparent)]
    Pond(#[from] PondError),
    #[error("value {0} exceeds limit")]
    TooLarge(f64),
}

Node {
    name: "validate",
    func: |value: f64| -> Result<(f64,), MyError> {
        if value > 100.0 {
            return Err(MyError::TooLarge(value));
        }
        Ok((value,))
    },
    input: (&cat.raw,),
    output: (&cat.validated,),
}

Dataset Errors

Each dataset declares its own Error associated type. The framework needs to convert these errors into the pipeline’s error type E via PondError.

The conversion chain

When a node loads or saves a dataset, errors flow through this chain:

Dataset::Error → PondError → E (pipeline error type)

The first conversion (Dataset::Error → PondError) is required by the NodeInput and NodeOutput trait bounds: PondError: From<D::Error>. The second conversion (PondError → E) is required by the runner: E: From<PondError>.
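The chain is ordinary From-based `?` conversion. A self-contained sketch with stand-in types (all names here are hypothetical, not pondrs types):

```rust
#[derive(Debug)]
struct CsvError; // stand-in for a Dataset::Error

#[derive(Debug)]
enum FrameworkError { // stand-in for PondError
    Csv(CsvError),
}
impl From<CsvError> for FrameworkError {
    fn from(e: CsvError) -> Self { FrameworkError::Csv(e) }
}

#[derive(Debug)]
enum AppError { // stand-in for the pipeline error type E
    Framework(FrameworkError),
}
impl From<FrameworkError> for AppError {
    fn from(e: FrameworkError) -> Self { AppError::Framework(e) }
}

fn load() -> Result<i32, CsvError> { Err(CsvError) }

// First hop: Dataset::Error → FrameworkError via map_err.
// Second hop: `?` applies FrameworkError → AppError automatically.
fn node() -> Result<i32, AppError> {
    let v = load().map_err(FrameworkError::from)?;
    Ok(v)
}
```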

Built-in dataset errors

All built-in datasets use error types that already have From implementations into PondError:

| Dataset | Error type | PondError variant |
|---|---|---|
| Param<T> | Infallible | (never fails) |
| CellDataset<T> | PondError | direct |
| MemoryDataset<T> | PondError | direct |
| PolarsCsvDataset | PondError | Polars, Io |
| JsonDataset | PondError | Json, Io |
| TextDataset | PondError | Io |
| YamlDataset | PondError | YamlScan, Io |
| CacheDataset<D> | PondError | wraps inner |

Custom datasets with PondError

The simplest approach for custom datasets is to use PondError directly as the error type:

impl Dataset for MyDataset {
    type LoadItem = MyData;
    type SaveItem = MyData;
    type Error = PondError;

    fn load(&self) -> Result<MyData, PondError> {
        let bytes = std::fs::read(&self.path)?;  // Io variant via From
        parse_my_format(&bytes)
            .map_err(|e| PondError::Custom(e.to_string()))
    }

    fn save(&self, data: MyData) -> Result<(), PondError> {
        let bytes = serialize_my_format(&data)
            .map_err(|e| PondError::Custom(e.to_string()))?;
        std::fs::write(&self.path, bytes)?;
        Ok(())
    }
}

Custom datasets with their own error type

If you want to preserve error type information, your dataset can use its own error type. You then need PondError: From<YourError>:

#[derive(Debug, thiserror::Error)]
pub enum MyDatasetError {
    #[error("parse error at line {line}: {msg}")]
    Parse { line: usize, msg: String },
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),
}

impl Dataset for MyDataset {
    type LoadItem = MyData;
    type SaveItem = MyData;
    type Error = MyDatasetError;
    // ...
}

Since PondError doesn’t have a variant for every possible custom error, you typically handle this through your pipeline error type:

#[derive(Debug, thiserror::Error)]
enum PipelineError {
    #[error(transparent)]
    Pond(#[from] PondError),
    #[error(transparent)]
    MyDataset(#[from] MyDatasetError),
}

However, the NodeInput/NodeOutput bounds require PondError: From<MyDatasetError>. If this conversion doesn’t exist, the dataset won’t compile as a node input/output. The easiest solution is to use PondError as your dataset’s error type and use PondError::Custom for the domain-specific cases, or to implement the From conversion yourself.

Infallible datasets

Datasets that never fail (like Param) use core::convert::Infallible:

impl Dataset for Param<T> {
    type Error = Infallible;
    fn load(&self) -> Result<T, Infallible> { Ok(self.0.clone()) }
}

PondError implements From<Infallible> (via the match x {} pattern), so this works with any pipeline error type.
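The empty-match pattern is worth seeing once; a minimal sketch with a stand-in error type (PondErrorSketch is hypothetical):

```rust
use core::convert::Infallible;

#[derive(Debug)]
enum PondErrorSketch { // stand-in for PondError
    DatasetNotLoaded,
}

// Infallible has no values, so the empty match is exhaustive:
// this conversion type-checks but can never actually run.
impl From<Infallible> for PondErrorSketch {
    fn from(x: Infallible) -> Self {
        match x {}
    }
}
```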

Hooks

Hooks let you observe pipeline execution events without modifying the pipeline itself. They are used for logging, timing, visualization, and custom instrumentation.

The Hook trait

pub trait Hook: Sync {
    // Pipeline lifecycle
    fn before_pipeline_run(&self, p: &dyn PipelineInfo) {}
    fn after_pipeline_run(&self, p: &dyn PipelineInfo) {}
    fn on_pipeline_error(&self, p: &dyn PipelineInfo, error: &str) {}

    // Node lifecycle
    fn before_node_run(&self, n: &dyn PipelineInfo) {}
    fn after_node_run(&self, n: &dyn PipelineInfo) {}
    fn on_node_error(&self, n: &dyn PipelineInfo, error: &str) {}

    // Dataset lifecycle (fired per-dataset during Node::call)
    fn before_dataset_loaded(&self, n: &dyn PipelineInfo, ds: &DatasetRef) {}
    fn after_dataset_loaded(&self, n: &dyn PipelineInfo, ds: &DatasetRef) {}
    fn before_dataset_saved(&self, n: &dyn PipelineInfo, ds: &DatasetRef) {}
    fn after_dataset_saved(&self, n: &dyn PipelineInfo, ds: &DatasetRef) {}
}

All methods have default no-op implementations, so you only override the ones you care about.

The Hooks trait

Multiple hooks are composed as a tuple:

pub trait Hooks: Sync {
    fn for_each_hook(&self, f: &mut dyn FnMut(&dyn Hook));
}

Implemented for tuples of up to 10 hooks, plus () (no hooks):

// No hooks
App::new(catalog, params).execute(pipeline)

// One hook
App::new(catalog, params)
    .with_hooks((LoggingHook::new(),))
    .execute(pipeline)

// Multiple hooks
App::new(catalog, params)
    .with_hooks((LoggingHook::new(), my_custom_hook))
    .execute(pipeline)
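Tuple composition plausibly reduces to per-arity impls. A simplified sketch, with Hook cut down to a single method for illustration (not the real trait):

```rust
trait Hook {
    fn on_event(&self, event: &str);
}

trait Hooks {
    fn for_each_hook(&self, f: &mut dyn FnMut(&dyn Hook));
}

// () means "no hooks": the closure is never called.
impl Hooks for () {
    fn for_each_hook(&self, _f: &mut dyn FnMut(&dyn Hook)) {}
}

// A two-hook tuple visits each element in order; the real library
// presumably generates impls like this for arities up to 10.
impl<A: Hook, B: Hook> Hooks for (A, B) {
    fn for_each_hook(&self, f: &mut dyn FnMut(&dyn Hook)) {
        f(&self.0);
        f(&self.1);
    }
}
```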

Hook arguments

All hook methods receive &dyn PipelineInfo, which provides:

  • name() — the node or pipeline name (&'static str)
  • is_leaf() — true for nodes, false for pipelines
  • type_string() — the Rust type name of the underlying function
  • for_each_input() / for_each_output() — iterate over dataset references

Dataset hook methods additionally receive &DatasetRef, which provides:

  • id — a unique identifier (pointer-based)
  • name — an Option<&str> with the resolved dataset name (from the catalog indexer, available when using std)
  • meta — &dyn DatasetMeta with is_param(), type_string(), and (with std) html() and yaml()

Sync requirement

Hooks must be Sync because the ParallelRunner calls hook methods from multiple threads. For hooks that need interior mutability (e.g. to track timing), use thread-safe types like Mutex or DashMap.

This chapter covers:

Dataset Hooks

Dataset hooks fire during the load and save operations inside Node::call(). Each hook receives the owning node and the dataset reference.

Methods

fn before_dataset_loaded(&self, n: &dyn PipelineInfo, ds: &DatasetRef) {}
fn after_dataset_loaded(&self, n: &dyn PipelineInfo, ds: &DatasetRef) {}
fn before_dataset_saved(&self, n: &dyn PipelineInfo, ds: &DatasetRef) {}
fn after_dataset_saved(&self, n: &dyn PipelineInfo, ds: &DatasetRef) {}

Arguments

  • n — the node that is loading/saving the dataset. Use n.name() to get the node name.
  • ds — the dataset reference:
    • ds.id — unique pointer-based identifier
    • ds.name — resolved name from the catalog (e.g. Some("readings")), or None in no_std
    • ds.meta.is_param() — whether this is a parameter dataset
    • ds.meta.type_string() — the Rust type name (e.g. "pondrs::datasets::memory::MemoryDataset<f64>")
    • ds.meta.html() — (std only) returns an optional HTML snippet for the dataset’s current contents. Datasets like PlotlyDataset override this to produce rich visualizations; file-backed datasets render their contents as formatted text. Used by the viz dashboard.
    • ds.meta.yaml() — (std only) returns the dataset’s configuration serialized as YAML. This is produced automatically via the Serialize supertrait and is used by the viz dashboard to display dataset settings.

Firing order

For a node with two inputs and one output, hooks fire in this order:

  1. before_dataset_loaded (input 0)
  2. after_dataset_loaded (input 0)
  3. before_dataset_loaded (input 1)
  4. after_dataset_loaded (input 1)
  5. node function executes
  6. before_dataset_saved (output 0)
  7. after_dataset_saved (output 0)

Dataset hooks fire inside the before_node_run / after_node_run window. The sequence is always: before_node_run → dataset loads → function call → dataset saves → after_node_run.

Example: tracking I/O time

use std::sync::Mutex;
use std::collections::HashMap;
use std::time::Instant;

struct IoTimingHook {
    starts: Mutex<HashMap<usize, Instant>>,
}

impl Hook for IoTimingHook {
    fn before_dataset_loaded(&self, _n: &dyn PipelineInfo, ds: &DatasetRef) {
        self.starts.lock().unwrap().insert(ds.id, Instant::now());
    }

    fn after_dataset_loaded(&self, _n: &dyn PipelineInfo, ds: &DatasetRef) {
        if let Some(start) = self.starts.lock().unwrap().remove(&ds.id) {
            let name = ds.name.unwrap_or("<unknown>");
            println!("  loaded {} in {:.1}ms", name, start.elapsed().as_secs_f64() * 1000.0);
        }
    }
}

Name resolution

In std builds, the runner resolves dataset names from the catalog indexer before dispatching to hooks. This is why ds.name is Option<&str> — it’s Some("readings") when the catalog indexer can map the pointer to a field name, and None in no_std builds where the indexer isn’t available.
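The pointer-based identity can be illustrated with a toy index (MemoryDatasetSketch, dataset_id, and build_index are hypothetical stand-ins, not pondrs APIs):

```rust
use std::collections::HashMap;

// A dataset's id is its address, stable for the lifetime of the catalog.
struct MemoryDatasetSketch(f64);

fn dataset_id<T>(ds: &T) -> usize {
    ds as *const T as usize
}

// Build a name index the way index_catalog_with_params conceptually does,
// here by hand for a single field.
fn build_index(readings: &MemoryDatasetSketch) -> HashMap<usize, &'static str> {
    let mut index = HashMap::new();
    index.insert(dataset_id(readings), "readings");
    index
}
```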

Node Hooks

Node hooks fire when a runner starts and finishes executing a node, or when a node encounters an error.

Methods

fn before_node_run(&self, n: &dyn PipelineInfo) {}
fn after_node_run(&self, n: &dyn PipelineInfo) {}
fn on_node_error(&self, n: &dyn PipelineInfo, error: &str) {}

Arguments

  • n — the node being executed. Use n.name() for the node name, n.type_string() for the function’s type name.
  • error (on on_node_error) — the stringified error message. In std builds this is e.to_string(); in no_std it’s the fixed string "node error".

Lifecycle

For a successful node execution:

before_node_run(n)
  before_dataset_loaded(n, ds0)
  after_dataset_loaded(n, ds0)
  ... (function executes) ...
  before_dataset_saved(n, ds_out)
  after_dataset_saved(n, ds_out)
after_node_run(n)

For a failed node:

before_node_run(n)
  ... (error occurs during load, function, or save) ...
on_node_error(n, "error message")

after_node_run and on_node_error are mutually exclusive — exactly one fires per node execution.

Example: counting nodes

use std::sync::atomic::{AtomicUsize, Ordering};

struct NodeCounter {
    count: AtomicUsize,
}

impl Hook for NodeCounter {
    fn after_node_run(&self, n: &dyn PipelineInfo) {
        let i = self.count.fetch_add(1, Ordering::Relaxed) + 1;
        println!("Completed node {} ({} completed so far)", n.name(), i);
    }

    fn on_node_error(&self, n: &dyn PipelineInfo, error: &str) {
        eprintln!("Node {} failed: {}", n.name(), error);
    }
}

Parallel runner behavior

With the ParallelRunner, node hooks may fire from different threads concurrently. This is why Hook: Sync is required. Use atomic types or Mutex for any shared state in your hook.

Pipeline Hooks

Pipeline hooks fire at the boundaries of Pipeline structs (not flat tuples). They are useful for tracking the lifecycle of logical groups of nodes.

Methods

fn before_pipeline_run(&self, p: &dyn PipelineInfo) {}
fn after_pipeline_run(&self, p: &dyn PipelineInfo) {}
fn on_pipeline_error(&self, p: &dyn PipelineInfo, error: &str) {}

Arguments

  • p — the pipeline being executed. p.name() returns the pipeline’s name, p.is_leaf() returns false.
  • error — the stringified error message from the failing node within the pipeline.

Sequential runner behavior

The SequentialRunner fires pipeline hooks as it enters and exits Pipeline structs:

before_pipeline_run("processing")
  before_node_run("clean")
  after_node_run("clean")
  before_node_run("transform")
  after_node_run("transform")
after_pipeline_run("processing")

If a child node fails, on_pipeline_error fires instead of after_pipeline_run.

Parallel runner behavior

The ParallelRunner fires pipeline hooks based on dataset availability:

  • before_pipeline_run fires when all of the pipeline’s declared inputs are available
  • after_pipeline_run fires when all of the pipeline’s declared outputs have been produced
  • on_pipeline_error fires when a child node fails — it propagates up through all ancestor pipelines

This means pipeline hooks may fire at different times than in sequential execution, since nodes may run in a different order.

Flat tuples vs Pipeline structs

Pipeline hooks only fire for Pipeline structs. A flat tuple of nodes at the top level does not trigger before_pipeline_run / after_pipeline_run. If you want pipeline-level hooks for your entire pipeline, wrap the top-level steps in a Pipeline.

Example: timing pipelines

struct PipelineTimer {
    timings: Mutex<HashMap<&'static str, Instant>>,
}

impl Hook for PipelineTimer {
    fn before_pipeline_run(&self, p: &dyn PipelineInfo) {
        self.timings.lock().unwrap().insert(p.name(), Instant::now());
    }

    fn after_pipeline_run(&self, p: &dyn PipelineInfo) {
        if let Some(start) = self.timings.lock().unwrap().remove(p.name()) {
            println!("[{}] completed in {:.1}ms", p.name(), start.elapsed().as_secs_f64() * 1000.0);
        }
    }
}

Built-in Hooks

pondrs provides two built-in hook implementations.

LoggingHook

Requires the std feature.

Logs pipeline and node lifecycle events using the log crate, with automatic timing:

use pondrs::hooks::LoggingHook;

App::new(catalog, params)
    .with_hooks((LoggingHook::new(),))
    .execute(pipeline)?;

Output (with env_logger at info level):

[INFO] [pipeline] processing - starting
[INFO] [node] clean - starting
[INFO] [node] clean - completed (12.3ms)
[INFO] [node] transform - starting
[INFO] [node] transform - completed (5.1ms)
[INFO] [pipeline] processing - completed (17.8ms)

At debug level, dataset load/save events are also logged:

[DEBUG]   loading readings
[DEBUG]   loaded readings (8.2ms)
[DEBUG]   saving summary
[DEBUG]   saved summary (0.1ms)

LoggingHook uses a TimingTracker internally to measure durations between before/after pairs.

VizHook

Requires the viz feature.

Posts live execution events to a running viz server via HTTP. This enables the interactive visualization to show real-time node status during app run:

use pondrs::viz::VizHook;

App::new(catalog, params)
    .with_hooks((LoggingHook::new(), VizHook::new("http://localhost:8080".into())))
    .execute(pipeline)?;

The typical workflow is:

  1. Start the viz server: my_app viz --port 8080
  2. In another terminal, run the pipeline with VizHook attached

VizHook is fire-and-forget — it silently ignores HTTP errors, so a missing viz server won’t crash your pipeline. It tracks:

  • Node start/end/error events
  • Dataset load/save durations

Each event is posted as a VizEvent to POST /api/status, which the viz server broadcasts to connected WebSocket clients.

Combining hooks

Hooks compose as tuples:

.with_hooks((
    LoggingHook::new(),
    VizHook::new("http://localhost:8080".into()),
    my_custom_hook,
))

Each hook receives every event independently. Order in the tuple determines call order, but hooks should not depend on ordering.

Runners

Runners execute pipeline steps. pondrs provides two built-in runners and lets you select between them at runtime.

The Runner trait

pub trait Runner {
    fn name(&self) -> &'static str;

    fn run<E>(
        &self,
        pipe: &impl Steps<E>,
        catalog: &impl Serialize,
        params: &impl Serialize,
        hooks: &impl Hooks,
    ) -> Result<(), E>
    where
        E: From<PondError> + Send + Sync + Display + Debug + 'static;
}
  • name() — identifies the runner for CLI selection (--runner sequential)
  • run() — executes the pipeline, calling hooks at each lifecycle point

The Runners trait

Multiple runners compose as tuples, enabling runtime selection:

pub trait Runners {
    fn first_name(&self) -> &'static str;
    fn run_by_name<E>(&self, name: &str, ...) -> Option<Result<(), E>>;
    fn for_each_name(&self, f: &mut dyn FnMut(&str));
}

The default runners depend on the feature set:

  • std — (SequentialRunner, ParallelRunner); sequential is the default
  • no_std — (SequentialRunner,); only sequential is available

Selecting a runner

Via CLI:

$ my_app run                       # uses default (sequential)
$ my_app run --runner parallel     # uses parallel runner
$ my_app run --runner sequential   # explicit sequential

Via code:

App::new(catalog, params)
    .with_runners((SequentialRunner, ParallelRunner))
    .execute(pipeline)?;

Custom runners

You can implement Runner for your own types. Add them to the runners tuple:

App::new(catalog, params)
    .with_runners((SequentialRunner, ParallelRunner, MyDistributedRunner))
    .execute(pipeline)?;
$ my_app run --runner my_distributed

This chapter covers:

Runner Trait

The Runner trait defines how a pipeline is executed. Implementing it lets you create custom execution strategies.

Definition

pub trait Runner {
    fn name(&self) -> &'static str;

    fn run<E>(
        &self,
        pipe: &impl Steps<E>,
        catalog: &impl Serialize,
        params: &impl Serialize,
        hooks: &impl Hooks,
    ) -> Result<(), E>
    where
        E: From<PondError> + Send + Sync + Display + Debug + 'static;
}

Implementing a runner

A runner walks the pipeline steps and executes each node. The key types to work with are:

  • Steps<E> — iterate steps via for_each_item()
  • RunnableStep<E> — each step, which is either a leaf node or a pipeline container
    • is_leaf() — true for nodes, false for pipelines
    • call(on_event) — execute a leaf node, passing a callback for dataset events
    • for_each_child_step() — iterate children of a pipeline container
  • Hooks — fire lifecycle events via for_each_hook()

See the SequentialRunner and ParallelRunner source code for concrete implementation examples.

Dataset event dispatch

When calling item.call(on_event), the on_event callback receives (&DatasetRef, DatasetEvent) for each dataset load/save. The built-in runners use the dispatch_dataset_event helper to resolve dataset names from the catalog index and forward to hooks. A custom runner can do the same or handle events differently.

Catalog indexing

The catalog and params arguments to run() are provided so the runner can build a dataset name index. The built-in runners use index_catalog_with_params(catalog, params) (available in std) to create a HashMap<usize, String> mapping pointer IDs to field names. This is optional — a no_std runner can ignore these arguments.

Sequential Runner

The SequentialRunner executes pipeline steps one at a time in definition order.

Overview

#[derive(Default)]
pub struct SequentialRunner;

impl Runner for SequentialRunner {
    fn name(&self) -> &'static str { "sequential" }
    // ...
}
  • Available in both std and no_std environments
  • Executes nodes in the exact order they appear in the steps tuple
  • Recursively enters Pipeline containers, executing children in order
  • Default runner when no --runner flag is specified

Behavior

Given this pipeline:

(
    Node { name: "a", .. },
    Pipeline {
        name: "inner",
        steps: (
            Node { name: "b", .. },
            Node { name: "c", .. },
        ),
        ..
    },
    Node { name: "d", .. },
)

Execution order is: a → b → c → d.

Hook events fire in this order:

before_node_run("a")
after_node_run("a")
before_pipeline_run("inner")
  before_node_run("b")
  after_node_run("b")
  before_node_run("c")
  after_node_run("c")
after_pipeline_run("inner")
before_node_run("d")
after_node_run("d")

Error handling

If any node fails, execution stops immediately. No subsequent nodes are executed. The error propagates up through any enclosing Pipeline containers, firing on_pipeline_error at each level.

no_std differences

In no_std builds:

  • Dataset names are not resolved (no catalog indexer), so ds.name in hook callbacks is always None
  • Error messages in on_node_error are the fixed string "node error" instead of the full error message

When to use

Use the sequential runner when:

  • Your nodes have strict ordering requirements
  • You’re in a no_std environment
  • You want deterministic, predictable execution
  • Debugging — sequential execution makes it easier to trace issues

Parallel Runner

The ParallelRunner executes independent pipeline nodes concurrently using scoped threads.

Requires the std feature.

Overview

#[derive(Default)]
pub struct ParallelRunner;

impl Runner for ParallelRunner {
    fn name(&self) -> &'static str { "parallel" }
    // ...
}

How it works

  1. Builds a dependency graph from the pipeline using build_pipeline_graph()
  2. Identifies source datasets — params and external inputs that are available immediately
  3. Schedules nodes as soon as all their input datasets have been produced
  4. Tracks produced datasets — when a node completes, its output datasets become available, potentially unblocking other nodes
  5. Uses std::thread::scope for safe scoped threads — no 'static bounds needed

Usage

$ my_app run --runner parallel

Or programmatically:

use pondrs::runners::ParallelRunner;

App::new(catalog, params)
    .with_runners((SequentialRunner, ParallelRunner))
    .execute(pipeline)?;

Dependency analysis

The parallel runner determines which nodes can run concurrently by analyzing dataset dependencies:

    param
    /    \
  [a]    [b]     ← a and b can run in parallel (both read param)
    \    /
     [c]         ← c waits for both a and b to complete

Only data dependencies matter — the tuple ordering in your pipeline function is irrelevant to the parallel runner.
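Under stated assumptions (datasets as integer ids, waves instead of real threads), the availability-driven scheduling described above can be sketched as:

```rust
use std::collections::HashSet;

// nodes: (name, inputs, outputs). Each wave contains the nodes whose
// inputs are all available; nodes in the same wave could run concurrently.
fn schedule(
    nodes: &[(&'static str, &[u32], &[u32])],
    sources: &[u32],
) -> Vec<Vec<&'static str>> {
    let mut available: HashSet<u32> = sources.iter().copied().collect();
    let mut done = vec![false; nodes.len()];
    let mut waves = Vec::new();
    loop {
        // Nodes whose inputs have all been produced (or are sources).
        let ready: Vec<usize> = (0..nodes.len())
            .filter(|&i| !done[i] && nodes[i].1.iter().all(|d| available.contains(d)))
            .collect();
        if ready.is_empty() {
            break;
        }
        let mut wave = Vec::new();
        for i in ready {
            done[i] = true;
            wave.push(nodes[i].0);
            // Completing a node makes its outputs available,
            // potentially unblocking nodes in the next wave.
            available.extend(nodes[i].2.iter().copied());
        }
        waves.push(wave);
    }
    waves
}
```

Running this on the diamond above (param = 0) puts a and b in the first wave and c in the second.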

Error handling

On the first node failure:

  1. on_node_error fires for the failed node
  2. on_pipeline_error fires for all ancestor pipelines
  3. No new nodes are scheduled
  4. Already-running nodes are allowed to complete (drain)
  5. The first error is returned

Pipeline hooks

Pipeline hooks behave differently than in the sequential runner:

  • before_pipeline_run fires when all of the pipeline’s declared inputs are available
  • after_pipeline_run fires when all of the pipeline’s declared outputs have been produced

This means pipeline hooks may fire at different points in wall-clock time compared to sequential execution.

Thread safety requirements

Because nodes run on different threads:

  • Hooks must be Sync (use Mutex or atomics for mutable state)
  • Datasets used concurrently must support concurrent access. MemoryDataset uses Arc<Mutex<_>> and is safe. CellDataset is not safe for parallel use.
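For instance, a hook that counts node completions needs its mutable state behind a lock to satisfy Sync. A minimal sketch with a hypothetical CountingHook (not a pondrs type):

```rust
use std::sync::Mutex;
use std::thread;

// A bare `usize` field could not be mutated through `&self` from several
// threads, so the count lives behind a Mutex.
struct CountingHook {
    completed: Mutex<usize>,
}

impl CountingHook {
    fn after_node_run(&self) {
        *self.completed.lock().unwrap() += 1;
    }
}

// Simulate the parallel runner firing the hook from several threads.
fn run_nodes(hook: &CountingHook, nodes: usize) -> usize {
    thread::scope(|s| {
        for _ in 0..nodes {
            s.spawn(|| hook.after_node_run());
        }
    });
    *hook.completed.lock().unwrap()
}

fn main() {
    let hook = CountingHook { completed: Mutex::new(0) };
    println!("{}", run_nodes(&hook, 8));
}
```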

When to use

Use the parallel runner when:

  • Your pipeline has independent branches that can run concurrently
  • Nodes involve I/O (file reads, network calls) where parallelism helps
  • You’re in a std environment with thread support

Datasets

This chapter covers all built-in dataset types and how to implement your own.

The Dataset trait is introduced in A minimal pipeline — Datasets. Here we go deeper into each concrete implementation.

Custom Datasets

You can implement the Dataset trait for any type to integrate custom data sources into your pipeline.

The Dataset trait

pub trait Dataset: serde::Serialize {
    type LoadItem;
    type SaveItem;
    type Error;

    fn load(&self) -> Result<Self::LoadItem, Self::Error>;
    fn save(&self, output: Self::SaveItem) -> Result<(), Self::Error>;
    fn is_param(&self) -> bool { false }

    #[cfg(feature = "std")]
    fn html(&self) -> Option<String> { None }
}

Example: a plain text dataset

#[derive(Serialize, Deserialize, Clone)]
pub struct TextDataset {
    path: String,
}

impl Dataset for TextDataset {
    type LoadItem = String;
    type SaveItem = String;
    type Error = PondError;

    fn load(&self) -> Result<String, PondError> {
        Ok(std::fs::read_to_string(&self.path)?)
    }

    fn save(&self, text: String) -> Result<(), PondError> {
        std::fs::write(&self.path, text)?;
        Ok(())
    }
}

Checklist

  1. Derive Serialize (required by the supertrait) — and usually Deserialize too, so the catalog can be loaded from YAML.

  2. Name the type with a Dataset suffix — the catalog indexer uses serde struct names ending in "Dataset" to identify leaf datasets. Without this suffix, nested struct fields may not be discovered correctly.

  3. Choose your error type — use PondError for simplicity, or a custom error type if you want to preserve error detail (see Dataset Errors).

  4. Implement html() (optional, std only) — return an HTML snippet for the viz dashboard. This is shown in the dataset detail panel.

The FileDataset trait

If your dataset is backed by a file, implement FileDataset to enable use with PartitionedDataset:

pub trait FileDataset: Dataset + Clone {
    fn path(&self) -> &str;
    fn set_path(&mut self, path: &str);
}

This lets PartitionedDataset clone your dataset template and point each partition at a different file.
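In spirit, the repointing works like this — a standalone sketch with a hypothetical minimal text-file dataset, writing to a temp directory (the real PartitionedDataset is generic over any FileDataset):

```rust
use std::collections::HashMap;
use std::fs;
use std::path::Path;

// Hypothetical minimal stand-in for a FileDataset implementor.
#[derive(Clone)]
struct TextDs { path: String }

impl TextDs {
    fn load(&self) -> std::io::Result<String> { fs::read_to_string(&self.path) }
    fn set_path(&mut self, p: &str) { self.path = p.to_string(); }
}

// Clone the template once per matching file, point it at that file,
// and key the result by the filename stem.
fn load_partitions(dir: &Path, ext: &str, template: &TextDs)
    -> std::io::Result<HashMap<String, String>>
{
    let mut out = HashMap::new();
    for entry in fs::read_dir(dir)? {
        let path = entry?.path();
        if path.extension().and_then(|e| e.to_str()) == Some(ext) {
            let stem = path.file_stem().unwrap().to_string_lossy().into_owned();
            let mut ds = template.clone();
            ds.set_path(path.to_str().unwrap());
            out.insert(stem, ds.load()?);
        }
    }
    Ok(out)
}

fn main() -> std::io::Result<()> {
    let dir = std::env::temp_dir().join("pond_parts_demo");
    fs::create_dir_all(&dir)?;
    fs::write(dir.join("january.txt"), "jan")?;
    fs::write(dir.join("february.txt"), "feb")?;
    let parts = load_partitions(&dir, "txt", &TextDs { path: String::new() })?;
    println!("{}", parts["january"]);
    Ok(())
}
```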

Param

Param<T> is a read-only dataset wrapper for configuration values. It is the primary way to pass parameters into pipeline nodes.

Definition

#[derive(Debug, Serialize, Deserialize)]
pub struct Param<T: Clone>(pub T);

impl<T: Clone + Serialize> Dataset for Param<T> {
    type LoadItem = T;
    type SaveItem = ();
    type Error = Infallible;

    fn load(&self) -> Result<T, Infallible> { Ok(self.0.clone()) }
    fn save(&self, _: ()) -> Result<(), Infallible> { unreachable!() }
    fn is_param(&self) -> bool { true }
}

Key properties:

  • Loading always succeeds — Error = Infallible
  • Writing is forbidden — save() is unreachable; the pipeline validator (check()) rejects any node that writes to a Param
  • is_param() returns true — used by the validator and visualization to distinguish params from data

Usage

#[derive(Serialize, Deserialize)]
struct Params {
    threshold: Param<f64>,
    max_retries: Param<u32>,
}

Node {
    name: "filter",
    func: |value: f64, threshold: f64| {
        (value >= threshold,)
    },
    input: (&cat.value, &params.threshold),
    output: (&cat.passed,),
}

YAML

threshold: 0.5
max_retries: 3

Param<T> deserializes directly from the YAML value — no wrapping object needed.

Visualization

In the viz dashboard, parameters appear as distinct node shapes, separate from datasets. They are also shown in the left panel’s “Parameters” section.

no_std

Param is available in no_std — it requires no feature flags and uses no allocation.

Memory Dataset

MemoryDataset<T> is a thread-safe in-memory dataset for intermediate pipeline values.

Requires the std feature.

Definition

#[derive(Debug, Serialize, Deserialize)]
pub struct MemoryDataset<T: Clone> {
    #[serde(skip)]
    value: Arc<Mutex<Option<T>>>,
}
  • Starts empty (None)
  • Loading before any save returns PondError::DatasetNotLoaded
  • Thread-safe via Arc<Mutex<_>> — works with both SequentialRunner and ParallelRunner

Usage

#[derive(Serialize, Deserialize)]
struct Catalog {
    intermediate: MemoryDataset<f64>,
}

# catalog.yml
intermediate: {}

MemoryDataset has no persistent configuration — it always starts empty. In YAML, use an empty mapping {}.

When to use

Use MemoryDataset for intermediate values that are computed by one node and consumed by another, without needing to persist to disk:

(
    Node {
        name: "compute",
        func: |input: DataFrame| {
            let mean = input.column("value").unwrap().mean().unwrap();
            (mean,)
        },
        input: (&cat.raw_data,),
        output: (&cat.mean_value,),  // MemoryDataset<f64>
    },
    Node {
        name: "use_result",
        func: |mean: f64| (format!("Mean: {mean}"),),
        input: (&cat.mean_value,),
        output: (&cat.report,),
    },
)

Parallel safety

MemoryDataset is safe for use with the ParallelRunner. The Mutex ensures that concurrent reads and writes are properly synchronized. However, the parallel runner’s dependency analysis ensures that a node won’t try to read a MemoryDataset until the node that writes to it has completed.

no_std alternative

In no_std environments, use CellDataset instead. It uses Cell instead of Arc<Mutex<_>>, but is limited to Copy types and single-threaded use.

Cell Dataset

CellDataset<T> is a stack-friendly dataset using Cell for no_std / single-threaded pipelines.

Definition

pub struct CellDataset<T: Copy> {
    value: Cell<Option<T>>,
}
  • Works only with Copy types (e.g. i32, f64, bool, u8)
  • Uses Cell for interior mutability — no heap allocation, no locking
  • Starts empty; loading before any save returns PondError::DatasetNotLoaded
  • const fn new() — can be used in static or const contexts

Usage

let a = CellDataset::<i32>::new();
let b = CellDataset::<i32>::new();

let pipe = (
    Node { name: "n1", func: |v| (v * 2,), input: (&params.x,), output: (&a,) },
    Node { name: "n2", func: |v| (v + 1,), input: (&a,), output: (&b,) },
);

Thread safety

CellDataset implements Sync via an unsafe impl because the RunnableStep trait requires Send + Sync. This is safe only for single-threaded runners like SequentialRunner.

Do not use CellDataset with ParallelRunner. Use MemoryDataset instead for parallel pipelines.

no_std

CellDataset is the primary intermediate dataset for no_std environments. It requires no feature flags, no allocator, and no standard library. Combined with Param and the SequentialRunner, it forms the foundation of a no_std pipeline.

Limitations

  • Only works with Copy types — cannot hold String, Vec, DataFrame, etc.
  • Not safe for concurrent access — single-threaded use only
  • No serialization of stored values — Serialize impl serializes as unit ()

Partitioned Dataset

PartitionedDataset and LazyPartitionedDataset represent a directory of files where each file is treated as a separate partition.

Requires the polars feature.

PartitionedDataset

Eagerly loads all files in a directory into a HashMap<String, D::LoadItem>:

pub struct PartitionedDataset<D: FileDataset> {
    pub path: String,
    pub ext: String,
    pub dataset: D,
}
  • path — the directory to read from / write to
  • ext — file extension to filter by (e.g. "csv", "parquet")
  • dataset — a template dataset that is cloned and pointed at each file

Loading

Returns HashMap<String, D::LoadItem> where keys are filename stems:

data/partitions/
  january.csv
  february.csv
  march.csv
// loads as HashMap { "january" => DataFrame, "february" => DataFrame, "march" => DataFrame }

Saving

Accepts HashMap<String, D::SaveItem> and writes each entry as {name}.{ext}:

Node {
    name: "split_by_month",
    func: |df: DataFrame| -> (HashMap<String, DataFrame>,) {
        // split DataFrame into partitions...
    },
    input: (&cat.all_data,),
    output: (&cat.monthly,),  // PartitionedDataset<PolarsCsvDataset>
}

LazyPartitionedDataset

Same as PartitionedDataset but returns HashMap<String, Lazy<D::LoadItem>> — each partition is loaded on demand:

Node {
    name: "process",
    func: |partitions: HashMap<String, Lazy<DataFrame>>| {
        // only load the partitions you need
        let jan = partitions["january"].load().unwrap();
        // ...
    },
    input: (&cat.monthly,),
    output: (&cat.result,),
}

Lazy<T> wraps a closure that calls dataset.load() when .load() is called on it.
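A minimal Lazy can be sketched as a boxed closure that defers the work until asked — a hypothetical illustration, not the crate's definition:

```rust
// The closure captures how to load one partition and is only invoked
// when .load() is called, so building one per partition is cheap.
struct Lazy<T> {
    loader: Box<dyn Fn() -> Result<T, String>>,
}

impl<T> Lazy<T> {
    fn new(loader: impl Fn() -> Result<T, String> + 'static) -> Self {
        Lazy { loader: Box::new(loader) }
    }
    fn load(&self) -> Result<T, String> {
        (self.loader)()
    }
}

fn main() {
    // Nothing is read at construction time.
    let jan = Lazy::new(|| Ok::<_, String>(String::from("january data")));
    println!("{}", jan.load().unwrap());
}
```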

YAML configuration

monthly:
  path: data/partitions
  ext: csv
  dataset:
    separator: ","
    has_header: true

The dataset field configures the template dataset that is cloned for each partition file.

FileDataset requirement

The inner dataset type must implement FileDataset:

pub trait FileDataset: Dataset + Clone {
    fn path(&self) -> &str;
    fn set_path(&mut self, path: &str);
}

Built-in types that implement FileDataset: PolarsCsvDataset, PolarsParquetDataset, TextDataset, JsonDataset.

Cache Dataset

CacheDataset<D> wraps any dataset and caches the loaded/saved value in memory. Subsequent loads return the cached value without hitting the underlying dataset.

Requires the std feature.

Definition

pub struct CacheDataset<D: Dataset> {
    pub dataset: D,
    cache: Arc<Mutex<Option<D::LoadItem>>>,
}

Usage

Wrap any dataset to add caching:

#[derive(Serialize, Deserialize)]
struct Catalog {
    readings: CacheDataset<PolarsCsvDataset>,
}

# catalog.yml
readings:
  dataset:
    path: data/readings.csv
    separator: ","

Behavior

  • First load() — delegates to the inner dataset, caches the result, returns it
  • Subsequent load() calls — returns the cached value without re-reading the file
  • save() — writes to the inner dataset and updates the cache
  • html() — delegates to the inner dataset
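The load-once behavior can be sketched with a counting inner source (hypothetical types, not the crate's implementation):

```rust
use std::sync::Mutex;

// Hypothetical expensive source that counts how often it is actually read.
struct Source {
    reads: Mutex<u32>,
}

impl Source {
    fn load(&self) -> u64 {
        *self.reads.lock().unwrap() += 1;
        42 // pretend this was expensive I/O
    }
}

// Caching wrapper: the first load delegates and stores the result;
// later loads are served from the cache.
struct Cached<'a> {
    inner: &'a Source,
    cache: Mutex<Option<u64>>,
}

impl<'a> Cached<'a> {
    fn load(&self) -> u64 {
        let mut slot = self.cache.lock().unwrap();
        *slot.get_or_insert_with(|| self.inner.load())
    }
}

fn main() {
    let src = Source { reads: Mutex::new(0) };
    let ds = Cached { inner: &src, cache: Mutex::new(None) };
    let (a, b, c) = (ds.load(), ds.load(), ds.load());
    // Three loads, one underlying read.
    println!("{} {} {} reads={}", a, b, c, src.reads.lock().unwrap());
}
```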

When to use

Use CacheDataset when a dataset is read by multiple nodes and the underlying I/O is expensive:

(
    Node { name: "analyze", input: (&cat.readings,), .. },
    Node { name: "validate", input: (&cat.readings,), .. },
    Node { name: "summarize", input: (&cat.readings,), .. },
)

Without caching, readings would be loaded from disk three times. With CacheDataset<PolarsCsvDataset>, it’s loaded once and served from memory for the remaining reads.

Constraints

The inner dataset must satisfy:

  • D::LoadItem: Clone — so the cached value can be cloned on each load
  • D::SaveItem: Clone + Into<D::LoadItem> — so saves can update the cache
  • PondError: From<D::Error> — for error conversion

List of Datasets

All built-in dataset types, their feature flags, and typical use cases.

Core datasets (no feature flags)

Type | Load/Save types | Description
Param<T> | T / () | Read-only parameter. Error: Infallible.
CellDataset<T> | T / T | no_std intermediate storage. T: Copy only.

std datasets

Type | Feature | Load/Save types | Description
MemoryDataset<T> | std | T / T | Thread-safe in-memory storage via Arc<Mutex<_>>.
TextDataset | std | String / String | Reads/writes plain text files.
CacheDataset<D> | std | D::LoadItem / D::SaveItem | Caching wrapper for any dataset.

File format datasets

Type | Feature | Load/Save types | Description
PolarsCsvDataset | polars | DataFrame / DataFrame | CSV files via Polars. Configurable separator, header, etc.
PolarsParquetDataset | polars | DataFrame / DataFrame | Parquet files via Polars.
PolarsExcelDataset | polars | DataFrame / — | Excel files via fastexcel. Read-only.
JsonDataset | json | serde_json::Value / serde_json::Value | JSON files.
YamlDataset | yaml | Vec<Yaml> / Vec<Yaml> | YAML files using yaml_rust2.
PlotlyDataset | plotly | serde_json::Value / serde_json::Value | Plotly charts. Saves .json + .html. Custom html() for viz.
ImageDataset | image | DynamicImage / DynamicImage | Image files via the image crate.

Partitioned datasets

Type | Feature | Load/Save types | Description
PartitionedDataset<D> | polars | HashMap<String, D::LoadItem> / HashMap<String, D::SaveItem> | Directory of files, eagerly loaded.
LazyPartitionedDataset<D> | polars | HashMap<String, Lazy<D::LoadItem>> / HashMap<String, D::SaveItem> | Directory of files, lazily loaded on demand.

Hardware datasets (no_std)

Type | Load/Save types | Description
RegisterDataset<T> | T / T | Volatile memory-mapped register. T: Copy.
GpioDataset | bool / bool | Single GPIO pin within a memory-mapped register.

Common traits

All file-backed datasets that support PartitionedDataset implement FileDataset:

pub trait FileDataset: Dataset + Clone {
    fn path(&self) -> &str;
    fn set_path(&mut self, path: &str);
}

Implementors: PolarsCsvDataset, PolarsParquetDataset, PolarsExcelDataset, TextDataset, JsonDataset.

App

The App struct bundles catalog, params, hooks, and runners together and provides methods for pipeline execution and CLI dispatch.

pub struct App<C, P, H = (), R = DefaultRunners> {
    catalog: C,
    params: P,
    hooks: H,
    runners: R,
    command: Command,
    // ...
}

App is generic over catalog, params, hooks, and runners. Builder methods return a new App with different type parameters, so the type system tracks what has been configured.

Subcommands

App::dispatch() selects behavior based on the stored Command:

Command | CLI | Description
Command::Run | my_app run | Execute the pipeline
Command::Check | my_app check | Validate pipeline structure
Command::Viz | my_app viz | Build graph and serve visualization

Chapter overview

  • Initialization — constructing an App (from_yaml, from_args, new)
  • YAML Configuration — catalog and params files, dot-notation overrides
  • Run — executing the pipeline
  • Check — validating pipeline structure via CLI
  • Viz — interactive pipeline visualization

Initialization

There are several ways to construct an App, depending on whether your catalog comes from YAML files or is built programmatically.

App::from_yaml + with_args

The most common pattern for file-based pipelines. Loads catalog and params from YAML, then applies CLI arguments:

App::from_yaml("conf/catalog.yml", "conf/params.yml")?
    .with_args(std::env::args_os())?
    .dispatch(pipeline)

from_yaml loads and deserializes both files. with_args parses CLI arguments for subcommand selection and parameter overrides.

Requires std. Catalog and params types must implement DeserializeOwned.

App::from_args

Parses CLI arguments including --catalog-path and --params-path flags:

App::from_args(std::env::args_os())?
    .dispatch(pipeline)

Default paths if not specified: conf/base/catalog.yml and conf/base/parameters.yml.

Requires std.

App::from_cli

Same as from_args but takes a pre-parsed CliArgs struct:

use pondrs::app::cli::CliArgs;
use clap::Parser;

let cli = CliArgs::parse();
App::from_cli(cli)?
    .dispatch(pipeline)

App::new

Construct with catalog and params directly — no YAML, no CLI:

let catalog = Catalog { /* ... */ };
let params = Params { /* ... */ };

App::new(catalog, params)
    .execute(pipeline)?;

This is the only constructor available in no_std. It defaults to Command::Run with no hooks and the default runners.

Use App::new when:

  • The catalog is built programmatically (e.g. RegisterDataset, GpioDataset)
  • You’re writing tests and want to inspect datasets after execution
  • You’re in a no_std environment

You can still add CLI support to a programmatic catalog via with_args:

App::new(catalog, params)
    .with_args(std::env::args_os())?  // adds subcommand + param overrides
    .dispatch(pipeline)?;

Builder methods

After construction, configure the app with builder methods:

App::from_yaml("catalog.yml", "params.yml")?
    .with_args(std::env::args_os())?
    .with_hooks((LoggingHook::new(),))
    .with_runners((SequentialRunner, ParallelRunner))
    .dispatch(pipeline)?;

Method | Description
with_hooks(h) | Set the hooks tuple
with_runners(r) | Set the runners tuple
with_command(cmd) | Set the command directly
with_args(iter) | Parse CLI args, apply command + param overrides
with_cli(cli) | Apply pre-parsed CliArgs

Execution methods

Method | Description
execute(f) | Run the pipeline directly (always Command::Run behavior)
dispatch(f) | Choose behavior based on stored Command (run/check/viz)
catalog() | Borrow the catalog (useful in tests)
params() | Borrow the params

YAML Configuration

pondrs uses YAML files for catalog and parameter configuration. The App framework loads, patches, and deserializes these files automatically.

Catalog YAML

The catalog file maps dataset field names to their configuration:

# catalog.yml
readings:
  path: data/readings.csv
  separator: ","
summary: {}
report:
  path: data/report.json

Each top-level key corresponds to a field in your catalog struct. The configuration under each key is deserialized into the dataset type for that field.

  • File-backed datasets need at least a path
  • In-memory datasets use {}
  • Nested catalog structs create nested YAML sections

Nested catalogs

#[derive(Serialize, Deserialize)]
struct InputData {
    raw: PolarsCsvDataset,
    reference: YamlDataset,
}

#[derive(Serialize, Deserialize)]
struct Catalog {
    input: InputData,
    output: JsonDataset,
}

# catalog.yml
input:
  raw:
    path: data/raw.csv
    separator: ","
  reference:
    path: data/ref.yml
output:
  path: data/output.json

Parameters YAML

# params.yml
threshold: 0.5
model:
  learning_rate: 0.01
  epochs: 100

Each key maps to a Param<T> field. Nested structs create nested YAML sections.

CLI overrides

Both catalog and parameter values can be overridden from the command line using dot notation:

# Override parameters
$ my_app run --params threshold=0.8
$ my_app run --params model.learning_rate=0.001

# Override catalog configuration
$ my_app run --catalog output.path=/tmp/result.json
$ my_app run --catalog readings.separator=;

# Multiple overrides
$ my_app run --params threshold=0.8 --params model.epochs=200

How overrides work

  1. The YAML file is loaded into a serde_yaml::Value tree
  2. Each KEY=VALUE override is parsed and applied to the tree using dot notation
  3. Values are parsed as YAML scalars (auto-detecting numbers, bools, strings, null)
  4. The patched tree is deserialized into the target struct

Overrides create missing intermediate keys if needed — you can override deeply nested values even if the parent keys don’t exist in the file.
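The patching step can be sketched on a toy value tree — hypothetical code; the real implementation operates on a serde_yaml::Value:

```rust
use std::collections::BTreeMap;

// Toy stand-in for a YAML value tree.
#[derive(Debug, PartialEq)]
enum Val {
    Str(String),
    Map(BTreeMap<String, Val>),
}

// Apply "a.b.c=value": walk the tree by dot-separated keys, creating
// intermediate maps as needed, then set the leaf.
fn apply_override(root: &mut Val, spec: &str) {
    let (path, value) = spec.split_once('=').expect("expected KEY=VALUE");
    let keys: Vec<&str> = path.split('.').collect();
    let mut node = root;
    for (i, key) in keys.iter().enumerate() {
        let map = match node {
            Val::Map(map) => map,
            _ => panic!("cannot descend into a scalar"),
        };
        if i == keys.len() - 1 {
            map.insert(key.to_string(), Val::Str(value.to_string()));
            return;
        }
        node = map
            .entry(key.to_string())
            .or_insert_with(|| Val::Map(BTreeMap::new()));
    }
}

fn main() {
    let mut tree = Val::Map(BTreeMap::new());
    // Parent key "model" does not exist yet — it is created on the way down.
    apply_override(&mut tree, "model.learning_rate=0.001");
    println!("{:?}", tree);
}
```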

File paths

Default paths (when using App::from_args or App::from_cli):

  • Catalog: conf/base/catalog.yml
  • Params: conf/base/parameters.yml

Override with CLI flags:

$ my_app --catalog-path my/catalog.yml --params-path my/params.yml run

Or specify paths directly with App::from_yaml:

App::from_yaml("conf/catalog.yml", "conf/params.yml")?

Run

The run subcommand executes the pipeline.

CLI usage

$ my_app run                              # default runner (sequential)
$ my_app run --runner parallel            # use parallel runner
$ my_app run --params threshold=0.8       # override parameters
$ my_app run --catalog output.path=/tmp/out.json  # override catalog

How it works

  1. The pipeline function is called with (&catalog, &params) to construct the steps
  2. The selected runner (by --runner flag or default) is looked up by name
  3. The runner executes the steps, firing hooks at each lifecycle point
  4. If any node fails, execution stops and the error is returned

execute vs dispatch

  • app.execute(pipeline_fn) — always runs the pipeline, regardless of the stored command
  • app.dispatch(pipeline_fn) — checks the stored command and dispatches to run, check, or viz

Use execute in tests or when you want to run the pipeline directly. Use dispatch in CLI apps to support all subcommands.

Runner selection

The --runner flag selects a runner by name. Available runners depend on what’s configured:

// Default: sequential + parallel (std), sequential only (no_std)
App::from_yaml(..)?
    .dispatch(pipeline)?;

// Custom runners
App::from_yaml(..)?
    .with_runners((SequentialRunner, ParallelRunner, MyRunner))
    .dispatch(pipeline)?;

If the named runner isn’t found, PondError::RunnerNotFound is returned.

Post-run inspection

When using App::new + execute, you can inspect datasets after the pipeline runs:

let app = App::new(catalog, params);
app.execute(pipeline)?;

// Access results
let summary: f64 = app.catalog().summary.load()?;
assert!(summary > 0.0);

This pattern is useful for integration tests.

Check

The check subcommand validates the pipeline structure without executing it.

CLI usage

$ my_app check
Pipeline is valid.

If validation fails:

$ my_app check
Error: Pipeline validation failed:
  - Node 'process' requires dataset 0x7f8a..., which is produced by a later node

What it validates

check calls StepInfo::check() on the pipeline, which verifies:

  • Sequential ordering — no node reads a dataset produced by a later node
  • No duplicate outputs — each dataset is produced by at most one node
  • Params are read-only — no node writes to a Param
  • Pipeline contracts — Pipeline declared inputs/outputs match children

See Check for the full list of CheckError variants.

Programmatic use

You can call check() directly on any StepInfo:

let steps = pipeline(&catalog, &params);
match steps.check() {
    Ok(()) => println!("Valid!"),
    Err(e) => eprintln!("Error: {e}"),
}

Or via App::dispatch, which calls check() when the command is Command::Check:

App::new(catalog, params)
    .with_command(Command::Check)
    .dispatch(pipeline)?;

Viz

The viz subcommand builds the pipeline graph and serves an interactive visualization.

Requires the viz feature.

CLI usage

# Start the interactive web server (default port 8080)
$ my_app viz

# Custom port
$ my_app viz --port 3000

# Export graph as JSON
$ my_app viz --output pipeline.json

# Export self-contained HTML file
$ my_app viz --export pipeline.html

Interactive server

my_app viz starts an axum web server with:

Endpoint | Description
GET /api/graph | Full pipeline graph as JSON
GET /api/dataset/{id}/html | HTML snapshot for a dataset
GET /api/status | Current node/dataset execution status
POST /api/status | Receives live events from VizHook
GET /ws | WebSocket broadcast of live events
GET / | Embedded React frontend

The frontend uses React + ReactFlow + dagre for a left-to-right DAG layout with:

  • Node, dataset, and pipeline visual types
  • Click-to-inspect dataset details (HTML preview, YAML config)
  • Left sidebar listing all nodes, datasets, and parameters
  • Dark/light theme support

Live execution status

To see real-time node status during pipeline execution:

  1. Start the viz server: my_app viz --port 8080
  2. In another terminal, run the pipeline with VizHook:
use pondrs::viz::VizHook;

App::from_yaml(..)?
    .with_args(std::env::args_os())?
    .with_hooks((LoggingHook::new(), VizHook::new("http://localhost:8080".into())))
    .dispatch(pipeline)?;

The frontend shows live node status (idle/running/completed/error) and updates automatically via WebSocket.

Static HTML export

--export produces a self-contained HTML file with the full graph and dataset snapshots embedded. No server needed — just open the file in a browser:

$ my_app viz --export pipeline.html
$ open pipeline.html

The export uses vite-plugin-singlefile to inline all JS/CSS into a single HTML file, and injects graph data via window.__STATIC_DATA__.

JSON export

--output writes the VizGraph as JSON, useful for custom tooling:

$ my_app viz --output graph.json

Graph name

The graph name shown in the frontend header is derived from the program name (first CLI argument, file stem only). For example, running cargo run --example weather_app sets the name to weather_app.

no_std Pipelines

pondrs is a #![no_std] crate at its core. The std feature adds filesystem I/O, threading, logging, CLI parsing, and additional dataset types — but the fundamental pipeline model works without any of it.

What’s available without std

Component | no_std | std
Node, Pipeline, Steps, StepInfo | yes | yes
PipelineInfo, RunnableStep | yes | yes
check() validation | yes | yes
Param<T> | yes | yes
CellDataset<T> | yes | yes
RegisterDataset<T> | yes | yes
GpioDataset | yes | yes
SequentialRunner | yes | yes
Hook / Hooks traits | yes | yes
App::new() | yes | yes
PondError (limited variants) | yes | yes
MemoryDataset<T> | no | yes
ParallelRunner | no | yes
LoggingHook | no | yes
CLI parsing / YAML loading | no | yes
Catalog indexer (dataset names) | no | yes
File-backed datasets | no | yes

Building for no_std

cargo build --no-default-features --lib

No allocator is required. All dataset tracking in check() uses fixed-size stack arrays.

Typical embedded pattern

use pondrs::datasets::{CellDataset, Param, RegisterDataset, GpioDataset};
use pondrs::error::PondError;
use pondrs::{App, Node, Steps};

static SENSOR: RegisterDataset<u16> = unsafe { RegisterDataset::new(0x4000_0000) };
static LED: GpioDataset = unsafe { GpioDataset::new(0x4002_0000, 5, "LED") };

fn pipeline<'a>(
    cat: &'a Catalog,
    params: &'a Params,
) -> impl Steps<PondError> + 'a {
    (
        Node {
            name: "read_sensor",
            func: |raw: u16| (raw,),
            input: (&cat.sensor,),
            output: (&cat.reading,),
        },
        Node {
            name: "check_threshold",
            func: |value: u16, threshold: u16| (value > threshold,),
            input: (&cat.reading, &params.threshold),
            output: (&cat.led,),
        },
    )
}

See Datasets for details on the no_std dataset types and App & Debugging for the no_std App pattern.

no_std Datasets

Four dataset types are available without the standard library.

Param<T>

Read-only parameter values. Works identically in no_std and std. Requires T: Clone + Serialize.

let threshold = Param(500u16);

See Param.

CellDataset<T>

Stack-friendly intermediate storage using Cell. Requires T: Copy.

let reading = CellDataset::<u16>::new();
let result = CellDataset::<bool>::new();
  • const fn new() — usable in static contexts
  • No heap allocation, no locking
  • Single-threaded only (SequentialRunner)

See Cell Dataset.

RegisterDataset<T>

Volatile memory-mapped register access. Reads and writes at a raw memory address using read_volatile / write_volatile.

// SAFETY: address must point to a valid, aligned register
static SENSOR: RegisterDataset<u16> = unsafe { RegisterDataset::new(0x4000_0000) };
  • T is typically u8, u16, or u32
  • const unsafe fn new(address: usize) — suitable for static declarations
  • load() reads the register, save() writes it
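The volatile access pattern can be sketched against ordinary memory — the same primitives a memory-mapped register uses, but aimed at a stack variable so the example runs safely on a host (real addresses come from the chip's datasheet):

```rust
use std::ptr;

// Hypothetical sketch: a write followed by a read through a raw pointer,
// using volatile operations so the compiler cannot elide or reorder them.
fn roundtrip() -> u16 {
    let mut fake_register: u16 = 0;
    let addr = &mut fake_register as *mut u16;

    // save(): write the register
    unsafe { ptr::write_volatile(addr, 0x1234) };

    // load(): read it back
    unsafe { ptr::read_volatile(addr) }
}

fn main() {
    println!("{:#06x}", roundtrip()); // 0x1234
}
```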

GpioDataset

A single GPIO pin within a memory-mapped register. Reads and writes a single bit at a given position.

// SAFETY: address must point to a valid GPIO port register
static LED: GpioDataset = unsafe { GpioDataset::new(0x4002_0000, 5, "LED1") };
  • load() returns bool — whether the bit is set
  • save(true) sets the bit, save(false) clears it
  • Preserves other bits in the register (read-modify-write)
  • The label field is used in visualization
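The read-modify-write can be sketched on a plain integer standing in for the port register (hypothetical helper names):

```rust
// Set or clear one bit while preserving the rest of the register.
fn write_pin(register: u32, pin: u32, high: bool) -> u32 {
    if high {
        register | (1 << pin) // set the bit
    } else {
        register & !(1 << pin) // clear the bit
    }
}

fn read_pin(register: u32, pin: u32) -> bool {
    register & (1 << pin) != 0
}

fn main() {
    let reg = 0b1010_0000u32; // pins 5 and 7 high
    let reg = write_pin(reg, 5, false); // clear pin 5, pin 7 untouched
    println!("{:#010b} pin7={}", reg, read_pin(reg, 7));
}
```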

Hardware dataset safety

Both RegisterDataset and GpioDataset use unsafe constructors because they access raw memory addresses. They implement Send + Sync via unsafe impls — this is safe in single-threaded embedded contexts but the caller is responsible for preventing data races in multi-threaded environments.

Example: embedded pipeline

struct Catalog {
    sensor: RegisterDataset<u16>,
    reading: CellDataset<u16>,
    alert: GpioDataset,
}

struct Params {
    threshold: Param<u16>,
}

fn pipeline<'a>(cat: &'a Catalog, params: &'a Params) -> impl Steps<PondError> + 'a {
    (
        Node {
            name: "read",
            func: |raw: u16| (raw,),
            input: (&cat.sensor,),
            output: (&cat.reading,),
        },
        Node {
            name: "check",
            func: |value: u16, thresh: u16| (value > thresh,),
            input: (&cat.reading, &params.threshold),
            output: (&cat.alert,),
        },
    )
}

App & Debugging in no_std

Compiling a no_std pipeline with std

One of the strengths of pondrs is that a pipeline written for no_std can also be compiled with std enabled. This lets you use the full App interface — including YAML configuration, CLI argument parsing, parameter overrides, check, and viz — for development and debugging, while deploying the same pipeline code to a no_std target.

Simply add pondrs as a dev-dependency with the all feature, and use App::from_args or App::from_yaml in a host-side binary or test:

// tests/run_on_host.rs — compiles with std
fn main() -> Result<(), PondError> {
    App::from_args(std::env::args_os())?
        .dispatch(my_no_std_pipeline)
}

This gives you visualization, validation, logging hooks, and parallel execution for free during development, without changing the pipeline itself.

App::new

In no_std environments, App::new is the only available constructor:

let app = App::new(catalog, params);
app.execute(pipeline)?;

There is no YAML loading, no CLI parsing, and no dispatch — you construct the catalog and params directly and call execute.

Default runner

The default runner tuple in no_std is (SequentialRunner,). The ParallelRunner requires std (threads).

Hooks in no_std

The Hook and Hooks traits work in no_std, but there are no built-in hook implementations (LoggingHook requires std). You can implement your own:

struct UartLogger;

impl Hook for UartLogger {
    fn before_node_run(&self, n: &dyn PipelineInfo) {
        // write to UART, toggle debug pin, etc.
        uart_print(n.name());
    }

    fn on_node_error(&self, n: &dyn PipelineInfo, _error: &str) {
        uart_print("ERROR: ");
        uart_print(n.name());
    }
}

App::new(catalog, params)
    .with_hooks((UartLogger,))
    .execute(pipeline)?;

Note that in no_std, the error string in on_node_error is always "node error" (not the full error message), since Display formatting requires allocation.

Dataset names

In no_std, the catalog indexer is not available, so DatasetRef::name in hook callbacks is always None. Hooks can still use ds.id (pointer-based) or ds.meta.type_string() for identification.

PondError in no_std

Only three PondError variants are available without std:

  • DatasetNotLoaded — a dataset was read before being written
  • RunnerNotFound — the specified runner name doesn’t match any runner
  • CheckFailed — pipeline validation failed (used by dispatch with Command::Check)

All other variants (Io, Polars, SerdeYaml, etc.) are feature-gated.

Validation

check() works fully in no_std. It uses fixed-size stack arrays instead of HashMap:

let steps = pipeline(&catalog, &params);
steps.check()?;

// For pipelines with more than 20 datasets:
steps.check_with_capacity::<64>()?;
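The fixed-capacity tracking can be sketched like this — hypothetical code (the real check() tracks dataset pointers, not ids, but the allocator-free idea is the same):

```rust
// no_std-friendly duplicate-output detection: record produced outputs in
// a stack array of capacity N instead of a HashMap.
fn check_outputs<const N: usize>(outputs: &[u32]) -> Result<(), &'static str> {
    let mut seen = [0u32; N];
    let mut len = 0;
    for &id in outputs {
        if seen[..len].contains(&id) {
            return Err("duplicate output");
        }
        if len == N {
            return Err("capacity exceeded; raise N");
        }
        seen[len] = id;
        len += 1;
    }
    Ok(())
}

fn main() {
    // ids stand in for dataset pointers
    assert!(check_outputs::<20>(&[1, 2, 3]).is_ok());
    assert!(check_outputs::<20>(&[1, 2, 1]).is_err()); // duplicate
    println!("ok");
}
```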

Examples

Each example includes the full source code and an interactive pipeline visualization.

Weather Pipeline

Demonstrates subpipelines, struct params, nested catalog/params, PartitionedDataset, MemoryDataset, YamlDataset, PlotlyDataset, parallel execution, and an intentional error node.

Usage

cargo run --example weather_app -- run --runner parallel
cargo run --example weather_app -- check
cargo run --example weather_app -- viz

Types

// ---------------------------------------------------------------------------
// Error type
// ---------------------------------------------------------------------------

#[derive(Debug)]
pub enum WeatherError {
    Pond(PondError),
    Polars(PolarsError),
    Validation(String),
}

impl From<PondError> for WeatherError {
    fn from(e: PondError) -> Self {
        WeatherError::Pond(e)
    }
}

impl From<PolarsError> for WeatherError {
    fn from(e: PolarsError) -> Self {
        WeatherError::Polars(e)
    }
}

impl std::fmt::Display for WeatherError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            WeatherError::Pond(e) => write!(f, "{e}"),
            WeatherError::Polars(e) => write!(f, "{e}"),
            WeatherError::Validation(msg) => write!(f, "Validation failed: {msg}"),
        }
    }
}

impl std::error::Error for WeatherError {}

// ---------------------------------------------------------------------------
// Domain types
// ---------------------------------------------------------------------------

/// Baseline period for anomaly detection (struct param).
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub struct BaselinePeriod {
    pub start_month: u32,
    pub end_month: u32,
}

/// Aggregated weather statistics (must be Copy for MemoryDataset).
#[derive(Clone, Copy, Debug, Default)]
pub struct WeatherSummary {
    pub avg_temp: f64,
    pub max_temp: f64,
    pub min_temp: f64,
    pub total_rainfall: f64,
    pub station_count: u32,
    pub reading_count: u32,
}

// ---------------------------------------------------------------------------
// Params (nested containers with struct param)
// ---------------------------------------------------------------------------

#[derive(Serialize, Deserialize)]
pub struct WeatherParams {
    pub analysis: AnalysisConfig,
    pub display: DisplayConfig,
}

#[derive(Serialize, Deserialize)]
pub struct AnalysisConfig {
    pub anomaly_threshold: Param<f64>,
    pub baseline: Param<BaselinePeriod>,
}

#[derive(Serialize, Deserialize)]
pub struct DisplayConfig {
    pub chart_title: Param<String>,
    pub top_n: Param<usize>,
}

// ---------------------------------------------------------------------------
// Catalog (nested containers)
// ---------------------------------------------------------------------------

#[derive(Serialize, Deserialize)]
pub struct WeatherCatalog {
    pub sources: SourcesCatalog,
    pub analysis: AnalysisCatalog,
    pub reports: ReportsCatalog,
}

#[derive(Serialize, Deserialize)]
pub struct SourcesCatalog {
    pub station_readings: PartitionedDataset<PolarsCsvDataset>,
    pub station_metadata: YamlDataset,
}

#[derive(Serialize, Deserialize)]
pub struct AnalysisCatalog {
    pub combined_readings: PolarsCsvDataset,
    pub weather_summary: MemoryDataset<WeatherSummary>,
    pub anomalies: PolarsCsvDataset,
}

#[derive(Serialize, Deserialize)]
pub struct ReportsCatalog {
    pub temperature_chart: PlotlyDataset,
    pub rainfall_chart: PlotlyDataset,
    pub validation_passed: MemoryDataset<bool>,
}

Node functions

// ---------------------------------------------------------------------------
// Node functions
// ---------------------------------------------------------------------------

fn merge_stations(
    partitions: HashMap<String, DataFrame>,
) -> Result<(DataFrame,), PolarsError> {
    let mut combined: Option<DataFrame> = None;
    for (station_name, mut df) in partitions {
        let n = df.height();
        let station_col = Column::new("station".into(), vec![station_name.as_str(); n]);
        df.with_column(station_col)?;
        match &mut combined {
            None => combined = Some(df),
            Some(c) => { c.vstack_mut(&df)?; }
        }
    }
    Ok((combined.unwrap_or_default(),))
}

fn load_metadata(meta: Yaml) {
    if let Yaml::Hash(ref map) = meta {
        for (key, _) in map {
            if let Yaml::String(name) = key {
                log::info!("Loaded metadata for station: {name}");
            }
        }
    }
}

fn compute_summary(
    df: DataFrame,
    baseline: BaselinePeriod,
) -> (WeatherSummary,) {
    let _ = baseline;

    let temp = df.column("temperature").unwrap().f64().unwrap();
    let rain = df.column("rainfall").unwrap().f64().unwrap();

    let stations: Vec<String> = df
        .column("station")
        .unwrap()
        .str()
        .unwrap()
        .into_no_null_iter()
        .map(|s| s.to_string())
        .collect();
    let unique_stations: std::collections::HashSet<&str> =
        stations.iter().map(|s| s.as_str()).collect();

    let summary = WeatherSummary {
        avg_temp: temp.mean().unwrap_or(0.0),
        max_temp: temp.max().unwrap_or(0.0),
        min_temp: temp.min().unwrap_or(0.0),
        total_rainfall: rain.sum().unwrap_or(0.0),
        station_count: unique_stations.len() as u32,
        reading_count: df.height() as u32,
    };
    (summary,)
}

fn detect_anomalies(
    df: DataFrame,
    threshold: f64,
) -> Result<(DataFrame,), PolarsError> {
    let temp = df.column("temperature")?.f64()?;
    let mean = temp.mean().unwrap_or(0.0);
    let std_dev = temp.std(1).unwrap_or(1.0);

    let lower = mean - threshold * std_dev;
    let upper = mean + threshold * std_dev;

    let mask = temp.lt(lower) | temp.gt(upper);
    Ok((df.filter(&mask)?,))
}

fn plot_temperatures(summary: WeatherSummary, title: String) -> (Plot,) {
    let labels = vec!["Min", "Avg", "Max"];
    let values = vec![summary.min_temp, summary.avg_temp, summary.max_temp];

    let mut plot = Plot::new();
    plot.add_trace(
        Bar::new(labels, values).name("Temperature"),
    );
    plot.set_layout(
        Layout::new()
            .title(format!("{title} - Temperature Summary"))
            .y_axis(plotly::layout::Axis::new().title("Temperature (C)")),
    );
    (plot,)
}

fn plot_rainfall(summary: WeatherSummary, title: String) -> (Plot,) {
    let labels = vec![
        format!("{} stations", summary.station_count),
        format!("{} readings", summary.reading_count),
    ];
    let values = vec![
        summary.total_rainfall / summary.station_count as f64,
        summary.total_rainfall,
    ];

    let mut plot = Plot::new();
    plot.add_trace(
        Bar::new(labels, values).name("Rainfall"),
    );
    plot.set_layout(
        Layout::new()
            .title(format!("{title} - Rainfall Summary"))
            .y_axis(plotly::layout::Axis::new().title("Rainfall (mm)")),
    );
    (plot,)
}

fn validate_reports(
    _temp_chart: serde_json::Value,
    _rain_chart: serde_json::Value,
) -> Result<(bool,), WeatherError> {
    Err(WeatherError::Validation(
        "Data quality check failed: east station has a suspicious 50C reading in July".into(),
    ))
}
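The band in detect_anomalies is ordinary z-score thresholding: keep rows more than threshold sample standard deviations from the mean. Stripped of Polars, the same logic on a plain slice is:

```rust
/// Plain-slice sketch of the band logic in `detect_anomalies`: return values
/// lying more than `threshold` sample standard deviations from the mean.
fn outliers(values: &[f64], threshold: f64) -> Vec<f64> {
    let n = values.len() as f64;
    let mean = values.iter().sum::<f64>() / n;
    // sample standard deviation (ddof = 1), matching Polars' `std(1)` above
    let var = values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / (n - 1.0);
    let std_dev = var.sqrt();
    let (lower, upper) = (mean - threshold * std_dev, mean + threshold * std_dev);
    values.iter().copied().filter(|&v| v < lower || v > upper).collect()
}
```

For example, in `[1.0, 2.0, 3.0, 4.0, 100.0]` with threshold 1.0, only 100.0 falls outside the band.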

Pipeline definition

// ---------------------------------------------------------------------------
// Pipeline function
// ---------------------------------------------------------------------------

pub fn weather_pipeline<'a>(
    cat: &'a WeatherCatalog,
    params: &'a WeatherParams,
) -> impl Steps<WeatherError> + 'a {
    (
        // Subpipeline: data preparation
        Pipeline {
            name: "data_prep",
            steps: (
                Node {
                    name: "merge_stations",
                    func: merge_stations,
                    input: (&cat.sources.station_readings,),
                    output: (&cat.analysis.combined_readings,),
                },
                Node {
                    name: "load_metadata",
                    func: load_metadata,
                    input: (&cat.sources.station_metadata,),
                    output: (),
                },
            ),
            input: (
                &cat.sources.station_readings,
                &cat.sources.station_metadata,
            ),
            output: (&cat.analysis.combined_readings,),
        },
        // These two nodes can run in parallel (both read combined_readings)
        Node {
            name: "compute_summary",
            func: compute_summary,
            input: (
                &cat.analysis.combined_readings,
                &params.analysis.baseline,
            ),
            output: (&cat.analysis.weather_summary,),
        },
        Node {
            name: "detect_anomalies",
            func: detect_anomalies,
            input: (
                &cat.analysis.combined_readings,
                &params.analysis.anomaly_threshold,
            ),
            output: (&cat.analysis.anomalies,),
        },
        // Subpipeline: reporting (inner nodes can run in parallel)
        Pipeline {
            name: "reporting",
            steps: (
                Node {
                    name: "plot_temperatures",
                    func: plot_temperatures,
                    input: (
                        &cat.analysis.weather_summary,
                        &params.display.chart_title,
                    ),
                    output: (&cat.reports.temperature_chart,),
                },
                Node {
                    name: "plot_rainfall",
                    func: plot_rainfall,
                    input: (
                        &cat.analysis.weather_summary,
                        &params.display.chart_title,
                    ),
                    output: (&cat.reports.rainfall_chart,),
                },
            ),
            input: (
                &cat.analysis.weather_summary,
                &params.display.chart_title,
            ),
            output: (
                &cat.reports.temperature_chart,
                &cat.reports.rainfall_chart,
            ),
        },
        // Validation node: intentionally fails
        Node {
            name: "validate_reports",
            func: validate_reports,
            input: (
                &cat.reports.temperature_chart,
                &cat.reports.rainfall_chart,
            ),
            output: (&cat.reports.validation_passed,),
        },
    )
}

App entry point

    pondrs::app::App::from_yaml(
        dir.join("catalog.yml").to_str().unwrap(),
        dir.join("params.yml").to_str().unwrap(),
    )?
    .with_hooks((
        LoggingHook::new(),
        VizHook::new("http://localhost:8080".to_string()),
    ))
    .with_args(std::env::args_os())?
    .dispatch(weather_pipeline)

Pipeline visualization


Sales Pipeline

Monthly sales CSV processing: filter out months below a minimum sales threshold, compute the total, and produce a Plotly bar chart.

Usage

cargo run --example sales_app -- run
cargo run --example sales_app -- check
cargo run --example sales_app -- viz

Types

// ---------------------------------------------------------------------------
// Catalog and params
// ---------------------------------------------------------------------------

#[derive(Serialize, Deserialize)]
pub struct SalesCatalog {
    pub raw_sales: PolarsCsvDataset,
    pub filtered_sales: PolarsCsvDataset,
    pub total_sales: MemoryDataset<i64>,
    pub chart: PlotlyDataset,
}

#[derive(Serialize, Deserialize)]
pub struct SalesParams {
    /// Minimum sales to include a month in the chart.
    pub min_sales: Param<i64>,
}

Node functions

// ---------------------------------------------------------------------------
// Node functions
// ---------------------------------------------------------------------------

fn build_chart(df: DataFrame, total: i64) -> (Plot,) {
    let months: Vec<String> = df
        .column("month").unwrap()
        .str().unwrap()
        .into_no_null_iter()
        .map(|s| s.to_string())
        .collect();
    let sales: Vec<i64> = df
        .column("sales").unwrap()
        .i64().unwrap()
        .into_no_null_iter()
        .collect();

    let mut plot = Plot::new();
    plot.add_trace(Bar::new(months, sales).name("Monthly Sales"));
    plot.set_layout(
        Layout::new()
            .title(format!("Months with sales ≥ 1000  (total: {total})"))
            .y_axis(plotly::layout::Axis::new().title("Sales")),
    );
    (plot,)
}

Pipeline definition

// ---------------------------------------------------------------------------
// Pipeline function
// ---------------------------------------------------------------------------

pub fn sales_pipeline<'a>(
    cat: &'a SalesCatalog,
    params: &'a SalesParams,
) -> impl Steps<PondError> + 'a {
    (
        // Node 1: filter out months below the threshold
        Node {
            name: "filter_months",
            func: |df: DataFrame, min_sales: i64| -> Result<(DataFrame,), PolarsError> {
                let mask = df.column("sales")?.i64()?.gt_eq(min_sales);
                Ok((df.filter(&mask)?,))
            },
            input: (&cat.raw_sales, &params.min_sales),
            output: (&cat.filtered_sales,),
        },
        // Node 2: sum the filtered sales
        Node {
            name: "compute_total",
            func: |df: DataFrame| {
                let total =
                    df.column("sales").unwrap().i64().unwrap().sum().unwrap_or(0);
                (total,)
            },
            input: (&cat.filtered_sales,),
            output: (&cat.total_sales,),
        },
        // Node 3: build a bar chart of the filtered monthly sales
        Node {
            name: "build_chart",
            func: build_chart,
            input: (&cat.filtered_sales, &cat.total_sales),
            output: (&cat.chart,),
        },
    )
}

App entry point

    pondrs::app::App::from_yaml(
        dir.join("catalog.yml").to_str().unwrap(),
        dir.join("params.yml").to_str().unwrap(),
    )?
    .with_hooks((
        LoggingHook::new(),
        VizHook::new("http://localhost:8080".to_string()),
    ))
    .with_args(std::env::args_os())?
    .dispatch(sales_pipeline)

Pipeline visualization


Identity Pipeline

Demonstrates the Ident node: generate CSV as plain text, read it back as a Polars DataFrame via Ident, and produce a Plotly bar chart.

Usage

cargo run --example ident_app -- \
    --catalog-path examples/ident_data/catalog.yml \
    --params-path examples/ident_data/params.yml run

Types

// ---------------------------------------------------------------------------
// Catalog
// ---------------------------------------------------------------------------

#[derive(Serialize, Deserialize)]
struct IdentCatalog {
    csv_text: TextDataset,
    csv_data: PolarsCsvDataset,
    chart: PlotlyDataset,
}

Node functions

// ---------------------------------------------------------------------------
// Node functions
// ---------------------------------------------------------------------------

fn generate_csv() -> (String,) {
    let csv = "\
fruit,count
Apples,35
Bananas,22
Cherries,48
Dates,15
Elderberries,31
Figs,9
Grapes,42";
    (csv.to_string(),)
}

fn build_chart(df: DataFrame) -> (Plot,) {
    let fruits: Vec<String> = df
        .column("fruit")
        .unwrap()
        .str()
        .unwrap()
        .into_no_null_iter()
        .map(|s| s.to_string())
        .collect();
    let counts: Vec<i64> = df
        .column("count")
        .unwrap()
        .i64()
        .unwrap()
        .into_no_null_iter()
        .collect();

    let mut plot = Plot::new();
    plot.add_trace(Bar::new(fruits, counts).name("Fruit Count"));
    plot.set_layout(
        Layout::new()
            .title("Fruit Inventory")
            .y_axis(plotly::layout::Axis::new().title("Count")),
    );
    (plot,)
}

Pipeline definition

// ---------------------------------------------------------------------------
// Pipeline function
// ---------------------------------------------------------------------------

fn ident_pipeline<'a>(
    cat: &'a IdentCatalog,
    _params: &'a (),
) -> impl Steps<PondError> + 'a {
    (
        Node {
            name: "generate_csv",
            func: generate_csv,
            input: (),
            output: (&cat.csv_text,),
        },
        Ident {
            name: "text_to_csv",
            input: &cat.csv_text,
            output: &cat.csv_data,
        },
        Node {
            name: "build_chart",
            func: build_chart,
            input: (&cat.csv_data,),
            output: (&cat.chart,),
        },
    )
}

Pipeline visualization


Register Pipeline

Simulates hardware register access: reads a sensor register, processes the value against thresholds, and sets GPIO pins accordingly. Demonstrates RegisterDataset, GpioDataset, and Param with a programmatically constructed catalog.

Usage

cargo run --example register_example -- run
cargo run --example register_example -- viz

Types

// ---------------------------------------------------------------------------
// Simulated hardware: heap-allocated "registers"
// ---------------------------------------------------------------------------

struct SimulatedHardware {
    sensor_reg: Box<u16>,
    status_reg: Box<u32>,
}

impl SimulatedHardware {
    fn new() -> Self {
        Self {
            sensor_reg: Box::new(0),
            status_reg: Box::new(0),
        }
    }

    fn sensor_address(&self) -> usize {
        &*self.sensor_reg as *const u16 as usize
    }

    fn status_address(&self) -> usize {
        &*self.status_reg as *const u32 as usize
    }
}

// ---------------------------------------------------------------------------
// Catalog and params
// ---------------------------------------------------------------------------

#[derive(Serialize)]
struct Catalog {
    sensor: RegisterDataset<u16>,
    status: RegisterDataset<u32>,
    led_ok: GpioDataset,
    led_warn: GpioDataset,
    led_crit: GpioDataset,
}

#[derive(Serialize, Deserialize)]
struct Params {
    warn_threshold: Param<u16>,
    crit_threshold: Param<u16>,
}
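The catalog is built programmatically from these addresses, which are plain usize values. How a register-backed dataset accesses such an address can be sketched with volatile pointer operations (a hypothetical illustration, not pondrs' actual RegisterDataset code):

```rust
/// Hypothetical sketch of volatile register access of the kind a
/// register-backed dataset might perform. Safety: `addr` must point to a
/// valid, properly aligned `T` for the duration of the call.
unsafe fn write_reg<T>(addr: usize, value: T) {
    core::ptr::write_volatile(addr as *mut T, value);
}

unsafe fn read_reg<T>(addr: usize) -> T {
    core::ptr::read_volatile(addr as *const T)
}
```

Volatile accesses prevent the compiler from eliding or reordering the reads and writes, which matters when the address is a real memory-mapped register rather than a heap-allocated stand-in like the one above.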

Pipeline definition

// ---------------------------------------------------------------------------
// Pipeline
// ---------------------------------------------------------------------------

fn register_pipeline<'a>(cat: &'a Catalog, params: &'a Params) -> impl Steps<PondError> + 'a {
    (
        Node {
            name: "read_sensor",
            func: |raw: u16| -> (u32,) {
                println!("  Sensor reading: 0x{raw:04x} ({raw})");
                (raw as u32,)
            },
            input: (&cat.sensor,),
            output: (&cat.status,),
        },
        Node {
            name: "set_ok_led",
            func: |reading: u16, warn: u16| {
                let ok = reading < warn;
                println!("  OK LED: {ok} (reading {reading} < warn {warn})");
                (ok,)
            },
            input: (&cat.sensor, &params.warn_threshold),
            output: (&cat.led_ok,),
        },
        Node {
            name: "set_warn_led",
            func: |reading: u16, warn: u16, crit: u16| {
                let warning = reading >= warn && reading < crit;
                println!("  WARN LED: {warning}");
                (warning,)
            },
            input: (&cat.sensor, &params.warn_threshold, &params.crit_threshold),
            output: (&cat.led_warn,),
        },
        Node {
            name: "set_crit_led",
            func: |reading: u16, crit: u16| {
                let critical = reading >= crit;
                println!("  CRIT LED: {critical}");
                (critical,)
            },
            input: (&cat.sensor, &params.crit_threshold),
            output: (&cat.led_crit,),
        },
    )
}

Pipeline visualization


Dynamic Pipeline

Demonstrates runtime pipeline composition using StepVec: the report node is only included when the include_report param is true.

Usage

cargo run --example dyn_steps_app -- run
cargo run --example dyn_steps_app -- check
cargo run --example dyn_steps_app -- viz

Types

#[derive(Serialize, Deserialize)]
pub struct Catalog {
    pub readings: PolarsCsvDataset,
    pub summary: MemoryDataset<f64>,
    pub report: JsonDataset,
}

#[derive(Serialize, Deserialize)]
pub struct Params {
    pub threshold: Param<f64>,
    pub include_report: Param<bool>,
}

Pipeline definition

pub fn pipeline<'a>(cat: &'a Catalog, params: &'a Params) -> StepVec<'a> {
    let mut steps = vec![
        Node {
            name: "summarize",
            func: |df: DataFrame| {
                let mean = df.column("value").unwrap().f64().unwrap().mean().unwrap();
                (mean,)
            },
            input: (&cat.readings,),
            output: (&cat.summary,),
        }
        .boxed(),
    ];

    if params.include_report.0 {
        steps.push(
            Node {
                name: "report",
                func: |mean: f64, threshold: f64| {
                    (json!({ "mean": mean, "passed": mean >= threshold }),)
                },
                input: (&cat.summary, &params.threshold),
                output: (&cat.report,),
            }
            .boxed(),
        );
    }

    steps
}

App entry point

fn main() -> Result<(), PondError> {
    let dir = {
        let manifest = std::path::Path::new(env!("CARGO_MANIFEST_DIR"));
        manifest.join("examples").join("dyn_steps_data")
    };
    write_fixtures(&dir);

    pondrs::app::App::from_yaml(
        dir.join("catalog.yml").to_str().unwrap(),
        dir.join("params.yml").to_str().unwrap(),
    )?
    .with_args(std::env::args_os())?
    .dispatch(pipeline)
}

Pipeline visualization


Split/Join Pipeline

Demonstrates fan-out/fan-in patterns using TemplatedCatalog, Split, Join, and StepVec. A combined inventory CSV is split by store into per-store files, processed independently, then joined back into a comparison report.

Usage

cargo run --example split_join_app -- run
cargo run --example split_join_app -- check
cargo run --example split_join_app -- viz

Types

// ---------------------------------------------------------------------------
// Per-store catalog entry (expanded from YAML template)
// ---------------------------------------------------------------------------

#[derive(Debug, Serialize, Deserialize)]
pub struct StoreCatalog {
    pub inventory: PolarsCsvDataset,
    pub total_value: MemoryDataset<f64>,
}

// ---------------------------------------------------------------------------
// Catalog and params
// ---------------------------------------------------------------------------

#[derive(Serialize, Deserialize)]
pub struct Catalog {
    pub all_inventory: PolarsCsvDataset,
    pub grouped: MemoryDataset<HashMap<String, DataFrame>>,
    pub stores: TemplatedCatalog<StoreCatalog>,
    pub store_values: MemoryDataset<HashMap<String, f64>>,
    pub report: JsonDataset,
}

#[derive(Serialize, Deserialize)]
pub struct Params {
    pub low_stock_threshold: Param<i64>,
}

The StoreCatalog struct is the per-entry template. Each store gets its own inventory CSV dataset and total_value memory dataset, with file paths expanded from a YAML template:

stores:
  placeholder: "store"
  template:
    inventory:
      path: "data/{store}_inventory.csv"
    total_value: {}
  names: [north, south, east]
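Expansion is plain placeholder substitution: for each entry in `names`, the `{store}` token in every templated path is replaced with the entry name. A minimal sketch of that substitution (illustrative; `expand_path` is not part of pondrs' API):

```rust
/// Illustrative placeholder expansion: replace `{placeholder}` in a templated
/// path with a concrete entry name, e.g. "data/{store}_inventory.csv" with
/// "north" becomes "data/north_inventory.csv".
fn expand_path(template: &str, placeholder: &str, name: &str) -> String {
    template.replace(&format!("{{{placeholder}}}"), name)
}
```

With the YAML above, the three names yield per-store inventory paths for north, south, and east, each paired with its own in-memory total_value slot.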

Node functions

// ---------------------------------------------------------------------------
// Node functions
// ---------------------------------------------------------------------------

/// Read the combined CSV and group rows by the "store" column.
fn group_by_store(df: DataFrame) -> Result<(HashMap<String, DataFrame>,), PolarsError> {
    let store_col = df.column("store")?.str()?;
    let unique: Vec<String> = store_col
        .into_no_null_iter()
        .map(|s| s.to_string())
        .collect::<std::collections::HashSet<_>>()
        .into_iter()
        .collect();

    let mut map = HashMap::new();
    for store in &unique {
        let mask = store_col.equal(store.as_str());
        map.insert(store.clone(), df.filter(&mask)?);
    }
    Ok((map,))
}

/// Compute the total stock value for a single store.
///
/// For each row: value = quantity * unit_price.
/// Items below `low_stock_threshold` are flagged but still counted.
fn compute_store_value(df: DataFrame, threshold: i64) -> (f64,) {
    let qty = df.column("quantity").unwrap().i64().unwrap();
    let price = df.column("unit_price").unwrap().f64().unwrap();

    let mut total = 0.0;
    let mut low_stock_items = 0;
    for i in 0..df.height() {
        let q = qty.get(i).unwrap_or(0);
        let p = price.get(i).unwrap_or(0.0);
        total += q as f64 * p;
        if q < threshold {
            low_stock_items += 1;
        }
    }

    if low_stock_items > 0 {
        log::warn!(
            "{low_stock_items} item(s) below stock threshold of {threshold}"
        );
    }
    (total,)
}

/// Build a JSON comparison report from per-store totals.
fn build_report(store_values: HashMap<String, f64>) -> (Value,) {
    let grand_total: f64 = store_values.values().sum();
    let mut stores: Vec<Value> = store_values
        .iter()
        .map(|(name, &value)| {
            json!({
                "store": name,
                "total_value": (value * 100.0).round() / 100.0,
                "share_pct": ((value / grand_total * 10000.0).round() / 100.0),
            })
        })
        .collect();
    stores.sort_by(|a, b| {
        b["total_value"]
            .as_f64()
            .unwrap()
            .partial_cmp(&a["total_value"].as_f64().unwrap())
            .unwrap()
    });

    let report = json!({
        "grand_total": (grand_total * 100.0).round() / 100.0,
        "stores": stores,
    });
    (report,)
}

Pipeline definition

// ---------------------------------------------------------------------------
// Pipeline function
// ---------------------------------------------------------------------------

pub fn pipeline<'a>(cat: &'a Catalog, params: &'a Params) -> StepVec<'a> {
    // Step 1: group the combined CSV into a HashMap by store.
    let mut steps: StepVec<'a> = vec![
        Node {
            name: "group_by_store",
            func: group_by_store,
            input: (&cat.all_inventory,),
            output: (&cat.grouped,),
        }
        .boxed(),
    ];

    // Step 2: Split distributes per-store DataFrames to individual CSV files.
    steps.push(
        Split {
            name: "split_stores",
            input: &cat.grouped,
            catalog: &cat.stores,
            field: |s: &StoreCatalog| &s.inventory,
        }
        .boxed(),
    );

    // Step 3: per-store processing — dynamically build a node for each store.
    for (_, store) in cat.stores.iter() {
        steps.push(
            Node {
                name: "compute_store_value",
                func: compute_store_value,
                input: (&store.inventory, &params.low_stock_threshold),
                output: (&store.total_value,),
            }
            .boxed(),
        );
    }

    // Step 4: Join collects per-store totals back into a HashMap.
    steps.push(
        Join {
            name: "join_values",
            catalog: &cat.stores,
            field: |s: &StoreCatalog| &s.total_value,
            output: &cat.store_values,
        }
        .boxed(),
    );

    // Step 5: build a comparison report from the joined values.
    steps.push(
        Node {
            name: "build_report",
            func: build_report,
            input: (&cat.store_values,),
            output: (&cat.report,),
        }
        .boxed(),
    );

    steps
}

The pipeline uses StepVec because the per-store processing nodes are built dynamically from the TemplatedCatalog entries. The flow is:

  1. group_by_store — reads the combined CSV and groups rows into a HashMap<String, DataFrame>
  2. split_stores — distributes each store’s DataFrame to its per-store CSV file
  3. compute_store_value (one per store) — computes total stock value from each store’s CSV
  4. join_values — collects per-store totals back into a HashMap<String, f64>
  5. build_report — produces a JSON comparison report
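Stripped of datasets and files, steps 1 through 4 reduce to grouping, per-key computation, and collection. A self-contained sketch of that fan-out/fan-in shape (not the pipeline above, just its data flow):

```rust
use std::collections::HashMap;

/// Plain-Rust sketch of the fan-out/fan-in flow: group (store, qty, price)
/// rows by store, compute a per-store total value, collect totals by store.
fn fan_out_fan_in(rows: &[(&str, i64, f64)]) -> HashMap<String, f64> {
    // fan-out: group rows by store key
    let mut groups: HashMap<String, Vec<(i64, f64)>> = HashMap::new();
    for &(store, qty, price) in rows {
        groups.entry(store.to_string()).or_default().push((qty, price));
    }
    // per-partition compute + fan-in: total value per store
    groups
        .into_iter()
        .map(|(store, items)| {
            let total: f64 = items.iter().map(|(q, p)| *q as f64 * p).sum();
            (store, total)
        })
        .collect()
}
```

In the real pipeline the group, compute, and join boundaries each pass through a dataset, which is what lets the per-store nodes run independently.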

App entry point

fn main() -> Result<(), PondError> {
    let dir = data_dir();
    write_fixtures(&dir);

    pondrs::app::App::from_yaml(
        dir.join("catalog.yml").to_str().unwrap(),
        dir.join("params.yml").to_str().unwrap(),
    )?
    .with_hooks((LoggingHook::new(),))
    .with_args(std::env::args_os())?
    .dispatch(pipeline)
}

Pipeline visualization
