
Comment créer un pipeline ETL météo alimenté par l’IA avec Databricks et GPT-4o : de l’API au tableau de bord
Databricks a encore une fois ébranlé le marché des données. La société a lancé son édition gratuite de la plateforme Databricks avec toutes les fonctionnalités incluses. C’est pour le moins une excellente ressource pour l’apprentissage et les tests.
Dans cet esprit, j’ai créé un projet de bout en bout pour vous aider à apprendre les principes fondamentaux des principales ressources de Databricks.
Ce projet démontre un flux de travail complet d’extraction, de transformation, de chargement (ETL) dans Databricks. Il intègre l’API OpenWeatherMap pour la récupération de données et le modèle OpenAI GPT-4o-mini pour fournir des suggestions d’habillage personnalisées en fonction de la météo.
Apprenons-en davantage.
Le projet
Le projet implémente un pipeline de données complet au sein de Databricks, en suivant ces étapes.
- Extrait: Récupère les données météorologiques actuelles pour La ville de New York via l’API OpenWeatherMap [1].
- Transformer: convertit les horodatages UTC en heure locale de New York et utilise OpenAI [2] GPT-4o-mini pour générer des suggestions d’habillage personnalisées en fonction de la température.
- Charger: conserve les données dans le catalogue Databricks Unity à la fois sous forme de fichiers JSON bruts et de table Delta structurée (Silver Layer).
- Orchestration: Le notebook avec ce code ETL est ajouté à une tâche et planifié pour s’exécuter toutes les heures dans Databricks.
- Analytique: La couche d’argent alimente un tableau de bord Databricks qui affiche des informations météorologiques pertinentes ainsi que les suggestions du LLM.
Voici l’architecture.

Super. Maintenant que nous comprenons ce que nous devons faire, passons à comment morceau de ce tutoriel.
Note: si vous n’avez toujours pas de compte dans Databricks, rendez-vous sur la page Databricks Free Edition [3]cliquez Inscrivez-vous pour l’édition gratuite et suivez les instructions à l’écran pour obtenir votre accès gratuit.
Extrait : Intégration de l’API et des Databricks
Comme je le dis habituellement, un projet de données a besoin de données pour démarrer, n’est-ce pas ? Notre tâche ici consiste donc à intégrer l’API OpenWeatherMap pour ingérer des données directement dans un notebook PySpark au sein de Databricks. Cette tâche peut paraître compliquée au premier abord, mais croyez-moi, elle ne l’est pas.
Sur la page initiale de Databricks, créez un nouveau bloc-notes à l’aide du +Nouveau puis sélectionnez Carnet de notes.

