
Mise à l’échelle de l’inférence ML sur Databricks : liquide ou partitionné ? Salé ou pas ?
Introduction
une variable continue pour quatre produits différents. Le pipeline d’apprentissage automatique a été construit dans Databricks et comporte deux composants principaux.
- Préparation des fonctionnalités en SQL avec calcul sans serveur.
- Inférence sur un ensemble de plusieurs centaines de modèles utilisant des clusters de tâches pour contrôler la puissance de calcul.
Lors de notre première tentative, un cluster de 420 cœurs a passé près de 10 heures à traiter seulement 18 partitions.
L’objectif est de ajustez le flux de données pour maximiser l’utilisation du cluster et garantir l’évolutivité. L’inférence est effectuée sur quatre ensembles de modèles ML, un ensemble par produit. Cependant, nous nous concentrerons sur comment les données sont enregistrées comme il le présentera quel degré de parallélisme pouvons-nous exploiter pour inférence. Nous ne nous concentrerons pas sur le fonctionnement interne de l’inférence elle-même.
S’il y a trop peu de partitions de fichiers, le cluster mettra beaucoup de temps à analyser les fichiers volumineux et à ce stade, à moins d’être réparti (cela signifie une latence réseau et un brassage de données supplémentaires), vous pourriez également faire des inférences sur un grand ensemble de lignes dans chaque partition. Cela entraîne également des temps d’exécution longs.

