Keyboard shortcuts

Press ← or → to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Weather Pipeline

Demonstrates subpipelines, struct params, nested catalog and params containers, PartitionedDataset, PolarsCsvDataset, MemoryDataset, YamlDataset, PlotlyDataset, parallel execution, and a node that fails on purpose.

Usage

cargo run --example weather_app -- run --runner parallel
cargo run --example weather_app -- check
cargo run --example weather_app -- viz

Types

// ---------------------------------------------------------------------------
// Error type
// ---------------------------------------------------------------------------

/// Error type shared by every node in the weather pipeline.
///
/// Wraps the two library error types the nodes can produce (`PondError`,
/// `PolarsError`) and adds a variant for this example's own data-quality
/// check failures. The `From` impls for the wrapped types convert them into
/// this enum.
#[derive(Debug)]
pub enum WeatherError {
    /// An error reported by the pond framework (`PondError`).
    Pond(PondError),
    /// An error reported by a polars DataFrame operation.
    Polars(PolarsError),
    /// A validation check failed; the string describes the problem.
    Validation(String),
}

impl From<PondError> for WeatherError {
    fn from(e: PondError) -> Self {
        WeatherError::Pond(e)
    }
}

impl From<PolarsError> for WeatherError {
    fn from(e: PolarsError) -> Self {
        WeatherError::Polars(e)
    }
}

impl std::fmt::Display for WeatherError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            WeatherError::Pond(e) => write!(f, "{e}"),
            WeatherError::Polars(e) => write!(f, "{e}"),
            WeatherError::Validation(msg) => write!(f, "Validation failed: {msg}"),
        }
    }
}

impl std::error::Error for WeatherError {}

// ---------------------------------------------------------------------------
// Domain types
// ---------------------------------------------------------------------------

/// Baseline period for anomaly detection (struct param).
///
/// Used as a single structured parameter (`Param<BaselinePeriod>` in
/// `AnalysisConfig`) rather than as two scalar params.
/// NOTE(review): month values are presumably 1-12 and the range inclusive —
/// confirm against params.yml.
/// NOTE(review): `compute_summary` receives this value but does not use it yet.
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub struct BaselinePeriod {
    /// First month of the baseline window.
    pub start_month: u32,
    /// Last month of the baseline window.
    pub end_month: u32,
}

/// Aggregated weather statistics (must be Copy for MemoryDataset).
///
/// Produced by `compute_summary` and read by both plotting nodes.
#[derive(Clone, Copy, Debug, Default)]
pub struct WeatherSummary {
    /// Mean of the `temperature` column (charts label temperatures in C).
    pub avg_temp: f64,
    /// Highest `temperature` value.
    pub max_temp: f64,
    /// Lowest `temperature` value.
    pub min_temp: f64,
    /// Sum of the `rainfall` column (charts label rainfall in mm).
    pub total_rainfall: f64,
    /// Number of distinct values in the `station` column.
    pub station_count: u32,
    /// Number of rows (readings) that were summarized.
    pub reading_count: u32,
}

// ---------------------------------------------------------------------------
// Params (nested containers with struct param)
// ---------------------------------------------------------------------------

/// Root parameter container for the weather pipeline.
///
/// Parameters are grouped into nested `analysis` and `display` sections.
/// NOTE(review): presumably deserialized from `params.yml` by
/// `App::from_yaml` — confirm the YAML layout mirrors this nesting.
#[derive(Serialize, Deserialize)]
pub struct WeatherParams {
    /// Parameters consumed by the summary and anomaly nodes.
    pub analysis: AnalysisConfig,
    /// Parameters consumed by the chart nodes.
    pub display: DisplayConfig,
}

/// Analysis parameters.
#[derive(Serialize, Deserialize)]
pub struct AnalysisConfig {
    /// Multiplier on the temperature standard deviation. `detect_anomalies`
    /// flags rows whose temperature falls outside `mean ± threshold * std_dev`.
    pub anomaly_threshold: Param<f64>,
    /// Baseline window passed to `compute_summary`.
    pub baseline: Param<BaselinePeriod>,
}

/// Chart presentation parameters.
#[derive(Serialize, Deserialize)]
pub struct DisplayConfig {
    /// Prefix used in the titles of both the temperature and rainfall charts.
    pub chart_title: Param<String>,
    /// NOTE(review): not wired into any node in `weather_pipeline`; confirm
    /// whether it is still needed.
    pub top_n: Param<usize>,
}