Pour le Extrait partie, nous aurons besoin de :
1. La clé API de l’API OpenWeatherMap.
Pour l’obtenir, rendez-vous sur Page d’inscription de l’API et complétez votre processus d’inscription gratuit. Une fois connecté au tableau de bord, cliquez sur le Onglet Clé APIoù vous pourrez le voir.
2. Importer des packages
# Imports
import requests
import json
Ensuite, nous allons créer un Python class pour modulariser notre code et le rendre également prêt pour la production.
- Cette classe reçoit l’API_KEY que nous venons de créer, ainsi que la ville et le pays pour la récupération météo.
- Renvoie la réponse au format JSON.
# Creating a class to modularize our code
class Weather:
# Define the constructor
def __init__(self, API_KEY):
self.API_KEY = API_KEY
# Define a method to retrieve weather data
def get_weather(self, city, country, units='imperial'):
self.city = city
self.country = country
self.units = units
# Make a GET request to an API endpoint that returns JSON data
url = f"https://api.openweathermap.org/data/2.5/weather?q={city},{country}&APPID={w.API_KEY}&units={units}"
response = requests.get(url)
# Use the .json() method to parse the response text and return
if response.status_code != 200:
raise Exception(f"Error: {response.status_code} - {response.text}")
return response.json()
Bon. Nous pouvons maintenant organiser ce cours. Remarquez que nous utilisons dbutils.widgets.get(). Cette commande regarde le Paramètres dans le travail planifié, que nous verrons plus loin dans cet article. C’est une bonne pratique de garder les secrets en sécurité.
# Get the API OpenWeatherMap key
API_KEY = dbutils.widgets.get('API_KEY')
# Instantiate the class
w = Weather(API_KEY=API_KEY)
# Get the weather data
nyc = w.get_weather(city='New York', country='US')
nyc
Voici la réponse.
{'coord': {'lon': -74.006, 'lat': 40.7143},
'weather': [{'id': 804,
'main': 'Clouds',
'description': 'overcast clouds',
'icon': '04d'}],
'base': 'stations',
'main': {'temp': 54.14,
'feels_like': 53.44,
'temp_min': 51.76,
'temp_max': 56.26,
'pressure': 992,
'humidity': 89,
'sea_level': 992,
'grnd_level': 993},
'visibility': 10000,
'wind': {'speed': 21.85, 'deg': 270, 'gust': 37.98},
'clouds': {'all': 100},
'dt': 1766161441,
'sys': {'type': 1,
'id': 4610,
'country': 'US',
'sunrise': 1766146541,
'sunset': 1766179850},
'timezone': -18000,
'id': 5128581,
'name': 'New York',
'cod': 200}
Avec cette réponse en main, nous pouvons passer à la partie Transformation de notre projet, où nous nettoierons et transformerons les données.
Transformer : formater les données
Dans cette section, nous examinerons les tâches de nettoyage et de transformation effectuées sur les données brutes. Nous commencerons par sélectionner les éléments de données nécessaires à notre tableau de bord. Il s’agit simplement d’obtenir des données à partir d’un dictionnaire (ou d’un JSON).
# Getting information
id = nyc['id']
timestamp = nyc['dt']
weather = nyc['weather'][0]['main']
temp = nyc['main']['temp']
tmin = nyc['main']['temp_min']
tmax = nyc['main']['temp_max']
country = nyc['sys']['country']
city = nyc['name']
sunrise = nyc['sys']['sunrise']
sunset = nyc['sys']['sunset']
Ensuite, transformons les horodatages en fonction du fuseau horaire de New York, car il correspond à l’heure de Greenwich.
# Transform sunrise and sunset to datetime in NYC timezone
from datetime import datetime, timezone
from zoneinfo import ZoneInfo
import time
# Timestamp, Sunrise and Sunset to NYC timezone
target_timezone = ZoneInfo("America/New_York")
dt_utc = datetime.fromtimestamp(sunrise, tz=timezone.utc)
sunrise_nyc = str(dt_utc.astimezone(target_timezone).time()) # get only sunrise time time
dt_utc = datetime.fromtimestamp(sunset, tz=timezone.utc)
sunset_nyc = str(dt_utc.astimezone(target_timezone).time()) # get only sunset time time
dt_utc = datetime.fromtimestamp(timestamp, tz=timezone.utc)
time_nyc = str(dt_utc.astimezone(target_timezone))
Enfin, nous le formatons sous forme de dataframe Spark.
# Create a dataframe from the variables
df = spark.createDataFrame([[id, time_nyc, weather, temp, tmin, tmax, country, city, sunrise_nyc, sunset_nyc]], schema=['id', 'timestamp','weather', 'temp', 'tmin', 'tmax', 'country', 'city', 'sunrise', 'sunset'])

