🎉 75% of content is free forever — Unlock Premium from $10/mo →
CW
Search courses…
💼 Services · ℹ️ About · ✉️ Contact · View Pricing Plans (from $10)

Data Cleaning: Missing Values, Imputation, Data Quality

Data Science Interview Premium › Data Cleaning ⭐ Premium

Advertisement

GOOGLE & MICROSOFT INTERVIEW QUESTION

Data Cleaning: Missing Values, Imputation, Data Quality

Data Preparation & Quality

The Interview Question

ℹ️

Question: You receive a dataset with 30% missing values across multiple columns, inconsistent date formats, duplicate records, and values outside expected ranges. Walk through your complete data cleaning pipeline:

  1. How do you identify and categorize different types of data quality issues?
  2. What imputation strategies would you use for different types of missing data?
  3. How do you handle duplicate records?
  4. How do you validate your cleaning process?

Detailed Answer

1. Data Quality Assessment Framework

Before cleaning, you must understand the nature and extent of data quality issues.

import pandas as pd
import numpy as np
from datetime import datetime
import re
from typing import Dict, List, Tuple
import warnings
warnings.filterwarnings('ignore')

class DataQualityAssessor:
    """Score a DataFrame on completeness, consistency, accuracy and uniqueness.

    The frame is only read, never modified. Each ``assess_*`` method returns a
    plain dict of findings; ``generate_quality_report`` combines them and
    derives 0-100 scores.
    """

    def __init__(self, df):
        self.df = df
        self.issues = []
        self.quality_score = 0

    def assess_completeness(self) -> Dict:
        """Summarise how much data is missing, overall and per column.

        Returns:
            Dict with cell counts, the overall completeness rate (0-1),
            per-column missing percentages bucketed by severity, and a rough
            ``mcar_likelihood`` flag ('High' or 'Low').
        """
        n_rows = len(self.df)
        total_cells = self.df.size
        missing = self.df.isnull().sum()
        missing_cells = missing.sum()

        # An empty frame has nothing missing; guard the divisions so the
        # scores do not turn into NaN.
        if n_rows:
            missing_pct = (missing / n_rows * 100).round(2)
        else:
            missing_pct = missing.astype(float)
        completeness_rate = 1 - missing_cells / total_cells if total_cells else 1.0

        completeness_issues = {
            'total_cells': total_cells,
            'missing_cells': missing_cells,
            'completeness_rate': completeness_rate,
            'columns_with_missing': (missing > 0).sum(),
            'critical_columns': missing_pct[missing_pct > 50].to_dict(),
            'high_missing': missing_pct[(missing_pct > 20) & (missing_pct <= 50)].to_dict(),
            'medium_missing': missing_pct[(missing_pct > 5) & (missing_pct <= 20)].to_dict(),
            'low_missing': missing_pct[(missing_pct > 0) & (missing_pct <= 5)].to_dict()
        }

        # Crude heuristic, NOT a statistical MCAR test: correlate each
        # column's missingness indicator with the number of missing cells in
        # the row. With no missing data the mean is NaN and the flag is 'Low'.
        if missing_cells > 0:
            missing_matrix = self.df.isnull().astype(int)
            row_totals = missing_matrix.sum(axis=1)
            mean_abs_corr = missing_matrix.corrwith(row_totals).abs().mean()
        else:
            mean_abs_corr = float('nan')
        completeness_issues['mcar_likelihood'] = 'High' if mean_abs_corr < 0.1 else 'Low'

        return completeness_issues

    def assess_consistency(self) -> Dict:
        """Flag mixed casing, implausible date ranges and near-identical categories.

        Returns:
            Dict keyed by ``'<column>_case'``, ``'<column>_range'`` or
            ``'<column>_similar'`` with a human-readable description.
        """
        from itertools import combinations

        consistency_issues = {}

        string_cols = self.df.select_dtypes(include=['object']).columns
        unique_by_col = {col: self.df[col].dropna().unique() for col in string_cols}

        # Mixed case: compare the first character of up to 100 distinct values.
        for col, unique_vals in unique_by_col.items():
            # str(v)[:1] is '' for empty strings, where str(v)[0] would raise.
            initials = [str(v)[:1] for v in unique_vals[:100]]
            has_upper = any(ch.isupper() for ch in initials)
            has_lower = any(ch.islower() for ch in initials)
            if has_upper and has_lower:
                consistency_issues[f'{col}_case'] = 'Mixed case detected'

        # Date columns: anything before 1900 or after 2100 is suspicious.
        date_cols = self.df.select_dtypes(include=['datetime64', 'datetimetz']).columns
        for col in date_cols:
            min_date = self.df[col].min()
            max_date = self.df[col].max()
            if pd.isna(min_date) or pd.isna(max_date):
                continue  # column holds no valid dates at all
            if min_date.year < 1900 or max_date.year > 2100:
                consistency_issues[f'{col}_range'] = f'Unusual date range: {min_date} to {max_date}'

        # Low-cardinality columns are treated as categorical; look for values
        # that are probably spelling variants of each other.
        for col, unique_vals in unique_by_col.items():
            if len(unique_vals) >= 20:
                continue
            similar_pairs = [
                f'{v1} vs {v2}'
                for v1, v2 in combinations(unique_vals[:50], 2)
                if self._fuzzy_match(str(v1), str(v2)) > 0.8
            ]
            if similar_pairs:
                # Report every pair, not just the last one found.
                consistency_issues[f'{col}_similar'] = 'Similar values: ' + ', '.join(similar_pairs)

        return consistency_issues

    def _fuzzy_match(self, s1: str, s2: str) -> float:
        """Return the case-insensitive SequenceMatcher ratio (0-1) of two strings."""
        from difflib import SequenceMatcher
        return SequenceMatcher(None, s1.lower(), s2.lower()).ratio()

    def assess_accuracy(self) -> Dict:
        """Flag extreme outliers and unexpected negative values.

        Outliers are values more than 3 * IQR outside the quartiles. Negative
        values are checked for in the columns ``age``, ``quantity``, ``price``
        and ``amount`` when present.

        Returns:
            Dict mapping a column name to an outlier summary dict, and
            ``'<column>_negative'`` to a message string.
        """
        accuracy_issues = {}

        numeric_cols = self.df.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            data = self.df[col].dropna()
            if len(data) == 0:
                continue

            Q1, Q3 = data.quantile(0.25), data.quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - 3 * IQR
            upper_bound = Q3 + 3 * IQR

            outliers = data[(data < lower_bound) | (data > upper_bound)]
            if len(outliers) > 0:
                accuracy_issues[col] = {
                    'n_outliers': len(outliers),
                    'pct_outliers': len(outliers) / len(data) * 100,
                    'range': f'{data.min():.2f} to {data.max():.2f}',
                    'expected_range': f'{lower_bound:.2f} to {upper_bound:.2f}'
                }

        for col in ['age', 'quantity', 'price', 'amount']:
            if col in self.df.columns:
                # Dirty data often stores numbers as text; coerce so the
                # comparison cannot raise TypeError on object columns.
                as_number = pd.to_numeric(self.df[col], errors='coerce')
                negatives = (as_number < 0).sum()
                if negatives > 0:
                    accuracy_issues[f'{col}_negative'] = f'{negatives} negative values found'

        return accuracy_issues

    def assess_uniqueness(self) -> Dict:
        """Count exact duplicates, list candidate key columns, estimate near duplicates.

        Returns:
            Dict with ``exact_duplicates``, ``exact_duplicate_pct``,
            ``potential_key_columns`` and, for frames of at most 10,000 rows
            with two or more numeric columns, ``near_duplicate_pairs``.
        """
        uniqueness_issues = {}
        n_rows = len(self.df)

        exact_dupes = self.df.duplicated().sum()
        uniqueness_issues['exact_duplicates'] = exact_dupes
        uniqueness_issues['exact_duplicate_pct'] = exact_dupes / n_rows * 100 if n_rows else 0.0

        # A column whose values are >95% distinct is a plausible key. An empty
        # frame has no candidates (and would otherwise divide by zero).
        uniqueness_issues['potential_key_columns'] = [
            col for col in self.df.columns
            if n_rows and self.df[col].nunique() / n_rows > 0.95
        ]

        # Near duplicates: cosine similarity over the first five numeric
        # columns, on at most 1,000 complete rows.
        if n_rows <= 10000:
            numeric_cols = self.df.select_dtypes(include=[np.number]).columns[:5]
            if len(numeric_cols) > 1:
                from sklearn.metrics.pairwise import cosine_similarity
                sample = self.df[numeric_cols].dropna().head(1000)
                if len(sample) > 0:
                    sim_matrix = cosine_similarity(sample)
                    # The matrix is symmetric: read only the upper triangle so
                    # each pair is counted once and the diagonal is skipped.
                    upper = sim_matrix[np.triu_indices(len(sample), k=1)]
                    uniqueness_issues['near_duplicate_pairs'] = int((upper > 0.95).sum())

        return uniqueness_issues

    def generate_quality_report(self) -> Dict:
        """Run all four assessments and derive per-dimension and overall scores.

        Returns:
            Dict with keys ``completeness``, ``consistency``, ``accuracy``,
            ``uniqueness`` (the raw findings) and ``overall_score`` (0-100
            scores per dimension plus their unweighted mean).
        """
        report = {
            'completeness': self.assess_completeness(),
            'consistency': self.assess_consistency(),
            'accuracy': self.assess_accuracy(),
            'uniqueness': self.assess_uniqueness()
        }

        completeness_score = report['completeness']['completeness_rate'] * 100
        # Each consistency finding costs 10 points.
        consistency_score = max(0, 100 - len(report['consistency']) * 10)
        # Only outlier summaries (dict values) carry a percentage; the
        # negative-value messages are plain strings and are skipped.
        outlier_pct = sum(
            finding['pct_outliers']
            for finding in report['accuracy'].values()
            if isinstance(finding, dict)
        )
        accuracy_score = max(0, 100 - outlier_pct)
        uniqueness_score = max(0, 100 - report['uniqueness']['exact_duplicate_pct'])

        report['overall_score'] = {
            'completeness': completeness_score,
            'consistency': consistency_score,
            'accuracy': accuracy_score,
            'uniqueness': uniqueness_score,
            'overall': (completeness_score + consistency_score + accuracy_score + uniqueness_score) / 4
        }

        return report

