
PySpark pour les utilisateurs Pandas | Vers la science des données
un réel problème lorsqu’il s’agit de très grands ensembles de données. Ce que j’entends par « très volumineux », ce sont les données qui dépassent la capacité de la RAM d’une seule machine.
Certains des principaux points de friction auxquels les utilisateurs de Pandas sont confrontés incluent :
Contraintes en mémoire
Pandas nécessite que l’intégralité de l’ensemble de données qu’il traite se trouve dans la mémoire vive (RAM) de la machine. Il ne peut pas facilement traiter les données stockées sur un disque dur à moins qu’elles ne soient d’abord chargées, et si ces données sont trop volumineuses pour votre mémoire, vous rencontrez des problèmes.
Par exemple, si vous essayez de charger un fichier CSV de 100 Go dans Pandas sur un ordinateur portable standard doté de 16 Go de RAM, le code plantera immédiatement.
Et ce n’est pas seulement un rapport de 1:1. En raison des types de données et de la surcharge des objets, Pandas nécessite généralement plusieurs multiples de la RAM requise par la taille du fichier sur le disque. Avec 16 Go de RAM, la limite de taille de fichier peut être aussi basse que 3 à 4 Go.
Exécution monothread
Pandas a été conçu pour des raisons de commodité et d’analyse, et non pour une échelle de performances brutes. Par défaut, Pandas exécute des opérations sur un seul cœur de processeur. Même si un utilisateur exécute son code sur un serveur puissant doté de 64 cœurs, les Pandas n’en utiliseront en grande partie qu’un, laissant les autres inactifs.
Exécution hâtive contre évaluation paresseuse
Pandas utilise Eager Execution, ce qui signifie qu’il effectue des calculs dès que le code est exécuté. Les outils Big Data (comme Apache Spark) utilisent Lazy Evaluation. Cette dernière est souvent plus performante qu’une exécution hâtive, car lorsqu’une série d’étapes est requise pour effectuer une tâche, une évaluation paresseuse peut examiner toutes les étapes et le résultat final requis et optimiser de manière appropriée. Une exécution hâtive ne peut pas faire cela. Il exécute aveuglément chaque étape tour à tour, quoi qu’il arrive.
Limites de mise à l’échelle verticale
Pour que Pandas fonctionne avec des ensembles de données plus volumineux, vous devez vous appuyer sur la mise à l’échelle verticale (en achetant un ordinateur plus cher avec plus de RAM et un processeur plus rapide). Mais cela ne peut vous mener que jusqu’à un certain point. Par exemple, Pandas n’a pas la capacité native de « parler » avec un cluster. Il ne peut pas distribuer une trame de données sur plusieurs machines.
Alors que faire ?
Comme toujours dans le monde informatique, plusieurs solutions se présentent. Trois des alternatives les plus populaires sont : –
1/ Dask ou Ray
Il s’agit de bibliothèques tierces qui vous aident à écrire du code distribué pouvant s’exécuter sur des clusters d’ordinateurs. Bien que ceux-ci tentent d’imiter l’API Pandas, ils présentent toujours des différences et des limitations subtiles qui peuvent nécessiter une refactorisation du code.
2/ Spark : Un autre moteur de calcul distribué. Nécessite une syntaxe différente et un modèle mental différent.
3/ SGBDR : nécessite le déplacement de vos données dans une base de données et l’apprentissage de SQL.
Toutes les options ci-dessus nécessitent pas mal de travail à mettre en œuvre, mais pour le reste de cet article, je me concentrerai sur l’option 2.
Alors, disons que je vous ai convaincu, ou du moins éveillé votre intérêt, et que vous envisagez de déplacer tout ou partie de votre traitement basé sur Pandas existant vers PySpark. Quelle devrait être votre prochaine démarche ? Eh bien, vous devrez commencer à convertir tout ou partie de votre base de code. Cela pourrait être intimidant, mais ne vous inquiétez pas, je suis là pour vous.
Poursuivez votre lecture pendant que je vous présente un certain nombre d’exemples d’extraits de code qui présentent certaines opérations de traitement de données typiques, des plus simples aux plus complexes. Je suis sûr que vous reconnaîtrez certains de ces modèles dans votre propre code. Je vais vous montrer la façon de faire des Pandas et la reproduire dans PySpark, en fournissant des comparaisons de sortie et de timing entre les deux.
Configuration de l’environnement de développement
J’utilise Ubuntu sur WSL2. Tout d’abord, nous allons mettre en place un environnement de développement distinct pour ce travail, en garantissant que nos projets sont cloisonnés et n’interfèrent pas les uns avec les autres. J’utilise Conda pour cette partie, mais n’hésitez pas à utiliser la méthode à laquelle vous êtes habitué.
Installez PySpark, Pandas, etc.
(base) $ conda create -n pandas_to_pyspark python=3.11 -y
(base) $ conda activate pandas_to_pyspark
(pands_to_pyspark) $ conda install jupyter polars pyarrow pandas -y
(pands_to_pyspark) $ conda install -c conda-forge pyspark
Pour vérifier que PySpark a été correctement installé, tapez la commande pyspark dans une fenêtre de terminal.
(pands_to_pyspark) pyspark
Python 3.11.14 | packaged by conda-forge | (main, Oct 22 2025, 22:46:25) [GCC 14.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
WARNING: Using incubator modules: jdk.incubator.vector
WARNING: package sun.security.action not in java.base
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/01/15 16:15:21 WARN Utils: Your hostname, tpr-desktop, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
26/01/15 16:15:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/15 16:15:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
WARNING: A terminally deprecated method in sun.misc.Unsafe has been called
WARNING: sun.misc.Unsafe::arrayBaseOffset has been called by org.apache.spark.unsafe.Platform (file:/home/tom/miniconda3/envs/pandas_to_pyspark/lib/python3.11/site-packages/pyspark/jars/spark-unsafe_2.13-4.1.1.jar)
WARNING: Please consider reporting this to the maintainers of class org.apache.spark.unsafe.Platform
WARNING: sun.misc.Unsafe::arrayBaseOffset will be removed in a future release
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 4.1.1
/_/
Using Python version 3.11.14 (main, Oct 22 2025 22:46:25)
Spark context Web UI available at http://10.255.255.254:4040
Spark context available as 'sc' (master = local[*], app id = local-1768493723158).
SparkSession available as 'spark'.
>>>
Si vous ne voyez pas la bannière de bienvenue de Spark, c’est que quelque chose s’est mal passé et vous devez revérifier votre installation.
Obtenir notre exemple d’ensemble de données
Nous n’avons pas besoin d’un ensemble compliqué pour nos besoins. Un ensemble de données de ventes synthétiques avec le schéma suivant suffira :
- id_commande (int)
- date_commande (date)
- client_id (int)
- nom_client (str)
- id_produit (int)
- nom_produit (str)
- catégorie (str)
- quantité (int)
- prix (flottant)
- total (flottant)
Nos données d’entrée seront un fichier CSV de 30 millions d’enregistrements. Voici un programme Python pour générer les données de test :
import polars as pl
import random
from datetime import datetime, timedelta
# Generate fake data
def generate_fake_data(num_records):
random.seed(42)
product_names = ['Laptop', 'Smartphone', 'Desk', 'Chair', 'Monitor',
'Printer', 'Paper', 'Pen', 'Notebook', 'Coffee Maker']
categories = ['Electronics', 'Electronics', 'Office', 'Office', 'Electronics',
'Electronics', 'Office', 'Office', 'Office', 'Electronics']
data = {
'order_id': range(num_records),
'order_date': [datetime(2023, 1, 1) + timedelta(days=random.randint(0, 364))
for _ in range(num_records)],
'customer_id': [random.randint(100, 999) for _ in range(num_records)],
'customer_name': [f'Customer_{random.randint(0, 99999)}' for _ in range(num_records)],
'product_id': [random.randint(200, 209) for _ in range(num_records)],
'product_name': [random.choice(product_names) for _ in range(num_records)],
'category': [random.choice(categories) for _ in range(num_records)],
'quantity': [random.randint(1, 10) for _ in range(num_records)],
'price': [round(random.uniform(1.99, 999.99), 2) for _ in range(num_records)]
}
df = pl.DataFrame(data)
df = df.with_columns((pl.col('price') * pl.col('quantity')).alias('total'))
return df
# Generate 30 million records
num_records = 30000000
df = generate_fake_data(num_records)
# Save to CSV
df.write_csv('/mnt/d/sales_data/sales_data_30m.csv')
print('CSV file with fake sales data has been created.')
Voici à quoi ressemblaient les premières lignes de mon fichier de données de test.
order_id,order_date,customer_id,customer_name,product_id,product_name,category,quantity,price,total
0,2023-11-24T00:00:00.000000,434,Customer_46318,201,Notebook,Office,6,925.68,5554.08
1,2023-02-27T00:00:00.000000,495,Customer_26514,203,Coffee Maker,Office,3,676.44,2029.3200000000002
2,2023-01-13T00:00:00.000000,377,Customer_56676,204,Pen,Electronics,10,533.2,5332.0
3,2023-05-21T00:00:00.000000,272,Customer_13772,209,Notebook,Electronics,5,752.0,3760.0
4,2023-05-06T00:00:00.000000,490,Customer_23118,206,Coffee Maker,Electronics,3,747.46,2242.38
5,2023-04-25T00:00:00.000000,515,Customer_88284,202,Desk,Electronics,10,886.22,8862.2
6,2023-03-13T00:00:00.000000,885,Customer_47303,200,Desk,Electronics,1,38.97,38.97
7,2023-02-22T00:00:00.000000,598,Customer_90712,203,Desk,Electronics,5,956.31,4781.549999999999
8,2023-12-13T00:00:00.000000,781,Customer_32943,205,Coffee Maker,Electronics,7,258.25,1807.75
9,2023-10-07T00:00:00.000000,797,Customer_40215,208,Pen,Electronics,8,464.81,3718.48
10,2023-02-14T00:00:00.000000,333,Customer_18388,209,Monitor,Electronics,1,478.95,478.95
Exemples de codes
Démarrez un notebook Jupyter :
(pands_to_pyspark) $ jupyter notebook
Les données et les deux jeux de codes que je vais exécuter se trouvent sur mon ordinateur de bureau. Je montrerai les sorties des deux exécutions de code afin que vous puissiez vérifier qu’elles effectuent la même tâche, et j’inclurai les délais (en secondes) afin que vous puissiez comparer les performances. Le code et la sortie Pandas en premier, puis le code et la sortie Spark.
Les extraits de code sont courts et bien commentés, donc si vous êtes déjà un programmeur Pandas, il devrait être assez facile de suivre ce qui se passe dans le code PySpark si vous ne le connaissez pas déjà.
Pour être clair, comme l’ensemble de données d’entrée que j’utiliserai n’est PAS du « big data », les timings doivent être considérés comme étant d’importance secondaire.
Exemple 1 — Chargement de données à partir d’un CSV
Nous allons commencer par une opération simple : il suffit de lire notre fichier de données CSV d’entrée et de le trier par les colonnes order_date et order_id avant d’afficher les cinq premier et dernier enregistrements.
Voici le code Pandas.
import pandas as pd
import time
# 1. Define Path (WSL format)
file_path = "/mnt/d/sales_data/sales_data_30m.csv"
print(f"Starting process for {file_path}...")
# --- LOAD PHASE ---
start_load = time.time()
df = pd.read_csv(file_path)
end_load = time.time()
print(f"Loading complete. Time taken: {end_load - start_load:.2f} seconds")
# --- SORT PHASE ---
start_sort = time.time()
# Note: Sorting by two columns at once
df_sorted = df.sort_values(by=['order_date', 'order_id'])
end_sort = time.time()
print(f"Sorting complete. Time taken: {end_sort - start_sort:.2f} seconds")
# --- DISPLAY ---
print("\n" + "="*30)
print("TOP 5 RECORDS")
print(df_sorted.head(5))
print("\nBOTTOM 5 RECORDS")
print(df_sorted.tail(5))
print("="*30)
total_time = end_sort - start_load
print(f"\nTotal Execution Time: {total_time:.2f} seconds")
Voici le résultat.
(pands_to_pyspark) $ python ex1_pandas.py
Starting process for /mnt/d/sales_data/sales_data_30m.csv...
Loading complete. Time taken: 34.02 seconds
Sorting complete. Time taken: 7.00 seconds
==============================
TOP 5 RECORDS
order_id order_date customer_id customer_name ... category quantity price total
179 179 2023-01-01T00:00:00.000000 350 Customer_93033 ... Office 5 640.16 3200.80
520 520 2023-01-01T00:00:00.000000 858 Customer_31280 ... Electronics 3 841.21 2523.63
557 557 2023-01-01T00:00:00.000000 651 Customer_95137 ... Office 7 75.66 529.62
1080 1080 2023-01-01T00:00:00.000000 303 Customer_87422 ... Electronics 10 98.34 983.40
2023 2023 2023-01-01T00:00:00.000000 838 Customer_95193 ... Office 4 427.96 1711.84
[5 rows x 10 columns]
BOTTOM 5 RECORDS
order_id order_date customer_id customer_name ... category quantity price total
29997832 29997832 2023-12-31T00:00:00.000000 831 Customer_49372 ... Electronics 6 418.86 2513.16
29997903 29997903 2023-12-31T00:00:00.000000 449 Customer_17384 ... Office 3 494.29 1482.87
29998337 29998337 2023-12-31T00:00:00.000000 649 Customer_24018 ... Electronics 5 241.71 1208.55
29999674 29999674 2023-12-31T00:00:00.000000 105 Customer_39890 ... Office 1 94.97 94.97
29999933 29999933 2023-12-31T00:00:00.000000 572 Customer_38794 ... Office 8 375.36 3002.88
[5 rows x 10 columns]
==============================
Total Execution Time: 41.03 seconds
Voici le code Spark équivalent et le résultat du traitement.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, DoubleType
import time
import pandas as pd
start_overall = time.time()
# 1. Initialize with explicit Memory and Shuffle tuning
spark = SparkSession.builder \
.appName("OptimizedSpark") \
.config("spark.sql.shuffle.partitions", "16") \
.config("spark.driver.memory", "8g") \
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
# 2. Define Manual Schema (Skips the double-read of inferSchema)
schema = StructType([
StructField("order_id", IntegerType(), True),
StructField("order_date", DateType(), True),
StructField("customer_id", IntegerType(), True),
StructField("customer_name", StringType(), True),
StructField("product_id", IntegerType(), True),
StructField("product_name", StringType(), True),
StructField("category", StringType(), True),
StructField("quantity", IntegerType(), True),
StructField("price", DoubleType(), True),
StructField("total", DoubleType(), True)
])
file_path = "/mnt/d/sales_data/sales_data_30m.csv"
print(f"Processing {file_path} with Optimized Spark...")
# --- LOAD ---
start_load = time.time()
# No inferSchema!
df = spark.read.csv(file_path, header=True, schema=schema)
print(f"LOAD INITIATED. (Time taken: {time.time() - start_load:.2f}s)")
# --- SORT ---
start_sort = time.time()
# Sorting 30M rows
df_sorted = df.orderBy(["order_date", "order_id"])
# Force the sort with a light action (NOT cache)
row_count = df_sorted.count()
end_sort = time.time()
print(f"SORT COMPLETE. Rows: {row_count}")
print(f" Time taken: {end_sort - start_sort:.2f} seconds")
# --- DISPLAY ---
print("\n" + "="*80)
print("TOP 5 RECORDS")
print(df_sorted.limit(5).toPandas().to_string(index=False))
print("\nBOTTOM 5 RECORDS")
tail_data = df_sorted.tail(5)
print(pd.DataFrame(tail_data, columns=df.columns).to_string(index=False))
print("="*80)
print(f"\nTotal Execution Time: {time.time() - start_overall:.2f} seconds")
spark.stop()
Et la sortie.
(pands_to_pyspark) $ spark-submit ex1_spark.py 2> /dev/null
Processing /mnt/d/sales_data/sales_data_30m.csv with Optimized Spark...
LOAD INITIATED. (Time taken: 0.72s)
SORT COMPLETE. Rows: 30000000
Time taken: 5.65 seconds
================================================================================
TOP 5 RECORDS
order_id order_date customer_id customer_name product_id product_name category quantity price total
179 2023-01-01 350 Customer_93033 207 Desk Office 5 640.16 3200.80
520 2023-01-01 858 Customer_31280 201 Pen Electronics 3 841.21 2523.63
557 2023-01-01 651 Customer_95137 209 Printer Office 7 75.66 529.62
1080 2023-01-01 303 Customer_87422 204 Smartphone Electronics 10 98.34 983.40
2023 2023-01-01 838 Customer_95193 201 Paper Office 4 427.96 1711.84
BOTTOM 5 RECORDS
order_id order_date customer_id customer_name product_id product_name category quantity price total
29997832 2023-12-31 831 Customer_49372 201 Chair Electronics 6 418.86 2513.16
29997903 2023-12-31 449 Customer_17384 205 Desk Office 3 494.29 1482.87
29998337 2023-12-31 649 Customer_24018 201 Smartphone Electronics 5 241.71 1208.55
29999674 2023-12-31 105 Customer_39890 203 Chair Office 1 94.97 94.97
29999933 2023-12-31 572 Customer_38794 201 Desk Office 8 375.36 3002.88
================================================================================
Total Execution Time: 36.12 seconds
Exemple 2— Conversion d’un fichier CSV en Parquet
Dans cet exemple, nous allons lire le même fichier CSV d’entrée de 30 Mo, puis l’écrire à nouveau sous forme de fichier Parquet.
Comme précédemment, nous commencerons par le code et la sortie des pandas.
import pandas as pd
import pyarrow.parquet as pq
import pyarrow as pa
import time
csv_file = "/mnt/d/sales_data/sales_data_30m.csv"
parquet_file = "/mnt/d/sales_data/sales_data_pandas_30m.parquet"
chunk_size = 1_000_000 # Process 1 million rows at a time
print(f"Starting memory-efficient conversion...")
start_total = time.time()
# 1. Create a CSV reader object (this doesn't load data yet)
reader = pd.read_csv(csv_file, chunksize=chunk_size)
parquet_writer = None
for i, chunk in enumerate(reader):
start_chunk = time.time()
# Convert Pandas chunk to PyArrow Table
table = pa.Table.from_pandas(chunk)
# Initialize the writer on the first chunk
if parquet_writer is None:
parquet_writer = pq.ParquetWriter(parquet_file, table.schema, compression='snappy')
# Write this chunk to the file
parquet_writer.write_table(table)
print(f"Processed chunk {i+1} (Rows {i*chunk_size} to {(i+1)*chunk_size}) in {time.time() - start_chunk:.2f}s")
# 2. Close the writer
if parquet_writer:
parquet_writer.close()
print("\n" + "="*40)
print(f"Conversion Complete!")
print(f"Total Time: {time.time() - start_total:.2f} seconds")
print("="*40)
La sortie.
(pands_to_pyspark) $ python ex2_pandas.py
Starting memory-efficient conversion...
Processed chunk 1 (Rows 0 to 1000000) in 4.82s
Processed chunk 2 (Rows 1000000 to 2000000) in 0.40s
Processed chunk 3 (Rows 2000000 to 3000000) in 0.39s
Processed chunk 4 (Rows 3000000 to 4000000) in 0.36s
Processed chunk 5 (Rows 4000000 to 5000000) in 0.43s
Processed chunk 6 (Rows 5000000 to 6000000) in 0.45s
Processed chunk 7 (Rows 6000000 to 7000000) in 0.35s
Processed chunk 8 (Rows 7000000 to 8000000) in 0.34s
Processed chunk 9 (Rows 8000000 to 9000000) in 0.36s
Processed chunk 10 (Rows 9000000 to 10000000) in 0.36s
Processed chunk 11 (Rows 10000000 to 11000000) in 0.37s
Processed chunk 12 (Rows 11000000 to 12000000) in 0.41s
Processed chunk 13 (Rows 12000000 to 13000000) in 0.48s
Processed chunk 14 (Rows 13000000 to 14000000) in 0.43s
Processed chunk 15 (Rows 14000000 to 15000000) in 0.38s
Processed chunk 16 (Rows 15000000 to 16000000) in 0.35s
Processed chunk 17 (Rows 16000000 to 17000000) in 0.34s
Processed chunk 18 (Rows 17000000 to 18000000) in 0.35s
Processed chunk 19 (Rows 18000000 to 19000000) in 0.36s
Processed chunk 20 (Rows 19000000 to 20000000) in 0.35s
Processed chunk 21 (Rows 20000000 to 21000000) in 0.34s
Processed chunk 22 (Rows 21000000 to 22000000) in 0.34s
Processed chunk 23 (Rows 22000000 to 23000000) in 0.34s
Processed chunk 24 (Rows 23000000 to 24000000) in 0.36s
Processed chunk 25 (Rows 24000000 to 25000000) in 0.36s
Processed chunk 26 (Rows 25000000 to 26000000) in 0.35s
Processed chunk 27 (Rows 26000000 to 27000000) in 0.36s
Processed chunk 28 (Rows 27000000 to 28000000) in 0.35s
Processed chunk 29 (Rows 28000000 to 29000000) in 0.35s
Processed chunk 30 (Rows 29000000 to 30000000) in 0.34s
========================================
Conversion Complete!
Total Time: 43.30 seconds
========================================
Et maintenant pour PySpark.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, DoubleType
import time
# Start the overall timer immediately
start_overall = time.time()
# 1. Initialize Spark with high memory configuration
spark = SparkSession.builder \
.appName("EfficientParquetConversion") \
.config("spark.driver.memory", "8g") \
.master("local[*]") \
.getOrCreate()
# Silence logs
spark.sparkContext.setLogLevel("ERROR")
# 2. Explicitly define the Schema (Most efficient for CSV)
schema = StructType([
StructField("order_id", IntegerType(), True),
StructField("order_date", DateType(), True),
StructField("customer_id", IntegerType(), True),
StructField("customer_name", StringType(), True),
StructField("product_id", IntegerType(), True),
StructField("product_name", StringType(), True),
StructField("category", StringType(), True),
StructField("quantity", IntegerType(), True),
StructField("price", DoubleType(), True),
StructField("total", DoubleType(), True)
])
csv_path = "/mnt/d/sales_data/sales_data_30m.csv"
parquet_path = "/mnt/d/sales_data/sales_data_parquet"
print(f"Starting Spark conversion to {parquet_path}...")
# 3. Read the CSV using the defined schema
start_proc = time.time()
df = spark.read.csv(csv_path, header=True, schema=schema)
# 4. Write to Parquet (Overwrite if exists)
df.write.mode("overwrite").parquet(parquet_path)
end_proc = time.time()
print("-" * 40)
print(f"CONVERSION COMPLETE")
print(f"Processing Time (Read + Write): {end_proc - start_proc:.2f} seconds")
print(f"Total Execution Time (incl. Spark startup): {time.time() - start_overall:.2f} seconds")
print("-" * 40)
spark.stop()
Je peux confirmer que le contenu du fichier parquet créé par Pandas et Pyspark était identique.
(pands_to_pyspark) $ spark-submit --driver-memory 8g ex2_spark.py 2> /dev/null
Starting Spark conversion to /mnt/d/sales_data/sales_data_parquet...
----------------------------------------
CONVERSION COMPLETE
Processing Time (Read + Write): 21.62 seconds
Total Execution Time (incl. Spark startup): 23.26 seconds
----------------------------------------
Exemple 3 — Pivotement des données
Lisez les fichiers Parquet que nous venons de créer et calculez les ventes totales par nom_produit et par date_commande.
Des pandas.
import pandas as pd
from timeit import default_timer as timer
parquet_path = r'/mnt/d/sales_data/sales_data_pandas_30m.parquet'
start = timer()
# Read the Parquet file
df = pd.read_parquet(parquet_path)
# 1) Make order_date a proper date
# Convert to datetime then extract the date component
df["order_date"] = pd.to_datetime(df["order_date"]).dt.date
# 2) Pivot (sum)
# Pandas pivot_table handles the aggregation (sum) and the shape simultaneously
pivot = df.pivot_table(
values="total",
index="order_date",
columns="product_name",
aggfunc="sum"
)
# 3) Sort rows by date (Pandas index)
pivot = pivot.sort_index()
# 4) Enforce a consistent column order (alphabetical product columns)
# pivot_table already sorts columns by default, but we can be explicit
pivot = pivot.reindex(sorted(pivot.columns), axis=1)
# 5) (Optional) Replace nulls with 0
# pivot = pivot.fillna(0)
end = timer()
print(f"Pandas: read + standardized pivot took {end - start:.2f} seconds")
print(pivot.head(5))
Sortie Pandas.
(pandas_pysaprk) $ python ex3_pandas.py
Pandas: read + standardized pivot took 9.98 seconds
product_name Chair Coffee Maker Desk Laptop ... Paper Pen Printer Smartphone
order_date ...
2023-01-01 22041864.51 22596967.46 22228235.43 22319250.97 ... 22778128.78 22690394.34 22747419.90 22848102.42
2023-01-02 22702337.42 21960074.98 23539803.82 23332945.56 ... 22414013.44 22378123.52 22494364.89 22321919.79
2023-01-03 22626028.85 22651440.10 22930421.42 22938328.34 ... 22880161.09 21607713.73 22937117.72 22262604.28
2023-01-04 22605466.70 22652219.77 22463371.43 22506729.47 ... 23097987.72 22327386.63 22922449.38 22673066.75
2023-01-05 22581240.40 23004302.70 22511769.34 22882968.52 ... 22058769.99 22379327.80 22946133.94 22988219.48
[5 rows x 10 columns]
PySpark.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from timeit import default_timer as timer
# Initialize Spark
spark = SparkSession.builder \
.appName("SparkPivotBenchmark") \
.config("spark.driver.memory", "8g") \
.master("local[*]") \
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
parquet_path = '/mnt/d/sales_data/sales_data_parquet'
start = timer()
# 1. Read the Parquet file
df = spark.read.parquet(parquet_path)
# 2. Make order_date a proper date
# We cast the column to DateType
df = df.withColumn("order_date", F.col("order_date").cast("date"))
# 3. Pivot (sum)
# Spark's pivot is much faster if you provide the unique values (product_names)
# but it can also infer them automatically as shown below
pivot_df = df.groupBy("order_date") \
.pivot("product_name") \
.agg(F.sum("total"))
# 4. Sort rows by date
pivot_df = pivot_df.orderBy("order_date")
# 5. Enforce consistent column order (alphabetical product columns)
# The first column is 'order_date', the rest are the pivoted products
columns = pivot_df.columns
product_cols = sorted([c for c in columns if c != "order_date"])
pivot_df = pivot_df.select(["order_date"] + product_cols)
# 6. Replace nulls with 0
pivot_df = pivot_df.na.fill(0)
# Trigger an action to measure actual performance (count of pivoted days)
row_count = pivot_df.count()
end = timer()
print(f"PySpark: read + standardized pivot took {end - start:.2f} seconds")
print(f"Total days processed: {row_count}")
# 7. Display top 5
pivot_df.show(5)
spark.stop()
Sortie PySpark.
(pandas_pyspark) $ spark-submit --driver-memory 8g ex3_spark.py 2> /dev/null
PySpark: read + standardized pivot took 3.54 seconds
Total days processed: 365
+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|order_date| Chair| Coffee Maker| Desk| Laptop| Monitor| Notebook| Paper| Pen| Printer| Smartphone|
+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|2023-01-01|2.2041864510000005E7|2.2596967459999997E7| 2.222823543E7|2.2319250969999995E7| 2.309861159E7|2.2687765309999995E7|2.2778128780000005E7|2.2690394339999996E7| 2.27474199E7|2.2848102419999998E7|
|2023-01-02| 2.270233742E7|2.1960074980000004E7|2.3539803819999993E7|2.3332945560000006E7|2.2441403840000004E7| 2.282151253E7| 2.241401344E7|2.2378123520000003E7| 2.249436489E7| 2.232191979E7|
|2023-01-03|2.2626028849999998E7| 2.26514401E7| 2.293042142E7| 2.293832834E7| 2.290862974E7|2.2432433990000006E7|2.2880161090000004E7|2.1607713730000008E7| 2.293711772E7| 2.226260428E7|
|2023-01-04|2.2605466699999996E7|2.2652219770000003E7| 2.246337143E7| 2.250672947000001E7|2.1930874809999995E7|2.3261865149999995E7| 2.309798772E7|2.2327386629999995E7|2.2922449380000003E7|2.2673066749999996E7|
|2023-01-05|2.2581240400000002E7|2.3004302700000003E7| 2.251176934E7|2.2882968520000003E7| 2.284090005E7| 2.272256243E7|2.2058769990000002E7|2.2379327800000004E7|2.2946133940000005E7| 2.298821948E7|
+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows
Exemple 4 — Analyse de fenêtrage avec LAG/LEAD
Pour mon dernier exemple de code, nous calculerons la SOMME de toutes les commandes par date_commande, puis utiliserons la fonctionnalité LAG/LEAD pour calculer la variation en pourcentage du total des commandes sur des dates de commande consécutives.
Des pandas.
import pandas as pd
from timeit import default_timer as timer
parquet_path = '/mnt/d/sales_data/sales_data_pandas_30m.parquet'
start = timer()
# 1. Read the Parquet file
df = pd.read_parquet(parquet_path)
# 2. Normalize order_date
# Pandas to_datetime is generally flexible enough to handle multiple formats
# automatically, which replaces the manual pl.coalesce logic.
df['order_date'] = pd.to_datetime(df['order_date'], errors='coerce').dt.date
# 3. Group by date and aggregate
result_pandas = df.groupby("order_date")["total"].sum().reset_index()
# 4. Sort by date
result_pandas = result_pandas.sort_values("order_date")
# 5. Analytic functions (Lag and Lead)
# In Pandas, shift(1) is lag, shift(-1) is lead
result_pandas["total_lag"] = result_pandas["total"].shift(1)
result_pandas["total_lead"] = result_pandas["total"].shift(-1)
# 6. Calculate Percent Changes
# We use Series operations which handle the 'None/NaN' and 'divide by zero'
# logic similar to pl.when().otherwise()
result_pandas["percent_change_from_lag"] = (
(result_pandas["total"] - result_pandas["total_lag"]) * 100 / result_pandas["total_lag"]
)
result_pandas["percent_change_from_lead"] = (
(result_pandas["total"] - result_pandas["total_lead"]) * 100 / result_pandas["total_lead"]
)
end = timer()
print(f"Pandas: read + analytic (lag/lead) took {end - start:.2f} seconds")
print(result_pandas.head(10).to_string(index=False))
Sortie Pandas.
(pandas_pyspark) $ python ex4_pandas.py
Pandas: read + analytic (lag/lead) took 8.99 seconds
order_date total total_lag total_lead percent_change_from_lag percent_change_from_lead
2023-01-01 226036740.71 NaN 226406499.79 NaN -0.163316
2023-01-02 226406499.79 226036740.71 226174879.26 0.163584 0.102408
2023-01-03 226174879.26 226406499.79 226441417.81 -0.102303 -0.117708
2023-01-04 226441417.81 226174879.26 226916194.65 0.117846 -0.209230
2023-01-05 226916194.65 226441417.81 226990804.43 0.209669 -0.032869
2023-01-06 226990804.43 226916194.65 225973424.85 0.032880 0.450221
2023-01-07 225973424.85 226990804.43 227894370.99 -0.448203 -0.842911
2023-01-08 227894370.99 225973424.85 227111347.09 0.850076 0.344775
2023-01-09 227111347.09 227894370.99 226271884.19 -0.343591 0.370997
2023-01-10 226271884.19 227111347.09 226635543.97 -0.369626 -0.160460
PySpark.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from timeit import default_timer as timer
# Initialize Spark
spark = SparkSession.builder \
.appName("SparkAnalyticBenchmark") \
.config("spark.driver.memory", "8g") \
.master("local[*]") \
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
# Path to the Parquet file
parquet_path = '/mnt/d/sales_data/sales_data_parquet'
start = timer()
# 1. Read the Parquet file
df = spark.read.parquet(parquet_path)
# 2. Normalize order_date
# Spark's to_date is efficient; coalesce handles multiple potential formats if needed
df = df.withColumn("order_date", F.to_date(F.col("order_date")))
# 3. Group by date and aggregate
daily_revenue = df.groupBy("order_date").agg(F.sum("total").alias("total"))
# 4. Define the Window for Analytic functions
# We must order by date for lag/lead to make sense
window_spec = Window.orderBy("order_date")
# 5. Apply Lag and Lead
# lag(col, 1) = previous row; lead(col, 1) = next row
daily_revenue = daily_revenue.withColumn("total_lag", F.lag("total", 1).over(window_spec))
daily_revenue = daily_revenue.withColumn("total_lead", F.lead("total", 1).over(window_spec))
# 6. Calculate Percent Changes
# We use F.when() to handle nulls and avoid division by zero
daily_revenue = daily_revenue.withColumn(
"percent_change_from_lag",
F.when((F.col("total_lag").isNotNull()) & (F.col("total_lag") != 0),
(F.col("total") - F.col("total_lag")) * 100 / F.col("total_lag"))
.otherwise(None)
)
daily_revenue = daily_revenue.withColumn(
"percent_change_from_lead",
F.when((F.col("total_lead").isNotNull()) & (F.col("total_lead") != 0),
(F.col("total") - F.col("total_lead")) * 100 / F.col("total_lead"))
.otherwise(None)
)
# 7. Final Sort and Action
result_spark = daily_revenue.orderBy("order_date")
# Trigger action to measure performance
row_count = result_spark.count()
end = timer()
print(f"PySpark: read + analytic (lag/lead) took {end - start:.2f} seconds")
print(f"Total days processed: {row_count}")
# Display top 10
result_spark.show(10)
spark.stop()
Sortie PySpark.
(pandas_pyspark) $ spark-submit --driver-memory 8g ex4_spark.py 2> /dev/null
PySpark: read + analytic (lag/lead) took 4.05 seconds
Total days processed: 365
+----------+--------------------+--------------------+--------------------+-----------------------+------------------------+
|order_date| total| total_lag| total_lead|percent_change_from_lag|percent_change_from_lead|
+----------+--------------------+--------------------+--------------------+-----------------------+------------------------+
|2023-01-01| 2.2603674071E8| NULL|2.2640649979000002E8| NULL| -0.16331645970543143|
|2023-01-02|2.2640649979000002E8| 2.2603674071E8| 2.2617487926E8| 0.16358361868011784| 0.10240771687724477|
|2023-01-03| 2.2617487926E8|2.2640649979000002E8|2.2644141781000003E8| -0.1023029507610723| -0.11770750800707579|
|2023-01-04|2.2644141781000003E8| 2.2617487926E8|2.2691619464999998E8| 0.11784622185810545| -0.2092300378702583|
|2023-01-05|2.2691619464999998E8|2.2644141781000003E8|2.2699080442999995E8| 0.20966872782889678| -0.03286907599068832|
|2023-01-06|2.2699080442999995E8|2.2691619464999998E8| 2.259734248499999E8| 0.032879883304517334| 0.45022089684898775|
|2023-01-07| 2.259734248499999E8|2.2699080442999995E8|2.2789437099000004E8| -0.4482029933127909| -0.8429107448575048|
|2023-01-08|2.2789437099000004E8| 2.259734248499999E8|2.2711134708999988E8| 0.8500761278788644| 0.344775331586518|
|2023-01-09|2.2711134708999988E8|2.2789437099000004E8|2.2627188419000003E8| -0.34359071555765364| 0.37099744097899573|
|2023-01-10|2.2627188419000003E8|2.2711134708999988E8|2.2663554396999997E8| -0.3696261374678007| -0.1604601703817825|
+----------+--------------------+--------------------+--------------------+-----------------------+------------------------+
only showing top 10 rows
Résumé
Dans cet article, j’ai expliqué qu’il existe de nombreuses façons de mettre à niveau vos systèmes si les données que vous traitez commencent à empiéter sur le territoire du « big data », de sorte qu’il devient difficile (voire impossible) de les traiter à l’aide de votre base de code Pandas existante.
J’ai cité trois alternatives courantes : des bibliothèques distribuées telles que dask ou ray, déplacer vos données vers un SGBDR et les interroger avec SQL, ou utiliser la bibliothèque de calcul distribuée – Spark.
En me concentrant sur ce dernier point, j’ai présenté le cas de PySpark, puis j’ai utilisé quatre exemples concrets de tâches de traitement de données typiques pour lesquelles Pandas est régulièrement utilisé, ainsi que le code PySpark équivalent pour chacune.
Bien que les références temporelles aient montré une certaine amélioration des temps d’exécution de PySpark par rapport à Pandas, celles-ci n’étaient pas l’objectif principal. Après tout, avec des ensembles de données encore plus volumineux, les Pandas ne seraient tout simplement pas en mesure de les traiter, encore moins dans un laps de temps précis.
Au lieu de cela, l’objectif principal de cet article était de vous montrer à quel point il est relativement simple de :
- Obtenez un environnement Spark opérationnel rapidement.
- Répliquez les opérations de données Pandas courantes dans le langage PySpark pour vous donner l’assurance que le Big Data ne doit pas limiter vos capacités de traitement.
En comblant le fossé entre l’analyse monothread et le traitement évolutif du Big Data, sachez que vous pouvez faire évoluer vos flux de travail en toute confiance à mesure que vos données dépassent la taille de votre matériel local.



