Vertex AI for ML Pipelines on GCP
Vertex AI Architecture
Vertex AI is Google Cloud's unified AI/ML platform that provides end-to-end tools for building, deploying, and managing ML models.
Core Components
Data Layer:
- BigQuery for structured data
- Cloud Storage for unstructured data
- Data labeling services
- Data versioning and lineage
Feature Store:
- Centralized feature repository
- Online and offline serving
- Feature versioning and monitoring
- Point-in-time correctness
Training:
- AutoML for automated model training
- Custom training with your own code
- Distributed training support
- Hyperparameter tuning
Model Registry:
- Model versioning and metadata
- Model lineage tracking
- A/B testing support (by splitting endpoint traffic between registered model versions)
- Model approval workflows
Prediction:
- Online prediction endpoints
- Batch prediction jobs
- Auto-scaling and monitoring
- Explainable AI
AutoML
AutoML enables automated model training with minimal code.
Creating AutoML Models
from google.cloud import aiplatform

# Point the SDK at the project, region, and staging bucket used by the
# calls below.
aiplatform.init(
    project='my-project',
    location='us-central1',
    staging_bucket='gs://my-staging-bucket',
)

# Register the BigQuery table as a managed tabular dataset.
dataset = aiplatform.TabularDataset.create(
    display_name='sales-prediction',
    bq_source='bq://my-project.analytics.sales_data',
)

# Describe the AutoML regression job first, then launch it as a separate
# step so the job object stays available for inspection.
automl_job = aiplatform.AutoMLTabularTrainingJob(
    display_name='sales-prediction-model',
    optimization_prediction_type='regression',
    optimization_objective='minimize-rmse',
)

# Train on the 'amount' column with an 80/10/10 train/validation/test split;
# early stopping stays enabled.
model = automl_job.run(
    dataset=dataset,
    target_column='amount',
    training_fraction_split=0.8,
    validation_fraction_split=0.1,
    test_fraction_split=0.1,
    budget_milli_node_hours=1000,
    model_display_name='sales-prediction-v1',
    disable_early_stopping=False,
)
AutoML Configuration
# AutoML training configuration, kept in one dict so every setting that
# reaches the SDK comes from a single place.
training_config = {
    'display_name': 'sales-prediction-model',
    'dataset_id': 'sales-dataset',
    'target_column': 'amount',
    'training_fraction_split': 0.8,
    'validation_fraction_split': 0.1,
    'test_fraction_split': 0.1,
    'budget_milli_node_hours': 1000,
    'model_display_name': 'sales-prediction-v1',
    'optimization_prediction_type': 'regression',
    'optimization_objective': 'minimize-rmse',
    'early_stopping': True,
    # NOTE(review): 'QUICK_DEPLOY' could not be confirmed as a valid
    # experiment flag, so this entry is not forwarded to run() below.
    # TODO confirm before passing it as additional_experiments.
    'additional_experiments': ['QUICK_DEPLOY'],
}

# Create the training job from the config.
job = aiplatform.AutoMLTabularTrainingJob(
    display_name=training_config['display_name'],
    optimization_prediction_type=training_config['optimization_prediction_type'],
    optimization_objective=training_config['optimization_objective'],
)

# Run the job. Every split, budget, and naming value is read from
# training_config so the dict and the call cannot drift apart.
model = job.run(
    # NOTE(review): TabularDataset() expects a dataset ID or full resource
    # name; confirm 'sales-dataset' is not just a display name.
    dataset=aiplatform.TabularDataset(training_config['dataset_id']),
    target_column=training_config['target_column'],
    training_fraction_split=training_config['training_fraction_split'],
    validation_fraction_split=training_config['validation_fraction_split'],
    test_fraction_split=training_config['test_fraction_split'],
    budget_milli_node_hours=training_config['budget_milli_node_hours'],
    model_display_name=training_config['model_display_name'],
    # The SDK exposes the inverse flag, so the config value is negated.
    # Previously 'early_stopping' was declared but never applied.
    disable_early_stopping=not training_config['early_stopping'],
)
Custom Training
Custom training allows you to run your own ML code on Vertex AI.
Custom Training Job
from pathlib import Path

