Training Infrastructure
Training infrastructure for large language models requires distributed computing strategies to handle massive datasets and model parameters across multiple GPUs.
Data Parallel Training
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler
class DataParallelTrainer:
    """Train a model with DistributedDataParallel, one process per GPU.

    ``rank`` is used both as the process-group rank and as the CUDA device
    index (``model.to(rank)``, ``device_ids=[rank]``). Those two numbers only
    coincide when every process runs on the same node.

    Args:
        model: The module to replicate; it is moved to device ``rank``.
        rank: Rank of this process, also used as its CUDA device index.
        world_size: Total number of participating processes.
    """

    def __init__(self, model, rank, world_size):
        self.rank = rank
        self.world_size = world_size
        self.setup_distributed()
        self.model = DDP(model.to(rank), device_ids=[rank])

    def setup_distributed(self):
        """Bind this process to its GPU and join the NCCL process group."""
        # Select the device first so NCCL creates its communicator on this
        # process's GPU instead of defaulting to device 0.
        torch.cuda.set_device(self.rank)
        # Guarded so a second trainer in the same process does not raise.
        if not dist.is_initialized():
            dist.init_process_group(
                "nccl", rank=self.rank, world_size=self.world_size
            )

    def create_dataloader(self, dataset, batch_size):
        """Return a DataLoader that yields only this rank's shard of ``dataset``."""
        sampler = DistributedSampler(
            dataset, num_replicas=self.world_size, rank=self.rank
        )
        return DataLoader(dataset, batch_size=batch_size, sampler=sampler)

    def train_epoch(self, dataloader, optimizer, criterion, epoch=None):
        """Run one pass over ``dataloader`` and return the mean batch loss.

        Args:
            dataloader: Iterable of ``(data, target)`` batches.
            optimizer: Optimizer stepping the wrapped model's parameters.
            criterion: Callable ``criterion(output, target)`` returning a
                scalar loss tensor.
            epoch: Optional epoch number. When given and the loader's sampler
                has ``set_epoch`` (as ``DistributedSampler`` does), it is
                forwarded so the shuffle order changes between epochs.

        Returns:
            The average of the per-batch losses as a float, or ``0.0`` when
            the dataloader yields no batches.
        """
        sampler = getattr(dataloader, "sampler", None)
        if epoch is not None and hasattr(sampler, "set_epoch"):
            sampler.set_epoch(epoch)

        self.model.train()
        total_loss = 0.0
        num_batches = 0
        for data, target in dataloader:
            data, target = data.to(self.rank), target.to(self.rank)
            optimizer.zero_grad()
            output = self.model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            num_batches += 1
        # Count batches instead of calling len() so an empty loader (or one
        # without __len__) does not raise.
        return total_loss / num_batches if num_batches else 0.0

    def cleanup(self):
        """Tear down the process group if one is active."""
        if dist.is_initialized():
            dist.destroy_process_group()
# Launch with: torchrun --nproc_per_node=4 train.py  (starts 4 processes on this node; torchrun exports RANK, LOCAL_RANK and WORLD_SIZE to each one)
Tensor Parallel Training
class TensorParallelLayer:
    """One rank's shard of a linear layer, split along the output features.

    Each rank keeps ``out_features // world_size`` consecutive rows of the
    weight (and the matching bias entries, if the layer has a bias). A rank
    therefore produces a distinct slice of the output, and the full output is
    the concatenation of all slices along the last dimension.

    Args:
        linear_layer: Object with a 2-D ``weight`` of shape
            ``(out_features, in_features)`` and an optional ``bias``.
        world_size: Number of ranks the layer is split across.
        rank: Index of this rank's shard, ``0 <= rank < world_size``.

    Raises:
        ValueError: If ``rank`` is out of range or ``out_features`` is not
            divisible by ``world_size``.
    """

    def __init__(self, linear_layer, world_size, rank):
        if world_size < 1 or not 0 <= rank < world_size:
            raise ValueError(
                f"invalid rank {rank} for world_size {world_size}"
            )
        self.world_size = world_size
        self.rank = rank
        weight = linear_layer.weight.data
        out_features = weight.shape[0]
        if out_features % world_size != 0:
            # Integer division would silently drop the trailing rows.
            raise ValueError(
                f"out_features ({out_features}) must be divisible by "
                f"world_size ({world_size})"
            )
        chunk_size = out_features // world_size
        rows = slice(rank * chunk_size, (rank + 1) * chunk_size)
        self.local_weight = weight[rows].clone()
        bias = getattr(linear_layer, "bias", None)
        self.local_bias = None if bias is None else bias.data[rows].clone()

    def forward(self, x):
        """Apply the sharded layer and return the full-width output.

        Args:
            x: Input tensor whose last dimension is ``in_features``.

        Returns:
            Tensor whose last dimension is ``out_features``, assembled from
            every rank's slice. Gradients are not propagated through the
            collective.
        """
        local_output = torch.matmul(x, self.local_weight.T)
        if self.local_bias is not None:
            local_output = local_output + self.local_bias
        if self.world_size == 1:
            # Single shard: nothing to exchange, no process group needed.
            return local_output
        # Each rank holds a different output slice, so the slices must be
        # gathered and concatenated; summing them would mix unrelated features.
        local_output = local_output.contiguous()
        shards = [torch.empty_like(local_output) for _ in range(self.world_size)]
        dist.all_gather(shards, local_output)
        return torch.cat(shards, dim=-1)
