🎉 75% of content is free forever — Unlock Premium from $10/mo →
CW
Search courses…
πŸ’Ό Servicesℹ️ Aboutβœ‰οΈ ContactView Pricing Plansfrom $10

Streaming Data Pipelines

⭐ Premium

Advertisement

Streaming Data Pipelines

Real-time data processing enables live features, instant recommendations, and immediate fraud detection. This lesson covers the core technologies and patterns for building streaming pipelines.

Streaming Architecture Overview

Data Sources (events, logs) → Kafka ingestion (topics, partitions) → Flink / Spark stream processing (windows, state) → Feature Store (online/offline) → APIs / Dashboards (real-time serving)

Apache Kafka Fundamentals

Kafka is the backbone of most streaming architectures. Master producers, consumers, and partitioning.

from kafka import KafkaProducer, KafkaConsumer
# kafka-python's admin client is KafkaAdminClient; ``AdminClient`` is the
# confluent-kafka name and does not exist in kafka.admin (ImportError).
from kafka.admin import KafkaAdminClient, NewTopic
import json

# Producer configuration
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    # Coerce keys to str so numeric user IDs work; a None key stays None.
    key_serializer=lambda k: None if k is None else str(k).encode('utf-8'),
    acks='all',
    retries=3,
    # With retries enabled, more than one in-flight request per connection can
    # reorder records within a partition; 1 keeps each user's events in order.
    max_in_flight_requests_per_connection=1,
    batch_size=16384,
    linger_ms=10,
)

# Send events with partitioning: records sharing a key (user_id) are routed to
# the same partition, so each user's events stay ordered for consumers.
pending = []
for event in events:
    pending.append(producer.send(
        topic='user-events',
        key=event['user_id'],
        value=event,
    ))
producer.flush()
# send() is asynchronous: a record that still fails after all retries is only
# reported through its future, so surface those errors instead of losing them.
for future in pending:
    future.get(timeout=10)

# Consumer with group management. Offsets are committed manually *after*
# processing, so a crash between the two replays the message (at-least-once).
consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers=['localhost:9092'],
    group_id='ml-pipeline',
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
)

for message in consumer:
    event = message.value
    process_event(event)
    # A synchronous commit per message is simple but slow; batch commits
    # (e.g. every N messages) for higher throughput.
    consumer.commit()

Kafka Streams for Real-Time Processing

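Kafka Streams (a Java library) provides exactly-once semantics and windowed aggregations without a separate processing cluster. It has no official Python API, so the example below implements a simplified windowed aggregation by hand on top of kafka-python.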

from kafka import KafkaConsumer, KafkaProducer
from collections import defaultdict
from datetime import datetime, timedelta
import json

class WindowedAggregator:
    """Aggregate numeric values into hopping (sliding) event-time windows.

    Each window spans ``window_size_seconds`` and a new window starts every
    ``slide_seconds``. Window starts are aligned to multiples of the slide,
    measured from 1970-01-01 00:00 in the timestamp's own timezone (or naive
    wall-clock time). An event is added to every window that contains its
    timestamp, so with the defaults (60 s / 10 s) each event lands in six
    windows.

    Values are buffered per ``(key, window_start)`` until
    :meth:`get_window_results` sees that the window has closed.
    """

    def __init__(self, window_size_seconds=60, slide_seconds=10):
        """Configure window length and hop, both in seconds.

        Raises:
            ValueError: if either duration is not positive.
        """
        if window_size_seconds <= 0 or slide_seconds <= 0:
            raise ValueError("window_size_seconds and slide_seconds must be positive")
        self.window_size = timedelta(seconds=window_size_seconds)
        self.slide = timedelta(seconds=slide_seconds)
        # (key, window_start) -> values that fell inside that window.
        self.windows = defaultdict(list)

    @staticmethod
    def _as_datetime(moment):
        """Return *moment* as a datetime; ints/floats are POSIX seconds.

        Numbers are converted with ``datetime.fromtimestamp`` (naive local
        time, the same clock as ``datetime.now()``).
        """
        if isinstance(moment, (int, float)):
            return datetime.fromtimestamp(moment)
        return moment

    def _floor_to_slide(self, moment):
        """Return the latest slide boundary at or before *moment*."""
        origin = moment.replace(year=1970, month=1, day=1,
                                hour=0, minute=0, second=0, microsecond=0)
        return origin + ((moment - origin) // self.slide) * self.slide

    def add_event(self, key, timestamp, value):
        """Record *value* for *key* in every window containing *timestamp*.

        Args:
            key: Hashable grouping key (e.g. a metric name).
            timestamp: Event time as a ``datetime`` or POSIX seconds.
            value: Numeric value to aggregate.
        """
        moment = self._as_datetime(timestamp)
        # Window [start, start + size) contains moment  <=>  start > moment - size.
        earliest_excluded = moment - self.window_size
        window_start = self._floor_to_slide(moment)
        while window_start > earliest_excluded:
            self.windows[(key, window_start)].append(value)
            window_start -= self.slide

    def get_window_results(self, current_time):
        """Pop and summarize windows that have closed by *current_time*.

        A window ``[start, start + window_size)`` is closed once
        ``start + window_size <= current_time``.

        Args:
            current_time: ``datetime`` or POSIX seconds, on the same clock as
                the event timestamps.

        Returns:
            dict mapping each key to ``{'count', 'sum', 'mean', 'window_start'}``
            for that key's *oldest* closed window. When several windows of the
            same key have closed, the newer ones stay buffered and are returned
            by later calls (oldest first), so no window is ever dropped.
        """
        now = self._as_datetime(current_time)
        # Materialize before mutating self.windows; stable sort by start only
        # so keys never need to be mutually comparable.
        closed = sorted(
            (slot for slot in self.windows if slot[1] + self.window_size <= now),
            key=lambda slot: slot[1],
        )
        results = {}
        for key, window_start in closed:
            if key in results:
                continue  # an older window for this key is already being emitted
            values = self.windows.pop((key, window_start))
            total = sum(values)
            results[key] = {
                'count': len(values),
                'sum': total,
                'mean': total / len(values),
                'window_start': window_start.isoformat(),
            }
        return results

# Usage
aggregator = WindowedAggregator(window_size_seconds=300)
consumer = KafkaConsumer(
    'raw-metrics',
    bootstrap_servers=['localhost:9092'],
    # Without a deserializer msg.value is raw bytes, and indexing it with
    # 'metric_name' below would raise TypeError.
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
)

for msg in consumer:
    event = msg.value
    aggregator.add_event(
        key=event['metric_name'],
        # NOTE(review): must be comparable with datetime.now() below (naive
        # local time) — confirm the producer's timestamp format.
        timestamp=event['timestamp'],
        value=event['value'],
    )

    # Check for windows that closed as time advanced, after every event.
    results = aggregator.get_window_results(datetime.now())
    if results:
        publish_aggregates(results)

Apache Flink for Complex Event Processing

Flink excels at stateful processing, exactly-once guarantees, and complex event patterns.

# Flink SQL for streaming analytics
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
tenv = StreamTableEnvironment.create(env)

# Create a source table from Kafka. The WATERMARK clause declares event_time
# as the event-time attribute and tolerates up to 5 seconds of out-of-order data.
tenv.execute_sql("""
    CREATE TABLE user_events (
        user_id STRING,
        event_type STRING,
        product_id STRING,
        amount DECIMAL(10,2),
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'user-events',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'json',
        'scan.startup.mode' = 'earliest-offset'
    )
""")

# Purchases per user in 1-minute tumbling windows (fixed, non-overlapping).
# sql_query() only defines the query as a Table. It runs when executed, e.g.
# purchases_per_minute.execute().print(), which blocks indefinitely on this
# unbounded Kafka source. execute_sql() would submit a job immediately and
# return a TableResult that must be consumed.
purchases_per_minute = tenv.sql_query("""
    SELECT 
        user_id,
        TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
        COUNT(*) AS event_count,
        SUM(amount) AS total_amount
    FROM user_events
    WHERE event_type = 'purchase'
    GROUP BY user_id, TUMBLE(event_time, INTERVAL '1' MINUTE)
""")

# Session windows for activity tracking: a user's session closes after a
# 30-minute gap with no events.
user_sessions = tenv.sql_query("""
    SELECT 
        user_id,
        SESSION_START(event_time, INTERVAL '30' MINUTE) AS session_start,
        SESSION_END(event_time, INTERVAL '30' MINUTE) AS session_end,
        COUNT(*) AS actions_in_session
    FROM user_events
    GROUP BY user_id, SESSION(event_time, INTERVAL '30' MINUTE)
""")

Real-Time Feature Computation

import json
from kafka import KafkaConsumer
from redis import Redis
import numpy as np
from collections import deque

class RealTimeFeatureEngine:
    """Maintain per-user online features in Redis from user-action events.

    Each call to :meth:`compute_features` updates a user's features with a
    single Redis pipeline and returns them. The Kafka consumer on
    ``user-actions`` is created here; feeding its messages into
    ``compute_features`` is left to the caller.
    """

    # Contribution of each action type to the engagement score (unknown -> 0).
    ACTION_WEIGHTS = {'view': 1, 'click': 2, 'add_to_cart': 3, 'purchase': 5}
    # How many of the most recent amounts are kept per user.
    AMOUNT_WINDOW = 100

    def __init__(self):
        self.redis = Redis(host='localhost', port=6379, db=0)
        self.consumer = KafkaConsumer(
            'user-actions',
            bootstrap_servers=['localhost:9092'],
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )

    def compute_features(self, user_id, event):
        """Update Redis features for *user_id* from *event* and return them.

        Args:
            user_id: User identifier embedded in the Redis key names.
            event: Mapping with ``'action'`` and ``'timestamp'`` keys and an
                optional numeric ``'amount'`` (defaults to 0).

        Returns:
            dict with ``user_id``, the updated ``action_count``, up to the last
            ``AMOUNT_WINDOW`` amounts (newest first, ``[0.0]`` if none), the
            ``engagement_score`` and the event ``timestamp``.
        """
        count_key = f"count:{user_id}:1h:{event['action']}"
        amounts_key = f"window:{user_id}:amounts"
        engagement_key = f"engagement:{user_id}"
        weight = self.ACTION_WEIGHTS.get(event['action'], 0)

        pipe = self.redis.pipeline()
        # Hourly action count: SET ... EX NX only succeeds when the key is new,
        # so the 1-hour TTL starts at the first action and later actions do
        # not push it back (EXPIRE after every INCR would keep an active
        # user's counter alive forever). INCR preserves the existing TTL.
        pipe.set(count_key, 0, ex=3600, nx=True)
        pipe.incr(count_key)
        # Bounded list of recent amounts, newest first; read back in the same
        # round trip so the returned window includes this event.
        pipe.lpush(amounts_key, event.get('amount', 0))
        pipe.ltrim(amounts_key, 0, self.AMOUNT_WINDOW - 1)
        pipe.lrange(amounts_key, 0, -1)
        # Engagement accumulates; the key expires 24h after the latest action.
        pipe.incrbyfloat(engagement_key, weight * 0.1)
        pipe.expire(engagement_key, 86400)
        # Unpack by position: the previous results[-1] was EXPIRE's reply
        # (True), not the INCRBYFLOAT score.
        _, action_count, _, _, raw_amounts, engagement, _ = pipe.execute()

        return {
            'user_id': user_id,
            'action_count': action_count,
            'recent_amounts': [float(x) for x in raw_amounts] or [0.0],
            'engagement_score': float(engagement or 0),
            'timestamp': event['timestamp']
        }

    def _get_amounts(self, user_id):
        """Return the user's stored recent amounts (newest first), or [0.0] if none."""
        raw = self.redis.lrange(f"window:{user_id}:amounts", 0, -1)
        return [float(x) for x in raw] if raw else [0.0]

Event-Driven Architecture Patterns

from enum import Enum
from dataclasses import dataclass, field
from typing import List, Callable
import asyncio

# Closed set of domain event kinds. Each member's value is its dotted string
# name, so EventType("order.placed") looks a member up from that string.
EventType = Enum(
    "EventType",
    [
        ("USER_CREATED", "user.created"),
        ("ORDER_PLACED", "order.placed"),
        ("PAYMENT_RECEIVED", "payment.received"),
        ("INVENTORY_UPDATED", "inventory.updated"),
    ],
)

@dataclass
class Event:
    """A domain event passed to EventBus handlers.

    Only ``event_type`` and ``payload`` are required.
    """

    event_type: EventType  # which kind of event this is; selects the handlers
    payload: dict  # event-specific data handed to each handler
    correlation_id: str = field(default="")  # empty string when not supplied
    metadata: dict = field(default_factory=dict)  # a fresh dict per instance

class EventBus:
    """Minimal in-process publish/subscribe bus for async event handlers."""

    def __init__(self):
        # Registered handlers per event type, kept in subscription order.
        self.handlers: dict[EventType, List[Callable]] = {}

    def subscribe(self, event_type: EventType, handler: Callable):
        """Register *handler*, an async callable taking an Event, for *event_type*."""
        self.handlers.setdefault(event_type, []).append(handler)

    async def publish(self, event: Event):
        """Run every handler subscribed to ``event.event_type`` concurrently.

        All handlers are awaited to completion even if some of them fail, so
        publish() never returns while handlers are still running in the
        background. If any handler raised, the first failure (in subscription
        order) is re-raised once all handlers have finished. Publishing an
        event type with no subscribers does nothing.
        """
        handlers = self.handlers.get(event.event_type, [])
        outcomes = await asyncio.gather(
            *(handler(event) for handler in handlers),
            return_exceptions=True,
        )
        for outcome in outcomes:
            if isinstance(outcome, BaseException):
                raise outcome

# Domain event handlers
async def on_order_placed(event: Event):
    """React to an ORDER_PLACED event.

    Awaits, strictly one after another: send_confirmation_email (with the
    payload's ``user_id``), update_inventory (with its ``items``) and
    trigger_revenue_calculation (with the whole payload).
    """
    placed_order = event.payload
    await send_confirmation_email(placed_order['user_id'])
    await update_inventory(placed_order['items'])
    await trigger_revenue_calculation(placed_order)

async def on_payment_received(event: Event):
    """React to a PAYMENT_RECEIVED event.

    First marks the payload's ``order_id`` as 'paid' via update_order_status,
    then awaits credit_loyalty_points with the payer's ``user_id`` and
    ``amount``.
    """
    received = event.payload
    await update_order_status(received['order_id'], 'paid')
    await credit_loyalty_points(received['user_id'], received['amount'])

# Wire up the event bus: one shared bus, with each domain handler registered
# for the single event type it reacts to. USER_CREATED and INVENTORY_UPDATED
# have no subscribers here, so publishing them runs no handlers.
bus = EventBus()
bus.subscribe(EventType.ORDER_PLACED, on_order_placed)
bus.subscribe(EventType.PAYMENT_RECEIVED, on_payment_received)

Key Takeaways

  • Kafka is the standard for event ingestion; use partitioning for scalability
  • Flink excels at stateful processing with exactly-once semantics
  • Real-time features require careful windowing and state management
  • Event-driven architectures decouple producers from consumers for resilience

Advertisement