La dernière étape de cette section consiste à ajouter la suggestion d’un LLM. Dans cette étape, nous allons sélectionner certaines des données extraites de l’API et les transmettre au modèle, en lui demandant de renvoyer une suggestion sur la façon dont une personne pourrait s’habiller pour se préparer à la météo.
- Vous aurez besoin d’une clé API OpenAI.
- Passer les conditions météorologiques, maximum et min températures (
weather,tmax,tmin) - Demandez au LLM de vous faire une suggestion sur la façon de vous habiller en fonction de la météo.
- Ajoutez la suggestion au dataframe final.
%pip install openai --quiet
from openai import OpenAI
import pyspark.sql.functions as F
from pyspark.sql.functions import col
# Get OpenAI Key
OPENAI_API_KEY= dbutils.widgets.get('OPENAI_API_KEY')
client = OpenAI(
# This is the default and can be omitted
api_key=OPENAI_API_KEY
)
response = client.responses.create(
model="gpt-4o-mini",
instructions="You are a weatherman that gives suggestions about how to dress based on the weather. Answer in one sentence.",
input=f"The weather is {weather}, with max temperature {tmax} and min temperature {tmin}. How should I dress?"
)
suggestion = response.output_text
# Add the suggestion to the df
df = df.withColumn('suggestion', F.lit(suggestion))
display(df)
Cool. Nous avons presque terminé avec l’ETL. Il ne reste plus qu’à le charger. C’est la section suivante.
Charger : sauvegarder les données et créer la couche d’argent
Le dernier morceau de l’ETL est chargement les données. Nous allons le charger de deux manières différentes.
- Persistance des fichiers bruts dans un volume de catalogue Unity.
- Sauvegarde de la trame de données transformée directement dans la couche Silver, qui est une table Delta prête pour la consommation du tableau de bord.
Créons un catalogue qui contiendra toutes les données météorologiques que nous obtenons de l’API.
-- Creating a Catalog
CREATE CATALOG IF NOT EXISTS pipeline_weather
COMMENT 'This is the catalog for the weather pipeline';
Ensuite, nous créons un schéma pour la Maison du Lac. Celui-ci stockera le volume avec les fichiers JSON bruts récupérés.
-- Creating a Schema
CREATE SCHEMA IF NOT EXISTS pipeline_weather.lakehouse
COMMENT 'This is the schema for the weather pipeline';
Maintenant, nous créons le volume pour les fichiers bruts.
-- Let's create a volume
CREATE VOLUME IF NOT EXISTS pipeline_weather.lakehouse.raw_data
COMMENT 'This is the raw data volume for the weather pipeline';
Nous créons également un autre schéma pour tenir la table Delta de la couche d’argent.
--Creating Schema to hold transformed data
CREATE SCHEMA IF NOT EXISTS pipeline_weather.silver
COMMENT 'This is the schema for the weather pipeline';
Une fois que tout est configuré, voici à quoi ressemble notre catalogue.

Maintenant, enregistrons la réponse JSON brute dans notre volume brut. Pour que tout reste organisé et éviter l’écrasement, nous attachons un horodatage unique à chaque nom de fichier.
Par ajout ces fichiers sur le volume plutôt que de simplement les écraser, nous créons une « piste d’audit » fiable. Cela agit comme un filet de sécurité, ce qui signifie que si un processus en aval échoue ou si nous subissons une perte de données ultérieurement, nous pouvons toujours revenir à la source et retraiter les données d’origine chaque fois que nous en avons besoin.
# Get timestamp
stamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
# Path to save
json_path = f'/Volumes/pipeline_weather/lakehouse/raw_data/weather_{stamp}.json'
# Save the data into a json file
df.write.mode('append').json(json_path)
Bien que nous conservions le JSON brut comme « source de vérité », la véritable magie se produit en enregistrant les données nettoyées dans une table Delta de la couche Silver. En utilisant .mode(« append ») et le format Delta, nous garantissons que nos données sont structurées, appliquées par des schémas et prêtes pour des outils d’analyse ou de BI à grande vitesse. Cette couche transforme les réponses API désordonnées en une table fiable et interrogeable qui s’agrandit à chaque exécution du pipeline.
# Save the transformed data into a table (schema)
(
df
.write
.format('delta')
.mode("append")
.saveAsTable('pipeline_weather.silver.weather')
)
Beau! Une fois tout cela réglé, vérifions à quoi ressemble notre table maintenant.

Commençons maintenant à automatiser ce pipeline.
Orchestration : Planification de l’exécution automatique du bloc-notes
Poursuivant le projet, il est temps de faire fonctionner ce pipeline tout seul, avec un minimum de supervision. Pour cela, Databricks dispose du Emplois et pipelines où il est facile de planifier l’exécution de tâches.
- Cliquez sur le Emplois et pipelines onglet sur le panneau de gauche
- Trouver le bouton Créer et sélectionnez Travail
- Cliquez sur Notebook pour l’ajouter au Job.
- Configurez comme les données ci-dessous.
- Ajoutez les clés API aux paramètres.
- Cliquez Créer une tâche.
- Cliquez Courez maintenant pour tester si ça marche.


Une fois que vous avez cliqué sur le Courez maintenant bouton, il devrait commencer à exécuter le portable et afficher le Réussi message.