# Example usage
# quality_assessor = DataQualityAssessor(df)
# quality_report = quality_assessor.generate_quality_report()

2. Missing Value Imputation Strategies

from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from sklearn.ensemble import RandomForestRegressor
import warnings
warnings.filterwarnings('ignore')

class MissingValueHandler:
    """Analyse and impute missing values in a DataFrame.

    Every imputation method works on a copy, stores the result in ``self.df``,
    records what it did in ``self.imputation_log`` and returns ``self`` so
    calls can be chained. Read the cleaned frame from ``handler.df``.
    """

    def __init__(self, df):
        self.df = df
        self.imputation_log = {}

    def analyze_missing_patterns(self) -> Dict:
        """Summarise where values are missing and how missingness co-occurs.

        Returns:
            Dict with counts per column and row and, when anything is
            missing, ``missing_correlation`` and ``likely_type``.
            ``likely_type`` is a rule-of-thumb label, not the outcome of a
            statistical test.
        """
        missing = self.df.isnull()
        per_column = missing.sum()
        per_row = missing.sum(axis=1)

        patterns = {
            'total_missing': per_column.sum(),
            'complete_rows': (per_row == 0).sum(),
            'rows_with_missing': (per_row > 0).sum(),
            'columns_with_missing': missing.any().sum(),
            'missing_by_column': per_column.to_dict(),
            'missing_percentage': (per_column / len(self.df) * 100).to_dict()
        }

        if patterns['total_missing'] > 0:
            # Mean absolute correlation between the missingness indicators of
            # different columns. The diagonal (always 1) is excluded, since it
            # would dominate the mean when only a few columns are involved.
            # Columns that are never or always missing give NaN and are
            # dropped.
            corr = missing.astype(int).corr().abs().to_numpy()
            off_diagonal = corr[~np.eye(corr.shape[0], dtype=bool)]
            off_diagonal = off_diagonal[~np.isnan(off_diagonal)]

            if off_diagonal.size:
                patterns['missing_correlation'] = float(off_diagonal.mean())
            else:
                patterns['missing_correlation'] = float('nan')

            if np.isnan(patterns['missing_correlation']):
                patterns['likely_type'] = 'Undetermined (fewer than two columns with partially missing data)'
            elif patterns['missing_correlation'] < 0.1:
                patterns['likely_type'] = 'MCAR (Missing Completely at Random)'
            elif patterns['missing_correlation'] < 0.3:
                patterns['likely_type'] = 'MAR (Missing at Random)'
            else:
                patterns['likely_type'] = 'MNAR (Missing Not at Random)'

        return patterns

    def impute_numeric(self, columns: List[str], strategy: str = 'auto') -> 'MissingValueHandler':
        """Fill missing values in numeric columns.

        Args:
            columns: Columns to impute. Unknown names and columns without
                missing values are ignored.
            strategy: 'auto' (mean when |skewness| < 0.5, otherwise median),
                'mean', 'median', 'knn' or 'iterative'. The last two use all
                numeric columns as predictors but only write back the
                requested ones.

        Returns:
            ``self``; the imputed frame is in ``self.df``.

        Raises:
            ValueError: If ``strategy`` is not one of the supported names.
        """
        supported = ('auto', 'mean', 'median', 'knn', 'iterative')
        if strategy not in supported:
            raise ValueError(f'Unknown strategy {strategy!r}; expected one of {supported}')

        df = self.df.copy()

        targets = []
        for col in columns:
            if col not in df.columns or not df[col].isnull().any():
                continue
            if df[col].notnull().any():
                targets.append(col)
            else:
                # Nothing to learn from; the imputers would drop the column
                # and the write-back would fail on a shape mismatch.
                self.imputation_log[col] = 'Skipped (no observed values to impute from)'

        if strategy in ('auto', 'mean', 'median'):
            for col in targets:
                if strategy == 'auto':
                    skewness = df[col].dropna().skew()
                    # NaN skewness (fewer than three values) falls through to
                    # the median, the more robust choice.
                    statistic = 'mean' if abs(skewness) < 0.5 else 'median'
                    note = f'{statistic.capitalize()} imputation (skewness: {skewness:.2f})'
                else:
                    statistic = strategy
                    note = f'{statistic.capitalize()} imputation'

                imputer = SimpleImputer(strategy=statistic)
                df[col] = np.asarray(imputer.fit_transform(df[[col]])).ravel()
                self.imputation_log[col] = note
        else:
            # Multivariate imputers: fit once on every usable numeric column.
            numeric_cols = [
                col for col in df.select_dtypes(include=[np.number]).columns
                if df[col].notnull().any()
            ]
            numeric_targets = [col for col in targets if col in numeric_cols]

            if numeric_targets:
                if strategy == 'knn':
                    imputer = KNNImputer(n_neighbors=5)
                    note = 'KNN imputation'
                else:
                    imputer = IterativeImputer(
                        estimator=RandomForestRegressor(n_estimators=100, random_state=42),
                        max_iter=10,
                        random_state=42
                    )
                    note = 'Iterative (MICE) imputation'

                imputed = pd.DataFrame(
                    np.asarray(imputer.fit_transform(df[numeric_cols])),
                    columns=numeric_cols,
                    index=df.index
                )
                for col in numeric_targets:
                    df[col] = imputed[col]
                    self.imputation_log[col] = note

        self.df = df
        return self

    def impute_categorical(self, columns: List[str], strategy: str = 'mode') -> 'MissingValueHandler':
        """Fill missing values in categorical or text columns.

        Args:
            columns: Columns to impute. Unknown names and columns without
                missing values are ignored.
            strategy: 'mode' (most frequent value, or 'Unknown' when the
                column has no values), 'constant' (the string 'Missing') or
                'category' (convert to categorical dtype and add a 'Missing'
                category).

        Returns:
            ``self``; the imputed frame is in ``self.df``.

        Raises:
            ValueError: If ``strategy`` is not one of the supported names.
        """
        supported = ('mode', 'constant', 'category')
        if strategy not in supported:
            raise ValueError(f'Unknown strategy {strategy!r}; expected one of {supported}')

        df = self.df.copy()

        for col in columns:
            if col not in df.columns or not df[col].isnull().any():
                continue

            if strategy == 'mode':
                modes = df[col].mode()
                mode_value = modes[0] if not modes.empty else 'Unknown'
                df[col] = df[col].fillna(mode_value)
                self.imputation_log[col] = f'Mode imputation ({mode_value})'

            elif strategy == 'constant':
                df[col] = df[col].fillna('Missing')
                self.imputation_log[col] = 'Constant imputation (Missing)'

            else:  # 'category'
                series = df[col]
                # .cat only exists on categorical columns, and add_categories
                # raises if the category is already present.
                if not isinstance(series.dtype, pd.CategoricalDtype):
                    series = series.astype('category')
                if 'Missing' not in series.cat.categories:
                    series = series.cat.add_categories('Missing')
                df[col] = series.fillna('Missing')
                self.imputation_log[col] = 'New category imputation (Missing)'

        self.df = df
        return self

    def create_missing_indicators(self, columns: List[str]) -> 'MissingValueHandler':
        """Add a 0/1 ``<column>_is_missing`` column for each listed column.

        Call this before imputing, otherwise the indicators are all zero.

        Returns:
            ``self``; the extended frame is in ``self.df``.
        """
        df = self.df.copy()

        for col in columns:
            if col in df.columns:
                missing_indicator = f'{col}_is_missing'
                df[missing_indicator] = df[col].isnull().astype(int)
                self.imputation_log[missing_indicator] = 'Missing indicator created'

        self.df = df
        return self

    def impute_with_model(self, target_col: str, feature_cols: List[str]) -> 'MissingValueHandler':
        """Predict missing values of ``target_col`` from ``feature_cols``.

        A random forest regressor is trained on the rows where the target is
        known and used to fill the rows where it is missing.

        Returns:
            ``self``; the imputed frame is in ``self.df``.

        Raises:
            ValueError: If values are missing but no row has a known target.
        """
        df = self.df.copy()

        known_mask = df[target_col].notnull()
        unknown_mask = ~known_mask

        if unknown_mask.sum() == 0:
            return self
        if known_mask.sum() == 0:
            raise ValueError(f'Cannot train an imputation model: {target_col!r} has no observed values')

        X_known = df.loc[known_mask, feature_cols]
        y_known = df.loc[known_mask, target_col]

        model = RandomForestRegressor(n_estimators=100, random_state=42)
        model.fit(X_known, y_known)

        df.loc[unknown_mask, target_col] = model.predict(df.loc[unknown_mask, feature_cols])

        # NOTE: this R² is computed on the training rows, so it overstates
        # how well the model predicts the values that were actually missing.
        self.imputation_log[target_col] = f'Model-based imputation (R²: {model.score(X_known, y_known):.3f})'

        self.df = df
        return self

