
Optimisation du transfert de données dans les charges de travail d’inférence AI/ML par lots
est un objectif d’optimisation du transfert de données dans les charges de travail IA/ML où nous avons démontré l’utilisation de Systèmes NVIDIA Nsight™ (nsys) dans l’étude et la résolution des goulots d’étranglement courants lors du chargement des données, c’est-à-dire les cas où le GPU reste inactif pendant qu’il attend les données d’entrée du CPU. Dans cet article, nous concentrons notre attention sur les données circulant dans la direction opposée, du périphérique GPU vers l’hôte CPU. Plus spécifiquement, nous abordons les charges de travail d’inférence AI/ML où la taille de la sortie renvoyée par le modèle est relativement élevée. Les exemples courants incluent : 1) l’exécution d’un modèle de segmentation de scène (étiquetage par pixel) sur des lots d’images haute résolution et 2) la capture d’intégrations de caractéristiques de haute dimension de séquences d’entrée à l’aide d’un modèle d’encodeur (par exemple, pour créer un base de données de vecteurs). Les deux exemples impliquent l’exécution d’un modèle sur un lot d’entrée, puis la copie du tenseur de sortie du GPU vers le CPU pour un traitement, un stockage et/ou une communication sur le réseau supplémentaires.
Les copies mémoire GPU à CPU de la sortie du modèle reçoivent généralement beaucoup moins d’attention dans les didacticiels d’optimisation que les copies CPU à GPU qui alimentent le modèle (par exemple, voir ici). Mais leur impact potentiel sur l’efficacité du modèle et les coûts d’exécution peut être tout aussi préjudiciable. De plus, même si les optimisations du chargement des données CPU vers GPU sont bien documentées et faciles à mettre en œuvre, l’optimisation de la copie des données dans la direction opposée nécessite un peu plus de travail manuel.
Dans cet article, nous appliquerons la même stratégie que celle utilisée dans notre article précédent : nous définirons un modèle de jouet et utiliserons le profileur nsys pour identifier et résoudre les goulots d’étranglement en matière de performances. Nous allons mener nos expériences sur un Amazon EC2 g6e.2xlarge exemple (avec un Nvidia L40S GPU) exécutant un AMI AWS Deep Learning (Ubuntu 24.04) avec PyTorch (2.8), profileur nsys-cli (version 2025.6.1), et le Extension des outils NVIDIA (NVTX).
Avis de non-responsabilité
Le code que nous partagerons est destiné à des fins de démonstration ; veuillez ne pas vous fier à son exactitude ou à son optimalité. Veuillez ne pas interpréter notre utilisation d’une bibliothèque, d’un outil ou d’une plateforme comme une approbation de son utilisation. L’impact des optimisations que nous aborderons peut varier considérablement en fonction des détails du modèle et de l’environnement d’exécution. Assurez-vous d’évaluer leur effet sur votre propre cas d’utilisation avant d’intégrer leur utilisation.
Un grand merci à Yitzhak Levi et Gilad Wasserman pour leurs contributions à ce post.
Un modèle de jouet PyTorch
Nous introduisons un script d’inférence par lots qui effectue une segmentation d’image sur un ensemble de données synthétiques à l’aide d’un DeepLabV3 modèle avec un ResNet-50 colonne vertébrale. Les sorties du modèle sont copiées sur la CPU pour le post-traitement et le stockage. Nous enveloppons les différentes parties de l’étape d’inférence avec un code couleur nvtx annotations :
import time, torch, nvtx
from torch.utils.data import Dataset, DataLoader
from torch.cuda import profiler
from torchvision.models.segmentation import deeplabv3_resnet50
DEVICE = "cuda"
WARMUP_STEPS = 10
PROFILE_STEPS = 3
COOLDOWN_STEPS = 1
TOTAL_STEPS = WARMUP_STEPS + PROFILE_STEPS + COOLDOWN_STEPS
BATCH_SIZE = 64
TOTAL_SAMPLES = TOTAL_STEPS * BATCH_SIZE
IMG_SIZE = 512
N_CLASSES = 21
NUM_WORKERS = 8
ASYNC_DATALOAD = True
# A synthetic Dataset with random images
class FakeDataset(Dataset):
def __len__(self):
return TOTAL_SAMPLES
def __getitem__(self, index):
img = torch.randn((3, IMG_SIZE, IMG_SIZE))
return img
# utility class for prefetching data to GPU
class DataPrefetcher:
def __init__(self, loader):
self.loader = iter(loader)
self.stream = torch.cuda.Stream()
self.next_batch = None
self.preload()
def preload(self):
try:
data = next(self.loader)
with torch.cuda.stream(self.stream):
next_data = data.to(DEVICE, non_blocking=ASYNC_DATALOAD)
self.next_batch = next_data
except:
self.next_batch = None
def __iter__(self):
return self
def __next__(self):
torch.cuda.current_stream().wait_stream(self.stream)
data = self.next_batch
self.preload()
return data
model = deeplabv3_resnet50(weights_backbone=None).to(DEVICE).eval()
data_loader = DataLoader(
FakeDataset(),
batch_size=BATCH_SIZE,
num_workers=NUM_WORKERS,
pin_memory=ASYNC_DATALOAD
)
data_iter = DataPrefetcher(data_loader)
def synchronize_all():
torch.cuda.synchronize()
def to_cpu(output):
return output.cpu()
def process_output(batch_id, logits):
# do some post processing on output
with open('/dev/null', 'wb') as f:
f.write(logits.numpy().tobytes())
with torch.inference_mode():
for i in range(TOTAL_STEPS):
if i == WARMUP_STEPS:
synchronize_all()
start_time = time.perf_counter()
profiler.start()
elif i == WARMUP_STEPS + PROFILE_STEPS:
synchronize_all()
profiler.stop()
end_time = time.perf_counter()
with nvtx.annotate(f"Batch {i}", color="blue"):
with nvtx.annotate("get batch", color="red"):
batch = next(data_iter)
with nvtx.annotate("compute", color="green"):
output = model(batch)
with nvtx.annotate("copy to CPU", color="yellow"):
output_cpu = to_cpu(output['out'])
with nvtx.annotate("process output", color="cyan"):
process_output(i, output_cpu)
total_time = end_time - start_time
throughput = PROFILE_STEPS / total_time
print(f"Throughput: {throughput:.2f} steps/sec")
Notez l’inclusion de toutes les optimisations de chargement de données CPU vers GPU évoquées dans notre article précédent.
Nous exécutons la commande suivante pour capturer une trace de profil nsys :
nsys profile \
--capture-range=cudaProfilerApi \
--trace=cuda,nvtx,osrt \
--output=baseline \
python batch_infer.py
Il en résulte un baseline.nsys-rep fichier de trace que nous copions sur notre machine de développement pour analyse.
Pour mesurer le débit d’inférence, nous augmentons le nombre d’étapes à 100. Le débit moyen de notre expérience de base est de 0,45 pas par seconde. Dans les sections suivantes, nous utiliserons les traces du profil nsys pour améliorer progressivement ce résultat.
Analyse des performances de référence
L’image ci-dessous montre la trace du profil nsys de notre expérience de base :

