
IA dans plusieurs GPU : accumulation de gradient et parallélisme des données
fait partie d’une série sur l’IA distribuée sur plusieurs GPU :
Introduction
Le parallélisme des données distribuées (DDP) est la première méthode de parallélisation que nous examinerons. C’est l’approche de base qui est toujours utilisé dans les paramètres de formation distribuée, et il est généralement combiné avec d’autres techniques de parallélisation.
Un rafraîchissement rapide du réseau neuronal
Entraîner un réseau de neurones signifie exécuter une passe avant, calculer la perte, rétropropager les gradients de chaque poids par rapport à la fonction de perte et enfin mettre à jour les poids (ce que nous appelons une étape d’optimisation). Dans PyTorch, cela ressemble généralement à ceci :
import torch
def training_loop(
model: torch.nn.Module,
dataloader: torch.utils.data.DataLoader,
optimizer: torch.optim.Optimizer,
loss_fn: callable,
):
for i, batch in enumerate(dataloader):
inputs, targets = batch
output = model(inputs) # Forward pass
loss = loss_fn(output, targets) # Compute loss
loss.backward() # Backward pass (compute gradients)
optimizer.step() # Update weights
optimizer.zero_grad() # Clear gradients for the next step
L’exécution de l’étape d’optimisation sur de grandes quantités de données d’entraînement donne généralement des estimations de gradient plus précises, conduisant à un entraînement plus fluide et à une convergence potentiellement plus rapide. Idéalement, nous franchirions donc chaque étape après avoir calculé les gradients en fonction de l’ensemble des données d’entraînement. En pratique, cela est rarement réalisable dans les scénarios de Deep Learning, car le calcul prendrait trop de temps. Au lieu de cela, nous travaillons avec de petits morceaux comme mini-lots et micro-lots.
- Lot: Fait référence à l’ensemble de l’ensemble de formation utilisé pour une étape d’optimisation.
- Mini-lot : Fait référence à un petit sous-ensemble de données d’entraînement utilisées pour une étape d’optimisation.
- Micro-lot : Fait référence à un sous-ensemble du mini-lot, nous combinons plusieurs micro-lots pour une étape d’optimisation.
C’est là que l’accumulation de gradient et le parallélisme des données entrent en jeu. Bien que nous n’utilisions pas l’intégralité de l’ensemble de données pour chaque étape, nous pouvons utiliser ces techniques pour augmenter considérablement la taille de notre mini-lot.
Accumulation de dégradé
Voici comment cela fonctionne : choisissez un gros mini-lot qui ne rentre pas dans la mémoire du GPU, puis divisez-le en micro-lots cela convient. Pour chaque micro-lot, effectuez des passes avant et arrière, en ajoutant (accumulant) les gradients calculés. Une fois tous les micro-lots traités, effectuez une seule étape d’optimisation en utilisant les gradients moyennés.
Remarquez que l’accumulation de gradient n’est pas une technique de parallélisation et ne nécessite pas plusieurs GPU.

