Data Lineage Tracking: OpenLineage, Apache Atlas
Difficulty: Senior/Staff Level | Companies: LinkedIn, Airbnb, Netflix, Lyft, Stripe, Databricks
This question tests your understanding of data lineage systems, metadata management, and how to implement end-to-end visibility in complex data pipelines.
1. Data Lineage Fundamentals
What is Data Lineage?
Data lineage records where data originates, how it moves between systems, and how it is transformed along the way.
Lineage Dimensions
- Horizontal: Data flow across systems
- Vertical: Column-level transformations within tables
- Temporal: Version history of schema and data changes
Lineage Graph Model
A lineage graph is a directed graph $G = (V, E)$.
Where:
- $V$ = {datasets, jobs, dashboards, ML models}
- $E$ = {data_flow, dependency, transformation}
# Lineage graph representation
from collections import deque
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set


@dataclass
class LineageEdge:
    source: str
    target: str
    transform_type: str  # 'copy', 'aggregate', 'filter', 'join'
    transform_sql: str
    column_mapping: Dict[str, str]


@dataclass
class LineageGraph:
    nodes: Dict[str, dict] = field(default_factory=dict)
    edges: List[LineageEdge] = field(default_factory=list)

    def add_edge(self, edge: LineageEdge) -> None:
        self.edges.append(edge)

    def get_upstream(self, dataset: str, depth: Optional[int] = None) -> Set[str]:
        """Get upstream dependencies, optionally limited to `depth` hops"""
        visited: Set[str] = set()
        queue = deque([(dataset, 0)])
        while queue:
            current, d = queue.popleft()
            # Stop expanding at the hop limit (None means unlimited)
            if depth is not None and d >= depth:
                continue
            for edge in self.edges:
                if edge.target == current and edge.source not in visited:
                    visited.add(edge.source)
                    queue.append((edge.source, d + 1))
        visited.discard(dataset)  # a cycle must not list the dataset as its own ancestor
        return visited

    def get_downstream(self, dataset: str) -> Set[str]:
        """Get all downstream dependents (the dataset itself is excluded)"""
        visited: Set[str] = set()
        queue = deque([dataset])
        while queue:
            current = queue.popleft()
            for edge in self.edges:
                if edge.source == current and edge.target not in visited:
                    visited.add(edge.target)
                    queue.append(edge.target)
        visited.discard(dataset)
        return visited

    def impact_analysis(self, dataset: str) -> Dict[str, List[str]]:
        """Group everything downstream of a dataset by node type"""
        type_to_bucket = {
            'table': 'tables',
            'dashboard': 'dashboards',
            'ml_model': 'ml_models',
            'job': 'jobs'
        }
        impacts: Dict[str, List[str]] = {b: [] for b in type_to_bucket.values()}
        for node_id in self.get_downstream(dataset):
            node_type = self.nodes.get(node_id, {}).get('type', '')
            bucket = type_to_bucket.get(node_type)
            if bucket:  # nodes with an unknown type are ignored
                impacts[bucket].append(node_id)
        return impacts
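The traversals above rescan the full edge list for every visited node, which is fine for a small graph but grows as O(V·E). One possible refinement, sketched here under the assumption that edges can be reduced to plain (source, target) pairs, is to build adjacency indexes once so that each traversal costs O(V + E). The class name `IndexedLineage` and its methods are illustrative and not part of any library.

```python
from collections import defaultdict, deque
from typing import Dict, Iterable, Optional, Set, Tuple


class IndexedLineage:
    """Hypothetical helper: lineage edges indexed by target and by source."""

    def __init__(self, edges: Iterable[Tuple[str, str]]):
        self._parents: Dict[str, Set[str]] = defaultdict(set)
        self._children: Dict[str, Set[str]] = defaultdict(set)
        for source, target in edges:
            self._parents[target].add(source)
            self._children[source].add(target)

    def _walk(self, start: str, index: Dict[str, Set[str]],
              max_depth: Optional[int]) -> Set[str]:
        # Breadth-first search that never reports the start node itself
        seen: Set[str] = set()
        queue = deque([(start, 0)])
        while queue:
            node, depth = queue.popleft()
            if max_depth is not None and depth >= max_depth:
                continue
            for neighbour in index.get(node, ()):
                if neighbour not in seen and neighbour != start:
                    seen.add(neighbour)
                    queue.append((neighbour, depth + 1))
        return seen

    def upstream(self, dataset: str, max_depth: Optional[int] = None) -> Set[str]:
        return self._walk(dataset, self._parents, max_depth)

    def downstream(self, dataset: str, max_depth: Optional[int] = None) -> Set[str]:
        return self._walk(dataset, self._children, max_depth)


# Dataset names below are made up for the example
lineage = IndexedLineage([
    ("sales_raw", "sales_clean"),
    ("sales_clean", "sales_daily"),
    ("sales_daily", "revenue_dashboard"),
    ("sales_clean", "churn_model"),
])
print(sorted(lineage.upstream("sales_daily")))
print(sorted(lineage.downstream("sales_clean")))
```

The trade-off is memory for two extra dictionaries and the need to keep them in sync when edges are added or removed.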
2. OpenLineage Framework
Architecture
OpenLineage Architecture
├── Producers (Spark, Airflow, dbt, Flink)
│   └── Emit RunEvents, JobEvents, DatasetEvents
├── API (Marquez)
│   ├── Job Registry
│   ├── Dataset Registry
│   └── Run Registry
├── Backends
│   ├── PostgreSQL
│   ├── MySQL
│   └── BigQuery
└── Consumers
    ├── Lineage UI
    ├── Impact Analysis Service
    └── Data Catalog Integration
Event Model
{
  "eventType": "COMPLETE",
  "eventTime": "2024-01-15T10:30:00Z",
  "run": {
    "runId": "d46e465b-d358-4d32-83d4-df660ff614dd",
    "facets": {
      "spark_version": {
        "version": "3.4.1"
      },
      "job_parameters": {
        "parameters": {
          "input_date": "2024-01-15"
        }
      }
    }
  },
  "job": {
    "namespace": "production",
    "name": "etl_daily_aggregation",
    "facets": {
      "sql": {
        "query": "SELECT date, SUM(amount) FROM sales GROUP BY date"
      }
    }
  },
  "inputs": [
    {
      "namespace": "s3://data-lake",
      "name": "sales_raw",
      "facets": {
        "schema": {
          "fields": [
            {"name": "date", "type": "date"},
            {"name": "amount", "type": "decimal"}
          ]
        }
      }
    }
  ],
  "outputs": [
    {
      "namespace": "s3://data-lake",
      "name": "sales_daily",
      "facets": {
        "schema": {
          "fields": [
            {"name": "date", "type": "date"},
            {"name": "daily_total", "type": "decimal"}
          ]
        },
        "columnLineage": {
          "fields": {
            "date": {
              "inputFields": [
                {
                  "namespace": "s3://data-lake",
                  "name": "sales_raw",
                  "field": "date"
                }
              ]
            },
            "daily_total": {
              "inputFields": [
                {
                  "namespace": "s3://data-lake",
                  "name": "sales_raw",
                  "field": "amount",
                  "transformations": [
                    {
                      "type": "DIRECT",
                      "subtype": "AGGREGATION",
                      "description": "SUM(amount)"
                    }
                  ]
                }
              ]
            }
          }
        }
      }
    }
  ]
}
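A consumer typically flattens the `columnLineage` facet of each output into column-to-column edges before storing them. The following is a minimal sketch of that step, assuming events arrive as already-parsed dictionaries shaped like the example above; the function name `column_edges` and the trimmed `sample_event` are illustrative only.

```python
from typing import List, Tuple

ColumnRef = Tuple[str, str, str]  # (namespace, dataset name, field)


def column_edges(event: dict) -> List[Tuple[ColumnRef, ColumnRef]]:
    """Flatten every output's columnLineage facet into (source, target) pairs."""
    edges = []
    for output in event.get("outputs", []):
        facet = output.get("facets", {}).get("columnLineage", {})
        for field_name, spec in facet.get("fields", {}).items():
            target = (output["namespace"], output["name"], field_name)
            for src in spec.get("inputFields", []):
                source = (src["namespace"], src["name"], src["field"])
                edges.append((source, target))
    return edges


# Trimmed copy of the event above, keeping only what the function reads
sample_event = {
    "eventType": "COMPLETE",
    "outputs": [{
        "namespace": "s3://data-lake",
        "name": "sales_daily",
        "facets": {"columnLineage": {"fields": {
            "date": {"inputFields": [
                {"namespace": "s3://data-lake", "name": "sales_raw", "field": "date"},
            ]},
            "daily_total": {"inputFields": [
                {"namespace": "s3://data-lake", "name": "sales_raw", "field": "amount"},
            ]},
        }}},
    }],
}

edges = column_edges(sample_event)
for source, target in edges:
    print(".".join(source[1:]), "->", ".".join(target[1:]))
```

Outputs without a `columnLineage` facet simply contribute no edges, so the same function can run over every event in a stream.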
Spark OpenLineage Integration
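from pyspark.sql import SparkSession

# Configure OpenLineage with Spark. The listener is a JVM class shipped in the
# openlineage-spark jar, so it is enabled through configuration rather than a
# Python import; the jar must be on the classpath (e.g. via spark.jars.packages).
# Key names follow recent openlineage-spark releases and have changed over time,
# so check them against the version you deploy.
spark = (
    SparkSession.builder
    .appName("LineageExample")
    .config("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener")
    .config("spark.openlineage.transport.type", "http")
    .config("spark.openlineage.transport.url", "http://marquez:8080")
    .config("spark.openlineage.transport.auth.type", "api_key")
    .config("spark.openlineage.transport.auth.apiKey", "your-api-key")
    .config("spark.openlineage.namespace", "production")
    .config("spark.openlineage.parentRunId", "parent-run-uuid")
    .getOrCreate()
)

# Reads and writes executed through this session emit lineage events automatically
df = spark.read.parquet("s3://data-lake/raw/sales")
df_filtered = df.filter(df.amount > 100)
df_filtered.write.parquet("s3://data-lake/processed/high_value_sales")

# Manual lineage emission (advanced)
import uuid
from datetime import datetime, timezone

from openlineage.client import OpenLineageClient
# Newer client releases also expose these classes under openlineage.client.event_v2
from openlineage.client.run import Dataset, Job, Run, RunEvent, RunState

# Pass the base URL; the HTTP transport appends the lineage endpoint path
client = OpenLineageClient(url="http://marquez:8080")
run = Run(runId=str(uuid.uuid4()))  # runId must be a UUID
job = Job(namespace="production", name="custom_etl_job")
dataset = Dataset(
    namespace="s3://data-lake",
    name="processed/sales"
)
event = RunEvent(
    eventType=RunState.COMPLETE,
    eventTime=datetime.now(timezone.utc).isoformat(),
    run=run,
    job=job,
    producer="https://example.com/custom-etl",  # placeholder URI identifying the emitter
    outputs=[dataset]
)
client.emit(event)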
3. Apache Atlas
Atlas Metadata Model
{
  "category": "ENTITY",
  "name": "DataSet",
  "attributeDefs": [
    {
      "name": "name",
      "typeName": "string",
      "isUnique": true,
      "isIndexable": true
    },
    {
      "name": "description",
      "typeName": "string"
    },
    {
      "name": "owner",
      "typeName": "string"
    }
  ],
  "relationshipAttributeDefs": [
    {
      "name": "inputToProcesses",
      "typeName": "array<Process>",
      "cardinality": "LIST"
    },
    {
      "name": "outputFromProcesses",
      "typeName": "array<Process>",
      "cardinality": "LIST"
    }
  ]
}
Atlas API Integration
import json
from typing import List, Optional

import requests


class AtlasClient:
    def __init__(self, base_url: str, username: str, password: str):
        # Atlas serves its v2 REST API under /api/atlas/v2
        self.api = f"{base_url.rstrip('/')}/api/atlas/v2"
        self.auth = (username, password)
        self.headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        }

    def _request(self, method: str, path: str, **kwargs) -> requests.Response:
        """Send an authenticated request and raise on HTTP errors"""
        response = requests.request(
            method,
            f'{self.api}{path}',
            auth=self.auth,
            headers=self.headers,
            timeout=30,
            **kwargs
        )
        response.raise_for_status()
        return response

    def create_entity(self, entity_type: str, attributes: dict) -> dict:
        """Create a new entity in Atlas; returns the mutation response"""
        # The entity payload is wrapped in an 'entity' envelope
        body = {'entity': {'typeName': entity_type, 'attributes': attributes}}
        return self._request('POST', '/entity', json=body).json()

    def get_entity(self, guid: str) -> dict:
        """Fetch a single entity by GUID"""
        return self._request('GET', f'/entity/guid/{guid}').json()['entity']

    def get_lineage(self, guid: str, direction: str = 'BOTH', depth: int = 3) -> dict:
        """Get lineage for an entity (direction: INPUT, OUTPUT, or BOTH)"""
        params = {'direction': direction, 'depth': depth}
        return self._request('GET', f'/lineage/{guid}', params=params).json()

    def add_classification(self, entity_guid: str, classifications: List[dict]) -> None:
        """Add classifications to an entity; a successful reply has no body"""
        self._request(
            'POST',
            f'/entity/guid/{entity_guid}/classifications',
            json=classifications
        )

    def search_entities(self, query: str, entity_type: Optional[str] = None) -> list:
        """Search for entities"""
        params = {'query': query}
        if entity_type:
            params['typeName'] = entity_type
        result = self._request('GET', '/search/basic', params=params).json()
        return result.get('entities', [])  # key may be absent when nothing matches


# Usage
atlas = AtlasClient('http://atlas:21000', 'admin', 'admin')

# Create table entity. Referenceable types need a unique qualifiedName (the value
# here is illustrative). The stock hive_table type models its database and columns
# as separate hive_db / hive_column entities, so adapt these attributes to the
# type definitions in your deployment.
created = atlas.create_entity('hive_table', {
    'name': 'sales_daily',
    'qualifiedName': 'analytics.sales_daily@primary',
    'dbName': 'analytics',
    'owner': 'data-team',
    'columns': [
        {'name': 'date', 'type': 'date'},
        {'name': 'daily_total', 'type': 'decimal'}
    ]
})
# The GUID of the new entity is inside the mutation response
table_guid = created['mutatedEntities']['CREATE'][0]['guid']

# Get lineage
lineage = atlas.get_lineage(table_guid, direction='INPUT', depth=5)
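The lineage call returns a graph rather than a flat list, so a caller still has to walk it. Below is a minimal sketch of that walk. It assumes the response carries `baseEntityGuid`, a `guidEntityMap` of entity headers, and `relations` with `fromEntityId` / `toEntityId`, which is the shape Atlas lineage responses are generally documented to have; the sample payload is hand-written for illustration, so verify the field names against your Atlas version.

```python
from collections import defaultdict, deque
from typing import List, Optional, Tuple


def upstream_entities(lineage: dict) -> List[Tuple[Optional[str], Optional[str]]]:
    """Return (typeName, name) for every entity upstream of the base entity."""
    # Index relations by target so the walk can step backwards
    parents = defaultdict(list)
    for rel in lineage.get("relations", []):
        parents[rel["toEntityId"]].append(rel["fromEntityId"])

    entities = lineage.get("guidEntityMap", {})
    base = lineage["baseEntityGuid"]
    seen = {base}
    queue = deque([base])
    result = []
    while queue:
        guid = queue.popleft()
        for parent in parents.get(guid, []):
            if parent in seen:
                continue
            seen.add(parent)
            header = entities.get(parent, {})
            result.append((header.get("typeName"),
                           header.get("attributes", {}).get("name")))
            queue.append(parent)
    return result


# Hand-written sample: sales_raw -> etl_daily_aggregation -> sales_daily
sample_lineage = {
    "baseEntityGuid": "guid-3",
    "guidEntityMap": {
        "guid-1": {"typeName": "hive_table", "attributes": {"name": "sales_raw"}},
        "guid-2": {"typeName": "hive_process", "attributes": {"name": "etl_daily_aggregation"}},
        "guid-3": {"typeName": "hive_table", "attributes": {"name": "sales_daily"}},
    },
    "relations": [
        {"fromEntityId": "guid-1", "toEntityId": "guid-2"},
        {"fromEntityId": "guid-2", "toEntityId": "guid-3"},
    ],
}

upstream = upstream_entities(sample_lineage)
print(upstream)
```

Note that tables are linked through process entities, so both appear in the result; filter on `typeName` if only datasets are of interest.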
4. Column-Level Lineage
SQL Parsing for Column Lineage
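import json

import sqlparse
from sqlparse.sql import Function, Identifier, IdentifierList
from sqlparse.tokens import DML, Keyword


class ColumnLineageExtractor:
    """Simplified extractor: handles the first top-level SELECT list only."""

    def extract(self, sql: str) -> dict:
        """Extract column-level lineage from SQL"""
        lineage = {}  # built per call so results never leak between statements
        parsed = sqlparse.parse(sql)[0]
        select_found = False
        for token in parsed.tokens:
            if token.ttype is DML and token.value.upper() == 'SELECT':
                select_found = True
                continue
            if not select_found:
                continue
            if isinstance(token, IdentifierList):
                for identifier in token.get_identifiers():
                    self._process_identifier(identifier, lineage)
                break
            if isinstance(token, (Identifier, Function)):
                # SELECT list with a single column
                self._process_identifier(token, lineage)
                break
            if token.ttype is Keyword and token.value.upper() == 'FROM':
                break  # never treat later lists (e.g. GROUP BY a, b) as output columns
        return lineage

    def _process_identifier(self, identifier, lineage: dict) -> None:
        """Process a single item from the SELECT list"""
        if not isinstance(identifier, (Identifier, Function)):
            return  # literals, wildcards, etc. are out of scope for this sketch
        if isinstance(identifier, Function):
            func = identifier
        else:
            func = next((t for t in identifier.tokens if isinstance(t, Function)), None)
        if func is not None:
            # Expression such as SUM(amount) AS daily_total
            params = ', '.join(str(p) for p in func.get_parameters())
            lineage[identifier.get_alias() or func.get_name()] = {
                'source_table': None,
                'source_column': params or None,  # None for COUNT(*)
                'transform': func.get_name().upper()
            }
            return
        # Simple column reference, optionally qualified as table.column
        name = identifier.get_real_name()
        lineage[identifier.get_alias() or name] = {
            'source_table': identifier.get_parent_name(),
            'source_column': name,
            'transform': None
        }


# Example usage
sql = """
INSERT INTO analytics.sales_daily
SELECT
    sale_date AS date,
    SUM(amount) AS daily_total,
    COUNT(*) AS transaction_count,
    AVG(amount) AS avg_order_value
FROM raw.sales
WHERE sale_date >= '2024-01-01'
GROUP BY sale_date
"""
extractor = ColumnLineageExtractor()
lineage = extractor.extract(sql)
print(json.dumps(lineage, indent=2))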
Column Lineage Storage Model
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional


@dataclass
class ColumnLineage:
    target_dataset: str
    target_column: str
    source_dataset: str
    source_column: str
    transformation: Optional[str]
    job_name: str
    timestamp: datetime

    def to_dict(self) -> dict:
        return {
            'target': f"{self.target_dataset}.{self.target_column}",
            'source': f"{self.source_dataset}.{self.source_column}",
            'transform': self.transformation,
            'job': self.job_name,
            'timestamp': self.timestamp.isoformat()
        }


class ColumnLineageStore:
    def __init__(self):
        self.lineages: List[ColumnLineage] = []

    def add_lineage(self, lineage: ColumnLineage) -> None:
        self.lineages.append(lineage)

    def get_column_lineage(self, dataset: str, column: str) -> List[ColumnLineage]:
        """Get all lineage records that produce a specific column"""
        return [
            entry for entry in self.lineages
            if entry.target_dataset == dataset and entry.target_column == column
        ]

    def get_impact(self, dataset: str, column: str) -> Dict[str, List[str]]:
        """Get direct (one-hop) upstream and downstream columns for a column"""
        upstream = set()
        downstream = set()
        for entry in self.lineages:
            if entry.source_dataset == dataset and entry.source_column == column:
                downstream.add(f"{entry.target_dataset}.{entry.target_column}")
            # Checked independently so a record can count in both directions
            if entry.target_dataset == dataset and entry.target_column == column:
                upstream.add(f"{entry.source_dataset}.{entry.source_column}")
        return {
            'upstream': sorted(upstream),
            'downstream': sorted(downstream)
        }
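The store above answers one-hop questions. Root cause analysis usually needs the transitive closure, that is, the original source columns a derived column ultimately depends on. A minimal sketch of that trace follows, assuming lineage records have been reduced to ("dataset.column", "dataset.column") pairs; `trace_origins` and the sample edges are illustrative names and data.

```python
from collections import defaultdict
from typing import Iterable, Set, Tuple

Column = str  # fully qualified, e.g. "raw.sales.amount"


def trace_origins(edges: Iterable[Tuple[Column, Column]], target: Column) -> Set[Column]:
    """Follow column edges backwards and return columns with no further upstream."""
    parents = defaultdict(set)
    for source, dest in edges:
        parents[dest].add(source)

    origins: Set[Column] = set()
    seen = {target}
    stack = [target]
    while stack:
        column = stack.pop()
        ups = parents.get(column)
        if not ups:
            # Nothing feeds this column, so it is an origin (unless it is the target)
            if column != target:
                origins.add(column)
            continue
        for up in ups:
            if up not in seen:  # guards against cycles
                seen.add(up)
                stack.append(up)
    return origins


sample_edges = [
    ("raw.sales.amount", "staging.stg_sales.amount"),
    ("raw.sales.sale_date", "staging.stg_sales.sale_date"),
    ("staging.stg_sales.amount", "analytics.sales_daily.daily_total"),
    ("staging.stg_sales.sale_date", "analytics.sales_daily.date"),
]

origins = trace_origins(sample_edges, "analytics.sales_daily.daily_total")
print(origins)
```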
5. Lineage in Modern Data Stacks
dbt Lineage
# dbt automatically captures lineage
# In your dbt project
# models/staging/stg_sales.sql
"""
SELECT
id,
sale_date,
amount,
product_id
FROM {{ source('raw', 'sales') }}
WHERE is_valid = TRUE
"""
# models/marts/daily_sales.sql
"""
SELECT
sale_date,
SUM(amount) AS daily_total,
COUNT(*) AS transaction_count
FROM {{ ref('stg_sales') }}
GROUP BY sale_date
"""
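# dbt derives the lineage graph from the ref() and source() calls above
# Run: dbt docs generate (writes target/index.html, manifest.json, catalog.json)
# Then: dbt docs serve -> open the Lineage Graph in the docs site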
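The same graph is available programmatically: dbt writes `target/manifest.json`, whose `parent_map` lists the direct parents of each node by unique ID. The sketch below walks that map to collect everything upstream of a model. It works on an already-loaded dictionary; the project name `my_project` and the trimmed sample manifest are assumptions made for illustration.

```python
from typing import Set


def dbt_upstream(manifest: dict, unique_id: str) -> Set[str]:
    """Collect all transitive parents of a node from a dbt manifest's parent_map."""
    parent_map = manifest.get("parent_map", {})
    seen: Set[str] = set()
    stack = [unique_id]
    while stack:
        for parent in parent_map.get(stack.pop(), []):
            if parent not in seen:
                seen.add(parent)
                stack.append(parent)
    return seen


# Trimmed stand-in for json.load(open("target/manifest.json"))
sample_manifest = {
    "parent_map": {
        "source.my_project.raw.sales": [],
        "model.my_project.stg_sales": ["source.my_project.raw.sales"],
        "model.my_project.daily_sales": ["model.my_project.stg_sales"],
    }
}

parents = dbt_upstream(sample_manifest, "model.my_project.daily_sales")
print(sorted(parents))
```

The manifest also carries a `child_map`, so the same walk run over that key gives downstream impact instead.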
Spark + OpenLineage Airflow DAG
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
}

# Shared OpenLineage settings. The listener must be registered explicitly, and the
# openlineage-spark jar must be on the Spark classpath (for example through the
# operator's `packages` argument). Key names follow recent openlineage-spark
# releases; check them against the version you deploy.
OPENLINEAGE_CONF = {
    'spark.extraListeners': 'io.openlineage.spark.agent.OpenLineageSparkListener',
    'spark.openlineage.transport.type': 'http',
    'spark.openlineage.transport.url': 'http://marquez:8080',
    'spark.openlineage.namespace': 'production',
}

with DAG(
    'etl_pipeline_with_lineage',
    default_args=default_args,
    schedule_interval='@daily',  # named `schedule` from Airflow 2.4 onwards
    catchup=False,
    tags=['lineage', 'production']
) as dag:
    extract = SparkSubmitOperator(
        task_id='extract_raw_data',
        application='s3://apps/extract.py',
        conf={
            **OPENLINEAGE_CONF,
            # Overrides the application name reported in lineage events
            'spark.openlineage.appName': 'etl_pipeline.extract'
        }
    )
    transform = SparkSubmitOperator(
        task_id='transform_data',
        application='s3://apps/transform.py',
        conf=dict(OPENLINEAGE_CONF)
    )
    load = SparkSubmitOperator(
        task_id='load_to_warehouse',
        application='s3://apps/load.py',
        conf=dict(OPENLINEAGE_CONF)
    )

    extract >> transform >> load
6. Impact Analysis & Change Management
Schema Change Detection
class SchemaChangeDetector:
    """Compare a proposed schema against the catalogued one.

    Assumes atlas_client.get_entity(guid) returns a dict whose
    attributes['columns'] is a list of {'name', 'type'} dicts, and that
    get_lineage() returns a dict with a 'guidEntityMap' of related entities.
    """

    def __init__(self, atlas_client):
        self.atlas = atlas_client

    def detect_changes(self, dataset_guid: str, new_schema: dict) -> list:
        """Detect schema changes and their impact"""
        current = self.atlas.get_entity(dataset_guid)
        changes = []
        current_columns = {c['name']: c for c in current['attributes']['columns']}
        new_columns = {c['name']: c for c in new_schema['columns']}

        # Detect added columns
        for col in sorted(new_columns.keys() - current_columns.keys()):
            changes.append({
                'type': 'COLUMN_ADDED',
                'column': col,
                'impact': 'LOW',
                'action': 'Safe to add'
            })

        # Detect removed columns
        removed = sorted(current_columns.keys() - new_columns.keys())
        downstream = []
        if removed:
            # Fetch downstream lineage once rather than once per column
            lineage = self.atlas.get_lineage(dataset_guid, direction='OUTPUT')
            downstream = [
                entity for guid, entity in lineage.get('guidEntityMap', {}).items()
                if guid != dataset_guid
            ]
        for col in removed:
            # 'input_columns' is an assumed custom attribute listing the columns a
            # consumer reads; stock Atlas lineage is entity-level, not column-level
            impacted = [
                e for e in downstream
                if col in e.get('attributes', {}).get('input_columns', [])
            ]
            changes.append({
                'type': 'COLUMN_REMOVED',
                'column': col,
                'impact': 'HIGH' if impacted else 'MEDIUM',
                'action': f'Impacts {len(impacted)} downstream consumers'
            })

        # Detect type changes
        for col in sorted(current_columns.keys() & new_columns.keys()):
            old_type = current_columns[col]['type']
            new_type = new_columns[col]['type']
            if old_type != new_type:
                changes.append({
                    'type': 'TYPE_CHANGED',
                    'column': col,
                    'from': old_type,
                    'to': new_type,
                    'impact': 'HIGH',
                    'action': 'May break downstream queries'
                })
        return changes
Compliance Lineage Queries
-- GDPR: Find all PII data lineage
WITH pii_lineage AS (
    SELECT
        t.table_name,
        c.column_name,
        c.classification,
        l.source_table,
        l.source_column
    FROM metadata.tables t
    JOIN metadata.columns c ON t.table_id = c.table_id
    LEFT JOIN metadata.lineage l ON t.table_id = l.target_table_id
    WHERE c.classification IN ('PII', 'SENSITIVE', 'PHI')
)
SELECT
    table_name,
    column_name,
    classification,
    STRING_AGG(DISTINCT source_table, ', ') AS upstream_sources
FROM pii_lineage
GROUP BY table_name, column_name, classification;

-- Track data retention compliance
-- (date functions vary by warehouse; adapt DATEDIFF to your SQL dialect)
SELECT
    dataset_name,
    retention_policy,
    last_access_date,
    DATEDIFF('day', last_access_date, CURRENT_DATE()) AS days_since_access,
    CASE
        -- Test the larger threshold first; CASE returns the first matching branch
        WHEN DATEDIFF('day', last_access_date, CURRENT_DATE()) > 730 THEN 'DELETE'
        WHEN DATEDIFF('day', last_access_date, CURRENT_DATE()) > 365 THEN 'ARCHIVE'
        ELSE 'KEEP'
    END AS recommended_action
FROM metadata.datasets
WHERE retention_policy IS NOT NULL;
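A tiered rule like the retention CASE returns the first branch that matches, so the thresholds have to be tested from the largest down or the stricter tier can never fire. The same rule as a small Python sketch, using the 365 and 730 day thresholds from the query; the function name and the sample dates are illustrative.

```python
from datetime import date


def recommended_action(last_access: date, today: date) -> str:
    """Map days since last access to a retention action, strictest tier first."""
    days_since_access = (today - last_access).days
    if days_since_access > 730:
        return "DELETE"
    if days_since_access > 365:
        return "ARCHIVE"
    return "KEEP"


today = date(2024, 1, 15)
actions = {
    "stale": recommended_action(date(2021, 1, 1), today),
    "aging": recommended_action(date(2022, 12, 1), today),
    "active": recommended_action(date(2023, 12, 1), today),
}
print(actions)
```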
7. Best Practices
- Incremental Lineage Collection: Don't recompute lineage for entire pipelines. Use event-driven collection (OpenLineage) to capture changes incrementally.
- Lineage Accuracy: Manual lineage annotations are error-prone. Prefer automated extraction from SQL parsers and orchestration tools.
- Column-Level Granularity: Table-level lineage is insufficient for impact analysis. Invest in column-level lineage for proper change management.
- Lineage for Debugging: When a pipeline fails, use lineage to identify all affected downstream datasets and stakeholders.
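Incremental collection can be pictured as a small consumer that merges each incoming event into an edge set instead of rebuilding the graph. The sketch below assumes events are dictionaries in the OpenLineage shape used earlier (`eventType`, `job`, `inputs`, `outputs`) and that only COMPLETE events should change the graph; the class name `IncrementalLineage` and the node ID format are illustrative choices.

```python
from typing import Set, Tuple


class IncrementalLineage:
    """Hypothetical consumer that folds run events into a set of edges."""

    def __init__(self) -> None:
        self.edges: Set[Tuple[str, str]] = set()

    def apply(self, event: dict) -> int:
        """Merge one event and return how many edges were new."""
        if event.get("eventType") != "COMPLETE":
            return 0  # ignore START, FAIL, etc. in this sketch
        job = f'{event["job"]["namespace"]}/{event["job"]["name"]}'
        candidate = set()
        for ds in event.get("inputs", []):
            candidate.add((f'{ds["namespace"]}/{ds["name"]}', job))
        for ds in event.get("outputs", []):
            candidate.add((job, f'{ds["namespace"]}/{ds["name"]}'))
        added = candidate - self.edges
        self.edges |= added
        return len(added)


event = {
    "eventType": "COMPLETE",
    "job": {"namespace": "production", "name": "etl_daily_aggregation"},
    "inputs": [{"namespace": "s3://data-lake", "name": "sales_raw"}],
    "outputs": [{"namespace": "s3://data-lake", "name": "sales_daily"}],
}

graph = IncrementalLineage()
first = graph.apply(event)   # both edges are new
second = graph.apply(event)  # replaying the same event changes nothing
print(first, second, len(graph.edges))
```

Because applying an event is idempotent, redelivered or replayed events are harmless, which matters when the transport offers at-least-once delivery.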
Follow-up Questions
- How would you implement real-time lineage tracking for streaming pipelines?
- Explain the trade-offs between pull-based (polling) and push-based (event) lineage collection.
- How do you handle lineage for federated queries across multiple data sources?
- Design a lineage-based access control system for sensitive data.
- How would you implement lineage for machine learning feature pipelines?
- Explain how lineage helps with data quality root cause analysis.
- How do you handle lineage when using dynamic SQL or code-generated queries?
- Design a lineage system that can handle petabyte-scale data platforms.