from google.cloud import aiplatform

# Training script that runs inside the training container. It is written to
# train.py below because CustomTrainingJob packages the file at script_path.
training_script = """
import argparse

import tensorflow as tf
from tensorflow import keras


def pack_features(features, label):
    # make_csv_dataset yields a dict of per-column tensors, but Dense layers
    # need a single float matrix. Assumes every feature column is numeric.
    columns = [tf.cast(column, tf.float32) for column in features.values()]
    return tf.stack(columns, axis=-1), label


def train_model(args):
    # Load data. num_epochs=1 makes the dataset end after one pass, so each
    # Keras epoch is finite (the default repeats forever).
    train_data = tf.data.experimental.make_csv_dataset(
        args.train_data,
        batch_size=args.batch_size,
        label_name='amount',
        num_epochs=1,
    ).map(pack_features)

    # Build model
    model = keras.Sequential([
        keras.layers.Dense(64, activation='relu'),
        keras.layers.Dense(32, activation='relu'),
        keras.layers.Dense(1),
    ])
    model.compile(
        optimizer='adam',
        loss='mse',
        metrics=['mae'],
    )

    # Train model. validation_split is not passed: Keras only supports it
    # for array inputs and raises an error for tf.data datasets.
    model.fit(
        train_data,
        epochs=args.epochs,
    )

    # Save model
    model.save(args.model_dir)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--train-data', type=str, required=True)
    parser.add_argument('--model-dir', type=str, required=True)
    parser.add_argument('--batch-size', type=int, default=32)
    parser.add_argument('--epochs', type=int, default=10)
    args = parser.parse_args()
    train_model(args)
"""

# Materialize the script at the path the job is told to upload; without this
# step script_path points at a file that does not exist.
Path('train.py').write_text(training_script, encoding='utf-8')

# Create custom training job
# NOTE(review): confirm the training image tag; current prebuilt TensorFlow
# training images may carry a Python-version suffix (e.g. '.py310').
job = aiplatform.CustomTrainingJob(
    display_name='custom-sales-model',
    script_path='train.py',
    container_uri='us-docker.pkg.dev/vertex-ai/training/tf-gpu.2-12:latest',
    requirements=['tensorflow>=2.12'],
    model_serving_container_image_uri='us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-12:latest',
)

# Run training job
model = job.run(
    replica_count=1,
    machine_type='n1-standard-8',
    accelerator_type='NVIDIA_TESLA_T4',
    accelerator_count=1,
    # Vertex AI registers the model from <base_output_dir>/model, so the base
    # directory is pinned to make that location equal to --model-dir.
    base_output_dir='gs://my-bucket',
    args=['--train-data', 'gs://my-bucket/train.csv', '--model-dir', 'gs://my-bucket/model'],
)
Custom Container Training
# Dockerfile for custom training
FROM us-docker.pkg.dev/vertex-ai/training/tf-gpu.2-12:latest
WORKDIR /app
# Dependencies are installed before the code is copied so this layer is
# reused when only train.py or config.yaml changes.
COPY requirements.txt .
# --no-cache-dir keeps pip's download cache out of the image layer.
RUN pip install --no-cache-dir -r requirements.txt
COPY train.py .
COPY config.yaml .
# Exec form, so arguments passed to the container are appended to train.py.
ENTRYPOINT ["python", "train.py"]
# config.yaml
# Settings are nested under their section so `training` and `model` parse as
# mappings rather than as empty keys followed by unrelated top-level values.
training:
  batch_size: 64
  epochs: 20
  learning_rate: 0.001
  validation_split: 0.2
model:
  architecture: "resnet50"
  pretrained: true
  fine_tune_layers: 10
Feature Store
Feature Store provides a centralized repository for storing, serving, and managing ML features.
Creating Feature Store
from google.cloud import aiplatform