La mise en œuvre de l’accumulation de dégradés à partir de zéro est simple. Voici à quoi cela ressemble dans une simple boucle d’entraînement :
import torch
def training_loop(
model: torch.nn.Module,
dataloader: torch.utils.data.DataLoader,
optimizer: torch.optim.Optimizer,
loss_fn: callable,
grad_accum_steps: int,
):
for i, batch in enumerate(dataloader):
inputs, targets = batch
output = model(inputs)
loss = loss_fn(output, targets)
loss.backward() # Gradients get accumulated (summed)
# Only update weights after `grad_accum_steps` micro-batches
if (i+1) % grad_accum_steps == 0: # i+1 to avoid a step in the first iteration when i=0
optimizer.step()
optimizer.zero_grad()
Remarquez que nous sommes séquentiellement effectuer plusieurs passes avant et arrière avant chaque étape d’optimisation, ce qui nécessite des temps de formation plus longs. Ce serait bien si nous pouvions accélérer cela en traitant plusieurs micro-lots dans parallèle… c’est exactement ce que fait DDP !
Parallélisme de données distribuées (DDP)
Pour un nombre assez restreint de GPU (jusqu’à environ 8), le DDP évolue de manière presque linéaire, ce qui est optimal. Cela signifie que si vous doublez le nombre de GPU, vous pouvez presque réduire de moitié le temps de formation (nous en avons déjà parlé). Mise à l’échelle linéaire précédemment).
Avec DDP, plusieurs GPU travaillent ensemble pour traiter un mini-lot efficace plus grand, en gérant chaque micro-lot en parallèle. Le flux de travail ressemble à ceci :
- Répartissez le mini-lot sur les GPU.
- Chaque GPU exécute ses propres passes avant et arrière pour calculer les dégradés pour sa propre partition de données (micro-batch).
- Utilisez un Tout réduire opération (nous en avons déjà entendu parler dans Opérations collectives) pour faire la moyenne des gradients sur tous les GPU.
- Chaque GPU applique les mêmes mises à jour de poids, gardant les modèles parfaitement synchronisés.
Cela nous permet de nous entraîner avec des tailles de mini-lots efficaces beaucoup plus grandes, ce qui conduit à un entraînement plus stable et à une convergence potentiellement plus rapide.