3. Duplicate Detection and Removal

class DuplicateHandler:
    """Detect, remove and merge duplicate rows.

    The ``detect_*`` methods only report. ``remove_duplicates`` and
    ``merge_duplicates`` store their result in ``self.df`` and return ``self``
    so calls can be chained; read the cleaned frame from ``handler.df``.
    Everything is recorded in ``self.duplicate_log``.
    """

    _REMOVAL_STRATEGIES = ('keep_first', 'keep_last', 'keep_most_complete', 'aggregate')
    _MERGE_METHODS = ('first', 'last', 'mean', 'sum', 'max', 'min')

    def __init__(self, df):
        self.df = df
        self.duplicate_log = {}

    def _percentage(self, count):
        """Return ``count`` as a percentage of the row count (0.0 for an empty frame)."""
        n_rows = len(self.df)
        return count / n_rows * 100 if n_rows else 0.0

    @staticmethod
    def _is_averageable(series) -> bool:
        """Return True for numeric, non-boolean columns of any width."""
        return pd.api.types.is_numeric_dtype(series) and not pd.api.types.is_bool_dtype(series)

    def detect_exact_duplicates(self) -> pd.DataFrame:
        """Return the rows that repeat an earlier row in every column."""
        duplicate_mask = self.df.duplicated(keep='first')
        n_duplicates = duplicate_mask.sum()

        self.duplicate_log['exact_duplicates'] = {
            'count': n_duplicates,
            'percentage': self._percentage(n_duplicates),
            'indices': self.df[duplicate_mask].index.tolist()
        }

        return self.df[duplicate_mask]

    def detect_partial_duplicates(self, key_columns: List[str]) -> pd.DataFrame:
        """Return the rows that repeat an earlier row in ``key_columns``."""
        duplicate_mask = self.df.duplicated(subset=key_columns, keep='first')
        n_duplicates = duplicate_mask.sum()

        self.duplicate_log['partial_duplicates'] = {
            'key_columns': key_columns,
            'count': n_duplicates,
            'percentage': self._percentage(n_duplicates)
        }

        return self.df[duplicate_mask]

    def detect_near_duplicates(self, columns: List[str], threshold: float = 0.95,
                               random_state: int = None) -> List[Tuple]:
        """Find pairs of rows whose text is almost identical.

        The listed columns are joined into one string per row, vectorised with
        TF-IDF and compared by cosine similarity. Frames with more than 1,000
        rows are randomly sampled down to 1,000 rows first.

        Args:
            columns: Columns to compare.
            threshold: Minimum cosine similarity (exclusive) for a pair.
            random_state: Seed for the sampling step; pass an int for
                reproducible results on large frames.

        Returns:
            List of ``(index_label_1, index_label_2, similarity)`` tuples.
        """
        from sklearn.feature_extraction.text import TfidfVectorizer
        from sklearn.metrics.pairwise import cosine_similarity

        n_rows = len(self.df)
        near_duplicates = []

        # A pair needs at least two rows; this also avoids fitting the
        # vectoriser on an empty corpus.
        if n_rows >= 2:
            combined = self.df[columns].apply(lambda x: ' '.join(x.astype(str)), axis=1)
            tfidf_matrix = TfidfVectorizer().fit_transform(combined)

            sample_size = min(1000, n_rows)
            if sample_size < n_rows:
                rng = np.random.default_rng(random_state)
                sample_idx = rng.choice(n_rows, sample_size, replace=False)
            else:
                # Everything fits: no sampling, so the result is deterministic.
                sample_idx = np.arange(n_rows)

            similarity = cosine_similarity(tfidf_matrix[sample_idx])

            # Upper triangle only: each unordered pair once, no self-pairs.
            rows, cols = np.triu_indices(sample_size, k=1)
            hits = similarity[rows, cols] > threshold
            near_duplicates = [
                (self.df.index[sample_idx[i]], self.df.index[sample_idx[j]], similarity[i, j])
                for i, j in zip(rows[hits], cols[hits])
            ]

        self.duplicate_log['near_duplicates'] = {
            'count': len(near_duplicates),
            'threshold': threshold,
            'pairs': near_duplicates[:10]  # Sample
        }

        return near_duplicates

    def remove_duplicates(self, strategy: str = 'keep_first',
                         key_columns: List[str] = None) -> 'DuplicateHandler':
        """Drop duplicate rows.

        Args:
            strategy: 'keep_first', 'keep_last', 'keep_most_complete' (keep
                the copy with the fewest missing values; ties go to the
                earliest row) or 'aggregate' (one row per key: numeric columns
                averaged, others take the first non-null value).
            key_columns: Columns that identify a record. When omitted, rows
                must match in every column; 'aggregate' is then a no-op.

        Returns:
            ``self``; the de-duplicated frame is in ``self.df``.

        Raises:
            ValueError: If ``strategy`` is not one of the supported names.
        """
        if strategy not in self._REMOVAL_STRATEGIES:
            raise ValueError(
                f'Unknown strategy {strategy!r}; expected one of {self._REMOVAL_STRATEGIES}'
            )

        df = self.df.copy()
        subset = list(key_columns) if key_columns else None

        if strategy == 'keep_first':
            df = df.drop_duplicates(subset=subset, keep='first')

        elif strategy == 'keep_last':
            df = df.drop_duplicates(subset=subset, keep='last')

        elif strategy == 'keep_most_complete':
            # Rank rows by missing-value count with a stable sort (ties keep
            # their original order), mark duplicates in that ranking, then
            # select the survivors in their original positions. No helper
            # column is added, so a real 'missing_count' column is safe.
            missing_counts = df.isnull().sum(axis=1).to_numpy()
            order = np.argsort(missing_counts, kind='stable')
            is_duplicate = df.iloc[order].duplicated(subset=subset, keep='first').to_numpy()
            df = df.iloc[np.sort(order[~is_duplicate])]

        elif subset:  # 'aggregate'
            value_cols = [col for col in df.columns if col not in subset]
            if value_cols:
                agg_dict = {
                    col: 'mean' if self._is_averageable(df[col]) else 'first'
                    for col in value_cols
                }
                # dropna=False keeps records whose key contains NaN.
                df = df.groupby(subset, dropna=False).agg(agg_dict).reset_index()
            else:
                df = df.drop_duplicates(subset=subset, keep='first')

        self.duplicate_log['removal'] = {
            'strategy': strategy,
            'rows_before': len(self.df),
            'rows_after': len(df),
            'rows_removed': len(self.df) - len(df)
        }

        self.df = df
        return self

    def merge_duplicates(self, key_columns: List[str],
                        value_columns: Dict[str, str]) -> 'DuplicateHandler':
        """Collapse rows that share ``key_columns`` into one row per key.

        Args:
            key_columns: Columns that identify a record.
            value_columns: Maps each column to keep to its aggregation:
                'first', 'last', 'mean', 'sum', 'max' or 'min'. Columns not
                listed are dropped from the result.

        Returns:
            ``self``; the merged frame is in ``self.df``.

        Raises:
            ValueError: If any aggregation method is not supported.
        """
        unsupported = {
            col: method for col, method in value_columns.items()
            if method not in self._MERGE_METHODS
        }
        if unsupported:
            raise ValueError(
                f'Unsupported aggregation(s) {unsupported}; expected one of {self._MERGE_METHODS}'
            )

        df = self.df.copy()
        keys = list(key_columns)

        if value_columns:
            # dropna=False keeps records whose key contains NaN.
            df_merged = df.groupby(keys, dropna=False).agg(dict(value_columns)).reset_index()
        else:
            df_merged = df[keys].drop_duplicates().reset_index(drop=True)

        self.duplicate_log['merge'] = {
            'key_columns': key_columns,
            'value_columns': value_columns,
            'rows_before': len(df),
            'rows_after': len(df_merged)
        }

        self.df = df_merged
        return self

