Ingestion Pipeline
The Ingestion Pipeline is responsible for processing incoming documents and converting them into plaintext format. It supports various data types such as TXT, JSON, HTML, and PDF.
Basic Ingestion Pipeline
The BasicIngestionPipeline is a simple implementation of the IngestionPipeline abstract base class. It processes documents according to their data type and yields DocumentPage objects.
Supported Data Types
The BasicIngestionPipeline supports the following data types:
- TXT: Plain text format
- JSON: JavaScript Object Notation format
- HTML: Hypertext Markup Language format
- PDF: Portable Document Format
Initialization
The BasicIngestionPipeline is initialized with an optional adapters dictionary that maps IngestionType enum values to their corresponding Adapter instances. It also takes an optional logging_connection parameter for logging purposes.
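As a rough illustration of how an optional adapters dictionary can override per-type defaults, here is a minimal, self-contained sketch. Every class in it (IngestionType, PassthroughAdapter, UpperAdapter, SketchPipeline) is a stand-in written for this example, and the adapter_for helper is hypothetical; none of this is the library's actual implementation.

```python
from enum import Enum
from typing import Optional


class IngestionType(Enum):
    # Stand-in for the library's enum; the member values are assumed.
    TXT = "txt"
    JSON = "json"
    HTML = "html"
    PDF = "pdf"


class PassthroughAdapter:
    """Hypothetical default adapter: returns the input as a single page."""

    def adapt(self, data):
        return [str(data)]


class UpperAdapter:
    """Hypothetical custom adapter, used only to show an override."""

    def adapt(self, data):
        return [str(data).upper()]


class SketchPipeline:
    """Illustrative stand-in, not the real BasicIngestionPipeline."""

    def __init__(self, adapters: Optional[dict] = None, logging_connection=None):
        # One default adapter per supported type.
        self.default_adapters = {t: PassthroughAdapter() for t in IngestionType}
        # User-supplied overrides; an empty dict when none are given.
        self.adapters = adapters or {}
        self.logging_connection = logging_connection

    def adapter_for(self, entry_type: IngestionType):
        # Prefer a user-supplied adapter, else fall back to the default.
        return self.adapters.get(entry_type, self.default_adapters[entry_type])


pipeline = SketchPipeline(adapters={IngestionType.PDF: UpperAdapter()})
```

Only PDF is overridden here, so every other type still resolves to its default adapter.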
Processing Data
The process_data method converts data into plaintext using the adapter that matches the data type and yields DocumentPage objects.
Parsing Entry
The parse_entry method is a wrapper around the process_data method. It takes entry_type and entry_data as input, processes the data, and yields the resulting DocumentPage objects.
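The relationship between process_data and parse_entry can be sketched as two generators, one delegating to the other. The classes below are simplified stand-ins: DocumentPage carries only two assumed fields, and PlainTextAdapter, with its rule of splitting on blank lines, is hypothetical.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Iterator, List, Union


class IngestionType(Enum):
    TXT = "txt"  # stand-in member; the real enum covers more types


@dataclass
class DocumentPage:
    # Stand-in with only the fields needed for this sketch.
    page_number: int
    text: str


class PlainTextAdapter:
    """Hypothetical adapter: one page per blank-line-separated block."""

    def adapt(self, data: Union[bytes, str]) -> List[str]:
        text = data.decode("utf-8") if isinstance(data, bytes) else data
        return [block.strip() for block in text.split("\n\n") if block.strip()]


class SketchPipeline:
    def __init__(self):
        self.adapters = {IngestionType.TXT: PlainTextAdapter()}

    def process_data(self, entry_type, entry_data) -> Iterator[DocumentPage]:
        adapter = self.adapters[entry_type]  # pick the adapter by data type
        for number, text in enumerate(adapter.adapt(entry_data)):
            yield DocumentPage(page_number=number, text=text)

    def parse_entry(self, entry_type, entry_data) -> Iterator[DocumentPage]:
        # Thin wrapper: delegate to process_data and re-yield its pages.
        yield from self.process_data(entry_type, entry_data)


pages = list(SketchPipeline().parse_entry(IngestionType.TXT, "first\n\nsecond"))
```

Because both methods are generators, pages are produced lazily; wrapping the call in list() as above forces the whole document to be processed.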
Adapters
Adapters handle specific data types and convert them into plaintext format. The BasicIngestionPipeline uses the following adapters:
- TextAdapter: Handles plain text format
- JSONAdapter: Handles JSON format
- HTMLAdapter: Handles HTML format
- PDFAdapter: Handles PDF format
Adapters are passed to the IngestionPipeline during initialization and are used to process the corresponding data types.
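To make the adapter idea concrete, the following sketch shows what two adapters might look like. Only the adapt method name is taken from the examples in this document; the base class, the return type (a list of page strings), and the SketchTextAdapter and SketchJSONAdapter classes are assumptions made for illustration.

```python
import json
from abc import ABC, abstractmethod
from typing import List, Union


class Adapter(ABC):
    """Stand-in base class; the real Adapter interface may differ."""

    @abstractmethod
    def adapt(self, data: Union[bytes, str]) -> List[str]:
        ...


def _to_text(data: Union[bytes, str]) -> str:
    # Accept raw bytes or an already-decoded string.
    return data.decode("utf-8") if isinstance(data, bytes) else data


class SketchTextAdapter(Adapter):
    def adapt(self, data):
        # Plain text needs no conversion: return it as a single page.
        return [_to_text(data)]


class SketchJSONAdapter(Adapter):
    def adapt(self, data):
        parsed = json.loads(_to_text(data))
        # Re-serialise with stable key order and indentation so the
        # resulting page text is predictable plaintext.
        return [json.dumps(parsed, indent=2, sort_keys=True)]
```

Keeping every adapter behind the same adapt method is what lets the pipeline pick one by data type without knowing anything about the format it handles.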
Pipeline Execution
The run method from the base class (IngestionPipeline) executes the ingestion pipeline. It processes the incoming documents, selects the appropriate adapter for each data type, and yields the resulting DocumentPage objects.
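One way to picture the split between a base-class run method and a subclass's process_data is the template-method pattern sketched below. The run signature, which takes an iterable of (type, data) pairs, is an assumption, and plain strings stand in for DocumentPage objects to keep the example short.

```python
from abc import ABC, abstractmethod
from typing import Iterable, Iterator, Tuple


class SketchIngestionPipeline(ABC):
    """Illustrative base class; the real run() signature may differ."""

    @abstractmethod
    def process_data(self, entry_type: str, entry_data: str) -> Iterator[str]:
        ...

    def run(self, entries: Iterable[Tuple[str, str]]) -> Iterator[str]:
        # Shared driver: subclasses only decide how one entry is processed.
        for entry_type, entry_data in entries:
            yield from self.process_data(entry_type, entry_data)


class LineSplittingPipeline(SketchIngestionPipeline):
    """Hypothetical subclass: emits one result per input line."""

    def process_data(self, entry_type, entry_data):
        for line in entry_data.splitlines():
            yield f"{entry_type}:{line}"


results = list(LineSplittingPipeline().run([("txt", "a\nb"), ("html", "c")]))
```

With this arrangement a subclass changes only the per-entry logic, while the iteration over incoming documents stays in one place.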
Using a Custom Adapter (Reducto)
To use a custom adapter like ReductoAdapter with the Ingestion Pipeline, pass it to the E2EPipelineFactory when creating the pipeline:
from r2r.core.adapters import ReductoAdapter
from r2r.main import E2EPipelineFactory, R2RConfig
from r2r.pipelines import IngestionType

app = E2EPipelineFactory.create_pipeline(
    config=R2RConfig.load_config(),
    adapters={
        IngestionType.PDF: ReductoAdapter(),
    },
)
In this example, we call E2EPipelineFactory.create_pipeline with a custom adapters dictionary that maps IngestionType.PDF to an instance of ReductoAdapter. The pipeline then uses the ReductoAdapter to parse PDF documents, without requiring any modifications to the pipeline itself.
Fully Customizing the Ingestion Pipeline
For more advanced customization, you can create a custom ingestion pipeline by subclassing the IngestionPipeline abstract base class. Here's an example:
from typing import Iterator, Union

from r2r.core import DocumentPage, IngestionPipeline
from r2r.core.adapters import ReductoAdapter
from r2r.pipelines import IngestionType


class CustomIngestionPipeline(IngestionPipeline):
    def process_data(
        self,
        entry_type: IngestionType,
        entry_data: Union[bytes, str],
    ) -> Iterator[DocumentPage]:
        adapter = self.adapters.get(
            entry_type,
            self.default_adapters[entry_type]
        )
        if isinstance(adapter, ReductoAdapter):
            # Custom processing for ReductoAdapter
            texts = adapter.adapt(entry_data)
            for it, text in enumerate(texts):
                yield DocumentPage(
                    document_id=self.document_id,
                    page_number=it,
                    text=text,
                    metadata={
                        "source": "Reducto",
                        "original_type": entry_type.value,
                        **self.metadata,
                    },
                )
        else:
            # Custom processing for other adapters
            texts = adapter.adapt(entry_data)
            for it, text in enumerate(texts):
                yield DocumentPage(
                    document_id=self.document_id,
                    page_number=it,
                    text=text.upper(),  # Custom modification: convert text to uppercase
                    metadata=self.metadata,
                )


# Usage
from r2r.main import E2EPipelineFactory, R2RConfig

app = E2EPipelineFactory.create_pipeline(
    config=R2RConfig.load_config(),
    ingestion_pipeline_impl=CustomIngestionPipeline,
)
In this example, CustomIngestionPipeline inherits from the IngestionPipeline abstract base class and overrides the process_data method to add custom processing logic.
Inside process_data, we retrieve the adapter for the given entry_type, falling back to the default adapter for that type. If the adapter is an instance of ReductoAdapter, each page's metadata is tagged with the source ("Reducto") and the original data type.
For other adapters, the extracted text is converted to uppercase with text.upper() before the DocumentPage objects are yielded.
To use the customized ingestion pipeline, we pass CustomIngestionPipeline as the ingestion_pipeline_impl parameter of E2EPipelineFactory.create_pipeline.
The pipeline then uses the logic defined in CustomIngestionPipeline for all data types, with the branch taken depending on the adapter registered for each type. This gives you control over how documents are processed: you can change the behavior for specific adapters or apply custom transformations to the extracted text.