# Create Feature Store with three fixed online-serving nodes.
featurestore = aiplatform.Featurestore.create(
    featurestore_id='sales_features',
    online_store_fixed_node_count=3,
)

# Create the entity type that groups all customer-level features.
# aiplatform.Featurestore organizes features under entity types; it has no
# create_feature_group() method.
customers = featurestore.create_entity_type(
    entity_type_id='customers',
    description='Customer-level features',
)

# Create Features
customers.create_feature(
    feature_id='customer_lifetime_value',
    value_type='DOUBLE',
)
customers.create_feature(
    feature_id='purchase_frequency',
    value_type='DOUBLE',
)

# Ingest features from the BigQuery table. feature_time names the column
# holding each row's feature timestamp.
customers.ingest_from_bq(
    feature_ids=['customer_lifetime_value', 'purchase_frequency'],
    feature_time='feature_timestamp',
    bq_source_uri='bq://my-project.analytics.customer_features',
    # NOTE(review): assumes the table stores entity IDs in a 'customer_id'
    # column — confirm against the table schema.
    entity_id_field='customer_id',
)
Feature Serving
import pandas as pd

# Online feature serving
featurestore = aiplatform.Featurestore('sales_features')

# Online reads go through the entity type; Featurestore has no read() method.
customers = featurestore.get_entity_type('customers')

# Get feature values (returned as a DataFrame, one row per entity)
feature_values = customers.read(
    entity_ids=['customer_123', 'customer_456'],
    feature_ids=['customer_lifetime_value', 'purchase_frequency'],
)

# Batch feature serving
# Each read instance pairs entity IDs with the timestamp at which feature
# values should be looked up. The entity column is named after the entity
# type and the time column must be called 'timestamp'.
# NOTE(review): the timestamp below is a placeholder — replace it with the
# label/event times of the training examples.
read_instances = pd.DataFrame(
    {
        'customers': ['customer_123', 'customer_456'],
        'timestamp': [pd.Timestamp('2024-01-01T00:00:00Z')] * 2,
    }
)
batch_features = featurestore.batch_serve_to_df(
    serving_feature_ids={
        'customers': ['customer_lifetime_value', 'purchase_frequency'],
    },
    read_instances_df=read_instances,
)
ML Pipelines
ML Pipelines orchestrate the end-to-end ML workflow.
Creating ML Pipelines
from kfp import compiler, dsl
from google_cloud_pipeline_components.v1.automl.training_job import (
    AutoMLTabularTrainingJobRunOp,
)
from google_cloud_pipeline_components.v1.dataset import TabularDatasetCreateOp
from google_cloud_pipeline_components.v1.endpoint import (
    EndpointCreateOp,
    ModelDeployOp,
)


@dsl.pipeline(
    name='sales-prediction-pipeline',
    description='End-to-end sales prediction pipeline',
)
def sales_prediction_pipeline(
    project_id: str,
    region: str,
    dataset_id: str,
    bq_source: str,
    target_column: str,
):
    """Create a dataset, train an AutoML regression model, and deploy it.

    Args:
        project_id: Project in which every pipeline step runs.
        region: Region used for the dataset, training job, and endpoint.
        dataset_id: Display name given to the managed tabular dataset.
        bq_source: 'bq://' URI of the table the dataset is created from.
        target_column: Column the model learns to predict.
    """
    # Create the managed tabular dataset from the BigQuery source table.
    create_dataset = TabularDatasetCreateOp(
        project=project_id,
        location=region,
        display_name=dataset_id,
        bq_source=bq_source,
    )

    # Train the model. AutoML performs its own model and hyperparameter
    # search, so no explicit search ranges are supplied here.
    training_op = AutoMLTabularTrainingJobRunOp(
        project=project_id,
        location=region,
        display_name='sales-prediction-training',
        dataset=create_dataset.outputs['dataset'],
        target_column=target_column,
        optimization_prediction_type='regression',
        optimization_objective='minimize-rmse',
        budget_milli_node_hours=1000,
    )

    # The deploy step needs an endpoint artifact; create one inside the
    # pipeline instead of referencing an undefined name.
    create_endpoint = EndpointCreateOp(
        project=project_id,
        location=region,
        display_name='sales-prediction-endpoint',
    )

    # Deploy model to endpoint
    ModelDeployOp(
        model=training_op.outputs['model'],
        endpoint=create_endpoint.outputs['endpoint'],
        deployed_model_display_name='sales-prediction-deployed',
        dedicated_resources_machine_type='n1-standard-4',
        dedicated_resources_min_replica_count=1,
        dedicated_resources_max_replica_count=3,
    )