Implémentation de DDP à partir de zéro dans PyTorch
Faisons-le étape par étape. Dans cette première itération, nous synchronisons uniquement les dégradés.
import torch
class DDPModelWrapper:
def __init__(self, model: torch.nn.Module):
self.model = model
def __call__(self, *args, **kwargs):
return self.model(*args, **kwargs)
def sync_gradients(self):
# Iterate over parameter matrices in the model
for param in self.model.parameters():
# Some parameters might be frozen and don't have gradients
if param.grad is not None:
# We sum and then divide since torch.distributed doesn't have an average operation
torch.distributed.all_reduce(param.grad.data, op=torch.distributed.ReduceOp.SUM)
# Assuming each GPU received an equally sized mini-batch, we can average
# the gradients dividing by the number of GPUs (aka world size)
# By default the loss function already averages over the mini-batch size
param.grad.data /= torch.distributed.get_world_size()
Avant de commencer la formation, nous avons évidemment besoin que notre modèle soit le même sur tous les GPU, sinon nous formerions des modèles différents ! Améliorons notre implémentation en vérifiant que tous les poids sont identiques lors de l’instanciation (si vous ne savez pas quels sont les rangs, vérifiez la premier article de blog de la série).
import torch
class DDPModelWrapper:
def __init__(self, model: torch.nn.Module):
self.model = model
for param in self.model.parameters():
# We create a new tensor so it can receive the broadcast
rank_0_param = param.data.clone()
# Initially rank_0_param contains the values for the current rank
torch.distributed.broadcast(rank_0_param, src=0)
# After the broadcast rank_0_param variable is overwritten with the parameters from rank_0
if not torch.equal(param.data, rank_0_param): # Now we compare rank_x with rank_0
raise ValueError("Model parameters are not the same across all processes.")
def __call__(self, *args, **kwargs):
return self.model(*args, **kwargs)
def sync_gradients(self):
for param in self.model.parameters():
if param.grad is not None:
torch.distributed.all_reduce(param.grad.data, op=torch.distributed.ReduceOp.SUM)
param.grad.data /= torch.distributed.get_world_size()
Combiner DDP avec GA
Vous pouvez combiner DDP avec GA pour obtenir des tailles de lots efficaces encore plus grandes. Ceci est particulièrement utile lorsque votre modèle est si grand que seuls quelques échantillons peuvent tenir par GPU.
Le principal avantage est réduction des frais de communication: au lieu de synchroniser les dégradés après chaque lot, vous ne synchronisez qu’une fois par grad_accum_steps lots. Cela signifie:
- Taille effective globale du lot =
num_gpus × micro_batch_size × grad_accum_steps - Moins de points de synchronisation = moins de temps passé sur la communication inter-GPU
Une boucle de formation utilisant notre DDPModelWrapper avec accumulation de dégradé ressemble à ceci :
def training_loop(
ddp_model: DDPModelWrapper,
dataloader: torch.utils.data.DataLoader,
optimizer: torch.optim.Optimizer,
loss_fn: callable,
grad_accum_steps: int,
):
for i, batch in enumerate(dataloader):
inputs, targets = batch
output = ddp_model(inputs)
loss = loss_fn(output, targets)
loss.backward()
if (i+1) % grad_accum_steps == 0:
# Must sync gradients across GPUs *BEFORE* the optimization step
ddp_model.sync_gradients()
optimizer.step()
optimizer.zero_grad()
Conseils de pro et utilisation avancée
- Utilisez la prélecture des données. Vous pouvez accélérer la formation en chargeant le prochain lot de données pendant que le courant est en cours de traitement. de PyTorch
DataLoaderfournit unprefetch_factorargument qui contrôle le nombre de lots à pré-extraire en arrière-plan. Tirer correctement parti de la prélecture avec CUDA peut être un peu délicat, nous le laisserons donc pour un prochain article. - Ne maximisez pas la mémoire GPU. Contre-intuitivement, laisser de la mémoire libre peut conduire à un débit d’entraînement plus rapide. Lorsque vous laissez au moins environ 15 % de la mémoire GPU libre, le GPU peut mieux gérer la mémoire en évitant la fragmentation.
- PyTorch DDP chevauche la communication avec le calcul. Par défaut, DDP communique les dégradés au fur et à mesure qu’ils sont calculés lors de la rétropropagation plutôt que d’attendre la fin de la passe arrière complète. Voici comment procéder :
- PyTorch organise les dégradés de modèles en compartiments de
bucket_cap_mbmégaoctets. Lors du passage en arrière, PyTorch marque les dégradés comme étant prêts à être réduits au fur et à mesure de leur calcul. Une fois que tous les dégradés d’un bucket sont prêts, DDP lance un processus asynchrone.allreducefaire la moyenne de ces gradients sur tous les rangs. Leloss.backward()l’appel revient seulement après toutallreduceles opérations sont terminées, appelez donc immédiatementopt.step()est sécuritaire. - Le
bucket_cap_mbLe paramètre crée un compromis : des valeurs plus petites déclenchent des déclenchements plus fréquents.allreduceopérations, mais chaque lancement de noyau de communication entraîne une surcharge qui peut nuire aux performances. Des valeurs plus élevées réduisent la fréquence de communication mais réduisent également le chevauchement ; à l’extrême, si les buckets sont trop grands, vous attendez la fin de la passe arrière avant de communiquer. La valeur optimale dépend de l’architecture et du matériel de votre modèle. Créez donc un profil avec différentes valeurs pour trouver ce qui fonctionne le mieux.
- PyTorch organise les dégradés de modèles en compartiments de

