RAG Pipeline

The RAG (Retrieval-Augmented Generation) Pipeline retrieves documents relevant to a query and generates a response that uses those documents as context. It runs five steps in order: it transforms the query, searches for relevant documents, reranks the results, constructs a context, and generates a completion.
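
The five steps can be sketched as a chain of plain functions. This is a minimal, self-contained illustration under stated assumptions, not R2R's implementation: the in-memory CORPUS, the word-overlap scoring, and the generate stub are hypothetical stand-ins for the vector database and the language model.

```python
# Illustrative stand-ins only; none of these functions are R2R's API.
CORPUS = [
    "Paris is the capital of France.",
    "The Nile is a river in Africa.",
    "France borders Spain and Italy.",
]


def transform_query(query: str) -> str:
    # Step 1: normalize the query (here: trim and lowercase).
    return query.strip().lower()


def search(query: str, limit: int) -> list[tuple[int, str]]:
    # Step 2: score each document by word overlap with the query.
    query_words = set(query.replace("?", "").split())
    hits = []
    for doc in CORPUS:
        doc_words = set(doc.lower().replace(".", "").split())
        overlap = len(query_words & doc_words)
        if overlap > 0:
            hits.append((overlap, doc))
    return hits[:limit]


def rerank(hits: list[tuple[int, str]], limit: int) -> list[tuple[int, str]]:
    # Step 3: order by score, highest first, and keep the top `limit`.
    return sorted(hits, key=lambda hit: hit[0], reverse=True)[:limit]


def build_context(hits: list[tuple[int, str]]) -> str:
    # Step 4: join the document texts into one context string.
    return "\n".join(doc for _, doc in hits)


def generate(query: str, context: str) -> str:
    # Step 5: a real pipeline would send this prompt to a language model.
    return f"Context:\n{context}\n\nQuestion: {query}"


def run_pipeline(query: str, limit: int = 2) -> str:
    transformed = transform_query(query)
    hits = search(transformed, limit=len(CORPUS))
    top_hits = rerank(hits, limit)
    context = build_context(top_hits)
    return generate(transformed, context)


print(run_pipeline("  What is the capital of France? "))
```

Keeping each step as its own function is what makes the individual stages replaceable, which is the same idea the pipeline classes below rely on.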

Question and Answer RAG Pipeline

The QnARAGPipeline is a simple implementation of the RAGPipeline abstract base class. It provides a straightforward way to perform document retrieval and generation using a specified language model, vector database, and embedding model. It is designed to support single-turn question and answer style RAG.

Initialization

The QnARAGPipeline is initialized with the following parameters:

  • llm_provider: An instance of LLMProvider for generating completions.
  • vector_db_provider: An instance of VectorDBProvider for searching and retrieving documents.
  • embedding_provider: An instance of OpenAIEmbeddingProvider for generating embeddings.
  • prompt_provider (optional): An instance of PromptProvider for providing prompts (default is BasicPromptProvider).
  • logging_connection (optional): An instance of LoggingDatabaseConnection for logging.

Query Transformation

The transform_query method transforms the input query before retrieval, if necessary. In the basic implementation, it returns the query as is.
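
As a rough sketch of what a non-trivial transformation could look like, the example below contrasts the pass-through behavior with a variant that collapses whitespace. Both classes are hypothetical stand-ins written for this illustration; they do not inherit from R2R's RAGPipeline.

```python
class IdentityTransform:
    def transform_query(self, query: str) -> str:
        # Mirrors the basic behavior described above: no change.
        return query


class NormalizingTransform(IdentityTransform):
    def transform_query(self, query: str) -> str:
        # Collapse runs of whitespace and trim both ends.
        return " ".join(query.split())


raw = "  what   is RAG? "
print(repr(IdentityTransform().transform_query(raw)))
print(repr(NormalizingTransform().transform_query(raw)))
```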

Document Retrieval

The search method searches the vector database with the transformed query to retrieve relevant documents. It uses the embedding provider to generate an embedding for the query, then performs a similarity search against the vectors stored in the database.
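
To make the similarity search concrete, here is a minimal in-memory sketch. It assumes cosine similarity as the metric and uses hand-written three-dimensional vectors in place of real embeddings; the actual metric and storage depend on the configured vector database, and the function names are hypothetical.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    # Dot product divided by the product of the vector norms.
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


def similarity_search(
    query_vector: list[float],
    index: list[tuple[str, list[float]]],
    limit: int,
) -> list[tuple[float, str]]:
    # Score every stored vector, then keep the `limit` closest ones.
    scored = [
        (cosine_similarity(query_vector, vector), text)
        for text, vector in index
    ]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return scored[:limit]


# Toy index: (text, embedding) pairs with made-up vectors.
INDEX = [
    ("doc about cats", [1.0, 0.0, 0.0]),
    ("doc about dogs", [0.0, 1.0, 0.0]),
    ("doc about cats and dogs", [1.0, 1.0, 0.0]),
]

top = similarity_search([1.0, 0.1, 0.0], INDEX, limit=2)
for score, text in top:
    print(f"{score:.3f}  {text}")
```

A production vector database would typically use an approximate nearest-neighbor index rather than scoring every vector, but the ranking idea is the same.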

Result Reranking

The rerank_results method reranks the retrieved documents based on relevance, if necessary. In the basic implementation, it returns the results as is (an identity transformation).
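
The sketch below contrasts a pass-through reranker with one that sorts by score. The Result dataclass and its score field are assumptions made for this example; they are not R2R's VectorSearchResult.

```python
from dataclasses import dataclass


@dataclass
class Result:
    # Hypothetical stand-in for a search result.
    text: str
    score: float


def rerank_identity(results: list[Result]) -> list[Result]:
    # Pass-through: the order from the search step is kept.
    return results


def rerank_by_score(results: list[Result], limit: int) -> list[Result]:
    # Highest score first, truncated to `limit` entries.
    return sorted(results, key=lambda r: r.score, reverse=True)[:limit]


hits = [Result("a", 0.2), Result("b", 0.9), Result("c", 0.5)]
print([r.text for r in rerank_identity(hits)])
print([r.text for r in rerank_by_score(hits, limit=2)])
```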

Result Formatting

The _format_results method formats the reranked results into a human-readable string. In the basic implementation, it joins the text metadata of each result with newline separators.
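
A minimal sketch of the newline join described above, using a hypothetical Result class that carries only a metadata dictionary with a "text" key:

```python
from dataclasses import dataclass, field


@dataclass
class Result:
    # Hypothetical stand-in; only the metadata dictionary matters here.
    metadata: dict = field(default_factory=dict)


def format_results(results: list[Result]) -> str:
    # Join the "text" entry of each result, one per line.
    return "\n".join(result.metadata["text"] for result in results)


docs = [Result({"text": "First passage."}), Result({"text": "Second passage."})]
print(format_results(docs))
```

The resulting string is what gets placed into the prompt as context, so the format chosen here directly shapes what the language model sees.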

Using a Custom RAG Pipeline

To create a custom RAG pipeline, you can subclass the RAGPipeline abstract base class and override the necessary methods. Here's an example:

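from typing import Optional

# RAGPipeline, the provider classes, VectorSearchResult, and
# log_execution_to_db must also be imported from your installed R2R version.

class CustomRAGPipeline(RAGPipeline):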
    def __init__(
        self,
        llm_provider: LLMProvider,
        vector_db_provider: VectorDBProvider,
        embedding_provider: OpenAIEmbeddingProvider,
        prompt_provider: Optional[PromptProvider] = None,
        logging_connection: Optional[LoggingDatabaseConnection] = None,
    ) -> None:
        if not prompt_provider:
            prompt_provider = BasicPromptProvider()
        self.prompt_provider = prompt_provider
 
        super().__init__(
            llm_provider=llm_provider,
            prompt_provider=prompt_provider,
            embedding_provider=embedding_provider,
            vector_db_provider=vector_db_provider,
            logging_connection=logging_connection,
        )
        self.pipeline_run_info = None
 
    def transform_query(self, query: str) -> str:
        # Custom query transformation logic
        return f"Custom query: {query}"
 
    @log_execution_to_db
    def search(
        self,
        transformed_query: str,
        filters: dict,
        limit: int,
        *args,
        **kwargs,
    ) -> list[VectorSearchResult]:
        # Custom document retrieval logic
        results = self.vector_db_provider.search(
            query_vector=self.embedding_provider.get_embedding(
                transformed_query,
            ),
            filters=filters,
            limit=limit,
        )
        return results
 
    def rerank_results(
        self,
        transformed_query: str,
        results: list[VectorSearchResult],
        limit: int,
    ) -> list[VectorSearchResult]:
        # Custom result reranking logic (for demonstration only):
        # reverse the order, then keep the first `limit` results
        return list(reversed(results))[0:limit]
 
    def _format_results(self, results: list[VectorSearchResult]) -> str:
        # Custom result formatting logic
        formatted_results = [
            f"{i+1}. {result.metadata['text']}"
            for i, result in enumerate(results)
        ]
        return "\n".join(formatted_results)

In this example, the CustomRAGPipeline overrides the following methods:

  • transform_query: Applies custom query transformation logic.
  • search: Implements custom document retrieval logic.
  • rerank_results: Implements custom result reranking logic.
  • _format_results: Formats the results in a custom way.
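
To see what the reranking and formatting overrides produce without a running R2R instance, their logic can be exercised on stand-in data. The SimpleNamespace objects below are hypothetical substitutes for VectorSearchResult that expose only a metadata attribute.

```python
from types import SimpleNamespace

# Stand-in search results, in the order a search might return them.
results = [
    SimpleNamespace(metadata={"text": text})
    for text in ("alpha", "beta", "gamma")
]

# Same logic as rerank_results above: reverse, then keep `limit` items.
limit = 2
reranked = list(reversed(results))[0:limit]

# Same logic as _format_results above: a numbered, newline-joined list.
formatted = "\n".join(
    f"{i+1}. {result.metadata['text']}"
    for i, result in enumerate(reranked)
)
print(formatted)
# prints:
# 1. gamma
# 2. beta
```

Note that reversing before truncating keeps the results that originally came last, so this reranker is useful as a demonstration rather than as a relevance improvement.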

Passing the Custom RAG Pipeline to the Factory

To use the custom RAG pipeline with the E2EPipelineFactory, you can pass it when creating the pipeline:

from r2r.main import E2EPipelineFactory
from my_custom_pipeline import CustomRAGPipeline
 
# `config` is assumed to be your existing R2R configuration, defined elsewhere.
app = E2EPipelineFactory.create_pipeline(
    config=config,
    rag_pipeline_impl=CustomRAGPipeline,
)

When you pass CustomRAGPipeline to the create_pipeline method through the rag_pipeline_impl parameter, the factory uses the custom pipeline instead of the default QnARAGPipeline.

That's it! You now have a custom RAG pipeline that overrides query transformation, search, reranking, and result formatting, and that can be passed to the E2EPipelineFactory for use in the end-to-end pipeline.