# Compile pipeline
compiler.Compiler().compile(
    pipeline_func=sales_prediction_pipeline,
    package_path='sales_prediction_pipeline.json',
)
Running ML Pipelines
from google.cloud import aiplatform

# Initialize Vertex AI
aiplatform.init(
    project='my-project',
    location='us-central1',
    staging_bucket='gs://my-staging-bucket',
)

# Runtime values for the compiled pipeline's parameters, one per pipeline
# function argument.
pipeline_parameters = {
    'project_id': 'my-project',
    'region': 'us-central1',
    'dataset_id': 'sales_dataset',
    'bq_source': 'bq://my-project.analytics.sales_data',
    'target_column': 'amount',
}

# Bind the compiled template to a pipeline root and the parameters above.
job = aiplatform.PipelineJob(
    display_name='sales-prediction-run',
    template_path='sales_prediction_pipeline.json',
    pipeline_root='gs://my-pipeline-root',
    parameter_values=pipeline_parameters,
)

# submit() starts the run without blocking; wait() then blocks until the
# run has finished.
job.submit()
job.wait()

# Report the outcome.
print(f'Pipeline state: {job.state}')
print(f'Pipeline resource name: {job.resource_name}')
Model Monitoring
Model monitoring detects training-serving skew and prediction drift in the input features, which are early signals of model performance degradation.
Setting Up Monitoring
from google.cloud import aiplatform
from google.cloud.aiplatform import model_monitoring

# Deploy the model. Monitoring is not a deploy() option; it is attached to
# the endpoint afterwards as a separate monitoring job.
endpoint = model.deploy(
    deployed_model_display_name='sales-prediction-deployed',
    machine_type='n1-standard-4',
    min_replica_count=1,
    max_replica_count=3,
)

# Skew: compare serving inputs against the training table.
skew_config = model_monitoring.SkewDetectionConfig(
    data_source='bq://my-project.analytics.training_data',
    # NOTE(review): assumes 'amount' is the label column of the training
    # table — confirm.
    target_field='amount',
    skew_thresholds={
        'feature_1': 0.3,
        'feature_2': 0.3,
    },
)

# Drift: compare recent serving inputs against earlier serving inputs.
drift_config = model_monitoring.DriftDetectionConfig(
    drift_thresholds={
        'feature_1': 0.3,
        'feature_2': 0.3,
    },
)

objective_config = model_monitoring.ObjectiveConfig(
    skew_detection_config=skew_config,
    drift_detection_config=drift_config,
)

