
Ray : L’informatique distribuée pour tous, partie 1
Il s’agit du premier d’une série en deux parties sur l’informatique distribuée utilisant Ray. Cette partie montre comment utiliser Ray sur votre PC local, et la partie 2 montre comment faire évoluer Ray vers des clusters multi-serveurs dans le cloud.
Vous avez acheté un nouvel ordinateur portable ou de bureau à 16 cœurs et vous avez hâte de tester sa puissance avec des calculs lourds.
Vous êtes un programmeur Python, mais pas encore un expert, vous ouvrez donc votre LLM préféré et lui demandez quelque chose comme ceci.
« Je voudrais compter le nombre de nombres premiers dans une plage d’entrée donnée. Veuillez me donner du code Python pour cela. »
Après quelques secondes, le LLM vous donne du code. Vous pourriez le modifier un peu grâce à un court va-et-vient, et finalement vous vous retrouverez avec quelque chose comme ceci :
import math, time, os
def is_prime(n: int) -> bool:
if n < 2: return False
if n == 2: return True
if n % 2 == 0: return False
r = int(math.isqrt(n)) + 1
for i in range(3, r, 2):
if n % i == 0:
return False
return True
def count_primes(a: int, b: int) -> int:
c = 0
for n in range(a, b):
if is_prime(n):
c += 1
return c
if __name__ == "__main__":
A, B = 10_000_000, 20_000_000
total_cpus = os.cpu_count() or 1
# Start "chunky"; we can sweep this later
chunks = max(4, total_cpus * 2)
step = (B - A) // chunks
print(f"CPUs~{total_cpus}, chunks={chunks}")
t0 = time.time()
results = []
for i in range(chunks):
s = A + i * step
e = s + step if i < chunks - 1 else B
results.append(count_primes(s, e))
total = sum(results)
print(f"total={total}, time={time.time() - t0:.2f}s")
Vous exécutez le programme et cela fonctionne parfaitement. Le seul problème est que l’exécution prend un peu de temps, peut-être trente à soixante secondes, selon la taille de votre plage de saisie. C’est probablement inacceptable.
Que faites-vous maintenant? Vous avez plusieurs options, les trois plus courantes étant probablement :
– Paralléliser le code à l’aide de threads ou de multi-traitements
– Réécrire le code dans un langage « rapide » comme C ou Rust
– Essayez une bibliothèque comme Cython, Numba ou NumPy
Ce sont toutes des options viables, mais chacune présente des inconvénients. Les options 1 et 3 augmentent considérablement la complexité de votre code, et l’option du milieu peut vous obliger à apprendre un nouveau langage de programmation.
Et si je vous disais qu’il existe un autre moyen ? Celui où les modifications requises à votre code existant seraient réduites au minimum absolu. Celui où votre runtime est automatiquement réparti sur tous vos cœurs disponibles.
C’est précisément ce que dit le tiers Rayon la bibliothèque promet de le faire.
Qu’est-ce que Ray ?
La bibliothèque Ray Python est un source ouverte cadre informatique distribué conçu pour le faire facile à mettre à l’échelle Programmes Python depuis un ordinateur portable vers un cluster avec un minimum de modifications de code.
Ray simplifie la mise à l’échelle et la distribution des charges de travail d’applications gourmandes en calcul (de l’apprentissage profond au traitement des données) sur des clusters d’ordinateurs distants, tout en offrant également des améliorations pratiques de l’exécution des applications sur votre ordinateur portable, votre ordinateur de bureau ou même un cluster de calcul distant basé sur le cloud.
Ray fournit un riche ensemble de bibliothèques et d’intégrations construites sur un cadre d’exécution distribué flexible, rendant l’informatique distribuée facile et accessible à tous.
En bref, Ray vous permet de paralléliser et de distribuer votre code Python avec un minimum d’effort, qu’il s’exécute localement sur un ordinateur portable ou sur un cluster géant basé sur le cloud.
Utiliser Ray
Dans le reste de cet article, je vais vous expliquer les bases de l’utilisation de Ray pour accélérer le code Python gourmand en CPU, et nous définirons quelques exemples d’extraits de code pour vous montrer à quel point il est facile d’incorporer la puissance de Ray dans vos propres charges de travail.
Pour tirer le meilleur parti de Ray, si vous êtes un data scientist ou un ingénieur en apprentissage automatique, vous devez d’abord comprendre quelques concepts clés. Ray est composé de plusieurs composants.
Données de rayon est une bibliothèque évolutive conçue pour le traitement des données dans les tâches de ML et d’IA. Il propose des API flexibles et performantes pour les tâches d’IA, notamment l’inférence par lots, le prétraitement des données et l’ingestion de données pour la formation ML.
Ray Train est une bibliothèque flexible et évolutive conçue pour la formation et le réglage précis de l’apprentissage automatique distribué.
Rayonnage est utilisé pour le réglage des hyperparamètres.
Ray Servir est une bibliothèque évolutive permettant de déployer des modèles afin de faciliter les API d’inférence en ligne.
Ray RLlib est utilisé pour l’apprentissage par renforcement évolutif
Comme vous pouvez le voir, Ray est très concentré sur les grands modèles de langage et les applications d’IA, mais il y a un dernier composant important que je n’ai pas encore mentionné, et c’est celui que j’utiliserai dans cet article.
Noyau de rayon est conçu pour faire évoluer les applications Python à usage général et gourmandes en CPU. Il est conçu pour répartir votre charge de travail Python sur tous les cœurs disponibles, quel que soit le système sur lequel vous l’exécutez.
Cet article parlera exclusivement de Ray Core.
Deux concepts essentiels à comprendre dans Ray Core sont tâches et acteurs.
Les tâches sont apatride des travailleurs ou des services implémentés à l’aide de Ray en décorant des fonctions Python régulières.
Acteurs (ou avec état Workers) sont utilisés, par exemple, lorsque vous devez suivre et maintenir l’état des variables dépendantes dans votre cluster distribué. Les acteurs sont implémentés en décorant du Python standard cours.
Les acteurs et les tâches sont définis en utilisant le même @rayon.télécommande décorateur. Une fois définies, ces tâches sont exécutées avec le spécial .télécommande() méthode fournie par Ray. Nous en verrons un exemple ensuite.
Mise en place d’un environnement de développement
Avant de commencer à coder, nous devons mettre en place un environnement de développement pour garder nos projets cloisonnés afin qu’ils n’interfèrent pas les uns avec les autres. J’utiliserai conda pour cela, mais n’hésitez pas à utiliser l’outil que vous préférez. J’exécuterai mon code à l’aide d’un notebook Jupyter dans un shell Ubuntu WSL2 sous Windows.
$ conda create -n ray-test python=3.13 -y
$ conda activate ray-test
(ray-test) $ conda install ray[default]
Exemple de code – compter les nombres premiers
Reprenons l’exemple que j’ai donné au début : compter le nombre de nombres premiers dans l’intervalle de 10 000 000 à 20 000 000.
Nous exécuterons notre code Python original et chronométrons le temps que cela prend.
import math, time, os
def is_prime(n: int) -> bool:
if n < 2: return False
if n == 2: return True
if n % 2 == 0: return False
r = int(math.isqrt(n)) + 1
for i in range(3, r, 2):
if n % i == 0:
return False
return True
def count_primes(a: int, b: int) -> int:
c = 0
for n in range(a, b):
if is_prime(n):
c += 1
return c
if __name__ == "__main__":
A, B = 10_000_000, 20_000_000
total_cpus = os.cpu_count() or 1
# Start "chunky"; we can sweep this later
chunks = max(4, total_cpus * 2)
step = (B - A) // chunks
print(f"CPUs~{total_cpus}, chunks={chunks}")
t0 = time.time()
results = []
for i in range(chunks):
s = A + i * step
e = s + step if i < chunks - 1 else B
results.append(count_primes(s, e))
total = sum(results)
print(f"total={total}, time={time.time() - t0:.2f}s")
Et le rendu ?
CPUs~32, chunks=64
total=606028, time=31.17s
Maintenant, pouvons-nous améliorer cela en utilisant Ray ? Oui, en suivant ce processus simple en 4 étapes.
Étape 1 – Initialiser Ray. Ajoutez ces deux lignes au début de votre code.
import ray
ray.init()
Étape 2 – Créer notre fonction distante. C’est facile. Décorez simplement la fonction que nous souhaitons optimiser avec le décorateur @ray.remote. La fonction à décorer est celle qui demande le plus de travail. Dans notre exemple, il s’agit de la fonction count_primes.
@ray.remote(num_cpus=1)
def count_primes(start: int, end: int) -> int:
...
...
Étape 3 – Lancez les tâches parallèles. Appelez votre fonction distante à l’aide du .télécommande Directive de rayon.
refs.append(count_primes.remote(s, e))
Étape 4 – Attendez que toutes nos tâches soient terminées. Chaque tâche dans Ray renvoie un RéfObjet quand on l’a appelé. C’est une promesse de Ray. Cela signifie que Ray a désactivé l’exécution de la tâche à distance et que Ray renverra sa valeur à un moment donné dans le futur. Nous surveillons tous les ObjectRefs renvoyés par l’exécution de tâches à l’aide du ray.get() fonction. Cela bloque jusqu’à ce que toutes les tâches soient terminées.
results = ray.get(tasks)
Mettons tout cela ensemble. Comme vous le verrez, les modifications apportées à notre code d’origine sont minimes : seulement quatre lignes de code ajoutées et une instruction d’impression pour afficher le nombre de nœuds et de cœurs sur lesquels nous travaillons.
import math
import time
# -----------------------------------------
# Change No. 1
# -----------------------------------------
import ray
ray.init(auto)
def is_prime(n: int) -> bool:
if n < 2: return False
if n == 2: return True
if n % 2 == 0: return False
r = int(math.isqrt(n)) + 1
for i in range(3, r, 2):
if n % i == 0:
return False
return True
# -----------------------------------------
# Change No. 2
# -----------------------------------------
@ray.remote(num_cpus=1) # pure-Python loop → 1 CPU per task
def count_primes(a: int, b: int) -> int:
c = 0
for n in range(a, b):
if is_prime(n):
c += 1
return c
if __name__ == "__main__":
A, B = 10_000_000, 60_000_000
total_cpus = int(ray.cluster_resources().get("CPU", 1))
# Start "chunky"; we can sweep this later
chunks = max(4, total_cpus * 2)
step = (B - A) // chunks
print(f"nodes={len(ray.nodes())}, CPUs~{total_cpus}, chunks={chunks}")
t0 = time.time()
refs = []
for i in range(chunks):
s = A + i * step
e = s + step if i < chunks - 1 else B
# -----------------------------------------
# Change No. 3
# -----------------------------------------
refs.append(count_primes.remote(s, e))
# -----------------------------------------
# Change No. 4
# -----------------------------------------
total = sum(ray.get(refs))
print(f"total={total}, time={time.time() - t0:.2f}s")
Maintenant, est-ce que tout cela en vaut la peine ? Exécutons le nouveau code et voyons ce que nous obtenons.
2025-11-01 13:36:30,650 INFO worker.py:2004 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
/home/tom/.local/lib/python3.10/site-packages/ray/_private/worker.py:2052: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
warnings.warn(
nodes=1, CPUs~32, chunks=64
total=606028, time=3.04s
Eh bien, le résultat parle de lui-même. Le code Ray Python est 10x plus rapide que le code Python classique. Pas trop mal.
D’où vient cette augmentation de vitesse ? Eh bien, Ray peut répartir votre charge de travail sur tous les cœurs de votre système. Un cœur est comme un mini-CPU. Lorsque nous avons exécuté notre code Python d’origine, il n’utilisait qu’un seul cœur. C’est bien, mais si votre processeur possède plus d’un cœur, ce que font la plupart des PC modernes, vous laissez de l’argent sur la table, pour ainsi dire.
Dans mon cas, le processeur possède 24 cœurs, il n’est donc pas surprenant que mon code Ray soit bien plus rapide que le code non-Ray.
Emplois Ray Surveillance
Un autre point à mentionner est que Ray facilite la surveillance des exécutions de tâches via un tableau de bord. Remarquez dans le résultat que nous avons reçu lors de l’exécution de notre exemple de code Ray, nous avons vu ceci :
... -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
Il affiche un lien URL local car je l’exécute sur mon bureau. Si vous l’exécutiez sur un cluster, l’URL pointerait vers un emplacement sur le nœud principal du cluster.
Lorsque vous cliquez sur le lien URL indiqué, vous devriez voir quelque chose de similaire à ceci :

À partir de cet écran principal, vous pouvez explorer de nombreux aspects de vos programmes Ray en utilisant les liens de menu en haut de la page.
Utiliser des acteurs Ray
J’ai déjà mentionné que les acteurs faisaient partie intégrante du traitement de base de Ray. Les acteurs sont utilisés pour coordonner et partager des données entre les tâches Ray. Par exemple, supposons que vous souhaitiez définir une limite globale pour TOUTES les tâches en cours d’exécution à laquelle elles doivent se conformer. Supposons que vous disposiez d’un pool de tâches de travail, mais que vous souhaitiez vous assurer que seules cinq de ces tâches au maximum peuvent s’exécuter simultanément. Voici un code qui pourrait fonctionner.
import math, time, os
def is_prime(n: int) -> bool:
if n < 2: return False
if n == 2: return True
if n % 2 == 0: return False
r = int(math.isqrt(n)) + 1
for i in range(3, r, 2):
if n % i == 0:
return False
return True
def count_primes(a: int, b: int) -> int:
c = 0
for n in range(a, b):
if is_prime(n):
c += 1
return c
if __name__ == "__main__":
A, B = 10_000_000, 20_000_000
total_cpus = os.cpu_count() or 1
# Start "chunky"; we can sweep this later
chunks = max(4, total_cpus * 2)
step = (B - A) // chunks
print(f"CPUs~{total_cpus}, chunks={chunks}")
t0 = time.time()
results = []
for i in range(chunks):
s = A + i * step
e = s + step if i < chunks - 1 else B
results.append(count_primes(s, e))
total = sum(results)
print(f"total={total}, time={time.time() - t0:.2f}s")
Nous avons utilisé une variable globale pour limiter le nombre de tâches en cours d’exécution, et le code est syntaxiquement correct et s’exécute sans erreur. Malheureusement, vous n’obtiendrez pas le résultat escompté. En effet, chaque tâche Ray s’exécute dans son propre espace de processus et possède sa propre copie de la variable globale. La variable globale n’est PAS partagée entre les fonctions. Ainsi, lorsque nous exécuterons le code ci-dessus, nous verrons un résultat comme celui-ci :
Total calls: 200
Intended GLOBAL_QPS: 5.0
Expected time if truly global-limited: ~40.00s
Actual time with 'global var' (broken): 3.80s
Observed cluster QPS: ~52.6 (should have been ~5.0)
Pour résoudre ce problème, nous utilisons un acteur. Rappelez-vous qu’un acteur n’est qu’une classe Python décorée par Ray. Voici le code avec les acteurs.
import time, ray
ray.init(ignore_reinit_error=True, log_to_driver=False)
# This is our actor
@ray.remote
class GlobalPacer:
"""Serialize calls so cluster-wide rate <= qps."""
def __init__(self, qps: float):
self.interval = 1.0 / qps
self.next_time = time.time()
def acquire(self):
# Wait inside the actor until we can proceed
now = time.time()
if now < self.next_time:
time.sleep(self.next_time - now)
# Reserve the next slot; guard against drift
self.next_time = max(self.next_time + self.interval, time.time())
return True
@ray.remote
def call_api_with_limit(n_calls: int, pacer):
done = 0
for _ in range(n_calls):
# Wait for global permission
ray.get(pacer.acquire.remote())
# pretend API call (no extra sleep here)
done += 1
return done
if __name__ == "__main__":
NUM_WORKERS = 10
CALLS_EACH = 20
GLOBAL_QPS = 5.0 # cluster-wide cap
total_calls = NUM_WORKERS * CALLS_EACH
expected_min_time = total_calls / GLOBAL_QPS
pacer = GlobalPacer.remote(GLOBAL_QPS)
t0 = time.time()
ray.get([call_api_with_limit.remote(CALLS_EACH, pacer) for _ in range(NUM_WORKERS)])
dt = time.time() - t0
print(f"Total calls: {total_calls}")
print(f"Global QPS cap: {GLOBAL_QPS}")
print(f"Expected time (if capped at {GLOBAL_QPS} QPS): ~{expected_min_time:.2f}s")
print(f"Actual time with actor: {dt:.2f}s")
print(f"Observed cluster QPS: ~{total_calls/dt:.1f}")
Notre code limiteur est encapsulé dans une classe (GlobalPacer) et décoré avec ray.remote, ce qui signifie qu’il s’applique à toutes les tâches en cours d’exécution. Nous pouvons voir la différence que cela fait sur le résultat en exécutant le code mis à jour.
Total calls: 200
Global QPS cap: 5.0
Expected time (if capped at 5.0 QPS): ~40.00s
Actual time with actor: 39.86s
Observed cluster QPS: ~5.0
Résumé
Cet article a présenté Rayonun framework Python open source qui facilite la faire évoluer les programmes gourmands en calcul d’un seul cœur à plusieurs cœurs ou même un cluster avec des modifications de code minimes.
J’ai brièvement mentionné les composants clés de Ray (Ray Data, Ray Train, Ray Tune, Ray Serve et Ray Core), en soulignant que Ray Core est idéal pour la mise à l’échelle du processeur à usage général.
J’ai expliqué certains des concepts essentiels de Ray Core, tels que son introduction de tâches (fonctions parallèles sans état), d’acteurs (travailleurs avec état pour l’état partagé et la coordination) et ObjectRefs (une promesse future de la valeur de retour d’une tâche).
Pour présenter les avantages de l’utilisation de Ray, j’ai commencé par un exemple simple gourmand en CPU – compter les nombres premiers sur une plage – et j’ai montré à quel point son exécution sur un seul cœur peut être lente avec une implémentation naïve de Python.
Au lieu de réécrire le code dans un autre langage ou d’utiliser des bibliothèques multitraitements complexes, Ray vous permet de paralléliser la charge de travail en seulement quatre étapes simples et quelques lignes de code supplémentaires :
- ray.init() pour démarrer Ray
- Décorez vos fonctions avec @ray.remote pour les transformer en tâches parallèles
- .remote() pour lancer des tâches simultanément, et
- ray.get() pour collecter les résultats des tâches.
Cette approche a réduit le temps d’exécution de l’exemple de comptage de nombres premiers d’environ 30 secondes à environ 3 secondes sur une machine à 24 cœurs.
J’ai également mentionné à quel point il est facile de surveiller les tâches en cours d’exécution dans Ray à l’aide de son tableau de bord intégré et j’ai montré comment y accéder.
Enfin, j’ai fourni un exemple d’utilisation d’un Ray Actor par montrant pourquoi les variables globales ne conviennent pas pour la coordination entre les tâches, puisque chaque travailleur dispose de son propre espace mémoire.
Dans la deuxième partie de cette série, nous verrons comment amener les choses à un autre niveau en permettant aux tâches Ray d’utiliser encore plus de puissance CPU à mesure que nous évoluons vers de grands serveurs multi-nœuds dans le cloud via Amazon Web Services.