Dans la section GPU, nous voyons le modèle récurrent suivant :
- Un bloc de calcul du noyau (en bleu clair) qui s’exécute pendant environ 520 millisecondes.
- Un petit bloc de copie de mémoire hôte-périphérique (en vert) qui s’exécute en parallèle au calcul du noyau. Cette simultanéité a été obtenue grâce aux optimisations évoquées dans notre article précédent.
- Un bloc de copie de mémoire de périphérique à hôte (en rouge) qui s’exécute pendant environ 750 millisecondes.
- Une longue période (~ 940 millisecondes) de temps d’inactivité du GPU (espace blanc) entre deux étapes.
En regardant la barre NVTX de la section CPU, nous pouvons voir que les espaces s’alignent parfaitement avec le bloc « sortie du processus » (en cyan). Dans notre implémentation initiale, l’exécution du modèle et la fonction de stockage de sortie s’exécutent dans le même processus unique et de manière séquentielle. Cela entraîne un temps d’inactivité important sur le GPU, car le CPU attend le retour de la fonction de stockage avant d’alimenter le GPU avec le lot suivant.
Optimisation 1 : traitement des sorties multi-travailleurs
La première étape que nous prenons consiste à exécuter la fonction de stockage de sortie dans des processus de travail parallèles. Nous avons pris une mesure similaire dans notre article précédent lorsque nous avons déplacé la séquence de préparation des lots d’entrée vers des travailleurs dédiés. Cependant, alors que là-bas nous avons pu automatiser chargement de données multi-processus en réglant simplement le num_workers argument du Chargeur de données classe à une valeur non nulle, l’application du traitement de sortie multi-travailleurs nécessite une implémentation manuelle. Nous choisissons ici une solution simple à des fins démonstratives. Cela doit être personnalisé en fonction de vos besoins et de vos préférences de conception.
Multitraitement PyTorch
Nous mettons en œuvre une stratégie producteur-consommateur à l’aide du package multitraitement intégré de PyTorch, torche.multitraitement. Nous définissons une file d’attente pour stocker les lots de sortie et plusieurs travailleurs consommateurs qui traitent les lots dans la file d’attente. Nous modifions notre boucle d’inférence pour mettre les tampons de sortie dans la file d’attente de sortie. Nous mettons également à jour le synchroniser_all() utilitaire pour vider la file d’attente et ajouter une séquence de nettoyage à la fin du script.
Le bloc de code suivant contient notre implémentation initiale. Comme nous le verrons dans les sections suivantes, cela nécessitera quelques réglages afin d’atteindre des performances maximales.
import torch.multiprocessing as mp
POSTPROC_WORKERS = 8 # tune for optimal throughput
output_queue = mp.JoinableQueue(maxsize=POSTPROC_WORKERS)
def output_worker(in_q):
while True:
item = in_q.get()
if item is None: break # signal to shut down
batch_id, batch_preds = item
process_output(batch_id, batch_preds)
in_q.task_done()
processes = []
for _ in range(POSTPROC_WORKERS):
p = mp.Process(target=output_worker, args=(output_queue,))
p.start()
processes.append(p)
def synchronize_all():
torch.cuda.synchronize()
output_queue.join() # drain queue
with torch.inference_mode():
for i in range(TOTAL_STEPS):
if i == WARMUP_STEPS:
synchronize_all()
start_time = time.perf_counter()
profiler.start()
elif i == WARMUP_STEPS + PROFILE_STEPS:
synchronize_all()
profiler.stop()
end_time = time.perf_counter()
with nvtx.annotate(f"Batch {i}", color="blue"):
with nvtx.annotate("get batch", color="red"):
batch = next(data_iter)
with nvtx.annotate("compute", color="green"):
output = model(batch)
with nvtx.annotate("copy to CPU", color="yellow"):
output_cpu = to_cpu(output['out'])
with nvtx.annotate("queue output", color="cyan"):
output_queue.put((i, output_cpu))
total_time = end_time - start_time
throughput = PROFILE_STEPS / total_time
print(f"Throughput: {throughput:.2f} steps/sec")
# cleanup
for _ in range(POSTPROC_WORKERS):
output_queue.put(None)
L’optimisation du traitement de sortie multi-travailleurs aboutit à un débit de 0,71 étapes par seconde, soit une augmentation de 58 % par rapport à nos résultats de référence.
La réexécution de la commande nsys génère la trace de profil suivante :

