
Implémentation de la recherche DRIFT avec Neo4j et LlamaIndex
Implémentation de GraphRAG de Microsoft a été l’un des premiers systèmes et a introduit de nombreuses fonctionnalités innovantes. Il combine à la fois la phase d’indexation, au cours de laquelle les entités, les relations et les communautés hiérarchiques sont extraites et résumées, avec des fonctionnalités avancées de temps de requête. Cette approche permet au système de répondre à des questions thématiques générales en exploitant des résumés précalculés d’entités, de relations et de communautés, allant au-delà des limites traditionnelles de récupération de documents des systèmes RAG standard.

J’ai couvert la phase d’indexation ainsi que les mécanismes de recherche globaux et locaux dans des articles de blog précédents (ici et ici), nous allons donc ignorer ces détails dans cette discussion. Cependant, nous n’avons pas encore exploré Recherche DÉRIVEqui sera le sujet de cet article de blog. DRIFT est une approche plus récente qui combine les caractéristiques des méthodes de recherche globales et locales. La technique commence par exploiter les informations de la communauté via une recherche vectorielle pour établir un point de départ large pour les requêtes, puis utilise ces informations sur la communauté pour affiner la question d’origine en requêtes de suivi détaillées. Cela permet à DRIFT de parcourir dynamiquement le graphe de connaissances pour récupérer des informations spécifiques sur les entités, les relations et d’autres détails localisés, équilibrant ainsi l’efficacité du calcul et la qualité complète des réponses.

La mise en œuvre utilise Flux de travail LlamaIndex pour orchestrer le processus de recherche DRIFT à travers plusieurs étapes clés. Cela commence par Génération HyDEcréant une réponse hypothétique basée sur un exemple de rapport de communauté pour améliorer la représentation des requêtes.
Le étape de recherche dans la communauté utilise ensuite la similarité vectorielle pour identifier les rapports de la communauté les plus pertinents, fournissant ainsi un contexte large pour la requête. Le système analyse ces résultats pour générer une première réponse intermédiaire et un ensemble de requêtes de suivi pour une enquête plus approfondie.
Ces requêtes de suivi sont exécutées en parallèle pendant la phase de recherche localerécupérant des informations ciblées, notamment des morceaux de texte, des entités, des relations et des rapports de communauté supplémentaires à partir du graphique de connaissances. Ce processus peut être répété jusqu’à une profondeur maximale, chaque cycle pouvant potentiellement générer de nouvelles requêtes de suivi.
Enfin, le étape de génération de réponse synthétise toutes les réponses intermédiaires recueillies tout au long du processus, combinant des informations générales au niveau communautaire avec des conclusions locales détaillées pour produire une réponse globale. Cette approche équilibre l’ampleur et la profondeur, en commençant largement par le contexte communautaire et en approfondissant progressivement les détails.
Il s’agit de mon implémentation de la recherche DRIFT, adaptée aux workflows LlamaIndex et Neo4j. J’ai procédé à une ingénierie inverse de l’approche en examinant le code GraphRAG de Microsoft. Il peut donc y avoir quelques différences par rapport à l’implémentation d’origine.
Le code est disponible sur GitHub.
Ensemble de données
Pour cet article de blog, nous utiliserons Les aventures d’Alice au pays des merveilles de Lewis Carroll, un texte classique disponible gratuitement sur Projet Gutenberg. Cet ensemble de données richement narratif avec ses personnages, lieux et événements interconnectés en fait un excellent choix pour démontrer les capacités de GraphRAG.
Ingestion
Pour le processus d’ingestion, nous réutiliserons le Implémentation de l’indexation Microsoft GraphRAG J’ai développé pour un article de blog précédentadapté dans un workflow LlamaIndex.

