
IA dans plusieurs GPU : opérations point à point et collectives
fait partie d’une série sur l’IA distribuée sur plusieurs GPU :
- Partie 1 : Comprendre le paradigme de l’hôte et du périphérique
- Partie 2 : Opérations point à point et collectives (cet article)
- Partie 3 : Comment les GPU communiquent (à venir)
- Partie 4 : Accumulation de gradient et parallélisme des données distribuées (DDP) (à venir)
- Partie 5 : ZÉRO (à venir)
- Partie 6 : Parallélisme tensoriel (à venir)
Introduction
Dans l’article précédent, nous avons établi le paradigme hôte-périphérique et introduit le concept de classement pour les charges de travail multi-GPU. Nous allons maintenant explorer les modèles de communication spécifiques fournis par PyTorch. torch.distributed module pour coordonner le travail et échanger des données entre ces rangs. Ces opérations, connues sous le nom collectifssont les éléments constitutifs des charges de travail distribuées.
Bien que PyTorch expose ces opérations, il appelle finalement un framework backend qui implémente réellement la communication. Pour les GPU NVIDIA, c’est NCCL (NVIDIA Collective Communications Library), tandis que pour AMD c’est RCCL (ROCm Communication Collectives Library).
NCCL implémente des primitives de communication multi-GPU et multi-nœuds optimisées pour les GPU et les réseaux NVIDIA. Il détecte automatiquement la topologie actuelle (canaux de communication comme PCIe, NVLink, InfiniBand) et sélectionne la plus efficace.
Avis de non-responsabilité 1 : les GPU NVIDIA étant les plus courants, nous nous concentrerons sur les
NCCLbackend pour ce post.
Avertissement 2 : par souci de concision, le code présenté ci-dessous ne fournit que les principaux arguments de chaque méthode au lieu de tous les arguments disponibles.
Avertissement 3 : par souci de simplicité, nous ne montrons pas la désallocation de mémoire des tenseurs, mais des opérations comme
scatterne libérera pas automatiquement la mémoire du rang source (si vous ne comprenez pas ce que je veux dire, ce n’est pas grave, cela deviendra clair très bientôt).
Communication : blocage ou non blocage
Pour fonctionner ensemble, les GPU doivent échanger des données. Le CPU initie la communication en mettant les noyaux NCCL en file d’attente dans les flux CUDA (si vous ne savez pas ce que sont les flux CUDA, consultez le premier article de blog de cette série), mais le transfert de données réel s’effectue directement entre les GPU via l’interconnexion, contournant entièrement la mémoire principale du CPU. Idéalement, les GPU sont connectés avec une interconnexion haut débit comme NVLink ou InfiniBand (ces interconnexions sont couvertes dans le troisième article de cette série).
Cette communication peut être synchrone (bloquante) ou asynchrone (non bloquante), que nous explorons ci-dessous.
Communication synchrone (bloquante)
- Comportement: Lorsque vous appelez une méthode de communication synchrone, le processus hôte s’arrête et attend jusqu’à ce que le noyau NCCL soit mis en file d’attente avec succès sur le flux CUDA actif actuel. Une fois mise en file d’attente, la fonction revient. C’est généralement simple et fiable. Notez que l’hôte n’attend pas la fin du transfert, mais simplement que l’opération soit mise en file d’attente. Par contre, ça bloque ce flux spécifique du passage à l’opération suivante jusqu’à ce que le noyau NCCL soit exécuté complètement.
Communication asynchrone (non bloquante)
- Comportement: Lorsque vous appelez une méthode de communication asynchrone, l’appel renvoie immédiatementet l’opération de mise en file d’attente se produit en arrière-plan. Il n’est pas mis en file d’attente dans le flux actif actuel, mais plutôt dans un flux NCCL interne dédié par appareil. Cela permet à votre processeur de continuer avec d’autres tâches, une technique connue sous le nom de chevauchement du calcul avec la communication. L’API asynchrone est plus complexe car elle peut conduire à un comportement indéfini si vous n’utilisez pas correctement
.wait()(expliqué ci-dessous) et modifier les données pendant leur transfert. Cependant, sa maîtrise est essentielle pour obtenir des performances maximales dans les formations distribuées à grande échelle.
Point à point (un à un)
Ces opérations ne sont pas considérées collectifsmais ce sont des primitives de communication fondamentales. Ils facilitent le transfert direct de données entre deux rangs spécifiques et sont fondamentaux pour les tâches dans lesquelles un GPU doit envoyer des informations spécifiques à un autre.
- Synchrone (blocage): Le processus hôte attend que l’opération soit mise en file d’attente dans le flux CUDA avant de continuer. Le noyau est mis en file d’attente dans le flux actif actuel.
torch.distributed.send(tensor, dst): Envoie un tenseur à un rang de destination spécifié.torch.distributed.recv(tensor, src): Reçoit un tenseur d’un rang source. Le tenseur de réception doit être pré-attribué avec la forme correcte etdtype.
- Asynchrone (non bloquant): Le processus hôte lance l’opération de mise en file d’attente et poursuit immédiatement d’autres tâches. Le noyau est mis en file d’attente dans un flux NCCL interne dédié par périphérique, ce qui permet de chevaucher la communication avec le calcul. Ces opérations renvoient un
request(techniquement unWorkobjet) qui peut être utilisé pour suivre l’état de la mise en file d’attente.request = torch.distributed.isend(tensor, dst): lance une opération d’envoi asynchrone.request = torch.distributed.irecv(tensor, src): lance une opération de réception asynchrone.request.wait(): Bloque l’hôte uniquement jusqu’à ce que l’opération soit réussie mis en file d’attente sur le flux CUDA. Cependant, cela empêche le flux CUDA actuellement actif d’exécuter les noyaux ultérieurs jusqu’à ce que cette opération asynchrone spécifique soit terminée.request.wait(timeout): Si vous fournissez un argument de délai d’attente, le comportement de l’hôte change. Ce sera bloquer le processeur thread jusqu’à ce que le travail NCCL soit terminé ou expire (déclenchant une exception). Dans des cas normaux, les utilisateurs n’ont pas besoin de définir le délai d’attente.request.is_completed(): RetoursTruesi l’opération a été réussie mis en file d’attente sur un flux CUDA. Il peut être utilisé pour des sondages. Cela ne garantit pas que les données réelles ont été transférées.
Lorsque PyTorch lance un noyau NCCL, il insère automatiquement un dépendance (c’est-à-dire force une synchronisation) entre votre flux actif actuel et le flux NCCL. Cela signifie que le flux NCCL ne démarrera pas tant que tous les travaux précédemment mis en file d’attente sur le flux actif ne seront pas terminés, ce qui garantit que le tenseur envoyé contient déjà les valeurs finales.
De même, appeler req.wait() insère une dépendance dans l’autre sens. Tout travail que vous mettez en file d’attente sur le flux actuel après req.wait() ne s’exécutera pas tant que le L’opération NCCL est terminéevous pouvez donc utiliser en toute sécurité les tenseurs reçus.
Principaux « pièges » dans NCCL
Alors que send et recv sont étiquetés « synchrones », leur comportement dans NCCL peut prêter à confusion. Un appel synchrone sur un tenseur CUDA bloque le thread CPU hôte uniquement jusqu’à ce que le noyau de transfert de données soit mis en file d’attente dans le flux, pas avant la fin du transfert de données. Le processeur est alors libre de mettre en file d’attente d’autres tâches.
Il y a une exception : le tout premier appeler à torch.distributed.recv() dans un processus est vraiment bloquant et attend la fin du transfert, probablement en raison d’un NCCL interne procédures d’échauffement. Les appels suivants ne seront bloqués que jusqu’à ce que l’opération soit mise en file d’attente.
Prenons cet exemple où rank 1 se bloque car le CPU tente d’accéder à un tenseur que le GPU n’a pas encore reçu :
rank = torch.distributed.get_rank()
if rank == 0:
t = torch.tensor([1,2,3], dtype=torch.float32, device=device)
# torch.distributed.send(t, dst=1) # No send operation is performed
else: # rank == 1 (assuming only 2 ranks)
t = torch.empty(3, dtype=torch.float32, device=device)
torch.distributed.recv(t, src=0) # Blocks only until enqueued (after first run)
print("This WILL print if NCCL is warmed-up")
print
print("This will NOT print")
Le processus CPU à rank 1 reste coincé print
Si vous exécutez ce code plusieurs fois, notez que
This WILL print if NCCL is warmed-upne sera pas imprimé lors des exécutions ultérieures, car le processeur est toujours bloqué à
Collectifs
Chaque fonction d'opération collective prend en charge les opérations synchronisées et asynchrones via le async_op argument. La valeur par défaut est False, ce qui signifie des opérations synchrones.
Collectifs individuels
Ces opérations impliquent qu'un rang envoie des données à tous les autres rangs du groupe.
Diffuser
torch.distributed.broadcast(tensor, src): Copie un tenseur à partir d'un seul rang source (src) à tous les autres rangs. Chaque processus se termine par une copie identique du tenseur. LetensorLe paramètre sert à deux fins : (1) lorsque le rang du processus correspond ausrcletensorles données sont-elles envoyées ? (2) sinon,tensorest utilisé pour sauvegarder les données reçues.
rank = torch.distributed.get_rank()
if rank == 0: # source rank
tensor = torch.tensor([1,2,3], dtype=torch.int64, device=device)
else: # destination ranks
tensor = torch.empty(3, dtype=torch.int64, device=device)
torch.distributed.broadcast(tensor, src=0)