Cependant, les entreprises ont une patience limitée pour expédier des pipelines ML ayant un impact direct sur l’organisation. Les tests sont donc limités.
Dans cet article, nous passerons en revue notre paysage de données de fonctionnalités, puis fournirons un aperçu de l’inférence ML et présenterons les résultats et les discussions sur les performances de l’inférence sur la base de quatre scénarios de traitement d’ensembles de données :
- Table partitionnée, pas de sel, pas de limite de lignes dans les partitions (non salé et cloisonné)
- Table partitionnée, salée, avec limite de 1 million de lignes (salé et cloisonné)
- Table clusterisée par liquide, pas de sel, pas de limite de lignes dans les partitions (non salé et liquide)
- Table regroupée en liquide, salée, avec limite de 1 million de lignes (salé et liquide)
Paysage de données
L’ensemble de données contient des fonctionnalités que l’ensemble de modèles ML utilise pour l’inférence. Il comporte environ 550 millions de lignes et contient quatre produits identifiés dans l’attribut ProductLine:
- Produit A : ~10,45 millions (1,9%)
- Produit B : ~4,4M (0,8%)
- Produit C : ~100M (17,6%)
- Produit D : ~354M (79,7%)
Il a alors un autre attribut de faible cardinalité attrB, qui ne contient que deux valeurs distinctes et est utilisé comme filtre pour extraire des sous-ensembles de l’ensemble de données pour chaque partie du système ML.
De plus, RunDate enregistre la date à laquelle les fonctionnalités ont été générées. Ils sont uniquement en annexe. Enfin, l’ensemble de données est lu à l’aide de la requête suivante :
SELECT
Id,
ProductLine,
AttrB,
AttrC,
RunDate,
{model_features}
FROM
catalog.schema.FeatureStore
WHERE
ProductLine = :product AND
AttrB = :attributeB AND
RunDate = :RunDate
Implémentation du sel
Le salage ici est généré dynamiquement. Son but est de répartir les données selon les volumes. Cela signifie que les gros produits reçoivent plus de seaux et les produits plus petits reçoivent moins de seaux. Par exemple, le produit D devrait recevoir environ 80 % des catégories, compte tenu des proportions dans le paysage des données.
Nous faisons cela afin de pouvoir avoir des temps d’exécution d’inférence prévisibles et maximiser l’utilisation du cluster.
# Calculate percentage of each (ProductLine, AttrB) based on row counts
brand_cat_counts = df_demand_price_grid_load.groupBy(
"ProductLine", "AttrB"
).count()
total_count = df_demand_price_grid_load.count()
brand_cat_percents = brand_cat_counts.withColumn(
"percent", F.col("count") / F.lit(total_count)
)
# Collect percentages as dicts with string keys (this will later determine
# the number of salt buckets each product receives
brand_cat_percent_dict = {
f"{row['ProductLine']}|{row['AttrB']}": row['percent']
for row in brand_cat_percents.collect()
}
# Collect counts as dicts with string keys (this will help
# to add an additional bucket if counts is not divisible by the number of
# buckets for the product
brand_cat_count_dict = {
f"{row['ProductLine']}|{row['AttrB']}": row['count']
for row in brand_cat_percents.collect()
}
# Helper to flatten key-value pairs for create_map
def dict_to_map_expr(d):
expr = []
for k, v in d.items():
expr.append(F.lit(k))
expr.append(F.lit(v))
return expr
percent_case = F.create_map(*dict_to_map_expr(brand_cat_percent_dict))
count_case = F.create_map(*dict_to_map_expr(brand_cat_count_dict))
# Add string key column in pyspark
df_demand_price_grid_load = df_demand_price_grid_load.withColumn(
"product_cat_key",
F.concat_ws("|", F.col("ProductLine"), F.col("AttrB"))
)
df_demand_price_grid_load = df_demand_price_grid_load.withColumn(
"percent", percent_case.getItem(F.col("product_cat_key"))
).withColumn(
"product_count", count_case.getItem(F.col("product_cat_key"))
)
# Set min/max buckets
min_buckets = 10
max_buckets = 1160
# Calculate buckets per row based on (BrandName, price_delta_cat) percentage
df_demand_price_grid_load = df_demand_price_grid_load.withColumn(
"buckets_base",
(F.lit(min_buckets) + (F.col("percent") * (max_buckets - min_buckets))).cast("int")
)
# Add an extra bucket if brand_count is not divisible by buckets_base
df_demand_price_grid_load = df_demand_price_grid_load.withColumn(
"buckets",
F.when(
(F.col("product_count") % F.col("buckets_base")) != 0,
F.col("buckets_base") + 1
).otherwise(F.col("buckets_base"))
)
# Generate salt per row based on (ProductLine, AttrB) bucket count
df_demand_price_grid_load = df_demand_price_grid_load.withColumn(
"salt",
(F.rand(seed=42) * F.col("buckets")).cast("int")
)
# Perform the repartition using the core attributes and the salt column
df_demand_price_grid_load = df_demand_price_grid_load.repartition(
1200, "AttrB", "ProductLine", "salt"
).drop("product_cat_key", "percent", "brand_count", "buckets_base", "buckets", "salt")
Enfin, nous enregistrons notre ensemble de données dans la table des fonctionnalités et ajoutons un nombre maximum de lignes par partition. Cela permet d’empêcher Spark de générer des partitions avec trop de lignes, ce qu’il peut faire même si nous avons déjà calculé le sel.
Pourquoi imposons-nous 1 million de lignes ? L’accent principal est mis sur le temps d’inférence du modèle, et non sur la taille du fichier. Après quelques tests avec 1M, 1,5M, 2M, le premier donne les meilleures performances dans notre cas. Encore une fois, ce projet est très limité en termes de budget et de temps, nous devons donc tirer le meilleur parti de nos ressources.
df_demand_price_grid_load.write\
.mode("overwrite")\
.option("replaceWhere", f"RunDate = '{params['RunDate']}'")\
.option("maxRecordsPerFile", 1_000_000) \
.partitionBy("RunDate", "price_delta_cat", "BrandName") \
.saveAsTable(f"{params['catalog_revauto']}.{params['schema_revenueautomation']}.demand_features_price_grid")
Pourquoi ne pas simplement compter sur l’exécution adaptative de requêtes (AQE) de Spark ?
Rappelez-vous que l’accent est mis principalement sur les temps d’inférence, et non sur les mesures adaptées aux requêtes Spark SQL classiques, comme la taille des fichiers. Utiliser uniquement l’AQE était en fait notre première tentative. Comme vous le verrez dans les résultats, les temps d’exécution étaient très indésirables et n’ont pas maximisé l’utilisation du cluster compte tenu des proportions de nos données.
Inférence d’apprentissage automatique
Il existe un pipeline avec 4 tâches, une par produit. Chaque tâche effectue les étapes générales suivantes :
- Charge les fonctionnalités du produit correspondant
- Charge le sous-ensemble de modèles ML pour le produit correspondant
- Effectue une inférence dans la moitié du sous-ensemble découpé par
AttrB - Effectue une inférence dans l’autre moitié découpée par
AttrB - Enregistre les données dans le tableau des résultats
Nous nous concentrerons sur l’une des étapes d’inférence pour ne pas surcharger cet article de chiffres, bien que l’autre étape soit très similaire dans sa structure et ses résultats. De plus, vous pouvez voir le DAG pour l’inférence à évaluer sur la figure 2.