4. Data Type Conversion and Validation

class DataTypeHandler:
    """Convert column types, validate value ranges and standardise text.

    Each method works on a copy, stores the result in ``self.df``, records
    what it did in ``self.conversion_log`` and returns ``self`` so calls can
    be chained. Read the cleaned frame from ``handler.df``.
    """

    # Tried in order; the first format that parses a value wins, so an
    # ambiguous '01/02/2021' is read month-first.
    _DATE_FORMATS = (
        '%Y-%m-%d', '%m/%d/%Y', '%d/%m/%Y',
        '%Y-%m-%d %H:%M:%S', '%m/%d/%Y %H:%M:%S'
    )

    _BOOL_MAP = {
        'true': True, 'false': False,
        'yes': True, 'no': False,
        'y': True, 'n': False,
        '1': True, '0': False,
        '1.0': True, '0.0': False,
        't': True, 'f': False
    }

    # UTF-8 text that was decoded as Windows-1252, plus typographic quotes.
    # Longer sequences come first so a shorter prefix cannot pre-empt them.
    _ENCODING_FIXES = {
        '\u00e2\u20ac\u2122': "'",       # mangled right single quote
        '\u00e2\u20ac\u0153': '"',       # mangled left double quote
        '\u00e2\u20ac\u009d': '"',       # mangled right double quote
        '\u00e2\u20ac': '"',             # right double quote, last byte lost
        '\u00c3\u00a9': '\u00e9',        # mangled e-acute
        '\u00c3\u00a8': '\u00e8',        # mangled e-grave
        '\u00c3\u00a0': '\u00e0',        # mangled a-grave
        '\u2019': "'",                   # right single quote
        '\u201c': '"',                   # left double quote
    }

    def __init__(self, df):
        self.df = df
        self.conversion_log = {}

    def _parse_dates(self, series):
        """Parse a column whose values may use different date formats.

        Returns:
            ``(parsed, note)`` where ``parsed`` is a datetime Series (values
            matching no format become NaT), or ``None`` when no value could
            be parsed at all, and ``note`` is the log message.
        """
        first_format_note = f'Datetime (format: {self._DATE_FORMATS[0]})'

        if pd.api.types.is_datetime64_any_dtype(series):
            return series, first_format_note

        parsed = pd.Series(pd.NaT, index=series.index, dtype='datetime64[ns]')
        has_value = series.notna()
        formats_used = []

        for fmt in self._DATE_FORMATS:
            pending = has_value & parsed.isna()
            if not pending.any():
                break
            attempt = pd.to_datetime(series, format=fmt, errors='coerce')
            newly_parsed = pending & attempt.notna()
            if newly_parsed.any():
                parsed = parsed.where(~newly_parsed, attempt)
                formats_used.append(fmt)

        if not has_value.any():
            return parsed, first_format_note
        if not formats_used:
            return None, 'Conversion failed: no supported date format matched'

        n_failed = int((has_value & parsed.isna()).sum())
        if len(formats_used) == 1 and n_failed == 0:
            return parsed, f'Datetime (format: {formats_used[0]})'
        return parsed, f'Datetime (formats: {", ".join(formats_used)}; {n_failed} unparseable set to NaT)'

    def convert_dtypes(self, type_mapping: Dict[str, str]) -> 'DataTypeHandler':
        """Convert columns to the requested types.

        Args:
            type_mapping: Maps column name to 'datetime', 'numeric',
                'category', 'string' or 'bool'. Columns not in the frame are
                ignored.

        Returns:
            ``self``; the converted frame is in ``self.df``. A failed
            conversion leaves the column untouched and is recorded in
            ``self.conversion_log`` instead of raising.
        """
        df = self.df.copy()

        for col, dtype in type_mapping.items():
            if col not in df.columns:
                continue

            try:
                if dtype == 'datetime':
                    # Parsed value by value, so a column mixing several
                    # formats is still converted.
                    parsed, note = self._parse_dates(df[col])
                    if parsed is not None:
                        df[col] = parsed
                    self.conversion_log[col] = note

                elif dtype == 'numeric':
                    df[col] = pd.to_numeric(df[col], errors='coerce')
                    self.conversion_log[col] = 'Numeric'

                elif dtype == 'category':
                    df[col] = df[col].astype('category')
                    self.conversion_log[col] = 'Category'

                elif dtype == 'string':
                    # Keep missing values missing; a plain astype(str) would
                    # turn them into the literal text 'nan' / 'None'.
                    df[col] = df[col].astype(str).where(df[col].notna())
                    self.conversion_log[col] = 'String'

                elif dtype == 'bool':
                    # Go through str so real booleans and 0/1 numbers are
                    # handled as well as text; unrecognised values become NaN.
                    normalized = df[col].astype(str).str.strip().str.lower()
                    df[col] = normalized.map(self._BOOL_MAP)
                    self.conversion_log[col] = 'Boolean'

                else:
                    self.conversion_log[col] = f'Conversion failed: unsupported target type {dtype!r}'

            except Exception as e:
                # Best effort by design: one bad column must not abort the
                # rest. The failure is recorded for the caller to inspect.
                self.conversion_log[col] = f'Conversion failed: {str(e)}'

        self.df = df
        return self

    def validate_ranges(self, validation_rules: Dict[str, Dict]) -> 'DataTypeHandler':
        """Replace values that break the rules with NaN.

        Args:
            validation_rules: Maps column name to a dict with any of 'min',
                'max' and 'allowed_values'. Columns not in the frame are
                ignored.

        Returns:
            ``self``; the validated frame is in ``self.df``. Violation counts
            are recorded in ``self.conversion_log``.
        """
        df = self.df.copy()

        for col, rules in validation_rules.items():
            if col not in df.columns:
                continue

            if 'min' in rules:
                mask = df[col] < rules['min']
                if mask.any():
                    df.loc[mask, col] = np.nan
                    self.conversion_log[f'{col}_min_violation'] = f'{mask.sum()} values below minimum'

            if 'max' in rules:
                mask = df[col] > rules['max']
                if mask.any():
                    df.loc[mask, col] = np.nan
                    self.conversion_log[f'{col}_max_violation'] = f'{mask.sum()} values above maximum'

            if 'allowed_values' in rules:
                # Values that are already missing are not "invalid".
                mask = df[col].notna() & ~df[col].isin(rules['allowed_values'])
                if mask.any():
                    df.loc[mask, col] = np.nan
                    self.conversion_log[f'{col}_invalid'] = f'{mask.sum()} invalid values'

        self.df = df
        return self

    @staticmethod
    def _transform_strings(series, transform):
        """Apply a ``.str``-based transform without losing non-string values.

        The ``.str`` accessor yields NaN for entries that are not strings;
        those entries are restored from the original series.
        """
        result = transform(series)
        return result.where(result.notna() | series.isna(), series)

    def standardize_text(self, columns: List[str],
                        operations: List[str] = None) -> 'DataTypeHandler':
        """Apply text clean-up operations to the listed columns, in order.

        Args:
            columns: Text columns to clean. Unknown names are ignored.
            operations: Any of 'lowercase', 'uppercase', 'title', 'strip',
                'remove_special' (drops everything except ASCII letters,
                digits and whitespace), 'normalize_whitespace' and
                'fix_encoding'. Defaults to
                ``['lowercase', 'strip', 'remove_special']``. Unrecognised
                names are skipped.

        Returns:
            ``self``; the cleaned frame is in ``self.df``.
        """
        df = self.df.copy()

        if operations is None:
            operations = ['lowercase', 'strip', 'remove_special']

        def fix_encoding(series):
            for wrong, right in self._ENCODING_FIXES.items():
                # Literal replacement; the default of `regex` changed
                # between pandas versions, so state it explicitly.
                series = series.str.replace(wrong, right, regex=False)
            return series

        transforms = {
            'lowercase': lambda s: s.str.lower(),
            'uppercase': lambda s: s.str.upper(),
            'title': lambda s: s.str.title(),
            'strip': lambda s: s.str.strip(),
            'remove_special': lambda s: s.str.replace(r'[^a-zA-Z0-9\s]', '', regex=True),
            'normalize_whitespace': lambda s: s.str.replace(r'\s+', ' ', regex=True),
            'fix_encoding': fix_encoding,
        }

        for col in columns:
            if col not in df.columns:
                continue

            for op in operations:
                transform = transforms.get(op)
                if transform is not None:
                    df[col] = self._transform_strings(df[col], transform)

            self.conversion_log[col] = f'Text standardized ({", ".join(operations)})'

        self.df = df
        return self

