ETL Patterns: ADF, Databricks & Synapse Pipelines
Enterprise ETL patterns with Azure Data Factory, Databricks, and Synapse for data transformation workloads
ETL Pattern Architecture
  EXTRACT              TRANSFORM            LOAD
┌────────────┐       ┌────────────┐       ┌────────────┐
│ Source     │       │ Business   │       │ Target     │
│ Systems    │ ────> │ Logic      │ ────> │ Systems    │
│            │       │            │       │            │
│ • DB       │       │ • Clean    │       │ • DW       │
│ • Files    │       │ • Enrich   │       │ • Data lake│
│ • APIs     │       │ • Aggregate│       │ • NoSQL    │
│ • Streams  │       │ • Validate │       │ • Files    │
└────────────┘       └────────────┘       └────────────┘

ORCHESTRATION PATTERNS:

PATTERN 1: Star Pattern (Fan-out)
┌────────┐     ┌─────────┐     ┌────────┐  ┌────────┐  ┌────────┐
│ Source │ ──> │ Extract │ ──> │ Trans1 │  │ Trans2 │  │ Trans3 │
└────────┘     └─────────┘     └───┬────┘  └───┬────┘  └───┬────┘
                                   └───────────┼───────────┘
                                               ▼
                                           ┌────────┐
                                           │  Load  │
                                           └────────┘

PATTERN 2: Chain Pattern (Sequential)
┌────────┐     ┌─────────┐     ┌────────┐     ┌────────┐
│ Source │ ──> │ Extract │ ──> │ Trans  │ ──> │  Load  │
└────────┘     └─────────┘     └────────┘     └────────┘

PATTERN 3: Parallel ETL
┌────────┐     ┌─────────────────────────────────┐
│ Source │ ──> │ Parallel Workers (Spark/ADF IR) │
└────────┘     │  ┌────┐  ┌────┐  ┌────┐  ┌────┐ │
               │  │ W1 │  │ W2 │  │ W3 │  │ W4 │ │
               │  └─┬──┘  └─┬──┘  └─┬──┘  └─┬──┘ │
               └────┼───────┼───────┼───────┼────┘
                    └───────┴───┬───┴───────┘
                                ▼
                            ┌───────┐
                            │ Merge │
                            └───────┘
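The three orchestration patterns can be sketched in plain Python to show how they differ in control flow. This is a minimal illustration, not how ADF or Spark schedule work internally: the helper names (`run_chain`, `run_fan_out`, `run_parallel`) and the sample rows are hypothetical, and a thread pool stands in for Spark executors or ADF integration runtimes.

```python
from concurrent.futures import ThreadPoolExecutor

def extract():
    # Stand-in for reading from a source system (hypothetical sample rows).
    return [
        {"sale_id": 1, "quantity": 2, "unit_price": 5.0},
        {"sale_id": 2, "quantity": 1, "unit_price": 3.5},
    ]

def clean(rows):
    # Keep only rows with a positive quantity.
    return [r for r in rows if r["quantity"] > 0]

def enrich(rows):
    # Add a derived column to every row.
    return [{**r, "total_amount": r["quantity"] * r["unit_price"]} for r in rows]

def run_chain(steps, data):
    """Pattern 2: each step consumes the previous step's output."""
    for step in steps:
        data = step(data)
    return data

def run_fan_out(transforms, data):
    """Pattern 1: independent transforms run on the same extract."""
    with ThreadPoolExecutor(max_workers=len(transforms)) as pool:
        futures = {name: pool.submit(fn, data) for name, fn in transforms.items()}
        return {name: fut.result() for name, fut in futures.items()}

def run_parallel(worker, data, n_workers=4):
    """Pattern 3: split the data, process partitions in parallel, merge."""
    partitions = [data[i::n_workers] for i in range(n_workers)]
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        results = list(pool.map(worker, partitions))
    return [row for part in results for row in part]

rows = extract()
chained = run_chain([clean, enrich], rows)
fanned = run_fan_out({"clean": clean, "enrich": enrich}, rows)
merged = run_parallel(enrich, rows)
```

The chain is the simplest to reason about. Fan-out suits transforms that do not depend on each other, and the parallel pattern suits one transform applied to data that can be partitioned.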
ADF ETL Pipeline Example
{
  "name": "pl_etl_sales_pipeline",
  "properties": {
    "activities": [
      {
        "name": "LookupLastLoadDate",
        "type": "Lookup",
        "typeProperties": {
          "source": {
            "type": "AzureSqlSource",
            "sqlReaderQuery": "SELECT MAX(LoadDate) as LastLoadDate FROM ControlTable WHERE PipelineName = 'pl_etl_sales'"
          },
          "dataset": {
            "referenceName": "ds_control_table",
            "type": "DatasetReference"
          },
          "firstRowOnly": true
        }
      },
      {
        "name": "ExtractIncremental",
        "type": "Copy",
        "typeProperties": {
          "source": {
            "type": "AzureSqlSource",
            "sqlReaderQuery": {
              "value": "SELECT * FROM Sales WHERE ModifiedDate > '@{activity('LookupLastLoadDate').output.firstRow.LastLoadDate}'",
              "type": "Expression"
            }
          },
          "sink": {
            "type": "ParquetSink",
            "storeSettings": {
              "type": "AzureBlobFSWriteSettings",
              "copyBehavior": "PreserveHierarchy"
            }
          }
        },
        "dependsOn": [
          {
            "activity": "LookupLastLoadDate",
            "dependencyConditions": ["Succeeded"]
          }
        ]
      },
      {
        "name": "TransformWithSpark",
        "type": "DatabricksNotebook",
        "typeProperties": {
          "notebookPath": "/Repos/etl/sales_transform",
          "baseParameters": {
            "raw_path": "@{pipeline().parameters.rawPath}/@{formatDateTime(pipeline().parameters.processDate, 'yyyy/MM/dd')}",
            "curated_path": "@{pipeline().parameters.curatedPath}"
          }
        },
        "dependsOn": [
          {
            "activity": "ExtractIncremental",
            "dependencyConditions": ["Succeeded"]
          }
        ]
      },
      {
        "name": "UpdateControlTable",
        "type": "Script",
        "typeProperties": {
          "scripts": [
            {
              "type": "NonQuery",
              "text": {
                "value": "UPDATE ControlTable SET LoadDate = '@{pipeline().parameters.processDate}' WHERE PipelineName = 'pl_etl_sales'",
                "type": "Expression"
              }
            }
          ]
        },
        "dependsOn": [
          {
            "activity": "TransformWithSpark",
            "dependencyConditions": ["Succeeded"]
          }
        ]
      }
    ]
  }
}
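The pipeline above follows a watermark pattern: look up the last load date, extract only rows modified after it, then advance the watermark once downstream steps succeed. The sketch below replays that control flow against an in-memory SQLite database so it can be run locally. SQLite is only a stand-in for Azure SQL, and the `sale_id` column, the sample dates, and the `run_incremental_load` helper are assumptions made for illustration.

```python
import sqlite3

def run_incremental_load(conn, pipeline_name, process_date):
    """Lookup watermark -> extract changed rows -> advance watermark."""
    cur = conn.cursor()
    # Step 1: read the last successful load date (the Lookup activity).
    cur.execute(
        "SELECT MAX(LoadDate) FROM ControlTable WHERE PipelineName = ?",
        (pipeline_name,),
    )
    last_load = cur.fetchone()[0]
    # Step 2: extract only rows modified after the watermark (the Copy activity).
    cur.execute(
        "SELECT sale_id, ModifiedDate FROM Sales "
        "WHERE ModifiedDate > ? ORDER BY sale_id",
        (last_load,),
    )
    rows = cur.fetchall()
    # Step 3: advance the watermark only after the extract succeeded.
    cur.execute(
        "UPDATE ControlTable SET LoadDate = ? WHERE PipelineName = ?",
        (process_date, pipeline_name),
    )
    conn.commit()
    return rows

# Hypothetical sample data; ISO date strings compare correctly as text.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE ControlTable (PipelineName TEXT, LoadDate TEXT);
CREATE TABLE Sales (sale_id INTEGER, ModifiedDate TEXT);
INSERT INTO ControlTable VALUES ('pl_etl_sales', '2024-01-01');
INSERT INTO Sales VALUES (1, '2023-12-31'), (2, '2024-01-02'), (3, '2024-01-03');
""")

first_run = run_incremental_load(conn, "pl_etl_sales", "2024-01-03")
second_run = run_incremental_load(conn, "pl_etl_sales", "2024-01-04")
```

The first run picks up the two rows changed after the stored watermark. The second run finds nothing new because the watermark has moved forward. Parameterized queries are used here, whereas the ADF expressions above build the SQL text by string interpolation.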
Databricks ETL Notebook
# Databricks ETL transformation notebook
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from delta.tables import DeltaTable

# On Databricks a session already exists; getOrCreate() returns it.
spark = SparkSession.builder.getOrCreate()

# Extract: paths arrive as notebook widgets (the ADF baseParameters)
raw_path = dbutils.widgets.get("raw_path")
curated_path = dbutils.widgets.get("curated_path")
# recursiveFileLookup (Spark 3.0+) reads Parquet files at any depth under raw_path
raw_df = spark.read.option("recursiveFileLookup", "true").parquet(raw_path)

# Transform: derive columns, deduplicate on the business key, drop non-positive totals
transformed_df = (
    raw_df
    .withColumn("sale_date", F.to_date("sale_date"))
    .withColumn("total_amount", F.col("quantity") * F.col("unit_price"))
    .withColumn("year", F.year("sale_date"))
    .withColumn("month", F.month("sale_date"))
    .withColumn("day", F.dayofmonth("sale_date"))
    .dropDuplicates(["sale_id"])
    .filter(F.col("total_amount") > 0)
)

# Load (Merge/Upsert)
target_path = f"{curated_path}/fact_sales"
if DeltaTable.isDeltaTable(spark, target_path):
    # Table exists: update matching sale_ids, insert new ones
    delta_table = DeltaTable.forPath(spark, target_path)
    (
        delta_table.alias("target")
        .merge(transformed_df.alias("source"), "target.sale_id = source.sale_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # First run: create the partitioned Delta table
    (
        transformed_df.write
        .format("delta")
        .partitionBy("year", "month")
        .mode("overwrite")
        .save(target_path)
    )

# Optimize: compact small files in the target table
spark.sql(f"OPTIMIZE delta.`{target_path}`")
Pro Tip: Use Delta Lake merge (upsert), keyed on a unique business key such as sale_id, to make loads idempotent. Reprocessing the same batch then updates existing rows instead of inserting duplicates, which is critical for fault-tolerant ETL.
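To see why a keyed merge is idempotent while a plain append is not, the following sketch models both on in-memory rows. It is a simplified stand-in for Delta Lake's merge semantics, not the Delta API: `merge_upsert` and `append_only` are hypothetical helpers and the batch is made-up sample data.

```python
def merge_upsert(target, source_rows, key="sale_id"):
    """Update rows whose key already exists, insert the rest."""
    merged = dict(target)  # copy so the input is not mutated
    for row in source_rows:
        merged[row[key]] = row  # matched: update; not matched: insert
    return merged

def append_only(target_rows, source_rows):
    """Blindly append; a replayed batch is stored again."""
    return target_rows + source_rows

batch = [
    {"sale_id": 1, "total_amount": 10.0},
    {"sale_id": 2, "total_amount": 3.5},
]

# Replaying the same batch leaves the merged target unchanged...
once = merge_upsert({}, batch)
twice = merge_upsert(once, batch)

# ...while an append-based load doubles the row count.
appended_twice = append_only(append_only([], batch), batch)
```

The guarantee only holds when the key is unique within each source batch, which is why the notebook deduplicates on sale_id before merging.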
Interview Questions
Q1: How do you handle schema changes in ETL pipelines? A: Enable schema drift in ADF Data Flows, validate incoming schemas in PySpark before loading, use Delta Lake schema evolution (the mergeSchema option), or maintain a schema registry for versioned schemas.
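As one way to picture the schema validation step, the sketch below compares an expected schema with an incoming one and classifies the drift. It is engine-agnostic plain Python. The `{column: type}` dictionaries, the `channel` column, and the rule that only additive changes are safe are all assumptions for illustration; a real pipeline would read these schemas from the DataFrame or a registry.

```python
def diff_schema(expected, actual):
    """Compare two {column: type} mappings and report the drift."""
    shared = set(expected) & set(actual)
    return {
        "added": sorted(set(actual) - set(expected)),
        "removed": sorted(set(expected) - set(actual)),
        "type_changed": sorted(c for c in shared if expected[c] != actual[c]),
    }

def is_safe_to_load(drift):
    # Assumed policy: new columns can be absorbed by schema evolution,
    # while removed columns and type changes need a human decision.
    return not drift["removed"] and not drift["type_changed"]

expected = {"sale_id": "bigint", "quantity": "int", "unit_price": "double"}

# Incoming batch with one new column and one changed type (hypothetical).
incoming = {
    "sale_id": "bigint",
    "quantity": "string",
    "unit_price": "double",
    "channel": "string",
}
drift = diff_schema(expected, incoming)

# Incoming batch that only adds a column.
additive = {**expected, "channel": "string"}
additive_drift = diff_schema(expected, additive)
```

Running the check before the load step lets the pipeline fail fast or route the batch to quarantine instead of writing incompatible data.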
Q2: What are the best practices for ETL error handling? A: 1) Implement retry policies, 2) Use dead-letter queues for failed records, 3) Log errors to a monitoring system, 4) Create alerts for failures, 5) Run data quality checks before load, 6) Wrap notebook logic in exception handling (try/except in Python, try/catch in Scala).
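Two of these practices, retries and dead-lettering, can be sketched in a few lines. This is a minimal illustration under stated assumptions: `with_retry`, `process_with_dead_letter`, `flaky_extract`, and `add_total` are hypothetical names, the dead-letter "queue" is just a list, and the delay is set to zero so the example runs instantly.

```python
import time

def with_retry(fn, attempts=3, base_delay=1.0):
    """Call fn, retrying with exponential backoff; re-raise on the last failure."""
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == attempts:
                raise
            time.sleep(base_delay * 2 ** (attempt - 1))

def process_with_dead_letter(rows, transform):
    """Transform row by row; bad rows are set aside with their error."""
    good, dead = [], []
    for row in rows:
        try:
            good.append(transform(row))
        except (KeyError, TypeError, ValueError) as exc:
            dead.append({"row": row, "error": repr(exc)})
    return good, dead

# A source that fails twice with a transient error, then succeeds.
calls = {"n": 0}

def flaky_extract():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return "ok"

result = with_retry(flaky_extract, attempts=3, base_delay=0.0)

def add_total(row):
    return {**row, "total_amount": row["quantity"] * row["unit_price"]}

good, dead = process_with_dead_letter(
    [{"quantity": 2, "unit_price": 5.0}, {"quantity": 1}],  # second row lacks unit_price
    add_total,
)
```

Retries suit transient, whole-step failures such as a dropped connection. Dead-lettering suits individual bad records, so one malformed row does not fail the whole batch.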
Q3: How do you test ETL pipelines? A: 1) Unit test transformation logic, 2) Integration test with sample data, 3) Data quality validation (row counts, null checks), 4) Performance testing with production data volumes, 5) End-to-end testing with downstream dependencies.
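The first and third points can be illustrated with a plain-Python mirror of the notebook's transformation and a small data quality report. This is a sketch, not the notebook's actual test suite: `transform_sales` and `quality_report` are hypothetical helpers, the sample rows are invented, and unlike Spark's dropDuplicates this version deterministically keeps the first occurrence of each sale_id.

```python
def transform_sales(rows):
    """Compute total_amount, dedupe by sale_id, drop non-positive totals."""
    seen, out = set(), []
    for r in rows:
        if r["sale_id"] in seen:
            continue  # duplicate business key: keep the first occurrence
        seen.add(r["sale_id"])
        total = r["quantity"] * r["unit_price"]
        if total > 0:
            out.append({**r, "total_amount": total})
    return out

def quality_report(rows, required=("sale_id", "total_amount")):
    """Row count plus null counts for the required columns."""
    return {
        "row_count": len(rows),
        "null_counts": {
            col: sum(1 for r in rows if r.get(col) is None) for col in required
        },
    }

def test_transform_sales():
    raw = [
        {"sale_id": 1, "quantity": 2, "unit_price": 5.0},
        {"sale_id": 1, "quantity": 2, "unit_price": 5.0},  # duplicate
        {"sale_id": 2, "quantity": 0, "unit_price": 9.0},  # zero total
        {"sale_id": 3, "quantity": 1, "unit_price": 3.5},
    ]
    result = transform_sales(raw)
    assert [r["sale_id"] for r in result] == [1, 3]
    assert [r["total_amount"] for r in result] == [10.0, 3.5]
    report = quality_report(result)
    assert report["row_count"] == 2
    assert all(count == 0 for count in report["null_counts"].values())
    return result

tested = test_transform_sales()
```

Keeping the transformation in a function that takes and returns data, with no I/O inside it, is what makes this kind of unit test possible. The same idea applies to PySpark functions that accept and return DataFrames.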