Interview Question (Hard) — Asked at: Google, Netflix, Uber, Airbnb, Spotify
"Design an ML data pipeline that handles data validation, transformation, and augmentation at scale. How do you ensure data quality, handle schema evolution, and manage data versioning?"
ML Data Pipeline Architecture
ML data pipelines transform raw data into features suitable for model training and serving. They must handle validation, transformation, and quality monitoring at scale.
Data Pipeline Diagram
┌──────────────────────────────────────────────────────────────────┐
│                  ML Data Pipeline Architecture                   │
├──────────────────────────────────────────────────────────────────┤
│                                                                  │
│  ┌───────────┐   ┌───────────┐   ┌───────────┐   ┌───────────┐   │
│  │   Data    │──▶│  Schema   │──▶│   Data    │──▶│  Feature  │   │
│  │ Ingestion │   │Validation │   │ Transform │   │Engineering│   │
│  └───────────┘   └───────────┘   └───────────┘   └───────────┘   │
│        │               │               │               │         │
│        ▼               ▼               ▼               ▼         │
│  ┌───────────┐   ┌───────────┐   ┌───────────┐   ┌───────────┐   │
│  │    Raw    │   │   Clean   │   │ Processed │   │  Feature  │   │
│  │   Data    │   │   Data    │   │   Data    │   │   Store   │   │
│  └───────────┘   └───────────┘   └───────────┘   └───────────┘   │
│                                                                  │
│  ┌────────────────────────────────────────────────────────────┐  │
│  │                  Data Quality Monitoring                   │  │
│  │     Validation | Profiling | Drift Detection | Alerts      │  │
│  └────────────────────────────────────────────────────────────┘  │
└──────────────────────────────────────────────────────────────────┘
Data Validation
Great Expectations Pipeline
import great_expectations as ge
import pandas as pd
from typing import Dict, List, Optional
from datetime import datetime
import json
class MLDataValidator:
    """Comprehensive data validation for ML pipelines.

    Combines Great Expectations suites (built with the legacy ``ge.core`` /
    ``ge.from_pandas`` API) with ML-specific checks. Every result produced by
    :meth:`validate_dataset` is appended to ``self.validation_results`` and is
    rendered by :meth:`generate_report`.
    """

    def __init__(self, reference_data: pd.DataFrame = None):
        # Stored for callers; none of the methods below read it.
        self.reference_data = reference_data
        self.validation_results = []

    def create_expectation_suite(self, df: pd.DataFrame,
                                 suite_name: str = "ml_data_suite"
                                 ) -> "ge.core.ExpectationSuite":
        """Create an expectation suite by profiling ``df``.

        Args:
            df: Data used as the profile (typically a known-good sample).
            suite_name: Name given to the returned suite.

        Returns:
            A suite with these expectations:

            - Every column: no nulls.
            - int64/float64 columns: values within the observed range,
              widened by 50% of each bound's magnitude.
            - object columns with at most 100 distinct values: membership
              in the observed value set.
            - Table level: a row count within 80%-120% of ``len(df)``.
        """
        suite = ge.core.ExpectationSuite(expectation_suite_name=suite_name)

        for column in df.columns:
            suite.add_expectation(
                ge.core.ExpectationConfiguration(
                    expectation_type="expect_column_values_to_not_be_null",
                    kwargs={"column": column}
                )
            )

            if df[column].dtype in ['int64', 'float64']:
                min_val = df[column].min()
                max_val = df[column].max()
                # All-NaN column: there is no range to learn (the not-null
                # expectation above already flags it).
                if pd.isna(min_val) or pd.isna(max_val):
                    continue
                # Widen each bound away from the data by 50% of its magnitude.
                # Scaling by 0.5 / 1.5 would move negative bounds *inward*
                # and make the profiled data fail its own suite. float()
                # keeps the kwargs free of numpy scalars (JSON-serializable).
                min_val, max_val = float(min_val), float(max_val)
                suite.add_expectation(
                    ge.core.ExpectationConfiguration(
                        expectation_type="expect_column_values_to_be_between",
                        kwargs={
                            "column": column,
                            "min_value": min_val - 0.5 * abs(min_val),
                            "max_value": max_val + 0.5 * abs(max_val)
                        }
                    )
                )
            elif df[column].dtype == 'object':
                unique_values = df[column].unique().tolist()
                if len(unique_values) <= 100:  # Only for low cardinality
                    suite.add_expectation(
                        ge.core.ExpectationConfiguration(
                            expectation_type="expect_column_values_to_be_in_set",
                            kwargs={
                                "column": column,
                                "value_set": unique_values
                            }
                        )
                    )

        # Table-level expectations
        suite.add_expectation(
            ge.core.ExpectationConfiguration(
                expectation_type="expect_table_row_count_to_be_between",
                kwargs={
                    "min_value": len(df) * 0.8,
                    "max_value": len(df) * 1.2
                }
            )
        )
        return suite

    def validate_dataset(self, df: pd.DataFrame,
                         suite: "ge.core.ExpectationSuite") -> Dict:
        """Validate ``df`` against ``suite`` and record the result.

        Returns:
            A dict with ``timestamp``, overall ``success``, GE ``statistics``
            and one entry per expectation under ``results``. The same dict is
            appended to ``self.validation_results``.
        """
        ge_df = ge.from_pandas(df)
        results = ge_df.validate(suite)

        validation_result = {
            'timestamp': datetime.now().isoformat(),
            'success': results.success,
            'statistics': results.statistics,
            'results': []
        }
        for result in results.results:
            validation_result['results'].append({
                'expectation': result.expectation_config.expectation_type,
                'success': result.success,
                'result': result.result,
                'kwargs': result.expectation_config.kwargs
            })

        self.validation_results.append(validation_result)
        return validation_result

    def validate_ml_specific(self, df: pd.DataFrame,
                             target_column: str,
                             feature_columns: List[str]) -> Dict:
        """Run ML-specific validation checks.

        Checks performed:

        - ``class_balance``: minority/majority class count ratio must be
          above 0.1.
        - ``label_leakage``: each numeric feature must have an absolute
          correlation below 0.95 with the target. This check runs only when
          the target is numeric. Features whose correlation is undefined
          (constant columns) are skipped.
        - ``feature_correlation``: no pair of numeric features may have an
          absolute correlation above 0.95.

        Returns:
            ``{'timestamp', 'checks', 'success'}``, where ``success`` is True
            only if every check passed.
        """
        import numpy as np  # needed for the triangular mask below

        numeric_dtypes = ['int64', 'float64']
        checks = []

        if target_column in df.columns:
            target = df[target_column]

            class_counts = target.value_counts()
            if not class_counts.empty:
                class_ratio = class_counts.min() / class_counts.max()
                checks.append({
                    'check': 'class_balance',
                    'success': class_ratio > 0.1,
                    'value': float(class_ratio),
                    'threshold': 0.1,
                    'message': f"Class ratio: {class_ratio:.4f}"
                })

            # Correlation-based leakage only makes sense for a numeric target;
            # Series.corr raises on string/categorical targets.
            if pd.api.types.is_numeric_dtype(target):
                for col in feature_columns:
                    if col not in df.columns or df[col].dtype not in numeric_dtypes:
                        continue
                    correlation = df[col].corr(target)
                    if pd.isna(correlation):
                        # Constant column / too few rows: correlation undefined.
                        continue
                    correlation = abs(correlation)
                    checks.append({
                        'check': 'label_leakage',
                        'feature': col,
                        'success': correlation < 0.95,
                        'value': float(correlation),
                        'threshold': 0.95,
                        'message': f"Correlation with target: {correlation:.4f}"
                    })

        # Check feature correlations
        if len(feature_columns) > 1:
            numeric_cols = [c for c in feature_columns
                            if c in df.columns and df[c].dtype in numeric_dtypes]
            if len(numeric_cols) > 1:
                corr_matrix = df[numeric_cols].corr().abs()
                # Keep only the strict upper triangle so each pair counts once.
                upper_tri = corr_matrix.where(
                    np.triu(np.ones(corr_matrix.shape), k=1).astype(bool)
                )
                high_corr_pairs = [
                    (col, row, corr_matrix.loc[row, col])
                    for col in upper_tri.columns
                    for row in upper_tri.index
                    if upper_tri.loc[row, col] > 0.95
                ]
                checks.append({
                    'check': 'feature_correlation',
                    'success': len(high_corr_pairs) == 0,
                    'value': len(high_corr_pairs),
                    'threshold': 0,
                    'message': f"Highly correlated pairs: {len(high_corr_pairs)}"
                })

        return {
            'timestamp': datetime.now().isoformat(),
            'checks': checks,
            'success': all(check['success'] for check in checks)
        }

    def generate_report(self) -> str:
        """Render every recorded validation result as an HTML report.

        Expectation names and result details come from the data being
        validated (for example, value sets), so they are HTML-escaped.
        Pass/fail marks use ASCII character references (check mark / ballot
        X) so the output does not depend on the page's declared encoding.
        """
        from html import escape

        parts = ["""
<html>
<head>
<title>Data Validation Report</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
.passed { color: green; }
.failed { color: red; }
table { border-collapse: collapse; width: 100%; }
th, td { border: 1px solid #ddd; padding: 8px; }
th { background-color: #f2f2f2; }
</style>
</head>
<body>
<h1>Data Validation Report</h1>
<p>Generated: """ + datetime.now().isoformat() + """</p>
"""]

        for result in self.validation_results:
            status_class = "passed" if result['success'] else "failed"
            parts.append(f"""
<h2 class="{status_class}">
Validation: {'PASSED' if result['success'] else 'FAILED'}
</h2>
<table>
<tr><th>Expectation</th><th>Success</th><th>Details</th></tr>
""")
            for check in result.get('results', []):
                mark_class = 'passed' if check['success'] else 'failed'
                mark = '&#10003;' if check['success'] else '&#10007;'
                parts.append(f"""
<tr>
<td>{escape(str(check['expectation']))}</td>
<td class="{mark_class}">
{mark}
</td>
<td>{escape(str(check.get('result', {})))}</td>
</tr>
""")
            parts.append("</table>")

        parts.append("</body></html>")
        return "".join(parts)
ℹ️ Note: Data validation should be the first step in any ML pipeline. Implement schema validation, statistical tests, and ML-specific checks to catch data issues early.
Data Transformation
PySpark Transformation Pipeline
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from typing import Dict, List
import numpy as np
class MLDataTransformer:
    """Data transformation pipeline using PySpark.

    Transformers are registered with the fluent ``add_*`` methods and are
    applied in registration order by :meth:`transform`.

    The scaling statistics and category vocabularies are computed from the
    DataFrame passed to :meth:`transform` itself. Transforming train and test
    data separately therefore produces independently computed statistics and
    encodings.
    """

    def __init__(self, spark: "SparkSession"):
        self.spark = spark
        self.transformers = []

    def add_numeric_transformer(self, columns: List[str],
                                method: str = 'standard'):
        """Register a numeric transformation ('standard', 'minmax' or 'log')."""
        self.transformers.append({
            'type': 'numeric',
            'columns': columns,
            'method': method
        })
        return self

    def add_categorical_transformer(self, columns: List[str],
                                    method: str = 'onehot'):
        """Register a categorical transformation ('onehot' or 'label')."""
        self.transformers.append({
            'type': 'categorical',
            'columns': columns,
            'method': method
        })
        return self

    def add_time_transformer(self, timestamp_column: str,
                             features: List[str] = None):
        """Register time-feature extraction.

        Supported features: 'hour', 'dayofweek', 'month', 'year' (the
        default set) and 'is_weekend'.
        """
        self.transformers.append({
            'type': 'time',
            'column': timestamp_column,
            'features': features or ['hour', 'dayofweek', 'month', 'year']
        })
        return self

    def add_aggregation_transformer(self, group_columns: List[str],
                                    agg_columns: List[str],
                                    agg_functions: List[str] = None):
        """Register group-wise aggregates joined back onto each row.

        Supported functions: 'mean', 'std', 'min', 'max' (the default set)
        and 'count'.
        """
        self.transformers.append({
            'type': 'aggregation',
            'group_columns': group_columns,
            'agg_columns': agg_columns,
            'agg_functions': agg_functions or ['mean', 'std', 'min', 'max']
        })
        return self

    def transform(self, df):
        """Apply all registered transformations in order and return the result."""
        for transformer in self.transformers:
            if transformer['type'] == 'numeric':
                df = self._transform_numeric(df, transformer)
            elif transformer['type'] == 'categorical':
                df = self._transform_categorical(df, transformer)
            elif transformer['type'] == 'time':
                df = self._transform_time(df, transformer)
            elif transformer['type'] == 'aggregation':
                df = self._transform_aggregation(df, transformer)
        return df

    def _transform_numeric(self, df, transformer: Dict):
        """Add ``<col>_transformed`` columns.

        Methods:
            standard: ``(x - mean) / (std + 1e-8)``.
            minmax: ``(x - min) / (max - min + 1e-8)``.
            log: ``log1p(|x|)``.

        A column with no non-null values yields an all-NULL output column.
        """
        columns = transformer['columns']
        method = transformer['method']

        if method == 'standard':
            for col in columns:
                stats = df.select(
                    F.mean(col).alias('mean'),
                    F.stddev(col).alias('std')
                ).collect()[0]
                mean_val = stats['mean']
                if mean_val is None:
                    # Empty or all-NULL column: there is nothing to scale.
                    df = df.withColumn(f"{col}_transformed",
                                       F.lit(None).cast('double'))
                    continue
                # stddev is NULL with fewer than two non-null rows; treat it as 0.
                std_val = stats['std'] or 0.0
                df = df.withColumn(
                    f"{col}_transformed",
                    (F.col(col) - mean_val) / (std_val + 1e-8)
                )

        elif method == 'minmax':
            for col in columns:
                stats = df.select(
                    F.min(col).alias('min'),
                    F.max(col).alias('max')
                ).collect()[0]
                min_val = stats['min']
                max_val = stats['max']
                if min_val is None or max_val is None:
                    df = df.withColumn(f"{col}_transformed",
                                       F.lit(None).cast('double'))
                    continue
                df = df.withColumn(
                    f"{col}_transformed",
                    (F.col(col) - min_val) / (max_val - min_val + 1e-8)
                )

        elif method == 'log':
            for col in columns:
                df = df.withColumn(
                    f"{col}_transformed",
                    F.log1p(F.abs(F.col(col)))
                )

        return df

    @staticmethod
    def _distinct_values(df, col):
        """Return the distinct values of ``col`` in sorted order (NULL first).

        Sorting makes one-hot column order and label codes reproducible across
        runs; ``distinct()`` alone returns rows in arbitrary order.
        """
        rows = df.select(col).distinct().orderBy(col).collect()
        return [row[0] for row in rows]

    @staticmethod
    def _equals(col, value):
        """Build a predicate for ``col == value`` that also matches NULL.

        In Spark SQL, ``col == NULL`` evaluates to NULL, never true, so a NULL
        value is matched with ``isNull()`` instead.
        """
        if value is None:
            return F.col(col).isNull()
        return F.col(col) == value

    def _transform_categorical(self, df, transformer: Dict):
        """Encode categorical columns.

        Methods:
            onehot: adds one 0/1 column named ``<col>_<value>`` per distinct
                value.
            label: adds ``<col>_encoded``, holding the value's index in the
                sorted distinct values.
        """
        columns = transformer['columns']
        method = transformer['method']

        if method == 'onehot':
            for col in columns:
                for value in self._distinct_values(df, col):
                    df = df.withColumn(
                        f"{col}_{value}",
                        F.when(self._equals(col, value), 1).otherwise(0)
                    )

        elif method == 'label':
            for col in columns:
                # Build one chained CASE WHEN expression. Referencing
                # F.col(f"{col}_encoded") before that column exists fails.
                encoded = None
                for idx, value in enumerate(self._distinct_values(df, col)):
                    cond = self._equals(col, value)
                    encoded = (F.when(cond, idx) if encoded is None
                               else encoded.when(cond, idx))
                if encoded is None:  # empty DataFrame: no categories at all
                    encoded = F.lit(None).cast('int')
                df = df.withColumn(f"{col}_encoded", encoded)

        return df

    def _transform_time(self, df, transformer: Dict):
        """Parse the column with ``to_timestamp`` and add the requested parts."""
        col = transformer['column']
        features = transformer['features']
        ts = f"{col}_timestamp"

        df = df.withColumn(ts, F.to_timestamp(F.col(col)))

        if 'hour' in features:
            df = df.withColumn(f"{col}_hour", F.hour(F.col(ts)))
        if 'dayofweek' in features:
            df = df.withColumn(f"{col}_dayofweek", F.dayofweek(F.col(ts)))
        if 'month' in features:
            df = df.withColumn(f"{col}_month", F.month(F.col(ts)))
        if 'year' in features:
            df = df.withColumn(f"{col}_year", F.year(F.col(ts)))
        if 'is_weekend' in features:
            # Spark's dayofweek is 1 = Sunday ... 7 = Saturday.
            df = df.withColumn(
                f"{col}_is_weekend",
                F.when(F.dayofweek(F.col(ts)).isin([1, 7]), 1).otherwise(0)
            )
        return df

    def _transform_aggregation(self, df, transformer: Dict):
        """Compute per-group aggregates and left-join them onto every row.

        Function names that are not recognised are ignored. If no
        aggregations remain, ``df`` is returned unchanged; ``agg()`` with no
        expressions would fail.
        """
        group_cols = transformer['group_columns']
        agg_cols = transformer['agg_columns']
        agg_funcs = transformer['agg_functions']

        builders = {
            'mean': F.mean,
            'std': F.stddev,
            'min': F.min,
            'max': F.max,
            'count': F.count,
        }
        agg_exprs = [
            builders[func](col).alias(f"{col}_{func}")
            for col in agg_cols
            for func in agg_funcs
            if func in builders
        ]
        if not agg_exprs:
            return df

        agg_df = df.groupBy(group_cols).agg(*agg_exprs)
        return df.join(agg_df, on=group_cols, how='left')
Feature Engineering Pipeline
import pandas as pd
import numpy as np
from typing import Dict, List, Optional, Tuple
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.decomposition import PCA
class FeatureEngineeringPipeline:
    """End-to-end feature engineering pipeline.

    Steps are registered with :meth:`add_step` and executed in order.
    :meth:`fit_transform` learns each step's state from the training frame:
    fill values, scaling statistics, encoders, selected features, PCA and
    outlier bounds. :meth:`transform` replays the steps using that stored
    state. Learned state is stored in ``self.scalers`` and ``self.encoders``,
    keyed by step and column.
    """

    def __init__(self):
        self.steps = []
        self.fitted = False
        self.scalers = {}
        self.encoders = {}
        self.feature_names = []

    def add_step(self, step_type: str, **kwargs):
        """Register a step; ``kwargs`` become that step's params. Returns self."""
        self.steps.append({
            'type': step_type,
            'params': kwargs
        })
        return self

    def fit_transform(self, X: pd.DataFrame,
                      y: pd.Series = None) -> pd.DataFrame:
        """Fit every step on ``X`` (and ``y``) and return the transformed copy.

        An ``outlier_handling`` step with ``method='remove'`` drops rows, so
        the returned frame can be shorter than ``y``.
        """
        result = X.copy()
        for step in self.steps:
            result = self._apply_step(result, step, fit=True, y=y)
        self.fitted = True
        self.feature_names = list(result.columns)
        return result

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Transform a copy of ``X`` using the state learned by fit_transform.

        Raises:
            ValueError: if the pipeline has not been fitted.
        """
        if not self.fitted:
            raise ValueError("Pipeline not fitted. Call fit_transform first.")
        result = X.copy()
        for step in self.steps:
            result = self._apply_step(result, step, fit=False)
        return result

    def _apply_step(self, df: pd.DataFrame, step: Dict,
                    fit: bool = True, y: pd.Series = None) -> pd.DataFrame:
        """Dispatch a single step to its handler.

        Raises:
            ValueError: for an unknown step type.
        """
        step_type = step['type']
        params = step['params']

        if step_type == 'missing_value':
            return self._handle_missing(df, params, fit)
        elif step_type == 'numeric_scaling':
            return self._scale_numeric(df, params, fit)
        elif step_type == 'categorical_encoding':
            return self._encode_categorical(df, params, fit)
        elif step_type == 'feature_creation':
            return self._create_features(df, params)
        elif step_type == 'feature_selection':
            return self._select_features(df, params, fit, y)
        elif step_type == 'dimensionality_reduction':
            return self._reduce_dimensions(df, params, fit)
        elif step_type == 'outlier_handling':
            return self._handle_outliers(df, params, fit)
        else:
            raise ValueError(f"Unknown step type: {step_type}")

    @staticmethod
    def _compute_fill_value(series: pd.Series, method: str):
        """Return the imputation value for ``series`` under ``method``.

        'median' and 'mean' are undefined for non-numeric data, so those
        columns fall back to 'mode'. Any other method, or an empty mode,
        yields 0.
        """
        if method in ('median', 'mean') and not pd.api.types.is_numeric_dtype(series):
            method = 'mode'
        if method == 'median':
            return series.median()
        if method == 'mean':
            return series.mean()
        if method == 'mode':
            modes = series.mode()
            return modes.iloc[0] if not modes.empty else 0
        return 0

    def _handle_missing(self, df: pd.DataFrame, params: Dict,
                        fit: bool = True) -> pd.DataFrame:
        """Fill missing values with per-column values learned at fit time.

        Params:
            method: 'median' (default), 'mean', 'mode', or anything else
                to fill with 0.
            columns: columns to fill; defaults to all columns.
        """
        method = params.get('method', 'median')
        columns = params.get('columns', df.columns.tolist())

        for col in columns:
            if col not in df.columns:
                continue
            if fit:
                self.scalers[f'missing_{col}'] = self._compute_fill_value(df[col], method)
            fill_value = self.scalers.get(f'missing_{col}', 0)
            df[col] = df[col].fillna(fill_value)
        return df

    def _scale_numeric(self, df: pd.DataFrame, params: Dict,
                       fit: bool = True) -> pd.DataFrame:
        """Scale numeric columns in place ('standard' or 'minmax').

        Columns default to the frame's int64/float64 columns.
        """
        method = params.get('method', 'standard')
        columns = params.get(
            'columns',
            df.select_dtypes(include=['int64', 'float64']).columns.tolist()
        )

        for col in columns:
            if col not in df.columns:
                continue
            key = f'scale_{col}'
            if fit:
                if method == 'standard':
                    self.scalers[key] = (df[col].mean(), df[col].std())
                elif method == 'minmax':
                    self.scalers[key] = (df[col].min(), df[col].max())

            if key in self.scalers:
                if method == 'standard':
                    mean_val, std_val = self.scalers[key]
                    df[col] = (df[col] - mean_val) / (std_val + 1e-8)
                elif method == 'minmax':
                    min_val, max_val = self.scalers[key]
                    df[col] = (df[col] - min_val) / (max_val - min_val + 1e-8)
        return df

    def _encode_categorical(self, df: pd.DataFrame, params: Dict,
                            fit: bool = True) -> pd.DataFrame:
        """Encode categorical columns ('onehot' or 'label').

        Columns default to the frame's object/category columns.

        - label: replaces the column with integer codes learned at fit time.
          Categories not seen during fit map to -1.
        - onehot: adds one 0/1 column ``<col>_<value>`` per value seen at fit
          time, then drops the original column. Missing values (None/NaN)
          get their own indicator, matched with ``isna``.
        """
        method = params.get('method', 'onehot')
        columns = params.get(
            'columns',
            df.select_dtypes(include=['object', 'category']).columns.tolist()
        )

        for col in columns:
            if col not in df.columns:
                continue
            key = f'encode_{col}'

            if fit:
                if method == 'label':
                    le = LabelEncoder()
                    le.fit(df[col].astype(str))
                    self.encoders[key] = le
                elif method == 'onehot':
                    self.encoders[key] = df[col].unique().tolist()

            if method == 'label':
                le = self.encoders.get(key)
                if le is not None:
                    # Map through classes_ instead of le.transform so that
                    # unseen categories become -1 rather than raising.
                    mapping = {label: code for code, label in enumerate(le.classes_)}
                    df[col] = df[col].astype(str).map(mapping).fillna(-1).astype(int)
            elif method == 'onehot':
                for value in self.encoders.get(key, []):
                    # NaN/None never compare equal with ==.
                    indicator = df[col].isna() if pd.isna(value) else df[col] == value
                    df[f'{col}_{value}'] = indicator.astype(int)
                df = df.drop(columns=[col])
        return df

    def _create_features(self, df: pd.DataFrame, params: Dict) -> pd.DataFrame:
        """Create derived numeric features.

        Types:
            interaction: pairwise product and ratio.
            polynomial: powers 2..degree.
            log: ``log1p`` of values clipped at 0.

        Columns default to the first five columns; non-numeric columns are
        skipped.
        """
        feature_type = params.get('type', 'interaction')
        numeric = ['int64', 'float64']

        if feature_type == 'interaction':
            columns = params.get('columns', df.columns.tolist()[:5])
            for i, col1 in enumerate(columns):
                for col2 in columns[i + 1:]:
                    if col1 in df.columns and col2 in df.columns:
                        if df[col1].dtype in numeric and df[col2].dtype in numeric:
                            df[f'{col1}_x_{col2}'] = df[col1] * df[col2]
                            df[f'{col1}_div_{col2}'] = df[col1] / (df[col2] + 1e-8)

        elif feature_type == 'polynomial':
            columns = params.get('columns', df.columns.tolist()[:5])
            degree = params.get('degree', 2)
            for col in columns:
                if col in df.columns and df[col].dtype in numeric:
                    for d in range(2, degree + 1):
                        df[f'{col}_pow{d}'] = df[col] ** d

        elif feature_type == 'log':
            columns = params.get('columns', df.columns.tolist()[:5])
            for col in columns:
                if col in df.columns and df[col].dtype in numeric:
                    df[f'{col}_log'] = np.log1p(df[col].clip(lower=0))

        return df

    def _select_features(self, df: pd.DataFrame, params: Dict,
                         fit: bool = True, y: pd.Series = None) -> pd.DataFrame:
        """Keep the ``k`` best numeric features (ANOVA F-test) plus all others.

        At fit time the selection is learned from ``y``. If there is no ``y``,
        or there are already at most ``k`` numeric columns, selection is
        skipped. At transform time the stored selection is re-applied, with
        no ``y`` needed. If selection was skipped during fit, it is skipped
        here too.
        """
        k = params.get('k', 10)
        numeric_cols = df.select_dtypes(include=['int64', 'float64']).columns.tolist()

        if fit:
            # Drop any selection from a previous fit so it cannot go stale.
            self.scalers.pop('selected_features', None)
            if y is None or len(numeric_cols) <= k:
                return df
            # An earlier outlier-removal step may have dropped rows.
            if isinstance(y, pd.Series) and len(y) != len(df):
                y = y.loc[df.index]
            selector = SelectKBest(f_classif, k=k)
            selector.fit(df[numeric_cols], y)
            self.scalers['selected_features'] = [
                col for col, selected in zip(numeric_cols, selector.get_support())
                if selected
            ]

        selected_features = self.scalers.get('selected_features')
        if selected_features is None:
            return df
        return df[selected_features +
                  [c for c in df.columns if c not in numeric_cols]]

    def _reduce_dimensions(self, df: pd.DataFrame, params: Dict,
                           fit: bool = True) -> pd.DataFrame:
        """Replace numeric columns with ``n_components`` PCA components.

        Skipped when there are at most ``n_components`` numeric columns.
        Non-numeric columns are kept ahead of the ``pca_<i>`` columns.
        """
        n_components = params.get('n_components', 10)
        numeric_cols = df.select_dtypes(include=['int64', 'float64']).columns.tolist()

        if len(numeric_cols) <= n_components:
            return df

        if fit:
            pca = PCA(n_components=n_components)
            pca.fit(df[numeric_cols])
            self.scalers['pca'] = pca

        pca = self.scalers.get('pca')
        if pca:
            pca_result = pca.transform(df[numeric_cols])
            pca_df = pd.DataFrame(
                pca_result,
                columns=[f'pca_{i}' for i in range(n_components)],
                index=df.index
            )
            non_numeric = [c for c in df.columns if c not in numeric_cols]
            df = pd.concat([df[non_numeric], pca_df], axis=1)
        return df

    def _handle_outliers(self, df: pd.DataFrame, params: Dict,
                         fit: bool = True) -> pd.DataFrame:
        """Clip or remove values outside the 1.5*IQR fences learned at fit time.

        With ``method='remove'``, rows are dropped.
        """
        method = params.get('method', 'clip')
        columns = params.get(
            'columns',
            df.select_dtypes(include=['int64', 'float64']).columns.tolist()
        )

        for col in columns:
            if col not in df.columns:
                continue
            if fit:
                q1 = df[col].quantile(0.25)
                q3 = df[col].quantile(0.75)
                iqr = q3 - q1
                self.scalers[f'outlier_{col}'] = (q1 - 1.5 * iqr, q3 + 1.5 * iqr)

            lower_bound, upper_bound = self.scalers.get(
                f'outlier_{col}',
                (df[col].min(), df[col].max())
            )

            if method == 'clip':
                df[col] = df[col].clip(lower=lower_bound, upper=upper_bound)
            elif method == 'remove':
                # copy() so later assignments do not write into a slice.
                df = df[(df[col] >= lower_bound) & (df[col] <= upper_bound)].copy()
        return df
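⚠️ Warning: Always fit transformers on training data only, then transform both training and test data. This prevents data leakage and ensures consistent transformations. (The PySpark MLDataTransformer above recomputes its statistics and category lists from whatever DataFrame it transforms, so it does not enforce this on its own.)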
Data Augmentation
Image Data Augmentation
import numpy as np
from typing import List, Tuple
import random
class ImageAugmenter:
    """Data augmentation for image data.

    The photometric operations (brightness, contrast, noise) clip to
    [0, 255] and cast to ``uint8``. Inputs are presumably 8-bit images;
    confirm before using other value ranges.
    """

    def __init__(self, augmentations: List[str] = None):
        self.augmentations = augmentations or [
            'horizontal_flip', 'vertical_flip', 'rotation',
            'brightness', 'contrast', 'noise', 'crop'
        ]

    def augment(self, image: np.ndarray,
                n_augmentations: int = 1) -> List[np.ndarray]:
        """Return a copy of ``image`` followed by ``n_augmentations`` variants.

        Each variant applies a random, non-empty subset of the configured
        augmentations in random order.
        """
        augmented_images = [image.copy()]

        for _ in range(n_augmentations):
            aug_image = image.copy()

            n_transforms = random.randint(1, len(self.augmentations))
            selected_augs = random.sample(self.augmentations, n_transforms)

            for aug in selected_augs:
                if aug == 'horizontal_flip':
                    aug_image = self._horizontal_flip(aug_image)
                elif aug == 'vertical_flip':
                    aug_image = self._vertical_flip(aug_image)
                elif aug == 'rotation':
                    aug_image = self._rotate(aug_image)
                elif aug == 'brightness':
                    aug_image = self._adjust_brightness(aug_image)
                elif aug == 'contrast':
                    aug_image = self._adjust_contrast(aug_image)
                elif aug == 'noise':
                    aug_image = self._add_noise(aug_image)
                elif aug == 'crop':
                    aug_image = self._random_crop(aug_image)

            augmented_images.append(aug_image)

        return augmented_images

    def _horizontal_flip(self, image: np.ndarray) -> np.ndarray:
        """With probability 0.5, mirror left-to-right (reverse the width axis)."""
        if random.random() > 0.5:
            # fliplr reverses axis 1 (columns); flipud would flip vertically.
            return np.fliplr(image).copy()
        return image

    def _vertical_flip(self, image: np.ndarray) -> np.ndarray:
        """With probability 0.5, mirror top-to-bottom (reverse the height axis)."""
        if random.random() > 0.5:
            return np.flipud(image).copy()
        return image

    def _rotate(self, image: np.ndarray,
                max_angle: int = 30) -> np.ndarray:
        """Rotate by a uniform random angle in [-max_angle, max_angle] degrees."""
        from scipy.ndimage import rotate as rotate_image
        angle = random.uniform(-max_angle, max_angle)
        return rotate_image(image, angle, reshape=False)

    def _adjust_brightness(self, image: np.ndarray,
                           factor_range: Tuple[float, float] = (0.7, 1.3)) -> np.ndarray:
        """Scale pixel values by a random factor, clipped to uint8 range."""
        factor = random.uniform(*factor_range)
        return np.clip(image * factor, 0, 255).astype(np.uint8)

    def _adjust_contrast(self, image: np.ndarray,
                         factor_range: Tuple[float, float] = (0.7, 1.3)) -> np.ndarray:
        """Stretch pixel values around the image mean by a random factor."""
        factor = random.uniform(*factor_range)
        mean = np.mean(image)
        return np.clip((image - mean) * factor + mean, 0, 255).astype(np.uint8)

    def _add_noise(self, image: np.ndarray,
                   std_range: Tuple[float, float] = (0, 25)) -> np.ndarray:
        """Add zero-mean Gaussian noise with a random standard deviation."""
        std = random.uniform(*std_range)
        noise = np.random.normal(0, std, image.shape)
        return np.clip(image + noise, 0, 255).astype(np.uint8)

    def _random_crop(self, image: np.ndarray,
                     crop_ratio: float = 0.8) -> np.ndarray:
        """Crop a random ``crop_ratio`` window and resize it back with PIL."""
        h, w = image.shape[:2]
        new_h, new_w = int(h * crop_ratio), int(w * crop_ratio)

        top = random.randint(0, h - new_h)
        left = random.randint(0, w - new_w)
        cropped = image[top:top + new_h, left:left + new_w]

        from PIL import Image
        pil_image = Image.fromarray(cropped)
        pil_image = pil_image.resize((w, h))  # PIL takes (width, height)
        return np.array(pil_image)
class TextAugmenter:
    """Data augmentation for text data using word-level operations.

    ``_synonym_replacement`` and ``_random_insertion`` are placeholders: they
    emit ``SYN_<word>`` and ``INSERTED`` tokens rather than real words.
    """

    def __init__(self, augmentations: List[str] = None):
        self.augmentations = augmentations or [
            'synonym_replacement', 'random_insertion',
            'random_swap', 'random_deletion'
        ]

    def augment(self, text: str,
                n_augmentations: int = 1) -> List[str]:
        """Return ``[text]`` followed by ``n_augmentations`` variants.

        Each variant is produced by one randomly chosen augmentation.
        """
        augmented_texts = [text]

        for _ in range(n_augmentations):
            aug_method = random.choice(self.augmentations)
            aug_text = text
            if aug_method == 'synonym_replacement':
                aug_text = self._synonym_replacement(aug_text)
            elif aug_method == 'random_insertion':
                aug_text = self._random_insertion(aug_text)
            elif aug_method == 'random_swap':
                aug_text = self._random_swap(aug_text)
            elif aug_method == 'random_deletion':
                aug_text = self._random_deletion(aug_text)
            augmented_texts.append(aug_text)

        return augmented_texts

    def _synonym_replacement(self, text: str,
                             n_replacements: int = 1) -> str:
        """Mark ``n_replacements`` random words as replaced (placeholder)."""
        words = text.split()
        for _ in range(n_replacements):
            if words:
                idx = random.randint(0, len(words) - 1)
                # In production, use WordNet or similar
                words[idx] = f"SYN_{words[idx]}"
        return ' '.join(words)

    def _random_insertion(self, text: str,
                          n_insertions: int = 1) -> str:
        """Insert a placeholder word at ``n_insertions`` random positions."""
        words = text.split()
        for _ in range(n_insertions):
            idx = random.randint(0, len(words))
            words.insert(idx, "INSERTED")
        return ' '.join(words)

    def _random_swap(self, text: str,
                     n_swaps: int = 1) -> str:
        """Swap two random word positions ``n_swaps`` times."""
        words = text.split()
        for _ in range(n_swaps):
            if len(words) >= 2:
                idx1, idx2 = random.sample(range(len(words)), 2)
                words[idx1], words[idx2] = words[idx2], words[idx1]
        return ' '.join(words)

    def _random_deletion(self, text: str,
                         p: float = 0.1) -> str:
        """Drop each word independently with probability ``p``.

        Always keeps at least one word. Empty, whitespace-only and one-word
        texts are returned unchanged.
        """
        words = text.split()
        # Guard against random.choice([]) on empty / whitespace-only input.
        if len(words) <= 1:
            return text

        new_words = [w for w in words if random.random() > p]
        if not new_words:
            return random.choice(words)
        return ' '.join(new_words)
class TabularAugmenter:
    """Data augmentation for tabular data.

    The interpolation methods (``smote``, ``mixup``) blend only ``int64`` /
    ``float64`` columns; interpolated int columns become float. All other
    columns are copied from the first row of each interpolated pair.
    Interpolation is done on positional numpy arrays, because adding two
    DataFrames with different indices aligns on the index and yields NaN.
    """

    def __init__(self):
        pass

    def augment(self, df: 'pd.DataFrame',
                n_augmentations: int = 1,
                method: str = 'smote') -> 'pd.DataFrame':
        """Return ``df`` concatenated with ``n_augmentations`` synthetic batches.

        Methods: 'smote', 'gaussian', 'mixup'. Any other value bootstraps
        10% of the rows. The result has a fresh RangeIndex.
        """
        import pandas as pd

        augmented_dfs = [df]
        for _ in range(n_augmentations):
            if method == 'smote':
                aug_df = self._smote_augment(df)
            elif method == 'gaussian':
                aug_df = self._gaussian_augment(df)
            elif method == 'mixup':
                aug_df = self._mixup_augment(df)
            else:
                aug_df = df.sample(frac=0.1, replace=True)
            augmented_dfs.append(aug_df)

        return pd.concat(augmented_dfs, ignore_index=True)

    def _smote_augment(self, df: 'pd.DataFrame') -> 'pd.DataFrame':
        """Generate SMOTE-like samples by interpolating random pairs.

        If a ``label`` column exists, pairs are drawn from the minority class
        and the label is kept as-is, not interpolated. Otherwise pairs are
        drawn from a random 10% sample (at least one row). One sample is
        produced per source row.
        """
        if df.empty:
            return df.iloc[0:0].copy()

        numeric_cols = [c for c in df.select_dtypes(include=['int64', 'float64']).columns
                        if c != 'label']

        if 'label' in df.columns:
            class_counts = df['label'].value_counts()
            minority_class = class_counts.idxmin()
            minority_df = df[df['label'] == minority_class]
        else:
            minority_df = df.sample(n=max(1, len(df) // 10))

        n = len(minority_df)
        first = minority_df.iloc[np.random.randint(0, n, size=n)].reset_index(drop=True)
        second = minority_df.iloc[np.random.randint(0, n, size=n)].reset_index(drop=True)

        synthetic = first.copy()
        if numeric_cols:
            alpha = np.random.random((n, 1))  # one weight per synthetic row
            a = first[numeric_cols].to_numpy(dtype=float)
            b = second[numeric_cols].to_numpy(dtype=float)
            # alpha * a + (1 - alpha) * b, written so that a == b is exact.
            synthetic[numeric_cols] = b + alpha * (a - b)
        return synthetic

    def _gaussian_augment(self, df: 'pd.DataFrame') -> 'pd.DataFrame':
        """Copy ``df`` with N(0, 0.1 * column std) noise on numeric columns."""
        numeric_cols = df.select_dtypes(include=['int64', 'float64']).columns
        aug_df = df.copy()
        for col in numeric_cols:
            std = df[col].std() * 0.1
            aug_df[col] = df[col] + np.random.normal(0, std, len(df))
        return aug_df

    def _mixup_augment(self, df: 'pd.DataFrame') -> 'pd.DataFrame':
        """Mixup: blend ``len(df) // 2`` random row pairs with a single weight.

        The weight is drawn from [0.25, 0.75). Numeric columns, including any
        numeric label (giving soft labels), are blended.
        """
        indices = np.random.permutation(len(df))
        half = len(df) // 2
        first = df.iloc[indices[:half]].reset_index(drop=True)
        second = df.iloc[indices[half:2 * half]].reset_index(drop=True)

        alpha = random.random() * 0.5 + 0.25
        numeric_cols = df.select_dtypes(include=['int64', 'float64']).columns.tolist()

        mixed = first.copy()
        if numeric_cols and half:
            a = first[numeric_cols].to_numpy(dtype=float)
            b = second[numeric_cols].to_numpy(dtype=float)
            mixed[numeric_cols] = a * alpha + b * (1 - alpha)
        return mixed
ℹ️ Note: Data augmentation increases training-data diversity without collecting new data. Use task-appropriate augmentations and verify that they don't introduce label noise.
Data Quality Monitoring
Production Data Quality System
import pandas as pd
import numpy as np
from typing import Dict, List, Optional
from datetime import datetime, timedelta
from dataclasses import dataclass
import json
@dataclass
class QualityCheck:
    """Outcome of a single data-quality check."""
    name: str         # check identifier, e.g. 'completeness_<column>'
    passed: bool      # whether the measured value met the threshold
    value: float      # measured value
    threshold: float  # threshold the value was compared against
    message: str      # human-readable summary


class DataQualityMonitor:
    """Monitor data quality in production.

    Optional ``config`` keys (with their defaults):

    - ``missing_threshold`` (0.1)
    - ``overall_missing_threshold`` (0.05)
    - ``freshness_threshold_hours`` (24)
    - ``distribution_pvalue_threshold`` (0.05)

    Every result of :meth:`check_quality` is appended to
    ``self.quality_history``. Input DataFrames are never modified.
    """

    def __init__(self, config: Dict):
        self.config = config
        self.quality_history = []

    def check_quality(self, df: pd.DataFrame,
                      reference_df: pd.DataFrame = None) -> Dict:
        """Run all quality checks and summarize them.

        Covers completeness, consistency, validity, uniqueness and freshness.
        The distribution check runs only when ``reference_df`` is given.

        Returns:
            A dict with ``quality_score`` (the fraction of checks passed),
            the individual checks, and ``status``. The status is 'passed'
            when the score is at least 0.9.
        """
        checks = []
        checks.extend(self._check_completeness(df))
        checks.extend(self._check_consistency(df))
        checks.extend(self._check_validity(df))
        checks.extend(self._check_uniqueness(df))
        checks.extend(self._check_freshness(df))
        if reference_df is not None:
            checks.extend(self._check_distribution(df, reference_df))

        passed_checks = sum(1 for c in checks if c.passed)
        total_checks = len(checks)
        quality_score = passed_checks / total_checks if total_checks > 0 else 0

        result = {
            'timestamp': datetime.now().isoformat(),
            'quality_score': quality_score,
            'passed_checks': passed_checks,
            'total_checks': total_checks,
            'checks': [
                {
                    'name': c.name,
                    'passed': c.passed,
                    'value': c.value,
                    'threshold': c.threshold,
                    'message': c.message
                }
                for c in checks
            ],
            'status': 'passed' if quality_score >= 0.9 else 'failed'
        }
        self.quality_history.append(result)
        return result

    def _check_completeness(self, df: pd.DataFrame) -> List[QualityCheck]:
        """Check per-column and overall missing-value rates."""
        checks = []

        threshold = self.config.get('missing_threshold', 0.1)
        for col, rate in df.isna().mean().items():
            checks.append(QualityCheck(
                name=f'completeness_{col}',
                passed=rate <= threshold,
                value=float(rate),
                threshold=threshold,
                message=f'Missing rate for {col}: {rate:.4f}'
            ))

        overall_missing = df.isna().mean().mean()
        threshold = self.config.get('overall_missing_threshold', 0.05)
        checks.append(QualityCheck(
            name='overall_completeness',
            passed=overall_missing <= threshold,
            value=float(overall_missing),
            threshold=threshold,
            message=f'Overall missing rate: {overall_missing:.4f}'
        ))
        return checks

    def _check_consistency(self, df: pd.DataFrame) -> List[QualityCheck]:
        """Flag object columns holding more than two distinct Python types.

        Two types are allowed so that, for example, str values plus float
        NaN for missing entries still pass.
        """
        checks = []
        for col in df.columns:
            if df[col].dtype == 'object':
                unique_types = df[col].apply(type).nunique()
                checks.append(QualityCheck(
                    name=f'consistency_{col}',
                    passed=unique_types <= 2,
                    value=float(unique_types),
                    threshold=2,
                    message=f'Type consistency for {col}: {unique_types} types'
                ))
        return checks

    def _check_validity(self, df: pd.DataFrame) -> List[QualityCheck]:
        """Check int64/float64 columns for infinities and extreme values.

        "Extreme" means outside the column's own 1st-99th percentile range.
        """
        checks = []
        n_rows = len(df)
        for col in df.select_dtypes(include=['int64', 'float64']).columns:
            inf_count = np.isinf(df[col]).sum()
            checks.append(QualityCheck(
                name=f'validity_inf_{col}',
                passed=inf_count == 0,
                value=float(inf_count),
                threshold=0,
                message=f'Infinities in {col}: {inf_count}'
            ))

            q1 = df[col].quantile(0.01)
            q99 = df[col].quantile(0.99)
            extreme_count = ((df[col] < q1) | (df[col] > q99)).sum()
            # Avoid 0/0 on an empty frame.
            extreme_rate = float(extreme_count) / n_rows if n_rows else 0.0
            checks.append(QualityCheck(
                name=f'validity_extreme_{col}',
                passed=extreme_rate < 0.05,
                value=float(extreme_rate),
                threshold=0.05,
                message=f'Extreme values in {col}: {extreme_rate:.4f}'
            ))
        return checks

    def _check_uniqueness(self, df: pd.DataFrame) -> List[QualityCheck]:
        """Check that fewer than 10% of rows are exact duplicates."""
        duplicate_rate = float(df.duplicated().mean()) if len(df) else 0.0
        return [QualityCheck(
            name='uniqueness_rows',
            passed=duplicate_rate < 0.1,
            value=float(duplicate_rate),
            threshold=0.1,
            message=f'Duplicate row rate: {duplicate_rate:.4f}'
        )]

    def _check_freshness(self, df: pd.DataFrame) -> List[QualityCheck]:
        """Check how old the newest timestamp is in each time-like column.

        A column is treated as time-like when its name contains 'time' or
        'date'. Columns that cannot be parsed as datetimes, or that have no
        non-null values, are skipped. The age is measured against local
        "now", using the column's own timezone when it is tz-aware.
        """
        checks = []
        threshold = self.config.get('freshness_threshold_hours', 24)

        timestamp_cols = [col for col in df.columns
                          if 'time' in str(col).lower() or 'date' in str(col).lower()]

        for col in timestamp_cols:
            try:
                # Parse into a new Series; never overwrite the caller's column.
                parsed = pd.to_datetime(df[col])
            except (ValueError, TypeError, OverflowError):
                continue  # not parseable as timestamps

            max_timestamp = parsed.max()
            if pd.isna(max_timestamp):
                continue

            # Subtracting a tz-aware timestamp from a naive "now" raises.
            now = pd.Timestamp.now(tz=max_timestamp.tz)
            hours_old = (now - max_timestamp).total_seconds() / 3600

            checks.append(QualityCheck(
                name=f'freshness_{col}',
                passed=hours_old <= threshold,
                value=float(hours_old),
                threshold=threshold,
                message=f'Data age for {col}: {hours_old:.2f} hours'
            ))
        return checks

    def _check_distribution(self, df: pd.DataFrame,
                            reference_df: pd.DataFrame) -> List[QualityCheck]:
        """Two-sample KS test per shared int64/float64 column.

        A check passes when the p-value exceeds the configured threshold.
        Columns are visited in ``df``'s column order, and columns with no
        non-null values on either side are skipped.
        """
        from scipy.stats import ks_2samp

        numeric = ['int64', 'float64']
        threshold = self.config.get('distribution_pvalue_threshold', 0.05)
        reference_cols = set(reference_df.columns)
        checks = []

        for col in df.columns:
            if col not in reference_cols:
                continue
            if df[col].dtype not in numeric or reference_df[col].dtype not in numeric:
                continue

            current = df[col].dropna()
            reference = reference_df[col].dropna()
            if current.empty or reference.empty:
                continue  # ks_2samp rejects empty samples

            stat, p_value = ks_2samp(current, reference)
            checks.append(QualityCheck(
                name=f'distribution_{col}',
                passed=p_value > threshold,
                value=float(p_value),
                threshold=threshold,
                message=f'Distribution shift for {col}: p={p_value:.4f}'
            ))
        return checks
ℹ️ Note: Data quality monitoring should be automated and integrated into your ML pipeline. Set up alerts for quality degradation and maintain quality dashboards for visibility.
Summary
ML data pipelines require comprehensive validation and transformation:
- Data Validation: Schema, statistical, and ML-specific checks
- Data Transformation: Scaling, encoding, feature engineering
- Data Augmentation: Task-appropriate data synthesis
- Quality Monitoring: Automated quality checks and alerting
Implement robust data pipelines to ensure reliable ML model performance.