5. Complete Data Cleaning Pipeline

class DataCleaningPipeline:
    """Run the full cleaning sequence on a DataFrame and report the effect.

    Order of operations: assess quality, drop exact duplicates, standardise
    text, convert types, null out-of-range values, impute missing values,
    assess quality again. Range validation runs before imputation so that
    values rejected by the rules are imputed along with the values that were
    missing from the start.
    """

    DEFAULT_TYPE_MAPPING = {
        'age': 'numeric',
        'signup_date': 'datetime',
        'is_active': 'bool'
    }

    DEFAULT_VALIDATION_RULES = {
        'age': {'min': 0, 'max': 120},
        'total_purchases': {'min': 0},
        'total_spend': {'min': 0}
    }

    # 'remove_special' is deliberately left out: applied to every text column
    # it strips the separators from dates ('2021-01-05' -> '20210105'), the
    # decimal point and sign from numbers stored as text, and '@' from e-mail
    # addresses, before those columns are converted.
    TEXT_OPERATIONS = ['lowercase', 'strip', 'normalize_whitespace']

    def __init__(self, df, type_mapping: Dict[str, str] = None,
                 validation_rules: Dict[str, Dict] = None):
        """
        Args:
            df: The frame to clean. It is not modified; the cleaned result is
                held in ``self.df`` after ``run_pipeline``.
            type_mapping: Column name -> target type for the conversion step.
                Defaults to ``DEFAULT_TYPE_MAPPING``.
            validation_rules: Column name -> rules for the validation step.
                Defaults to ``DEFAULT_VALIDATION_RULES``.
        """
        self.df = df
        self.cleaning_log = []
        self.quality_before = None
        self.quality_after = None
        self.type_mapping = dict(
            self.DEFAULT_TYPE_MAPPING if type_mapping is None else type_mapping
        )
        self.validation_rules = dict(
            self.DEFAULT_VALIDATION_RULES if validation_rules is None else validation_rules
        )

    def run_pipeline(self) -> pd.DataFrame:
        """Execute every cleaning step and return the cleaned DataFrame.

        Progress is printed as the steps run; a one-line summary per step is
        appended to ``self.cleaning_log``.
        """
        print("Starting Data Cleaning Pipeline")
        print("=" * 60)

        # Step 1: Initial assessment
        print("\nStep 1: Initial Quality Assessment")
        self.quality_before = DataQualityAssessor(self.df).generate_quality_report()
        print(f"Quality Score: {self.quality_before['overall_score']['overall']:.1f}/100")

        # Step 2: Remove exact duplicates.
        # The handlers return themselves (for chaining), so the cleaned frame
        # is read from `.df`. Exact duplicates are identical in every cell,
        # so keeping the first copy loses nothing and preserves row order.
        print("\nStep 2: Removing Duplicates")
        dup_handler = DuplicateHandler(self.df)
        exact_dupes = dup_handler.detect_exact_duplicates()
        print(f"Found {len(exact_dupes)} exact duplicates")
        self.df = dup_handler.remove_duplicates(strategy='keep_first').df
        self.cleaning_log.append(f"Removed {len(exact_dupes)} exact duplicates")

        # Step 3: Standardize text
        print("\nStep 3: Standardizing Text")
        dtype_handler = DataTypeHandler(self.df)
        string_cols = self.df.select_dtypes(include=['object']).columns
        self.df = dtype_handler.standardize_text(string_cols, self.TEXT_OPERATIONS).df
        self.cleaning_log.append(f"Standardized {len(string_cols)} text columns")

        # Step 4: Convert data types
        print("\nStep 4: Converting Data Types")
        self.df = dtype_handler.convert_dtypes(self.type_mapping).df
        self.cleaning_log.append(f"Converted {len(self.type_mapping)} columns")

        # Step 5: Validate ranges (violations become NaN and are imputed next)
        print("\nStep 5: Validating Ranges")
        self.df = dtype_handler.validate_ranges(self.validation_rules).df
        self.cleaning_log.append(f"Validated {len(self.validation_rules)} columns")

        # Step 6: Handle missing values
        print("\nStep 6: Handling Missing Values")
        missing_cols = self.df.columns[self.df.isnull().any()].tolist()
        numeric_cols = self.df.select_dtypes(include=[np.number]).columns
        categorical_cols = self.df.select_dtypes(include=['object', 'category']).columns

        missing_handler = MissingValueHandler(self.df)
        # Indicators first: after imputation the information is gone.
        missing_handler.create_missing_indicators(missing_cols)
        missing_handler.impute_numeric(numeric_cols, strategy='auto')
        missing_handler.impute_categorical(categorical_cols, strategy='mode')
        self.df = missing_handler.df

        self.cleaning_log.append(f"Imputed {len(missing_cols)} columns with missing values")

        # Step 7: Final assessment
        print("\nStep 7: Final Quality Assessment")
        self.quality_after = DataQualityAssessor(self.df).generate_quality_report()
        print(f"Quality Score: {self.quality_after['overall_score']['overall']:.1f}/100")

        # Summary
        print("\n" + "=" * 60)
        print("CLEANING SUMMARY")
        print("=" * 60)
        print(f"Rows: {len(self.df)}")
        print(f"Columns: {len(self.df.columns)}")
        print(f"Quality improvement: {self.quality_before['overall_score']['overall']:.1f} → {self.quality_after['overall_score']['overall']:.1f}")

        return self.df

    def generate_report(self) -> Dict:
        """Return the step log, the before/after quality reports and the frame size."""
        return {
            'cleaning_steps': self.cleaning_log,
            'quality_before': self.quality_before,
            'quality_after': self.quality_after,
            'rows_processed': len(self.df),
            'columns_processed': len(self.df.columns)
        }

