RAG Pipeline
Learn about the R2R Retrieval-Augmented Generation (RAG) Pipeline for enhanced information retrieval and response generation
Introduction
The RAG (Retrieval-Augmented Generation) Pipeline in R2R combines search results with language generation to produce more informative and contextually relevant outputs. It leverages both vector search and knowledge graph capabilities to enhance the quality of generated responses.
Key Features
- Search and Generate: Integrates search results with language generation for comprehensive responses.
- Multi-Source Context: Combines vector search and knowledge graph results.
- Customizable: Supports custom configurations for search settings and generation prompts.
- Asynchronous Processing: Efficiently handles operations asynchronously.
- Streaming Support: Offers both regular and streaming modes for different use cases.
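To make the search-then-generate flow concrete before diving into the pipeline components, here is a minimal, self-contained sketch of the pattern. The `retrieve` and `generate` helpers are toy stand-ins (keyword overlap instead of vector search, string formatting instead of an LLM), not R2R APIs:

```python
def retrieve(query: str, corpus: list[str], limit: int = 2) -> list[str]:
    """Rank documents by naive keyword overlap with the query (stand-in for vector search)."""
    terms = set(query.lower().split())
    return sorted(
        corpus,
        key=lambda doc: len(terms & set(doc.lower().split())),
        reverse=True,
    )[:limit]

def generate(query: str, context: list[str]) -> str:
    """Stand-in for an LLM call: fold the retrieved context into the prompt."""
    joined = "\n".join(f"- {c}" for c in context)
    return f"Answer to '{query}' using context:\n{joined}"

corpus = [
    "R2R supports vector search",
    "Knowledge graphs enrich retrieval",
    "Bananas are yellow",
]
print(generate("How does R2R retrieval work?", retrieve("R2R retrieval search", corpus)))
```

The real pipeline replaces both helpers with providers (vector databases, knowledge graphs, LLMs), but the shape — retrieve, augment the prompt, generate — is the same.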
RAG Pipeline Components
The RAG Pipeline consists of two main components:
- Search Pipeline
- RAG Generation Pipeline
Search Pipeline
The Search Pipeline is responsible for retrieving relevant information from vector databases and knowledge graphs. It’s configured within the RAG Pipeline:
```python
class RAGPipeline(Pipeline):
    def set_search_pipeline(
        self,
        _search_pipeline: Pipeline,
        *args,
        **kwargs,
    ) -> None:
        self._search_pipeline = _search_pipeline
```
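Since the search pipeline draws on two sources, its results must be combined into a single context before generation. A hypothetical helper showing one way to merge them, preserving order and dropping duplicates (illustrative only, not the R2R implementation):

```python
def merge_contexts(vector_hits: list[str], kg_hits: list[str]) -> list[str]:
    """Combine vector search and knowledge graph results into one deduplicated context."""
    seen: set[str] = set()
    merged: list[str] = []
    for item in vector_hits + kg_hits:
        if item not in seen:
            seen.add(item)
            merged.append(item)
    return merged
```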
RAG Generation Pipeline
The RAG Generation Pipeline processes search results and generates responses. It includes two main pipes:
- SearchRAGPipe: For non-streaming RAG operations.
- StreamingSearchRAGPipe: For streaming RAG operations.
SearchRAGPipe
The SearchRAGPipe collects context from search results and generates responses:
```python
class SearchRAGPipe(GeneratorPipe):
    async def _run_logic(
        self,
        input: Input,
        state: AsyncState,
        run_id: uuid.UUID,
        rag_generation_config: GenerationConfig,
        *args: Any,
        **kwargs: Any,
    ) -> AsyncGenerator[LLMChatCompletion, None]:
        # Collect context from search results
        # Generate response using LLM
        # Yield response
        ...
```
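The comments above outline three steps: collect context, generate, yield. A self-contained sketch of the same non-streaming pattern, with a formatted string standing in for the LLM call (hypothetical names, not R2R's actual pipe internals):

```python
import asyncio
from typing import AsyncGenerator

async def search_rag_pipe(query: str, search_results: list[str]) -> AsyncGenerator[str, None]:
    # Collect context from search results
    context = "\n".join(search_results)
    # Stand-in for the LLM call; a real pipe would await its LLM provider here
    response = f"Q: {query}\nContext:\n{context}\nA: <generated answer>"
    # Yield the single, complete response (non-streaming)
    yield response

async def main() -> str:
    async for response in search_rag_pipe("What is RAG?", ["doc A", "doc B"]):
        return response

print(asyncio.run(main()))
```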
StreamingSearchRAGPipe
The StreamingSearchRAGPipe supports real-time generation of responses:
```python
class StreamingSearchRAGPipe(SearchRAGPipe):
    async def _run_logic(
        self,
        input: SearchRAGPipe.Input,
        state: AsyncState,
        run_id: uuid.UUID,
        rag_generation_config: GenerationConfig,
        *args: Any,
        **kwargs: Any,
    ) -> AsyncGenerator[str, None]:
        # Stream search results
        # Stream generated response
        ...
```
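The streaming variant yields many small chunks instead of one complete response. A self-contained sketch of that pattern, with a hard-coded token list standing in for an LLM token stream (hypothetical names, not R2R's actual pipe internals):

```python
import asyncio
from typing import AsyncGenerator

async def streaming_search_rag_pipe(
    query: str, search_results: list[str]
) -> AsyncGenerator[str, None]:
    # Stream the search results first, so clients can render them immediately
    for result in search_results:
        yield f"<search>{result}</search>"
    # Then stream the generated response chunk by chunk (stand-in for LLM streaming)
    for token in ["An", " answer", " based", " on", " context."]:
        yield token

async def collect() -> list[str]:
    return [chunk async for chunk in streaming_search_rag_pipe("q", ["doc A"])]

print("".join(asyncio.run(collect())))
```

Because the pipe is an async generator of `str`, callers can forward each chunk to the client as it arrives rather than waiting for the full response.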
Using the RAG Pipeline
To use the RAG Pipeline, you need to configure it with the necessary components and run it:
```python
rag_pipeline = RAGPipeline()
rag_pipeline.set_search_pipeline(search_pipeline)
rag_pipeline.add_pipe(rag_pipe)

results = await rag_pipeline.run(
    input=user_query,
    vector_search_settings=VectorSearchSettings(),
    kg_search_settings=KGSearchSettings(),
    rag_generation_config=GenerationConfig(),
)
```
Customizing the RAG Pipeline
You can customize the RAG Pipeline by adding different pipes or implementing custom logic. Here’s an example of a custom RAG pipeline with query transformation:
```python
from r2r import RAGPipeline, R2RConfig, R2RProviderFactory, R2RPipeFactory, QueryTransformPipe, GenerationConfig, VectorSearchSettings

# Load configuration and create providers and pipes
config = R2RConfig.from_json()
providers = R2RProviderFactory(config).create_providers()
pipes = R2RPipeFactory(config, providers).create_pipes()

# Add a custom query transformation prompt
transform_prompt = {
    "name": "custom_rag_transform_prompt",
    "template": "Transform the query into multiple sub-queries:\n\nQuery: {message}\n\nSub-queries:\n",
    "input_types": {"message": "str"},
}
providers.prompt.add_prompt(**transform_prompt)

# Create a query transform pipe
query_transform_pipe = QueryTransformPipe(
    llm_provider=providers.llm,
    prompt_provider=providers.prompt,
    config=QueryTransformPipe.QueryTransformConfig(
        task_prompt=transform_prompt["name"]
    ),
)

# Define a custom RAG pipeline
class CustomRAGPipeline(RAGPipeline):
    def __init__(self, streaming: bool = False):
        super().__init__()
        self.add_pipe(query_transform_pipe)
        rag_pipe = pipes.streaming_rag_pipe if streaming else pipes.rag_pipe
        self.add_pipe(rag_pipe)
        self.set_search_pipeline(pipes.search_pipeline)

# Use the custom pipeline
custom_pipeline = CustomRAGPipeline(streaming=False)
result = await custom_pipeline.run(
    input="Compare apples and oranges",
    query_transform_config=GenerationConfig(model="gpt-4"),
    rag_generation_config=GenerationConfig(model="gpt-4"),
    vector_search_settings=VectorSearchSettings(search_limit=5),
)
```
Related Quickstarts and Cookbooks
To help you get started with R2R and its RAG capabilities, we recommend exploring the following resources:
- R2R Quickstart: A guide to get you familiarized with R2R.
- Multiple LLMs: Demonstration of how to run R2R with different LLMs.
- Local RAG Cookbook: Demonstration of how to run R2R with local LLMs.
- Multimodal RAG cookbook: Introduction to multimodal support with R2R.
Conclusion
The R2R RAG Pipeline provides a powerful way to combine search capabilities with language generation, resulting in more informative and contextually relevant responses. Its modular design supports customization and both streaming and non-streaming modes, making it suitable for a wide range of applications.
For more information on configuring and customizing the RAG Pipeline, refer to the R2R Configuration and Customizing R2R documentation.
Community Support
If you need help or want to connect with other R2R users, join our Discord server. It’s a great place to get support, discuss best practices, and share your experiences with R2R.