Dispersion
torch.distributed.scatter(tensor, scatter_list, src): distribue des morceaux de données d'un rang source sur tous les rangs. Lescatter_listsur le rang source contient plusieurs tenseurs, et chaque rang (y compris la source) reçoit un tenseur de cette liste dans sontensorvariable. Les classements de destination passent justeNonepour lescatter_list.
# The scatter_list must be None for all non-source ranks.
scatter_list = None if rank != 0 else [torch.tensor([i, i+1]).to(device) for i in range(0,4,2)]
tensor = torch.empty(2, dtype=torch.int64).to(device)
torch.distributed.scatter(tensor, scatter_list, src=0)
print(f'Rank {rank} received: {tensor}')

Collectifs tout-à-un
Ces opérations rassemblent des données de tous les rangs et les consolident sur un seul rang de destination.
Réduire
torch.distributed.reduce(tensor, dst, op): Prend un tenseur de chaque rang, applique une opération de réduction (commeSUM,MAX,MIN), et stocke le résultat final sur le rang de destination (dst) seulement.
rank = torch.distributed.get_rank()
tensor = torch.tensor([rank+1, rank+2, rank+3], device=device)
torch.distributed.reduce(tensor, dst=0, op=torch.distributed.ReduceOp.SUM)
print(tensor)