class TensorParallelAttention:
    """Select the attention heads owned by one rank.

    Heads are assigned to ranks in contiguous groups of
    ``num_heads // world_size``.

    Args:
        attention_layer: Object exposing an integer ``num_heads`` attribute.
        world_size: Number of ranks the heads are divided among.
        rank: Index of this rank's group of heads.

    Raises:
        ValueError: If ``world_size`` is not positive or does not divide
            ``num_heads`` (some heads would otherwise belong to no rank).
    """

    def __init__(self, attention_layer, world_size, rank):
        self.num_heads = attention_layer.num_heads
        if world_size < 1 or self.num_heads % world_size != 0:
            raise ValueError(
                f"num_heads ({self.num_heads}) must be divisible by "
                f"world_size ({world_size})"
            )
        self.heads_per_gpu = self.num_heads // world_size
        self.rank = rank

    def split_attention(self, q, k, v):
        """Return this rank's slice of ``q``, ``k`` and ``v``.

        The last dimension of ``q`` is treated as ``num_heads`` equal-width
        head blocks laid side by side; the same column range is taken from
        all three tensors. Any number of leading dimensions is accepted.

        Returns:
            Tuple ``(q_split, k_split, v_split)``.
        """
        head_dim = q.shape[-1] // self.num_heads
        start = self.rank * self.heads_per_gpu * head_dim
        end = start + self.heads_per_gpu * head_dim
        # Ellipsis keeps every leading dimension, so the slice works for
        # (batch, seq, dim) as well as other ranks.
        columns = (..., slice(start, end))
        return q[columns], k[columns], v[columns]
# Usage
# Build this process's shard of a linear layer that is split four ways.
# NOTE(review): `original_linear` and `local_rank` are not defined in this
# snippet; the surrounding script must provide them.
tp_layer = TensorParallelLayer(original_linear, world_size=4, rank=local_rank)
Pipeline Parallel Training
class PipelineParallelStage:
    """A contiguous run of layers that forms one stage of a pipeline.

    Args:
        model_layers: Iterable of callables applied in order.
        stage_id: Position of this stage in the pipeline.
        num_stages: Total number of stages in the pipeline.
    """

    def __init__(self, model_layers, stage_id, num_stages):
        self.layers, self.stage_id, self.num_stages = (
            model_layers,
            stage_id,
            num_stages,
        )

    def forward(self, x):
        """Feed ``x`` through every layer of the stage and return the result."""
        activation = x
        for module in self.layers:
            activation = module(activation)
        return activation
