Deep Dive
Embedding Pipeline

The Embedding Pipeline is responsible for embedding and storing documents using a specified embedding model and database. It chunks the text into manageable pieces, transforms them based on metadata, generates embeddings, and stores the embedded chunks in the database.

Basic Embedding Pipeline

The BasicEmbeddingPipeline is a simple implementation of the EmbeddingPipeline abstract base class. It provides a straightforward way to embed and store documents using a specified embedding model and database.

Initialization

The BasicEmbeddingPipeline is initialized with an embedding_provider, vector_db_provider, text_splitter, an optional logging_connection, embedding_batch_size, and id_prefix.
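For example, a pipeline can be constructed directly from these components. This is a minimal sketch: the provider classes are the ones used in the customization example later on this page, and how you construct them depends on your configuration.

# Illustrative construction of a BasicEmbeddingPipeline.
from r2r.pipelines import BasicEmbeddingPipeline

pipeline = BasicEmbeddingPipeline(
    embedding_provider=embedding_provider,  # e.g. an OpenAIEmbeddingProvider instance
    vector_db_provider=vector_db_provider,  # the vector database backend
    text_splitter=text_splitter,            # splitter used by chunk_text (see below)
    logging_connection=None,                # optional LoggingDatabaseConnection
    embedding_batch_size=32,                # chunks embedded per batch
    id_prefix="demo",                       # prefix applied to stored chunk ids
)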

Text Extraction

The extract_text method extracts text from a document and logs the execution to the database.

Text Transformation

The transform_text method transforms the extracted text before chunking, if necessary.

Text Chunking

The chunk_text method splits the transformed text into manageable chunks for embedding using the provided text_splitter.
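The splitter is whatever you pass in as text_splitter; a LangChain-style recursive character splitter is a typical choice. The configuration below is illustrative, not a requirement of the pipeline:

# Example splitter; any splitter with a compatible split_text interface can be used.
from langchain.text_splitter import RecursiveCharacterTextSplitter

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=512,    # target chunk size, in characters
    chunk_overlap=20,  # overlap between consecutive chunks to preserve context
)
chunks = text_splitter.split_text(transformed_text)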

Chunk Transformation

The transform_chunks method transforms the text chunks based on their metadata, such as adding prefixes. It logs the execution to the database.
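As an illustration of the kind of metadata-driven transform this hook supports, a hypothetical override that prepends a title prefix might look like this (the "title" key and prefix format are invented for the example; the signature mirrors the override shown later on this page):

# Hypothetical transform_chunks override: prepend a title prefix when the metadata provides one.
def transform_chunks(self, chunks: list[str], metadatas: list[dict]) -> list[str]:
    transformed = []
    for chunk, metadata in zip(chunks, metadatas):
        title = metadata.get("title")
        transformed.append(f"Title: {title}\n{chunk}" if title else chunk)
    return transformed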

Chunk Embedding

The embed_chunks method generates embeddings for each text chunk using the specified embedding model and embedding provider.
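A minimal sketch of this step, where get_embeddings stands in for whatever batch-embedding call the configured provider exposes (the method name is an assumption, not the library's API):

# Minimal sketch of embed_chunks; `get_embeddings` is a placeholder method name.
def embed_chunks(self, chunks: list[str]) -> list[list[float]]:
    return self.embedding_provider.get_embeddings(chunks)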

Chunk Storage

The store_chunks method stores the embedded chunks in the vector database, with an option to upsert.
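A minimal sketch of the storage step, assuming the vector database provider exposes upsert- and insert-style write methods (the method names below are placeholders, not the library's API):

# Minimal sketch of store_chunks; `upsert_entries` / `insert_entries` are placeholder names.
def store_chunks(self, embedded_chunks, do_upsert: bool = True) -> None:
    if do_upsert:
        self.vector_db_provider.upsert_entries(embedded_chunks)  # overwrite on id collision
    else:
        self.vector_db_provider.insert_entries(embedded_chunks)  # plain insert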

Pipeline Execution

The run method executes the embedding pipeline end to end: it initializes the pipeline, chunks the text (if specified), transforms the chunks, generates embeddings, and stores the embedded chunks in the database. Documents are processed in batches based on the embedding_batch_size.
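The control flow can be pictured roughly as follows. This is a simplified sketch of the ordering and batching described above, not the actual implementation; the real signature of run and the shape of its inputs may differ, and the (text, metadata) pairing is an assumption.

# Rough sketch of run(); names and signature are illustrative only.
def run(self, documents, do_chunking: bool = True, do_upsert: bool = True):
    # ... pipeline initialization happens here ...
    batch = []
    for text, metadata in documents:  # assumed (text, metadata) pairs
        chunks = self.chunk_text(text) if do_chunking else [text]
        batch += self.transform_chunks(chunks, [metadata] * len(chunks))
        if len(batch) >= self.embedding_batch_size:
            self.store_chunks(self.embed_chunks(batch), do_upsert)
            batch = []
    if batch:  # flush any remaining chunks
        self.store_chunks(self.embed_chunks(batch), do_upsert)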

Using a Custom Adapter (Reducto)

To use a custom adapter like ReductoAdapter with the Embedding Pipeline, you can pass it to the E2EPipelineFactory when creating the pipeline:

from r2r.core.adapters import ReductoAdapter
from r2r.main import E2EPipelineFactory, R2RConfig
from r2r.pipelines import IngestionType
 
app = E2EPipelineFactory.create_pipeline(
    config=R2RConfig.load_config(),
    adapters={
        IngestionType.PDF: ReductoAdapter(),
    },
)

In this example, we call E2EPipelineFactory.create_pipeline with a custom adapters dictionary that maps IngestionType.PDF to an instance of ReductoAdapter. This allows the pipeline to use the ReductoAdapter for parsing PDF documents.

Because the custom adapter is passed to the E2EPipelineFactory, the pipeline automatically uses it for processing PDF documents, without requiring any modifications to the pipeline itself.

Fully Customizing the Embedding Pipeline

For more advanced customization, you can create a custom embedding pipeline by subclassing BasicEmbeddingPipeline (or the EmbeddingPipeline abstract base class directly). Here's an example that builds on BasicEmbeddingPipeline:

from typing import Optional

import textstat
from langchain.text_splitter import TextSplitter

# Adjust these import paths if they have moved in your r2r version.
from r2r.core import (
    LoggingDatabaseConnection,
    VectorDBProvider,
    log_execution_to_db,
)
from r2r.embeddings import OpenAIEmbeddingProvider
from r2r.pipelines import BasicEmbeddingPipeline
 
class CustomEmbeddingPipeline(BasicEmbeddingPipeline):
    def __init__(
        self,
        embedding_provider: OpenAIEmbeddingProvider,
        vector_db_provider: VectorDBProvider,
        text_splitter: TextSplitter,
        logging_connection: Optional[LoggingDatabaseConnection] = None,
        embedding_batch_size: int = 1,
        id_prefix: str = "demo",
    ):
        super().__init__(
            embedding_provider,
            vector_db_provider,
            text_splitter,
            logging_connection,
            embedding_batch_size,
            id_prefix,
        )
 
    @log_execution_to_db
    def transform_chunks(
        self,
        chunks: list[str],
        metadatas: list[dict]
    ) -> list[str]:
        """
        Transforms text chunks based on their metadata and calculates readability scores.
        """
        transformed_chunks = []
        for chunk, metadata in zip(chunks, metadatas):
            readability_score = textstat.flesch_reading_ease(chunk)
            metadata["readability_score"] = readability_score
            transformed_chunks.append(chunk)
        return transformed_chunks
 
# Usage
from r2r.main import E2EPipelineFactory, R2RConfig
 
app = E2EPipelineFactory.create_pipeline(
    config=R2RConfig.load_config(),
    embedding_pipeline_impl=CustomEmbeddingPipeline,
)

In this example, we define a CustomEmbeddingPipeline class that inherits from BasicEmbeddingPipeline and override its transform_chunks method to add readability scoring with the textstat library. The computed readability score is stored on each chunk's metadata.

To use the fully customized embedding pipeline, we call E2EPipelineFactory.create_pipeline and pass CustomEmbeddingPipeline as the embedding_pipeline_impl parameter.

By doing this, the pipeline will use the custom processing logic defined in the CustomEmbeddingPipeline for all documents.

This approach gives you finer control over the embedding process, letting you modify the behavior of individual steps or add custom transformations to the text chunks.