Managing and processing multiple documents within a framework like RAG (Retrieval-Augmented Generation) poses significant challenges. The key lies not only in extracting relevant content, but also in selecting the right documents, the ones that actually contain the information a user's query is after. Because user queries align with documents at multiple granularities, documents must be selected dynamically; this article introduces structured hierarchical retrieval as a solution to the multi-document RAG problem.
1. Introduction to LlamaIndex Structured Retrieval
LlamaIndex supports multi-level information retrieval. Rather than merely sifting through documents, it uses metadata filtering to streamline the selection process. With its auto-retrieval mechanism, these filters can surface the documents most relevant to a user query. The process involves inferring a semantic query and determining the best set of filters to apply against the vector database, effectively combining the capabilities of text-to-SQL and semantic search.
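To make this concrete, the sketch below shows the kind of structured decision an auto-retriever infers from a natural-language query: a semantic query string for vector search plus a set of metadata filters. The dictionary layout here is purely illustrative, not LlamaIndex's internal representation.
# Illustrative only: the (semantic query, metadata filters) pair an
# auto-retriever might infer from a natural-language query. The layout
# below is hypothetical, not LlamaIndex's internal representation.
user_query = "open extra-large issues created in December"
inferred = {
    "semantic_query": "extra-large issues",  # fed to vector search
    "filters": [                             # applied as metadata filters
        ("state", "==", "open"),
        ("month", "==", 12),
        ("size", "==", "XL"),
    ],
}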
2. Advantages of Structured Hierarchical Retrieval
Below are some of the benefits of the structured hierarchical retrieval that LlamaIndex provides:
- Enhanced relevance: metadata-driven filters make it possible to accurately identify and retrieve the documents that satisfy the nuanced requirements of a user query, ensuring greater relevance and accuracy in content selection;
- Dynamic document selection: unlike traditional static document retrieval, LlamaIndex supports dynamic document selection. It adapts intelligently to different user queries by flexibly choosing relevant documents based on their attributes and structured metadata;
- Efficient information retrieval: structured hierarchical retrieval significantly improves retrieval efficiency. By preprocessing documents into metadata dictionaries and storing them in a vector database, the system streamlines the retrieval process, minimizing computational overhead and optimizing search efficiency;
- Semantic query optimization: the fusion of text-to-SQL and semantic search allows the system to better understand user intent. LlamaIndex's auto-retrieval mechanism refines user queries into semantic structures, enabling precise and fine-grained retrieval from the document repository.
3. Implementing Structured Hierarchical Retrieval
The Python code below demonstrates LlamaIndex's basic concepts by implementing a structured hierarchical retrieval system. A Llamaindex class is initialized to manage document metadata in a vector database.
- Document addition: the add_document method adds a document to the Llamaindex by creating a metadata dictionary containing key information such as a summary and keywords;
- Retrieval logic: the retrieve_documents method handles a user query by matching it against the metadata filters in the vector database. A basic mock matching logic is used for demonstration purposes;
- Matching mechanism: the match_metadata method simulates the matching between a user query and document metadata. This is simplified demo logic; a production system would typically use more advanced NLP or semantic-analysis techniques.
This example is meant to illustrate LlamaIndex's core concepts, showing how a simplified Python implementation can store document metadata and retrieve relevant documents for a user query; a sketch follows below.
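The following is a minimal, self-contained sketch consistent with the description above. It is illustrative only, not the real LlamaIndex API: a plain in-memory dictionary stands in for the vector database, and keyword overlap stands in for semantic matching.
# Minimal illustrative sketch of the simplified class described above.
# NOT the real LlamaIndex API: a dict stands in for the vector database,
# and keyword overlap stands in for semantic matching.
class Llamaindex:
    def __init__(self):
        # Maps document id -> metadata dict (simulates a vector store)
        self.metadata_store = {}

    def add_document(self, doc_id, text, summary, keywords):
        """Add a document by storing a metadata dictionary for it."""
        self.metadata_store[doc_id] = {
            "text": text,
            "summary": summary,
            "keywords": {k.lower() for k in keywords},
        }

    def retrieve_documents(self, user_query):
        """Return the documents whose metadata matches the user query."""
        return [
            meta["text"]
            for meta in self.metadata_store.values()
            if self.match_metadata(user_query, meta)
        ]

    def match_metadata(self, user_query, metadata):
        """Mock matching: any query token appears among the keywords.
        A real system would use NLP or semantic similarity here."""
        query_tokens = set(user_query.lower().split())
        return bool(query_tokens & metadata["keywords"])

# Usage example
idx = Llamaindex()
idx.add_document("1", "Guide to Weaviate setup", "Weaviate quickstart", ["weaviate", "setup"])
print(idx.retrieve_documents("how do I set up weaviate"))  # ['Guide to Weaviate setup']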
Step 1: Install the libraries
!pip install llama-index wandb llama_hub weaviate-client --quiet
Step 2: Import the libraries
import os
import openai
import logging
import sys
from IPython.display import Markdown, display
from llama_index.llms import OpenAI
from llama_index.callbacks import CallbackManager, WandbCallbackHandler
from llama_index import load_index_from_storage
import pandas as pd
from llama_index.query_engine import PandasQueryEngine
from pprint import pprint
from llama_index import (
    VectorStoreIndex,
    SimpleKeywordTableIndex,
    SimpleDirectoryReader,
    StorageContext,
    ServiceContext,
)
import nest_asyncio
nest_asyncio.apply()
# Set up OpenAI API key
os.environ["OPENAI_API_KEY"] = ""
# openai_key = "sk-..."  # <--- Your API KEY
# openai.api_key = openai_key
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler(stream=sys.stdout))
# initialise WandbCallbackHandler and pass any wandb.init args
wandb_args = {"project":"llama-index-report"}
wandb_callback = WandbCallbackHandler(run_args=wandb_args)
# pass wandb_callback to the service context
callback_manager = CallbackManager([wandb_callback])
service_context = ServiceContext.from_defaults(llm=OpenAI(model="gpt-3.5-turbo-0613", temperature=0), chunk_size=1024, callback_manager=callback_manager)
Step 3: Download the GitHub issues
os.environ["GITHUB_TOKEN"] = ""
from llama_hub.github_repo_issues import (
    GitHubRepositoryIssuesReader,
    GitHubIssuesClient,
)
github_client = GitHubIssuesClient()
loader = GitHubRepositoryIssuesReader(
    github_client,
    owner="run-llama",
    repo="llama_index",
    verbose=True,
)
orig_docs = loader.load_data()
limit = 100
docs = []
for idx, doc in enumerate(orig_docs):
    doc.metadata["index_id"] = doc.id_
    if idx >= limit:
        break
    docs.append(doc)
# Output
Found 100 issues in the repo page 1
Resulted in 100 documents
Found 100 issues in the repo page 2
Resulted in 200 documents
Found 100 issues in the repo page 3
Resulted in 300 documents
Found 8 issues in the repo page 4
Resulted in 308 documents
No more issues found, stopping
from copy import deepcopy
import asyncio
from tqdm.asyncio import tqdm_asyncio
from llama_index import SummaryIndex, Document, ServiceContext
from llama_index.llms import OpenAI
from llama_index.async_utils import run_jobs
async def aprocess_doc(doc, include_summary: bool = True):
    """Process doc."""
    print(f"Processing {doc.id_}")
    metadata = doc.metadata
    date_tokens = metadata["created_at"].split("T")[0].split("-")
    year = int(date_tokens[0])
    month = int(date_tokens[1])
    day = int(date_tokens[2])
    assignee = (
        "" if "assignee" not in doc.metadata else doc.metadata["assignee"]
    )
    size = ""
    if len(doc.metadata["labels"]) > 0:
        size_arr = [l for l in doc.metadata["labels"] if "size:" in l]
        size = size_arr[0].split(":")[1] if len(size_arr) > 0 else ""
    new_metadata = {
        "state": metadata["state"],
        "year": year,
        "month": month,
        "day": day,
        "assignee": assignee,
        "size": size,
        "index_id": doc.id_,
    }
    # now extract out summary
    summary_index = SummaryIndex.from_documents([doc])
    query_str = "Give a one-sentence concise summary of this issue."
    query_engine = summary_index.as_query_engine(
        service_context=ServiceContext.from_defaults(
            llm=OpenAI(model="gpt-3.5-turbo")
        )
    )
    summary_txt = str(query_engine.query(query_str))
    new_doc = Document(text=summary_txt, metadata=new_metadata)
    return new_doc
async def aprocess_docs(docs):
    """Process metadata on docs."""
    new_docs = []
    tasks = []
    for doc in docs:
        task = aprocess_doc(doc)
        tasks.append(task)
    new_docs = await run_jobs(tasks, show_progress=True, workers=5)
    # new_docs = await tqdm_asyncio.gather(*tasks)
    return new_docs

new_docs = await aprocess_docs(docs)
# Output
Processing 9398
Processing 9427
Processing 9613
... (similar "Processing <issue id>" lines for the remaining documents omitted) ...
Processing 9618
100%|██████████| 100/100 [02:07<00:00, 1.27s/it]
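Before loading the summaries into the vector store, it helps to spot-check one processed document; the exact summary text will vary from run to run.
# Spot-check one processed summary document (summary text varies per run)
print(new_docs[0].metadata)  # compact metadata: state, year, month, day, assignee, size, index_id
print(new_docs[0].text)      # the one-sentence LLM-generated summary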
Step 4: Load the data into the Weaviate vector store
from llama_index.vector_stores import WeaviateVectorStore
from llama_index.storage import StorageContext
from llama_index import VectorStoreIndex
import weaviate
# cloud
auth_config = weaviate.AuthApiKey(api_key="")
client = weaviate.Client(
    "https://.weaviate.network",  # fill in your Weaviate cluster URL
    auth_client_secret=auth_config,
)
class_name = "LlamaIndex_auto"
vector_store = WeaviateVectorStore(
    weaviate_client=client, index_name=class_name
)
storage_context = StorageContext.from_defaults(vector_store=vector_store)
# Since "new_docs" are concise summaries, we can directly feed them as nodes into VectorStoreIndex
index = VectorStoreIndex(new_docs, storage_context=storage_context)
docs[0].metadata
# Output
{'state': 'open',
'created_at': '2023-12-21T20:18:03Z',
'url': 'https://api.github.com/repos/run-llama/llama_index/issues/9655',
'source': 'https://github.com/run-llama/llama_index/pull/9655',
'labels': ['size:L'],
'index_id': '9655'}
Step 5: Build a Weaviate index over the raw documents
# class name for the raw-document index (assumed here; any unused class name works)
doc_class_name = "LlamaIndex_docs"
vector_store = WeaviateVectorStore(
    weaviate_client=client, index_name=doc_class_name
)
storage_context = StorageContext.from_defaults(vector_store=vector_store)
doc_index = VectorStoreIndex.from_documents(
    docs, storage_context=storage_context
)
Step 6: Set up the auto-retrieval mechanism
Setting up the auto-retriever involves the following key steps:
- Define the schema: define the vector database schema, including the metadata fields;
- Initialize VectorIndexAutoRetriever: instantiating this class creates a retriever over the condensed metadata index; it takes the defined schema as input;
- Create a wrapper retriever: this step post-processes each retrieved node into an IndexNode. The conversion carries an index ID that links back to the source document; this link is what enables the recursive retrieval in a later section, with IndexNode objects connecting to downstream retrievers, query engines, or other nodes.
6(a) Define the schema
from llama_index.vector_stores.types import MetadataInfo, VectorStoreInfo
vector_store_info = VectorStoreInfo(
    content_info="Github Issues",
    metadata_info=[
        MetadataInfo(
            name="state",
            description="Whether the issue is `open` or `closed`",
            type="string",
        ),
        MetadataInfo(
            name="year",
            description="The year issue was created",
            type="integer",
        ),
        MetadataInfo(
            name="month",
            description="The month issue was created",
            type="integer",
        ),
        MetadataInfo(
            name="day",
            description="The day issue was created",
            type="integer",
        ),
        MetadataInfo(
            name="assignee",
            description="The assignee of the ticket",
            type="string",
        ),
        MetadataInfo(
            name="size",
            description="How big the issue is (XS, S, M, L, XL, XXL)",
            type="string",
        ),
    ],
)
6(b) Instantiate VectorIndexAutoRetriever
from llama_index.retrievers import VectorIndexAutoRetriever
retriever = VectorIndexAutoRetriever(
    index,
    vector_store_info=vector_store_info,
    similarity_top_k=2,
    empty_query_top_k=10,  # if only metadata filters are specified, this is the limit
    verbose=True,
)
nodes = retriever.retrieve("Tell me about some issues on 12/11")
print(f"Number retrieved: {len(nodes)}")
print(nodes[0].metadata)
# Output
Using query str:
Using filters: [('month', '==', 12), ('day', '==', 11)]
Number retrieved: 6
{'state': 'open', 'year': 2023, 'month': 12, 'day': 11, 'assignee': '', 'size': 'XL', 'index_id': '9431'}
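As a further, hypothetical example, a query that expresses only metadata constraints may produce an empty semantic query, in which case empty_query_top_k caps the number of results:
# Hypothetical follow-up query: purely metadata-driven, so the inferred
# semantic query may be empty and empty_query_top_k caps the result count
nodes = retriever.retrieve("Tell me about open XL issues")
for node in nodes:
    print(node.metadata["index_id"], node.metadata["state"], node.metadata["size"])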
6(c) Define a wrapper retriever
from llama_index.retrievers import BaseRetriever
from llama_index.indices.query.schema import QueryBundle
from llama_index.schema import IndexNode, NodeWithScore
class IndexAutoRetriever(BaseRetriever):
    """Index auto-retriever."""

    def __init__(self, retriever: VectorIndexAutoRetriever):
        """Init params."""
        self.retriever = retriever

    def _retrieve(self, query_bundle: QueryBundle):
        """Convert nodes to index node."""
        retrieved_nodes = self.retriever.retrieve(query_bundle)
        new_retrieved_nodes = []
        for retrieved_node in retrieved_nodes:
            index_id = retrieved_node.metadata["index_id"]
            index_node = IndexNode.from_text_node(
                retrieved_node.node, index_id=index_id
            )
            new_retrieved_nodes.append(
                NodeWithScore(node=index_node, score=retrieved_node.score)
            )
        return new_retrieved_nodes

index_retriever = IndexAutoRetriever(retriever=retriever)
Step 7: Set up the recursive retrieval mechanism
This type of retriever links each node returned by a retriever to another retriever, query engine, or node. Here, each summarized metadata node is linked to a retriever over the RAG pipeline for the corresponding source document.
The configuration process is as follows:
- Define one retriever per document and add them to a dictionary;
- Define the recursive retriever: its parameters include the root retriever (the summarized-metadata retriever) plus the per-document retrievers.
from llama_index.vector_stores.types import (
    MetadataFilter,
    MetadataFilters,
    FilterOperator,
)

retriever_dict = {}
query_engine_dict = {}
for doc in docs:
    index_id = doc.metadata["index_id"]
    # filter for the specific doc id
    filters = MetadataFilters(
        filters=[
            MetadataFilter(
                key="index_id", operator=FilterOperator.EQ, value=index_id
            ),
        ]
    )
    retriever = doc_index.as_retriever(filters=filters)
    query_engine = doc_index.as_query_engine(filters=filters)
    retriever_dict[index_id] = retriever
    query_engine_dict[index_id] = query_engine

from llama_index.retrievers import RecursiveRetriever

# note: can pass `agents` dict as `query_engine_dict` since every agent can be used as a query engine
recursive_retriever = RecursiveRetriever(
    "vector",
    retriever_dict={"vector": index_retriever, **retriever_dict},
    # query_engine_dict=query_engine_dict,
    verbose=True,
)
nodes = recursive_retriever.retrieve("Tell me about some issues on 12/11")
print(f"Number of source nodes: {len(nodes)}")
nodes[0].node.metadata
# Output
Retrieving with query id None: Tell me about some issues on 12/11
Using query str:
Using filters: [('month', '==', 12), ('day', '==', 11)]
Retrieved node with id, entering: 9431
Retrieving with query id 9431: Tell me about some issues on 12/11
Retrieving text node: Dev awiss
# Description
Try to use clickhouse as vectorDB.
Try to chunk docs with independent parser service.
Special designed schema and tricks for better query and retriever.
Fixes # (issue)
## Type of Change
Please delete options that are not relevant.
- [ ] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
- [ ] This change requires a documentation update
# How Has This Been Tested
filters = MetadataFilters(
filters=[
MetadataFilter(key="theme", value="Mafia"),
MetadataFilter(key="author", value="Stephen King"),
],
condition=FilterCondition.OR,
)
retriever = index.as_retriever(filters=filters)
### Version
0.9.13
### Steps to Reproduce
nodes = [
TextNode(
text="The Shawshank Redemption",
metadata={
"author": "Stephen King",
"theme": "Friendship",
},
),
TextNode(
text="The Godfather",
metadata={
"director": "Francis Ford Coppola",
"theme": "Mafia",
},
),
TextNode(
text="Inception",
metadata={
"director": "Christopher Nolan",
},
),
]
filters = MetadataFilters(
filters=[
MetadataFilter(key="theme", value="Mafia"),
MetadataFilter(key="author", value="Stephen King"),
],
condition=FilterCondition.OR,
)
retriever = index.as_retriever(filters=filters)
### Relevant Logs/Tracbacks
_No response_
Retrieved node with id, entering: 9427
Retrieving with query id 9427: Tell me about some issues on 12/11
Retrieving text node: [Feature Request]: Postgres BM25 support
### Feature Description
Feature: add a variation of PGVectorStore which uses ParadeDB's BM25 extension.
BM25 is now possible in Postgres with a Rust extension [pg_bm25]: https://github.com/paradedb/paradedb/tree/dev/pg_bm25
Unsure if it might be better to use [pg_search](https://github.com/paradedb/paradedb/tree/dev/pg_search) and get HNSW at the same time..
I'm interested in contributing on this myself, but am just starting to look into it. Interested to hear others' thoughts.
### Reason
Although the code comments for the PGVectorStore class currently suggest BM25 search is present in Postgres - it is not.
### Value of Feature
BM25 retrieval hit rate and MRR is measurable better than Postgres full text search with tsvector and tsquery. Indexing is also supposed to be faster with pg_bm25.
Number of source nodes: 6
{'state': 'open',
'created_at': '2023-12-11T10:17:52Z',
'url': 'https://api.github.com/repos/run-llama/llama_index/issues/9431',
'source': 'https://github.com/run-llama/llama_index/pull/9431',
'labels': ['size:XL'],
'index_id': '9431'}
Step 8: Plug into a RetrieverQueryEngine
from llama_index.query_engine import RetrieverQueryEngine
from llama_index import ServiceContext
llm = OpenAI(model="gpt-3.5-turbo")
service_context = ServiceContext.from_defaults(llm=llm)
query_engine = RetrieverQueryEngine.from_args(recursive_retriever, llm=llm)
response = query_engine.query(
    "Tell me about some issues on 12/11"
)
print(str(response))
# Output
There were several issues created on 12/11. One of them is a bug where the metadata filter is not working correctly with Elastic search indexing. Another bug involves an error loading the 'punkt' module in the NLTK library. There are also a couple of feature requests, one for adding Postgres BM25 support and another for making llama-index compatible with models finetuned and hosted on modal.com. Additionally, there is a question about using the Slack Loader with large Slack channels.
4. Conclusion
In summary, integrating LlamaIndex into multi-document RAG architectures heralds a new era in information retrieval. Its ability to select documents dynamically based on structured metadata, combined with semantic query optimization, reshapes how we tap the knowledge in vast document repositories, improving the efficiency, relevance, and accuracy of the retrieval process.