
R2R Application Documentation

Introduction

R2R comes complete with a FastAPI-based web service that provides endpoints for various operations related to the Retrieval-Augmented Generation (RAG) pipeline. It allows users to upload and process files, add entries, perform searches, generate RAG completions, evaluate completions, delete entries based on filters, retrieve user IDs and documents, and access logs.

The application's full API specification is covered in the API reference documentation.

Application Creation and Launching

The R2R application is created using the R2RPipelineFactory.create_pipelines() method, which assembles various components based on the provided configuration. Here's an example of how the application is typically created in the R2R workflow:

from r2r import R2RConfig, R2RProviderFactory, R2RPipeFactory, R2RPipelineFactory, R2RApp
 
config = R2RConfig.from_json("path/to/config.json")
providers = R2RProviderFactory(config).create_providers()
pipes = R2RPipeFactory(config, providers).create_pipes()
pipelines = R2RPipelineFactory(config, pipes).create_pipelines()
app = R2RApp(config, providers, pipelines).app

In this example, the configuration is loaded from a config.json file. The R2RProviderFactory creates providers based on the configuration, and the R2RPipeFactory creates the necessary pipes. These are then assembled into pipelines by the R2RPipelineFactory, and the FastAPI application is set up with the necessary endpoints and middleware.

To launch the application, you can use an ASGI server such as Uvicorn:

uvicorn path.to.your.module:app --host 0.0.0.0 --port 8000

In this example, Uvicorn runs the FastAPI application specified by the app variable. The application will be accessible at http://localhost:8000.
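
You can also launch the server programmatically from the same module that builds the app. Here's a minimal sketch using Uvicorn's Python API, assuming the app variable from the example above:

import uvicorn

# Launch the FastAPI app built above; equivalent to the CLI command
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)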

Adding Custom Endpoints

You can add custom endpoints to the R2R application by modifying the R2RApp class or by creating a custom FastAPI app instance.

Here's an example of how to add a custom endpoint:

from fastapi import FastAPI
from r2r import R2RConfig, R2RProviderFactory, R2RPipeFactory, R2RPipelineFactory, R2RApp
 
config = R2RConfig.from_json("path/to/config.json")
providers = R2RProviderFactory(config).create_providers()
pipes = R2RPipeFactory(config, providers).create_pipes()
pipelines = R2RPipelineFactory(config, pipes).create_pipelines()
r2r_app = R2RApp(config, providers, pipelines)
 
app = r2r_app.app
 
@app.get("/custom_endpoint")
async def custom_endpoint():
    return {"message": "This is a custom endpoint"}

In this example, after creating the R2R application, a custom endpoint is added using the @app.get("/custom_endpoint") decorator. The custom_endpoint function defines the logic for handling requests to the /custom_endpoint route.
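
Once the server is running, you can confirm the route responds as expected. Here is a quick check using the requests library (assuming the server is listening on the default address from the launch example):

import requests

# Call the custom route on the locally running server
response = requests.get("http://localhost:8000/custom_endpoint")
print(response.json())  # {'message': 'This is a custom endpoint'}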

API Endpoints

Refer to the API reference documentation for the full list of endpoints.

Configuration

The application uses a configuration file (config.json) to set various settings for the RAG pipeline, including the vector database provider, LLM settings, embedding settings, parsing logic, evaluation provider, and more. The default values for the configuration are shown below:

{
  "app": {
    "max_logs": 100,
    "max_file_size_in_mb": 32
  },
  "completions": {
    "provider": "litellm"
  },
  "embedding": {
    "provider": "openai",
    "search_model": "text-embedding-3-small",
    "search_dimension": 512,
    "batch_size": 128,
    "text_splitter": {
      "type": "recursive_character",
      "chunk_size": 512,
      "chunk_overlap": 20
    },
    "rerank_model": "None"
  },
  "eval": {
    "provider": "local",
    "llm": {
      "model": "gpt-4o",
      "provider": "openai"
    },
    "sampling_fraction": 1.0
  },
  "ingestion": {
    "selected_parsers": {
      "csv": "default",
      "docx": "default",
      "html": "default",
      "json": "default",
      "md": "default",
      "pdf": "default",
      "pptx": "default",
      "txt": "default",
      "xlsx": "default",
      "gif": "default",
      "png": "default",
      "jpg": "default",
      "jpeg": "default",
      "svg": "default"
    }
  },
  "logging": {
    "provider": "local",
    "log_table": "logs",
    "log_info_table": "log_info"
  },
  "prompt": {
    "provider": "local"
  },
  "vector_database": {
    "provider": "local",
    "collection_name": "demo_vecs"
  }
}

The available options for each section are:

  • app:

    • max_logs: Maximum number of logs to retrieve.
    • max_file_size_in_mb: Maximum file size allowed for ingestion (in MB).
  • completions:

    • provider: "litellm", "openai", "llama-cpp" (see LLM Providers for more info).
  • embedding:

    • provider: "openai", "sentence-transformers" (see Embedding Providers for more info).
    • search_model: Embedding model to use (e.g., "text-embedding-3-small" for OpenAI).
    • search_dimension: Embedding dimension.
    • batch_size: Number of texts to embed in each batch.
    • text_splitter: Configuration for text splitting.
      • type: "recursive_character".
      • chunk_size: Target chunk size in characters.
      • chunk_overlap: Number of overlapping characters between chunks.
    • rerank_model: Model used for re-ranking (optional).
  • eval:

    • provider: "local", "deepeval", "parea" (see Evaluation Providers for more info).
    • llm: LLM configuration for evaluation.
      • model: Model to use (e.g., "gpt-4o").
      • provider: Provider to use (e.g., "openai").
    • sampling_fraction: Fraction of data to sample for evaluation (e.g., 1.0 for 100% of requests).
  • ingestion:

    • selected_parsers: Specifies the parsers for different file types.
      • csv: "default".
      • docx: "default".
      • html: "default".
      • json: "default".
      • md: "default".
      • pdf: "default".
      • pptx: "default".
      • txt: "default".
      • xlsx: "default".
      • gif: "default".
      • png: "default".
      • jpg: "default".
      • jpeg: "default".
      • svg: "default".
  • logging:

    • provider: "local", "postgres", "redis".
    • log_table: Name of the table to store logs.
    • log_info_table: Name of the table to store log info.
  • prompt:

    • provider: "local".
  • vector_database:

    • provider: "local", "pgvector", "qdrant" (see Vector Database Providers for more info).
    • collection_name: Name of the collection to store vector embeddings.

For more detailed information on configuring specific providers, refer to the provider-specific documentation, which covers setup and usage for each provider, including any required environment variables and additional configuration options.

To launch the application with your own configuration, create a config.json file with your desired settings and load it when constructing the providers, pipes, and pipelines:

config = R2RConfig.from_json("your_config_path.json")
providers = R2RProviderFactory(config).create_providers()
pipes = R2RPipeFactory(config, providers).create_pipes()
pipelines = R2RPipelineFactory(config, pipes).create_pipelines()
app = R2RApp(config, providers, pipelines).app

This will create the application with your custom configuration.
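
For example, to store vectors in Qdrant rather than the local provider, you could copy the defaults shown above into your_config_path.json and edit the vector_database section. Starting from the full default file is the safest assumption, since whether omitted sections fall back to defaults may vary between versions. An excerpt of the changed section:

{
  "vector_database": {
    "provider": "qdrant",
    "collection_name": "demo_vecs"
  }
}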

Custom Pipelines

Ingestion Pipeline

The ingestion pipeline is responsible for parsing, embedding, and storing documents in a vector database. Here’s a concise overview of the key features:

  • Parsing: Supports multiple data formats like CSV, DOCX, HTML, JSON, Markdown, PDF, PPTX, TXT, XLSX, and various image formats.
  • Embedding: Converts parsed documents into embeddings using specified models.
  • Vector Storage: Stores embeddings in a vector database.

You can create a custom ingestion pipeline by implementing your own logic and adding it to the pipeline. Here’s an example of a custom ingestion pipeline:

class CustomIngestionPipeline(IngestionPipeline):
    def __init__(self, pipes):
        super().__init__()
        # Chain the custom pipes in order: parse raw files,
        # embed the resulting chunks, then persist the vectors
        self.add_pipe(pipes.custom_parsing_pipe)
        self.add_pipe(pipes.custom_embedding_pipe)
        self.add_pipe(pipes.custom_vector_storage_pipe)

RAG Pipeline

The RAG pipeline handles search and retrieval of documents, followed by generating answers using an LLM. There are two types of RAG pipelines: regular and streaming.

  • Regular RAG Pipeline: Performs the search and retrieval in a single step and generates a complete response.
  • Streaming RAG Pipeline: Streams the search results and LLM response in chunks.

Here’s an example of creating a custom RAG pipeline:

class CustomRAGPipeline(RAGPipeline):
    def __init__(self, pipes, streaming=False):
        super().__init__()
        search_pipe = pipes.custom_search_pipe
        # Select the streaming or non-streaming generation pipe
        rag_pipe = pipes.custom_streaming_rag_pipe if streaming else pipes.custom_rag_pipe
 
        self.add_pipe(search_pipe)
        # Wire the search pipe's outputs into the RAG pipe's inputs:
        # search_results feed generation, and the (possibly transformed)
        # search_queries become the generation query
        self.add_pipe(
            rag_pipe,
            add_upstream_outputs=[
                {
                    "prev_pipe_name": search_pipe.config.name,
                    "prev_output_field": "search_results",
                    "input_field": "raw_search_results",
                },
                {
                    "prev_pipe_name": search_pipe.config.name,
                    "prev_output_field": "search_queries",
                    "input_field": "query",
                },
            ],
        )
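
The pipeline can then be instantiated with the pipes object produced by R2RPipeFactory; note that the custom_* pipe attributes referenced above are placeholders for pipes you have defined yourself:

# Build a streaming variant of the custom pipeline
custom_rag_pipeline = CustomRAGPipeline(pipes, streaming=True)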

Evaluation Pipeline

The evaluation pipeline assesses the quality of generated completions. Here’s an example of creating a custom evaluation pipeline:

class CustomEvalPipeline(EvalPipeline):
    def __init__(self, pipes):
        super().__init__()
        # A single custom pipe scores the generated completions
        self.add_pipe(pipes.custom_eval_pipe)

R2RClient

The R2RClient class provides a client interface for interacting with the R2R application’s API endpoints. Here are some key methods:

  • ingest_documents(documents: list[dict]) -> dict: Ingests documents.
  • ingest_files(metadatas: list[dict], files: list[str], ids: Optional[list[str]] = None) -> dict: Ingests files.
  • search(query: str, search_filters: dict = {}, search_limit: int = 10) -> dict: Performs a search.
  • rag(message: str, search_filters: Optional[dict] = None, search_limit: int = 10, rag_generation_config: Optional[dict] = None, streaming: bool = False) -> Union[dict, Generator[str, None, None]]: Generates a RAG completion.
  • delete(key: str, value: str) -> dict: Deletes entries based on filters.
  • get_user_ids() -> dict: Retrieves user IDs.
  • get_user_document_data(user_id: str) -> dict: Retrieves document data for a user.
  • get_logs(pipeline_type: Optional[str] = None) -> dict: Retrieves logs.

Here’s an example of how to use R2RClient:

from r2r import R2RClient
 
client = R2RClient(base_url="http://localhost:8000")
 
# Ingest documents
documents = [{"id": "1", "data": "This is a test document."}]
response = client.ingest_documents(documents)
 
# Perform a search
query = "test"
search_response = client.search(query)
 
# Generate a RAG completion
rag_response = client.rag(message="What is the meaning of life?", streaming=False)
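
When streaming=True, rag() returns a generator rather than a dict, so the response can be consumed incrementally. A minimal sketch, assuming each yielded item is a text chunk:

# Stream a RAG completion chunk by chunk
for chunk in client.rag(message="What is the meaning of life?", streaming=True):
    print(chunk, end="", flush=True)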

This documentation provides a comprehensive overview of the R2R application, including configuration, application creation, custom pipelines, and using the R2RClient. For more detailed information, refer to the relevant sections in the documentation.