Batch Processing Architecture
Splittable DoFn
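```python
import datetime

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems
from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.core import RestrictionProvider


class FileByteRangeProvider(RestrictionProvider):
    """Describes each file as a byte range [0, size) that the runner can split."""

    def initial_restriction(self, file_path):
        size = FileSystems.match([file_path])[0].metadata_list[0].size_in_bytes
        return OffsetRange(0, size)

    def create_tracker(self, restriction):
        return OffsetRestrictionTracker(restriction)

    def restriction_size(self, file_path, restriction):
        return restriction.size()

    def split(self, file_path, restriction):
        # Initial split into ~64 MB chunks; Dataflow can split further at runtime.
        return restriction.split(64 * 1024 * 1024)


class ReadFilesInParallel(beam.DoFn):
    """Read newline-delimited files in parallel using a Splittable DoFn."""

    def process(self, file_path,
                tracker=beam.DoFn.RestrictionParam(FileByteRangeProvider())):
        start = tracker.current_restriction().start
        with FileSystems.open(file_path) as f:
            position = start
            if start > 0:
                # Skip the partial line at the start; the previous range owns it.
                f.seek(start - 1)
                position += len(f.readline()) - 1
            # Claim each line's starting offset; a failed claim means the range
            # is finished or the runner has split off the remainder.
            while tracker.try_claim(position):
                line = f.readline()
                if not line:
                    break
                # Placeholder for real per-record parsing (e.g. json.loads(line)).
                yield {
                    'file': file_path,
                    'byte_offset': position,
                    'processed_at': datetime.datetime.now(datetime.timezone.utc).isoformat(),
                }
                position += len(line)


def run_batch_pipeline():
    """Run optimized batch pipeline."""
    pipeline_options = PipelineOptions([
        '--project', 'my-project',
        '--runner', 'DataflowRunner',
        '--region', 'us-central1',
        '--temp_location', 'gs://my-bucket/temp/',
        '--machine_type', 'n1-standard-4',
        '--max_num_workers', '20',
        '--autoscaling_algorithm', 'THROUGHPUT_BASED',
        '--experiments', 'use_runner_v2',
        '--number_of_worker_harness_threads', '4',
    ])
    with beam.Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            | 'File Paths' >> beam.Create([
                'gs://my-bucket/data/file1.json',
                'gs://my-bucket/data/file2.json',
                'gs://my-bucket/data/file3.json',
            ])
            | 'Process' >> beam.ParDo(ReadFilesInParallel())
            | 'Write' >> beam.io.WriteToBigQuery(
                'my-project:analytics.processed_data',
                schema='file:STRING,byte_offset:INTEGER,processed_at:TIMESTAMP',
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            )
        )


if __name__ == '__main__':
    run_batch_pipeline()
```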
Autoscaling Configuration
```python
from apache_beam.options.pipeline_options import PipelineOptions

# Autoscaling configuration: start with 2 workers, scale between 1 and 50
pipeline_options = PipelineOptions([
    '--autoscaling_algorithm', 'THROUGHPUT_BASED',
    '--num_workers', '2',
    '--max_num_workers', '50',
    '--min_num_workers', '1',
    '--worker_machine_type', 'n1-standard-4',
    '--disk_size_gb', '100',
])
```
Best Practice: Use autoscaling for variable workloads, and set min/max worker counts based on data volume. Use FlexRS for non-urgent batch jobs. Monitor worker utilization and adjust machine types accordingly. Use a Splittable DoFn for large files in formats that the built-in sources cannot split.
Common Interview Questions
Q1: What is Splittable DoFn?
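Answer: A Splittable DoFn (SDF) is a DoFn whose work for a single element can be split. Each element is paired with a restriction (for example, a byte range of a file) that the runner can divide up front into independent sub-ranges and, while processing is under way, split again so the unprocessed remainder moves to another worker. This makes it useful for processing large files or collections where dynamic work rebalancing is needed.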
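The up-front splitting of a restriction can be modeled in plain Python. This is a simplified sketch, not Beam's implementation: `ByteRange` and `initial_split` are hypothetical names, and Beam's own `OffsetRange.split` may differ in details such as how it handles a small final chunk.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class ByteRange:
    """Hypothetical restriction: the half-open byte range [start, stop) of one file."""
    start: int
    stop: int

    def size(self) -> int:
        return self.stop - self.start


def initial_split(restriction: ByteRange, chunk_size: int) -> List[ByteRange]:
    """Cut one element's restriction into independent chunks of at most chunk_size bytes."""
    chunks = []
    start = restriction.start
    while start < restriction.stop:
        stop = min(start + chunk_size, restriction.stop)
        chunks.append(ByteRange(start, stop))
        start = stop
    return chunks


MB = 1024 * 1024
# A single 250 MB file is one element, but its restriction yields four work units.
parts = initial_split(ByteRange(0, 250 * MB), 64 * MB)
print([(p.start // MB, p.stop // MB) for p in parts])
# [(0, 64), (64, 128), (128, 192), (192, 250)]
```

Each chunk can be handed to a different worker, which is what lets one large file be processed in parallel even though it entered the pipeline as a single element.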
Q2: When would you use FlexRS vs. standard Dataflow?
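Answer: Use FlexRS for non-urgent batch jobs such as daily aggregations and backfills, where it can cut costs by up to 50%. Use standard Dataflow for time-sensitive processing. The trade-off is latency: Dataflow may delay the start of a FlexRS job by up to six hours, whereas a standard job starts as soon as resources are available.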
Q3: How do you optimize batch pipeline performance?
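Answer: 1) Choose machine types that match the workload, 2) Enable autoscaling, 3) Optimize parallelism, for example by adding a Reshuffle to break fusion after a high fan-out step, 4) Use efficient columnar file formats such as Parquet, 5) Minimize shuffle, for example by preferring CombinePerKey over GroupByKey followed by an aggregation, 6) Use a Splittable DoFn for large files.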
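Why combining before a shuffle helps can be shown with a plain-Python model (no Beam involved). `shuffle_without_combiner`, `shuffle_with_combiner`, and `final_sums` are hypothetical names: the first mimics a GroupByKey followed by a sum, where every record crosses the shuffle; the second mimics the combiner lifting a runner such as Dataflow can apply to `CombinePerKey`, where each bundle sends only one partial sum per key.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Record = Tuple[str, int]


def shuffle_without_combiner(bundles: List[List[Record]]) -> List[Record]:
    """GroupByKey-then-sum: every input record crosses the shuffle."""
    return [record for bundle in bundles for record in bundle]


def shuffle_with_combiner(bundles: List[List[Record]]) -> List[Record]:
    """Combiner lifting: each bundle emits one partial sum per key before the shuffle."""
    shuffled: List[Record] = []
    for bundle in bundles:
        partial: Dict[str, int] = defaultdict(int)
        for key, value in bundle:
            partial[key] += value
        shuffled.extend(partial.items())
    return shuffled


def final_sums(shuffled: List[Record]) -> Dict[str, int]:
    """Merge whatever crossed the shuffle into per-key totals."""
    totals: Dict[str, int] = defaultdict(int)
    for key, value in shuffled:
        totals[key] += value
    return dict(totals)


bundles = [
    [("clicks", 1)] * 1000 + [("views", 1)] * 500,
    [("clicks", 1)] * 800,
]
raw = shuffle_without_combiner(bundles)
lifted = shuffle_with_combiner(bundles)
print(len(raw), len(lifted))                   # 2300 3
print(final_sums(raw) == final_sums(lifted))   # True
```

Both paths produce the same totals, but the lifted version moves 3 records through the shuffle instead of 2300.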
Q4: What is dynamic work rebalancing?
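Answer: Dynamic work rebalancing lets Dataflow split the unprocessed portion of a busy worker's input while the job runs and hand it to idle workers, so a single slow worker (a straggler) does not hold up the whole stage. It only works when the input is splittable, as with Beam's built-in file-based sources or a Splittable DoFn, and it improves resource utilization without manual tuning.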
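The rebalancing step can be sketched with a toy tracker in plain Python. `RangeTracker` and `drain` are hypothetical names; the model only loosely follows the `try_claim`/`try_split` contract of Beam's restriction trackers and is not Dataflow's actual scheduler. A straggler that has finished 10 of 100 offsets gives away half of its remaining work to an idle worker.

```python
from typing import Optional, Tuple

Range = Tuple[int, int]


class RangeTracker:
    """Simplified tracker over the half-open offset range [start, stop)."""

    def __init__(self, start: int, stop: int):
        self.start, self.stop = start, stop
        self.last_claimed = start - 1

    def try_claim(self, position: int) -> bool:
        """Claim an offset; fails once the (possibly shrunk) range is exhausted."""
        if position >= self.stop:
            return False
        self.last_claimed = position
        return True

    def try_split(self, fraction_of_remainder: float) -> Optional[Tuple[Range, Range]]:
        """Hand off part of the unclaimed work; return (primary, residual) or None."""
        remaining_start = self.last_claimed + 1
        split_point = remaining_start + int((self.stop - remaining_start) * fraction_of_remainder)
        if split_point <= remaining_start or split_point >= self.stop:
            return None
        residual = (split_point, self.stop)
        self.stop = split_point  # this worker keeps only [start, split_point)
        return (self.start, split_point), residual


def drain(tracker: RangeTracker, position: int) -> int:
    """Process offsets from `position` until a claim fails; return the count."""
    processed = 0
    while tracker.try_claim(position):
        processed += 1
        position += 1
    return processed


straggler = RangeTracker(0, 100)
for offset in range(10):  # the slow worker has finished offsets 0..9
    straggler.try_claim(offset)

primary, residual = straggler.try_split(0.5)  # runner asks for half the remainder
helper = RangeTracker(*residual)              # an idle worker takes the residual

print(primary, residual)                             # (0, 55) (55, 100)
print(10 + drain(straggler, 10), drain(helper, 55))  # 55 45
```

Because the straggler's range shrinks when it splits, its next failed claim at offset 55 is what tells it to stop; no offset is processed twice.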
Q5: How do you handle large file processing?
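Answer: Use a Splittable DoFn, or a file-based source that supports range reading, so that each large file is divided into byte ranges processed in parallel. For custom binary formats, implement splitting logic that can find record boundaries. If a file cannot be split, for example a large gzip-compressed file, consider splitting it into smaller files before processing.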
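Range reading only works if every record is read exactly once even though range boundaries fall mid-line. A common convention for newline-delimited files is that a line belongs to the range containing its first byte. The plain-Python sketch below applies that rule to an in-memory buffer; `read_range` is a hypothetical helper, not a Beam API.

```python
from typing import List, Tuple


def read_range(data: bytes, start: int, stop: int) -> List[Tuple[int, bytes]]:
    """Return (offset, line) for every line that starts inside [start, stop).

    A line straddling `start` is skipped because the previous range owns it;
    the last line may run past `stop`, which is fine because this range owns it.
    """
    position = start
    if start > 0:
        # Look from one byte back: if that byte is a newline, a line begins at start.
        newline = data.find(b"\n", start - 1)
        position = len(data) if newline == -1 else newline + 1
    records = []
    while position < stop and position < len(data):
        end = data.find(b"\n", position)
        end = len(data) if end == -1 else end + 1
        records.append((position, data[position:end]))
        position = end
    return records


data = b'{"a":1}\n{"b":2}\n{"c":3}\n'
ranges = [(0, 10), (10, 20), (20, len(data))]
for start, stop in ranges:
    print((start, stop), [offset for offset, _ in read_range(data, start, stop)])
# (0, 10) [0, 8]
# (10, 20) [16]
# (20, 24) []
```

The range boundaries at bytes 10 and 20 cut through lines, yet the three ranges together return each JSON line exactly once.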