Interview Question
ℹ️ Interview Context
Company: Meta / Netflix · Role: Senior Data Engineer / Platform Engineer · Difficulty: Advanced · Time: 45-60 minutes
Question: "Explain the relationship between Hooks, Connections, and Operators in Airflow. How do you manage secrets and credentials in production? Describe different secrets backends and when to use each."
Detailed Theory
Hook Architecture
# hook_architecture.py
"""
Hook Architecture in Airflow:

Connections:
- Stored in the metadata DB (connection table) by default, or supplied by a
  secrets backend / AIRFLOW_CONN_* environment variables (see section 4)
- Define how to connect to external systems
- Include conn_type, host, login, password, schema, port and a JSON "extra" field

Hooks:
- Python classes that interact with external systems
- Look up Connection objects (by conn_id) for authentication
- Provide methods for common operations
- Create the client/session for the external system (get_conn); whether it
  is pooled depends on the hook and the underlying client library

Operators:
- Use Hooks to interact with external systems
- Abstract Hook usage behind task interface
- Handle task lifecycle and error handling

Flow: Operator -> Hook -> Connection -> External System
"""
1. Connection Management
# connection_management.py
from airflow.models.connection import Connection
from airflow.hooks.base import BaseHook
from airflow.utils.session import create_session
# Method 1: Get connection using BaseHook
def get_connection_example():
    """Fetch a connection by ID and print its non-secret fields.

    ``BaseHook.get_connection`` resolves the conn_id through the configured
    lookup chain, so this works unchanged for every backend in section 4.

    The password and the values in ``extra`` are deliberately NOT printed.
    Task stdout ends up in task logs, which many users can read. Airflow's
    log masker redacts known secrets, but examples should not rely on it.
    """
    conn = BaseHook.get_connection(conn_id='my_connection')
    print(f"Host: {conn.host}")
    print(f"Login: {conn.login}")
    # Report only whether a password exists; never echo the secret itself.
    print(f"Password set: {bool(conn.password)}")
    print(f"Schema: {conn.schema}")
    print(f"Port: {conn.port}")
    # `extra` often holds tokens/keyfiles, so show its keys, not its values.
    print(f"Extra keys: {sorted(conn.extra_dejson)}")
# Method 2: Create connection programmatically
def create_connection(
    conn_id='my_new_connection',
    conn_type='postgres',
    host='localhost',
    login='user',
    password='password',
    schema='my_database',
    port=5432,
):
    """Create a connection row in the metadata DB unless it already exists.

    ``conn_id`` is unique in the ``connection`` table. Blindly adding a
    second row with the same ID fails on re-run. Checking first makes this
    safe to call repeatedly from bootstrap scripts.

    The defaults reproduce the original example. In real deployments, pass
    the password in from a secrets backend or environment variable rather
    than hard-coding it.

    :return: True if a new connection was created, False if ``conn_id``
        already existed (the existing row is left untouched).
    """
    with create_session() as session:
        existing = (
            session.query(Connection)
            .filter(Connection.conn_id == conn_id)
            .one_or_none()
        )
        if existing is not None:
            return False

        session.add(
            Connection(
                conn_id=conn_id,
                conn_type=conn_type,
                host=host,
                login=login,
                password=password,
                schema=schema,
                port=port,
            )
        )
        session.commit()
    return True
# Method 3: Import connections from JSON
def import_connections_from_json(path='connections.json'):
    """Import connections from a JSON file, skipping IDs that already exist.

    The file must contain a JSON *list* of objects. Each object's keys are
    ``Connection`` constructor arguments (``conn_id``, ``conn_type``,
    ``host``, ``login``, ...). For ad-hoc imports, the
    ``airflow connections import`` CLI command is an alternative.

    :param path: Path to the JSON file (default ``connections.json``).
    :return: List of conn_ids that were newly added, in file order.
    :raises ValueError: if the file does not contain a JSON list.
    :raises KeyError: if an entry has no ``conn_id``.
    """
    import json

    with open(path, encoding='utf-8') as f:
        connections = json.load(f)
    if not isinstance(connections, list):
        raise ValueError(f"{path} must contain a JSON list of connection objects")

    added = []
    with create_session() as session:
        # One query for all existing IDs instead of one query per entry.
        existing_ids = {row.conn_id for row in session.query(Connection.conn_id)}
        for conn_data in connections:
            conn_id = conn_data['conn_id']
            if conn_id in existing_ids:
                continue
            session.add(Connection(**conn_data))
            # Also guards against the same conn_id appearing twice in the file.
            existing_ids.add(conn_id)
            added.append(conn_id)
        session.commit()
    return added
# Connection Types
# Keys are connection *types* (the ``conn_type`` field), mapped to display
# names. They are not connection IDs: e.g. the conn_id ``aws_default`` has
# conn_type ``aws``, and ``google_cloud_default`` has conn_type
# ``google_cloud_platform``.
CONNECTION_TYPES = {
    'postgres': 'PostgreSQL',
    'mysql': 'MySQL',
    'google_cloud_platform': 'Google Cloud',
    'aws': 'AWS',
    'http': 'HTTP',
    'slack': 'Slack',
    'email': 'Email',
}
⚠️ Security
Never store credentials in DAG files. Always use Connections or a Secrets Backend. DAG files are kept in version control, are parsed by every scheduler and worker, and their source is visible in the Webserver's Code view to any user who can read the DAG.
2. Common Hooks
# common_hooks.py
from airflow.hooks.base import BaseHook
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.providers.http.hooks.http import HttpHook
from datetime import datetime
# PostgreSQL Hook
def postgres_example():
    """Run common PostgresHook operations against a ``users`` table.

    Assumes ``users`` has ``email``, ``first_name``, ``last_name`` and
    ``active`` columns (TODO: adjust to your schema). Both inserts target
    the same column set so the example runs against a single schema.

    :return: ``(records, df)``: the active-user rows and a DataFrame of all
        users.
    """
    hook = PostgresHook(postgres_conn_id='postgres_default')

    # Execute query
    records = hook.get_records("SELECT * FROM users WHERE active = true")

    # Insert data (batch helper)
    hook.insert_rows(
        table='users',
        rows=[('john@example.com', 'John', 'Doe')],
        target_fields=['email', 'first_name', 'last_name'],
    )

    # Get pandas DataFrame
    df = hook.get_pandas_df("SELECT * FROM users")

    # Execute with parameters: values go through the driver's %s
    # placeholders. Never build SQL with f-strings or str.format().
    hook.run(
        "INSERT INTO users (email, first_name, last_name) VALUES (%s, %s, %s)",
        parameters=('jane@example.com', 'Jane', 'Doe'),
    )
    return records, df
# S3 Hook
def s3_example(ds=None):
    """Run common S3Hook operations for one logical date.

    Hooks do not render Jinja. A literal ``'{{ ds }}'`` inside a hook call
    is sent to S3 verbatim; templates are rendered only for an operator's
    ``template_fields``. Pass the logical date in explicitly instead, e.g.
    ``context['ds']`` inside a ``@task`` or ``PythonOperator`` callable.

    :param ds: Logical date as ``YYYY-MM-DD``. Defaults to today's local
        date, which only makes sense for ad-hoc runs.
    :return: dict with the S3 object handle, the listed keys and the
        existence flag.
    """
    if ds is None:
        ds = datetime.now().strftime('%Y-%m-%d')

    hook = S3Hook(aws_conn_id='aws_default')
    bucket = 'my-bucket'
    key = f'data/{ds}/data.csv'

    # Upload file. replace=True keeps task retries idempotent: with the
    # default (False), load_file refuses to overwrite an existing key.
    hook.load_file(
        filename='/tmp/data.csv',
        key=key,
        bucket_name=bucket,
        replace=True,
    )

    # Fetch the object handle (metadata + lazily-read body). This does not
    # write anything to disk; use hook.download_file(...) for that.
    s3_object = hook.get_key(key=key, bucket_name=bucket)

    # List objects
    keys = hook.list_keys(bucket_name=bucket, prefix='data/')

    # Check if object exists
    exists = hook.check_for_key(key=key, bucket_name=bucket)

    return {'object': s3_object, 'keys': keys, 'exists': exists}
# BigQuery Hook
def bigquery_example():
    """Run common BigQueryHook operations with the current provider API.

    * ``gcp_conn_id`` replaces the deprecated ``bigquery_conn_id``.
    * Rows are streamed with ``insert_all``. BigQueryHook does not
      implement the DB-API ``insert_rows``.
    * Jobs are submitted with ``insert_job(configuration=...)``. There is
      no ``run(job_config=...)``.

    Dataset, table and column names are placeholders (TODO: adjust).

    :return: ``(records, client, job)``.
    """
    hook = BigQueryHook(
        gcp_conn_id='google_cloud_default',
        use_legacy_sql=False,
    )

    # Execute query
    records = hook.get_records("SELECT * FROM dataset.table")

    # Streaming insert: each row is {"json": {column: value}}.
    hook.insert_all(
        project_id=hook.project_id,
        dataset_id='dataset',
        table_id='table',
        rows=[{'json': {'col1': 'value1', 'col2': 'value2'}}],
        fail_on_error=True,  # surface per-row insert errors instead of ignoring them
    )

    # Get the underlying google-cloud-bigquery client
    client = hook.get_client()

    # Execute query with job config (BigQuery REST job configuration format)
    job_config = {
        'query': {
            'query': 'SELECT * FROM dataset.table',
            'useLegacySql': False,
        }
    }
    job = hook.insert_job(configuration=job_config)
    return records, client, job
# HTTP Hook
def http_example():
    """Issue a GET and a POST request with HttpHook.

    HttpHook fixes the HTTP verb when the hook is constructed
    (``HttpHook(method=...)``). ``run()`` has no ``method`` argument, and
    extra keyword arguments are passed through to ``requests``, where
    ``method=`` clashes with the verb the hook already supplies. The
    example therefore uses one hook per verb.

    :return: ``(get_response, post_response)`` as ``requests.Response``.
    """
    # GET request
    get_hook = HttpHook(method='GET', http_conn_id='api_default')
    get_response = get_hook.run(endpoint='/api/data')

    # POST request
    post_hook = HttpHook(method='POST', http_conn_id='api_default')
    post_response = post_hook.run(
        endpoint='/api/data',
        data='{"key": "value"}',
        headers={'Content-Type': 'application/json'},
    )

    # Get response
    print(post_response.json())
    return get_response, post_response
3. Custom Hooks
# custom_hooks.py
from airflow.hooks.base import BaseHook
from airflow.models import Connection
from typing import Any, Dict, Optional
import logging
class CustomApiHook(BaseHook):
    """
    Custom hook for an external JSON API.

    Connection fields used:
    * ``host``     -- base URL of the API, e.g. ``https://api.example.com``
    * ``password`` -- API key, sent as ``Authorization: Bearer <key>``

    :param custom_api_conn_id: ID of the Airflow connection holding the base
        URL and API key. Defaults to ``custom_api_default``.
    """

    conn_name_attr = 'custom_api_conn_id'
    default_conn_name = 'custom_api_default'
    conn_type = 'custom_api'
    hook_name = 'Custom API'

    # Seconds before a request is abandoned. ``requests`` has no timeout by
    # default, so a stalled API would otherwise hold a worker slot forever.
    default_timeout = 30

    def __init__(
        self,
        custom_api_conn_id: str = default_conn_name,
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.custom_api_conn_id = custom_api_conn_id
        # Populated lazily by get_conn() so the secrets backend is queried
        # once per hook instance instead of once per request.
        self._conn: Optional[Dict[str, Any]] = None

    def get_conn(self) -> Dict[str, Any]:
        """Return the API connection details, fetching them on first use.

        :return: dict with ``base_url``, ``api_key`` and default ``headers``.
        :raises ValueError: if the connection has no host or no password
            (otherwise requests would go to ``None/...`` or carry
            ``Bearer None``).
        """
        if self._conn is None:
            conn = self.get_connection(self.custom_api_conn_id)
            if not conn.host:
                raise ValueError(
                    f"Connection {self.custom_api_conn_id!r} has no host (base URL)"
                )
            if not conn.password:
                raise ValueError(
                    f"Connection {self.custom_api_conn_id!r} has no API key in its password field"
                )
            self._conn = {
                'base_url': conn.host,
                'api_key': conn.password,
                'headers': {
                    'Authorization': f'Bearer {conn.password}',
                    'Content-Type': 'application/json'
                }
            }
        return self._conn

    def run(
        self,
        endpoint: str,
        method: str = 'GET',
        data: Optional[Dict] = None,
        *args,
        **kwargs
    ) -> Any:
        """Call ``endpoint`` and return the decoded JSON body.

        :param endpoint: Path relative to the base URL, with or without a
            leading slash (e.g. ``/data`` or ``data``).
        :param method: HTTP verb.
        :param data: JSON-serialisable request body, or None.
        :param kwargs: Extra keyword arguments for ``requests.request``
            (``params``, ``timeout``, ...). ``headers`` are merged over the
            defaults; ``timeout`` defaults to ``default_timeout``.
        :return: Parsed JSON, or None when the response body is empty
            (e.g. 204 No Content).
        :raises requests.HTTPError: on 4xx/5xx responses.
        :raises TypeError: if positional extras are passed. ``method`` and
            ``url`` are already supplied, so they could never be forwarded.
        """
        import requests

        if args:
            raise TypeError(
                "CustomApiHook.run() forwards only keyword arguments to "
                "requests.request(); got unexpected positional arguments"
            )

        conn = self.get_conn()
        # Join with exactly one slash whether or not host/endpoint carry one.
        url = f"{conn['base_url'].rstrip('/')}/{endpoint.lstrip('/')}"
        # Let callers add/override headers instead of colliding with headers= below.
        headers = {**conn['headers'], **(kwargs.pop('headers', None) or {})}
        kwargs.setdefault('timeout', self.default_timeout)

        response = requests.request(
            method=method,
            url=url,
            headers=headers,
            json=data,
            **kwargs
        )
        response.raise_for_status()
        if not response.content:
            return None
        return response.json()


# Usage -- call this from inside a task. Top-level code like the two lines
# below runs every time the module is imported, and the scheduler re-imports
# DAG files continuously, so in a real DAG file this would hit the API on
# every parse.
custom_hook = CustomApiHook(custom_api_conn_id='my_api')
result = custom_hook.run(endpoint='/data', method='GET')
4. Secrets Backends
# secrets_backends.py
"""
Airflow Secrets Backends:

Lookup order for connections and variables:
  configured [secrets] backend (if any) -> environment variables -> metadata DB
Environment variables and the metadata DB are always searched. airflow.cfg
only configures one *additional* external backend, which is searched first.

1. Metadata DB (default):
- Stores connections in DB (connection table)
- Simple setup; managed via UI/CLI
- Good for small deployments
2. Environment Variables:
- Use AIRFLOW_CONN_{CONN_ID} format (conn_id upper-cased); the value is a
  connection URI (or JSON in newer Airflow versions)
- Good for Docker/K8s
- No connection rows needed in the metadata DB (Airflow itself still needs its DB)
3. HashiCorp Vault:
- Enterprise secret management
- Centralised policies and audit logging
- Airflow's backend reads secrets from a KV secrets engine
4. AWS Secrets Manager:
- Managed AWS service
- Automatic rotation
- IAM integration
5. GCP Secret Manager:
- Managed GCP service
- Version management
- IAM integration
- Secret IDs may not contain "/", so prefixes use "-" separators
"""
# Configuration
SECRETS_CONFIG = """
[secrets]
# Default: no external backend -> only environment variables and the
# metadata DB are searched. Only ONE backend can be active, so the
# alternatives below are commented out (repeating an option inside a
# section is rejected by configparser).
backend =
backend_kwargs =

# Local JSON/YAML/.env files (development and testing only)
# backend = airflow.secrets.local_filesystem.LocalFilesystemBackend
# backend_kwargs = {"connections_file_path": "/files/connections.json", "variables_file_path": "/files/variables.json"}

# HashiCorp Vault
# backend = airflow.providers.hashicorp.secrets.vault.VaultBackend
# backend_kwargs = {"connections_path": "airflow/connections", "variables_path": "airflow/variables"}

# AWS Secrets Manager
# backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
# backend_kwargs = {"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables"}

# GCP Secret Manager
# backend = airflow.providers.google.cloud.secrets.secret_manager.CloudSecretManagerBackend
# backend_kwargs = {"connections_prefix": "airflow-connections", "variables_prefix": "airflow-variables"}
"""
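ℹ️ Pro Tip
Use the CLI to check that a connection resolves: `airflow connections get my_connection`. The conn_id is a positional argument; there is no `--conn-id` flag. The lookup goes through the configured secrets backends, so this helps verify your backend configuration. `airflow connections test my_connection` additionally attempts a real connection, provided the hook supports it and connection testing is enabled in the Airflow config.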
5. Connection Pooling
# connection_pooling.py
"""
Connection Pooling in Airflow:

The [database] sql_alchemy_pool_* settings configure SQLAlchemy's pool for
Airflow's OWN metadata database. The scheduler, webserver and workers use
that pool. These settings do not affect connections that hooks open to
external systems; pooling for those is up to the hook or client library
(see PooledDatabaseHook below).

This matters for high-concurrency deployments: every Airflow process holds
its own pool, so the metadata DB sees
roughly processes x (pool_size + max_overflow) connections.
"""
# Configuration
POOLING_CONFIG = """
[database]
# Connection pool settings (per Airflow process)
sql_alchemy_pool_size = 5
sql_alchemy_max_overflow = 10
sql_alchemy_pool_recycle = 1800
sql_alchemy_pool_pre_ping = True
# For PostgreSQL behind PgBouncer, let PgBouncer do the pooling: keep these
# pools small, or disable SQLAlchemy pooling entirely, so that the total
# stays within PgBouncer's limits.
# sql_alchemy_pool_enabled = False
"""
# Custom connection pooling
from airflow.hooks.base import BaseHook
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
class PooledDatabaseHook(BaseHook):
    """Database hook that reuses one SQLAlchemy engine (and its pool).

    The pool lives inside this hook instance, i.e. inside one process.
    Every concurrently running task builds its own pool, so the database
    can see up to ``tasks x (pool_size + max_overflow)`` connections. Size
    the pool for that total, not for a single task.

    :param conn_id: Airflow connection ID describing the database.
    :param pool_size: Connections kept open in the pool.
    :param max_overflow: Extra connections allowed above ``pool_size``.
    :param pool_recycle: Seconds after which a pooled connection is replaced,
        to avoid server-side idle timeouts.
    """

    def __init__(
        self,
        conn_id: str,
        pool_size: int = 20,
        max_overflow: int = 30,
        pool_recycle: int = 1800,
    ):
        super().__init__()
        self.conn_id = conn_id
        self.pool_size = pool_size
        self.max_overflow = max_overflow
        self.pool_recycle = pool_recycle
        self._engine = None

    @staticmethod
    def _sqlalchemy_uri(uri: str) -> str:
        """Map Airflow's ``postgres://`` URI scheme to SQLAlchemy's ``postgresql://``."""
        # Connection.get_uri() derives the scheme from conn_type ("postgres"),
        # but SQLAlchemy 1.4+ no longer accepts the "postgres" dialect alias.
        prefix = 'postgres://'
        if uri.startswith(prefix):
            return 'postgresql://' + uri[len(prefix):]
        return uri

    @property
    def engine(self):
        """Lazily create and cache the pooled engine for this hook instance."""
        if self._engine is None:
            conn = self.get_connection(self.conn_id)
            self._engine = create_engine(
                self._sqlalchemy_uri(conn.get_uri()),
                poolclass=QueuePool,
                pool_size=self.pool_size,
                max_overflow=self.max_overflow,
                # Validate connections on checkout so stale ones are replaced.
                pool_pre_ping=True,
                pool_recycle=self.pool_recycle,
            )
        return self._engine

    def get_conn(self):
        """Check out a pooled connection. Close it (or use ``with``) to return it."""
        return self.engine.connect()

    def dispose(self) -> None:
        """Close all pooled connections and drop the cached engine."""
        if self._engine is not None:
            self._engine.dispose()
            self._engine = None
Real-World Scenarios
Scenario 1: Meta's Multi-Region Setup
# meta_multi_region.py
"""
Meta-style multi-region connection management:
- Region-specific connections (conn_id pattern: database_<region>)
- Failover handling: falls back to another region when a region's
  connection cannot be looked up; it does not health-check the database
- Connection caching
"""
from airflow.hooks.base import BaseHook
from typing import Dict, Optional
import logging
class MultiRegionHook(BaseHook):
    """Resolve a per-region database connection with ordered fallback.

    Connections are looked up by the ID ``database_<region>``. If the
    requested region's connection cannot be resolved, the fallback regions
    are tried in order, skipping the region that was just tried.

    Caveats:
    * Failover happens at lookup time only. Resolving a Connection does not
      contact the database, so this falls back when a region's connection is
      missing or unreadable in the secrets backend, not when that database
      is down. Real failover also needs a health check (e.g. try get_conn()).
    * Results are cached under the region that actually served them. A
      fallback is never cached under the requested region, so the primary
      is retried on the next call instead of being abandoned for the hook's
      lifetime.
    * This overrides ``BaseHook.get_connection`` with a region-based
      signature. Use ``BaseHook.get_connection(conn_id)`` directly for a
      lookup by conn_id.

    :param primary_region: Region used when ``get_connection`` gets none.
    :param fallback_regions: Regions to try, in order, when the requested one
        fails. ``None`` means ``['us-west-2', 'eu-west-1']``; pass ``[]`` to
        disable fallback.
    """

    def __init__(
        self,
        primary_region: str = 'us-east-1',
        fallback_regions: Optional[list] = None,
    ):
        super().__init__()
        self.primary_region = primary_region
        if fallback_regions is None:
            fallback_regions = ['us-west-2', 'eu-west-1']
        self.fallback_regions = list(fallback_regions)
        # region -> Connection, keyed by the region that served the lookup.
        self._connections: Dict[str, object] = {}

    def _lookup(self, region: str):
        """Return the cached or freshly resolved connection for exactly ``region``."""
        if region not in self._connections:
            self._connections[region] = super().get_connection(f'database_{region}')
        return self._connections[region]

    def get_connection(
        self,
        region: Optional[str] = None
    ):
        """Get the connection for ``region`` (default: primary), with fallback.

        :param region: Region to resolve; defaults to ``primary_region``.
        :return: The first Connection that resolves, trying ``region`` and
            then the fallback regions in order.
        :raises Exception: if no candidate region resolves; the message lists
            each region's error.
        """
        region = region or self.primary_region
        candidates = [region] + [r for r in self.fallback_regions if r != region]

        errors = []
        for candidate in candidates:
            try:
                conn = self._lookup(candidate)
            except Exception as err:  # backends raise different types (not-found, client errors)
                logging.warning("Failed to get connection for %s: %s", candidate, err)
                errors.append(f"{candidate}: {err}")
                continue
            if candidate != region:
                logging.warning("Using fallback region %s instead of %s", candidate, region)
            return conn

        raise Exception(
            f"No available connections for {region} ({'; '.join(errors)})"
        )


# Usage
hook = MultiRegionHook(primary_region='us-east-1')
conn = hook.get_connection()  # Gets us-east-1 connection
conn_west = hook.get_connection(region='us-west-2')  # Gets us-west-2 connection
Scenario 2: Netflix's Secrets Rotation
# netflix_secrets_rotation.py
"""
Netflix-style secrets rotation:
- Automatic credential rotation
- Zero-downtime updates
- Audit logging

Note: in production, rotation is usually performed centrally by the secrets
manager (e.g. a rotation schedule in AWS Secrets Manager). Airflow-side
code then mainly needs to re-read credentials so it picks up new values.
"""
from airflow.hooks.base import BaseHook
from datetime import datetime, timedelta
from typing import Dict, Any
import logging
class RotatableSecretsHook(BaseHook):
    """Consumer-side hook that keeps credentials fresh while they rotate.

    Rotation itself belongs in the secrets manager (e.g. an AWS Secrets
    Manager rotation schedule), which swaps the secret in one place. This
    hook caches the Connection and re-reads it from the secrets backend once
    ``rotation_interval`` seconds have passed, or on demand with
    ``force_refresh=True`` (e.g. after an auth failure). Long-running code
    thus picks up new credentials without a restart.

    Rotating from inside ``get_connection`` would be unsafe. Hook state is
    per instance, i.e. per task, so every new hook would rotate on its first
    call and invalidate credentials other tasks are still using.

    :param conn_id: Airflow connection ID to read.
    :param rotation_interval: Maximum age, in seconds, of the cached
        connection. Keep it well below the secret's actual rotation period.
    """

    def __init__(
        self,
        conn_id: str,
        rotation_interval: int = 86400,  # 24 hours
    ):
        super().__init__()
        self.conn_id = conn_id
        self.rotation_interval = rotation_interval
        self._cached_conn: Any = None
        # time.monotonic() timestamp of the last fetch; immune to wall-clock jumps.
        self._fetched_at: float = 0.0

    def get_connection(
        self,
        force_refresh: bool = False
    ) -> Any:
        """Return the connection, re-reading it from the backend when stale.

        :param force_refresh: Bypass the cache, e.g. right after a login
            failure caused by a freshly rotated password.
        :return: The Airflow Connection for ``conn_id``.
        """
        import time

        now = time.monotonic()
        stale = (
            force_refresh
            or self._cached_conn is None
            or now - self._fetched_at > self.rotation_interval
        )
        if stale:
            logging.info("Refreshing credentials for %s", self.conn_id)
            self._cached_conn = super().get_connection(self.conn_id)
            self._fetched_at = now
        return self._cached_conn

    def _rotate_secret(self):
        """Placeholder for an explicit, Airflow-owned rotation step.

        Not called automatically. Implement it only if Airflow must own
        rotation:
        1. Generate new secret
        2. Update it in the secrets backend
        3. Update the credential in the target system
        4. Log the rotation event

        Example for AWS Secrets Manager:
            client = boto3.client('secretsmanager')
            client.update_secret(
                SecretId=f'airflow/connections/{self.conn_id}',
                SecretString=new_secret,
            )
        """
        raise NotImplementedError(
            f"Rotation for {self.conn_id} should be handled by the secrets manager"
        )


# Usage
hook = RotatableSecretsHook(conn_id='database', rotation_interval=86400)
conn = hook.get_connection()  # Re-reads from the secrets backend when the cached copy is stale
Edge Cases
⚠️ Common Pitfalls
- Connection Leaks: Always close connections (and cursors) properly. Use context managers when available.
- Stale Credentials: Airflow does not cache connections in memory by default, but custom hooks (like the multi-region and rotation examples above) may. Re-read credentials after they change, e.g. with a `force_refresh` option.
- Network Timeouts: Set appropriate timeouts for external connections. Handle network failures gracefully.
- Concurrent Access: Be careful with shared connections in multi-threaded environments.
# edge_cases.py
from airflow.hooks.base import BaseHook
from contextlib import contextmanager
# Proper connection handling
@contextmanager
def get_managed_connection(conn_id: str):
    """Yield the native connection/client for ``conn_id`` and always release it.

    ``BaseHook.get_hook`` instantiates the hook registered for the
    connection's type, and ``get_conn()`` returns that hook's client object.
    Not every client exposes ``close()``, so closing is best-effort. A
    missing method must not raise AttributeError in ``finally`` and mask
    the caller's real exception.

    :param conn_id: Airflow connection ID.
    :yield: Whatever the hook's ``get_conn()`` returns.
    """
    hook = BaseHook.get_hook(conn_id=conn_id)
    conn = hook.get_conn()
    try:
        yield conn
    finally:
        close = getattr(conn, 'close', None)
        if callable(close):
            close()


# Usage: close the cursor as well as the connection.
with get_managed_connection('postgres_default') as conn:
    cursor = conn.cursor()
    try:
        cursor.execute("SELECT * FROM users")
        results = cursor.fetchall()
    finally:
        cursor.close()
QuizBox
Best Practices
# best_practices.py
"""
Hooks & Connections Best Practices:
1. Security:
- Never hardcode credentials (in DAG files or anywhere in version control)
- Use Secrets Backends
- Enable audit logging
- Rotate credentials regularly (ideally in the secrets manager itself)
2. Connection Management:
- Use connection pooling where the client supports it
- Set appropriate timeouts
- Handle connection failures
- Close connections (and cursors) properly
3. Performance:
- Cache connections when possible, and refresh them when credentials rotate
- Use appropriate pool sizes (pools are per process: multiply by concurrency)
- Monitor connection usage
- Optimize query patterns
4. Reliability:
- Implement retry logic
- Handle network failures
- Use fallback connections
- Monitor connection health
5. Maintainability:
- Document connection requirements
- Use consistent naming conventions
- Version control connection definitions (never the secret values)
- Test connections regularly
"""
ℹ️ Meta Interview Tip
Meta reportedly uses a custom secrets-management system with automatic rotation. When discussing hooks and connections, emphasize security, connection pooling, and automatic rotation, and explain how you would handle multi-region failover.
Summary
Hooks and Connections are fundamental to Airflow's integration capabilities. Key takeaways:
- Connections store authentication details
- Hooks provide Python interfaces to external systems
- Secrets Backends manage credentials securely
- Connection Pooling improves performance
- Automatic Rotation limits the damage from leaked or stale credentials
For Meta and Netflix interviews, focus on:
- Security best practices
- Connection pooling strategies
- Secrets rotation
- Multi-region failover
- Monitoring and debugging
This question is part of the Apache Airflow Advanced interview preparation series. Practice implementing these patterns before your interview.