Modular Pipeline with Sub-Workflows
v1.1 FeatureDemonstrates v1.1 sub-workflows — compose complex pipelines from reusable building blocks.
6 nodes · 6 edgesv1.1 features
v1.1sub-workflowcompositiondata-pipeline
Visual
extract_salessystem
↓parallel→ transform
extract_inventorysystem
↓parallel→ transform
transformsystem
↓sequential→ quality_gate
quality_gatesystem
↓conditional→ load_warehouse
↓fallback→ notify_team
load_warehousedb
↓sequential→ notify_team
notify_teamapi
ex-sub-workflow-composition.osop.yaml
osop_version: "1.1"
id: "sub-workflow-composition"
name: "Modular Pipeline with Sub-Workflows"
description: "Demonstrates v1.1 sub-workflows — compose complex pipelines from reusable building blocks."
tags: [v1.1, sub-workflow, composition, data-pipeline]
imports:
- "./etl-extract.osop"
- "./etl-transform.osop"
- "./quality-check.osop"
timeout_sec: 7200 # 2 hour global deadline
nodes:
- id: "extract_sales"
type: "system"
purpose: "Extract sales data from source systems."
workflow_ref: "etl-extract"
workflow_inputs:
source: "salesforce"
date_range: "${inputs.date_range}"
- id: "extract_inventory"
type: "system"
purpose: "Extract inventory data from warehouse system."
workflow_ref: "etl-extract"
workflow_inputs:
source: "warehouse_api"
date_range: "${inputs.date_range}"
- id: "transform"
type: "system"
purpose: "Clean and normalize extracted data."
workflow_ref: "etl-transform"
workflow_inputs:
sales_data: "${outputs.extract_sales.data}"
inventory_data: "${outputs.extract_inventory.data}"
- id: "quality_gate"
type: "system"
purpose: "Run data quality checks."
workflow_ref: "quality-check"
workflow_inputs:
dataset: "${outputs.transform.result}"
rules: "strict"
- id: "load_warehouse"
type: "db"
purpose: "Load validated data into analytics warehouse."
runtime:
engine: "bigquery"
table: "analytics.daily_report"
- id: "notify_team"
type: "api"
purpose: "Send completion notification to Slack."
runtime:
endpoint: "https://hooks.slack.com/services/..."
method: "POST"
edges:
- from: "extract_sales"
to: "transform"
mode: "parallel"
- from: "extract_inventory"
to: "transform"
mode: "parallel"
join_mode: "wait_all"
- from: "transform"
to: "quality_gate"
mode: "sequential"
- from: "quality_gate"
to: "load_warehouse"
mode: "conditional"
when: "outputs.quality_gate.passed == true"
- from: "load_warehouse"
to: "notify_team"
mode: "sequential"
- from: "quality_gate"
to: "notify_team"
mode: "fallback"
label: "Alert team on quality failure"