ETL Architecture Patterns on GCP
Dataproc Spark ETL
from pyspark.sql import SparkSession
from pyspark.sql import functions as F  # namespaced import avoids shadowing built-ins such as sum()

spark = SparkSession.builder \
    .appName("ETL_Sales_Data") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# EXTRACT: Read raw data from GCS
raw_df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("gs://my-data-lake/bronze/sales/*.csv")

# TRANSFORM: Clean and enrich
transformed_df = raw_df \
    .withColumn("order_date", F.to_date(F.col("order_date"), "yyyy-MM-dd")) \
    .withColumn("amount", F.col("quantity") * F.col("unit_price")) \
    .withColumn("tax", F.col("amount") * 0.08) \
    .withColumn("total", F.col("amount") + F.col("tax")) \
    .withColumn("customer_domain", F.regexp_extract(F.col("email"), "@(.+)$", 1)) \
    .filter(F.col("amount") > 0) \
    .filter(F.col("email").isNotNull())

# Aggregate by order date and product category
summary_df = transformed_df \
    .groupBy("order_date", "product_category") \
    .agg(
        F.count("*").alias("order_count"),
        F.sum("total").alias("total_revenue"),
        F.avg("total").alias("avg_order_value"),
        # Assumes customers are identified by the "email" column used above
        F.countDistinct("email").alias("unique_customers"),
    )

# LOAD: Write to BigQuery (the connector stages data in the temporary GCS bucket)
summary_df.write \
    .format("bigquery") \
    .option("table", "project.analytics.sales_summary") \
    .option("temporaryGcsBucket", "my-temp-bucket") \
    .mode("overwrite") \
    .save()

# Also write detailed data to GCS as Parquet
transformed_df.write \
    .mode("overwrite") \
    .partitionBy("order_date") \
    .parquet("gs://my-data-lake/silver/sales/")

spark.stop()
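Best Practice: For Dataproc ETL, enable dynamic allocation and adaptive query execution. Write to both BigQuery (for analytics) and GCS (for reprocessing). Use partitionBy so that downstream reads can prune partitions. Note that mode("overwrite") replaces every existing partition unless spark.sql.sources.partitionOverwriteMode is set to dynamic. Always validate data quality before loading.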
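The advice to validate data quality before loading can be sketched as a small gate that decides whether a batch is clean enough to load. This is a minimal, framework-free sketch: the names `row_errors` and `validate_rows`, the list of required fields, and the 5% threshold are assumptions for illustration. In a Spark job the same rules would more likely be expressed as DataFrame filters and counts.

```python
from typing import Dict, Iterable, List

# Assumed set of columns that every sales row must carry.
REQUIRED_FIELDS = ("order_date", "email", "quantity", "unit_price")


def row_errors(row: Dict) -> List[str]:
    """Return the rule violations for one row (an empty list means valid)."""
    errors = [f"missing {f}" for f in REQUIRED_FIELDS if row.get(f) in (None, "")]
    if not errors:
        try:
            if float(row["quantity"]) * float(row["unit_price"]) <= 0:
                errors.append("non-positive amount")
        except (TypeError, ValueError):
            errors.append("non-numeric quantity or unit_price")
    return errors


def validate_rows(rows: Iterable[Dict], max_bad_ratio: float = 0.05) -> Dict:
    """Count invalid rows and decide whether the batch may be loaded."""
    total = bad = 0
    for row in rows:
        total += 1
        if row_errors(row):
            bad += 1
    ratio = bad / total if total else 1.0  # an empty batch is treated as failing
    return {"total": total, "bad": bad, "ok_to_load": ratio <= max_bad_ratio}
```

A job could call `validate_rows` on a sample of the input and skip the load step when `ok_to_load` is false.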
Dataflow Beam ETL
import json
from datetime import datetime, timezone

import apache_beam as beam
from apache_beam.io.gcp.bigquery import BigQueryDisposition, WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions


class TransformSalesData(beam.DoFn):
    """Transform raw sales data."""

    def process(self, element):
        try:
            record = json.loads(element)
            yield {
                'order_id': record['order_id'],
                'customer_id': record['customer_id'],
                'product_category': record['product_category'],
                'amount': float(record['quantity']) * float(record['unit_price']),
                'order_date': record['order_date'],
                'processed_at': datetime.now(timezone.utc).isoformat()
            }
        except (KeyError, ValueError, TypeError):
            # Malformed or incomplete records are dropped silently here
            return


def run_etl_pipeline():
    pipeline_options = PipelineOptions([
        '--project', 'my-project',
        '--runner', 'DataflowRunner',
        '--region', 'us-central1',
        '--temp_location', 'gs://my-data-lake/temp/',
        '--machine_type', 'n1-standard-4',
        '--max_num_workers', '10'
    ])
    with beam.Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            | 'Read' >> beam.io.ReadFromText('gs://my-data-lake/bronze/sales/*.json')
            | 'Transform' >> beam.ParDo(TransformSalesData())
            | 'Write BQ' >> WriteToBigQuery(
                'my-project:analytics.sales',
                # The schema must list every field the transform emits, including processed_at
                schema='order_id:STRING,customer_id:STRING,product_category:STRING,amount:FLOAT64,order_date:TIMESTAMP,processed_at:TIMESTAMP',
                write_disposition=BigQueryDisposition.WRITE_APPEND
            )
        )


if __name__ == '__main__':
    run_etl_pipeline()
Common Interview Questions
Q1: When would you choose Dataproc over Dataflow for ETL?
Answer: Dataproc is better for existing Spark/Hadoop workloads, complex ML pipelines, or when you need cluster control. Dataflow is better for new development, unified batch/streaming, or when you want serverless operation. Dataproc provides more flexibility; Dataflow eliminates management overhead.
Q2: How do you implement error handling in ETL pipelines?
Answer: 1) Use try/except blocks in transforms, 2) Implement dead-letter queues for failed records, 3) Use quality checks at each stage, 4) Log errors to Cloud Logging, 5) Implement retry logic with backoff for transient failures, 6) Set up alerts for error rates.
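Two of these points, dead-letter handling and retries, can be sketched without any pipeline framework. This is an illustrative sketch only: `TransientError`, `with_retry`, and `run_with_dead_letter` are hypothetical names, and in a real Dataflow or Dataproc job the dead-letter list would be a separate output written to GCS or a BigQuery table.

```python
import time
from typing import Callable, Iterable, List, Tuple


class TransientError(Exception):
    """Hypothetical marker for failures worth retrying (for example, timeouts)."""


def with_retry(fn: Callable, attempts: int = 3, base_delay: float = 1.0):
    """Call fn(), retrying on TransientError with exponential backoff."""
    for attempt in range(attempts):
        try:
            return fn()
        except TransientError:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the failure
            time.sleep(base_delay * (2 ** attempt))


def run_with_dead_letter(records: Iterable, transform: Callable) -> Tuple[List, List]:
    """Apply transform to each record; failures go to a dead-letter list."""
    good, dead = [], []
    for record in records:
        try:
            good.append(transform(record))
        except Exception as exc:
            # Keep the original record and the reason so it can be replayed later
            dead.append({"record": record, "error": repr(exc)})
    return good, dead
```

Keeping the failed record together with its error message is what makes later reprocessing possible; dropping it silently loses that option.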
Q3: What is the difference between ETL and ELT?
Answer: ETL transforms data before loading into the target system. ELT loads raw data first, then transforms within the target. ELT is common with BigQuery because it separates storage and compute. Use ETL for complex transformations, ELT for simple transformations leveraging BigQuery's compute.
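The ELT ordering can be illustrated with a tiny runnable sketch. Here an in-memory SQLite database stands in for the warehouse (an assumption made purely so the example runs anywhere; with BigQuery the transform step would be a SQL query executed in BigQuery itself). The table names and sample rows are invented for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # stand-in for the target warehouse

# LOAD: land the raw rows untouched, even with string-typed numbers
conn.execute(
    "CREATE TABLE raw_sales (product_category TEXT, quantity TEXT, unit_price TEXT)"
)
conn.executemany(
    "INSERT INTO raw_sales VALUES (?, ?, ?)",
    [("books", "2", "10.0"), ("books", "1", "5.0"), ("games", "3", "20.0")],
)

# TRANSFORM: cast and aggregate inside the target, using its own SQL engine
conn.execute(
    """
    CREATE TABLE sales_summary AS
    SELECT product_category,
           SUM(CAST(quantity AS REAL) * CAST(unit_price AS REAL)) AS total_revenue
    FROM raw_sales
    GROUP BY product_category
    """
)

summary = dict(
    conn.execute("SELECT product_category, total_revenue FROM sales_summary")
)
```

In the ETL variant, the casting and aggregation would instead run in Spark or Beam before anything reaches the target table.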
Q4: How do you optimize ETL pipeline costs?
Answer: 1) Use preemptible or Spot VMs for fault-tolerant batch processing, 2) Enable autoscaling, 3) Use FlexRS for non-urgent Dataflow batch jobs, 4) Right-size machine types, 5) Implement incremental processing so each run touches only new data, 6) Use columnar formats (Parquet) for efficiency.
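Incremental processing is often implemented with a watermark: the job remembers the newest input it has already handled and only picks up anything newer. A minimal sketch, assuming the caller supplies (path, modified time) pairs and persists the returned watermark somewhere durable; `select_new_files` is a hypothetical helper, not a GCP API.

```python
from datetime import datetime
from typing import Iterable, List, Optional, Tuple


def select_new_files(
    files: Iterable[Tuple[str, datetime]],
    watermark: Optional[datetime],
) -> Tuple[List[str], Optional[datetime]]:
    """Return paths modified after the watermark, plus the advanced watermark."""
    new = [(path, ts) for path, ts in files if watermark is None or ts > watermark]
    if not new:
        return [], watermark  # nothing to do; the watermark stays where it was
    new.sort(key=lambda item: item[1])  # process oldest first
    return [path for path, _ in new], new[-1][1]
```

The watermark should only be saved after the run succeeds, so that a failed run is retried over the same files.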
Q5: How do you test ETL pipelines?
Answer: 1) Unit test transforms with sample data, 2) Integration test with small datasets, 3) Use data quality frameworks (Great Expectations), 4) Implement schema validation, 5) Test error handling paths, 6) Monitor production metrics.
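Unit testing a transform is easiest when its logic lives in a plain function that a DoFn or Spark UDF merely wraps. The sketch below assumes such a function, here called `transform_record` (a hypothetical, simplified version of the Beam transform shown earlier), and tests both the happy path and the error paths with the standard `unittest` module.

```python
import json
import unittest
from typing import Optional


def transform_record(line: str) -> Optional[dict]:
    """Pure transform logic: parse one JSON line, return None for bad input."""
    try:
        record = json.loads(line)
        return {
            "order_id": record["order_id"],
            "amount": float(record["quantity"]) * float(record["unit_price"]),
        }
    except (KeyError, ValueError, TypeError):
        return None


class TransformRecordTest(unittest.TestCase):
    def test_valid_record(self):
        line = json.dumps({"order_id": "o1", "quantity": 2, "unit_price": 4.5})
        self.assertEqual(transform_record(line), {"order_id": "o1", "amount": 9.0})

    def test_malformed_json_is_dropped(self):
        self.assertIsNone(transform_record("{not json"))

    def test_missing_field_is_dropped(self):
        self.assertIsNone(transform_record(json.dumps({"order_id": "o1"})))
```

Saved as a module, these tests run with `python -m unittest`; no cluster or cloud credentials are needed.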