Deep Dive
Ingestion Pipeline

Ingestion Pipeline

The Ingestion Pipeline is responsible for processing incoming documents and converting them into plaintext format. It supports various data types such as TXT, JSON, HTML, and PDF.

Basic Ingestion Pipeline

The BasicIngestionPipeline is a simple implementation of the IngestionPipeline abstract base class. It provides a straightforward way to process documents based on their data type and yields DocumentPage objects.

Supported Data Types

The BasicIngestionPipeline supports the following data types:

  • TXT: Plain text format
  • JSON: JavaScript Object Notation format
  • HTML: Hypertext Markup Language format
  • PDF: Portable Document Format

Initialization

The BasicIngestionPipeline is initialized with an optional adapters dictionary that maps IngestionType enum values to their corresponding Adapter instances. It also takes an optional logging_connection parameter for logging purposes.

Processing Data

The process_data method processes data into plaintext based on the data type using the appropriate adapter and yields DocumentPage objects.

Parsing Entry

The parse_entry method is a wrapper around the process_data method. It takes the entry_type and entry_data as input, processes the data, and yields the resulting DocumentPage objects.

Adapters

Adapters are used to handle specific data types and convert them into plaintext format. The BasicIngestionPipeline uses the following adapters:

  • TextAdapter: Handles plain text format
  • JSONAdapter: Handles JSON format
  • HTMLAdapter: Handles HTML format
  • PDFAdapter: Handles PDF format

Adapters are passed to the IngestionPipeline during initialization and are used to process the corresponding data types.

Pipeline Execution

The run method from the base class (IngestionPipeline) is used to execute the ingestion pipeline. It processes the incoming documents, yields the resulting DocumentPage objects, and uses the appropriate adapters for parsing based on the data type.

Using a Custom Adapter (Reducto)

To use a custom adapter like ReductoAdapter with the Ingestion Pipeline, you can pass it to the E2EPipelineFactory when creating the pipeline:

from r2r.core.adapters import ReductoAdapter
from r2r.main import E2EPipelineFactory, R2RConfig
from r2r.pipelines import IngestionType
 
app = E2EPipelineFactory.create_pipeline(
    config=R2RConfig.load_config(),
    adapters={
        IngestionType.PDF: ReductoAdapter(),
    },
)

In this example, we create an instance of the E2EPipelineFactory and provide a custom adapters dictionary that maps the IngestionType.PDF to an instance of ReductoAdapter. This allows the pipeline to use the ReductoAdapter for parsing PDF documents.

By passing the custom adapter to the E2EPipelineFactory, the pipeline will automatically use it for processing PDF documents without requiring any modifications to the pipeline itself.

Fully Customizing the Ingestion Pipeline

For more advanced customization, you can create a custom ingestion pipeline by subclassing the IngestionPipeline abstract base class. Here's an example:

from typing import Iterator, Union
from r2r.core import DocumentPage, IngestionPipeline
from r2r.core.adapters import ReductoAdapter
from r2r.pipelines import IngestionType
 
class CustomIngestionPipeline(IngestionPipeline):
    def process_data(
        self,
        entry_type: IngestionType,
        entry_data: Union[bytes, str],
    ) -> Iterator[DocumentPage]:
        adapter = self.adapters.get(
            entry_type,
            self.default_adapters[entry_type]
        )
        
        if isinstance(adapter, ReductoAdapter):
            # Custom processing for ReductoAdapter
            texts = adapter.adapt(entry_data)
            for it, text in enumerate(texts):
                yield DocumentPage(
                    document_id=self.document_id,
                    page_number=it,
                    text=text,
                    metadata={
                        "source": "Reducto",
                        "original_type": entry_type.value,
                        **self.metadata,
                    },
                )
        else:
            # Custom processing for other adapters
            texts = adapter.adapt(entry_data)
            for it, text in enumerate(texts):
                yield DocumentPage(
                    document_id=self.document_id,
                    page_number=it,
                    text=text.upper(),  # Custom modification: convert text to uppercase
                    metadata=self.metadata,
                )
 
# Usage
from r2r.main import E2EPipelineFactory, R2RConfig
 
app = E2EPipelineFactory.create_pipeline(
    config=R2RConfig.load_config(),
    ingestion_pipeline_impl=CustomIngestionPipeline,
)

In this example, we create a custom CustomIngestionPipeline class that inherits from the IngestionPipeline abstract base class. We override the process_data method to add custom processing logic.

Inside the process_data method, we retrieve the appropriate adapter based on the entry_type. If the adapter is an instance of ReductoAdapter, we apply custom processing similar to the previous example.

For other adapters, we apply custom processing by converting the extracted text to uppercase using text.upper() before yielding the DocumentPage objects.

To use the fully customized ingestion pipeline, we create an instance of the E2EPipelineFactory and pass the CustomIngestionPipeline as the ingestion_pipeline_impl parameter.

By doing this, the pipeline will use the custom processing logic defined in the CustomIngestionPipeline for all data types. The custom logic will be applied based on the type of adapter used for each data type.

This approach allows for more flexibility and control over the processing of documents within the ingestion pipeline, enabling you to modify the behavior for specific adapters or apply custom transformations to the extracted text.