Si le travail fonctionne correctement, il est temps de planifier son exécution automatique.
- Cliquez sur Ajouter un déclencheur sur le côté droit de l’écran, juste en dessous de la section Horaires et déclencheurs.
- Type de déclencheur = Planifié.
- Type d’horaire : sélectionnez Avancé
- Sélectionner Toutes les 1 heure dans les listes déroulantes.
- Enregistrez-le.
Excellent. Notre pipeline est désormais en mode automatique ! Toutes les heures, le système accédera à l’API OpenWeatherMap et obtiendra de nouvelles informations météorologiques pour New York et les enregistrera dans notre Silver Layer Table.
Analytics : créer un tableau de bord pour les décisions basées sur les données
La dernière pièce de ce puzzle consiste à créer le livrable Analytics, qui affichera les informations météorologiques et fournira à l’utilisateur des informations exploitables sur la façon de s’habiller en fonction de la météo extérieure.
- Cliquez sur le Onglet Tableaux de bord sur le panneau latéral gauche.
- Cliquez sur le Créer un tableau de bord bouton
- Cela ouvrira une toile vierge sur laquelle nous pourrons travailler.

Désormais, les tableaux de bord fonctionnent sur la base des données extraites des requêtes SQL. Par conséquent, avant de commencer à ajouter du texte et des graphiques au canevas, nous devons d’abord créer des métriques qui seront les variables pour alimenter les cartes et les graphiques du tableau de bord.
Alors cliquez sur +Créer à partir de SQL bouton pour démarrer une métrique. Donnez-lui un nom. Par exemple, Emplacementpour récupérer le dernier nom de ville récupéré, je dois utiliser cette requête qui suit.
-- Get the latest city name fetched
SELECT city
FROM pipeline_weather.silver.weather
ORDER BY timestamp DESC
LIMIT 1
Et nous devons créer une requête SQL pour chaque métrique. Vous pouvez tous les voir dans le référentiel GitHub [ ].
Ensuite, nous cliquons sur l’onglet Tableau de bord et commençons à glisser et déposer des éléments sur le canevas.

Une fois que vous avez cliqué sur le texte, cela vous permet d’insérer une boîte dans le canevas et de modifier le texte. Lorsque vous cliquez sur l’élément graphique, il insère un espace réservé pour un graphique et ouvre le menu de droite pour la sélection des variables et la configuration.

D’accord. Une fois tous les éléments ajoutés, le tableau de bord ressemblera à ceci.

Tellement sympa ! Et cela conclut notre projet.
Avant de partir
Vous pouvez facilement reproduire ce projet en une heure environ, en fonction de votre expérience avec l’écosystème Databricks. Bien qu’il s’agisse d’une construction rapide, elle contient de nombreuses compétences d’ingénierie de base que vous pourrez exercer :
- Conception architecturale: Vous apprendrez à structurer un environnement Lakehouse moderne à partir de zéro.
- Intégration transparente des données: Vous comblerez le fossé entre les API Web externes et la plateforme Databricks pour l’ingestion de données en temps réel.
- Code propre et modulaire: Nous allons au-delà des simples scripts en utilisant des classes et des fonctions Python pour garder la base de code organisée et maintenable.
- Automatisation et orchestration: Vous acquerrez une expérience pratique dans la planification de tâches pour garantir que votre projet se déroule de manière fiable sur pilote automatique.
- Offrir une vraie valeur: L’objectif n’est pas seulement de déplacer des données ; c’est pour apporter de la valeur. En transformant les mesures météorologiques brutes en suggestions d’habillage exploitables via l’IA, nous transformons les « données froides » en un service utile pour l’utilisateur final.
Si vous avez aimé ce contenu, retrouvez mes contacts et plus sur moi sur mon site Internet.
Dépôt GitHub
Voici le référentiel de ce projet.
https://github.com/gurezende/Databricks-Weather-Pipeline
Références
[1. OpenWeatherMap API] (https://openweathermap.org/)
[2. Open Ai Platform] (https://platform.openai.com/)
[3. Databricks Free Edition] (https://www.databricks.com/learn/free-edition)
[4. GitHub Repository] (https://github.com/gurezende/Databricks-Weather-Pipeline)