💡

Pro Tip: Always document your cleaning decisions. Future you (and your team) will thank you when you need to understand why certain transformations were applied.

6. Common Follow-Up Questions

Follow-up 1: How do you handle missing data in time series?

def clean_time_series_missing(df, date_column, value_column, period=24):
    """Add several alternative gap-filled versions of a time series column.

    The input frame is not modified. The result is sorted by ``date_column``
    and gains these columns (``<v>`` is ``value_column``):

    * ``<v>_ffill`` / ``<v>_bfill`` - forward / backward fill
    * ``<v>_interpolate`` - linear interpolation weighted by elapsed time
    * ``<v>_seasonal_fill`` - gaps filled with trend + seasonal component;
      only added when there are at least two full periods of data
    * ``<v>_rolling_mean`` - rolling mean over ``period`` rows (a smoothed
      series, not a gap fill)

    Args:
        df: Frame holding the series.
        date_column: Name of the timestamp column (anything
            ``pd.to_datetime`` accepts).
        value_column: Name of the numeric column with gaps.
        period: Number of rows in one seasonal cycle; also the rolling
            window. Defaults to 24.

    Returns:
        A new, date-sorted DataFrame with the extra columns.
    """
    df = df.sort_values(date_column)
    values = df[value_column]

    # Method 1: Forward fill (for sequential data)
    df[f'{value_column}_ffill'] = values.ffill()

    # Method 2: Backward fill
    df[f'{value_column}_bfill'] = values.bfill()

    # Method 3: Time-weighted interpolation. method='time' needs a
    # DatetimeIndex, so interpolate on a copy indexed by the timestamps and
    # write the values back positionally.
    timestamps = pd.DatetimeIndex(pd.to_datetime(df[date_column]))
    by_time = pd.Series(values.to_numpy(), index=timestamps)
    df[f'{value_column}_interpolate'] = by_time.interpolate(method='time').to_numpy()

    # Method 4: Seasonal decomposition. It needs two complete cycles and a
    # series without gaps, hence the ffill + bfill (bfill covers leading NaN).
    if len(df) >= 2 * period and values.notna().any():
        # Imported here so short series work without statsmodels installed.
        from statsmodels.tsa.seasonal import seasonal_decompose

        decomposition = seasonal_decompose(
            values.ffill().bfill(),
            model='additive',
            period=period,
            extrapolate_trend='freq'  # otherwise the trend is NaN at both ends
        )
        # The seasonal component alone is centred on zero; the level comes
        # from the trend, so the estimate is the sum of both.
        estimate = decomposition.trend + decomposition.seasonal
        df[f'{value_column}_seasonal_fill'] = values.fillna(estimate)

    # Method 5: Rolling statistics
    df[f'{value_column}_rolling_mean'] = values.rolling(window=period, min_periods=1).mean()

    return df