# Create the monitoring job for the endpoint.
monitoring_job = aiplatform.ModelDeploymentMonitoringJob.create(
    display_name='sales-prediction-monitoring',
    endpoint=endpoint,
    objective_configs=objective_config,
    # Log every prediction request for analysis.
    logging_sampling_strategy=model_monitoring.RandomSampleConfig(sample_rate=1.0),
    # monitor_interval is expressed in hours (1 hour = the 3600 seconds
    # used previously).
    schedule_config=model_monitoring.ScheduleConfig(monitor_interval=1),
    alert_config=model_monitoring.EmailAlertConfig(
        user_emails=['ml-team@company.com'],
    ),
)
Monitoring Metrics
# Monitor model metrics
from google.cloud import monitoring_v3
import time
def monitor_model_performance(project_id: str, endpoint_id: str) -> None:
    """Print the mean online-prediction latency of an endpoint for the last hour.

    Args:
        project_id: Project whose Cloud Monitoring time series are queried.
        endpoint_id: ID of the endpoint used to filter the time series.
    """
    client = monitoring_v3.MetricServiceClient()
    project_name = f"projects/{project_id}"

    # Read the clock once so both ends of the window derive from the same
    # instant, then split it into the seconds/nanos pair that the interval's
    # timestamp fields require (a raw float is rejected).
    now = time.time()
    seconds = int(now)
    nanos = int((now - seconds) * 10**9)
    interval = monitoring_v3.TimeInterval(
        {
            'end_time': {'seconds': seconds, 'nanos': nanos},
            'start_time': {'seconds': seconds - 3600, 'nanos': nanos},
        }
    )

    # Monitor prediction latency
    results = client.list_time_series(
        request={
            'name': project_name,
            'filter': (
                f'resource.labels.endpoint_id = "{endpoint_id}" AND '
                'metric.type = "aiplatform.googleapis.com/prediction/online/prediction_latencies"'
            ),
            'interval': interval,
            'view': monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
        }
    )

    for result in results:
        # A series can come back without points; skip it instead of
        # failing on the index below.
        if not result.points:
            continue
        # The latency metric is a distribution, so the average is the
        # distribution's mean rather than a scalar double value.
        latest = result.points[0].value.distribution_value
        print(f'Average latency: {latest.mean}ms')
Explainable AI
Explainable AI helps understand model predictions.
Getting Explanations
from google.cloud import aiplatform

# Deploy model with explanations
endpoint = model.deploy(
    deployed_model_display_name='sales-prediction-explainable',
    machine_type='n1-standard-4',
    min_replica_count=1,
    max_replica_count=3,
    # Sampled Shapley attribution with 10 feature permutations per instance.
    # NOTE(review): a custom-trained model may also need explanation_metadata
    # describing its inputs and outputs — confirm for the model being deployed.
    explanation_parameters=aiplatform.explain.ExplanationParameters(
        {'sampled_shapley_attribution': {'path_count': 10}}
    ),
)

# Get prediction with explanation
instances = [
    {'feature_1': 1.0, 'feature_2': 2.0, 'feature_3': 3.0}
]

# explain() returns the predictions together with their explanations;
# predict() returns no attribution data.
predictions = endpoint.explain(
    instances=instances,
    parameters={'excluded_features': ['feature_4']},
)

# Access explanations: one explanation per instance, each holding one
# attribution per explained model output.
for explanation in predictions.explanations:
    for attribution in explanation.attributions:
        print(f'Attribution scores: {attribution.feature_attributions}')
Batch Prediction
Batch prediction runs inference on large datasets.
Running Batch Prediction
from google.cloud import aiplatform

# Create batch prediction job. The BigQuery source and destination are
# passed as plain 'bq://' URIs through dedicated keyword arguments.
job = aiplatform.BatchPredictionJob.create(
    job_display_name='sales-prediction-batch',
    model_name=model.resource_name,
    instances_format='bigquery',
    predictions_format='bigquery',
    bigquery_source='bq://my-project.analytics.batch_input',
    bigquery_destination_prefix='bq://my-project.analytics.batch_output',
    model_parameters={'confidence_threshold': 0.5},
    machine_type='n1-standard-4',
    starting_replica_count=1,
    max_replica_count=5,
)

# Wait for completion
job.wait()

# Check results
print(f'Job state: {job.state}')
print(f'Output location: {job.output_info.bigquery_output_table}')
Best Practices
- Use Feature Store - Centralize feature management
- Implement ML Pipelines - Automate end-to-end workflows
- Enable Model Monitoring - Detect drift and performance issues
- Use Explainable AI - Understand model predictions
- Optimize costs - Use appropriate machine types and auto-scaling
- Version models - Track model lineage and versions
- Implement A/B testing - Compare model performance