// ---------------------------------------------------------------------------
// Catalog (nested containers)
// ---------------------------------------------------------------------------

/// Root data catalog, split into input sources, intermediate analysis
/// datasets, and final reports.
#[derive(Serialize, Deserialize)]
pub struct WeatherCatalog {
    /// Raw inputs read by the `data_prep` subpipeline.
    pub sources: SourcesCatalog,
    /// Intermediate and analysis outputs.
    pub analysis: AnalysisCatalog,
    /// Charts and validation results.
    pub reports: ReportsCatalog,
}

/// Pipeline inputs.
#[derive(Serialize, Deserialize)]
pub struct SourcesCatalog {
    /// One CSV DataFrame per partition. `merge_stations` receives them as a
    /// map keyed by partition name and stores that key in a `station` column.
    pub station_readings: PartitionedDataset<PolarsCsvDataset>,
    /// Station metadata YAML. `load_metadata` logs its top-level string keys.
    pub station_metadata: YamlDataset,
}

/// Intermediate and analysis datasets.
#[derive(Serialize, Deserialize)]
pub struct AnalysisCatalog {
    /// All station partitions stacked into one CSV (output of `merge_stations`).
    pub combined_readings: PolarsCsvDataset,
    /// In-memory aggregate statistics (output of `compute_summary`).
    pub weather_summary: MemoryDataset<WeatherSummary>,
    /// Rows flagged as temperature outliers (output of `detect_anomalies`).
    pub anomalies: PolarsCsvDataset,
}

/// Report outputs.
#[derive(Serialize, Deserialize)]
pub struct ReportsCatalog {
    /// Plotly bar chart of min/avg/max temperature.
    pub temperature_chart: PlotlyDataset,
    /// Plotly bar chart of per-station and total rainfall.
    pub rainfall_chart: PlotlyDataset,
    /// In-memory result slot for `validate_reports`. In this example that
    /// node always returns an error.
    pub validation_passed: MemoryDataset<bool>,
}

Node functions

// ---------------------------------------------------------------------------
// Node functions
// ---------------------------------------------------------------------------

/// Stacks every station partition into a single DataFrame.
///
/// Each partition's map key is written into a new `station` column, repeated
/// for every row of that partition. Partitions are stacked in sorted key
/// order, so the combined output is identical across runs: iterating the
/// `HashMap` directly would use an arbitrary, per-process order. Returns an
/// empty `DataFrame` when there are no partitions.
///
/// # Errors
/// Propagates any `PolarsError` from adding the `station` column or from
/// stacking partitions whose schemas do not match.
fn merge_stations(
    partitions: HashMap<String, DataFrame>,
) -> Result<(DataFrame,), PolarsError> {
    // Keys are unique, so an unstable sort is fully deterministic here.
    let mut ordered: Vec<(String, DataFrame)> = partitions.into_iter().collect();
    ordered.sort_unstable_by(|a, b| a.0.cmp(&b.0));

    let mut combined: Option<DataFrame> = None;
    for (station_name, mut df) in ordered {
        let n = df.height();
        let station_col = Column::new("station".into(), vec![station_name.as_str(); n]);
        df.with_column(station_col)?;
        match &mut combined {
            None => combined = Some(df),
            Some(acc) => {
                acc.vstack_mut(&df)?;
            }
        }
    }
    Ok((combined.unwrap_or_default(),))
}

/// Logs the name of each station described in the metadata YAML.
///
/// Only string keys of a top-level mapping are reported. A document of any
/// other shape, or a non-string key, is silently skipped.
fn load_metadata(meta: Yaml) {
    let Yaml::Hash(ref entries) = meta else {
        return;
    };
    entries
        .iter()
        .filter_map(|(key, _)| match key {
            Yaml::String(station) => Some(station),
            _ => None,
        })
        .for_each(|station| log::info!("Loaded metadata for station: {station}"));
}