Cela semble très simple, mais les temps d’exécution peuvent varier en fonction de la manière dont vos données sont enregistrées et de la taille de votre cluster.
Configuration des clusters
Pour l’étape d’inférence que nous analysons, il existe un cluster par produit, adapté aux limites de l’infrastructure du projet, ainsi qu’à la distribution des données :
- Produit A : 35 travailleurs (Standard_DS14v2, 420 cœurs)
- Produit B : 5 workers (Standard_DS14v2, 70 cœurs)
- Produit C : 1 travailleur (Standard_DS14v2, 14 cœurs)
- Produit D : 1 travailleur (Standard_DS14v2, 14 cœurs)
De plus, AdaptiveQueryExecution est activé par défaut, ce qui permettra à Spark de décider de la meilleure façon de sauvegarder les données en fonction du contexte que vous fournissez.
Résultats et discussion
Vous verrez pour chaque scénario une représentation du nombre de partitions de fichiers par produit et du nombre moyen de lignes par partition pour vous donner une indication du nombre de lignes que le système ML effectuera l’inférence par tâche Spark. De plus, nous présentons les métriques de Spark UI pour observer les performances d’exécution et rechercher la distribution des données au moment de l’inférence. Nous ferons la partie Spark UI uniquement pour le produit D, qui est le plus important, afin de ne pas inclure un excès d’informations. De plus, selon le scénario, l’inférence sur le produit D devient un goulot d’étranglement au moment de l’exécution. Une autre raison pour laquelle c’était l’objectif principal des résultats.
Non salé et Partitionné
Vous pouvez voir sur la figure 3 qu’une partition de fichier moyenne comporte des dizaines de millions de lignes, ce qui signifie un temps d’exécution considérable pour un seul exécuteur. Le plus grand en moyenne est le produit C avec plus de 45 millions de lignes dans une seule partition. Le plus petit est le produit B avec environ 12 millions de lignes en moyenne.

La figure 4 représente le nombre de partitions par produit, avec un total de 26 pour l’ensemble. En vérifiant le produit D, 18 partitions sont très inférieures aux 420 cœurs dont nous disposons et en moyenne, chaque partition effectuera une inférence sur environ 40 millions de lignes.

Jetez un œil à la figure 5. Au total, le cluster a passé 9,9 heures et il n’était toujours pas terminé, car nous avons dû arrêter le travail, car cela devenait coûteux et bloquait les tests des autres.

À partir des statistiques récapitulatives de la figure 6 pour les tâches terminées, nous pouvons voir qu’il y avait une forte asymétrie dans les partitions pour le produit D. La taille d’entrée maximale était d’environ 56 Mo et la durée d’exécution était de 7,8 heures.

Non salé et liquide
Dans ce scénario, nous pouvons observer des résultats très similaires en termes de nombre moyen de lignes par partition de fichier et de nombre de partitions par produit, comme le montrent respectivement les figures 7 et 8.

Le produit D possède 19 partitions de fichiers, soit encore très peu de 420 cœurs.

Nous pouvons déjà prévoir que cette expérience allait coûter très cher, j’ai donc décidé de sauter le test d’inférence pour ce scénario. Encore une fois, dans une situation idéale, nous le reportons, mais il y a un arriéré de tickets dans mon tableau.
Salé et Partitionné
Après avoir appliqué le processus de salage et de répartition, nous obtenons environ 2,5 millions d’enregistrements moyens par partition pour les produits A et B, et environ 1 million pour les produits C et D, comme le montre la figure 9.

