🎉 75% of content is free forever — Unlock Premium from $10/mo →
CW
Search courses…
💼 Services | ℹ️ About | ✉️ Contact | View Pricing Plans from $10

Training Infrastructure

🟢 Free Lesson

Advertisement

Training Infrastructure

Diagram: Data Shards (Distributed Data Across GPUs) → GPU 0 (Data Parallel) | GPU 1 (Data Parallel) | GPU 2 (Data Parallel) → Gradient Sync (AllReduce, NCCL Communication) | Tensor Parallel (Split Layers) | Pipeline Parallel (Split Stages) | FSDP (Full Sharding) → Model Checkpoint (Distributed Storage, Resume Training)

Training infrastructure for large language models requires distributed computing strategies to handle massive datasets and model parameters across multiple GPUs.

Data Parallel Training

import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler

class DataParallelTrainer:
    """Train a model with ``DistributedDataParallel``, one process per device.

    Constructing the trainer initializes the NCCL process group, selects the
    CUDA device ``rank``, moves ``model`` to that device and wraps it in DDP.

    Args:
        model: The module to replicate.
        rank: Rank of this process; also used as the CUDA device index.
        world_size: Total number of participating processes.
    """

    def __init__(self, model, rank, world_size):
        self.rank = rank
        self.world_size = world_size
        self.setup_distributed()
        self.model = DDP(model.to(rank), device_ids=[rank])

    def setup_distributed(self):
        """Join the NCCL process group and bind this process to its GPU."""
        dist.init_process_group("nccl", rank=self.rank, world_size=self.world_size)
        # NOTE(review): the rank doubles as the device index, which only holds
        # when all processes run on a single node -- confirm for multi-node use.
        torch.cuda.set_device(self.rank)

    def create_dataloader(self, dataset, batch_size):
        """Return a ``DataLoader`` whose sampler gives each rank its own shard."""
        sampler = DistributedSampler(dataset, num_replicas=self.world_size, rank=self.rank)
        return DataLoader(dataset, batch_size=batch_size, sampler=sampler)

    def train_epoch(self, dataloader, optimizer, criterion, epoch=None):
        """Run one pass over ``dataloader`` and return the mean batch loss.

        Args:
            dataloader: Iterable of ``(data, target)`` batches.
            optimizer: Optimizer stepping the wrapped model's parameters.
            criterion: Callable ``criterion(output, target)`` returning the loss.
            epoch: Optional epoch index. When given and the loader's sampler
                exposes ``set_epoch``, it is forwarded so the sampler
                reshuffles; without it every epoch sees the same ordering.

        Returns:
            The sum of per-batch losses divided by ``len(dataloader)``.
        """
        sampler = getattr(dataloader, "sampler", None)
        if epoch is not None and hasattr(sampler, "set_epoch"):
            # DistributedSampler derives its shuffle seed from the epoch.
            sampler.set_epoch(epoch)

        self.model.train()
        total_loss = 0.0
        for data, target in dataloader:
            data, target = data.to(self.rank), target.to(self.rank)
            optimizer.zero_grad()
            output = self.model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        return total_loss / len(dataloader)

    def cleanup(self):
        """Tear down the process group if one is currently initialized."""
        # Guarded so that calling cleanup twice (or before setup) is harmless.
        if dist.is_available() and dist.is_initialized():
            dist.destroy_process_group()

# Launch with: torchrun --nproc_per_node=4 train.py

Tensor Parallel Training