/// Computes aggregate temperature, rainfall, and coverage statistics for the
/// merged readings.
///
/// Any aggregate polars cannot compute (it returns `None`) falls back to
/// `0.0`. Null station names are ignored when counting distinct stations.
/// Counts larger than `u32::MAX` saturate instead of silently truncating.
///
/// NOTE(review): `baseline` is wired in as an input by `weather_pipeline`
/// but is not used by the current computation.
///
/// # Panics
/// Panics in either of these cases:
/// - `temperature` or `rainfall` is missing or is not `Float64`.
/// - `station` is missing or is not a string column.
///
/// The node returns a plain tuple, so these schema violations cannot be
/// reported as errors.
fn compute_summary(
    df: DataFrame,
    baseline: BaselinePeriod,
) -> (WeatherSummary,) {
    let _ = baseline;

    let temp = df
        .column("temperature")
        .expect("combined_readings is missing the `temperature` column")
        .f64()
        .expect("`temperature` column must be Float64");
    let rain = df
        .column("rainfall")
        .expect("combined_readings is missing the `rainfall` column")
        .f64()
        .expect("`rainfall` column must be Float64");

    // Collect borrowed names straight into the set (no intermediate
    // Vec<String>). Null slots are skipped: `into_no_null_iter` would read
    // their placeholder values and count them as an extra station.
    let station_count = df
        .column("station")
        .expect("combined_readings is missing the `station` column")
        .str()
        .expect("`station` column must be a string column")
        .into_iter()
        .flatten()
        .collect::<std::collections::HashSet<&str>>()
        .len();

    let summary = WeatherSummary {
        avg_temp: temp.mean().unwrap_or(0.0),
        max_temp: temp.max().unwrap_or(0.0),
        min_temp: temp.min().unwrap_or(0.0),
        total_rainfall: rain.sum().unwrap_or(0.0),
        station_count: u32::try_from(station_count).unwrap_or(u32::MAX),
        reading_count: u32::try_from(df.height()).unwrap_or(u32::MAX),
    };
    (summary,)
}

/// Keeps only the rows whose `temperature` lies strictly outside the band
/// `mean ± threshold * std_dev`, using the sample standard deviation
/// (ddof = 1).
///
/// If polars returns `None` for the mean or the standard deviation, they
/// default to `0.0` and `1.0` respectively.
///
/// # Errors
/// Fails in either of these cases:
/// - the `temperature` column is missing or is not `Float64`;
/// - the filter itself fails.
fn detect_anomalies(
    df: DataFrame,
    threshold: f64,
) -> Result<(DataFrame,), PolarsError> {
    let temperature = df.column("temperature")?.f64()?;

    let centre = temperature.mean().unwrap_or(0.0);
    let spread = threshold * temperature.std(1).unwrap_or(1.0);
    let (low, high) = (centre - spread, centre + spread);

    let too_cold = temperature.lt(low);
    let too_hot = temperature.gt(high);
    let outliers = df.filter(&(too_cold | too_hot))?;
    Ok((outliers,))
}

/// Builds a bar chart of the minimum, average, and maximum temperature.
///
/// The chart title is `"<title> - Temperature Summary"`. The y-axis is
/// labelled `Temperature (C)`.
fn plot_temperatures(summary: WeatherSummary, title: String) -> (Plot,) {
    let trace = Bar::new(
        vec!["Min", "Avg", "Max"],
        vec![summary.min_temp, summary.avg_temp, summary.max_temp],
    )
    .name("Temperature");

    let y_axis = plotly::layout::Axis::new().title("Temperature (C)");
    let layout = Layout::new()
        .title(format!("{title} - Temperature Summary"))
        .y_axis(y_axis);

    let mut plot = Plot::new();
    plot.add_trace(trace);
    plot.set_layout(layout);
    (plot,)
}

/// Builds a bar chart comparing average rainfall per station with total
/// rainfall.
///
/// - The first bar is labelled `"<N> stations"` and shows
///   `total_rainfall / station_count`.
/// - The second bar is labelled `"<M> readings"` and shows `total_rainfall`.
///
/// When `station_count` is zero, the per-station bar is `0.0` instead of the
/// NaN or infinity that a raw division would put into the chart data. The
/// chart title is `"<title> - Rainfall Summary"`.
fn plot_rainfall(summary: WeatherSummary, title: String) -> (Plot,) {
    let labels = vec![
        format!("{} stations", summary.station_count),
        format!("{} readings", summary.reading_count),
    ];
    // Guard the division: a summary with no stations would otherwise yield
    // 0.0 / 0.0 = NaN (or x / 0.0 = inf) for the first bar.
    let per_station = if summary.station_count == 0 {
        0.0
    } else {
        summary.total_rainfall / f64::from(summary.station_count)
    };
    let values = vec![per_station, summary.total_rainfall];

    let mut plot = Plot::new();
    plot.add_trace(
        Bar::new(labels, values).name("Rainfall"),
    );
    plot.set_layout(
        Layout::new()
            .title(format!("{title} - Rainfall Summary"))
            .y_axis(plotly::layout::Axis::new().title("Rainfall (mm)")),
    );
    (plot,)
}