Le pipeline d’ingestion suit l’approche standard GraphRAG avec trois étapes principales :
class MSGraphRAGIngestion(Workflow):
@step
async def entity_extraction(self, ev: StartEvent) -> EntitySummarization:
chunks = splitter.split_text(ev.text)
await ms_graph.extract_nodes_and_rels(chunks, ev.allowed_entities)
return EntitySummarization()
@step
async def entity_summarization(
self, ev: EntitySummarization
) -> CommunitySummarization:
await ms_graph.summarize_nodes_and_rels()
return CommunitySummarization()
@step
async def community_summarization(
self, ev: CommunitySummarization
) -> CommunityEmbeddings:
await ms_graph.summarize_communities()
return CommunityEmbeddings()
Le flux de travail extrait les entités et les relations des morceaux de texte, génère des résumés pour les nœuds et les relations, puis crée des résumés de communauté hiérarchique.
Après résumé, nous générons des intégrations vectorielles pour les communautés et les entités afin de permettre la recherche de similarité. Voici l’étape d’intégration de la communauté :
@step
async def community_embeddings(self, ev: CommunityEmbeddings) -> EntityEmbeddings:
# Fetch all communities from the graph database
communities = ms_graph.query(
"""
MATCH (c:__Community__)
WHERE c.summary IS NOT NULL AND c.rating > $min_community_rating
RETURN coalesce(c.title, "") + " " + c.summary AS community_description, c.id AS community_id
""",
params={"min_community_rating": MIN_COMMUNITY_RATING},
)
if communities:
# Generate vector embeddings from community descriptions
response = await client.embeddings.create(
input=[c["community_description"] for c in communities],
model=TEXT_EMBEDDING_MODEL,
)
# Store embeddings in the graph and create vector index
embeds = [
{
"community_id": community["community_id"],
"embedding": embedding.embedding,
}
for community, embedding in zip(communities, response.data)
]
ms_graph.query(
"""UNWIND $data as row
MATCH (c:__Community__ {id: row.community_id})
CALL db.create.setNodeVectorProperty(c, 'embedding', row.embedding)""",
params={"data": embeds},
)
ms_graph.query(
"CREATE VECTOR INDEX community IF NOT EXISTS FOR (c:__Community__) ON c.embedding"
)
return EntityEmbeddings()
Le même processus est appliqué aux intégrations d’entités, créant les indices vectoriels nécessaires à la récupération basée sur la similarité de la recherche DRIFT.
Recherche DÉRIVE
La recherche DRIFT est une approche intuitive de la recherche d’informations : commencez par comprendre la situation dans son ensemble, puis approfondissez les détails si nécessaire. Plutôt que de rechercher immédiatement des correspondances exactes au niveau du document ou de l’entité, DRIFT consulte d’abord les résumés de la communauté, qui sont des aperçus de haut niveau qui capturent les principaux thèmes et sujets du graphe de connaissances.
Une fois que DRIFT identifie les informations pertinentes de niveau supérieur, il génère intelligemment des requêtes de suivi pour récupérer des informations précises sur des entités, des relations et des documents sources spécifiques. Cette approche en deux phases reflète la façon dont les humains recherchent naturellement des informations : nous nous orientons d’abord avec un aperçu général, puis posons des questions ciblées pour compléter les détails. En combinant la couverture complète de la recherche mondiale avec la précision de la recherche locale, DRIFT atteint à la fois l’étendue et la profondeur sans les dépenses informatiques liées au traitement de chaque rapport ou document communautaire.
Passons en revue chaque étape de la mise en œuvre.
Le code est disponible sur GitHub.
Recherche communautaire
DRIFT utilise HyDE (Hypothetical Document Embeddings) pour améliorer la précision de la recherche vectorielle. Au lieu d’intégrer directement la requête de l’utilisateur, HyDE génère d’abord une réponse hypothétique, puis l’utilise pour la recherche de similarité. Cela fonctionne parce que les réponses hypothétiques sont sémantiquement plus proches des résumés réels de la communauté que ne le sont les requêtes brutes.
@step
async def hyde_generation(self, ev: StartEvent) -> CommunitySearch:
# Fetch a random community report to use as a template for HyDE generation
random_community_report = driver.execute_query(
"""
MATCH (c:__Community__)
WHERE c.summary IS NOT NULL
RETURN coalesce(c.title, "") + " " + c.summary AS community_description""",
result_transformer_=lambda r: r.data(),
)
# Generate a hypothetical answer to improve query representation
hyde = HYDE_PROMPT.format(
query=ev.query, template=random_community_report[0]["community_description"]
)
hyde_response = await client.responses.create(
model="gpt-5-mini",
input=[{"role": "user", "content": hyde}],
reasoning={"effort": "low"},
)
return CommunitySearch(query=ev.query, hyde_query=hyde_response.output_text)
Ensuite, nous intégrons la requête HyDE et récupérons les 5 rapports communautaires les plus pertinents via la similarité vectorielle. Il invite ensuite le LLM à générer une réponse intermédiaire à partir de ces rapports et à identifier les requêtes de suivi pour une enquête plus approfondie. La réponse intermédiaire est stockée et toutes les requêtes de suivi sont envoyées en parallèle pour la phase de recherche locale.
@step
async def community_search(self, ctx: Context, ev: CommunitySearch) -> LocalSearch:
# Create embedding from the HyDE-enhanced query
embedding_response = await client.embeddings.create(
input=ev.hyde_query, model=TEXT_EMBEDDING_MODEL
)
embedding = embedding_response.data[0].embedding
# Find top 5 most relevant community reports via vector similarity
community_reports = driver.execute_query(
"""
CALL db.index.vector.queryNodes('community', 5, $embedding) YIELD node, score
RETURN 'community-' + node.id AS source_id, node.summary AS community_summary
""",
result_transformer_=lambda r: r.data(),
embedding=embedding,
)
# Generate initial answer and identify what additional info is needed
initial_prompt = DRIFT_PRIMER_PROMPT.format(
query=ev.query, community_reports=community_reports
)
initial_response = await client.responses.create(
model="gpt-5-mini",
input=[{"role": "user", "content": initial_prompt}],
reasoning={"effort": "low"},
)
response_json = json_repair.loads(initial_response.output_text)
print(f"Initial intermediate response: {response_json['intermediate_answer']}")
# Store the initial answer and prepare for parallel local searches
async with ctx.store.edit_state() as ctx_state:
ctx_state["intermediate_answers"] = [
{
"intermediate_answer": response_json["intermediate_answer"],
"score": response_json["score"],
}
]
ctx_state["local_search_num"] = len(response_json["follow_up_queries"])
# Dispatch follow-up queries to run in parallel
for local_query in response_json["follow_up_queries"]:
ctx.send_event(LocalSearch(query=ev.query, local_query=local_query))
return None
Cela établit l’approche principale de DRIFT : commencer par une recherche communautaire améliorée par HyDE, puis approfondir avec des requêtes de suivi ciblées.
Recherche locale
La phase de recherche locale exécute des requêtes de suivi en parallèle pour explorer des détails spécifiques. Chaque requête récupère le contexte ciblé via une recherche vectorielle basée sur les entités, puis génère une réponse intermédiaire et potentiellement d’autres requêtes de suivi.
@step(num_workers=5)
async def local_search(self, ev: LocalSearch) -> LocalSearchResults:
print(f"Running local query: {ev.local_query}")
# Create embedding for the local query
response = await client.embeddings.create(
input=ev.local_query, model=TEXT_EMBEDDING_MODEL
)
embedding = response.data[0].embedding
# Retrieve relevant entities and gather their associated context:
# - Text chunks where entities are mentioned
# - Community reports the entities belong to
# - Relationships between the retrieved entities
# - Entity descriptions
local_reports = driver.execute_query(
"""
CALL db.index.vector.queryNodes('entity', 5, $embedding) YIELD node, score
WITH collect(node) AS nodes
WITH
collect {
UNWIND nodes as n
MATCH (n)<-[:MENTIONS]->(c:__Chunk__)
WITH c, count(distinct n) as freq
RETURN {chunkText: c.text, source_id: 'chunk-' + c.id}
ORDER BY freq DESC
LIMIT 3
} AS text_mapping,
collect {
UNWIND nodes as n
MATCH (n)-[:IN_COMMUNITY*]->(c:__Community__)
WHERE c.summary IS NOT NULL
WITH c, c.rating as rank
RETURN {summary: c.summary, source_id: 'community-' + c.id}
ORDER BY rank DESC
LIMIT 3
} AS report_mapping,
collect {
UNWIND nodes as n
MATCH (n)-[r:SUMMARIZED_RELATIONSHIP]-(m)
WHERE m IN nodes
RETURN {descriptionText: r.summary, source_id: 'relationship-' + n.name + '-' + m.name}
LIMIT 3
} as insideRels,
collect {
UNWIND nodes as n
RETURN {descriptionText: n.summary, source_id: 'node-' + n.name}
} as entities
RETURN {Chunks: text_mapping, Reports: report_mapping,
Relationships: insideRels,
Entities: entities} AS output
""",
result_transformer_=lambda r: r.data(),
embedding=embedding,
)
# Generate answer based on the retrieved context
local_prompt = DRIFT_LOCAL_SYSTEM_PROMPT.format(
response_type=DEFAULT_RESPONSE_TYPE,
context_data=local_reports,
global_query=ev.query,
)
local_response = await client.responses.create(
model="gpt-5-mini",
input=[{"role": "user", "content": local_prompt}],
reasoning={"effort": "low"},
)
response_json = json_repair.loads(local_response.output_text)
# Limit follow-up queries to prevent exponential growth
response_json["follow_up_queries"] = response_json["follow_up_queries"][:LOCAL_TOP_K]
return LocalSearchResults(results=response_json, query=ev.query)
L’étape suivante orchestre le processus d’approfondissement itératif. Il attend la fin de toutes les recherches parallèles en utilisant collect_eventsdécide ensuite s’il faut poursuivre l’exploration. Si la profondeur actuelle n’a pas atteint le maximum (nous utilisons une profondeur maximale = 2), il extrait les requêtes de suivi de tous les résultats, stocke les réponses intermédiaires et envoie le prochain cycle de recherches parallèles.
@step
async def local_search_results(
self, ctx: Context, ev: LocalSearchResults
) -> LocalSearch | FinalAnswer:
local_search_num = await ctx.store.get("local_search_num")
# Wait for all parallel searches to complete
results = ctx.collect_events(ev, [LocalSearchResults] * local_search_num)
if results is None:
return None
intermediate_results = [
{
"intermediate_answer": event.results["response"],
"score": event.results["score"],
}
for event in results
]
current_depth = await ctx.store.get("local_search_depth", default=1)
query = [ev.query for ev in results][0]
# Continue drilling down if we haven't reached max depth
if current_depth < MAX_LOCAL_SEARCH_DEPTH:
await ctx.store.set("local_search_depth", current_depth + 1)
follow_up_queries = [
query
for event in results
for query in event.results["follow_up_queries"]
]
# Store intermediate answers and dispatch next round of searches
async with ctx.store.edit_state() as ctx_state:
ctx_state["intermediate_answers"].extend(intermediate_results)
ctx_state["local_search_num"] = len(follow_up_queries)
for local_query in follow_up_queries:
ctx.send_event(LocalSearch(query=query, local_query=local_query))
return None
else:
return FinalAnswer(query=query)
Cela crée une boucle de raffinement itérative dans laquelle chaque niveau de profondeur s’appuie sur les découvertes précédentes. Une fois la profondeur maximale atteinte, la génération de la réponse finale est déclenchée.
Réponse finale
La dernière étape synthétise toutes les réponses intermédiaires collectées tout au long du processus de recherche DRIFT en une réponse complète. Cela inclut la réponse initiale de la recherche communautaire et toutes les réponses générées lors des itérations de recherche locale.
@step
async def final_answer_generation(self, ctx: Context, ev: FinalAnswer) -> StopEvent:
# Retrieve all intermediate answers collected throughout the search process
intermediate_answers = await ctx.store.get("intermediate_answers")
# Synthesize all findings into a comprehensive final response
answer_prompt = DRIFT_REDUCE_PROMPT.format(
response_type=DEFAULT_RESPONSE_TYPE,
context_data=intermediate_answers,
global_query=ev.query,
)
answer_response = await client.responses.create(
model="gpt-5-mini",
input=[
{"role": "developer", "content": answer_prompt},
{"role": "user", "content": ev.query},
],
reasoning={"effort": "low"},
)
return StopEvent(result=answer_response.output_text)
Résumé
La recherche DRIFT présente une stratégie intéressante pour équilibrer l’étendue de la recherche globale avec la précision de la recherche locale. En commençant par le contexte au niveau de la communauté et en approfondissant progressivement les requêtes de suivi itératives, il évite la surcharge de calcul liée au traitement de tous les rapports de la communauté tout en conservant une couverture complète.
Cependant, plusieurs améliorations sont possibles. La mise en œuvre actuelle traite toutes les réponses intermédiaires de la même manière, mais un filtrage basé sur leurs scores de confiance pourrait améliorer la qualité de la réponse finale et réduire le bruit. De même, les requêtes de suivi pourraient être classées par pertinence ou par gain d’informations potentiel avant leur exécution, garantissant ainsi que les pistes les plus prometteuses soient poursuivies en premier.
Une autre amélioration prometteuse consisterait à introduire une étape d’affinement des requêtes qui utilise un LLM pour analyser toutes les requêtes de suivi générées, en regroupant les requêtes similaires pour éviter les recherches redondantes et en filtrant les requêtes peu susceptibles de produire des informations utiles. Cela pourrait réduire considérablement le nombre de recherches locales tout en maintenant la qualité des réponses.
La mise en œuvre complète est disponible sur GitHub pour ceux qui souhaitent expérimenter ces améliorations ou adapter DRIFT à leurs propres cas d’utilisation.