class TensorParallelLayer:
    """Linear layer whose output features are sharded across ranks.

    The wrapped layer's weight (``[out_features, in_features]``) is split
    along its first dimension, so each rank computes a distinct slice of the
    output features. The slices are concatenated, not summed, to rebuild the
    full output. The wrapped layer's bias is not applied.

    Args:
        linear_layer: Object exposing a 2-D ``weight`` tensor.
        world_size: Number of ranks sharing the layer.
        rank: Index of this rank's shard, in ``[0, world_size)``.

    Raises:
        ValueError: If ``rank`` is out of range or the number of output
            features is not divisible by ``world_size``.
    """

    def __init__(self, linear_layer, world_size, rank):
        if world_size < 1 or not 0 <= rank < world_size:
            raise ValueError(
                f"rank must be in [0, world_size); got rank={rank}, world_size={world_size}"
            )
        self.world_size = world_size
        self.rank = rank
        weight = linear_layer.weight.data
        out_features = weight.shape[0]
        if out_features % world_size != 0:
            # Floor division would silently drop the trailing rows.
            raise ValueError(
                f"out_features ({out_features}) must be divisible by world_size ({world_size})"
            )
        chunk_size = out_features // world_size
        self.local_weight = weight[rank * chunk_size:(rank + 1) * chunk_size].clone()

    def forward(self, x):
        """Return the full layer output for ``x`` (last dim = in_features)."""
        local_output = torch.matmul(x, self.local_weight.T)
        if self.world_size == 1:
            # A single shard already is the complete output; no communication.
            return local_output
        # Each rank holds different output columns, so gather and concatenate
        # them in rank order; summing would mix unrelated features.
        gathered = [torch.empty_like(local_output) for _ in range(self.world_size)]
        dist.all_gather(gathered, local_output)
        return torch.cat(gathered, dim=-1)

class TensorParallelAttention:
    """Select the slice of attention heads owned by one rank.

    Args:
        attention_layer: Object exposing ``num_heads``.
        world_size: Number of ranks the heads are divided among.
        rank: Index of this rank.
    """

    def __init__(self, attention_layer, world_size, rank):
        self.rank = rank
        self.num_heads = attention_layer.num_heads
        self.heads_per_gpu = self.num_heads // world_size

    def split_attention(self, q, k, v):
        """Return this rank's ``(q, k, v)`` slices along the last dimension.

        The three inputs are indexed as 3-D tensors; the per-head width is
        taken from ``q``'s last dimension divided by ``num_heads``.
        """
        width = q.shape[-1] // self.num_heads
        first_head = self.rank * self.heads_per_gpu
        last_head = first_head + self.heads_per_gpu
        # One shared column window is applied to all three projections.
        window = slice(first_head * width, last_head * width)
        return q[:, :, window], k[:, :, window], v[:, :, window]

# Usage: build this process's shard of an existing linear layer, split 4 ways.
# NOTE(review): `original_linear` and `local_rank` are not defined in this
# snippet; the surrounding training script must provide them.
tp_layer = TensorParallelLayer(original_linear, world_size=4, rank=local_rank)

Pipeline Parallel Training

class PipelineParallelStage:
    """One pipeline stage: an ordered group of layers applied in sequence.

    Args:
        model_layers: Iterable of callables, applied in iteration order.
        stage_id: Position of this stage in the pipeline.
        num_stages: Total number of stages in the pipeline.
    """

    def __init__(self, model_layers, stage_id, num_stages):
        self.num_stages = num_stages
        self.stage_id = stage_id
        self.layers = model_layers

    def forward(self, x):
        """Feed ``x`` through every layer of the stage and return the result."""
        activation = x
        for block in self.layers:
            activation = block(activation)
        return activation

