RAG Pipeline

The RAG (Retrieval-Augmented Generation) Pipeline combines search results with language generation to produce more informative and contextually relevant outputs; a high-level sketch of this flow follows the feature list below. Key features include:

  • Search and Generate: Combines search results with language generation for enhanced responses.
  • Customizable: Supports custom configurations for search and generation prompts.
  • Asynchronous Processing: Efficiently handles data and operations asynchronously.
  • Streaming and Non-Streaming Modes: Supports both regular and streaming RAG pipelines for different use cases.
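To make the flow concrete, here is a minimal, illustrative sketch of what a RAG pipeline does end to end. The search and generate functions are hypothetical stand-ins for the pipes described in the sections that follow, not part of the R2R API:

import asyncio

async def search(query: str) -> list[dict]:
    # Hypothetical retrieval step; a real pipe would query a vector store.
    return [{"text": "Apples are pomes; oranges are citrus fruits."}]

async def generate(prompt: str) -> str:
    # Hypothetical generation step; a real pipe would call an LLM provider.
    return f"(completion for prompt of {len(prompt)} chars)"

async def rag(query: str) -> str:
    results = await search(query)                    # 1. retrieve relevant chunks
    context = "\n".join(r["text"] for r in results)  # 2. aggregate into a context
    prompt = (
        "Answer the query using only the context below.\n\n"
        f"Context:\n{context}\n\nQuery: {query}"
    )
    return await generate(prompt)                    # 3. generate from context

print(asyncio.run(rag("How do apples differ from oranges?")))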

RAG Pipe

The R2RRAGPipe aggregates search results into a context and uses a language model provider to generate a response grounded in that context.

Logic

  • Collect Context: Aggregates search results to form the context (see the sketch after this list).
  • Generate Responses: Uses the aggregated context to generate responses via the language model.
  • Logging: Logs the generated responses and context for auditing and debugging.
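A minimal sketch of the collect-context and logging steps; the helper names and log format here are hypothetical, not the actual R2RRAGPipe internals:

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("rag_pipe_sketch")

def collect_context(results: list[str]) -> str:
    # Number each result so generated answers (and logs) can cite sources.
    return "\n".join(f"[{i + 1}] {text}" for i, text in enumerate(results))

context = collect_context(["Apples are pomes.", "Oranges are citrus fruits."])
response = "Apples are pomes, while oranges are citrus fruits."  # stand-in LLM output
logger.info("context=%r response=%r", context, response)  # audit/debug trail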

Streaming RAG Pipe

The R2RStreamingRAGPipe extends the RAG functionality with streaming support, allowing responses to be generated and delivered in real time.

Logic

  • Stream Search Results: Streams search results as they are being processed.
  • Stream Generation: Streams the generated responses chunk by chunk.
  • Markers: Uses markers to delineate the search-results and generation phases in the stream (illustrated after this list).
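Here is an illustrative sketch of what a marker-delimited stream can look like. The marker strings, chunking, and generator below are assumptions for demonstration, not the exact R2RStreamingRAGPipe wire format:

import asyncio
from typing import AsyncIterator

async def fake_stream(results: list[str], answer: str) -> AsyncIterator[str]:
    yield "<search>"                  # marker: search-results phase begins
    for r in results:
        yield r + "\n"
    yield "</search>\n"
    yield "<completion>"              # marker: generation phase begins
    for token in answer.split():
        await asyncio.sleep(0)        # yield control, as a real stream would
        yield token + " "
    yield "</completion>\n"

async def consume() -> None:
    async for chunk in fake_stream(["result-1", "result-2"], "Apples differ from oranges."):
        print(chunk, end="", flush=True)

asyncio.run(consume())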

Example Custom RAG Pipeline

To create a custom RAG pipeline, follow these steps:

  1. Define Custom Prompts and Configurations (if needed): Register any custom prompts your use case requires.
  2. Create and Configure the Pipeline: Add the necessary pipes to the pipeline.

Example Custom RAG Pipeline with Query Transformation

import asyncio

from r2r import RAGPipeline, R2RConfig, R2RProviderFactory, R2RPipeFactory, R2RQueryTransformPipe, GenerationConfig
 
# Load configuration
config = R2RConfig.from_json()
 
# Create providers and pipes
providers = R2RProviderFactory(config).create_providers()
pipes = R2RPipeFactory(config, providers).create_pipes()
 
# Add a custom prompt for transforming the user query
transform_prompt = {
    "name": "custom_rag_transform_prompt",
    "template": "Transform the following query into multiple sub-queries to improve search results:\n\nQuery: {message}\n\nSub-queries:\n",
    "input_types": {"message": "str"},
}
providers.prompt.add_prompt(**transform_prompt)
 
# Initialize the new query transform pipe
query_transform_pipe = R2RQueryTransformPipe(
    llm_provider=providers.llm,
    prompt_provider=providers.prompt,
    config=R2RQueryTransformPipe.QueryTransformConfig(
        task_prompt=transform_prompt["name"]
    ),
)
 
# Define the custom RAG pipeline
class CustomRAGPipeline(RAGPipeline):
    def __init__(self, streaming: bool = False):
        super().__init__()
        self.add_pipe(query_transform_pipe)
        self.add_pipe(pipes.search_pipe)
        rag_pipe = pipes.streaming_rag_pipe if streaming else pipes.rag_pipe
        self.add_pipe(
            rag_pipe,
            add_upstream_outputs=[
                {
                    "prev_pipe_name": pipes.search_pipe.config.name,
                    "prev_output_field": "search_results",
                    "input_field": "raw_search_results",
                },
                {
                    "prev_pipe_name": pipes.search_pipe.config.name,
                    "prev_output_field": "search_queries",
                    "input_field": "query",
                },
            ],
        )
 
# Instantiate and run the pipeline (wrapped in an async entry point, since
# `await` is only valid inside an async function)
async def main():
    custom_pipeline = CustomRAGPipeline(streaming=False)
    result = await custom_pipeline.run(
        input="What are the key differences between apples and oranges?",
        query_transform_config=GenerationConfig(model="gpt-4"),
        rag_generation_config=GenerationConfig(model="gpt-4"),
        search_limit=5,  # Custom search limit
    )
    print(f"Final Result:\n\n{result}")

asyncio.run(main())
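For real-time use, the same class can be instantiated with streaming=True. The sketch below assumes the streaming pipeline's result can be iterated asynchronously chunk by chunk, which is the usual pattern for streaming RAG; verify the exact return type against your R2R version:

# Assumption: with streaming=True the result is an async generator of chunks.
async def main_streaming():
    pipeline = CustomRAGPipeline(streaming=True)
    stream = await pipeline.run(
        input="What are the key differences between apples and oranges?",
        query_transform_config=GenerationConfig(model="gpt-4"),
        rag_generation_config=GenerationConfig(model="gpt-4"),
    )
    async for chunk in stream:
        print(chunk, end="", flush=True)

asyncio.run(main_streaming())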

Conclusion

The RAG Pipeline is a powerful tool for combining search and language generation to produce detailed, contextually accurate responses. Its modular design supports customization and asynchronous processing, and streaming support makes it well suited to real-time use cases.