De plus, nous pouvons voir sur la figure 10 que le nombre de partitions de fichiers est passé à environ 860 pour le produit D, ce qui donne 430 pour chaque étape d’inférence.

Cela se traduit par un temps d’exécution de 3 heures pour inférer le produit D avec 360 tâches, comme le montre la figure 11.

En vérifiant les statistiques récapitulatives de la figure 12, la distribution semble équilibrée avec des temps d’exécution d’environ 1,7, mais une tâche maximale prenant 3 heures, ce qui mérite une étude plus approfondie à l’avenir.

Un grand avantage est que le sel répartit les données en fonction des proportions des produits. Si nous avions plus de ressources disponibles, nous pourrions augmenter le nombre de partitions aléatoires dans repartition() et ajoutez des travailleurs en fonction des proportions des données. Cela garantit que notre processus évolue de manière prévisible.
Salé et liquide
Ce scénario combine les deux leviers les plus puissants que nous avons explorés jusqu’à présent :
le salage pour contrôler la taille et le parallélisme des fichiers, et le clustering liquide pour conserver les données associées au même endroit sans limites de partition rigides.
Après avoir appliqué la même stratégie de salage et une limite de 1 million de lignes par partition, le tableau groupé de liquides montre une taille de partition moyenne très similaire au cas salé et partitionné, comme le montre la figure 13. Les produits C et D restent proches de l’objectif de 1 million de lignes, tandis que les produits A et B se stabilisent légèrement au-dessus de ce seuil.

Cependant, la principale différence apparaît dans la manière dont ces partitions sont distribuées et consommées par Spark. Comme le montre la figure 14, le produit D atteint à nouveau un nombre élevé de partitions de fichiers, fournissant suffisamment de parallélisme pour saturer les cœurs disponibles lors de l’inférence.

Contrairement à son homologue partitionné, le clustering liquide permet à Spark d’adapter la disposition des fichiers au fil du temps tout en bénéficiant du sel. Cela se traduit par une répartition plus uniforme du travail entre les exécuteurs, avec moins de valeurs aberrantes extrêmes en termes de taille d’entrée et de durée des tâches.
À partir des statistiques récapitulatives de la figure 15, nous observons que la majorité des tâches sont terminées dans une fenêtre d’exécution étroite et que la durée maximale de la tâche est inférieure à celle du scénario salé et partitionné. Cela indique une réduction de l’asymétrie et un meilleur équilibrage de charge sur l’ensemble du cluster.


Un effet secondaire important est que le clustering liquide préserve la localité des données pour les colonnes filtrées sans imposer de limites de partition strictes. Cela permet à Spark de continuer à bénéficier du saut de données, tandis que le sel garantit qu’aucun exécuteur n’est submergé par des dizaines de millions de lignes.
Dans l’ensemble, salé et liquide apparaît comme la configuration la plus robuste : elle maximise le parallélisme, minimise les biais et réduit le risque opérationnel lorsque les charges de travail d’inférence augmentent ou que les configurations du cluster changent.
Points clés à retenir
- L’évolutivité de l’inférence est souvent limitée par la disposition des données et non par la complexité du modèle. Des partitions de fichiers de mauvaise taille peuvent laisser des centaines de cœurs inactifs tandis que quelques exécuteurs traitent des dizaines de millions de lignes.
- Le partitionnement seul ne suffit pas pour une inférence à grande échelle. Sans contrôler la taille des fichiers, les tables partitionnées peuvent toujours produire des partitions massives qui conduisent à des tâches asymétriques et de longue durée.
- Le salage est un outil efficace pour débloquer le parallélisme. L’introduction d’une clé salt et l’application d’une limite de lignes par partition augmentent considérablement le nombre de tâches exécutables et stabilisent les temps d’exécution.
- Le regroupement de liquides complète le salage en réduisant les biais sans limites rigides. Cela permet à Spark d’adapter la disposition des fichiers au fil du temps, rendant le système plus résilient à mesure que les données augmentent.



