Daily ETL Pipeline

PR Ready

S3 → validate → transform → Snowflake → dashboard refresh, with an alert sent on failure.

6 nodes · 8 edges
airflow · etl · data-engineering · snowflake · dbt
Visualization
Extract Data from S3 (type: data)

Downloads the daily CSV export files from s3://data-lake/raw/.

sequential → Validate Schema
error → Slack Failure Alert
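The source path in the config ends in `{{ds}}`, which Airflow renders as the run's logical date in `YYYY-MM-DD` form, so each run reads one date-partitioned prefix. A minimal sketch of that selection, assuming the bucket holds one folder per day and that only `.csv` objects should be picked up; `daily_prefix` and `daily_csv_keys` are hypothetical helpers, and listing the keys from S3 itself is left out:

```python
from datetime import date

# Base path from the workflow config; the one-folder-per-day layout is an assumption.
RAW_PREFIX = "s3://data-lake/raw/"


def daily_prefix(run_date: date) -> str:
    """Mimic Airflow's {{ds}} rendering (YYYY-MM-DD) to build the day's prefix."""
    return f"{RAW_PREFIX}{run_date.isoformat()}/"


def daily_csv_keys(all_keys: list[str], run_date: date) -> list[str]:
    """Keep only CSV objects under the given day's prefix (hypothetical helper)."""
    prefix = daily_prefix(run_date)
    return sorted(k for k in all_keys if k.startswith(prefix) and k.endswith(".csv"))
```

For example, given keys for `2024-01-15/orders.csv`, `2024-01-15/_SUCCESS`, and `2024-01-14/orders.csv`, a run dated 2024-01-15 keeps only the first one.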
Validate Schema (type: cicd)

Checks column types, nullability, and row counts against expected values.

sequential → Transform Data with dbt
error → Slack Failure Alert
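The config delegates this step to Great Expectations with the `raw_data_suite` suite. To show what the three checks mean, here is a plain-Python stand-in (not the Great Expectations API) that applies type, nullability, and minimum-row-count rules to rows parsed into dictionaries; `ColumnSpec`, `validate_rows`, and the example columns are hypothetical:

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class ColumnSpec:
    py_type: type   # expected Python type after CSV parsing
    nullable: bool  # whether None (or a missing column) is allowed


def validate_rows(
    rows: list[dict[str, Any]], schema: dict[str, ColumnSpec], min_rows: int
) -> list[str]:
    """Return human-readable violations; an empty list means the batch passed."""
    errors: list[str] = []
    if len(rows) < min_rows:
        errors.append(f"row count {len(rows)} < expected minimum {min_rows}")
    for i, row in enumerate(rows):
        for column, spec in schema.items():
            value = row.get(column)  # a missing column is treated like a null
            if value is None:
                if not spec.nullable:
                    errors.append(f"row {i}: {column} is null")
            elif not isinstance(value, spec.py_type):
                errors.append(
                    f"row {i}: {column} expected {spec.py_type.__name__}, "
                    f"got {type(value).__name__}"
                )
    return errors
```

A non-empty result is the signal to fail the task, which in this workflow follows the `error` edge to the Slack alert.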
Transform Data with dbt (type: cli)

Runs dbt models for data cleaning, deduplication, and table joins.

sequential → Load into Snowflake
error → Slack Failure Alert
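The actual logic lives in the SQL models selected by `dbt run --select tag:daily`, which are not shown here. As an illustration of the three operations only, a plain-Python sketch over hypothetical `orders` and `customers` rows: trim strings, keep the latest version of each order, then inner-join to customers. In SQL models, the deduplication step is commonly written with `row_number() over (partition by ... order by ... desc) = 1`.

```python
from typing import Any

Row = dict[str, Any]


def clean(row: Row) -> Row:
    """Trim surrounding whitespace from string fields."""
    return {k: v.strip() if isinstance(v, str) else v for k, v in row.items()}


def dedupe_latest(rows: list[Row], key: str, order_by: str) -> list[Row]:
    """Keep one row per key: the one with the largest order_by value."""
    latest: dict[Any, Row] = {}
    for row in rows:
        k = row[key]
        if k not in latest or row[order_by] > latest[k][order_by]:
            latest[k] = row
    return list(latest.values())


def inner_join(left: list[Row], right: list[Row], on: str) -> list[Row]:
    """Merge each left row with the right row sharing `on`; drop unmatched rows.

    Assumes `on` is unique in `right` (e.g. a dimension table's primary key).
    """
    index = {r[on]: r for r in right}
    return [{**row, **index[row[on]]} for row in left if row[on] in index]
```

Chaining them as `inner_join(dedupe_latest([clean(r) for r in orders], "order_id", "updated_at"), customers, "customer_id")` mirrors the clean, deduplicate, join order described above.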
Load into Snowflake (type: db)

Writes the transformed data to Snowflake production tables.

sequential → Refresh Metabase Dashboard
error → Slack Failure Alert
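The config only names the target (`snowflake://analytics/prod`) and does not say whether rows are appended or upserted. If reruns for the same `{{ds}}` should not duplicate rows, one common option is a Snowflake `MERGE` from a staging table. A minimal sketch that builds such a statement, assuming hypothetical table names and a single-column key; identifiers are interpolated unquoted, so they must come from trusted config, never from user input:

```python
def build_merge_sql(target: str, staging: str, key: str, columns: list[str]) -> str:
    """Build a Snowflake MERGE (upsert) so re-running the same day is idempotent.

    target/staging/key/columns are hypothetical, trusted identifiers (not escaped).
    """
    if key not in columns:
        raise ValueError("key must be one of the columns")
    non_key = [c for c in columns if c != key]
    if not non_key:
        raise ValueError("need at least one non-key column to update")
    set_clause = ", ".join(f"t.{c} = s.{c}" for c in non_key)
    insert_cols = ", ".join(columns)
    insert_vals = ", ".join(f"s.{c}" for c in columns)
    return (
        f"MERGE INTO {target} t USING {staging} s ON t.{key} = s.{key}\n"
        f"WHEN MATCHED THEN UPDATE SET {set_clause}\n"
        f"WHEN NOT MATCHED THEN INSERT ({insert_cols}) VALUES ({insert_vals})"
    )
```

Matched keys are updated in place and new keys are inserted, so loading the same batch twice leaves the production table unchanged the second time.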
Refresh Metabase Dashboard (type: api)

Triggers cache invalidation for the executive KPI dashboard.
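The config's `url` is a template with a `{{card_id}}` placeholder. A minimal sketch that renders it and prepares, but does not send, the HTTP request with Python's standard `urllib.request`. The POST method, the `X-Metabase-Session` session header, and the `render` helper (a tiny stand-in for Jinja-style substitution) are assumptions, not part of the workflow:

```python
import re
import urllib.request

# URL template copied from the workflow config.
REFRESH_URL = "https://metabase.internal/api/card/{{card_id}}/refresh"
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(template: str, **values: object) -> str:
    """Replace each {{name}} with values[name]; a missing name raises KeyError."""
    return _PLACEHOLDER.sub(lambda m: str(values[m.group(1)]), template)


def build_refresh_request(card_id: int, session_token: str) -> urllib.request.Request:
    """Prepare the refresh call; pass the result to urllib.request.urlopen() to send it."""
    return urllib.request.Request(
        render(REFRESH_URL, card_id=card_id),
        method="POST",
        headers={"X-Metabase-Session": session_token},  # assumed auth scheme
    )
```

The same `render` helper also resolves the S3 source path, e.g. `render("s3://data-lake/raw/{{ds}}/", ds="2024-01-15")`.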

Slack Failure Alert (type: api)

Posts a message with error details and a run link to the #data-alerts channel.
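The config only fixes the channel. A minimal sketch of the message body, assuming the alert is sent through Slack's `chat.postMessage` Web API, whose JSON body accepts `channel` and `text`; `build_failure_alert`, the run URL, and the message wording are hypothetical. The `<url|label>` form is Slack's link syntax for message text:

```python
import json


def build_failure_alert(
    failed_node: str, error: str, run_url: str, channel: str = "#data-alerts"
) -> str:
    """Return a JSON body carrying the failed node, error details, and a run link."""
    text = (
        f"Daily ETL pipeline failed at `{failed_node}`\n"
        f"Error: {error}\n"
        f"<{run_url}|View run>"  # Slack renders this as a clickable "View run" link
    )
    return json.dumps({"channel": channel, "text": text})
```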

ex-airflow-etl-pipeline.osop.yaml
# Airflow ETL Pipeline — OSOP Portable Workflow
#
# Classic extract-transform-load: pulls raw data from S3, validates the
# schema, transforms with dbt, loads into Snowflake, refreshes a Metabase
# dashboard, and sends a Slack alert on any failure.
#
# Run with Airflow or validate: osop validate airflow-etl-pipeline.osop.yaml

osop_version: "1.0"
id: "airflow-etl-pipeline"
name: "Daily ETL Pipeline"
description: "S3 → validate → transform → Snowflake → dashboard refresh, with an alert sent on failure."
version: "1.0.0"
tags: [airflow, etl, data-engineering, snowflake, dbt]

nodes:
  - id: "extract_s3"
    type: "data"
    name: "Extract Data from S3"
    description: "Downloads the daily CSV export files from s3://data-lake/raw/."
    config:
      source: "s3://data-lake/raw/{{ds}}/"
      format: csv

  - id: "validate_schema"
    type: "cicd"
    subtype: "test"
    name: "Validate Schema"
    description: "Checks column types, nullability, and row counts against expected values."
    config:
      tool: "great_expectations"
      suite: "raw_data_suite"

  - id: "transform_dbt"
    type: "cli"
    subtype: "script"
    name: "Transform Data with dbt"
    description: "Runs dbt models for data cleaning, deduplication, and table joins."
    config:
      command: "dbt run --select tag:daily"

  - id: "load_warehouse"
    type: "db"
    name: "Load into Snowflake"
    description: "Writes the transformed data to Snowflake production tables."
    config:
      target: "snowflake://analytics/prod"

  - id: "refresh_dashboard"
    type: "api"
    subtype: "rest"
    name: "Refresh Metabase Dashboard"
    description: "Triggers cache invalidation for the executive KPI dashboard."
    config:
      url: "https://metabase.internal/api/card/{{card_id}}/refresh"

  - id: "notify_failure"
    type: "api"
    subtype: "rest"
    name: "Slack Failure Alert"
    description: "Posts a message with error details and a run link to the #data-alerts channel."
    config:
      channel: "#data-alerts"

edges:
  - from: "extract_s3"
    to: "validate_schema"
    mode: "sequential"
  - from: "validate_schema"
    to: "transform_dbt"
    mode: "sequential"
  - from: "transform_dbt"
    to: "load_warehouse"
    mode: "sequential"
  - from: "load_warehouse"
    to: "refresh_dashboard"
    mode: "sequential"
  - from: "extract_s3"
    to: "notify_failure"
    mode: "error"
    label: "Any step fails"
  - from: "validate_schema"
    to: "notify_failure"
    mode: "error"
  - from: "transform_dbt"
    to: "notify_failure"
    mode: "error"
  - from: "load_warehouse"
    to: "notify_failure"
    mode: "error"
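The `edges` list encodes two graphs: the `sequential` edges form the main chain, and the `error` edges route failures to `notify_failure`. A small sketch that mirrors the nodes and edges above as Python data, derives the execution order with the standard-library `graphlib.TopologicalSorter`, and lists the nodes that have no `error` edge to the alert. It is an illustration, not a replacement for `osop validate`:

```python
from graphlib import TopologicalSorter

Edge = tuple[str, str, str]  # (from, to, mode)

# Node ids and edges mirrored from the YAML above.
NODES = [
    "extract_s3",
    "validate_schema",
    "transform_dbt",
    "load_warehouse",
    "refresh_dashboard",
    "notify_failure",
]
EDGES: list[Edge] = [
    ("extract_s3", "validate_schema", "sequential"),
    ("validate_schema", "transform_dbt", "sequential"),
    ("transform_dbt", "load_warehouse", "sequential"),
    ("load_warehouse", "refresh_dashboard", "sequential"),
    ("extract_s3", "notify_failure", "error"),
    ("validate_schema", "notify_failure", "error"),
    ("transform_dbt", "notify_failure", "error"),
    ("load_warehouse", "notify_failure", "error"),
]


def execution_order(edges: list[Edge]) -> list[str]:
    """Topologically sort the nodes connected by sequential edges."""
    predecessors: dict[str, set[str]] = {}
    for src, dst, mode in edges:
        if mode == "sequential":
            predecessors.setdefault(dst, set()).add(src)
    return list(TopologicalSorter(predecessors).static_order())


def nodes_without_error_route(
    nodes: list[str], edges: list[Edge], alert_node: str = "notify_failure"
) -> list[str]:
    """List nodes (other than the alert itself) with no error edge to the alert node."""
    covered = {src for src, dst, mode in edges if mode == "error" and dst == alert_node}
    return [n for n in nodes if n != alert_node and n not in covered]
```

With this data, `execution_order(EDGES)` returns the five-step chain from `extract_s3` to `refresh_dashboard`, and `nodes_without_error_route(NODES, EDGES)` returns `['refresh_dashboard']`: as written, a failed dashboard refresh has no edge to the Slack alert.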