EMR Architecture
```
┌────────────────────────────────────────────────────────────────────────────┐
│                          AMAZON EMR ARCHITECTURE                           │
│                                                                            │
│  ┌──────────────────────────────────────────────────────────────────────┐  │
│  │                          EMR CLUSTER (VPC)                           │  │
│  │                                                                      │  │
│  │  ┌────────────────────────────────────────────────────────────────┐  │  │
│  │  │ MASTER NODE (1)                                                │  │  │
│  │  │ • Cluster management                                           │  │  │
│  │  │ • Resource allocation                                          │  │  │
│  │  │ • Application history server                                   │  │  │
│  │  │ Instance: m5.xlarge (4 vCPU, 16 GB)                            │  │  │
│  │  └────────────────────────────────┬───────────────────────────────┘  │  │
│  │                                   │                                  │  │
│  │            ┌──────────────────────┼──────────────────────┐           │  │
│  │            ▼                      ▼                      ▼           │  │
│  │  ┌──────────────────┐   ┌──────────────────┐   ┌──────────────────┐  │  │
│  │  │ CORE NODES (N)   │   │ CORE NODES (N)   │   │ CORE NODES (N)   │  │  │
│  │  │ • HDFS storage   │   │ • HDFS storage   │   │ • HDFS storage   │  │  │
│  │  │ • Task execution │   │ • Task execution │   │ • Task execution │  │  │
│  │  │ • YARN NodeMan.  │   │ • YARN NodeMan.  │   │ • YARN NodeMan.  │  │  │
│  │  │ Instance: r5.2xl │   │ Instance: r5.2xl │   │ Instance: r5.2xl │  │  │
│  │  └──────────────────┘   └─────────┬────────┘   └──────────────────┘  │  │
│  │                                   │                                  │  │
│  │            ┌──────────────────────┼──────────────────────┐           │  │
│  │            ▼                      ▼                      ▼           │  │
│  │  ┌──────────────────┐   ┌──────────────────┐   ┌──────────────────┐  │  │
│  │  │ TASK NODES (M)   │   │ TASK NODES (M)   │   │ TASK NODES (M)   │  │  │
│  │  │ • Task execution │   │ • Task execution │   │ • Task execution │  │  │
│  │  │ • No HDFS        │   │ • No HDFS        │   │ • No HDFS        │  │  │
│  │  │ • Spot instances │   │ • Spot instances │   │ • Spot instances │  │  │
│  │  │ Instance: c5.4xl │   │ Instance: c5.4xl │   │ Instance: c5.4xl │  │  │
│  │  └──────────────────┘   └──────────────────┘   └──────────────────┘  │  │
│  └──────────────────────────────────────────────────────────────────────┘  │
│                                                                            │
│  ┌──────────────────────────────────────────────────────────────────────┐  │
│  │                            SOFTWARE STACK                            │  │
│  │                                                                      │  │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  │  │
│  │  │ Spark       │  │ Hadoop      │  │ Hive        │  │ HBase       │  │  │
│  │  │ 3.3.x       │  │ 3.3.x       │  │ 3.1.x       │  │ 2.5.x       │  │  │
│  │  └─────────────┘  └─────────────┘  └─────────────┘  └─────────────┘  │  │
│  │                                                                      │  │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  │  │
│  │  │ Presto      │  │ Hue         │  │ Livy        │  │ Jupyter     │  │  │
│  │  │ 0.277       │  │ 4.10        │  │ 0.7         │  │ Notebook    │  │  │
│  │  └─────────────┘  └─────────────┘  └─────────────┘  └─────────────┘  │  │
│  └──────────────────────────────────────────────────────────────────────┘  │
└────────────────────────────────────────────────────────────────────────────┘
```
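The diagram's three node roles map onto instance groups in a `run_job_flow` request. Here is a minimal sketch. The helper `build_instance_groups` and the node counts are hypothetical. Master and core nodes are declared On-Demand and task nodes Spot, as in the diagram:

```python
def build_instance_groups(core_count, task_count,
                          master_type='m5.xlarge',
                          core_type='r5.2xlarge',
                          task_type='c5.4xlarge'):
    """Return an 'InstanceGroups' list for run_job_flow (hypothetical helper)."""
    return [
        # One master node, On-Demand: losing it ends the cluster
        {'Name': 'Master', 'InstanceRole': 'MASTER', 'Market': 'ON_DEMAND',
         'InstanceType': master_type, 'InstanceCount': 1},
        # Core nodes run HDFS DataNodes, so keep them On-Demand
        {'Name': 'Core', 'InstanceRole': 'CORE', 'Market': 'ON_DEMAND',
         'InstanceType': core_type, 'InstanceCount': core_count},
        # Task nodes hold no HDFS data, so Spot interruptions are tolerable
        {'Name': 'Task', 'InstanceRole': 'TASK', 'Market': 'SPOT',
         'InstanceType': task_type, 'InstanceCount': task_count},
    ]


groups = build_instance_groups(core_count=3, task_count=4)
for group in groups:
    print(group['InstanceRole'], group['Market'],
          group['InstanceType'], group['InstanceCount'])
```

The returned list would go under `Instances['InstanceGroups']` in place of `MasterInstanceType`, `SlaveInstanceType`, and `InstanceCount`. A request uses one form or the other, not both.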
EMR Cluster Configuration
EMR on EC2 Configuration
```python
import boto3

emr = boto3.client('emr')

# Key pair, subnet, and security group IDs below are placeholders
cluster_config = {
    'Name': 'data-processing-cluster',
    'ReleaseLabel': 'emr-6.15.0',
    'Applications': [
        {'Name': 'Spark'},
        {'Name': 'Hive'},
        {'Name': 'HBase'},
        {'Name': 'JupyterEnterpriseGateway'}
    ],
    'Instances': {
        'MasterInstanceType': 'm5.2xlarge',
        'SlaveInstanceType': 'r5.2xlarge',   # used for the core nodes
        'InstanceCount': 3,                  # 1 master + 2 core nodes
        'Ec2KeyName': 'my-key-pair',
        # The subnet fixes the Availability Zone; an instance-group cluster
        # runs in a single AZ, so no separate 'Placement' is needed
        'Ec2SubnetId': 'subnet-12345678',
        'KeepJobFlowAliveWhenNoSteps': True,
        'TerminationProtected': True,
        # RunJobFlow takes the security groups directly under 'Instances'
        'EmrManagedMasterSecurityGroup': 'sg-master',
        'EmrManagedSlaveSecurityGroup': 'sg-slave',
        'ServiceAccessSecurityGroup': 'sg-service'  # used for private subnets
    },
    'Configurations': [
        {
            'Classification': 'spark-defaults',
            'Properties': {
                'spark.dynamicAllocation.enabled': 'true',
                'spark.shuffle.service.enabled': 'true',
                'spark.sql.shuffle.partitions': '200',
                'spark.default.parallelism': '200',
                'spark.executor.memory': '8g',
                'spark.executor.cores': '4',
                'spark.driver.memory': '4g',
                # With dynamic allocation on, this is only the initial executor count
                'spark.executor.instances': '4',
                'spark.sql.adaptive.enabled': 'true',
                'spark.sql.adaptive.coalescePartitions.enabled': 'true',
                'spark.sql.adaptive.skewJoin.enabled': 'true'
            }
        },
        {
            'Classification': 'yarn-site',
            'Properties': {
                # Memory YARN may hand out per node. An r5.2xlarge has 64 GiB;
                # the rest stays with the OS and non-YARN daemons such as HBase
                'yarn.nodemanager.resource.memory-mb': '24576',
                'yarn.scheduler.maximum-allocation-mb': '24576'
            }
        },
        {
            'Classification': 'hdfs-site',
            'Properties': {
                # Should not exceed the number of core nodes (2 here), or
                # blocks stay under-replicated
                'dfs.replication': '2',
                'dfs.blocksize': '134217728'  # 128 MiB
            }
        }
    ],
    'ServiceRole': 'EMR_DefaultRole',
    'JobFlowRole': 'EMR_EC2_DefaultRole',
    'Tags': [
        {'Key': 'Environment', 'Value': 'production'},
        {'Key': 'Team', 'Value': 'data-engineering'}
    ],
    'AutoScalingRole': 'EMR_AutoScaling_DefaultRole',
    # Drain running tasks before removing nodes during scale-in
    'ScaleDownBehavior': 'TERMINATE_AT_TASK_COMPLETION'
}

response = emr.run_job_flow(**cluster_config)
cluster_id = response['JobFlowId']
print(f"Cluster created: {cluster_id}")
```
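The `spark-defaults` and `yarn-site` values above determine how many executors each core node can host. This back-of-the-envelope sketch makes several assumptions. It uses Spark's default overhead of max(384 MiB, 10% of executor memory). It ignores YARN's allocation rounding. It also treats vCPUs as a second limit, although YARN on EMR may schedule by memory only. `executors_per_node` is a hypothetical helper:

```python
def executor_container_mb(executor_memory_mb, overhead_mb=None):
    """YARN container size for one executor: JVM heap plus memory overhead."""
    if overhead_mb is None:
        # Spark's default overhead: 10% of executor memory, minimum 384 MiB
        overhead_mb = max(384, int(executor_memory_mb * 0.10))
    return executor_memory_mb + overhead_mb


def executors_per_node(yarn_node_mb, executor_memory_mb, node_vcores,
                       executor_cores, overhead_mb=None):
    """Executors that fit on one NodeManager, limited by memory and by vCPUs."""
    by_memory = yarn_node_mb // executor_container_mb(executor_memory_mb, overhead_mb)
    by_cores = node_vcores // executor_cores
    return min(by_memory, by_cores)


# Values from the configuration above; an r5.2xlarge core node has 8 vCPUs
per_node = executors_per_node(yarn_node_mb=24576, executor_memory_mb=8 * 1024,
                              node_vcores=8, executor_cores=4)
print(f"Executors per core node: {per_node}")
```

With these settings, both limits allow two executors per core node. That leaves about 6,550 MB of the 24,576 MB YARN allocation unused.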
EMR Serverless Configuration
```python
import boto3

emr_serverless = boto3.client('emr-serverless')

# Create EMR Serverless application
app_config = {
    'name': 'data-processing-serverless',
    'releaseLabel': 'emr-6.15.0',
    'type': 'SPARK',
    'runtimeConfiguration': [
        {
            'classification': 'spark-defaults',
            'properties': {
                'spark.sql.adaptive.enabled': 'true',
                'spark.sql.adaptive.coalescePartitions.enabled': 'true'
            }
        }
    ],
    # Pre-initialized workers are keyed by worker type ('Driver' and
    # 'Executor' for Spark): 1 driver + 4 executors = 5 workers in total
    'initialCapacity': {
        'Driver': {
            'workerCount': 1,
            'workerConfiguration': {'cpu': '4 vCPU', 'memory': '16 GB', 'disk': '20 GB'}
        },
        'Executor': {
            'workerCount': 4,
            'workerConfiguration': {'cpu': '4 vCPU', 'memory': '16 GB', 'disk': '20 GB'}
        }
    },
    # Upper bound on the total resources all workers may use at once
    'maximumCapacity': {
        'cpu': '200 vCPU',
        'memory': '800 GB',
        'disk': '2000 GB'
    },
    'tags': {
        'Environment': 'production',
        'Team': 'data-engineering'
    }
}

response = emr_serverless.create_application(**app_config)
app_id = response['applicationId']
print(f"EMR Serverless app created: {app_id}")
```
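Creating the application does not run a job. Jobs are submitted separately with `start_job_run`. The sketch below assembles that request. The helper `build_job_run_request`, the application ID, and the IAM execution role ARN are hypothetical placeholders:

```python
def build_job_run_request(app_id, role_arn, entry_point, args=None, spark_conf=None):
    """Keyword arguments for emr_serverless.start_job_run() (hypothetical helper)."""
    spark_submit = {'entryPoint': entry_point,
                    'entryPointArguments': list(args or [])}
    if spark_conf:
        # Extra Spark settings travel as one '--conf k=v --conf k=v' string
        spark_submit['sparkSubmitParameters'] = ' '.join(
            f'--conf {key}={value}' for key, value in spark_conf.items())
    return {
        'applicationId': app_id,
        'executionRoleArn': role_arn,
        'jobDriver': {'sparkSubmit': spark_submit},
        'name': 'process-data',
    }


request = build_job_run_request(
    app_id='00example123',                                           # hypothetical ID
    role_arn='arn:aws:iam::123456789012:role/EMRServerlessJobRole',  # hypothetical role
    entry_point='s3://my-bucket/scripts/process_data.py',
    args=['--input', 's3://my-bucket/raw/', '--output', 's3://my-bucket/processed/'],
    spark_conf={'spark.executor.cores': '4', 'spark.dynamicAllocation.maxExecutors': '20'},
)
# With a real client: emr_serverless.start_job_run(**request)
print(request['jobDriver']['sparkSubmit']['sparkSubmitParameters'])
```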
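ℹ️ Pro Tip: Use EMR Serverless for unpredictable workloads. It scales workers up and down with demand, and you pay for the vCPU, memory, and storage those workers use. Pre-initialized capacity (`initialCapacity`) is the exception: it is billed while the application is started, even when no job is running.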
Spark on EMR Optimization
Dynamic Allocation
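```python
# Spark configuration for dynamic allocation
# (EMR enables dynamic allocation and the external shuffle service by default)
spark_config = {
    'spark.dynamicAllocation.enabled': 'true',
    'spark.shuffle.service.enabled': 'true',
    'spark.dynamicAllocation.minExecutors': '2',
    'spark.dynamicAllocation.maxExecutors': '100',
    # Release executors that have been idle this long
    'spark.dynamicAllocation.executorIdleTimeout': '60s',
    # Request executors once tasks have been pending this long...
    'spark.dynamicAllocation.schedulerBacklogTimeout': '1s',
    # ...and again at this interval while the backlog persists
    'spark.dynamicAllocation.sustainedSchedulerBacklogTimeout': '1s'
}
```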
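A dictionary like this has no effect until it reaches Spark. This sketch shows two ways to apply it, using hypothetical helper names. The first passes it as `--conf` arguments for a `spark-submit` step. The second wraps it as a `spark-defaults` classification in the `Configurations` list of `run_job_flow`:

```python
def to_submit_args(conf):
    """Flatten {'key': 'value'} into ['--conf', 'key=value', ...] for spark-submit."""
    args = []
    for key, value in conf.items():
        args += ['--conf', f'{key}={value}']
    return args


def to_classification(conf, classification='spark-defaults'):
    """Wrap properties in the shape run_job_flow expects inside 'Configurations'."""
    return {'Classification': classification, 'Properties': dict(conf)}


spark_config = {
    'spark.dynamicAllocation.enabled': 'true',
    'spark.dynamicAllocation.minExecutors': '2',
    'spark.dynamicAllocation.maxExecutors': '100',
}
print(to_submit_args(spark_config))
print(to_classification(spark_config))
```

Values passed with `--conf` on a step apply only to that application, and they take precedence over the cluster-wide `spark-defaults` classification.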
Shuffle Optimization
```python
# Shuffle optimization for large datasets
shuffle_config = {
    # Starting partition count; AQE can coalesce small partitions at runtime
    'spark.sql.shuffle.partitions': '200',
    'spark.sql.adaptive.enabled': 'true',
    'spark.sql.adaptive.coalescePartitions.enabled': 'true',
    # Split oversized partitions in joins at runtime
    'spark.sql.adaptive.skewJoin.enabled': 'true',
    'spark.sql.adaptive.skewJoin.skewedPartitionFactor': '5',
    'spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes': '256MB',
    'spark.shuffle.compress': 'true',
    'spark.shuffle.spill.compress': 'true',
    # Codec for shuffle, spill, and broadcast data (Spark's default is lz4)
    'spark.io.compression.codec': 'snappy'
}
```
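The two `skewJoin` thresholds above work together. Spark's documentation treats a partition as skewed when its size exceeds `skewedPartitionFactor` times the median partition size and also exceeds `skewedPartitionThresholdInBytes`. Below is a plain-Python sketch of that rule, not Spark's actual implementation, applied to hypothetical partition sizes:

```python
from statistics import median

MB = 1024 * 1024


def skewed_partitions(sizes_bytes, factor=5, threshold_bytes=256 * MB):
    """Indexes of partitions that meet both skew conditions described above."""
    med = median(sizes_bytes)
    return [i for i, size in enumerate(sizes_bytes)
            if size > factor * med and size > threshold_bytes]


# Hypothetical shuffle partition sizes: eight of 60 MB, one of 900 MB, one of 280 MB
sizes = [60 * MB] * 8 + [900 * MB, 280 * MB]
print(skewed_partitions(sizes))
```

The 280 MB partition is above the 256 MB threshold but below 5 × the 60 MB median (300 MB), so only the 900 MB partition (index 8) qualifies.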
Memory Management
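```python
# Memory optimization
memory_config = {
    'spark.executor.memory': '16g',          # executor JVM heap
    'spark.executor.memoryOverhead': '4g',   # off-heap overhead added to the YARN container
    'spark.driver.memory': '8g',
    'spark.driver.memoryOverhead': '2g',
    # Share of (heap - 300 MiB) used for execution + storage (Spark default 0.6)
    'spark.memory.fraction': '0.8',
    # Part of that share protected for cached data (Spark default 0.5)
    'spark.memory.storageFraction': '0.3'
}
```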
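You can work out by hand how these settings divide an executor's memory. This sketch assumes Spark's documented unified memory model. Spark reserves 300 MiB of the heap. Execution and storage share `spark.memory.fraction` of the remainder. Within that shared region, `spark.memory.storageFraction` is protected from eviction. The overhead sits outside the heap. `executor_memory_breakdown` is a hypothetical helper:

```python
RESERVED_MB = 300  # fixed reserved heap in Spark's unified memory model


def executor_memory_breakdown(heap_mb, overhead_mb, memory_fraction, storage_fraction):
    """Split one executor's memory (MiB) into the regions described above."""
    usable = heap_mb - RESERVED_MB
    unified = usable * memory_fraction          # shared by execution and storage
    storage = unified * storage_fraction        # cached blocks immune to eviction
    return {
        'container': heap_mb + overhead_mb,     # what YARN must grant
        'unified': unified,
        'storage_protected': storage,
        'execution_min': unified - storage,     # execution may also borrow free storage
        'user': usable - unified,               # user data structures, UDF objects
    }


parts = executor_memory_breakdown(heap_mb=16 * 1024, overhead_mb=4 * 1024,
                                  memory_fraction=0.8, storage_fraction=0.3)
for name, value in parts.items():
    print(f"{name:>18}: {value:,.0f} MiB")
```

Each container therefore needs 20,480 MiB. Under the 24,576 MB `yarn.nodemanager.resource.memory-mb` limit from the cluster configuration, only one such executor fits per node.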
EMR Cost Optimization
```
┌────────────────────────────────────────────────────────────────────────────┐
│                      EMR COST OPTIMIZATION STRATEGIES                      │
│                                                                            │
│  ┌──────────────────────────────────────────────────────────────────────┐  │
│  │ 1. INSTANCE SELECTION                                                │  │
│  │                                                                      │  │
│  │  ┌────────────────────────────────────────────────────────────────┐  │  │
│  │  │ Master: m5.xlarge (On-Demand)                                  │  │  │
│  │  │ Core:   r5.2xlarge (1-year Reserved Instances, ~40% savings)   │  │  │
│  │  │ Task:   c5.4xlarge (Spot Instances, 60-90% savings)            │  │  │
│  │  └────────────────────────────────────────────────────────────────┘  │  │
│  └──────────────────────────────────────────────────────────────────────┘  │
│                                                                            │
│  ┌──────────────────────────────────────────────────────────────────────┐  │
│  │ 2. SPOT INSTANCES FOR TASK NODES                                     │  │
│  │                                                                      │  │
│  │ Benefits:                                                            │  │
│  │   • 60-90% cost reduction vs. On-Demand                              │  │
│  │   • Suitable for fault-tolerant batch processing                     │  │
│  │   • Can handle interruptions gracefully                              │  │
│  │                                                                      │  │
│  │ Configuration:                                                       │  │
│  │   • Use instance fleets with multiple instance types                 │  │
│  │   • Set max price to the On-Demand price                             │  │
│  │   • Enable graceful decommissioning                                  │  │
│  └──────────────────────────────────────────────────────────────────────┘  │
│                                                                            │
│  ┌──────────────────────────────────────────────────────────────────────┐  │
│  │ 3. AUTO SCALING                                                      │  │
│  │                                                                      │  │
│  │ Scale-out:                                                           │  │
│  │   • CPU > 80% for 5 minutes                                          │  │
│  │   • Add 2 task nodes                                                 │  │
│  │                                                                      │  │
│  │ Scale-in:                                                            │  │
│  │   • CPU < 30% for 10 minutes                                         │  │
│  │   • Remove 1 task node                                               │  │
│  │                                                                      │  │
│  │ Constraints:                                                         │  │
│  │   • Min: 2 task nodes                                                │  │
│  │   • Max: 20 task nodes                                               │  │
│  └──────────────────────────────────────────────────────────────────────┘  │
│                                                                            │
│  ┌──────────────────────────────────────────────────────────────────────┐  │
│  │ 4. EMR SERVERLESS                                                    │  │
│  │                                                                      │  │
│  │ Benefits:                                                            │  │
│  │   • No cluster management overhead                                   │  │
│  │   • Pay per use (vCPU, memory, storage)                              │  │
│  │   • Automatic scaling                                                │  │
│  │   • Great for variable workloads                                     │  │
│  └──────────────────────────────────────────────────────────────────────┘  │
└────────────────────────────────────────────────────────────────────────────┘
```
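The auto scaling rules in item 3 could be written as a custom automatic scaling policy attached to a task instance group. This sketch makes two assumptions. First, the trigger uses EMR's `YARNMemoryAvailablePercentage` CloudWatch metric instead of CPU, with low available memory standing in for high utilization. Second, the cluster uses instance groups and has an `AutoScalingRole`, because custom policies are not available for instance fleets. `build_task_scaling_policy` and `attach_policy` are hypothetical helpers:

```python
def _rule(name, comparison, threshold, periods, adjustment):
    """One rule: change the task-node count when the alarm condition holds."""
    return {
        'Name': name,
        'Action': {'SimpleScalingPolicyConfiguration': {
            'AdjustmentType': 'CHANGE_IN_CAPACITY',
            'ScalingAdjustment': adjustment,
            'CoolDown': 300,
        }},
        'Trigger': {'CloudWatchAlarmDefinition': {
            'ComparisonOperator': comparison,
            'EvaluationPeriods': periods,   # EMR publishes metrics every 300 s
            'MetricName': 'YARNMemoryAvailablePercentage',
            'Namespace': 'AWS/ElasticMapReduce',
            'Period': 300,
            'Statistic': 'AVERAGE',
            'Threshold': threshold,
            'Unit': 'PERCENT',
            'Dimensions': [{'Key': 'JobFlowId', 'Value': '${emr.clusterId}'}],
        }},
    }


def build_task_scaling_policy(min_nodes=2, max_nodes=20):
    """Add 2 nodes after 5 busy minutes; remove 1 after 10 quiet minutes."""
    return {
        'Constraints': {'MinCapacity': min_nodes, 'MaxCapacity': max_nodes},
        'Rules': [
            # Under 20% of YARN memory free, i.e. more than 80% in use
            _rule('ScaleOut', 'LESS_THAN', 20.0, 1, 2),
            # Over 70% of YARN memory free, i.e. less than 30% in use
            _rule('ScaleIn', 'GREATER_THAN', 70.0, 2, -1),
        ],
    }


def attach_policy(emr_client, cluster_id, instance_group_id, policy):
    """Attach the policy through a boto3 'emr' client."""
    return emr_client.put_auto_scaling_policy(
        ClusterId=cluster_id,
        InstanceGroupId=instance_group_id,
        AutoScalingPolicy=policy,
    )


policy = build_task_scaling_policy()
print([rule['Name'] for rule in policy['Rules']])
```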
EMR Step Types
Spark Step
```python
import boto3

emr = boto3.client('emr')

# Submit Spark job as EMR step
step = {
    'Name': 'Process Data',
    'ActionOnFailure': 'CONTINUE',
    'HadoopJarStep': {
        'Jar': 'command-runner.jar',
        'Args': [
            'spark-submit',
            '--deploy-mode', 'cluster',
            '--conf', 'spark.yarn.maxAppAttempts=3',
            '--conf', 'spark.dynamicAllocation.enabled=true',
            's3://my-bucket/scripts/process_data.py',
            # Arguments after the script path are passed to the script itself
            '--input', 's3://my-bucket/raw/',
            '--output', 's3://my-bucket/processed/'
        ]
    }
}

response = emr.add_job_flow_steps(
    JobFlowId='j-1234567890abcdef0',  # placeholder cluster ID
    Steps=[step]
)
print(f"Step submitted: {response['StepIds'][0]}")
```
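`add_job_flow_steps` returns as soon as the step is queued, not when it finishes. The sketch below submits one step and then blocks on boto3's `step_complete` waiter for EMR, which raises an error if the step fails or is cancelled. `submit_and_wait` and its polling values are illustrative. The client is passed in as a parameter so the function does not rely on globals:

```python
def submit_and_wait(emr_client, cluster_id, step, delay=30, max_attempts=120):
    """Submit one step, then poll until it completes (raises if it fails)."""
    response = emr_client.add_job_flow_steps(JobFlowId=cluster_id, Steps=[step])
    step_id = response['StepIds'][0]
    waiter = emr_client.get_waiter('step_complete')
    waiter.wait(
        ClusterId=cluster_id,
        StepId=step_id,
        WaiterConfig={'Delay': delay, 'MaxAttempts': max_attempts},
    )
    return step_id
```

For example, `submit_and_wait(emr, 'j-1234567890abcdef0', step)` would use the client and step defined above and return the step ID once the step succeeds.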
Hive Step
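```python
# Submit Hive query as EMR step
hive_step = {
    'Name': 'Run Hive ETL',
    'ActionOnFailure': 'CONTINUE',
    # Hive steps also use 'HadoopJarStep'; command-runner.jar runs the hive CLI
    'HadoopJarStep': {
        'Jar': 'command-runner.jar',
        'Args': ['hive', '-f', 's3://my-bucket/scripts/etl.hql']
    }
}

# `emr` is the boto3 EMR client created earlier
emr.add_job_flow_steps(JobFlowId='j-1234567890abcdef0', Steps=[hive_step])
```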
EMR Instance Types
| Category | Instance | vCPU | Memory | Use Case |
|---|---|---|---|---|
| Master | m5.xlarge | 4 | 16 GB | Cluster management |
| Core | r5.2xlarge | 8 | 64 GB | HDFS + compute |
| Task | c5.4xlarge | 16 | 32 GB | Compute only |
| Task | r5.xlarge | 4 | 32 GB | Memory-intensive |
| Task | i3.2xlarge | 8 | 61 GB | Storage-intensive |
⚠️ Cost Warning: EMR clusters can become expensive quickly. Use auto-scaling and Spot Instances for task nodes, monitor cluster utilization, and terminate idle clusters.
Interview Questions & Answers
Q1: What is the difference between EMR on EC2 and EMR Serverless?
Answer:
- EMR on EC2: You manage cluster infrastructure, instance types, scaling
- EMR Serverless: AWS manages infrastructure, automatic scaling
Use EMR on EC2 for predictable workloads with specific instance requirements. Use EMR Serverless for variable workloads or when you want to eliminate cluster management.
Q2: How does dynamic allocation work in Spark on EMR?
Answer: Dynamic allocation adjusts executor count based on workload:
- Scale-out: When tasks are pending in the queue
- Scale-in: When executors are idle for a timeout period
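- Requirements: An external shuffle service (enabled by default on EMR) or, in Spark 3.0+, shuffle tracking (`spark.dynamicAllocation.shuffleTracking.enabled`), so that removing idle executors does not discard shuffle data that later stages still need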
Benefits: Better resource utilization, cost savings for variable workloads.
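Scale-out follows the exponential ramp-up described in the Spark documentation. Once tasks have been backlogged for `schedulerBacklogTimeout`, Spark requests 1 executor. It then requests 2, 4, 8, and so on after each `sustainedSchedulerBacklogTimeout`. The target never exceeds what the pending tasks need or `maxExecutors`. The following is a simplified simulation of the executor target. It is an assumption-laden model: it counts only pending tasks and ignores executor start-up time and idle removal:

```python
import math


def ramp_up(pending_tasks, tasks_per_executor, min_executors, max_executors, rounds):
    """Executor target after each backlog round (simplified model)."""
    # Never ask for more executors than the pending tasks can keep busy
    cap = min(math.ceil(pending_tasks / tasks_per_executor), max_executors)
    target, to_add, history = min_executors, 1, []
    for _ in range(rounds):
        if target >= cap:
            break  # backlog fully covered; no further requests
        new_target = min(target + to_add, cap)
        # Double the batch if it was added in full, otherwise start over at 1
        to_add = to_add * 2 if new_target - target == to_add else 1
        target = new_target
        history.append(target)
    return history


# 1,000 pending tasks at 4 tasks per executor would need 250 executors,
# but maxExecutors = 100 (as in the dynamic allocation settings) caps the target
print(ramp_up(pending_tasks=1000, tasks_per_executor=4,
              min_executors=2, max_executors=100, rounds=10))
```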
Q3: What is the benefit of using spot instances for EMR task nodes?
Answer:
- Cost Savings: 60-90% reduction vs. On-Demand
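- Fault Tolerance: Task nodes store no HDFS data, so an interruption loses no persisted data (Spark may still have to recompute shuffle output that lived on the reclaimed node)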
- Best Practice: Use with graceful decommissioning and dynamic allocation
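One way to apply this is a task instance fleet that lets EMR choose among several Spot instance types. The sketch below builds an `InstanceFleets` entry for `run_job_flow`. The helper name, instance types, and capacity units are illustrative, with capacity weighted by vCPU. Note that a cluster uses either instance groups or instance fleets, not both:

```python
def task_spot_fleet(target_units, instance_types):
    """Build a TASK InstanceFleetConfig that mixes several Spot instance types.

    instance_types maps an EC2 type to its weighted capacity (here: vCPUs).
    """
    return {
        'Name': 'task-spot-fleet',
        'InstanceFleetType': 'TASK',
        'TargetSpotCapacity': target_units,
        'TargetOnDemandCapacity': 0,
        'InstanceTypeConfigs': [
            {'InstanceType': itype, 'WeightedCapacity': weight,
             # Cap the Spot price at the On-Demand price
             'BidPriceAsPercentageOfOnDemandPrice': 100.0}
            for itype, weight in instance_types.items()
        ],
        'LaunchSpecifications': {
            'SpotSpecification': {
                # Fall back to On-Demand if Spot capacity is not found in time
                'TimeoutDurationMinutes': 10,
                'TimeoutAction': 'SWITCH_TO_ON_DEMAND',
                'AllocationStrategy': 'capacity-optimized',
            }
        },
    }


fleet = task_spot_fleet(64, {'c5.4xlarge': 16, 'c5a.4xlarge': 16, 'm5.4xlarge': 16})
print(len(fleet['InstanceTypeConfigs']), fleet['TargetSpotCapacity'])
```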
Q4: How do you optimize Spark shuffle on EMR?
Answer:
- Partitioning: Set `spark.sql.shuffle.partitions` based on data size
- Adaptive Query Execution: Enable `spark.sql.adaptive.enabled`
- Compression: Use Snappy or Zstd for shuffle files
- Memory: Allocate sufficient memory for shuffle operations
- Skew Handling: Enable `spark.sql.adaptive.skewJoin.enabled`
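A common rule of thumb makes "based on data size" concrete: divide the expected shuffle size by a target partition size. This sketch assumes a 128 MB target, which is not a Spark default (AQE's own `advisoryPartitionSizeInBytes` defaults to 64 MB). It then rounds up to a multiple of the cluster's total executor cores so the last wave of tasks is full. `suggest_shuffle_partitions` is a hypothetical helper:

```python
import math

MB = 1024 * 1024


def suggest_shuffle_partitions(shuffle_bytes, total_cores, target_partition_bytes=128 * MB):
    """Rule-of-thumb starting value for spark.sql.shuffle.partitions."""
    by_size = math.ceil(shuffle_bytes / target_partition_bytes)
    # Round up to whole waves so every executor core has work in the last wave
    waves = max(math.ceil(by_size / total_cores), 1)
    return waves * total_cores


# Hypothetical job: about 100 GiB shuffled, 10 executors x 4 cores each
print(suggest_shuffle_partitions(100 * 1024 * MB, total_cores=40))
```

With AQE enabled, this value is only a starting point, because Spark can coalesce small partitions at runtime.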
Q5: When should you use EMR vs. AWS Glue?
Answer:
- EMR: Large datasets, complex Spark workloads, long-running jobs, custom configurations
- Glue: Serverless, short-running jobs, simpler ETL, no cluster management
EMR is better suited to petabyte-scale processing. Glue is better for scheduled ETL jobs that finish within minutes to a few hours.
Cost Considerations
| Component | Cost | Optimization |
|---|---|---|
| EMR on EC2 | EC2 price + per-instance EMR fee (rate varies by instance type) | Use reserved instances |
| EMR Serverless | Per vCPU-hour and per GB-hour of memory used by workers | Auto-scaling |
| Master Node | On-Demand (recommended) | Use smallest instance |
| Core Nodes | Reserved instances | 1-year commitment |
| Task Nodes | Spot instances | 60-90% savings |
| HDFS Storage | Instance store or EBS volumes (EBS billed separately) | Use S3 for persistence |
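ℹ️ EMR Pricing: EMR charges … 374/month. For EMR on EC2, the per-instance EMR fee is billed per second (one-minute minimum) in addition to the EC2 instance price.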
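You can compare the cost levers in the table with back-of-the-envelope arithmetic: monthly cost ≈ nodes × hours × (EC2 hourly price × (1 − discount) + EMR hourly fee). In this sketch the prices are hypothetical placeholders, so look up current rates for your Region and instance type. It uses the common convention of 730 hours per month. The discount applies only to the EC2 portion, because the EMR fee does not change with the purchase option:

```python
HOURS_PER_MONTH = 730


def monthly_cost(nodes, ec2_hourly, emr_hourly, ec2_discount=0.0, hours=HOURS_PER_MONTH):
    """Estimate the monthly cost of one group of identical nodes.

    ec2_discount is the fractional saving on the EC2 price (for example 0.6
    for Spot at 60% off); the EMR fee is charged in full.
    """
    per_node_hour = ec2_hourly * (1 - ec2_discount) + emr_hourly
    return nodes * hours * per_node_hour


# Hypothetical prices for illustration only
on_demand = monthly_cost(nodes=10, ec2_hourly=0.68, emr_hourly=0.17)
spot = monthly_cost(nodes=10, ec2_hourly=0.68, emr_hourly=0.17, ec2_discount=0.6)
print(f"On-Demand: ${on_demand:,.2f}  Spot: ${spot:,.2f}")
```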
Summary
Amazon EMR is the go-to service for large-scale data processing. Key takeaways:
- Cluster Types: EC2 (managed) vs. Serverless (fully managed)
- Instance Roles: Master (management), Core (storage+compute), Task (compute only)
- Optimization: Dynamic allocation, adaptive query execution, shuffle tuning
- Cost: Use spot instances for task nodes, reserved for core nodes
- Serverless: Best for variable workloads, eliminates cluster management
- Software: Spark, Hive, HBase, Presto, and more