Batch Processing Architecture Overview
AWS Batch Analytics Architecture

┌──────────┐    ┌──────────┐    ┌──────────┐    ┌────────────┐
│    S3    │───▶│   EMR    │───▶│   Glue   │───▶│  Redshift  │
│   Data   │    │  Spark   │    │   ETL    │    │     DW     │
│   Lake   │    │ Cluster  │    │   Jobs   │    │            │
└──────────┘    └──────────┘    └──────────┘    └────────────┘
     │               │               │                │
     ▼               ▼               ▼                ▼
┌──────────┐    ┌──────────┐    ┌──────────┐    ┌────────────┐
│  Athena  │    │ DynamoDB │    │   SQS    │    │ QuickSight │
│  Query   │    │ Metadata │    │  Queue   │    │    Viz     │
└──────────┘    └──────────┘    └──────────┘    └────────────┘
Q1: How do you design a fault-tolerant batch processing pipeline on AWS?
Answer:
A fault-tolerant batch pipeline requires multiple layers of protection:
1. Checkpointing Strategy:
# EMR Spark checkpointing (set on the SparkContext; there is no spark.conf key for this)
spark.sparkContext.setCheckpointDir("s3://bucket/checkpoints/")
# Glue bookmarking
glueContext.create_dynamic_frame.from_catalog(
    database="mydb",
    table_name="mytable",
    transformation_ctx="datasource0"  # Key under which the job bookmark tracks this source
)
2. Dead Letter Queue Pattern:
Source → SQS Main Queue → Processing → Success → S3
                              ↓
                           Failure → DLQ → Alert → Manual Review
3. Retry Configuration:
# Step Functions retry logic
{
"Type": "Task",
"Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
"Retry": [
{
"ErrorEquals": ["States.TaskFailed"],
"IntervalSeconds": 60,
"MaxAttempts": 3,
"BackoffRate": 2.0
}
]
}
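To make the retry settings above concrete, the following minimal sketch computes the wait before each retry. The helper name `retry_delays` is an assumption for illustration; the arithmetic follows the documented meaning of `IntervalSeconds`, `MaxAttempts`, and `BackoffRate`.

```python
def retry_delays(interval_seconds: float, max_attempts: int, backoff_rate: float) -> list[float]:
    """Return the wait (in seconds) before each retry attempt."""
    # The first retry waits interval_seconds; each later wait is multiplied by backoff_rate.
    return [interval_seconds * backoff_rate ** attempt for attempt in range(max_attempts)]


delays = retry_delays(60, 3, 2.0)
print(delays)       # [60.0, 120.0, 240.0]
print(sum(delays))  # 420.0 seconds of waiting before the task is reported as failed
```

With these values a persistently failing step adds about seven minutes of waiting, which is worth accounting for when sizing a batch window.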
ℹ️
Key Interview Point: Always mention idempotency when discussing fault tolerance. Each batch job should produce the same result even if executed multiple times.
Q2: Compare EMR vs Glue for batch processing. When would you choose each?
Answer:
| Feature | EMR | Glue |
|---|---|---|
| Control | Full cluster control | Serverless, managed |
| Cost Model | EC2 instances | DPU-hours |
| Customization | Custom JARs, bootstrap | PySpark/Scala only |
| Use Case | Complex, long-running | Standard ETL |
Choose EMR when:
- Running custom Spark/Flink applications
- Need specific instance types (GPU, memory-optimized)
- Long-running clusters (hours/days)
- Require fine-grained tuning
Choose Glue when:
- Standard ETL transformations
- Want serverless, auto-scaling
- Need built-in data catalog
- Short-running jobs (minutes)
Decision Tree:
              ┌──────────────────┐
              │ Need custom JAR? │
              └────────┬─────────┘
                       │
           ┌───────────┴───────────┐
           ▼                       ▼
          Yes                      No
           │                       │
           ▼                       ▼
       ┌───────┐          ┌─────────────────┐
       │  EMR  │          │ Need real-time? │
       └───────┘          └────────┬────────┘
                                   │
                       ┌───────────┴───────────┐
                       ▼                       ▼
                      Yes                      No
                       │                       │
                       ▼                       ▼
                 ┌───────────┐            ┌────────┐
                 │ Kinesis/  │            │  Glue  │
                 │    MSK    │            └────────┘
                 └───────────┘
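The decision tree can also be written as a small function, which is handy when the choice has to be made per job from configuration. This is only a sketch of the two questions in the tree; the function and parameter names are hypothetical.

```python
def choose_engine(needs_custom_jar: bool, needs_real_time: bool) -> str:
    """Walk the decision tree top-down and return the suggested service."""
    if needs_custom_jar:
        return "EMR"
    if needs_real_time:
        return "Kinesis/MSK"
    return "Glue"


print(choose_engine(needs_custom_jar=False, needs_real_time=False))  # Glue
print(choose_engine(needs_custom_jar=True, needs_real_time=True))    # EMR
```

Note that the first question wins: a job that needs a custom JAR is routed to EMR regardless of the real-time answer.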
Q3: How do you optimize Spark jobs on EMR for better performance?
Answer:
1. Instance Selection:
# Use Graviton instances for cost savings
aws emr create-cluster --instance-groups \
InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m6g.xlarge \
InstanceGroupType=CORE,InstanceCount=4,InstanceType=m6g.2xlarge
2. Spark Configuration Tuning:
// Optimal Spark settings for batch
spark.sql.shuffle.partitions=200 // Default 200, adjust based on data
spark.executor.memory=8g
spark.executor.cores=4
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.minExecutors=2
spark.dynamicAllocation.maxExecutors=50
3. Data Format Optimization:
# Use Parquet with Snappy compression (Snappy is Spark's default Parquet codec; set here explicitly)
df.write.partitionBy("date").option("compression", "snappy").parquet("s3://bucket/output/")
# Target file size: 128MB - 1GB per file
4. Caching Strategy:
# Cache frequently accessed DataFrames
df.cache() # or df.persist(StorageLevel.MEMORY_AND_DISK)
⚠️
Common Mistake: Over-partitioning data. Too many small files cause overhead. Aim for 128MB-1GB partitions.
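As a rough aid for the 128MB-1GB guideline, the sketch below estimates how many output files to aim for given a data volume. The helper name and the 512 MB default are assumptions chosen for illustration, not values from any AWS or Spark default.

```python
import math

MB = 1024 ** 2
GB = 1024 ** 3


def target_file_count(total_bytes: int, target_file_bytes: int = 512 * MB) -> int:
    """Estimate the number of output files so each lands near the target size."""
    return max(1, math.ceil(total_bytes / target_file_bytes))


print(target_file_count(200 * GB))          # 400
print(target_file_count(200 * GB, 1 * GB))  # 200
```

The result can be passed to `df.repartition(n)` before a write so that the job does not emit thousands of tiny files.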
Q4: Explain the Glue Job Bookmark mechanism and its benefits.
Answer:
Glue Bookmarks track processed data to enable incremental processing:
# Bookmarks are not on by default: start the job with
# --job-bookmark-option job-bookmark-enable, and call job.init() / job.commit()
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database="mydb",
    table_name="mytable",
    transformation_ctx="datasource0"  # Bookmark context
)
# Process only new data
apply_mapping = ApplyMapping.apply(
    frame=datasource0,
    mappings=[("id", "long", "id", "long")],  # hypothetical column mapping
    transformation_ctx="apply_mapping"
)
sink = glueContext.write_dynamic_frame.from_catalog(
    frame=apply_mapping,
    database="outputdb",
    table_name="outputtable",
    transformation_ctx="sink0"
)
Bookmark States:
Job Start → Read Bookmark → Filter New Data → Process → Write → Update Bookmark
                  ▲                                                    │
                  └────────────────────────────────────────────────────┘
Benefits:
- Eliminates duplicate processing
- Reduces runtime for incremental loads
- No need for custom watermark tracking
- Automatic state management
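The idea behind a bookmark can be shown with a few lines of plain Python. This is a simplified illustration of the read, filter, process, update cycle, not Glue's actual implementation; the function name and the use of integer timestamps are assumptions.

```python
def run_incremental(objects: dict, bookmark: int, process) -> int:
    """Process objects newer than the bookmark and return the advanced bookmark.

    objects maps an object key to its last-modified timestamp.
    """
    new_keys = sorted(key for key, ts in objects.items() if ts > bookmark)
    for key in new_keys:
        process(key)
    # The bookmark only advances after every new object was processed successfully.
    return max([bookmark] + [objects[key] for key in new_keys])


objects = {"a.parquet": 100, "b.parquet": 200}
seen = []
bookmark = run_incremental(objects, 0, seen.append)
objects["c.parquet"] = 300
bookmark = run_incremental(objects, bookmark, seen.append)
print(seen)      # ['a.parquet', 'b.parquet', 'c.parquet']
print(bookmark)  # 300
```

Because the bookmark is returned only at the end, a run that raises midway leaves the old bookmark in place and the same objects are picked up again on the next run.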
Q5: How do you handle schema evolution in batch pipelines?
Answer:
1. Glue Schema Evolution:
# Let the job update the Data Catalog table when the schema changes
sink = glueContext.write_dynamic_frame_from_catalog(
    frame=apply_mapping,
    database="outputdb",
    table_name="outputtable",
    additional_options={
        "enableUpdateCatalog": True,
        "updateBehavior": "UPDATE_IN_DATABASE"  # or "LOG" to record changes without applying them
    }
)
2. Spark Schema Evolution:
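# Read with merge schema option
df = spark.read \
    .option("mergeSchema", "true") \
    .parquet("s3://bucket/data/")
# Replace the table schema on write. "overwriteSchema" is a Delta Lake option
# and has no effect on plain Parquet, so this write uses the delta format.
df.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save("s3://bucket/output/")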
3. Schema Registry with Glue:
# Register a new schema version in the Glue Schema Registry
import boto3
glue = boto3.client('glue')
glue.register_schema_version(
    SchemaId={
        'RegistryName': 'my-registry',
        'SchemaName': 'my-schema'
    },
    SchemaDefinition=schema_definition  # schema text (e.g. Avro JSON), defined elsewhere
)
Schema Evolution Flow:
┌──────────┐     ┌──────────┐     ┌──────────┐
│  Source  │────▶│  Schema  │────▶│  Target  │
│  Schema  │     │ Registry │     │  Schema  │
└──────────┘     └──────────┘     └──────────┘
     │                │                │
     ▼                ▼                ▼
  Compare ──▶    Compatible? ──▶   Auto Merge
                      │
                 Incompatible
                      │
                      ▼
                 Alert & Stop
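The compare step in the flow above can be sketched as a plain dictionary comparison. The compatibility policy used here (added columns are fine, removed or retyped columns are not) is an assumption for illustration; real registries offer several compatibility modes.

```python
def compare_schemas(source: dict[str, str], target: dict[str, str]) -> dict:
    """Compare column-name -> type mappings and decide how the pipeline should react."""
    added = sorted(set(source) - set(target))
    removed = sorted(set(target) - set(source))
    retyped = sorted(name for name in set(source) & set(target) if source[name] != target[name])
    compatible = not removed and not retyped
    return {
        "added": added,
        "removed": removed,
        "retyped": retyped,
        "action": "auto_merge" if compatible else "alert_and_stop",
    }


target = {"id": "bigint", "amount": "double"}
print(compare_schemas({"id": "bigint", "amount": "double", "region": "string"}, target)["action"])  # auto_merge
print(compare_schemas({"id": "string", "amount": "double"}, target)["action"])                      # alert_and_stop
```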
Q6: Design a data quality framework for batch processing on AWS.
Answer:
Multi-Layer Quality Framework:
Data Quality Framework
  Layer 1: Schema Validation
    • Column existence  • Data types  • Null checks
  Layer 2: Statistical Validation
    • Range checks  • Distribution  • Outlier detection
  Layer 3: Business Rules
    • Referential integrity  • Cross-column logic
  Layer 4: Completeness
    • Record counts  • Freshness  • Coverage
Implementation with Glue:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, count, when
def validate_data(df: DataFrame, rules: dict) -> dict:
    results = {}
    # Schema validation
    for col_name, expected_type in rules.get('schema', {}).items():
        results[f'schema_{col_name}'] = (
            df.schema[col_name].dataType == expected_type
        )
    # Null checks
    for col_name in rules.get('not_null', []):
        null_count = df.filter(col(col_name).isNull()).count()
        results[f'null_{col_name}'] = null_count == 0
    # Range checks
    for col_name, (min_val, max_val) in rules.get('range', {}).items():
        out_of_range = df.filter(
            (col(col_name) < min_val) | (col(col_name) > max_val)
        ).count()
        results[f'range_{col_name}'] = out_of_range == 0
    return results
Q7: How do you implement idempotency in AWS batch jobs?
Answer:
Idempotency Patterns:
1. Deterministic Output Paths:
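# Write to a fixed base path and let the partition columns determine the directories.
# Dynamic partition overwrite replaces only the partitions present in df.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
output_path = "s3://bucket/output/"
df.write.mode("overwrite").partitionBy("year", "month", "day").parquet(output_path)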
2. Upsert Pattern with DynamoDB:
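import boto3
from botocore.exceptions import ClientError
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('processed_records')
def upsert_record(record):
    try:
        table.put_item(
            Item=record,
            # Write only if the item is new or the incoming record is newer
            ConditionExpression='attribute_not_exists(record_id) OR updated_at < :ts',
            ExpressionAttributeValues={':ts': record['updated_at']}
        )
    except ClientError as err:
        # A failed condition means an equal or newer version is already stored; safe to skip
        if err.response['Error']['Code'] != 'ConditionalCheckFailedException':
            raise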
3. Glue Bookmark Reset:
# Reset bookmark for reprocessing
aws glue reset-job-bookmark --job-name my-job
4. S3 Versioning for Rollback:
# Enable versioning for safe overwrites
s3 = boto3.client('s3')
s3.put_bucket_versioning(
Bucket='my-bucket',
VersioningConfiguration={'Status': 'Enabled'}
)
⚠️
Critical: Always design batch jobs to be re-runnable. If a job fails midway, re-running should not create duplicates or corrupt data.
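The difference between a re-runnable and a non-re-runnable write is easy to see with an in-memory stand-in for the output location. This is a toy model, assuming a dict keyed by partition path plays the role of S3; the function names are hypothetical.

```python
def write_partition_overwrite(store: dict, partition: str, rows: list) -> None:
    """Idempotent: the partition is replaced, so a re-run yields the same state."""
    store[partition] = list(rows)


def write_partition_append(store: dict, partition: str, rows: list) -> None:
    """Not idempotent: every re-run adds the same rows again."""
    store.setdefault(partition, []).extend(rows)


rows = [{"id": 1}, {"id": 2}]
partition = "year=2024/month=01/day=15"

safe, unsafe = {}, {}
for _ in range(2):  # simulate the job running twice after a failure
    write_partition_overwrite(safe, partition, rows)
    write_partition_append(unsafe, partition, rows)

print(len(safe[partition]))    # 2
print(len(unsafe[partition]))  # 4
```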
Q8: Explain the cost optimization strategies for EMR clusters.
Answer:
1. Instance Fleet with Spot:
aws emr create-cluster --instance-fleets '[
{
"InstanceFleetType": "MASTER",
"TargetOnDemandCapacity": 1,
"InstanceTypeConfigs": [{"InstanceType": "m5.xlarge"}]
},
{
"InstanceFleetType": "CORE",
"TargetOnDemandCapacity": 2,
"TargetSpotCapacity": 8,
"InstanceTypeConfigs": [
{"InstanceType": "m5.xlarge", "WeightedCapacity": 1},
{"InstanceType": "m5.2xlarge", "WeightedCapacity": 2}
]
}
]'
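2. Auto-scaling Configuration (simplified rule summary, not the literal EMR API schema):
{
  "ScaleOut": {
    "MetricName": "YARNMemoryAvailablePercentage",
    "ComparisonOperator": "LESS_THAN",
    "Threshold": 15.0,
    "CooldownSeconds": 300
  },
  "ScaleIn": {
    "MetricName": "YARNMemoryAvailablePercentage",
    "ComparisonOperator": "GREATER_THAN",
    "Threshold": 75.0,
    "CooldownSeconds": 600
  }
}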
3. S3 Storage Classes:
# Move processed data to cheaper storage
s3 = boto3.client('s3')
s3.put_object_tagging(
Bucket='my-bucket',
Key='processed/data.parquet',
Tagging={'TagSet': [{'Key': 'tier', 'Value': 'archive'}]}
)
# Lifecycle policy moves to Glacier after 30 days
Cost Comparison:
| Instance Type | On-Demand | 1yr RI | Spot |
|---|---|---|---|
| m5.xlarge | $0.192/hr | $0.121/hr | $0.058/hr |
| m5.2xlarge | $0.384/hr | $0.242/hr | $0.116/hr |
| r5.xlarge | $0.252/hr | $0.159/hr | $0.076/hr |
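To see what the Spot mix from the instance fleet example is worth, the sketch below prices a core fleet of ten m5.xlarge nodes using the hourly rates from the comparison. Treat the rates as illustrative (Spot prices in particular change constantly); the helper name is an assumption.

```python
# Hourly prices in USD, copied from the cost comparison (illustrative only).
ON_DEMAND = {"m5.xlarge": 0.192, "m5.2xlarge": 0.384, "r5.xlarge": 0.252}
SPOT = {"m5.xlarge": 0.058, "m5.2xlarge": 0.116, "r5.xlarge": 0.076}


def hourly_cost(instance_type: str, on_demand_count: int, spot_count: int) -> float:
    """Hourly cost of a fleet mixing On-Demand and Spot instances of one type."""
    return on_demand_count * ON_DEMAND[instance_type] + spot_count * SPOT[instance_type]


mixed = hourly_cost("m5.xlarge", on_demand_count=2, spot_count=8)
all_on_demand = hourly_cost("m5.xlarge", on_demand_count=10, spot_count=0)
savings = 1 - mixed / all_on_demand

print(f"{mixed:.3f}")          # 0.848
print(f"{all_on_demand:.3f}")  # 1.920
print(f"{savings:.0%}")        # 56%
```

The figures cover EC2 charges only; the per-instance EMR service fee is not included.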
Q9: How do you monitor and alert on batch job failures?
Answer:
CloudWatch Metrics + Alarms:
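import boto3
cloudwatch = boto3.client('cloudwatch')
# Alarm on failed Spark tasks reported by a Glue job (requires job metrics to be enabled).
# For job-run level failures, an EventBridge rule on "Glue Job State Change" is an alternative.
cloudwatch.put_metric_alarm(
    AlarmName='GlueJobFailure',
    MetricName='glue.driver.aggregate.numFailedTasks',
    Namespace='Glue',
    Dimensions=[
        {'Name': 'JobName', 'Value': 'my-job'},
        {'Name': 'JobRunId', 'Value': 'ALL'},
        {'Name': 'Type', 'Value': 'count'}
    ],
    Statistic='Sum',
    Period=300,
    EvaluationPeriods=1,
    Threshold=1,
    ComparisonOperator='GreaterThanOrEqualToThreshold',
    AlarmActions=['arn:aws:sns:us-east-1:123456789012:alerts']
)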
Step Functions Error Handling:
{
  "Type": "Parallel",
  "Branches": [
    {
      "StartAt": "ProcessData",
      "States": {
        "ProcessData": {
          "Type": "Task",
          "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
          "Catch": [
            {
              "ErrorEquals": ["States.ALL"],
              "Next": "NotifyFailure",
              "ResultPath": "$.error"
            }
          ],
          "End": true
        },
        "NotifyFailure": {
          "Type": "Task",
          "Resource": "arn:aws:states:::sns:publish",
          "Parameters": {
            "TopicArn": "arn:aws:sns:us-east-1:123456789012:alerts",
            "Message.$": "States.JsonToString($.error)"
          },
          "End": true
        }
      }
    }
  ]
}
Dashboard Setup:
CloudWatch Dashboard: Batch Monitoring
┌──────────────┬─────────────┬──────────────┬────────────┐
│ Jobs Running │ Jobs Failed │ Avg Duration │ Cost Today │
│      12      │      1      │    45 min    │    $234    │
├──────────────┴─────────────┴──────────────┴────────────┤
│ [Chart: Job Duration Trend]                            │
│ [Chart: Error Rate by Job Type]                        │
│ [Chart: Cost by Cluster]                               │
└────────────────────────────────────────────────────────┘
Q10: Describe the architecture for a petabyte-scale data warehouse on AWS.
Answer:
Architecture Components:
Petabyte-Scale Data Warehouse Architecture

Ingestion Layer         Processing Layer    Storage
┌─────────────┐         ┌─────────────┐     ┌──────────────┐
│   Kinesis   │────────▶│  EMR Spark  │────▶│ S3 Data Lake │
│   Streams   │         │             │     │  (Parquet)   │
└─────────────┘         └─────────────┘     └──────┬───────┘
                                                   │
┌─────────────┐         ┌─────────────┐            │
│     DMS     │────────▶│  Glue ETL   │───────────▶│
│    (CDC)    │         │             │            │
└─────────────┘         └─────────────┘            │
                                                   ▼
┌─────────────┐         ┌─────────────┐     ┌──────────────┐
│  SFTP/FTP   │────────▶│   Lambda    │────▶│   Redshift   │
│             │         │   Parser    │     │   Spectrum   │
└─────────────┘         └─────────────┘     └──────┬───────┘
                                                   │
                                             ┌─────▼──────┐
                                             │ QuickSight │
                                             │   Athena   │
                                             └────────────┘
Redshift Configuration for Petabyte Scale:
-- Create distkey and sortkey for optimal performance
CREATE TABLE fact_orders (
order_id BIGINT NOT NULL,
customer_id BIGINT NOT NULL,
order_date TIMESTAMP NOT NULL,
amount DECIMAL(10,2),
status VARCHAR(20)
)
DISTSTYLE KEY
DISTKEY(customer_id)
COMPOUND SORTKEY(order_date, customer_id);
-- Re-sort rows manually (Redshift also runs automatic vacuum in the background)
VACUUM SORT ONLY fact_orders;
-- Report recommended column encodings
ANALYZE COMPRESSION fact_orders;
Data Distribution Strategy:
Redshift Distribution Styles

KEY Distribution        EVEN Distribution
┌───┬───┬───┐           ┌───┬───┬───┐
│ A │ A │ B │           │ 1 │ 2 │ 3 │
│ A │ A │ B │           │ 4 │ 5 │ 6 │
│ B │ B │ A │           │ 7 │ 8 │ 9 │
└───┴───┴───┘           └───┴───┴───┘
(Join columns)          (Even spread)

ALL Distribution
┌───┬───┬───┐
│ X │ X │ X │  (Small tables - replicated)
│ X │ X │ X │
└───┴───┴───┘
Q11: How do you implement data partitioning strategies for batch analytics?
Answer:
Partitioning Principles:
# Optimal partition size: 128MB - 1GB per file
# Calculate partitions based on data volume
# For 1TB daily data, target ~1000 partitions
daily_partitions = total_bytes / target_partition_size
# 1TB / 1GB = 1024 partitions
# Partition by date and region
df.write \
.partitionBy("year", "month", "day", "region") \
.parquet("s3://bucket/data/")
Partition Pruning:
-- Effective partition pruning
SELECT * FROM events
WHERE year = 2024 AND month = 1 AND day = 15;
-- Less effective: pruning on month alone still scans the month=1 partitions of every year
SELECT * FROM events
WHERE month = 1;
Partition Strategy Decision Tree:
Partitioning Decision Tree

Query Pattern Analysis
┌────────────────────────────────────────────┐
│ What columns are most frequently filtered? │
└──────────────────────┬─────────────────────┘
                       │
       ┌───────────────┼───────────────┐
       ▼               ▼               ▼
  Time-based    Category-based      Hybrid
  (date/hour)    (region/type)   (date+region)
       │               │               │
       ▼               ▼               ▼
High cardinality Low cardinality   Balanced
    (hourly)    (10-100 values) (daily+region)
Q12: Explain the ETL vs ELT paradigm in AWS data pipelines.
Answer:
ETL (Extract, Transform, Load):
Source ──▶ Extract ──▶ Transform ──▶ Load ──▶ Target
                           │
                    (in processing
                   engine: Glue/EMR)
ELT (Extract, Load, Transform):
Source ──▶ Extract ──▶ Load ──▶ Transform ──▶ Target
                        │           │
                   (raw data    (in target
                     to S3)     warehouse)
When to Use Each:
| Scenario | ETL | ELT |
|---|---|---|
| Small datasets (<100GB) | ✓ | |
| Large datasets (>1TB) | | ✓ |
| Complex transformations | ✓ | |
| Simple transformations | | ✓ |
| Need raw data retention | | ✓ |
| Cost-sensitive | ✓ | |
AWS Implementation:
# ETL with Glue
# Transform before loading to Redshift
source = glueContext.create_dynamic_frame.from_catalog(...)
transformed = ApplyMapping.apply(frame=source, ...)
glueContext.write_dynamic_frame.from_jdbc_conf(
frame=transformed,
catalog_connection="redshift",
connection_options={"dbtable": "fact_table", "database": "dw"}
)
# ELT with Redshift Spectrum
# Load raw to S3, transform in Redshift
# 1. Load raw to S3
raw_df.write.parquet("s3://bucket/raw/")
# 2. Transform in Redshift
"""
CREATE TABLE fact_table AS
SELECT * FROM raw_data
WHERE is_valid = true;
"""
Q13: How do you handle late-arriving data in batch processing?
Answer:
Late Data Strategies:
1. Watermark-Based Processing:
# Spark Structured Streaming with watermark
streaming_df \
.withWatermark("event_time", "1 hour") \
.groupBy(
window("event_time", "1 hour"),
"category"
) \
.count()
2. Lambda Architecture:
Real-time Layer        Batch Layer
┌─────────────┐        ┌─────────────┐
│   Kinesis   │        │  S3 + EMR   │
│   Streams   │        │  Daily Job  │
└──────┬──────┘        └──────┬──────┘
       │                      │
       ▼                      ▼
┌─────────────┐        ┌─────────────┐
│  Real-time  │        │    Batch    │
│    Views    │        │    Views    │
└──────┬──────┘        └──────┬──────┘
       │                      │
       └──────────┬───────────┘
                  ▼
           ┌─────────────┐
           │   Serving   │
           │    Layer    │
           └─────────────┘
3. Backfill Mechanism:
# Process late data by re-running specific partitions
def process_date_range(start_date, end_date):
    for date in date_range(start_date, end_date):
        # Reprocess specific partition
        process_partition(date)
# Schedule backfill via Step Functions
# (one iteration per entry of the input's "dates" array; the field name is an assumption)
{
  "Type": "Map",
  "ItemsPath": "$.dates",
  "ItemProcessor": {
    "ProcessorConfig": {"Mode": "INLINE"},
    "StartAt": "ProcessPartition",
    "States": {
      "ProcessPartition": {
        "Type": "Task",
        "Resource": "processLambda",
        "End": true
      }
    }
  },
  "End": true
}
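The backfill loop relies on a `date_range` helper that is not shown. A minimal version could look like the sketch below; treating both bounds as inclusive and zero-padding the month and day in the partition path are assumptions, so adjust them to the layout your tables actually use.

```python
from datetime import date, timedelta
from typing import Iterator


def date_range(start_date: date, end_date: date) -> Iterator[date]:
    """Yield every calendar day from start_date to end_date, both inclusive."""
    current = start_date
    while current <= end_date:
        yield current
        current += timedelta(days=1)


def partition_path(day: date) -> str:
    """Build the Hive-style partition suffix for one day (zero-padded by assumption)."""
    return f"year={day.year}/month={day.month:02d}/day={day.day:02d}/"


for day in date_range(date(2024, 1, 30), date(2024, 2, 1)):
    print(partition_path(day))
# year=2024/month=01/day=30/
# year=2024/month=01/day=31/
# year=2024/month=02/day=01/
```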
Q14: Design a metadata-driven batch processing framework.
Answer:
Metadata Store Design:
-- Glue Data Catalog or custom DynamoDB table
CREATE TABLE job_config (
job_id VARCHAR(50) PRIMARY KEY,
source_type VARCHAR(20),
source_path VARCHAR(500),
target_path VARCHAR(500),
transform_type VARCHAR(50),
schedule VARCHAR(20),
enabled BOOLEAN,
last_run TIMESTAMP,
next_run TIMESTAMP
);
Dynamic Job Execution:
import json
import boto3
def lambda_handler(event, context):
    dynamodb = boto3.resource('dynamodb')
    config_table = dynamodb.Table('job_config')
    # Get enabled jobs
    response = config_table.scan(
        FilterExpression='enabled = :val',
        ExpressionAttributeValues={':val': True}
    )
    for job in response['Items']:
        execute_job(job)
def execute_job(job_config):
    # Dynamic ETL based on configuration
    if job_config['transform_type'] == 'aggregation':
        run_aggregation_job(job_config)
    elif job_config['transform_type'] == 'join':
        run_join_job(job_config)
    elif job_config['transform_type'] == 'filter':
        run_filter_job(job_config)
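As the number of transform types grows, the if/elif chain in `execute_job` can be replaced by a registry keyed on `transform_type`. The sketch below is one possible shape, with stub handlers that only return a label; the `handler` decorator and the `HANDLERS` dict are hypothetical names.

```python
from typing import Callable

HANDLERS: dict[str, Callable[[dict], str]] = {}


def handler(transform_type: str):
    """Register a function as the handler for one transform_type value."""
    def register(func: Callable[[dict], str]) -> Callable[[dict], str]:
        HANDLERS[transform_type] = func
        return func
    return register


@handler("aggregation")
def run_aggregation_job(job_config: dict) -> str:
    return f"aggregation:{job_config['job_id']}"  # stub; real work would go here


@handler("join")
def run_join_job(job_config: dict) -> str:
    return f"join:{job_config['job_id']}"  # stub


def execute_job(job_config: dict) -> str:
    """Look up the handler for the job's transform_type and run it."""
    transform_type = job_config["transform_type"]
    if transform_type not in HANDLERS:
        # Fail loudly instead of silently skipping a misconfigured job.
        raise ValueError(f"unknown transform_type: {transform_type!r}")
    return HANDLERS[transform_type](job_config)


print(execute_job({"job_id": "j1", "transform_type": "join"}))  # join:j1
```

Unlike the if/elif version, an unrecognized `transform_type` raises an error here, which makes a typo in the metadata table visible on the first run.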
Orchestration with Step Functions:
Metadata-Driven Orchestration

          ┌─────────────┐
          │  DynamoDB   │
          │   Config    │
          └──────┬──────┘
                 │
                 ▼
          ┌─────────────┐
          │   Lambda    │
          │ Dispatcher  │
          └──────┬──────┘
                 │
   ┌────────┬────┴───┬────────┐
   ▼        ▼        ▼        ▼
┌─────┐  ┌─────┐  ┌─────┐  ┌─────┐
│Job A│  │Job B│  │Job C│  │Job D│
└──┬──┘  └──┬──┘  └──┬──┘  └──┬──┘
   │        │        │        │
   └────────┴────┬───┴────────┘
                 │
                 ▼
          ┌─────────────┐
          │   Status    │
          │   Update    │
          └─────────────┘
Q15: How do you implement data lineage tracking in batch pipelines?
Answer:
Lineage Tracking Components:
# Custom lineage tracker
from datetime import datetime
import boto3
class LineageTracker:
    def __init__(self, job_name):
        self.job_name = job_name
        self.lineage = {
            'job': job_name,
            'inputs': [],
            'outputs': [],
            'transformations': [],
            'timestamp': datetime.now().isoformat()
        }
    def add_input(self, source, record_count):
        self.lineage['inputs'].append({
            'source': source,
            'record_count': record_count
        })
    def add_output(self, target, record_count):
        self.lineage['outputs'].append({
            'target': target,
            'record_count': record_count
        })
    def save_lineage(self):
        # Store in DynamoDB or Glue
        dynamodb = boto3.resource('dynamodb')
        table = dynamodb.Table('data_lineage')
        table.put_item(Item=self.lineage)
AWS Native Lineage:
AWS Lineage Tracking

Glue Data Catalog ─────▶ Table/Partition History
    ├──▶ Column lineage
    ├──▶ Job run history
    └──▶ Transformation details

CloudTrail ────────────▶ API call lineage
    └──▶ S3, EMR, Glue operations

CloudWatch ────────────▶ Job metrics lineage
    └──▶ Execution flow
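ℹ️
Interview Tip: Mention AWS Lake Formation for centralized governance, including column-level access control across AWS analytics services. Lake Formation manages permissions rather than lineage; for lineage itself, Amazon DataZone can capture OpenLineage-compatible lineage events.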
Q16: Explain the concept of data skew and how to handle it in Spark.
Answer:
Data Skew Detection:
# Detect skew by checking partition sizes
df.groupBy("key").count().orderBy("count", ascending=False).show()
# If some partitions are much larger, you have skew
Handling Skew:
1. Salting Technique:
# Add random salt to skewed key
from pyspark.sql.functions import col, concat, lit, rand
# Salt the skewed key
salted_df = df.withColumn("salt", (rand() * 10).cast("int"))
salted_df = salted_df.withColumn("salted_key",
    concat(col("key"), lit("_"), col("salt")))
# Repartition and join
repartitioned = salted_df.repartition(200, "salted_key")
2. Broadcast Join for Small Tables:
from pyspark.sql.functions import broadcast
# Broadcast small table to avoid shuffle
result = large_df.join(broadcast(small_df), "key")
3. AQE (Adaptive Query Execution):
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
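The effect of the salting technique is easiest to see on key counts alone. The plain-Python sketch below uses a made-up workload in which one key holds 90% of the rows; the function name, the seed, and the row counts are all assumptions for illustration.

```python
import random
from collections import Counter


def salt_keys(keys: list[str], num_salts: int, seed: int = 0) -> list[str]:
    """Append a random salt in [0, num_salts) to every key."""
    rng = random.Random(seed)
    return [f"{key}_{rng.randrange(num_salts)}" for key in keys]


keys = ["hot"] * 9000 + ["cold"] * 1000  # one key dominates the data
before = Counter(keys)
after = Counter(salt_keys(keys, num_salts=10))

print(max(before.values()))  # 9000 rows would land in a single task
print(max(after.values()))   # roughly a tenth of that after salting
print(len(after))            # 20 distinct salted keys instead of 2
```

When the salted key is used in a join, the other side has to be expanded so that each of its rows appears once per salt value; otherwise salted rows find no match.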
Q17: How do you implement a data warehouse slowly changing dimension (SCD) on AWS?
Answer:
SCD Type 2 Implementation:
-- Redshift SCD Type 2
CREATE TABLE dim_customer_scd (
customer_sk BIGINT IDENTITY(1,1), -- Surrogate key
customer_id INT,
name VARCHAR(100),
email VARCHAR(200),
effective_date TIMESTAMP,
end_date TIMESTAMP,
is_current BOOLEAN
);
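-- Redshift MERGE does not accept extra predicates on WHEN MATCHED, and Type 2
-- needs both an update and an insert for changed rows, so use two statements.
BEGIN;
-- Step 1: close out current rows whose tracked attributes changed
UPDATE dim_customer_scd
SET is_current = FALSE,
    end_date = CURRENT_TIMESTAMP
FROM staging_customer s
WHERE dim_customer_scd.customer_id = s.customer_id
  AND dim_customer_scd.is_current = TRUE
  AND (dim_customer_scd.name <> s.name OR dim_customer_scd.email <> s.email);
-- Step 2: insert a current row for new customers and for those closed in step 1
INSERT INTO dim_customer_scd (customer_id, name, email, effective_date, end_date, is_current)
SELECT s.customer_id, s.name, s.email, CURRENT_TIMESTAMP, NULL, TRUE
FROM staging_customer s
LEFT JOIN dim_customer_scd t
  ON t.customer_id = s.customer_id AND t.is_current = TRUE
WHERE t.customer_id IS NULL;
COMMIT;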
Glue SCD Implementation:
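# Glue has no built-in SCD transform (ResolveChoice only resolves ambiguous column
# types), so convert to Spark DataFrames and apply the Type 2 logic there.
# source_dyf and dim_dyf are assumed DynamicFrames for staging and the dimension.
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql import functions as F
staging = source_dyf.toDF().alias("s")
current = dim_dyf.toDF().filter("is_current").alias("t")
# New customers, plus customers whose tracked attributes changed
new_versions = (
    staging.join(current, F.col("s.customer_id") == F.col("t.customer_id"), "left")
    .filter(
        F.col("t.customer_id").isNull()
        | (F.col("s.name") != F.col("t.name"))
        | (F.col("s.email") != F.col("t.email"))
    )
    .select("s.*")
    .withColumn("effective_date", F.current_timestamp())
    .withColumn("end_date", F.lit(None).cast("timestamp"))
    .withColumn("is_current", F.lit(True))
)
# The superseded rows still have to be closed out in the target table
apply_scd = DynamicFrame.fromDF(new_versions, glueContext, "apply_scd")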
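The Type 2 rules themselves (close the old row, add a new current row, leave unchanged customers alone) can be checked on plain Python dictionaries before committing to SQL or Spark. This is a minimal sketch under the assumption that `name` and `email` are the only tracked attributes; the function name is hypothetical.

```python
from datetime import datetime

TRACKED = ("name", "email")


def apply_scd2(dimension: list[dict], staging: list[dict], now: datetime) -> list[dict]:
    """Apply SCD Type 2 changes from staging to the dimension rows in place."""
    current = {row["customer_id"]: row for row in dimension if row["is_current"]}
    for incoming in staging:
        existing = current.get(incoming["customer_id"])
        if existing is not None:
            if all(existing[col] == incoming[col] for col in TRACKED):
                continue  # nothing changed, keep the current row as is
            # Close out the superseded version.
            existing["is_current"] = False
            existing["end_date"] = now
        # Insert the new current version (new customer or changed attributes).
        dimension.append({**incoming, "effective_date": now, "end_date": None, "is_current": True})
    return dimension


dim = [{"customer_id": 1, "name": "Ann", "email": "ann@old.example",
        "effective_date": datetime(2023, 1, 1), "end_date": None, "is_current": True}]
staging = [{"customer_id": 1, "name": "Ann", "email": "ann@new.example"},
           {"customer_id": 2, "name": "Bob", "email": "bob@example.com"}]

apply_scd2(dim, staging, now=datetime(2024, 1, 15))
print(len(dim))                             # 3
print(sum(row["is_current"] for row in dim))  # 2
```

Running the same staging batch a second time adds nothing, because the incoming rows now match the current versions.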
Q18: Describe the architecture for real-time batch aggregation on AWS.
Answer:
Micro-Batch Aggregation Architecture:
Real-Time Batch Aggregation

┌───────────────┐
│ Event Sources │
│  (IoT/Apps)   │
└───────┬───────┘
        │
        ▼
┌───────────────┐     ┌─────────────┐
│    Kinesis    │────▶│   Lambda    │
│ Data Firehose │     │ Aggregation │
└───────────────┘     └──────┬──────┘
                             │
              ┌──────────────┼──────────────┐
              ▼              ▼              ▼
         ┌─────────┐    ┌─────────┐    ┌─────────┐
         │  1-min  │    │  5-min  │    │ 1-hour  │
         │   Agg   │    │   Agg   │    │   Agg   │
         └────┬────┘    └────┬────┘    └────┬────┘
              │              │              │
              ▼              ▼              ▼
         ┌───────────────────────────────────────┐
         │       S3 (Partitioned by time)        │
         └───────────────────┬───────────────────┘
                             │
                             ▼
         ┌───────────────────────────────────────┐
         │           Athena / Redshift           │
         └───────────────────────────────────────┘
Lambda Aggregation Code:
import base64
import json
def lambda_handler(event, context):
    # Aggregate records in micro-batch
    aggregated = {}
    for record in event['Records']:
        # Kinesis delivers the payload base64-encoded
        data = json.loads(base64.b64decode(record['kinesis']['data']))
        key = data['category']
        if key not in aggregated:
            aggregated[key] = {'count': 0, 'sum': 0}
        aggregated[key]['count'] += 1
        aggregated[key]['sum'] += data['amount']
    # Write aggregated results (write_to_s3 is a helper defined elsewhere)
    for category, metrics in aggregated.items():
        write_to_s3(category, metrics)
Q19: How do you optimize SQL queries for large-scale batch analytics?
Answer:
Query Optimization Techniques:
1. Materialized Views:
-- Create materialized view for frequent aggregations
CREATE MATERIALIZED VIEW mv_daily_sales AS
SELECT
date_trunc('day', order_date) as sale_date,
region,
product_category,
SUM(amount) as total_sales,
COUNT(*) as order_count
FROM fact_orders
GROUP BY 1, 2, 3;
-- Refresh periodically
REFRESH MATERIALIZED VIEW mv_daily_sales;
2. Sort Key Optimization:
-- Compound sort key for multi-column filters
CREATE TABLE events (
event_id BIGINT,
event_date TIMESTAMP,
user_id BIGINT,
event_type VARCHAR(50)
)
COMPOUND SORTKEY(event_date, user_id);
-- Interleaved sort key for independent column filters
CREATE TABLE logs (
log_id BIGINT,
timestamp TIMESTAMP,
user_id BIGINT,
action VARCHAR(100)
)
INTERLEAVED SORTKEY(timestamp, user_id, action);
3. Query Execution Plan:
-- Analyze query plan
EXPLAIN
SELECT * FROM events
WHERE event_date = '2024-01-01'
AND user_id = 12345;
Performance Comparison:
Query Performance Optimization

Without Optimization          With Optimization
┌───────────────────┐         ┌───────────────────┐
│ Full table scan   │         │ Partition pruning │
│ 2.5 hours         │         │ 15 minutes        │
│ $150 cost         │         │ $15 cost          │
└───────────────────┘         └───────────────────┘

Techniques Applied:
✓ Sort keys aligned with query patterns
✓ Materialized views for aggregations
✓ Partition pruning
✓ Compression encoding
Q20: Explain how to implement data quality SLAs in batch processing.
Answer:
SLA Framework:
from datetime import datetime, timedelta
class SLAMonitor:
    def __init__(self, job_name, sla_config):
        self.job_name = job_name
        self.sla_config = sla_config
    def check_sla(self, job_metrics):
        violations = []
        # Check completion time
        if job_metrics['end_time'] > self.sla_config['deadline']:
            violations.append({
                'type': 'COMPLETION_TIME',
                'expected': self.sla_config['deadline'],
                'actual': job_metrics['end_time']
            })
        # Check data freshness (max_freshness is a timedelta)
        freshness = datetime.now() - job_metrics['last_record_time']
        if freshness > self.sla_config['max_freshness']:
            violations.append({
                'type': 'DATA_FRESHNESS',
                'expected': self.sla_config['max_freshness'],
                'actual': freshness
            })
        # Check completeness (treat "nothing expected" as 0.0 to avoid dividing by zero)
        expected = job_metrics['expected']
        completeness = job_metrics['processed'] / expected if expected else 0.0
        if completeness < self.sla_config['min_completeness']:
            violations.append({
                'type': 'COMPLETENESS',
                'expected': self.sla_config['min_completeness'],
                'actual': completeness
            })
        return violations
CloudWatch SLA Metrics:
cloudwatch = boto3.client('cloudwatch')
# Publish SLA metrics
cloudwatch.put_metric_data(
Namespace='BatchAnalytics/SLA',
MetricData=[
{
'MetricName': 'SLAViolations',
'Dimensions': [
{'Name': 'JobName', 'Value': 'daily_etl'},
{'Name': 'ViolationType', 'Value': 'completion_time'}
],
'Value': 1,
'Unit': 'Count'
}
]
)
Q21: How do you implement data replication for disaster recovery in batch systems?
Answer:
Cross-Region Replication Architecture:
Disaster Recovery Architecture

Primary Region (us-east-1)            DR Region (us-west-2)
┌──────────────────────┐              ┌──────────────────────┐
│ S3 Bucket            │─────CRR─────▶│ S3 Replica Bucket    │
│ (Versioning Enabled) │              │                      │
└──────────────────────┘              └──────────────────────┘

┌──────────────────────┐              ┌──────────────────────┐
│ Redshift Cluster     │─Cross-Region▶│ Redshift Snapshot    │
│                      │              │ Transfer             │
└──────────────────────┘              └──────────────────────┘

┌──────────────────────┐              ┌──────────────────────┐
│ DynamoDB Table       │─────DTR─────▶│ DynamoDB Replica     │
│                      │              │                      │
└──────────────────────┘              └──────────────────────┘
S3 Cross-Region Replication:
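# Enable CRR via boto3 (versioning must be enabled on both buckets)
s3 = boto3.client('s3')
s3.put_bucket_replication(
    Bucket='primary-bucket',
    ReplicationConfiguration={
        'Role': 'arn:aws:iam::123456789012:role/s3-replication-role',
        'Rules': [
            {
                'ID': 'replicate-all',
                'Priority': 1,
                'Status': 'Enabled',
                'Filter': {},  # empty filter applies the rule to every object
                'DeleteMarkerReplication': {'Status': 'Disabled'},
                'Destination': {
                    'Bucket': 'arn:aws:s3:::dr-bucket',
                    'StorageClass': 'STANDARD_IA'
                }
            }
        ]
    }
)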
Automated Failover:
# Route 53 failover record (the primary needs a health check for failover to trigger)
route53 = boto3.client('route53')
route53.change_resource_record_sets(
    HostedZoneId='Z1234567890',
    ChangeBatch={
        'Changes': [{
            'Action': 'UPSERT',
            'ResourceRecordSet': {
                'Name': 'analytics.example.com',
                'Type': 'CNAME',
                'SetIdentifier': 'primary',
                'Failover': 'PRIMARY',
                'TTL': 60,
                'HealthCheckId': primary_health_check_id,  # created separately with create_health_check
                'ResourceRecords': [{'Value': 'primary-redshift.xxx.us-east-1.redshift.amazonaws.com'}]
            }
        }]
    }
)
Q22: Design a multi-tenant batch processing system on AWS.
Answer:
Multi-Tenant Architecture:
Multi-Tenant Batch Processing

 Tenant A        Tenant B        Tenant C
┌─────────┐     ┌─────────┐     ┌─────────┐
│ Config  │     │ Config  │     │ Config  │
│  Table  │     │  Table  │     │  Table  │
└────┬────┘     └────┬────┘     └────┬────┘
     │               │               │
     ▼               ▼               ▼
┌─────────────────────────────────────────┐
│         Job Dispatcher (Lambda)         │
└────┬───────────────┬───────────────┬────┘
     │               │               │
     ▼               ▼               ▼
┌─────────┐     ┌─────────┐     ┌─────────┐
│   EMR   │     │   EMR   │     │   EMR   │
│ Cluster │     │ Cluster │     │ Cluster │
│ (VPC A) │     │ (VPC B) │     │ (VPC C) │
└────┬────┘     └────┬────┘     └────┬────┘
     │               │               │
     ▼               ▼               ▼
┌───────────────────────────────────────────────┐
│ S3 (Partitioned by tenant_id/year/month/day/) │
└───────────────────────────────────────────────┘
Tenant Isolation:
# IAM policy for tenant isolation
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["s3:GetObject", "s3:PutObject"],
"Resource": "arn:aws:s3:::data-lake/${aws:PrincipalTag/tenant_id}/*"
}
]
}
Q23: How do you implement data versioning and rollback in batch pipelines?
Answer:
Data Versioning Strategy:
# S3-based versioning with metadata
class DataVersionManager:
def __init__(self, bucket, base_path):
self.bucket = bucket
self.base_path = base_path
self.s3 = boto3.client('s3')
def create_version(self, dataset_name, version_info):
version_id = f"v{version_info['major']}.{version_info['minor']}"
version_path = f"{self.base_path}/{dataset_name}/{version_id}/"
# Write data to versioned path
self.write_data(version_path, version_info['data'])
# Update latest pointer
self.update_pointer(dataset_name, version_id)
return version_id
def rollback(self, dataset_name, target_version):
# Restore data from target version
source_path = f"{self.base_path}/{dataset_name}/{target_version}/"
target_path = f"{self.base_path}/{dataset_name}/latest/"
self.copy_data(source_path, target_path)
self.update_pointer(dataset_name, target_version)
Glue Versioning:
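# Use Glue table versions
glue = boto3.client('glue')
# Get table version
response = glue.get_table_version(
    DatabaseName='mydb',
    TableName='mytable',
    VersionId='4'
)
# Restore by re-applying the old definition with update_table
# (the Glue API has no direct "restore version" call)
old_table = response['TableVersion']['Table']
allowed = ('Name', 'Description', 'Owner', 'Retention', 'StorageDescriptor',
           'PartitionKeys', 'TableType', 'Parameters')
glue.update_table(
    DatabaseName='mydb',
    TableInput={key: old_table[key] for key in allowed if key in old_table}
)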
Q24: Explain the testing strategies for batch data pipelines.
Answer:
Testing Pyramid:
Testing Pyramid

            ╱╲
           ╱  ╲              E2E Tests
          ╱E2E ╲             (Step Functions)
         ╱──────╲
        ╱        ╲           Integration Tests
       ╱Integration╲         (Glue + EMR)
      ╱──────────────╲
     ╱                ╲      Unit Tests
    ╱    Unit Tests    ╲     (PySpark functions)
   ╱────────────────────╲
Unit Testing with PySpark:
import unittest
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

class TestTransformations(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        # One local SparkSession shared by every test in the class
        cls.spark = SparkSession.builder \
            .master("local[*]") \
            .appName("Unit Tests") \
            .getOrCreate()

    def test_data_quality_checks(self):
        # Create test data
        schema = StructType([
            StructField("id", IntegerType(), False),
            StructField("name", StringType(), True)
        ])
        test_data = [(1, "valid"), (2, None), (3, "")]
        df = self.spark.createDataFrame(test_data, schema)
        # Apply transformation
        result = df.filter(df.name.isNotNull() & (df.name != ""))
        # Assert: only the row with a non-empty name survives
        self.assertEqual(result.count(), 1)

    @classmethod
    def tearDownClass(cls):
        cls.spark.stop()

if __name__ == '__main__':
    unittest.main()
Integration Testing:
# create_test_input, upload_to_s3, run_glue_job, and read_from_s3 are
# project-specific helpers (not shown).
def test_etl_pipeline():
    # Setup
    input_data = create_test_input()
    upload_to_s3(input_data, 's3://test-bucket/input/')
    # Run Glue job and wait for it to finish
    run_glue_job('test-etl-job')
    # Verify output
    output = read_from_s3('s3://test-bucket/output/')
    assert output.count() > 0
    assert 'transformed_column' in output.columns
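The test above depends on `run_glue_job` blocking until the job finishes, otherwise the output check races the job. A minimal sketch of such a helper follows, assuming the boto3 Glue calls `start_job_run` and `get_job_run`. The extra `glue_client`, `poll_seconds`, and `max_polls` parameters are assumptions added here so the helper can be exercised with a stub; in real use you would pass `boto3.client("glue")`.

```python
import time

# Glue run states treated as final in this sketch
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT", "ERROR"}


def run_glue_job(job_name, glue_client, poll_seconds=30, max_polls=120):
    """Start a Glue job run and block until it reaches a terminal state."""
    run_id = glue_client.start_job_run(JobName=job_name)["JobRunId"]
    for _ in range(max_polls):
        run = glue_client.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]
        state = run["JobRunState"]
        if state in TERMINAL_STATES:
            if state != "SUCCEEDED":
                raise RuntimeError(f"{job_name} run {run_id} ended in state {state}")
            return run_id
        time.sleep(poll_seconds)
    raise TimeoutError(f"{job_name} run {run_id} did not finish after {max_polls} polls")


class StubGlueClient:
    """Hypothetical stand-in that replays a fixed sequence of run states."""

    def __init__(self, states):
        self.states = list(states)

    def start_job_run(self, JobName):
        return {"JobRunId": "jr_test"}

    def get_job_run(self, JobName, RunId):
        return {"JobRun": {"JobRunState": self.states.pop(0)}}


if __name__ == "__main__":
    client = StubGlueClient(["RUNNING", "RUNNING", "SUCCEEDED"])
    print(run_glue_job("test-etl-job", client, poll_seconds=0))
```

Raising on any non-successful terminal state makes the integration test fail at the job step instead of at a confusing empty-output assertion later.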
Q25: How do you implement observability in batch processing systems?
Answer:
Observability Stack:
┌─────────────────────────────────────────────────────────────────┐
│                   Observability Architecture                    │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│     Metrics Layer        Logging Layer        Tracing Layer     │
│    ┌─────────────┐      ┌─────────────┐      ┌─────────────┐    │
│    │ CloudWatch  │      │ CloudWatch  │      │ X-Ray       │    │
│    │ Metrics     │      │ Logs        │      │ Tracing     │    │
│    └──────┬──────┘      └──────┬──────┘      └──────┬──────┘    │
│           │                    │                    │           │
│           ▼                    ▼                    ▼           │
│   ┌─────────────────────────────────────────────────────────┐   │
│   │                  Centralized Dashboard                  │   │
│   │ ┌─────────────┐     ┌─────────────┐     ┌─────────────┐ │   │
│   │ │ Job Health  │     │ Data Quality│     │ Cost        │ │   │
│   │ │ Status      │     │ Metrics     │     │ Tracking    │ │   │
│   │ └─────────────┘     └─────────────┘     └─────────────┘ │   │
│   └─────────────────────────────────────────────────────────┘   │
│           │                    │                    │           │
│           ▼                    ▼                    ▼           │
│    ┌─────────────┐      ┌─────────────┐      ┌─────────────┐    │
│    │ Alerts      │      │ Anomaly     │      │ Root Cause  │    │
│    │ (SNS)       │      │ Detection   │      │ Analysis    │    │
│    └─────────────┘      └─────────────┘      └─────────────┘    │
└─────────────────────────────────────────────────────────────────┘
Custom Metrics:
import boto3

class BatchMetrics:
    def __init__(self, namespace):
        self.cloudwatch = boto3.client('cloudwatch')
        self.namespace = namespace

    def record_job_metrics(self, job_name, duration, records_processed, success):
        # All three metrics share the JobName dimension so they can be
        # graphed and alarmed on per job.
        self.cloudwatch.put_metric_data(
            Namespace=self.namespace,
            MetricData=[
                {
                    'MetricName': 'JobDuration',
                    'Dimensions': [
                        {'Name': 'JobName', 'Value': job_name}
                    ],
                    'Value': duration,
                    'Unit': 'Seconds'
                },
                {
                    'MetricName': 'RecordsProcessed',
                    'Dimensions': [
                        {'Name': 'JobName', 'Value': job_name}
                    ],
                    'Value': records_processed,
                    'Unit': 'Count'
                },
                {
                    'MetricName': 'JobSuccess',
                    'Dimensions': [
                        {'Name': 'JobName', 'Value': job_name}
                    ],
                    'Value': 1 if success else 0,
                    'Unit': 'Count'
                }
            ]
        )
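One way to make sure `record_job_metrics` is called on both success and failure is to wrap the job body in a context manager. The sketch below is an assumption about how a caller might do this, not part of the class above; `track_job` and `PrintingMetrics` are hypothetical names, and `PrintingMetrics` only stands in for `BatchMetrics` so the example runs without AWS credentials.

```python
import time
from contextlib import contextmanager


@contextmanager
def track_job(metrics, job_name):
    """Time a job and report duration, record count, and outcome on exit."""
    stats = {"records_processed": 0}  # the job body updates this counter
    start = time.monotonic()
    success = False
    try:
        yield stats
        success = True
    finally:
        # Runs on both normal exit and exception, so failures are recorded too
        metrics.record_job_metrics(
            job_name=job_name,
            duration=time.monotonic() - start,
            records_processed=stats["records_processed"],
            success=success,
        )


class PrintingMetrics:
    """Hypothetical stand-in with the same method signature as BatchMetrics."""

    def record_job_metrics(self, job_name, duration, records_processed, success):
        print(f"{job_name}: {records_processed} records, success={success}")


if __name__ == "__main__":
    with track_job(PrintingMetrics(), "daily-orders") as stats:
        stats["records_processed"] = 1200
```

Any exception raised inside the `with` block still propagates to the caller after the metrics are emitted, so retry logic upstream is unaffected.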
Structured Logging:
import json
from datetime import datetime, timezone

class StructuredLogger:
    def log_job_event(self, event_type, job_name, details):
        log_entry = {
            # UTC timestamp so entries from different hosts sort consistently
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'event_type': event_type,
            'job_name': job_name,
            'details': details,
            'environment': 'production'
        }
        # One JSON object per line, which CloudWatch Logs Insights parses into fields
        print(json.dumps(log_entry))
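Because each entry is a single JSON object on its own line, the same logs are easy to analyze outside CloudWatch as well. The sketch below scans captured log lines for failed jobs. The function name `failed_jobs` and the event type `JOB_FAILED` are assumptions for illustration; the source does not define a fixed set of event types.

```python
import json


def failed_jobs(log_lines, failure_event="JOB_FAILED"):
    """Return job names that logged a failure event, in first-seen order."""
    seen = []
    for line in log_lines:
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip non-JSON lines such as stack traces
        if not isinstance(entry, dict):
            continue
        name = entry.get("job_name")
        if entry.get("event_type") == failure_event and name and name not in seen:
            seen.append(name)
    return seen


if __name__ == "__main__":
    lines = [
        json.dumps({"event_type": "JOB_STARTED", "job_name": "orders-etl"}),
        "Traceback (most recent call last):",
        json.dumps({"event_type": "JOB_FAILED", "job_name": "orders-etl"}),
    ]
    print(failed_jobs(lines))
```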
ℹ️
Key Interview Point: Always discuss the three pillars of observability: metrics, logs, and traces. Each provides different insights into batch pipeline behavior.
Summary
Mastering AWS batch analytics requires understanding:
- Service Selection: EMR vs Glue vs Redshift based on use case
- Performance Optimization: Partitioning, caching, and resource tuning
- Fault Tolerance: Checkpointing, retries, and idempotency
- Cost Management: Spot instances, reserved capacity, and right-sizing
- Data Quality: Validation frameworks and SLA monitoring
- Observability: Metrics, logging, and tracing across the pipeline
These concepts form the foundation for designing scalable, reliable, and cost-effective batch processing systems on AWS.