class PipelineScheduler:
    """Compute GPipe-style execution orders for a list of pipeline stages.

    Args:
        stages: Sequence of pipeline stages; only its length is used here.
        micro_batch_size: Size of each micro-batch along dimension 0.
    """

    def __init__(self, stages, micro_batch_size):
        self.stages = stages
        self.micro_batch_size = micro_batch_size

    def gpipe_schedule(self, batch):
        """Return ``(forward_schedule, backward_schedule)`` for ``batch``.

        Each schedule is a list of ``(stage_idx, micro_batch_idx)`` pairs in
        execution order, and each covers every stage/micro-batch combination
        exactly once. In the forward schedule a micro-batch reaches stage
        ``s`` one step after stage ``s - 1``. The backward schedule mirrors
        this: gradients enter at the last stage and reach stage ``s`` one
        step after stage ``s + 1``.

        Args:
            batch: Tensor split into micro-batches with ``torch.split``.
        """
        micro_batches = torch.split(batch, self.micro_batch_size)
        num_micro = len(micro_batches)
        num_stages = len(self.stages)

        forward_schedule = []
        backward_schedule = []

        for step in range(num_micro + num_stages - 1):
            for stage_idx in range(num_stages):
                fwd_micro = step - stage_idx
                if 0 <= fwd_micro < num_micro:
                    forward_schedule.append((stage_idx, fwd_micro))
            # Backward flows from the last stage to the first, so the delay
            # is measured from the end of the pipeline.
            for stage_idx in reversed(range(num_stages)):
                bwd_micro = step - (num_stages - 1 - stage_idx)
                if 0 <= bwd_micro < num_micro:
                    backward_schedule.append((stage_idx, bwd_micro))

        return forward_schedule, backward_schedule

# Usage: cut a model into four consecutive stages and schedule micro-batches
# of size 4 across them.
# NOTE(review): `model` is not defined in this snippet; the slicing presumably
# assumes a sliceable container of layers (e.g. nn.Sequential) -- confirm.
stages = [
    PipelineParallelStage(model[:10], stage_id=0, num_stages=4),
    PipelineParallelStage(model[10:20], stage_id=1, num_stages=4),
    PipelineParallelStage(model[20:30], stage_id=2, num_stages=4),
    PipelineParallelStage(model[30:], stage_id=3, num_stages=4)
]
scheduler = PipelineScheduler(stages, micro_batch_size=4)

Mixed Precision Training

from torch.cuda.amp import autocast, GradScaler

class MixedPrecisionTrainer:
    """Training loop using autocast for the forward pass and a gradient scaler.

    Args:
        model: Module producing logits for ``cross_entropy``.
        optimizer: Optimizer stepping ``model``'s parameters.
    """

    def __init__(self, model, optimizer):
        self.scaler = GradScaler()
        self.optimizer = optimizer
        self.model = model

    def train_step(self, data, target):
        """Perform one optimization step and return the batch loss as a float."""
        scaler = self.scaler
        opt = self.optimizer
        opt.zero_grad()

        # Only the forward pass and the loss run under autocast.
        with autocast():
            logits = self.model(data)
            batch_loss = torch.nn.functional.cross_entropy(logits, target)

        # Backpropagate the scaled loss; the scaler then drives the optimizer
        # step and refreshes its scale factor.
        scaled = scaler.scale(batch_loss)
        scaled.backward()
        scaler.step(opt)
        scaler.update()

        return batch_loss.item()

    def train_epoch(self, dataloader):
        """Train on every batch (moved to CUDA) and return the mean batch loss."""
        self.model.train()
        running = sum(
            self.train_step(inputs.cuda(), labels.cuda())
            for inputs, labels in dataloader
        )
        return running / len(dataloader)

# Usage: run one mixed-precision epoch and collect the mean batch loss.
# NOTE(review): `model`, `optimizer` and `train_loader` are not defined in this
# snippet; train_epoch moves each batch with .cuda(), so a CUDA device is needed.
trainer = MixedPrecisionTrainer(model, optimizer)
loss = trainer.train_epoch(train_loader)

Key Takeaways

  • Data parallelism replicates model across GPUs with different data
  • Tensor parallelism splits individual layers across GPUs
  • Pipeline parallelism splits model stages across GPUs
  • FSDP shards parameters, gradients, and optimizer states
  • Mixed precision reduces memory and increases speed with FP16/BF16
⭐

Premium Content

Training Infrastructure

Unlock this lesson and 900+ advanced tutorials with a Premium plan.

🎯 End-to-end Projects
💼 Interview Prep
📜 Certificates
🤝 Community Access

Already a member? Log in

Need Expert Generative AI Help?

Get personalized tutoring, project support, or professional consulting.

Advertisement