/// Deliberately failing validation step that demonstrates error reporting.
///
/// Both chart inputs are ignored. The function always returns
/// `WeatherError::Validation` with a fixed message.
fn validate_reports(
    _temp_chart: serde_json::Value,
    _rain_chart: serde_json::Value,
) -> Result<(bool,), WeatherError> {
    let reason = String::from(
        "Data quality check failed: east station has a suspicious 50C reading in July",
    );
    Err(WeatherError::Validation(reason))
}

Pipeline definition

// ---------------------------------------------------------------------------
// Pipeline function
// ---------------------------------------------------------------------------

/// Assembles the weather pipeline from the catalog and the parameters.
///
/// Steps, in order:
/// 1. `data_prep` subpipeline: merge the station partitions, then log the
///    station metadata.
/// 2. `compute_summary` and `detect_anomalies`. Both read
///    `combined_readings` and can run in parallel.
/// 3. `reporting` subpipeline: temperature and rainfall charts built from
///    `weather_summary`.
/// 4. `validate_reports`, which intentionally fails.
///
/// Every step only borrows `cat` and `params`, which is why the returned
/// value is bounded by `'a`.
pub fn weather_pipeline<'a>(
    cat: &'a WeatherCatalog,
    params: &'a WeatherParams,
) -> impl Steps<WeatherError> + 'a {
    let sources = &cat.sources;
    let analysis = &cat.analysis;
    let reports = &cat.reports;

    // Subpipeline: data preparation.
    let data_prep = Pipeline {
        name: "data_prep",
        steps: (
            Node {
                name: "merge_stations",
                func: merge_stations,
                input: (&sources.station_readings,),
                output: (&analysis.combined_readings,),
            },
            Node {
                name: "load_metadata",
                func: load_metadata,
                input: (&sources.station_metadata,),
                output: (),
            },
        ),
        input: (&sources.station_readings, &sources.station_metadata),
        output: (&analysis.combined_readings,),
    };

    // These two nodes both read `combined_readings` and can run in parallel.
    let summarize = Node {
        name: "compute_summary",
        func: compute_summary,
        input: (&analysis.combined_readings, &params.analysis.baseline),
        output: (&analysis.weather_summary,),
    };
    let find_anomalies = Node {
        name: "detect_anomalies",
        func: detect_anomalies,
        input: (
            &analysis.combined_readings,
            &params.analysis.anomaly_threshold,
        ),
        output: (&analysis.anomalies,),
    };

    // Subpipeline: reporting (its inner nodes can run in parallel).
    let reporting = Pipeline {
        name: "reporting",
        steps: (
            Node {
                name: "plot_temperatures",
                func: plot_temperatures,
                input: (&analysis.weather_summary, &params.display.chart_title),
                output: (&reports.temperature_chart,),
            },
            Node {
                name: "plot_rainfall",
                func: plot_rainfall,
                input: (&analysis.weather_summary, &params.display.chart_title),
                output: (&reports.rainfall_chart,),
            },
        ),
        input: (&analysis.weather_summary, &params.display.chart_title),
        output: (&reports.temperature_chart, &reports.rainfall_chart),
    };

    // Validation node: intentionally fails.
    let validate = Node {
        name: "validate_reports",
        func: validate_reports,
        input: (&reports.temperature_chart, &reports.rainfall_chart),
        output: (&reports.validation_passed,),
    };

    (data_prep, summarize, find_anomalies, reporting, validate)
}

App entry point

    // Build the app from the catalog and params YAML files.
    // NOTE(review): `dir` is defined outside this excerpt — presumably the
    // example's own directory; confirm. `Path::to_str` returns `None` for a
    // non-UTF-8 path, so these `unwrap`s panic in that case.
    pondrs::app::App::from_yaml(
        dir.join("catalog.yml").to_str().unwrap(),
        dir.join("params.yml").to_str().unwrap(),
    )?
    // Attach a logging hook and a `VizHook` pointed at http://localhost:8080
    // (presumably the viz server — confirm against the pondrs docs).
    .with_hooks((
        LoggingHook::new(),
        VizHook::new("http://localhost:8080".to_string()),
    ))
    // Parse CLI arguments, e.g. `run --runner parallel`, `check`, or `viz`.
    .with_args(std::env::args_os())?
    // Presumably runs the selected subcommand against `weather_pipeline`.
    .dispatch(weather_pipeline)

Pipeline visualization

Open fullscreen