Ingestion Pipeline

The Ingestion Pipeline processes incoming documents and converts them into structured plaintext. Key features include:

  • Multi-format Support: Handles TXT, JSON, HTML, PDF, DOCX, PPTX, XLSX, CSV, Markdown, images, audio, and movies.
  • Customizable: Supports the addition of custom parsers for specific data types.
  • Asynchronous Processing: Handles data with asynchronous operations, so slow I/O on one document does not stall the rest of the pipeline.

Key Parsers

  • TextParser: Converts raw text data into plaintext.
  • JSONParser: Formats JSON data into readable text.
  • HTMLParser: Extracts text from HTML documents.
  • PDFParser: Extracts text from PDF files.
  • DOCXParser: Extracts text from DOCX files.
  • PPTParser: Extracts text from PPTX slides.
  • XLSXParser: Extracts text from XLSX rows.
  • CSVParser: Converts CSV data into plaintext.
  • MarkdownParser: Extracts text from Markdown content.
  • AudioParser: Transcribes audio files.
  • ImageParser: Describes image content.
  • MovieParser: Describes video frames and audio.

Vector Storage Pipe

The R2RVectorStoragePipe stores the embeddings generated earlier in the pipeline in a vector database. It writes entries in batches using asynchronous operations.

Storage Logic

  • Batch Processing: Collects vector entries and stores them in batches.
  • Upsert and Copy Operations: Supports upsert and copy operations for vector entries.
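
The batching and upsert behavior described above can be sketched roughly as follows. This is not the R2RVectorStoragePipe implementation: `InMemoryVectorStore`, `store_in_batches`, the entry shape, and the batch size are all hypothetical stand-ins, and only the upsert path is modeled.

```python
import asyncio
from typing import AsyncGenerator, Dict, List, Tuple

# Hypothetical entry shape: (entry id, embedding vector).
VectorEntry = Tuple[str, List[float]]


class InMemoryVectorStore:
    """Stand-in for a vector database; upsert overwrites rows with the same id."""

    def __init__(self) -> None:
        self.rows: Dict[str, List[float]] = {}
        self.batch_sizes: List[int] = []  # recorded only to make batching visible

    async def upsert_batch(self, batch: List[VectorEntry]) -> None:
        await asyncio.sleep(0)  # placeholder for real database I/O
        self.batch_sizes.append(len(batch))
        for entry_id, vector in batch:
            self.rows[entry_id] = vector


async def store_in_batches(
    entries: AsyncGenerator[VectorEntry, None],
    store: InMemoryVectorStore,
    batch_size: int = 2,
) -> None:
    """Collect entries from an async stream and write them batch by batch."""
    batch: List[VectorEntry] = []
    async for entry in entries:
        batch.append(entry)
        if len(batch) >= batch_size:
            await store.upsert_batch(batch)
            batch = []
    if batch:  # flush the final, partially filled batch
        await store.upsert_batch(batch)


async def sample_entries() -> AsyncGenerator[VectorEntry, None]:
    yield ("a", [0.1, 0.2])
    yield ("b", [0.3, 0.4])
    yield ("a", [0.9, 0.9])  # same id as the first entry, so it replaces it


store = InMemoryVectorStore()
asyncio.run(store_in_batches(sample_entries(), store, batch_size=2))
```

Three entries with a batch size of 2 produce one full batch and one final flush, and the repeated id `"a"` ends up with the later vector.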

Custom Ingestion Pipeline

To create a custom ingestion pipeline, follow these steps:

  1. Define Custom Parsers (if needed): Implement any custom parsers required for specific data types.
  2. Create and Configure the Pipeline: Add the necessary pipes (for example parsing, embedding, and vector storage) to the pipeline in the order they should run.
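
For step 1, a custom parser might look something like the sketch below. The `KeyValueLogParser` class, its `ingest` method name, and the "key=value" log format are assumptions chosen for illustration; check the parser base class in your installed R2R version for the interface it actually expects.

```python
import asyncio
from typing import AsyncGenerator, List


class KeyValueLogParser:
    """Hypothetical custom parser for 'key=value' log lines (not part of R2R)."""

    async def ingest(self, data: str) -> AsyncGenerator[str, None]:
        for line in data.splitlines():
            line = line.strip()
            if not line or line.startswith("#"):
                continue  # skip blank lines and comments
            # Keep only tokens that look like key=value pairs.
            fields = dict(
                pair.split("=", 1) for pair in line.split() if "=" in pair
            )
            if fields:
                yield "; ".join(f"{key}: {value}" for key, value in fields.items())


async def collect(parser: KeyValueLogParser, data: str) -> List[str]:
    """Drain the parser's async generator into a list."""
    return [text async for text in parser.ingest(data)]


sample = "# app log\nlevel=info msg=started\n\nlevel=error code=500\n"
parsed = asyncio.run(collect(KeyValueLogParser(), sample))
```

Each non-empty log line becomes one plaintext string, which is the kind of output a downstream embedding step can consume.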

Example Custom Ingestion Pipeline

# R2RApp is used at the end of this example, so it is imported here too
# (assumed to be exported from the top-level r2r package like the other names).
from r2r import (
    IngestionPipeline,
    R2RApp,
    R2RConfig,
    R2RPipeFactory,
    R2RPipelineFactory,
    R2RProviderFactory,
)

# Load configuration
config = R2RConfig.from_json()

# Create providers and pipes
providers = R2RProviderFactory(config).create_providers()
pipes = R2RPipeFactory(config, providers).create_pipes()


# Define the custom ingestion pipeline
class CustomIngestionPipeline(IngestionPipeline):
    def __init__(self):
        super().__init__()
        # Pipes run in the order they are added: parse, embed, then store.
        # `pipes` is the module-level object created above.
        self.add_pipe(pipes.parsing_pipe)
        self.add_pipe(pipes.embedding_pipe)
        self.add_pipe(pipes.vector_storage_pipe)


# Instantiate the custom pipeline
custom_ingestion_pipeline = CustomIngestionPipeline()

# Build the remaining pipelines around it and create the application
pipelines = R2RPipelineFactory(config, pipes).create_pipelines(
    ingestion_pipeline=custom_ingestion_pipeline
)
r2r = R2RApp(config, providers, pipelines)
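
To make the effect of the `add_pipe` ordering more concrete, here is a toy model of pipe chaining with async generators. It only illustrates the idea: `ToyPipeline`, the two pipe functions, and the length-based "embedding" are hypothetical and do not reflect how R2R implements `IngestionPipeline`.

```python
import asyncio
from typing import Any, AsyncGenerator, Callable, List, Tuple

# Hypothetical pipe type: consumes one async stream and produces another.
Pipe = Callable[[AsyncGenerator[Any, None]], AsyncGenerator[Any, None]]


class ToyPipeline:
    """Minimal stand-in that chains pipes in the order they were added."""

    def __init__(self) -> None:
        self.pipes: List[Pipe] = []

    def add_pipe(self, pipe: Pipe) -> None:
        self.pipes.append(pipe)

    async def run(self, source: AsyncGenerator[Any, None]) -> List[Any]:
        stream = source
        for pipe in self.pipes:
            stream = pipe(stream)  # each pipe wraps the previous stream
        return [item async for item in stream]


async def parsing_pipe(docs: AsyncGenerator[str, None]) -> AsyncGenerator[str, None]:
    async for doc in docs:
        yield doc.strip().lower()


async def embedding_pipe(
    texts: AsyncGenerator[str, None],
) -> AsyncGenerator[Tuple[str, List[float]], None]:
    async for text in texts:
        yield (text, [float(len(text))])  # toy "embedding": the text length


async def documents() -> AsyncGenerator[str, None]:
    for doc in ["  Hello ", "R2R"]:
        yield doc


pipeline = ToyPipeline()
pipeline.add_pipe(parsing_pipe)
pipeline.add_pipe(embedding_pipe)
results = asyncio.run(pipeline.run(documents()))
```

Because each stage pulls from the one before it, swapping the order of the `add_pipe` calls would feed unparsed documents to the embedding stage, which is why the order matters in the example above.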

Conclusion

The Ingestion Pipeline converts a wide range of document formats into structured plaintext. Its modular design lets you add custom parsers and pipes, and its asynchronous processing makes it suitable for diverse data handling requirements.