- Voici une implémentation PyTorch complète de DDP :
"""
Launch with:
torchrun --nproc_per_node=NUM_GPUS ddp.py
"""
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data.distributed import DistributedSampler
from torch import optim
class ToyModel(nn.Module):
def __init__(self):
super().__init__()
self.net = nn.Sequential(
nn.Linear(1024, 1024), nn.ReLU(),
nn.Linear(1024, 1024), nn.ReLU(),
nn.Linear(1024, 256),
)
def forward(self, x):
return self.net(x)
def train():
dist.init_process_group(backend="nccl")
rank = dist.get_rank()
torch.cuda.set_device(rank)
device = torch.device(f"cuda:{rank}")
# Create dummy dataset
x_data = torch.randn(1000, 1024)
y_data = torch.randn(1000, 256)
dataset = TensorDataset(x_data, y_data)
# DistributedSampler ensures each rank gets different data
sampler = DistributedSampler(dataset, shuffle=True)
dataloader = DataLoader(dataset, batch_size=64, sampler=sampler)
model = ToyModel().to(device)
# gradient_as_bucket_view: avoids an extra grad tensor copy per bucket.
ddp_model = DDP(
model,
device_ids=[rank],
bucket_cap_mb=25,
gradient_as_bucket_view=True,
)
optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)
loss_fn = nn.MSELoss()
for epoch in range(2):
sampler.set_epoch(epoch) # Ensures different shuffling each epoch
for batch_idx, (x, y) in enumerate(dataloader):
x, y = x.to(device), y.to(device)
optimizer.zero_grad()
output = ddp_model(x)
loss = loss_fn(output, y)
# Backward automatically overlaps with allreduce per bucket.
# By the time this returns, all allreduce ops are done.
loss.backward()
optimizer.step()
if rank == 0 and batch_idx % 5 == 0:
print(f"epoch {epoch} batch {batch_idx} loss={loss.item():.4f}")
dist.destroy_process_group()
if __name__ == "__main__":
train()
- Voici une implémentation complète de PyTorch combinant DDP et GA :
"""
Launch with:
torchrun --nproc_per_node=NUM_GPUS ddp_ga.py
"""
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data.distributed import DistributedSampler
from torch import optim
from contextlib import nullcontext
class ToyModel(nn.Module):
def __init__(self):
super().__init__()
self.net = nn.Sequential(
nn.Linear(1024, 1024), nn.ReLU(),
nn.Linear(1024, 1024), nn.ReLU(),
nn.Linear(1024, 256),
)
def forward(self, x):
return self.net(x)
def train():
dist.init_process_group(backend="nccl")
rank = dist.get_rank()
torch.cuda.set_device(rank)
device = torch.device(f"cuda:{rank}")
# Create dummy dataset
x_data = torch.randn(1000, 1024)
y_data = torch.randn(1000, 256)
dataset = TensorDataset(x_data, y_data)
# DistributedSampler ensures each rank gets different data
sampler = DistributedSampler(dataset, shuffle=True)
dataloader = DataLoader(dataset, batch_size=16, sampler=sampler)
model = ToyModel().to(device)
ddp_model = DDP(
model,
device_ids=[rank],
bucket_cap_mb=25,
gradient_as_bucket_view=True,
)
optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)
loss_fn = nn.MSELoss()
ACCUM_STEPS = 4
for epoch in range(2):
sampler.set_epoch(epoch) # Ensures different shuffling each epoch
optimizer.zero_grad()
for batch_idx, (x, y) in enumerate(dataloader):
x, y = x.to(device), y.to(device)
is_last_micro_step = (batch_idx + 1) % ACCUM_STEPS == 0
# no_sync() suppresses allreduce on accumulation steps.
# On the last microstep we exit no_sync() so DDP fires
# the allreduce overlapped with that backward pass.
ctx = ddp_model.no_sync() if not is_last_micro_step else nullcontext()
with ctx:
output = ddp_model(x)
loss = loss_fn(output, y) / ACCUM_STEPS
loss.backward()
if is_last_micro_step:
optimizer.step()
optimizer.zero_grad()
if rank == 0:
print(f"epoch {epoch} batch {batch_idx} loss={loss.item() * ACCUM_STEPS:.4f}")
dist.destroy_process_group()
if __name__ == "__main__":
train()
Conclusion
Suivez-moi sur X pour plus de contenu IA gratuit @l_cesconetto
Félicitations, vous êtes arrivé au bout ! Dans cet article, vous avez découvert :
- L’importance des lots de grande taille
- Comment fonctionne l’accumulation de gradient et ses limites
- Le workflow DDP et ses avantages
- Comment implémenter GA et DDP à partir de zéro dans PyTorch
- Comment combiner GA et DDP
Dans le prochain article, nous explorerons ZeRO (Zero Redundancy Optimizer), une technique plus avancée qui s’appuie sur DDP pour optimiser davantage l’utilisation de la mémoire VRAM.