Nous pouvons voir que la taille du bloc d’espaces a considérablement diminué (de ~940 millisecondes à ~50). Si nous devions zoomer sur l’espace restant, nous le trouverions aligné sur une opération « munmap ». Dans notre article précédent, la même constatation a éclairé notre optimisation de la copie de données asynchrone. Mais cette fois, nous franchissons une étape intermédiaire d’optimisation de la mémoire sous la forme d’un pool de tampons pré-alloués.
Optimisation 2 : pré-allocation du pool de tampons
Afin de réduire la surcharge liée à l’allocation et à la gestion d’un nouveau tenseur CPU à chaque itération, nous initialisons un pool de tenseurs pré-alloués en mémoire partagée et définissons une deuxième file d’attente pour gérer leur utilisation.
Notre code mis à jour apparaît ci-dessous :
shape = (BATCH_SIZE, N_CLASSES, IMG_SIZE, IMG_SIZE)
buffer_pool = [torch.empty(shape).share_memory_()
for _ in range(POSTPROC_WORKERS)]
buf_queue = mp.Queue()
for i in range(POSTPROC_WORKERS):
buf_queue.put(i)
def output_worker(buffer_pool, in_q, buf_q):
while True:
item = in_q.get()
if item is None: break # signal to shut down
batch_id, buf_id = item
process_output(batch_id, buffer_pool[buf_id])
buf_q.put(buf_id)
in_q.task_done()
processes = []
for _ in range(POSTPROC_WORKERS):
p = mp.Process(target=output_worker,
args=(buffer_pool,output_queue,buf_queue))
p.start()
processes.append(p)
def to_cpu(output):
buf_id = buf_queue.get()
output_cpu = buffer_pool[buf_id]
output_cpu.copy_(output)
return output_cpu, buf_id
with torch.inference_mode():
for i in range(TOTAL_STEPS):
if i == WARMUP_STEPS:
synchronize_all()
start_time = time.perf_counter()
profiler.start()
elif i == WARMUP_STEPS + PROFILE_STEPS:
synchronize_all()
profiler.stop()
end_time = time.perf_counter()
with nvtx.annotate(f"Batch {i}", color="blue"):
with nvtx.annotate("get batch", color="red"):
batch = next(data_iter)
with nvtx.annotate("compute", color="green"):
output = model(batch)
with nvtx.annotate("copy to CPU", color="yellow"):
output_cpu, buf_id = to_cpu(output['out'])
with nvtx.annotate("queue output", color="cyan"):
output_queue.put((i, buf_id))
Suite à ces changements, le débit d’inférence passe à 1,51, soit plus de 2X accélération par rapport à notre résultat précédent.
La nouvelle trace de profil apparaît ci-dessous :

