
Building a Data Lake on GCP with GCS and BigQuery

🟒 Free Lesson

Advertisement

Building a Data Lake on GCP with GCS and BigQuery

[Diagram: Data Lake Architecture on GCP. Bronze Layer (raw data in GCS), Silver Layer (cleansed data), Gold Layer (business data), BigQuery (analytics and reporting), Looker (BI and visualization), Lifecycle Rules (cost optimization), Object Versioning (data protection), External Tables (BigQuery integration), Authorized Views (data access control).]

Data Lake Architecture

A data lake is a centralized repository for storing structured, semi-structured, and unstructured data at any scale. On GCP, Cloud Storage serves as the foundation for data lakes.

Data Lake Layers

Bronze Layer (Raw):

  • Original data in native format
  • Immutable, append-only
  • Schema-on-read
  • Historical archive

Silver Layer (Cleansed):

  • Validated and cleansed data
  • Standardized formats
  • Deduplicated
  • Enriched with metadata

Gold Layer (Business):

  • Business-ready datasets
  • Aggregated and transformed
  • Optimized for analytics
  • Curated for specific use cases

Bucket Structure

Hierarchical Bucket Design

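# Create the data lake bucket
gsutil mb -l us-central1 -c STANDARD gs://my-data-lake

# Cloud Storage has a flat namespace: "folders" are object-name prefixes that
# exist once an object is written under them, so gsutil has no mkdir command.
# sales.csv is a placeholder for a local file.
gsutil cp sales.csv gs://my-data-lake/bronze/raw/sales/2024/01/15/sales.csv
gsutil cp sales.csv gs://my-data-lake/bronze/raw/sales/2024/01/16/sales.csv
# silver/cleansed/sales/, gold/curated/sales_daily_summary/ and looker/reports/
# appear the same way when the first objects are written under them.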

Bucket Naming Conventions

[Diagram: GCP Data Engineering Reference Architecture]
Data sources: On-Prem DB, SaaS APIs, IoT Sensors, Mobile Apps, REST APIs
Ingestion layer: Dataflow (CDC), Pub/Sub, Cloud Tasks, Storage Transfer, Transfer Appliance
Raw data zone (Cloud Storage): landing/ (ingested data), bronze/ (unvalidated), archive/ (historical), raw/ (original format), staging/ (temp processing)
Processing layer: Dataflow (stream + batch), Dataproc (Spark/Hadoop), Cloud Functions (event-driven), Data Prep (visual ETL), Cloud Composer (orchestration)
Curated data zone: silver/ (cleaned, validated), gold/ (business-ready), aggregates/ (pre-computed), features/ (ML features)
BigQuery (warehouse), Looker (BI), Vertex AI (ML), Data Studio, Dataplex
Interview Tip: GCP's data engineering stack is serverless-first. Dataflow (Apache Beam) handles both streaming and batch. BigQuery is the flagship analytics service.

Path Naming Patterns

# Path naming conventions
path_patterns = {
    'date_partitioned': 'gs://bucket/bronze/raw/{domain}/{yyyy}/{mm}/{dd}/',
    'hourly_partitioned': 'gs://bucket/bronze/raw/{domain}/{yyyy}/{mm}/{dd}/{hh}/',
    'monthly_partitioned': 'gs://bucket/silver/cleansed/{domain}/{yyyy}/{mm}/',
    'versioned': 'gs://bucket/gold/curated/{domain}/v{version}/',
    'archived': 'gs://bucket/archive/{domain}/{yyyy}/{mm}/{dd}/',
}

# Example paths
paths = {
    'sales_data': 'gs://my-data-lake/bronze/raw/sales/2024/01/15/',
    'user_events': 'gs://my-data-lake/bronze/raw/events/2024/01/15/10/',
    'daily_summary': 'gs://my-data-lake/gold/curated/sales_daily_summary/2024/01/',
}
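
The templates above can be filled in programmatically rather than by hand. The following is a minimal sketch, assuming a hypothetical helper named build_path and the my-data-lake bucket used in the examples; the {bucket} placeholder is added here so the bucket name is not hard-coded.

```python
from datetime import datetime

# Templates mirror the naming conventions above; build_path is an illustrative name.
PATH_PATTERNS = {
    'date_partitioned': 'gs://{bucket}/bronze/raw/{domain}/{yyyy}/{mm}/{dd}/',
    'hourly_partitioned': 'gs://{bucket}/bronze/raw/{domain}/{yyyy}/{mm}/{dd}/{hh}/',
}

def build_path(pattern, bucket, domain, ts):
    """Fill a path template with zero-padded date parts taken from ts."""
    return PATH_PATTERNS[pattern].format(
        bucket=bucket,
        domain=domain,
        yyyy=f'{ts.year:04d}',
        mm=f'{ts.month:02d}',
        dd=f'{ts.day:02d}',
        hh=f'{ts.hour:02d}',  # ignored by templates without {hh}
    )

print(build_path('hourly_partitioned', 'my-data-lake', 'events', datetime(2024, 1, 15, 10)))
# gs://my-data-lake/bronze/raw/events/2024/01/15/10/
```

Zero-padding the month, day, and hour keeps object names sorting lexicographically in date order, which makes prefix listings predictable.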

Lifecycle Rules

Lifecycle rules automatically transition or delete objects based on age or other criteria.

⚠️ Cost Alert

Always monitor your BigQuery costs using INFORMATION_SCHEMA. Set up budget alerts at 50%, 80%, and 100% thresholds.

Cost Optimization Rules

# Create lifecycle configuration
cat > lifecycle.json << EOF
{
  "rule": [
    {
      "action": {"type": "SetStorageClass", "storageClass": "NEARLINE"},
      "condition": {"age": 30, "matchesPrefix": ["bronze/"]}
    },
    {
      "action": {"type": "SetStorageClass", "storageClass": "COLDLINE"},
      "condition": {"age": 90, "matchesPrefix": ["bronze/"]}
    },
    {
      "action": {"type": "SetStorageClass", "storageClass": "ARCHIVE"},
      "condition": {"age": 365, "matchesPrefix": ["bronze/"]}
    },
    {
      "action": {"type": "Delete"},
      "condition": {"age": 730, "matchesPrefix": ["bronze/raw/"]}
    },
    {
      "action": {"type": "SetStorageClass", "storageClass": "NEARLINE"},
      "condition": {"age": 60, "matchesPrefix": ["silver/"]}
    },
    {
      "action": {"type": "SetStorageClass", "storageClass": "COLDLINE"},
      "condition": {"age": 180, "matchesPrefix": ["silver/"]}
    }
  ]
}
EOF

# Apply lifecycle rules
gsutil lifecycle set lifecycle.json gs://my-data-lake
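
Several rules in lifecycle.json can match the same object at once. Cloud Storage resolves this by letting Delete win over SetStorageClass, and otherwise choosing the class with the lowest at-rest price. The sketch below is a simplified local model of that resolution, useful for checking a configuration before applying it; resolve_action is a hypothetical helper, and the real service evaluates rules asynchronously and also considers the object's current class.

```python
# Colder classes are cheaper at rest, so they win between SetStorageClass rules.
COLDNESS = {'STANDARD': 0, 'NEARLINE': 1, 'COLDLINE': 2, 'ARCHIVE': 3}

# Same rules as lifecycle.json above.
RULES = [
    {'action': {'type': 'SetStorageClass', 'storageClass': 'NEARLINE'},
     'condition': {'age': 30, 'matchesPrefix': ['bronze/']}},
    {'action': {'type': 'SetStorageClass', 'storageClass': 'COLDLINE'},
     'condition': {'age': 90, 'matchesPrefix': ['bronze/']}},
    {'action': {'type': 'SetStorageClass', 'storageClass': 'ARCHIVE'},
     'condition': {'age': 365, 'matchesPrefix': ['bronze/']}},
    {'action': {'type': 'Delete'},
     'condition': {'age': 730, 'matchesPrefix': ['bronze/raw/']}},
    {'action': {'type': 'SetStorageClass', 'storageClass': 'NEARLINE'},
     'condition': {'age': 60, 'matchesPrefix': ['silver/']}},
    {'action': {'type': 'SetStorageClass', 'storageClass': 'COLDLINE'},
     'condition': {'age': 180, 'matchesPrefix': ['silver/']}},
]

def resolve_action(object_name, age_days, rules=RULES):
    """Return 'Delete', a target storage class, or None if no rule matches."""
    matched = [
        rule['action'] for rule in rules
        if age_days >= rule['condition']['age']
        and any(object_name.startswith(p) for p in rule['condition']['matchesPrefix'])
    ]
    if any(action['type'] == 'Delete' for action in matched):
        return 'Delete'
    classes = [a['storageClass'] for a in matched if a['type'] == 'SetStorageClass']
    return max(classes, key=COLDNESS.get) if classes else None

print(resolve_action('bronze/raw/sales/2024/01/15/sales.csv', 100))  # COLDLINE
print(resolve_action('gold/curated/sales_daily_summary/x.parquet', 400))  # None
```

Note that gold/ objects match no rule, so they stay in their original storage class.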

Lifecycle Rule Types

# Common lifecycle rule patterns
lifecycle_rules = {
    'transition_to_nearline': {
        'action': {'type': 'SetStorageClass', 'storageClass': 'NEARLINE'},
        'condition': {'age': 30}
    },
    'transition_to_coldline': {
        'action': {'type': 'SetStorageClass', 'storageClass': 'COLDLINE'},
        'condition': {'age': 90}
    },
    'transition_to_archive': {
        'action': {'type': 'SetStorageClass', 'storageClass': 'ARCHIVE'},
        'condition': {'age': 365}
    },
    'delete_old_data': {
        'action': {'type': 'Delete'},
        'condition': {'age': 730}
    },
    'abort_incomplete_multipart': {
        'action': {'type': 'AbortIncompleteMultipartUpload'},
        'condition': {'age': 7}
    }
}

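# Lifecycle configuration is set per bucket, not per folder. To target a
# layer, scope each rule with the matchesPrefix condition.
for prefix in ['bronze/', 'silver/', 'gold/']:
    rule = dict(lifecycle_rules['transition_to_nearline'])
    rule['condition'] = {**rule['condition'], 'matchesPrefix': [prefix]}
    print(f'Rule scoped to gs://my-data-lake/{prefix}: {rule}')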

Object Versioning

Object versioning protects against accidental deletion and provides data recovery capabilities.

Enabling Versioning

# Enable versioning on bucket
gsutil versioning set on gs://my-data-lake

# Check versioning status
gsutil versioning get gs://my-data-lake

# List object versions
gsutil ls -la gs://my-data-lake/bronze/raw/sales/

Versioning Best Practices

# Versioning checklist (illustrative keys, not a Cloud Storage API payload)
versioning_config = {
    'enabled': True,
    'metadata': 'preserve',
    'temporary_holds': True,
    'object_retention': True
}

# Lifecycle rules for versioned objects
versioned_lifecycle = {
    'rule': [
        {
            'action': {'type': 'Delete'},
            'condition': {
                'age': 365,
                'isLive': False  # Noncurrent versions only; age counts from creation time
            }
        },
        {
            'action': {'type': 'Delete'},
            'condition': {
                'age': 90,
                'isLive': True  # Live versions older than 90 days become noncurrent
            }
        }
    ]
}
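
How versioning protects data is easier to see in a toy model. The sketch below is an in-memory illustration only (the VersionedBucket class is hypothetical and makes no Cloud Storage calls): an overwrite or delete moves the live version to the noncurrent list instead of destroying it, and a restore copies a noncurrent generation back as a new live version. Real generation numbers are large service-assigned values, not small counters.

```python
import itertools

class VersionedBucket:
    """Toy in-memory model of a bucket with object versioning enabled."""

    def __init__(self):
        self._generations = itertools.count(1)
        self.live = {}        # name -> (generation, data)
        self.noncurrent = []  # (name, generation, data) tuples

    def write(self, name, data):
        """Overwriting keeps the previous live version as noncurrent."""
        if name in self.live:
            generation, old_data = self.live[name]
            self.noncurrent.append((name, generation, old_data))
        self.live[name] = (next(self._generations), data)

    def delete(self, name):
        """Deleting the live version only makes it noncurrent."""
        generation, old_data = self.live.pop(name)
        self.noncurrent.append((name, generation, old_data))

    def restore(self, name, generation):
        """Copy a noncurrent generation back as the new live version."""
        for n, g, data in self.noncurrent:
            if n == name and g == generation:
                self.write(name, data)
                return
        raise KeyError((name, generation))

bucket = VersionedBucket()
bucket.write('bronze/raw/sales/sales.csv', b'v1')
bucket.write('bronze/raw/sales/sales.csv', b'v2')   # v1 becomes noncurrent
bucket.delete('bronze/raw/sales/sales.csv')         # v2 becomes noncurrent
bucket.restore('bronze/raw/sales/sales.csv', 1)     # v1 is live again
print(bucket.live['bronze/raw/sales/sales.csv'][1])  # b'v1'
```

Because noncurrent versions accumulate, versioning is usually paired with lifecycle rules that expire them.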

BigQuery External Tables

External tables allow BigQuery to query data directly in Cloud Storage.

Creating External Tables

-- Create external table for CSV files
CREATE EXTERNAL TABLE `analytics.external_sales`
OPTIONS (
  format = 'CSV',
  uris = ['gs://my-data-lake/bronze/raw/sales/*.csv'],
  skip_leading_rows = 1,
  field_delimiter = ',',
  null_marker = '',
  ignore_unknown_values = true
);

-- Create external table for Parquet files
CREATE EXTERNAL TABLE `analytics.external_events`
OPTIONS (
  format = 'PARQUET',
  uris = ['gs://my-data-lake/bronze/raw/events/*.parquet']
);

-- Create external table for JSON files
CREATE EXTERNAL TABLE `analytics.external_logs`
OPTIONS (
  format = 'JSON',
  uris = ['gs://my-data-lake/bronze/raw/logs/*.json']
);
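
The three statements above differ only in table name, format, URIs, and options, so they can be generated from one template. This is a sketch with a hypothetical external_table_ddl helper; it trusts its inputs and is not meant for values that come from users.

```python
def external_table_ddl(table, fmt, uris, **options):
    """Render a CREATE EXTERNAL TABLE statement from Python values."""
    def literal(value):
        # bool must be checked before int because bool is a subclass of int
        if isinstance(value, bool):
            return 'true' if value else 'false'
        if isinstance(value, (int, float)):
            return str(value)
        return "'" + str(value).replace("'", "\\'") + "'"

    lines = [
        f"  format = {literal(fmt)}",
        "  uris = [" + ", ".join(literal(u) for u in uris) + "]",
    ]
    lines += [f"  {key} = {literal(val)}" for key, val in options.items()]
    return f"CREATE EXTERNAL TABLE `{table}`\nOPTIONS (\n" + ",\n".join(lines) + "\n);"

print(external_table_ddl(
    'analytics.external_sales', 'CSV',
    ['gs://my-data-lake/bronze/raw/sales/*.csv'],
    skip_leading_rows=1, field_delimiter=',', ignore_unknown_values=True,
))
```

Keeping the option names as keyword arguments means a new data domain needs only one extra call rather than another hand-written statement.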

External Table Configuration

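-- External table with partitioning
-- External tables cannot use PARTITION BY; they rely on hive-style paths.
-- Assumes files are stored as .../sales/dt=2024-01-15/<file>.parquet
CREATE EXTERNAL TABLE `analytics.external_sales_partitioned`
WITH PARTITION COLUMNS (dt DATE)
OPTIONS (
  format = 'PARQUET',
  uris = ['gs://my-data-lake/bronze/raw/sales/*'],
  hive_partition_uri_prefix = 'gs://my-data-lake/bronze/raw/sales'
);

-- Query external table with partition filter
SELECT *
FROM `analytics.external_sales_partitioned`
WHERE dt = DATE '2024-01-15';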

-- Create table from external table
CREATE TABLE `analytics.sales`
AS
SELECT * FROM `analytics.external_sales`
WHERE amount > 0;

Authorized Views

Authorized views provide controlled access to data without granting direct table access.

Creating Authorized Views

-- Create a separate dataset to hold the views
CREATE SCHEMA `analytics_views`
OPTIONS (
  description = 'Dataset for authorized views'
);

-- Create the view over the source table
CREATE VIEW `analytics_views.sales_summary_view`
OPTIONS (
  description = 'Sales summary view for analysts'
)
AS
SELECT
  DATE(sale_date) AS sale_date,
  region,
  product_category,
  COUNT(*) AS transaction_count,
  SUM(amount) AS total_amount
FROM `analytics.sales`
GROUP BY 1, 2, 3;

-- Grant access to the view (analysts get no access to analytics.sales)
GRANT `roles/bigquery.dataViewer`
ON VIEW `analytics_views.sales_summary_view`
TO 'user:analyst@company.com';

-- The view becomes "authorized" only after it is added to the access list of
-- the source dataset (analytics) in the console, with bq, or through the API;
-- CREATE VIEW and CREATE SCHEMA have no option for this.
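
A view only behaves as an authorized view once the source dataset lists it among its access entries. The sketch below builds that entry in the shape of the access[].view field of the BigQuery dataset REST resource. The helper name authorize_view is hypothetical, the example assumes the view lives in an analytics_views dataset in my-project, and sending the updated list to BigQuery (with bq or a client library) is left out.

```python
def authorize_view(access_entries, project_id, dataset_id, view_id):
    """Return a new access list that includes the view, without duplicates."""
    entry = {
        'view': {
            'projectId': project_id,
            'datasetId': dataset_id,
            'tableId': view_id,
        }
    }
    if entry in access_entries:
        return list(access_entries)
    return list(access_entries) + [entry]

# Existing entries on the source dataset (analytics); values are illustrative.
source_access = [{'role': 'OWNER', 'specialGroup': 'projectOwners'}]

updated = authorize_view(source_access, 'my-project', 'analytics_views', 'sales_summary_view')
for item in updated:
    print(item)
```

Returning a new list instead of mutating the input makes the update idempotent, so rerunning a deployment script does not add the same view twice.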

View Security Patterns

# Security patterns for authorized views
security_patterns = {
    'row_level_security': """
        CREATE VIEW `analytics.filtered_view`
        AS
        SELECT * FROM `analytics.sales`
        WHERE region = 'US'
    """,
    'column_level_security': """
        CREATE VIEW `analytics.safe_view`
        AS
        SELECT
          sale_id,
          sale_date,
          region,
          amount
        FROM `analytics.sales`
        -- Exclude PII columns
    """,
    'aggregated_view': """
        CREATE VIEW `analytics.aggregated_view`
        AS
        SELECT
          DATE(sale_date) as sale_date,
          region,
          SUM(amount) as total_amount
        FROM `analytics.sales`
        GROUP BY 1, 2
    """
}

Data Ingestion Patterns

Batch Ingestion

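# Batch ingestion with Dataflow
import csv
import json
import apache_beam as beam

# Assumed column order of the source CSV files
FIELDS = ['sale_id', 'sale_date', 'region', 'amount']

def parse_csv(line):
    """Parse one CSV line into a dict keyed by FIELDS."""
    return dict(zip(FIELDS, next(csv.reader([line]))))

def validate_record(record):
    """Keep records that have a sale_id and a non-empty amount."""
    return bool(record.get('sale_id')) and bool(record.get('amount'))

def run_batch_ingestion():
    """Batch ingestion pipeline."""
    with beam.Pipeline() as pipeline:
        (
            pipeline
            | 'Read from Source' >> beam.io.ReadFromText('gs://source-bucket/*.csv', skip_header_lines=1)
            | 'Parse CSV' >> beam.Map(parse_csv)
            | 'Validate Data' >> beam.Filter(validate_record)
            | 'To JSON' >> beam.Map(json.dumps)
            # WriteToText emits text, so use a text suffix rather than .parquet
            | 'Write to Bronze' >> beam.io.WriteToText(
                'gs://my-data-lake/bronze/raw/sales/2024/01/15/sales',
                file_name_suffix='.jsonl'
            )
        )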

Streaming Ingestion

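# Streaming ingestion with Pub/Sub and Dataflow
import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows

def run_streaming_ingestion():
    """Streaming ingestion pipeline."""
    # Pub/Sub is an unbounded source, so the pipeline must run in streaming mode
    options = PipelineOptions(streaming=True)
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(topic='projects/my-project/topics/sales')
            | 'Decode' >> beam.Map(lambda message: message.decode('utf-8'))
            | 'Window' >> beam.WindowInto(FixedWindows(60))  # 60-second windows
            # WriteToFiles handles windowed, unbounded input; output is text, not Parquet
            | 'Write to Bronze' >> fileio.WriteToFiles(
                path='gs://my-data-lake/bronze/raw/sales/',
                file_naming=fileio.default_file_naming(prefix='sales', suffix='.txt'),
                sink=lambda destination: fileio.TextSink(),
                shards=1,
            )
        )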
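
FixedWindows(60) in the pipeline above assigns each element to a 60-second window based on its timestamp. The arithmetic can be sketched in plain Python, with no Beam involved; timestamps are in seconds and the function names are illustrative.

```python
from collections import defaultdict

def fixed_window(timestamp, size=60):
    """Return the [start, end) window of `size` seconds that contains timestamp."""
    start = timestamp - (timestamp % size)
    return start, start + size

def group_by_window(events, size=60):
    """Group (timestamp, payload) pairs by the fixed window they fall into."""
    windows = defaultdict(list)
    for timestamp, payload in events:
        windows[fixed_window(timestamp, size)].append(payload)
    return dict(windows)

events = [(0, 'a'), (59, 'b'), (60, 'c'), (125, 'd')]
print(group_by_window(events))
# {(0, 60): ['a', 'b'], (60, 120): ['c'], (120, 180): ['d']}
```

Each window then produces its own output files, which is what lets an unbounded stream be written to Cloud Storage in finite chunks.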

Data Quality

Data Validation

# Data quality validation
# Uses the legacy (pre-1.0) Great Expectations API; newer releases differ.
import great_expectations as ge
import pandas as pd
from great_expectations.core import ExpectationSuite, ExpectationConfiguration

def validate_data_quality(file_path):
    """Validate data quality."""
    df = pd.read_parquet(file_path)
    
    # Define expectations
    expectations = ExpectationSuite(expectation_suite_name='sales_data')
    expectations.add_expectation(
        ExpectationConfiguration(
            expectation_type='expect_column_values_to_not_be_null',
            kwargs={'column': 'sale_id'}
        )
    )
    expectations.add_expectation(
        ExpectationConfiguration(
            expectation_type='expect_column_values_to_be_unique',
            kwargs={'column': 'sale_id'}
        )
    )
    expectations.add_expectation(
        ExpectationConfiguration(
            expectation_type='expect_column_values_to_be_between',
            kwargs={'column': 'amount', 'min_value': 0, 'max_value': 1000000}
        )
    )
    
    # Wrap the DataFrame so it exposes validate(), then run the suite
    result = ge.from_pandas(df).validate(expectation_suite=expectations)
    return result.success
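
The three expectations above (non-null sale_id, unique sale_id, amount within a range) can also be expressed without any library, which helps when reasoning about what each check rejects. The following is a simplified sketch, not a Great Expectations equivalent: check_sales_records is a hypothetical helper, it flags only the later occurrences of a duplicate, and it treats a missing amount as out of range.

```python
def check_sales_records(records, min_amount=0, max_amount=1_000_000):
    """Return a dict mapping each check name to the indexes of failing rows."""
    failures = {'sale_id_not_null': [], 'sale_id_unique': [], 'amount_in_range': []}
    seen = set()
    for index, row in enumerate(records):
        sale_id = row.get('sale_id')
        if sale_id is None:
            failures['sale_id_not_null'].append(index)
        elif sale_id in seen:
            failures['sale_id_unique'].append(index)
        else:
            seen.add(sale_id)
        amount = row.get('amount')
        if amount is None or not (min_amount <= amount <= max_amount):
            failures['amount_in_range'].append(index)
    return failures

records = [
    {'sale_id': 1, 'amount': 10},
    {'sale_id': 1, 'amount': 5},             # duplicate sale_id
    {'sale_id': None, 'amount': -1},         # null id, negative amount
    {'sale_id': 2, 'amount': 2_000_000},     # amount above the maximum
]
print(check_sales_records(records))
# {'sale_id_not_null': [2], 'sale_id_unique': [1], 'amount_in_range': [2, 3]}
```

Returning row indexes rather than a single boolean makes it possible to route failing rows to a quarantine location instead of rejecting the whole file.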

Data Lineage

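# Track data lineage
import json
from datetime import datetime, timezone

def store_lineage_metadata(lineage):
    """Placeholder sink: print the record as JSON; swap in a real metadata store."""
    print(json.dumps(lineage))

def track_lineage(source_path, destination_path, transformation_type):
    """Track data lineage."""
    lineage = {
        'source': source_path,
        'destination': destination_path,
        'transformation': transformation_type,
        'timestamp': datetime.now(timezone.utc).isoformat(),
        'pipeline_id': 'pipeline_001'
    }
    
    # Store lineage metadata
    store_lineage_metadata(lineage)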

Monitoring and Alerting

Monitor Data Lake Metrics

# Monitor data lake metrics
from google.cloud import monitoring_v3
import time

def monitor_data_lake(project_id, bucket_name):
    """Monitor data lake metrics."""
    client = monitoring_v3.MetricServiceClient()
    project_name = f"projects/{project_id}"
    
    # total_bytes is sampled about once a day, so look back two days
    now = int(time.time())
    interval = monitoring_v3.TimeInterval({
        'end_time': {'seconds': now},
        'start_time': {'seconds': now - 2 * 24 * 3600},
    })
    
    results = client.list_time_series(
        request={
            'name': project_name,
            'filter': f'resource.labels.bucket_name = "{bucket_name}" AND metric.type = "storage.googleapis.com/storage/total_bytes"',
            'interval': interval,
            'view': monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL
        }
    )
    
    # Points are returned newest first; total_bytes is a DOUBLE-valued metric
    for result in results:
        print(f'Bucket size: {result.points[0].value.double_value / 1024 ** 3:.2f} GiB')

Best Practices

  1. Implement proper layering - Bronze, Silver, Gold architecture
  2. Use lifecycle rules - Optimize storage costs automatically
  3. Enable versioning - Protect against data loss
  4. Create external tables - Enable direct querying in BigQuery
  5. Use authorized views - Control data access securely
  6. Monitor data quality - Implement validation and alerting
  7. Track data lineage - Maintain data provenance information
⭐

Premium Content

Building a Data Lake on GCP with GCS and BigQuery

Unlock this lesson and 900+ advanced tutorials with a Premium plan.

🎯End-to-end Projects
πŸ’ΌInterview Prep
πŸ“œCertificates
🀝Community Access

Already a member? Log in

Need Expert GCP Data Engineering Help?

Get personalized tutoring, project support, or professional consulting.

Advertisement