Rassembler
torch.distributed.gather(tensor, gather_list, dst): Rassemble un tenseur de chaque rang dans une liste de tenseurs sur le rang de destination. Legather_listdoit être une liste de tenseurs (correctement dimensionnés et typés) sur la destination etNonepartout ailleurs.
# The gather_list must be None for all non-destination ranks.
rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()
gather_list = None if rank != 0 else [torch.zeros(3, dtype=torch.int64).to(device) for _ in range(world_size)]
t = torch.tensor([0+rank, 1+rank, 2+rank], dtype=torch.int64).to(device)
torch.distributed.gather(t, gather_list, dst=0)
print(f'After op, Rank {rank} has: {gather_list}')
La variable world_size est le nombre total de rangs. On peut l'obtenir avec torch.distributed.get_world_size(). Mais ne vous inquiétez pas des détails d’implémentation pour l’instant, le plus important est de comprendre les concepts.

Collectifs tout-à-tous
Dans ces opérations, chaque rang envoie et reçoit des données de tous les autres rangs.
Tout réduire
torch.distributed.all_reduce(tensor, op): Identique àreducemais le résultat est stocké sur chaqueclassement au lieu d’une seule destination.
# Example for torch.distributed.all_reduce
rank = torch.distributed.get_rank()
tensor = torch.tensor([rank+1, rank+2, rank+3], dtype=torch.float32, device=device)
torch.distributed.all_reduce(tensor, op=torch.distributed.ReduceOp.SUM)
print(f"Rank {rank} after all_reduce: {tensor}")

Tous se rassemblent
torch.distributed.all_gather(tensor_list, tensor): Identique àgathermais la liste rassemblée des tenseurs est disponible sur chaque rang.
# Example for torch.distributed.all_gather
rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()
input_tensor = torch.tensor([rank], dtype=torch.float32, device=device)
tensor_list = [torch.empty(1, dtype=torch.float32, device=device) for _ in range(world_size)]
torch.distributed.all_gather(tensor_list, input_tensor)
print(f"Rank {rank} gathered: {[t.item() for t in tensor_list]}")

Réduire la dispersion
torch.distributed.reduce_scatter(output, input_list): Équivalent à effectuer une opération de réduction sur une liste de tenseurs puis à disperser les résultats. Chaque rang reçoit une partie différente de la production réduite.
# Example for torch.distributed.reduce_scatter
rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()
input_list = [torch.tensor([rank + i], dtype=torch.float32, device=device) for i in range(world_size)]
output = torch.empty(1, dtype=torch.float32, device=device)
torch.distributed.reduce_scatter(output, input_list, op=torch.distributed.ReduceOp.SUM)
print(f"Rank {rank} received reduced value: {output.item()}")

Synchronisation
Les deux opérations les plus fréquemment utilisées sont request.wait() et torch.cuda.synchronize(). Il est crucial de comprendre la différence entre ces deux :
request.wait(): Ceci est utilisé pour les opérations asynchrones. Il synchronise le flux CUDA actuellement actif pour cette opération, garantissant que le flux attend la fin de la communication avant de continuer. En d'autres termes, il bloque le flux CUDA actuellement actif jusqu'à la fin du transfert de données. Du côté de l'hôte, cela oblige uniquement l'hôte à attendre que le noyau soit mis en file d'attente ; l'hôte fait pas attendez la fin du transfert de données.torch.cuda.synchronize(): Il s'agit d'une commande plus puissante qui met en pause le thread du processeur hôte jusqu'à ce que tous les tâches précédemment mises en file d'attente sur le GPU sont terminées. Il garantit que le GPU est complètement inactif avant que le CPU ne passe à autre chose, mais il peut créer des goulots d'étranglement en termes de performances s'il est mal utilisé. Chaque fois que vous avez besoin d'effectuer des mesures de référence, vous devez l'utiliser pour vous assurer de capturer le moment exact où les GPU sont terminés.
Conclusion
Félicitations, vous êtes arrivé au bout ! Dans cet article, vous avez découvert :
- Opérations point à point
- Synchronisation et asynchrone dans NCCL
- Opérations collectives
- Méthodes de synchronisation
Dans le prochain article de blog, nous aborderons PCIe, NVLink et d'autres mécanismes qui permettent la communication dans un environnement distribué !



