Daily ETL Pipeline

PR Ready

S3 → validate → transform → Snowflake → dashboard refresh with failure alerts.

6 nodes · 8 edges
Tags: airflow · etl · data-engineering · snowflake · dbt
Visual
Extract from S3 (data)

Download daily CSV exports from s3://data-lake/raw/.

sequential → Validate Schema
error → Slack Failure Alert
Validate Schema (cicd)

Check column types, nullability, and row count against expectations.

sequential → Transform with dbt
error → Slack Failure Alert
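The validation node checks column types, nullability, and row count. A minimal stdlib stand-in for what the configured great_expectations suite enforces might look like the sketch below — the column names and the one-row minimum are invented for illustration, not taken from `raw_data_suite`:

```python
import csv
import io

# Illustrative expectations; the real ones live in the
# great_expectations suite "raw_data_suite".
EXPECTED_COLUMNS = {"order_id": int, "customer_id": int, "amount": float}
MIN_ROWS = 1  # assumed threshold

def validate(csv_text: str) -> list[str]:
    """Return a list of violations; an empty list means the file passes."""
    rows = list(csv.DictReader(io.StringIO(csv_text)))
    errors = []
    if len(rows) < MIN_ROWS:
        errors.append(f"expected at least {MIN_ROWS} rows, got {len(rows)}")
    for i, row in enumerate(rows):
        for col, typ in EXPECTED_COLUMNS.items():
            value = row.get(col)
            if value is None or value == "":
                errors.append(f"row {i}: column {col!r} is null")
                continue
            try:
                typ(value)  # type check by attempted coercion
            except ValueError:
                errors.append(f"row {i}: {col}={value!r} is not {typ.__name__}")
    return errors
```

Returning a list of violations rather than raising on the first one mirrors how expectation suites report every failed check in a single run.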
Transform with dbt (cli)

Run dbt models to clean, deduplicate, and join tables.

sequential → Load to Snowflake
error → Slack Failure Alert
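The transform node shells out to the configured command (`dbt run --select tag:daily`). A sketch of how a runner might execute a cli node — the helper name is ours, not part of OSOP or Airflow:

```python
import shlex
import subprocess

def run_cli_node(command: str) -> subprocess.CompletedProcess:
    """Execute a cli-node command such as "dbt run --select tag:daily".

    check=True raises CalledProcessError on a non-zero exit, which is
    what would let the error edge route the run to the Slack alert.
    """
    return subprocess.run(
        shlex.split(command),
        check=True,
        capture_output=True,
        text=True,
    )
```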
Load to Snowflake (db)

Insert transformed data into Snowflake production tables.

sequential → Refresh Metabase Dashboard
error → Slack Failure Alert
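The load node bulk-inserts transformed rows into a production table. The sketch below uses sqlite3 purely as a stand-in for the Snowflake connector, and the table and column names are invented — but the parameterized batch-insert pattern is the same:

```python
import sqlite3

def load_rows(conn, rows):
    """Bulk-insert transformed rows with a parameterized executemany."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS daily_orders (order_id INTEGER, amount REAL)"
    )
    conn.executemany(
        "INSERT INTO daily_orders (order_id, amount) VALUES (?, ?)", rows
    )
    conn.commit()

# sqlite3 stands in for snowflake://analytics/prod here.
conn = sqlite3.connect(":memory:")
load_rows(conn, [(1, 9.99), (2, 4.50)])
```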
Refresh Metabase Dashboard (api)

Trigger cache invalidation on the executive KPI dashboard.

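The refresh node calls the URL template from its config. A sketch of building that request — note that the HTTP method is an assumption on our part; the workflow file only supplies the URL:

```python
from urllib import request

def build_refresh_request(card_id: int) -> request.Request:
    """Fill the node's URL template and build the refresh call.

    POST is assumed here; the OSOP config specifies only the URL,
    not the method the Metabase endpoint expects.
    """
    url = f"https://metabase.internal/api/card/{card_id}/refresh"
    return request.Request(url, method="POST")
```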
Slack Failure Alert (api)

Post to #data-alerts with error details and run link.
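The alert node's config names only the channel, so the delivery mechanism and payload fields below are assumptions — a webhook-style message carrying the failed node id, the error, and a run link:

```python
import json

def build_alert(node_id: str, error: str, run_url: str) -> str:
    """Serialize a Slack-style message payload for a failed node."""
    payload = {
        "channel": "#data-alerts",  # from the node config
        "text": f"ETL node `{node_id}` failed: {error}\nRun: {run_url}",
    }
    return json.dumps(payload)
```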

ex-airflow-etl-pipeline.osop.yaml
# Airflow ETL Pipeline — OSOP Portable Workflow
#
# Classic extract-transform-load: pulls raw data from S3, validates the
# schema, transforms with dbt, loads into Snowflake, refreshes a Metabase
# dashboard, and sends a Slack alert on any failure.
#
# Run with Airflow or validate: osop validate airflow-etl-pipeline.osop.yaml

osop_version: "1.0"
id: "airflow-etl-pipeline"
name: "Daily ETL Pipeline"
description: "S3 → validate → transform → Snowflake → dashboard refresh with failure alerts."
version: "1.0.0"
tags: [airflow, etl, data-engineering, snowflake, dbt]

nodes:
  - id: "extract_s3"
    type: "data"
    name: "Extract from S3"
    description: "Download daily CSV exports from s3://data-lake/raw/."
    config:
      source: "s3://data-lake/raw/{{ds}}/"
      format: csv

  - id: "validate_schema"
    type: "cicd"
    subtype: "test"
    name: "Validate Schema"
    description: "Check column types, nullability, and row count against expectations."
    config:
      tool: "great_expectations"
      suite: "raw_data_suite"

  - id: "transform_dbt"
    type: "cli"
    subtype: "script"
    name: "Transform with dbt"
    description: "Run dbt models to clean, deduplicate, and join tables."
    config:
      command: "dbt run --select tag:daily"

  - id: "load_warehouse"
    type: "db"
    name: "Load to Snowflake"
    description: "Insert transformed data into Snowflake production tables."
    config:
      target: "snowflake://analytics/prod"

  - id: "refresh_dashboard"
    type: "api"
    subtype: "rest"
    name: "Refresh Metabase Dashboard"
    description: "Trigger cache invalidation on the executive KPI dashboard."
    config:
      url: "https://metabase.internal/api/card/{{card_id}}/refresh"

  - id: "notify_failure"
    type: "api"
    subtype: "rest"
    name: "Slack Failure Alert"
    description: "Post to #data-alerts with error details and run link."
    config:
      channel: "#data-alerts"

edges:
  - from: "extract_s3"
    to: "validate_schema"
    mode: "sequential"
  - from: "validate_schema"
    to: "transform_dbt"
    mode: "sequential"
  - from: "transform_dbt"
    to: "load_warehouse"
    mode: "sequential"
  - from: "load_warehouse"
    to: "refresh_dashboard"
    mode: "sequential"
  - from: "extract_s3"
    to: "notify_failure"
    mode: "error"
    label: "Any step fails"
  - from: "validate_schema"
    to: "notify_failure"
    mode: "error"
  - from: "transform_dbt"
    to: "notify_failure"
    mode: "error"
  - from: "load_warehouse"
    to: "notify_failure"
    mode: "error"
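The sequential and error edges above imply a simple execution model: follow sequential edges from the entry node, and jump to the error target when a step fails. OSOP itself does not specify execution semantics, so the interpreter below is a sketch of one reasonable reading, with the edge list transcribed from the YAML:

```python
# Edges mirroring the YAML: "sequential" chains steps,
# "error" routes a failure to the alert node.
EDGES = [
    {"from": "extract_s3", "to": "validate_schema", "mode": "sequential"},
    {"from": "validate_schema", "to": "transform_dbt", "mode": "sequential"},
    {"from": "transform_dbt", "to": "load_warehouse", "mode": "sequential"},
    {"from": "load_warehouse", "to": "refresh_dashboard", "mode": "sequential"},
    {"from": "extract_s3", "to": "notify_failure", "mode": "error"},
    {"from": "validate_schema", "to": "notify_failure", "mode": "error"},
    {"from": "transform_dbt", "to": "notify_failure", "mode": "error"},
    {"from": "load_warehouse", "to": "notify_failure", "mode": "error"},
]

def run(steps: dict, start: str = "extract_s3") -> list[str]:
    """Run node callables in edge order; return the ids actually executed."""
    seq = {e["from"]: e["to"] for e in EDGES if e["mode"] == "sequential"}
    err = {e["from"]: e["to"] for e in EDGES if e["mode"] == "error"}
    executed, node = [], start
    while node is not None:
        executed.append(node)
        try:
            steps[node]()
        except Exception:
            node = err.get(node)  # reroute to the failure alert
            continue
        node = seq.get(node)
    return executed
```

On a clean run this visits the five pipeline nodes in order and never touches `notify_failure`; when any step raises, execution detours to the alert node and stops there, since it has no outgoing edges.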