Cloud: AWS SageMaker + GCP Vertex AI
Overview
Cloud ML platforms provide managed infrastructure for building, training, and deploying machine learning models at scale. This lesson covers AWS SageMaker and Google Cloud Vertex AI, two widely used platforms for production ML. Both abstract away much of the infrastructure management, so data scientists can focus on modeling rather than operations.
Cloud ML Platforms Comparison
Why Cloud ML?
On-Premises vs Cloud:
On-Premises:
- Fixed capacity (buy hardware upfront)
- High upfront cost
- Maintenance burden
- Slow scaling
- Team manages everything
Cloud ML:
- Elastic capacity (scale up/down)
- Pay-per-use
- Managed infrastructure
- Auto-scaling
- Focus on ML, not ops
Platform Comparison
| Feature | AWS SageMaker | GCP Vertex AI |
|---|---|---|
| Managed Training | Yes | Yes |
| AutoML | Yes | Yes |
| Model Registry | Yes | Yes |
| Feature Store | Yes | Yes |
| Experiment Tracking | Yes | Yes |
| Pipelines | Yes | Yes |
| Edge Deployment | Yes | Yes |
| Custom Containers | Yes | Yes |
| Pricing Model | Per instance-hour | Per node-hour |
Architecture Comparison
AWS SageMaker:
Data (S3) --> Processing --> Training --> Model Registry --> Endpoint
                   |             |                               |
           Processing Jobs Training Jobs                      Hosting
GCP Vertex AI:
Data (GCS) --> Pipelines --> Training --> Model Registry --> Endpoint
                   |             |                               |
             Pipeline Runs  Custom Jobs                   Predictions API
AWS SageMaker
Core Components
SageMaker Components:
+--------------------------------------------------------------+
| Studio IDE |
| +-- Notebooks |
| +-- Experiments |
| +-- Model Registry |
| +-- Pipelines |
+--------------------------------------------------------------+
| Built-in Algorithms |
| +-- XGBoost, Random Cut Forest |
| +-- Linear Learner, Factorization Machines |
| +-- K-Means, PCA, Object Detection |
| +-- Sequence-to-Sequence, BlazingText |
+--------------------------------------------------------------+
| Training & Tuning |
| +-- Managed Spot Training |
| +-- Automatic Model Tuning |
| +-- Distributed Training |
+--------------------------------------------------------------+
| Deployment |
| +-- Real-time Endpoints |
| +-- Serverless Inference |
| +-- Batch Transform |
| +-- Edge Deployment (Neo) |
+--------------------------------------------------------------+
SageMaker Python SDK
import sagemaker
import boto3
from sagemaker import get_execution_role
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.xgboost.estimator import XGBoost
from sagemaker.tuner import (
HyperparameterTuner,
IntegerParameter,
ContinuousParameter,
CategoricalParameter
)
# Session and role
sess = sagemaker.Session()
role = get_execution_role() # or specify ARN
bucket = sess.default_bucket()
# Upload data to S3
train_path = sess.upload_data(
path="data/train.csv",
bucket=bucket,
key_prefix="datasets/train"
)
# Define XGBoost estimator
xgb_estimator = XGBoost(
entry_point="train.py",
role=role,
instance_count=1,
instance_type="ml.m5.xlarge",
framework_version="1.5-1",
hyperparameters={
"objective": "binary:logistic",
"num_round": 100,
"max_depth": 5,
"eta": 0.2,
"eval_metric": "auc"
},
output_path=f"s3://{bucket}/models/",
sagemaker_session=sess
)
# Train
xgb_estimator.fit({"train": train_path})
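# Deploy (a CSV serializer lets the predictor accept NumPy arrays directly)
from sagemaker.serializers import CSVSerializer
predictor = xgb_estimator.deploy(
    initial_instance_count=1,
    instance_type="ml.t2.medium",
    endpoint_name="xgboost-churn-prediction",
    serializer=CSVSerializer()
)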
# Predict
import numpy as np
test_data = np.random.randn(10, 5).astype(np.float32)
predictions = predictor.predict(test_data)
print(predictions)
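The estimator above points at `entry_point="train.py"`, which this lesson does not show. As a minimal sketch of how such a script might read its inputs, assuming SageMaker script-mode conventions (hyperparameters arrive as command-line flags, and data and model locations arrive through the `SM_CHANNEL_TRAIN` and `SM_MODEL_DIR` environment variables), the argument handling could look like this. The function name `parse_args` and the fallback defaults are illustrative, and the actual model training code is omitted.

```python
import argparse
import os


def parse_args(argv=None):
    """Parse hyperparameters and SageMaker-provided paths for a training script."""
    parser = argparse.ArgumentParser()

    # Hyperparameters: the estimator's `hyperparameters` dict is passed to the
    # script as "--name value" pairs.
    parser.add_argument("--objective", type=str, default="binary:logistic")
    parser.add_argument("--num_round", type=int, default=100)
    parser.add_argument("--max_depth", type=int, default=5)
    parser.add_argument("--eta", type=float, default=0.2)
    parser.add_argument("--eval_metric", type=str, default="auc")

    # Paths: inside the training container these come from environment
    # variables; the fallbacks are the conventional container locations.
    parser.add_argument(
        "--train",
        type=str,
        default=os.environ.get("SM_CHANNEL_TRAIN", "/opt/ml/input/data/train"),
    )
    parser.add_argument(
        "--model_dir",
        type=str,
        default=os.environ.get("SM_MODEL_DIR", "/opt/ml/model"),
    )

    # Ignore any flags this sketch does not declare.
    args, _unknown = parser.parse_known_args(argv)
    return args


# Local dry run with two overridden hyperparameters
args = parse_args(["--max_depth", "7", "--eta", "0.1"])
print(args.max_depth, args.eta, args.model_dir)
```

A real `train.py` would then load the CSV files under `args.train`, fit the model, and save it into `args.model_dir` so SageMaker can package it as the model artifact.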
Hyperparameter Tuning
from sagemaker.tuner import (
HyperparameterTuner,
IntegerParameter,
ContinuousParameter,
CategoricalParameter
)
# Define hyperparameter ranges
hyperparameter_ranges = {
"max_depth": IntegerParameter(3, 10),
"eta": ContinuousParameter(0.01, 0.3),
"num_round": IntegerParameter(50, 500),
"subsample": ContinuousParameter(0.5, 1.0),
"colsample_bytree": ContinuousParameter(0.5, 1.0),
"min_child_weight": IntegerParameter(1, 10)
}
# Objective metric
objective_metric_name = "validation:auc"
# Create tuner
tuner = HyperparameterTuner(
estimator=xgb_estimator,
objective_metric_name=objective_metric_name,
hyperparameter_ranges=hyperparameter_ranges,
max_jobs=20,
max_parallel_jobs=4,
objective_type="Maximize",
early_stopping_type="Auto"
)
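# Upload validation data (the local path is a placeholder for your own validation split)
validation_path = sess.upload_data(
    path="data/validation.csv",
    bucket=bucket,
    key_prefix="datasets/validation"
)
# Run tuning job
tuner.fit({"train": train_path, "validation": validation_path})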
# Wait for completion
tuner.wait()
# Get best training job
best_job = tuner.best_training_job()
print(f"Best job: {best_job}")
# Deploy best model
best_predictor = tuner.deploy(
initial_instance_count=1,
instance_type="ml.t2.medium"
)
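To make the roles of the parameter ranges and `max_jobs` concrete, here is a small local sketch of a search loop. It uses plain random search for simplicity, whereas SageMaker's tuner defaults to a Bayesian strategy, and the objective is a made-up stand-in for a real validation AUC. All function names here are hypothetical and unrelated to the SageMaker SDK.

```python
import random


def sample_config(ranges, rng):
    """Draw one configuration: integer bounds give an integer, float bounds a float."""
    config = {}
    for name, (low, high) in ranges.items():
        if isinstance(low, int) and isinstance(high, int):
            config[name] = rng.randint(low, high)
        else:
            config[name] = rng.uniform(low, high)
    return config


def random_search(objective, ranges, max_jobs, seed=0):
    """Evaluate `max_jobs` random configurations and keep the best one."""
    rng = random.Random(seed)
    best_config, best_score = None, float("-inf")
    for _ in range(max_jobs):
        config = sample_config(ranges, rng)
        score = objective(config)
        if score > best_score:
            best_config, best_score = config, score
    return best_config, best_score


def toy_objective(config):
    """Stand-in for a validation metric; peaks at max_depth=6, eta=0.1."""
    return 1.0 - 0.01 * (config["max_depth"] - 6) ** 2 - (config["eta"] - 0.1) ** 2


ranges = {"max_depth": (3, 10), "eta": (0.01, 0.3)}
best_config, best_score = random_search(toy_objective, ranges, max_jobs=20)
print(best_config, round(best_score, 4))
```

Each loop iteration corresponds to one training job in the managed service, which is why `max_jobs` directly bounds the tuning cost.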
SageMaker Pipelines
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import (
ProcessingStep,
TrainingStep,
CreateModelStep
)
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.processing import ScriptProcessor
from sagemaker.sklearn.processing import SKLearnProcessor
# Define processing step
sklearn_processor = SKLearnProcessor(
framework_version="1.0-1",
role=role,
instance_type="ml.m5.xlarge",
instance_count=1
)
processing_step = ProcessingStep(
name="PreprocessData",
processor=sklearn_processor,
inputs=[
sagemaker.processing.ProcessingInput(
source=train_path,
destination="/opt/ml/processing/input"
)
],
outputs=[
sagemaker.processing.ProcessingOutput(
output_name="train",
source="/opt/ml/processing/output/train"
),
sagemaker.processing.ProcessingOutput(
output_name="validation",
source="/opt/ml/processing/output/validation"
)
],
code="preprocess.py"
)
# Define training step
training_step = TrainingStep(
name="TrainModel",
estimator=xgb_estimator,
inputs={
"train": sagemaker.inputs.TrainingInput(
s3_data=processing_step.properties.ProcessingOutputConfig
.Outputs["train"].S3Output.S3Uri
),
"validation": sagemaker.inputs.TrainingInput(
s3_data=processing_step.properties.ProcessingOutputConfig
.Outputs["validation"].S3Output.S3Uri
)
}
)
# Define registration step
register_step = RegisterModel(
name="RegisterModel",
estimator=xgb_estimator,
model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
content_types=["text/csv"],
response_types=["text/csv"],
inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
transform_instances=["ml.m5.xlarge"],
model_package_group_name="ChurnPredictionModels"
)
# Create pipeline
pipeline = Pipeline(
name="ChurnPredictionPipeline",
steps=[processing_step, training_step, register_step],
sagemaker_session=sess
)
# Create or update the pipeline definition, then execute it
pipeline.upsert(role_arn=role)
execution = pipeline.start()
execution.wait()
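The training step consumes outputs of the processing step, and the registration step consumes the trained model, so the steps form a dependency graph. The sketch below illustrates that ordering idea with the standard library only. It is not how SageMaker resolves dependencies internally, and the `dependencies` mapping is written by hand for this example.

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps whose outputs it consumes.
dependencies = {
    "PreprocessData": set(),
    "TrainModel": {"PreprocessData"},
    "RegisterModel": {"TrainModel"},
}


def execution_order(deps):
    """Return one valid execution order for a step dependency graph."""
    return list(TopologicalSorter(deps).static_order())


order = execution_order(dependencies)
print(order)  # ['PreprocessData', 'TrainModel', 'RegisterModel']
```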
GCP Vertex AI
Core Components
Vertex AI Components:
+--------------------------------------------------------------+
| Workbench (Notebooks) |
+--------------------------------------------------------------+
| Data Labeling |
| +-- Labeling for images, text, video, audio |
| +-- Managed workforce or auto-labeling |
+--------------------------------------------------------------+
| Training |
| +-- Custom Training |
| +-- AutoML (Tables, Image, Video, Text) |
| +-- Distributed Training |
| +-- Pre-built Containers |
+--------------------------------------------------------------+
| Model Management |
| +-- Model Registry |
| +-- Versioning |
| +-- Lineage tracking |
+--------------------------------------------------------------+
| Deployment |
| +-- Online Prediction |
| +-- Batch Prediction |
| +-- Edge TPU |
| +-- Pipelines (Kubeflow-based) |
+--------------------------------------------------------------+
Vertex AI Python SDK
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
import kfp
from kfp import dsl
# Initialize Vertex AI
aiplatform.init(
project="my-project",
location="us-central1",
staging_bucket="gs://my-bucket"
)
# --- Custom Training Job ---
job = aiplatform.CustomTrainingJob(
display_name="churn-prediction-training",
script_path="train.py",
container_uri="us-docker.pkg.dev/vertex-ai/training/sklearn-cpu.1-0:latest",  # scikit-learn prebuilt images are CPU-only; verify the tag against the current prebuilt container list
requirements=["scikit-learn==1.0.2", "pandas==1.3.5"],
model_serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest"
)
# Train model
model = job.run(
replica_count=1,
machine_type="n1-standard-8",
args=["--train-path", "gs://my-bucket/data/train.csv"],
model_display_name="churn-prediction-model"
)
# Deploy to endpoint
endpoint = model.deploy(
deployed_model_display_name="churn-prediction-endpoint",
machine_type="n1-standard-4",
min_replica_count=1,
max_replica_count=3,
traffic_percentage=100
)
# Predict
import numpy as np
instances = np.random.randn(5, 10).tolist()
predictions = endpoint.predict(instances=instances)
print(predictions.predictions)
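`endpoint.predict` takes a list of instances, and the underlying REST request wraps that list in a JSON object under an `instances` key, with an optional `parameters` key. As a rough illustration of the payload shape, the helper below builds such a body. The name `build_predict_payload` is hypothetical and is not part of the Vertex AI SDK.

```python
import json


def build_predict_payload(rows, parameters=None):
    """Build a JSON request body with one feature list per instance."""
    instances = [[float(value) for value in row] for row in rows]
    payload = {"instances": instances}
    if parameters:
        payload["parameters"] = parameters
    return json.dumps(payload)


body = build_predict_payload([[0.1, 0.2, 0.3], [1, 2, 3]])
print(body)
```

Converting every value to a plain `float` matters because NumPy scalar types are not JSON serializable, which is also why the example above calls `.tolist()` before predicting.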
Vertex AI Pipelines
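from kfp import dsl
from google.cloud import aiplatform

# Note: dsl.ContainerOp belongs to the KFP v1 SDK and was removed in KFP 2.x.
# The steps below use KFP v2 container components instead. The images and
# their command-line flags are placeholders for your own containers.

@dsl.container_component
def preprocess(
    input_uri: str,
    train_data: dsl.Output[dsl.Dataset],
    test_data: dsl.Output[dsl.Dataset],
):
    return dsl.ContainerSpec(
        image="gcr.io/my-project/preprocess:latest",
        args=[
            "--input", input_uri,
            "--output", train_data.path,
            "--test-output", test_data.path,
        ],
    )

@dsl.container_component
def train(
    train_data: dsl.Input[dsl.Dataset],
    target: str,
    model: dsl.Output[dsl.Model],
):
    return dsl.ContainerSpec(
        image="gcr.io/my-project/train:latest",
        args=[
            "--train-data", train_data.path,
            "--target", target,
            "--model-output", model.path,
        ],
    )

@dsl.container_component
def evaluate(
    model: dsl.Input[dsl.Model],
    test_data: dsl.Input[dsl.Dataset],
    metrics: dsl.Output[dsl.Metrics],
    auc: dsl.OutputPath(float),
):
    # The container is expected to write the AUC value to the `auc` file path
    return dsl.ContainerSpec(
        image="gcr.io/my-project/evaluate:latest",
        args=[
            "--model", model.path,
            "--test-data", test_data.path,
            "--metrics-output", metrics.path,
            "--auc-output", auc,
        ],
    )

@dsl.container_component
def register(model: dsl.Input[dsl.Model], metrics: dsl.Input[dsl.Metrics]):
    return dsl.ContainerSpec(
        image="gcr.io/my-project/register:latest",
        args=["--model", model.path, "--metrics", metrics.path],
    )

@dsl.pipeline(
    name="churn-prediction-pipeline",
    pipeline_root="gs://my-bucket/pipeline-root"
)
def churn_pipeline(
    training_data_uri: str,
    target_column: str = "churn"
):
    # Step 1: Preprocess
    preprocess_task = preprocess(input_uri=training_data_uri)
    # Step 2: Train
    train_task = train(
        train_data=preprocess_task.outputs["train_data"],
        target=target_column
    )
    # Step 3: Evaluate
    evaluate_task = evaluate(
        model=train_task.outputs["model"],
        test_data=preprocess_task.outputs["test_data"]
    )
    # Step 4: Register model (only when AUC clears the threshold)
    with dsl.Condition(evaluate_task.outputs["auc"] > 0.85):
        register(
            model=train_task.outputs["model"],
            metrics=evaluate_task.outputs["metrics"]
        )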
# Compile pipeline
from kfp import compiler
compiler.Compiler().compile(
pipeline_func=churn_pipeline,
package_path="churn_pipeline.json"
)
# Run pipeline
job = aiplatform.PipelineJob(
display_name="churn-prediction-run",
template_path="churn_pipeline.json",
pipeline_root="gs://my-bucket/pipeline-root",
parameter_values={
"training_data_uri": "gs://my-bucket/data/train.csv"
}
)
job.submit()
job.wait()
Model Deployment Strategies
Deployment Patterns
Deployment Strategies:
1. Blue-Green Deployment:
Blue (current) Green (new)
[A] [B] [C] --> [A] [B] [C]
| |
v v
[A] [B] [C] [D] [E] [F] (new version)
Switch all traffic to green once it is validated; keep blue available for rollback
2. Canary Deployment:
100% ----[A]----+
|
95% ----[A]--+ |
5% ----[B]--+-+ (canary)
Gradually increase new version traffic
3. Shadow Deployment:
Production: [A] --> Response to user
Shadow: [B] --> Response logged (not sent)
Compare performance without risk
4. Multi-Armed Bandit:
Auto-route more traffic to better version
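The canary and multi-armed bandit patterns above both come down to a routing decision per request. The sketch below shows one possible way to express each: a hash-based canary split, so a given caller consistently sees the same version, and an epsilon-greedy router that shifts traffic toward the version with the better observed reward. Both are illustrative stand-ins, assuming versions named "A" and "B". On SageMaker or Vertex AI the split itself is normally configured on the endpoint rather than in application code.

```python
import hashlib
import random


def canary_route(request_id: str, canary_percent: float) -> str:
    """Send roughly `canary_percent` percent of request IDs to version B."""
    digest = hashlib.sha256(request_id.encode("utf-8")).hexdigest()
    bucket = int(digest[:8], 16) % 10000  # stable bucket in 0..9999
    return "B" if bucket < canary_percent * 100 else "A"


class EpsilonGreedyRouter:
    """Route mostly to the best-performing version, exploring with probability epsilon."""

    def __init__(self, versions, epsilon=0.1, seed=0):
        self.epsilon = epsilon
        self.rng = random.Random(seed)
        self.counts = {version: 0 for version in versions}
        self.rewards = {version: 0.0 for version in versions}

    def _mean_reward(self, version):
        count = self.counts[version]
        return self.rewards[version] / count if count else 0.0

    def choose(self):
        """Pick a version: explore at random, otherwise exploit the best mean."""
        no_data_yet = not any(self.counts.values())
        if no_data_yet or self.rng.random() < self.epsilon:
            return self.rng.choice(list(self.counts))
        return max(self.counts, key=self._mean_reward)

    def update(self, version, reward):
        """Record the observed reward (for example, 1.0 for a correct prediction)."""
        self.counts[version] += 1
        self.rewards[version] += reward


# Canary: the same ID always maps to the same version
print(canary_route("user-123", canary_percent=5))

# Bandit: simulate feedback where B succeeds more often than A
router = EpsilonGreedyRouter(["A", "B"], epsilon=0.1, seed=1)
feedback = random.Random(42)
success_rate = {"A": 0.2, "B": 0.8}  # made-up rates for the simulation
for _ in range(2000):
    version = router.choose()
    router.update(version, 1.0 if feedback.random() < success_rate[version] else 0.0)
print(router.counts)
```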
Serverless Inference
# AWS SageMaker Serverless
from sagemaker.serverless import ServerlessInferenceConfig
serverless_config = ServerlessInferenceConfig(
memory_size_in_mb=2048,
max_concurrency=10
)
# Deploy serverless endpoint (no instance type or count: SageMaker manages capacity)
predictor = xgb_estimator.deploy(
    serverless_inference_config=serverless_config
)
# GCP Vertex AI - Online Prediction
from google.cloud import aiplatform
model = aiplatform.Model.upload(
display_name="my-model",
serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-0:latest",
serving_container_predict_route="/predict",
serving_container_health_route="/health"
)
endpoint = model.deploy(
    machine_type="n1-standard-4",
    min_replica_count=1,  # Standard online endpoints typically need at least one running replica
    max_replica_count=5
)
Batch Prediction
# AWS SageMaker Batch Transform
from sagemaker.sklearn.estimator import SKLearn
batch_transformer = xgb_estimator.transformer(
instance_count=1,
instance_type="ml.m5.xlarge",
strategy="MultiRecord",
max_payload=6,
accept="text/csv"
)
batch_transformer.transform(
data="s3://my-bucket/batch-input/",
content_type="text/csv",
split_type="Line"
)
batch_transformer.wait()
# GCP Vertex AI Batch Prediction
batch_prediction_job = model.batch_predict(
    job_display_name="churn-batch-prediction",
    gcs_source=["gs://my-bucket/batch-input/"],
    gcs_destination_prefix="gs://my-bucket/batch-output/",
    machine_type="n1-standard-4",
    starting_replica_count=1,
    max_replica_count=5
)
batch_prediction_job.wait()
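Both batch services read records from files, so the input has to be written in a line-oriented format first. The sketch below shows two plausible encodings: headerless CSV lines, which match `content_type="text/csv"` with `split_type="Line"`, and JSON Lines with one instance per line. The helper names are hypothetical, and the exact format your model expects depends on its serving container.

```python
import csv
import io
import json


def to_csv_records(rows):
    """Encode feature rows as CSV text: one record per line, no header."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerows(rows)
    return buffer.getvalue()


def to_jsonl_records(rows):
    """Encode feature rows as JSON Lines: one JSON array per line."""
    return "".join(json.dumps(list(row)) + "\n" for row in rows)


rows = [[1, 2.5, 0], [3, 4.5, 1]]
print(to_csv_records(rows))
print(to_jsonl_records(rows))
```

The resulting text would be uploaded to the input prefix in S3 or Cloud Storage before starting the batch job.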
Cost Optimization
Pricing Models
Cost Optimization Strategies:
1. Spot/Preemptible Instances:
- AWS: Spot instances (70-90% savings)
- GCP: Preemptible VMs (60-80% savings)
- Risk: Can be interrupted
2. Reserved Capacity:
- 1-year commitment: 30-40% savings
- 3-year commitment: 50-70% savings
3. Right-Sizing:
- Match instance type to workload
- Use auto-scaling
4. Auto-Scaling:
- Scale down during off-peak
- Scale to zero when not in use
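The interruption risk in item 1 can be folded into the estimate: if interruptions force some work to be repeated, the job is billed for more hours at the cheaper rate. The sketch below works through that arithmetic under a simple assumption, namely that rework is a fixed fraction of the job's nominal duration. The function names and example numbers are illustrative.

```python
def effective_spot_cost(on_demand_rate, hours, spot_savings, rework_fraction):
    """Cost of a spot job when interruptions add `rework_fraction` extra hours."""
    spot_rate = on_demand_rate * (1 - spot_savings)
    billed_hours = hours * (1 + rework_fraction)
    return spot_rate * billed_hours


def break_even_rework(spot_savings):
    """Rework fraction at which spot costs the same as on-demand.

    Solves rate * (1 - s) * h * (1 + r) = rate * h for r, giving r = s / (1 - s).
    """
    return spot_savings / (1 - spot_savings)


# A 10-hour job at $1.00/hour on demand, 70% spot savings, 20% rework
print(round(effective_spot_cost(1.0, 10, 0.7, 0.2), 2))
print(round(break_even_rework(0.7), 2))
```

With 70% savings, spot stays cheaper until interruptions more than triple the billed time, which is why checkpointing usually makes spot training worthwhile.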
Cost Optimization Implementation
class CloudMLOptimizer:
    """Cloud ML cost optimization toolkit."""

    def __init__(self, provider='aws'):
        self.provider = provider

    def estimate_training_cost(
        self,
        instance_type: str,
        hours: float,
        use_spot: bool = False
    ) -> dict:
        """Estimate training job cost."""
        # AWS pricing (approximate, us-east-1, USD per hour)
        aws_prices = {
            'ml.m5.xlarge': 0.23,
            'ml.m5.2xlarge': 0.46,
            'ml.p3.2xlarge': 3.83,
            'ml.p3.8xlarge': 14.69,
            'ml.g4dn.xlarge': 0.74,
            'ml.g4dn.4xlarge': 2.72,
        }
        # GCP pricing (approximate, us-central1, USD per hour)
        gcp_prices = {
            'n1-standard-4': 0.19,
            'n1-standard-8': 0.38,
            'n1-standard-16': 0.77,
            'n1-highmem-4': 0.26,
            'n1-highmem-8': 0.52,
        }
        prices = aws_prices if self.provider == 'aws' else gcp_prices
        if instance_type not in prices:
            # Fail loudly instead of silently pricing an unknown instance
            raise ValueError(f"No price listed for instance type: {instance_type}")
        hourly_rate = prices[instance_type]
        on_demand_cost = hourly_rate * hours
        # Assumes spot capacity costs about 30% of on-demand (a 70% saving,
        # the low end of the AWS range listed earlier); real discounts vary.
        spot_multiplier = 0.3 if use_spot else 1.0
        total_cost = on_demand_cost * spot_multiplier
        return {
            'instance_type': instance_type,
            'hours': hours,
            'hourly_rate': hourly_rate,
            'spot_enabled': use_spot,
            'estimated_cost': total_cost,
            'savings_from_spot': on_demand_cost - total_cost
        }

    def recommend_instance(
        self,
        memory_gb: float,
        gpu_required: bool = False
    ) -> str:
        """Recommend instance type based on requirements."""
        if gpu_required:
            # On GCP, N1 machine types have no built-in GPU; an accelerator
            # must be attached to the machine separately.
            if memory_gb <= 16:
                return 'ml.g4dn.xlarge' if self.provider == 'aws' else 'n1-standard-8'
            elif memory_gb <= 32:
                return 'ml.g4dn.2xlarge' if self.provider == 'aws' else 'n1-standard-16'
            else:
                return 'ml.p3.2xlarge' if self.provider == 'aws' else 'n1-highmem-16'
        else:
            if memory_gb <= 16:
                return 'ml.m5.xlarge' if self.provider == 'aws' else 'n1-standard-4'
            elif memory_gb <= 32:
                return 'ml.m5.2xlarge' if self.provider == 'aws' else 'n1-standard-8'
            else:
                return 'ml.m5.4xlarge' if self.provider == 'aws' else 'n1-standard-16'

    def auto_scale_config(
        self,
        min_replicas: int = 0,
        max_replicas: int = 10,
        target_latency_ms: int = 100
    ) -> dict:
        """Configure auto-scaling for endpoints."""
        return {
            'min_replicas': min_replicas,
            'max_replicas': max_replicas,
            'target_latency': target_latency_ms,
            'scale_down_delay': '300s',
            'scale_up_delay': '60s',
            'metric': 'average_latency'
        }

    def optimize_hyperparameters(
        self,
        budget_hours: float,
        time_per_trial_minutes: float = 30
    ) -> dict:
        """Optimize hyperparameter search within budget."""
        max_trials = int((budget_hours * 60) / time_per_trial_minutes)
        return {
            'max_trials': max_trials,
            # At least one parallel job, even for very small budgets
            'parallel_jobs': max(1, min(max_trials // 5, 4)),
            'early_stopping': True,
            'objective': 'maximize'
        }
# Usage example
optimizer = CloudMLOptimizer(provider='aws')
# Estimate costs
training_cost = optimizer.estimate_training_cost(
instance_type='ml.g4dn.xlarge',
hours=2.5,
use_spot=True
)
print(f"Training cost: ${training_cost['estimated_cost']:.2f}")
print(f"Savings from spot: ${training_cost['savings_from_spot']:.2f}")
# Get recommendation
instance = optimizer.recommend_instance(memory_gb=24, gpu_required=True)
print(f"Recommended instance: {instance}")
# Optimize HPO job
hpo_config = optimizer.optimize_hyperparameters(
budget_hours=10,
time_per_trial_minutes=15
)
print(f"Max trials: {hpo_config['max_trials']}")
Complete Example: End-to-End ML Pipeline
from datetime import datetime, timedelta, timezone

import boto3
import sagemaker
from sagemaker import get_execution_role
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.xgboost.estimator import XGBoost
from sagemaker.tuner import HyperparameterTuner, IntegerParameter, ContinuousParameter
from sagemaker.pipeline import PipelineModel
from sagemaker.utils import name_from_base


class SageMakerMLEndpoint:
    """Complete SageMaker ML pipeline."""

    def __init__(self, region='us-east-1'):
        # Bind the SageMaker session to the requested region
        self.sess = sagemaker.Session(boto_session=boto3.Session(region_name=region))
        self.role = get_execution_role()
        self.bucket = self.sess.default_bucket()
        self.region = region
        self.sm_client = boto3.client('sagemaker', region_name=region)

    def upload_data(self, local_path, s3_prefix):
        """Upload data to S3."""
        return self.sess.upload_data(
            path=local_path,
            bucket=self.bucket,
            key_prefix=s3_prefix
        )

    def train_sklearn_model(self, train_path, test_path):
        """Train sklearn model."""
        sklearn = SKLearn(
            entry_point="sklearn_train.py",
            role=self.role,
            instance_count=1,
            instance_type="ml.m5.xlarge",
            framework_version="1.0-1",
            hyperparameters={
                "n_estimators": 100,
                "max_depth": 5
            },
            sagemaker_session=self.sess
        )
        sklearn.fit({"train": train_path, "test": test_path})
        return sklearn

    def train_xgboost_with_tuning(self, train_path, val_path):
        """Train XGBoost with hyperparameter tuning."""
        xgb = XGBoost(
            entry_point="xgb_train.py",
            role=self.role,
            instance_count=1,
            instance_type="ml.m5.xlarge",
            framework_version="1.5-1",
            output_path=f"s3://{self.bucket}/models/",
            sagemaker_session=self.sess
        )
        tuner = HyperparameterTuner(
            estimator=xgb,
            objective_metric_name="validation:auc",
            hyperparameter_ranges={
                "max_depth": IntegerParameter(3, 10),
                "eta": ContinuousParameter(0.01, 0.3),
                "num_round": IntegerParameter(100, 500)
            },
            max_jobs=10,
            max_parallel_jobs=2,
            objective_type="Maximize"
        )
        tuner.fit({"train": train_path, "validation": val_path})
        tuner.wait()
        return tuner

    def create_ensemble_model(self, models):
        """Chain trained models into one serial inference pipeline.

        PipelineModel runs its containers in sequence, passing each output to
        the next container. It does not average or weight predictions, so a
        true weighted ensemble needs custom inference code.
        """
        sagemaker_models = []
        for model in models:
            if hasattr(model, 'best_estimator'):
                # HyperparameterTuner: use the estimator from the best training job
                sagemaker_models.append(model.best_estimator().create_model())
            else:
                # Fitted estimator
                sagemaker_models.append(model.create_model())
        return PipelineModel(
            name="ensemble-model",
            role=self.role,
            models=sagemaker_models,
            sagemaker_session=self.sess
        )

    def deploy_endpoint(self, model, instance_type='ml.t2.medium'):
        """Deploy model to endpoint."""
        predictor = model.deploy(
            initial_instance_count=1,
            instance_type=instance_type,
            # name_from_base appends a timestamp to keep endpoint names unique
            endpoint_name=name_from_base("churn-prediction")
        )
        return predictor

    def monitor_endpoint(self, endpoint_name):
        """Monitor endpoint metrics."""
        cloudwatch = boto3.client('cloudwatch', region_name=self.region)
        end_time = datetime.now(timezone.utc)
        start_time = end_time - timedelta(hours=1)
        response = cloudwatch.get_metric_statistics(
            Namespace='AWS/SageMaker',
            MetricName='Invocations',
            Dimensions=[
                {'Name': 'EndpointName', 'Value': endpoint_name},
                {'Name': 'VariantName', 'Value': 'AllTraffic'}
            ],
            StartTime=start_time,
            EndTime=end_time,
            Period=300,
            Statistics=['Sum', 'Average']
        )
        return response['Datapoints']
# Usage
pipeline = SageMakerMLEndpoint()
# Upload data
train_path = pipeline.upload_data("data/train.csv", "datasets/train")
val_path = pipeline.upload_data("data/val.csv", "datasets/val")
# Train with tuning
tuner = pipeline.train_xgboost_with_tuning(train_path, val_path)
# Deploy best model
predictor = pipeline.deploy_endpoint(tuner)
# Monitor
metrics = pipeline.monitor_endpoint(predictor.endpoint_name)
print(f"Endpoint invocations: {metrics}")
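The raw CloudWatch datapoints come back as an unordered list of dictionaries, one per period, so a small summary step usually makes them easier to read. The following is a sketch under that assumption; `summarize_invocations` is a hypothetical helper, and the sample datapoints are made up to mirror the `Timestamp` and `Sum` fields of the response.

```python
from datetime import datetime


def summarize_invocations(datapoints):
    """Summarize per-period 'Sum' values from CloudWatch datapoints."""
    ordered = sorted(datapoints, key=lambda point: point["Timestamp"])
    sums = [point.get("Sum", 0.0) for point in ordered]
    return {
        "total_invocations": sum(sums),
        "peak_per_period": max(sums, default=0.0),
        "periods": len(ordered),
    }


# Made-up datapoints standing in for response['Datapoints']
sample = [
    {"Timestamp": datetime(2024, 1, 1, 0, 5), "Sum": 3.0},
    {"Timestamp": datetime(2024, 1, 1, 0, 0), "Sum": 7.0},
]
print(summarize_invocations(sample))
```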
Key Takeaways
📋Summary: Cloud ML Platforms
- Cloud ML platforms abstract infrastructure complexity — enabling data scientists to focus on modeling
- SageMaker is AWS's end-to-end ML platform with managed training, tuning, and deployment
- Vertex AI is GCP's unified ML platform with deep integration into BigQuery and TensorFlow ecosystem
- AutoML enables non-experts to build models — useful for baselines and rapid prototyping
- Pipelines automate ML workflows — enabling reproducible, version-controlled training
- Cost optimization requires spot instances, right-sizing, and auto-scaling — can reduce costs 50-80%
- Deployment strategies balance speed, safety, and cost — canary and blue-green reduce risk
Practice Exercises
Exercise 1: SageMaker Training
Train an XGBoost model on SageMaker with hyperparameter tuning. Compare spot vs on-demand pricing.
Exercise 2: Vertex AI AutoML
Use AutoML Tables to build a classification model. Evaluate performance metrics.
Exercise 3: Pipeline Automation
Build a complete ML pipeline with preprocessing, training, evaluation, and registration steps.
Exercise 4: Cost Analysis
Estimate costs for training and deploying a model on both platforms. Which is more cost-effective?
Discussion Questions
- When would you choose SageMaker over Vertex AI?
- How do you handle model drift in production?
- What are the security considerations for cloud ML?