Non seulement les espaces ont pratiquement disparu, mais le fonctionnement de la mémoire CUDA DtoH (en rouge) est passé d’environ 750 millisecondes à environ 110. Vraisemblablement, la grande copie de données GPU vers CPU impliquait une certaine surcharge de gestion de la mémoire que nous avons supprimée en implémentant un pool de tampons dédié.
Malgré l’amélioration considérable, si nous zoomons, nous constaterons qu’il reste environ 0,5 milliseconde d’espace blanc causé par la synchronicité de la commande de copie GPU vers CPU – tant que la copie n’est pas terminée, le CPU ne déclenche pas le calcul du noyau du lot suivant.
Optimisation 3 : copie de données asynchrone
Notre troisième optimisation consiste à modifier la copie appareil-hôte pour qu’elle soit asynchrone. Comme précédemment, nous constaterons que la mise en œuvre de ce changement est plus difficile que dans le sens CPU vers GPU.
La première étape est de réussir non_blocking=True à la commande de copie GPU vers CPU.
def to_cpu(output):
buf_id = buf_queue.get()
output_cpu = buffer_pool[buf_id]
output_cpu.copy_(output, non_blocking=True)
return output_cpu, buf_id
Cependant, comme nous l’avons vu dans notre article précédent, ce changement n’aura pas d’impact significatif à moins que nous modifiions nos tenseurs pour utiliser la mémoire épinglée :
shape = (BATCH_SIZE, N_CLASSES, IMG_SIZE, IMG_SIZE)
buffer_pool = [torch.empty(shape, pin_memory=True).share_memory_()
for _ in range(POSTPROC_WORKERS)]
Surtout, si nous appliquons uniquement ces deux modifications à notre script, le débit augmenterait mais la sortie pourrait être corrompue (par exemple, voir ici). Nous avons besoin d’un mécanisme basé sur des événements pour identifier chaque fois qu’une copie GPU à CPU est terminée afin que nous puissions procéder au traitement des données de sortie. (Notez que cela n’était pas nécessaire lors de la copie asynchrone du CPU vers le GPU. Étant donné qu’un seul flux GPU traite les commandes de manière séquentielle, le calcul du noyau ne démarre que lorsque la copie est terminée. La synchronisation n’était requise que lors de l’introduction d’un deuxième flux.)
Pour implémenter le mécanisme de notification, nous définissons un pool d’événements CUDA et une file d’attente supplémentaire pour gérer leur utilisation. Nous définissons en outre un thread d’écoute pour surveiller l’état des événements dans la file d’attente et remplir la file d’attente de sortie une fois les copies terminées.
import threading, queue
event_pool = [torch.cuda.Event() for _ in range(POSTPROC_WORKERS)]
event_queue = queue.Queue()
def event_monitor(event_pool, event_queue, output_queue):
while True:
item = event_queue.get()
if item is None: break
batch_id, buf_idx = item
event_pool[buf_idx].synchronize()
output_queue.put((batch_id, buf_idx))
event_queue.task_done()
monitor = threading.Thread(target=event_monitor,
args=(event_pool, event_queue, output_queue))
monitor.start()
La séquence d’inférence mise à jour comprend les étapes suivantes :
- Obtenez un lot d’entrée qui a été préextrait sur le GPU.
- Exécutez le modèle sur le lot d’entrée pour obtenir un tenseur de sortie sur le GPU.
- Demandez un tampon CPU vacant à la file d’attente des tampons et utilisez-le pour déclencher une copie de données asynchrone. Configurez un événement à déclencher lorsque la copie est terminée et transférez l’événement vers la file d’attente des événements.
- Le thread de surveillance attend que l’événement se déclenche, puis pousse le tenseur de sortie vers la file d’attente de sortie pour traitement.
- Un thread de travail extrait le tenseur de sortie de la file d’attente et l’enregistre sur le disque. Il libère ensuite le tampon dans la file d’attente des tampons.
Le code mis à jour apparaît ci-dessous.
def synchronize_all():
torch.cuda.synchronize()
event_queue.join()
output_queue.join()
with torch.inference_mode():
for i in range(TOTAL_STEPS):
if i == WARMUP_STEPS:
synchronize_all()
start_time = time.perf_counter()
profiler.start()
elif i == WARMUP_STEPS + PROFILE_STEPS:
synchronize_all()
profiler.stop()
end_time = time.perf_counter()
with nvtx.annotate(f"Batch {i}", color="blue"):
with nvtx.annotate("get batch", color="red"):
batch = next(data_iter)
with nvtx.annotate("compute", color="green"):
output = model(batch)
with nvtx.annotate("copy to CPU", color="yellow"):
output_cpu, buf_id = to_cpu(output['out'])
with nvtx.annotate("queue CUDA event", color="cyan"):
event_pool[buf_id].record()
event_queue.put((i, buf_id))
total_time = end_time - start_time
throughput = PROFILE_STEPS / total_time
print(f"Throughput: {throughput:.2f} steps/sec")
# cleanup
event_queue.put(None)
for _ in range(POSTPROC_WORKERS):
output_queue.put(None)
Le débit résultant est de 1,55 pas par seconde.
La nouvelle trace de profil apparaît ci-dessous :

