RAG Pipeline
The RAG (Retrieve-Augmented-Generation) Pipeline is responsible for retrieving relevant documents based on a query and generating a response using the retrieved documents as context. It transforms the query, searches for relevant documents, reranks the results, constructs a context, and generates a completion.
Question and Answer RAG Pipeline
The QnARAGPipeline
is a simple implementation of the RAGPipeline
abstract base class. It provides a straightforward way to perform document retrieval and generation using a specified language model, vector database, and embedding model. It is designed to support single-turn question and answer style RAG.
Initialization
The QnARAGPipeline
is initialized with the following parameters:
llm_provider
: An instance ofLLMProvider
for generating completions.vector_db_provider
: An instance ofVectorDBProvider
for searching and retrieving documents.embedding_provider
: An instance ofOpenAIEmbeddingProvider
for generating embeddings.prompt_provider
(optional): An instance ofPromptProvider
for providing prompts (default isBasicPromptProvider
).logging_connection
(optional): An instance ofLoggingDatabaseConnection
for logging.
Query Transformation
The transform_query
method transforms the input query before retrieval, if necessary. In the basic implementation, it returns the query as is.
Document Retrieval
The search
method searches the vector database with the transformed query to retrieve relevant documents. It uses the specified embedding model and embeddings provider to generate embeddings for the query and performs a similarity search in the database.
Result Reranking
The rerank_results
method reranks the retrieved documents based on relevance, if necessary. In the basic implementation, it returns the results as is (a unit transformation).
Result Formatting
The _format_results
method formats the reranked results into a human-readable string. In the basic implementation, it joins the text metadata of each result with newline separators.
Using a Custom RAG Pipeline
To create a custom RAG pipeline, you can subclass the RAGPipeline
abstract base class and override the necessary methods. Here's an example:
class CustomRAGPipeline(RAGPipeline):
def __init__(
self,
llm_provider: LLMProvider,
vector_db_provider: VectorDBProvider,
embedding_provider: OpenAIEmbeddingProvider,
prompt_provider: Optional[PromptProvider] = None,
logging_connection: Optional[LoggingDatabaseConnection] = None,
) -> None:
if not prompt_provider:
prompt_provider = BasicPromptProvider()
self.prompt_provider = prompt_provider
super().__init__(
llm_provider=llm_provider,
prompt_provider=prompt_provider,
embedding_provider=embedding_provider,
vector_db_provider=vector_db_provider,
logging_connection=logging_connection,
)
self.pipeline_run_info = None
def transform_query(self, query: str) -> str:
# Custom query transformation logic
return f"Custom query: {query}"
@log_execution_to_db
def search(
self,
transformed_query: str,
filters: dict,
limit: int,
*args,
**kwargs,
) -> list[VectorSearchResult]:
# Custom document retrieval logic
results = self.vector_db_provider.search(
query_vector=self.embedding_provider.get_embedding(
transformed_query,
),
filters=filters,
limit=limit,
)
return results
def rerank_results(
self, transformed_query: str, results: list[VectorSearchResult], limit
) -> list[VectorSearchResult]:
# Custom result reranking logic
return list(reversed(results))[0:limit]
def _format_results(self, results: list[VectorSearchResult]) -> str:
# Custom result formatting logic
formatted_results = [
f"{i+1}. {result.metadata['text']}"
for i, result in enumerate(results)
]
return "\n".join(formatted_results)
In this example, the CustomRAGPipeline
overrides the following methods:
transform_query
: Applies custom query transformation logic.search
: Implements custom document retrieval logic.rerank_results
: Implements custom result reranking logic._format_results
: Formats the results in a custom way.
Passing the Custom RAG Pipeline to the Factory
To use the custom RAG pipeline with the E2EPipelineFactory
, you can pass it when creating the pipeline:
from r2r.main import E2EPipelineFactory
from my_custom_pipeline import CustomRAGPipeline
app = E2EPipelineFactory.create_pipeline(
config=config,
rag_pipeline_impl=CustomRAGPipeline,
)
By passing the CustomRAGPipeline
to the create_pipeline
method using the rag_pipeline_impl
parameter, the factory will use the custom pipeline instead of the default QnARAGPipeline
.
That's it! You now have a custom RAG pipeline that demonstrates how to modify various aspects of the pipeline and can be passed to the E2EPipelineFactory
for use in the end-to-end pipeline.