# Airflow-Style Data Processing DAG
# Data: 8 nodes · 9 edges
# Visual export: ex-airflow-dag.osop.yaml
# Airflow-Style DAG Workflow
# Sensor-driven pipeline with parallel transforms, quality checks, and notification.
---
osop_version: "2.0"
id: airflow-dag
name: Airflow-Style Data Processing DAG

# Node definitions: each node declares its type, runtime config, and the
# named data artifacts it consumes (inputs) and produces (outputs).
nodes:
  # Entry point: blocks the pipeline until the upstream file exists.
  - id: file_sensor
    type: system
    purpose: Wait for upstream data file to appear in S3 landing zone
    runtime:
      tool: s3-sensor
    outputs:
      - file_path
    timeout_sec: 3600
    explain: "Polls S3 every 60 seconds until the expected file lands or timeout."

  # Stage the detected file for downstream processing.
  - id: extract
    type: api
    purpose: Download the landed file and stage for processing
    runtime:
      endpoint: /v1/stage
      method: POST
      url: https://data-platform.internal
    inputs:
      - file_path
    outputs:
      - staged_data

  # Three independent Spark transforms, all fed from staged_data
  # (fan-out — see the parallel edges below).
  - id: transform_users
    type: data
    purpose: Transform and enrich user dimension data
    runtime:
      engine: spark
      config:
        app_name: transform-users
        master: k8s
    inputs:
      - staged_data
    outputs:
      - users_table

  - id: transform_events
    type: data
    purpose: Transform and aggregate event fact data
    runtime:
      engine: spark
      config:
        app_name: transform-events
        master: k8s
    inputs:
      - staged_data
    outputs:
      - events_table

  - id: transform_metrics
    type: data
    purpose: Compute daily business metrics from events
    runtime:
      engine: spark
      config:
        app_name: transform-metrics
        master: k8s
    inputs:
      - staged_data
    outputs:
      - metrics_table

  # Fan-in: waits on all three transform outputs before loading.
  - id: load_warehouse
    type: db
    purpose: Load all transformed tables into the data warehouse
    runtime:
      engine: bigquery
      connection: project.analytics
    inputs:
      - users_table
      - events_table
      - metrics_table
    outputs:
      - load_status

  # Post-load dbt assertions over the freshly loaded tables.
  - id: quality_check
    type: data
    purpose: Run data quality assertions on loaded tables
    runtime:
      engine: dbt
      config:
        command: test
        select: "tag:daily_quality"
    inputs:
      - load_status
    outputs:
      - quality_result

  # Terminal node: report pipeline outcome.
  - id: notify
    type: api
    purpose: Send pipeline completion notification to Slack and PagerDuty
    runtime:
      endpoint: /v1/notify
      method: POST
      url: https://alerts.internal
    inputs:
      - quality_result
    outputs:
      - notification_status

# Edge definitions: sensor -> extract -> {3 parallel transforms} -> load
# -> quality check -> notify.
edges:
  - from: file_sensor
    to: extract
    mode: event
    explain: "Triggered when sensor detects the file."
  - from: extract
    to: transform_users
    mode: parallel
  - from: extract
    to: transform_events
    mode: parallel
  - from: extract
    to: transform_metrics
    mode: parallel
    explain: "All three transforms run concurrently after extraction."
  - from: transform_users
    to: load_warehouse
    mode: sequential
  - from: transform_events
    to: load_warehouse
    mode: sequential
  - from: transform_metrics
    to: load_warehouse
    mode: sequential
  - from: load_warehouse
    to: quality_check
    mode: sequential
  - from: quality_check
    to: notify
    mode: sequential