Dans la ligne NVTX de la section CPU, nous pouvons voir toutes les opérations de la boucle d’inférence regroupées sur le côté gauche, ce qui implique qu’elles ont toutes été exécutées immédiatement et de manière asynchrone. Nous voyons également les appels de synchronisation d’événements (en vert clair) s’exécutant sur le thread du moniteur dédié. Dans la section GPU, nous voyons que le calcul du noyau commence immédiatement après la fin de la copie périphérique vers hôte.
Notre optimisation finale se concentrera sur l’amélioration de la parallélisation du noyau et des opérations mémoire sur le GPU.
Optimisation 4 : pipelining à l’aide de flux CUDA
Comme dans notre article précédent, nous souhaitons profiter des moteurs indépendants de copie mémoire (le DMA) et de calcul noyau (les SM). Pour ce faire, nous attribuons la copie mémoire à un flux CUDA dédié :
egress_stream = torch.cuda.Stream()
with torch.inference_mode():
for i in range(TOTAL_STEPS):
if i == WARMUP_STEPS:
synchronize_all()
start_time = time.perf_counter()
profiler.start()
elif i == WARMUP_STEPS + PROFILE_STEPS:
synchronize_all()
profiler.stop()
end_time = time.perf_counter()
with nvtx.annotate(f"Batch {i}", color="blue"):
with nvtx.annotate("get batch", color="red"):
batch = next(data_iter)
with nvtx.annotate("compute", color="green"):
output = model(batch)
# on separate stream
with torch.cuda.stream(egress_stream):
# wait for default stream to complete compute
egress_stream.wait_stream(torch.cuda.default_stream())
with nvtx.annotate("copy to CPU", color="yellow"):
output_cpu, buf_id = to_cpu(output['out'])
with nvtx.annotate("queue CUDA event", color="cyan"):
event_pool[buf_id].record(egress_stream)
event_queue.put((i, buf_id))
Cela se traduit par un débit de 1,85 pas par seconde, soit une amélioration supplémentaire de 19,3 % par rapport à notre expérience précédente.
La trace finale du profil apparaît ci-dessous :

