Introduction

The RAG (Retrieval-Augmented Generation) Pipeline in R2R combines search results with language generation to produce more informative and contextually relevant outputs. It leverages both vector search and knowledge graph capabilities to enhance the quality of generated responses.

Key Features

  • Search and Generate: Integrates search results with language generation for comprehensive responses.
  • Multi-Source Context: Combines vector search and knowledge graph results.
  • Customizable: Supports custom configurations for search settings and generation prompts.
  • Asynchronous Processing: Runs search and generation steps asynchronously to handle requests efficiently.
  • Streaming Support: Offers both regular and streaming modes for different use cases.

RAG Pipeline Components

The RAG Pipeline consists of two main components:

  1. Search Pipeline
  2. RAG Generation Pipeline

Search Pipeline

The Search Pipeline is responsible for retrieving relevant information from vector databases and knowledge graphs. It’s configured within the RAG Pipeline:

class RAGPipeline(Pipeline):
    def set_search_pipeline(
        self,
        _search_pipeline: Pipeline,
        *args,
        **kwargs,
    ) -> None:
        # Store the search pipeline whose results will supply
        # the context for the generation step.
        self._search_pipeline = _search_pipeline

RAG Generation Pipeline

The RAG Generation Pipeline processes search results and generates responses. It includes two main pipes:

  1. SearchRAGPipe: For non-streaming RAG operations.
  2. StreamingSearchRAGPipe: For streaming RAG operations.

SearchRAGPipe

The SearchRAGPipe collects context from search results and generates responses:

class SearchRAGPipe(GeneratorPipe):
    async def _run_logic(
        self,
        input: Input,
        state: AsyncState,
        run_id: uuid.UUID,
        rag_generation_config: GenerationConfig,
        *args: Any,
        **kwargs: Any,
    ) -> AsyncGenerator[LLMChatCompletion, None]:
        # 1. Collect context from the incoming search results
        # 2. Generate a response with the LLM
        # 3. Yield the completed response
        ...
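
The body above is elided; the full implementation lives in the R2R source. As a rough, self-contained sketch of the context-collection step alone (the function name and the plain-dict result shape below are illustrative assumptions, not R2R types):

from typing import Any

def collect_context(query: str, search_results: list[dict[str, Any]]) -> str:
    # Number each result so the model can cite its sources in the answer.
    lines = [f"Query:\n{query}", "", "Context:"]
    for i, result in enumerate(search_results, start=1):
        lines.append(f"{i}. {result.get('text', '')}")
    return "\n".join(lines)

# Example: the grounded prompt context a RAG pipe would hand to the LLM.
print(collect_context(
    "What does R2R support?",
    [{"text": "R2R supports vector search."},
     {"text": "R2R supports knowledge graph queries."}],
))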

StreamingSearchRAGPipe

The StreamingSearchRAGPipe supports real-time generation of responses:

class StreamingSearchRAGPipe(SearchRAGPipe):
    async def _run_logic(
        self,
        input: SearchRAGPipe.Input,
        state: AsyncState,
        run_id: uuid.UUID,
        rag_generation_config: GenerationConfig,
        *args: Any,
        **kwargs: Any,
    ) -> AsyncGenerator[str, None]:
        # 1. Stream the search results as they arrive
        # 2. Stream the generated response chunk by chunk
        ...
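
The str return type is what enables chunk-level streaming. Here is a minimal, self-contained sketch of the pattern (fake_llm_stream is a hypothetical stand-in, not an R2R API):

import asyncio
from typing import AsyncGenerator

async def fake_llm_stream(prompt: str) -> AsyncGenerator[str, None]:
    # Stand-in for a streaming LLM call: yields the answer token by token.
    for token in ["R2R ", "streams ", "responses ", "incrementally."]:
        await asyncio.sleep(0)  # simulate per-token latency
        yield token

async def main() -> None:
    # Callers can render each chunk as soon as it is produced.
    async for chunk in fake_llm_stream("example prompt"):
        print(chunk, end="", flush=True)

asyncio.run(main())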

Using the RAG Pipeline

To use the RAG Pipeline, you need to configure it with the necessary components and run it:

from r2r import GenerationConfig, KGSearchSettings, RAGPipeline, VectorSearchSettings

# search_pipeline and rag_pipe are assembled elsewhere, e.g. via the
# factory example in the next section.
rag_pipeline = RAGPipeline()
rag_pipeline.set_search_pipeline(search_pipeline)
rag_pipeline.add_pipe(rag_pipe)

results = await rag_pipeline.run(
    input=user_query,
    vector_search_settings=VectorSearchSettings(),
    kg_search_settings=KGSearchSettings(),
    rag_generation_config=GenerationConfig(),
)
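
If the pipeline is assembled with the StreamingSearchRAGPipe instead, the run is consumed incrementally. A hedged sketch, assuming run() then yields str chunks and that GenerationConfig exposes a stream flag (both are assumptions, not confirmed API):

# Assumes a streaming pipe is installed, so run() produces an
# async generator of str chunks rather than a completed result.
async for chunk in await rag_pipeline.run(
    input=user_query,
    vector_search_settings=VectorSearchSettings(),
    kg_search_settings=KGSearchSettings(),
    rag_generation_config=GenerationConfig(stream=True),
):
    print(chunk, end="", flush=True)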

Customizing the RAG Pipeline

You can customize the RAG Pipeline by adding different pipes or implementing custom logic. Here’s an example of a custom RAG pipeline with query transformation:

from r2r import (
    GenerationConfig,
    QueryTransformPipe,
    R2RConfig,
    R2RPipeFactory,
    R2RProviderFactory,
    RAGPipeline,
    VectorSearchSettings,
)

# Load configuration and create providers and pipes
config = R2RConfig.from_json()
providers = R2RProviderFactory(config).create_providers()
pipes = R2RPipeFactory(config, providers).create_pipes()

# Add a custom query transformation prompt
transform_prompt = {
    "name": "custom_rag_transform_prompt",
    "template": "Transform the query into multiple sub-queries:\n\nQuery: {message}\n\nSub-queries:\n",
    "input_types": {"message": "str"},
}
providers.prompt.add_prompt(**transform_prompt)

# Create a query transform pipe
query_transform_pipe = QueryTransformPipe(
    llm_provider=providers.llm,
    prompt_provider=providers.prompt,
    config=QueryTransformPipe.QueryTransformConfig(
        task_prompt=transform_prompt["name"]
    ),
)

# Define a custom RAG pipeline
class CustomRAGPipeline(RAGPipeline):
    def __init__(self, streaming: bool = False):
        super().__init__()
        self.add_pipe(query_transform_pipe)
        rag_pipe = pipes.streaming_rag_pipe if streaming else pipes.rag_pipe
        self.add_pipe(rag_pipe)
        self.set_search_pipeline(pipes.search_pipeline)

# Use the custom pipeline
custom_pipeline = CustomRAGPipeline(streaming=False)
result = await custom_pipeline.run(
    input="Compare apples and oranges",
    query_transform_config=GenerationConfig(model="gpt-4"),
    rag_generation_config=GenerationConfig(model="gpt-4"),
    vector_search_settings=VectorSearchSettings(search_limit=5)
)
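
Flipping the constructor flag switches the same class into streaming mode, following the consumption pattern sketched earlier (again assuming streaming runs yield str chunks and that GenerationConfig accepts a stream flag):

streaming_pipeline = CustomRAGPipeline(streaming=True)
async for chunk in await streaming_pipeline.run(
    input="Compare apples and oranges",
    query_transform_config=GenerationConfig(model="gpt-4"),
    rag_generation_config=GenerationConfig(model="gpt-4", stream=True),
    vector_search_settings=VectorSearchSettings(search_limit=5),
):
    print(chunk, end="", flush=True)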

Conclusion

The R2R RAG Pipeline provides a powerful way to combine search capabilities with language generation, resulting in more informative and contextually relevant responses. Its modular design supports customization and both streaming and non-streaming modes, making it suitable for a wide range of applications.

For more information on configuring and customizing the RAG Pipeline, refer to the R2R Configuration and Customizing R2R documentation.

Community Support

If you need help or want to connect with other R2R users, join our Discord server. It’s a great place to get support, discuss best practices, and share your experiences with R2R.