Follow-up 2: How do you handle data cleaning at scale?

# Distributed data cleaning with Spark
def clean_with_spark(spark_df):
    """Clean a Spark DataFrame: drop duplicates, fill numeric gaps, tidy text.

    Steps:
      1. Remove exact duplicate rows.
      2. Replace nulls in numeric columns with the column mean. Filled
         columns come back as ``double``.
      3. Trim and lowercase every string column.

    Args:
        spark_df: A ``pyspark.sql.DataFrame``.

    Returns:
        The cleaned ``pyspark.sql.DataFrame``.
    """
    from pyspark.sql import functions as F

    # Remove duplicates
    spark_df = spark_df.dropDuplicates()

    # Handle missing values. The type names are the ones reported by
    # DataFrame.dtypes; 'bigint' and 'float' are as common as 'int'/'double'.
    numeric_types = ('tinyint', 'smallint', 'int', 'bigint', 'float', 'double')
    numeric_cols = [name for name, dtype in spark_df.dtypes if dtype in numeric_types]

    if numeric_cols:
        # One aggregation computes every mean. A window with an empty
        # partitionBy() would instead pull all rows into a single partition,
        # once per column.
        mean_row = spark_df.agg(*[F.mean(F.col(name)) for name in numeric_cols]).first()
        for name, mean_value in zip(numeric_cols, mean_row):
            # A column that is entirely null has a null mean and stays null.
            spark_df = spark_df.withColumn(
                name,
                F.coalesce(F.col(name), F.lit(mean_value).cast('double'))
            )

    # Standardize text
    string_cols = [name for name, dtype in spark_df.dtypes if dtype == 'string']
    for name in string_cols:
        spark_df = spark_df.withColumn(name, F.lower(F.trim(F.col(name))))

    return spark_df

Company-Specific Tips

ℹ️

Google Tips:

  • Google values reproducible cleaning pipelines
  • Be prepared to discuss distributed data cleaning
  • Know how to handle data cleaning in BigQuery
  • Understand privacy-preserving cleaning techniques

Microsoft Tips:

  • Microsoft focuses on enterprise data quality
  • Know how to use Azure Data Factory for cleaning
  • Be comfortable with SQL-based cleaning
  • Understand data governance and lineage

Quiz Section


Related Topics

Advertisement