Dans la section GPU, nous voyons un bloc continu de calcul du noyau (en bleu clair) avec à la fois l’hôte vers le périphérique (en vert clair) et le périphérique vers l’hôte (en violet) fonctionnant en parallèle. Notre boucle d’inférence est désormais liée au calcul, ce qui implique que nous avons épuisé toutes les opportunités pratiques d’optimisation du transfert de données.
Résultats
Nous résumons nos résultats dans le tableau suivant :

Grâce à l’utilisation du profileur nsys, nous avons pu augmenter l’efficacité de plus de 4X. Naturellement, l’impact des optimisations dont nous avons discuté variera en fonction des détails du modèle et de l’environnement d’exécution.
Résumé
Ceci conclut la deuxième partie de notre série d’articles sur le thème de l’optimisation du transfert de données dans les charges de travail IA/ML. La première partie s’est concentrée sur les copies d’hôte à périphérique et la deuxième partie sur les copies de périphérique à hôte. Lorsqu’il est mis en œuvre de manière naïve, le transfert de données dans les deux sens peut entraîner d’importants goulots d’étranglement en termes de performances, entraînant une pénurie de GPU et une augmentation des coûts d’exécution. À l’aide du profileur Nsight Systems, nous avons démontré comment identifier et résoudre ces goulots d’étranglement et augmenter l’efficacité de l’exécution.
Même si l’optimisation dans les deux sens impliquait des étapes similaires, les détails de mise en œuvre étaient très différents. Bien que l’optimisation du transfert de données CPU vers GPU soit bien prise en charge par les API de chargement de données de PyTorch et nécessite des modifications relativement mineures dans la boucle d’exécution, l’optimisation de la direction GPU vers CPU nécessite un peu plus d’ingénierie logicielle. Il est important de noter que les solutions que nous proposons dans cet article ont été choisies à des fins démonstratives. Votre propre solution peut différer considérablement en fonction des besoins de votre projet et de vos préférences de conception.
Après avoir couvert les copies de données CPU vers GPU et GPU vers CPU, nous tournons notre attention vers les transactions GPU vers GPU : restez à l’écoute pour un prochain article sur le thème de l’optimisation du transfert de données entre GPU dans les charges de travail de formation distribuées.