class PipelineScheduler:
    """Compute the execution order of micro-batches across pipeline stages.

    Args:
        stages: Sequence of pipeline stages, first stage at index 0.
        micro_batch_size: Number of samples per micro-batch.
    """

    def __init__(self, stages, micro_batch_size):
        self.stages = stages
        self.micro_batch_size = micro_batch_size

    def gpipe_schedule(self, batch):
        """Return forward and backward schedules for one batch.

        The batch is split along dimension 0 into micro-batches of
        ``micro_batch_size``. Forward work enters at stage 0 and advances one
        stage per step. Backward work enters at the last stage and moves
        toward stage 0, taking micro-batches in reverse order, so a stage
        never back-propagates a micro-batch before the stage after it has.

        Args:
            batch: Tensor to be divided into micro-batches.

        Returns:
            ``(forward_schedule, backward_schedule)``: two lists of
            ``(stage_idx, micro_batch_idx)`` tuples in execution order. Each
            list contains every stage/micro-batch pair exactly once.
        """
        num_micro = len(torch.split(batch, self.micro_batch_size))
        num_stages = len(self.stages)
        last_stage = num_stages - 1
        forward_schedule = []
        backward_schedule = []
        for step in range(num_micro + num_stages - 1):
            for stage_idx in range(num_stages):
                micro_idx = step - stage_idx
                if 0 <= micro_idx < num_micro:
                    forward_schedule.append((stage_idx, micro_idx))
            # Mirror image of the forward wave: the delay grows with the
            # distance from the last stage, not from the first.
            for stage_idx in range(last_stage, -1, -1):
                slot = step - (last_stage - stage_idx)
                if 0 <= slot < num_micro:
                    backward_schedule.append((stage_idx, num_micro - 1 - slot))
        return forward_schedule, backward_schedule
# Usage
# Cut the model into four consecutive stages at indices 10, 20 and 30, then
# schedule them with micro-batches of 4 samples.
# NOTE(review): `model` is not defined in this snippet; it must support
# slicing (presumably an nn.Sequential — confirm).
stages = [
PipelineParallelStage(model[:10], stage_id=0, num_stages=4),
PipelineParallelStage(model[10:20], stage_id=1, num_stages=4),
PipelineParallelStage(model[20:30], stage_id=2, num_stages=4),
PipelineParallelStage(model[30:], stage_id=3, num_stages=4)
]
scheduler = PipelineScheduler(stages, micro_batch_size=4)
Mixed Precision Training
from torch.cuda.amp import autocast, GradScaler
class MixedPrecisionTrainer:
    """Train a classifier with autocast and dynamic loss scaling.

    Args:
        model: Module producing logits accepted by ``cross_entropy``.
        optimizer: Optimizer over ``model``'s parameters.
    """

    def __init__(self, model, optimizer):
        self.model = model
        self.optimizer = optimizer
        self.scaler = GradScaler()

    def _input_device(self):
        """Return the device batches should be moved to."""
        first_param = next(iter(self.model.parameters()), None)
        # A model without parameters gives no hint; fall back to the current
        # CUDA device, which is what a bare ``.cuda()`` call targets.
        return torch.device("cuda") if first_param is None else first_param.device

    def train_step(self, data, target):
        """Run one optimisation step and return the unscaled loss as a float.

        Args:
            data: Input batch, already on the model's device.
            target: Class targets for ``cross_entropy``.
        """
        self.optimizer.zero_grad()
        with autocast():
            output = self.model(data)
            loss = torch.nn.functional.cross_entropy(output, target)
        # Backward runs on the scaled loss; the scaler unscales the gradients
        # before stepping and then adjusts its scale factor.
        self.scaler.scale(loss).backward()
        self.scaler.step(self.optimizer)
        self.scaler.update()
        return loss.item()

    def train_epoch(self, dataloader):
        """Train on every batch of ``dataloader`` and return the mean loss.

        Batches are moved to the device holding the model's parameters rather
        than unconditionally to the current CUDA device.

        Returns:
            Average per-batch loss as a float, or ``0.0`` when the dataloader
            yields no batches.
        """
        self.model.train()
        device = self._input_device()
        total_loss = 0.0
        num_batches = 0
        for data, target in dataloader:
            data, target = data.to(device), target.to(device)
            total_loss += self.train_step(data, target)
            num_batches += 1
        return total_loss / num_batches if num_batches else 0.0
# Usage
# NOTE(review): `model`, `optimizer` and `train_loader` are not defined in
# this snippet; the surrounding script must provide them.
trainer = MixedPrecisionTrainer(model, optimizer)
# train_epoch returns the loss averaged over the batches of train_loader.
loss = trainer.train_epoch(train_loader)
Key Takeaways
- Data parallelism replicates the full model on every GPU and feeds each replica a different shard of the data
- Tensor parallelism splits the weights of individual layers across GPUs
- Pipeline parallelism assigns consecutive groups of layers (stages) to different GPUs
- FSDP (Fully Sharded Data Parallel) shards parameters, gradients, and optimizer states across GPUs
- Mixed precision reduces memory use and increases throughput by computing in FP16/BF16