Weather Pipeline
Demonstrates subpipelines, struct params, nested catalog/params, PartitionedDataset, MemoryDataset, YamlDataset, PlotlyDataset, parallel execution, and an intentional error node.
Usage
cargo run --example weather_app -- run --runner parallel
cargo run --example weather_app -- check
cargo run --example weather_app -- viz
Types
// ---------------------------------------------------------------------------
// Error type
// ---------------------------------------------------------------------------
/// Error type shared by every node of the weather pipeline.
///
/// Two variants wrap errors coming from other crates so that nodes can
/// propagate them with `?`; the third carries a message produced by the
/// pipeline's own validation step.
#[derive(Debug)]
pub enum WeatherError {
    /// Wraps a [`PondError`].
    Pond(PondError),
    /// Wraps a [`PolarsError`].
    Polars(PolarsError),
    /// A validation failure described by a human-readable message.
    Validation(String),
}
impl From<PondError> for WeatherError {
fn from(e: PondError) -> Self {
WeatherError::Pond(e)
}
}
impl From<PolarsError> for WeatherError {
fn from(e: PolarsError) -> Self {
WeatherError::Polars(e)
}
}
impl std::fmt::Display for WeatherError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
WeatherError::Pond(e) => write!(f, "{e}"),
WeatherError::Polars(e) => write!(f, "{e}"),
WeatherError::Validation(msg) => write!(f, "Validation failed: {msg}"),
}
}
}
// Marks `WeatherError` as a standard error type. The impl body is empty, so the
// default `source()` is used and the wrapped `Pond` / `Polars` errors are not
// exposed as an error source; they are only visible through `Display`/`Debug`.
impl std::error::Error for WeatherError {}
// ---------------------------------------------------------------------------
// Domain types
// ---------------------------------------------------------------------------
/// Baseline period for anomaly detection, supplied as a struct-valued param.
///
/// In this file the value is wired into `compute_summary`, which accepts it
/// but does not use it.
#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
pub struct BaselinePeriod {
    /// First month of the baseline window (presumably 1-12 — TODO confirm).
    pub start_month: u32,
    /// Last month of the baseline window (presumably inclusive — TODO confirm).
    pub end_month: u32,
}
/// Aggregate statistics computed over a set of weather readings.
///
/// The type must stay `Copy` so that it can be held in a `MemoryDataset`.
#[derive(Debug, Default, Clone, Copy)]
pub struct WeatherSummary {
    /// Mean of the temperature readings.
    pub avg_temp: f64,
    /// Highest temperature reading.
    pub max_temp: f64,
    /// Lowest temperature reading.
    pub min_temp: f64,
    /// Sum of all rainfall readings.
    pub total_rainfall: f64,
    /// Number of distinct stations that contributed readings.
    pub station_count: u32,
    /// Total number of readings (rows) summarised.
    pub reading_count: u32,
}
// ---------------------------------------------------------------------------
// Params (nested containers with struct param)
// ---------------------------------------------------------------------------
/// Root of the parameter tree; groups the params into nested containers.
#[derive(Deserialize, Serialize)]
pub struct WeatherParams {
    /// Parameters consumed by the analysis nodes.
    pub analysis: AnalysisConfig,
    /// Parameters consumed by the chart-producing nodes.
    pub display: DisplayConfig,
}
/// Parameters for the analysis stage.
#[derive(Deserialize, Serialize)]
pub struct AnalysisConfig {
    /// Multiplier applied to the temperature standard deviation by
    /// `detect_anomalies` to size the band around the mean.
    pub anomaly_threshold: Param<f64>,
    /// Struct-valued param passed to `compute_summary`.
    pub baseline: Param<BaselinePeriod>,
}
/// Parameters for the reporting stage.
#[derive(Deserialize, Serialize)]
pub struct DisplayConfig {
    /// Text used as the leading part of each chart title.
    pub chart_title: Param<String>,
    /// Not read by any node visible in this file; presumably a "top N"
    /// display limit — TODO confirm.
    pub top_n: Param<usize>,
}
// ---------------------------------------------------------------------------
// Catalog (nested containers)
// ---------------------------------------------------------------------------
/// Root of the dataset catalog; groups the datasets into nested containers.
#[derive(Deserialize, Serialize)]
pub struct WeatherCatalog {
    /// Raw inputs read by the data-preparation nodes.
    pub sources: SourcesCatalog,
    /// Intermediate datasets produced and consumed by the analysis nodes.
    pub analysis: AnalysisCatalog,
    /// Charts and the validation result.
    pub reports: ReportsCatalog,
}
/// Input datasets of the pipeline.
#[derive(Deserialize, Serialize)]
pub struct SourcesCatalog {
    /// Partitioned CSV readings; `merge_stations` receives them as a map from
    /// partition name to `DataFrame`.
    pub station_readings: PartitionedDataset<PolarsCsvDataset>,
    /// YAML document with station metadata, read by `load_metadata`.
    pub station_metadata: YamlDataset,
}
/// Datasets produced by the analysis stage.
#[derive(Deserialize, Serialize)]
pub struct AnalysisCatalog {
    /// All station readings stacked into one CSV-backed frame.
    pub combined_readings: PolarsCsvDataset,
    /// In-memory summary statistics shared with the reporting nodes.
    pub weather_summary: MemoryDataset<WeatherSummary>,
    /// Rows of `combined_readings` flagged by `detect_anomalies`.
    pub anomalies: PolarsCsvDataset,
}
/// Datasets produced by the reporting and validation stage.
#[derive(Deserialize, Serialize)]
pub struct ReportsCatalog {
    /// Chart written by `plot_temperatures`.
    pub temperature_chart: PlotlyDataset,
    /// Chart written by `plot_rainfall`.
    pub rainfall_chart: PlotlyDataset,
    /// In-memory flag that is the declared output of `validate_reports`.
    pub validation_passed: MemoryDataset<bool>,
}
Node functions
// ---------------------------------------------------------------------------
// Node functions
// ---------------------------------------------------------------------------
/// Merges the per-station partitions into one combined readings frame.
///
/// Each partition's key is written into a new `station` column on every row of
/// that partition, and the partitions are then stacked vertically.
///
/// Partitions are processed in ascending key order. `HashMap` iteration order
/// is arbitrary, so without the sort the row order of the combined frame could
/// differ from run to run for identical inputs.
///
/// Returns a default (empty) `DataFrame` when `partitions` is empty.
///
/// # Errors
///
/// Propagates any `PolarsError` returned while adding the `station` column or
/// while stacking a partition onto the accumulated frame.
fn merge_stations(
    partitions: HashMap<String, DataFrame>,
) -> Result<(DataFrame,), PolarsError> {
    // Fix the processing order so the output is reproducible.
    let mut ordered: Vec<(String, DataFrame)> = partitions.into_iter().collect();
    ordered.sort_unstable_by(|left, right| left.0.cmp(&right.0));

    let mut combined: Option<DataFrame> = None;
    for (station_name, mut df) in ordered {
        // Tag every row of this partition with the station it came from.
        let n = df.height();
        let station_col = Column::new("station".into(), vec![station_name.as_str(); n]);
        df.with_column(station_col)?;

        match &mut combined {
            // The first partition becomes the accumulator.
            None => combined = Some(df),
            Some(c) => {
                c.vstack_mut(&df)?;
            }
        }
    }
    Ok((combined.unwrap_or_default(),))
}
/// Logs one info line per station found in the metadata document.
///
/// Only a top-level YAML mapping is inspected, and only its string keys are
/// logged; any other document shape produces no output. The values are ignored.
fn load_metadata(meta: Yaml) {
    // Anything that is not a mapping has no station entries to report.
    let Yaml::Hash(stations) = &meta else {
        return;
    };

    for (station_key, _) in stations {
        if let Yaml::String(name) = station_key {
            log::info!("Loaded metadata for station: {name}");
        }
    }
}
/// Reduces the combined readings to a single [`WeatherSummary`].
///
/// Reads the `temperature` and `rainfall` columns as `f64` and the `station`
/// column as strings. Each aggregate falls back to `0.0` when the column yields
/// no value for it. `baseline` is accepted but not used by the computation.
///
/// # Panics
///
/// Panics if any of the three columns is missing or does not have the expected
/// dtype.
fn compute_summary(
    df: DataFrame,
    baseline: BaselinePeriod,
) -> (WeatherSummary,) {
    // The baseline parameter is deliberately discarded.
    let _ = baseline;

    let temperatures = df.column("temperature").unwrap().f64().unwrap();
    let rainfall = df.column("rainfall").unwrap().f64().unwrap();
    let station_names = df.column("station").unwrap().str().unwrap();

    // Only the number of distinct station names is needed.
    let distinct_stations: std::collections::HashSet<String> = station_names
        .into_no_null_iter()
        .map(|name| name.to_string())
        .collect();

    let avg_temp = temperatures.mean().unwrap_or(0.0);
    let max_temp = temperatures.max().unwrap_or(0.0);
    let min_temp = temperatures.min().unwrap_or(0.0);
    let total_rainfall = rainfall.sum().unwrap_or(0.0);

    (
        WeatherSummary {
            avg_temp,
            max_temp,
            min_temp,
            total_rainfall,
            station_count: distinct_stations.len() as u32,
            reading_count: df.height() as u32,
        },
    )
}
/// Keeps only the rows whose `temperature` lies strictly outside the band
/// `mean ± threshold * std_dev`.
///
/// The mean falls back to `0.0` and the standard deviation (`std(1)`,
/// presumably ddof = 1 — TODO confirm) to `1.0` when the column yields no value
/// for them.
///
/// # Errors
///
/// Returns a `PolarsError` if the `temperature` column is missing, is not
/// `f64`, or if filtering the frame fails.
fn detect_anomalies(
    df: DataFrame,
    threshold: f64,
) -> Result<(DataFrame,), PolarsError> {
    let temperatures = df.column("temperature")?.f64()?;

    let center = temperatures.mean().unwrap_or(0.0);
    let spread = temperatures.std(1).unwrap_or(1.0);

    // Half-width of the "normal" band around the mean.
    let band = threshold * spread;
    let (lower, upper) = (center - band, center + band);

    let outliers = temperatures.lt(lower) | temperatures.gt(upper);
    let flagged = df.filter(&outliers)?;
    Ok((flagged,))
}
fn plot_temperatures(summary: WeatherSummary, title: String) -> (Plot,) {
let labels = vec!["Min", "Avg", "Max"];
let values = vec![summary.min_temp, summary.avg_temp, summary.max_temp];
let mut plot = Plot::new();
plot.add_trace(
Bar::new(labels, values).name("Temperature"),
);
plot.set_layout(
Layout::new()
.title(format!("{title} - Temperature Summary"))
.y_axis(plotly::layout::Axis::new().title("Temperature (C)")),
);
(plot,)
}
/// Builds a two-bar rainfall chart from the summary.
///
/// The first bar, labelled `"<n> stations"`, shows the total rainfall divided
/// by the number of stations. The second bar, labelled `"<n> readings"`, shows
/// the total rainfall. `title` becomes the leading part of the chart title,
/// followed by ` - Rainfall Summary`.
///
/// When `summary.station_count` is zero (for example a default
/// `WeatherSummary`), the per-station value is reported as `0.0`. Dividing by
/// zero would otherwise put NaN or infinity into the chart data.
fn plot_rainfall(summary: WeatherSummary, title: String) -> (Plot,) {
    let labels = vec![
        format!("{} stations", summary.station_count),
        format!("{} readings", summary.reading_count),
    ];

    // Guard the division: no stations means there is no per-station average.
    let per_station = if summary.station_count == 0 {
        0.0
    } else {
        summary.total_rainfall / f64::from(summary.station_count)
    };
    let values = vec![per_station, summary.total_rainfall];

    let mut plot = Plot::new();
    plot.add_trace(Bar::new(labels, values).name("Rainfall"));
    plot.set_layout(
        Layout::new()
            .title(format!("{title} - Rainfall Summary"))
            .y_axis(plotly::layout::Axis::new().title("Rainfall (mm)")),
    );
    (plot,)
}
/// Validation node that always fails.
///
/// Both chart inputs are accepted only so that the node depends on them; they
/// are never inspected. The function unconditionally returns
/// [`WeatherError::Validation`] with a fixed message.
fn validate_reports(
    _temp_chart: serde_json::Value,
    _rain_chart: serde_json::Value,
) -> Result<(bool,), WeatherError> {
    let reason = String::from(
        "Data quality check failed: east station has a suspicious 50C reading in July",
    );
    Err(WeatherError::Validation(reason))
}
Pipeline definition
// ---------------------------------------------------------------------------
// Pipeline function
// ---------------------------------------------------------------------------
/// Assembles the weather pipeline over the given catalog and params.
///
/// The returned steps borrow from `cat` and `params` for `'a`. In order, they
/// are:
///
/// 1. `data_prep` subpipeline: `merge_stations` writes `combined_readings` and
///    `load_metadata` reads the station metadata.
/// 2. `compute_summary` and `detect_anomalies`, which both read
///    `combined_readings`.
/// 3. `reporting` subpipeline: `plot_temperatures` and `plot_rainfall`, which
///    both read `weather_summary` and the `chart_title` param.
/// 4. `validate_reports`, which reads both charts and always returns an error.
pub fn weather_pipeline<'a>(
    cat: &'a WeatherCatalog,
    params: &'a WeatherParams,
) -> impl Steps<WeatherError> + 'a {
    (
        // Subpipeline: data preparation
        Pipeline {
            name: "data_prep",
            steps: (
                Node {
                    name: "merge_stations",
                    func: merge_stations,
                    input: (&cat.sources.station_readings,),
                    output: (&cat.analysis.combined_readings,),
                },
                Node {
                    name: "load_metadata",
                    func: load_metadata,
                    input: (&cat.sources.station_metadata,),
                    output: (),
                },
            ),
            input: (
                &cat.sources.station_readings,
                &cat.sources.station_metadata,
            ),
            output: (&cat.analysis.combined_readings,),
        },
        // These two nodes can run in parallel (both read combined_readings)
        Node {
            name: "compute_summary",
            func: compute_summary,
            input: (
                &cat.analysis.combined_readings,
                &params.analysis.baseline,
            ),
            output: (&cat.analysis.weather_summary,),
        },
        Node {
            name: "detect_anomalies",
            func: detect_anomalies,
            input: (
                &cat.analysis.combined_readings,
                &params.analysis.anomaly_threshold,
            ),
            output: (&cat.analysis.anomalies,),
        },
        // Subpipeline: reporting (inner nodes can run in parallel)
        Pipeline {
            name: "reporting",
            steps: (
                Node {
                    name: "plot_temperatures",
                    func: plot_temperatures,
                    input: (
                        &cat.analysis.weather_summary,
                        &params.display.chart_title,
                    ),
                    output: (&cat.reports.temperature_chart,),
                },
                Node {
                    name: "plot_rainfall",
                    func: plot_rainfall,
                    input: (
                        &cat.analysis.weather_summary,
                        &params.display.chart_title,
                    ),
                    output: (&cat.reports.rainfall_chart,),
                },
            ),
            input: (
                &cat.analysis.weather_summary,
                &params.display.chart_title,
            ),
            output: (
                &cat.reports.temperature_chart,
                &cat.reports.rainfall_chart,
            ),
        },
        // Validation node: intentionally fails
        Node {
            name: "validate_reports",
            func: validate_reports,
            input: (
                &cat.reports.temperature_chart,
                &cat.reports.rainfall_chart,
            ),
            output: (&cat.reports.validation_passed,),
        },
    )
}
App entry point
// Build the app from the `catalog.yml` and `params.yml` files located in `dir`.
// NOTE(review): `to_str().unwrap()` panics if either joined path is not valid
// UTF-8.
pondrs::app::App::from_yaml(
dir.join("catalog.yml").to_str().unwrap(),
dir.join("params.yml").to_str().unwrap(),
)?
// Register the logging hook and the viz hook (constructed with the local URL).
.with_hooks((
LoggingHook::new(),
VizHook::new("http://localhost:8080".to_string()),
))
// Pass the process arguments through; presumably this is where the
// `run` / `check` / `viz` subcommands shown under Usage are parsed — confirm.
.with_args(std::env::args_os())?
// Hand `weather_pipeline` to the app as the pipeline-building function.
.dispatch(weather_pipeline)