# CDC 串流管線 (CDC Streaming Pipeline)
# 7 nodes · 8 edges
# ex-cdc-streaming.osop.yaml
# Change Data Capture Streaming Pipeline
# Capture database changes, filter, transform, apply to downstream systems, verify consistency
---
osop_version: "2.0"
id: cdc-streaming
name: "CDC 串流管線"

nodes:
  - id: capture_changes
    type: cli
    purpose: Start Debezium connector to capture row-level changes from source database WAL
    runtime:
      # -f makes curl exit non-zero on HTTP 4xx/5xx so a rejected connector
      # registration fails this node instead of silently "succeeding";
      # -sS suppresses the progress meter but still prints errors.
      # NOTE(review): KAFKA_CONNECT_AUTH is declared below but this request
      # sends no auth header — confirm the runtime injects credentials.
      command: |
        curl -fsS -X POST https://kafka-connect.internal/connectors \
          -H "Content-Type: application/json" \
          -d '{
            "name": "source-db-cdc",
            "config": {
              "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
              "database.hostname": "${SOURCE_DB_HOST}",
              "database.port": "5432",
              "database.dbname": "production",
              "database.server.name": "source-db",
              "table.include.list": "public.orders,public.customers,public.inventory",
              "slot.name": "cdc_slot",
              "plugin.name": "pgoutput",
              "publication.name": "cdc_publication"
            }
          }'
    outputs: [connector_name, captured_tables, initial_snapshot_complete]
    timeout_sec: 300
    security:
      credentials: [SOURCE_DB_REPLICATION_USER, KAFKA_CONNECT_AUTH]
    explain: |
      Uses PostgreSQL logical replication via pgoutput plugin.
      Initial snapshot captures current state, then streams incremental changes.

  - id: filter_events
    type: cli
    purpose: Apply Kafka Streams filter to drop internal/system changes and PII scrubbing
    runtime:
      # Input topic names are derived from database.server.name ("source-db")
      # plus the captured tables; output is a single merged, scrubbed topic.
      # Filter rules live in filter-rules.yaml (not visible in this file).
      command: |
        java -jar cdc-filter.jar \
          --input-topics source-db.public.orders,source-db.public.customers,source-db.public.inventory \
          --output-topic cdc-filtered \
          --filter-config filter-rules.yaml \
          --pii-scrub email,phone,ssn
    # connector_name only gates start order; event data arrives via Kafka.
    inputs: [connector_name]
    outputs: [filtered_topic, events_dropped_count, pii_scrubbed_count]
    timeout_sec: 60
    explain: |
      Filters out: DELETE events on audit tables, internal flag updates,
      and test account changes. PII fields are hashed before downstream delivery.

  - id: transform_events
    type: cli
    purpose: Apply schema transformations and business logic enrichment via Flink job
    runtime:
      # Flink job (parallelism 4) reads cdc-filtered, writes cdc-transformed;
      # Avro schemas are validated against the internal schema registry.
      command: |
        flink run \
          --class com.company.cdc.TransformJob \
          --parallelism 4 \
          cdc-transform.jar \
          --source-topic cdc-filtered \
          --sink-topic cdc-transformed \
          --schema-registry https://schema-registry.internal
    inputs: [filtered_topic]
    outputs: [transformed_topic, schema_version, transform_errors]
    timeout_sec: 120
    explain: |
      Flink stateful stream processing: joins order events with customer dimension,
      computes derived fields, and validates against Avro schema registry.

  - id: apply_to_warehouse
    type: db
    purpose: Apply transformed change events to analytics data warehouse
    runtime:
      engine: bigquery
      # presumably <project>.<dataset>.<table> of the sink — TODO confirm
      connection: project.analytics.cdc_sink
    inputs: [transformed_topic]
    outputs: [warehouse_applied_count, warehouse_lag_ms]
    timeout_sec: 60
    explain: "BigQuery streaming insert via Kafka Connect BigQuery sink connector."

  - id: apply_to_cache
    type: cli
    purpose: Update Redis cache with latest entity state from change events
    runtime:
      # Keys follow entity:{table}:{id}; the 1-hour TTL bounds staleness
      # if the stream stalls and stops refreshing entries.
      command: |
        python apply_to_cache.py \
          --topic cdc-transformed \
          --redis-cluster redis-cluster.internal:6379 \
          --key-pattern "entity:{table}:{id}" \
          --ttl 3600
    inputs: [transformed_topic]
    outputs: [cache_updated_count, cache_misses]
    timeout_sec: 60
    security:
      credentials: [REDIS_AUTH_TOKEN]

  - id: verify_consistency
    type: cli
    purpose: Run periodic consistency checks between source, warehouse, and cache
    runtime:
      # Samples 1% of rows in each table and compares source vs warehouse
      # vs cache; the 30 s lag bound matches the alert edge's condition.
      command: |
        python verify_cdc_consistency.py \
          --source-db postgresql://source:5432/production \
          --warehouse bigquery://project.analytics \
          --cache redis-cluster.internal:6379 \
          --tables orders,customers,inventory \
          --sample-rate 0.01 \
          --max-lag-seconds 30
    # Fan-in: waits on both sink nodes so the check runs against applied data.
    inputs: [warehouse_applied_count, cache_updated_count]
    outputs: [consistency_passed, lag_seconds, mismatched_records]
    timeout_sec: 300
    # Retries with 60 s backoff absorb transient lag spikes; presumably a
    # failure only propagates after retries are exhausted — confirm OSOP
    # retry semantics.
    retry_policy:
      max_retries: 3
      backoff_sec: 60

  - id: alert_on_failure
    type: api
    purpose: Send PagerDuty alert if consistency check fails or lag exceeds threshold
    runtime:
      # PagerDuty Events API v2: POST https://events.pagerduty.com/v2/enqueue
      endpoint: /v2/enqueue
      method: POST
      url: https://events.pagerduty.com
    inputs: [consistency_passed, lag_seconds, mismatched_records]
    outputs: [alert_id]
    security:
      auth: bearer_token
      # Routing key selects the PagerDuty service/escalation policy.
      secret_ref: PAGERDUTY_ROUTING_KEY

edges:
  # Linear backbone: capture -> filter -> transform.
  - from: capture_changes
    to: filter_events
    mode: sequential

  - from: filter_events
    to: transform_events
    mode: sequential

  # Fan-out: warehouse and cache sinks consume cdc-transformed in parallel.
  - from: transform_events
    to: apply_to_warehouse
    mode: parallel

  - from: transform_events
    to: apply_to_cache
    mode: parallel

  # Fan-in: the consistency check waits for both sinks to report.
  - from: apply_to_warehouse
    to: verify_consistency
    mode: sequential

  - from: apply_to_cache
    to: verify_consistency
    mode: sequential

  # Page only on failure or when lag exceeds 30 s — mirrors the
  # --max-lag-seconds 30 flag in verify_consistency.
  - from: verify_consistency
    to: alert_on_failure
    mode: conditional
    condition: "consistency_passed == false || lag_seconds > 30"

  # Unconditional self-loop re-runs the check indefinitely. The 5-minute
  # cadence in the label is not encoded as a field here — presumably the
  # scheduler supplies the interval; TODO confirm against the OSOP
  # loop-edge schema.
  - from: verify_consistency
    to: verify_consistency
    mode: loop
    when: "true"
    label: "Continuous consistency monitoring every 5 minutes"