Embedding Pipeline
The Embedding Pipeline is responsible for embedding and storing documents using a specified embedding model and database. It chunks the text into manageable pieces, transforms them based on metadata, generates embeddings, and stores the embedded chunks in the database.
Basic Embedding Pipeline
The BasicEmbeddingPipeline
is a simple implementation of the EmbeddingPipeline
abstract base class. It provides a straightforward way to embed and store documents using a specified embedding model and database.
Initialization
The BasicEmbeddingPipeline
is initialized with an embedding_provider
, vector_db_provider
, text_splitter
, an optional logging_connection
, embedding_batch_size
, and id_prefix
.
Text Extraction
The extract_text
method extracts text from a document and logs the execution to the database.
Text Transformation
The transform_text
method transforms the extracted text before chunking, if necessary.
Text Chunking
The chunk_text
method splits the transformed text into manageable chunks for embedding using the provided text_splitter
.
Chunk Transformation
The transform_chunks
method transforms the text chunks based on their metadata, such as adding prefixes. It logs the execution to the database.
Chunk Embedding
The embed_chunks
method generates embeddings for each text chunk using the specified embedding model and embeddings provider.
Chunk Storage
The store_chunks
method stores the embedded chunks in the vector database, with an option to upsert.
Pipeline Execution
The run
method executes the embedding pipeline by initializing the pipeline, chunking the text (if specified), transforming the chunks, generating embeddings, and storing the embedded chunks in the database. It processes documents in batches based on the embedding_batch_size
.
Using a Custom Adapter (Reducto)
To use a custom adapter like ReductoAdapter
with the Embedding Pipeline, you can pass it to the E2EPipelineFactory
when creating the pipeline:
from r2r.core.adapters import ReductoAdapter
from r2r.main import E2EPipelineFactory, R2RConfig
from r2r.pipelines import IngestionType
app = E2EPipelineFactory.create_pipeline(
config=R2RConfig.load_config(),
adapters={
IngestionType.PDF: ReductoAdapter(),
},
)
In this example, we create an instance of the E2EPipelineFactory
and provide a custom adapters
dictionary that maps the IngestionType.PDF
to an instance of ReductoAdapter
. This allows the pipeline to use the ReductoAdapter
for parsing PDF documents.
By passing the custom adapter to the E2EPipelineFactory
, the pipeline will automatically use it for processing PDF documents without requiring any modifications to the pipeline itself.
Fully Customizing the Embedding Pipeline
For more advanced customization, you can create a custom embedding pipeline by subclassing the EmbeddingPipeline
abstract base class. Here's an example:
from r2r.pipelines import BasicEmbeddingPipeline
import textstat
class CustomEmbeddingPipeline(BasicEmbeddingPipeline):
def __init__(
self,
embedding_provider: OpenAIEmbeddingProvider,
vector_db_provider: VectorDBProvider,
text_splitter: TextSplitter,
logging_connection: Optional[LoggingDatabaseConnection] = None,
embedding_batch_size: int = 1,
id_prefix: str = "demo",
):
super().__init__(
embedding_provider,
vector_db_provider,
text_splitter,
logging_connection,
embedding_batch_size,
id_prefix,
)
@log_execution_to_db
def transform_chunks(
self,
chunks: list[str],
metadatas: list[dict]
) -> list[str]:
"""
Transforms text chunks based on their metadata and calculates readability scores.
"""
transformed_chunks = []
for chunk, metadata in zip(chunks, metadatas):
readability_score = textstat.flesch_reading_ease(chunk)
metadata["readability_score"] = readability_score
transformed_chunks.append(chunk)
return transformed_chunks
# Usage
from r2r.main import E2EPipelineFactory, R2RConfig
app = E2EPipelineFactory.create_pipeline(
config=R2RConfig.load_config(),
embedding_pipeline_impl=CustomEmbeddingPipeline,
)
In this example, we create a custom CustomEmbeddingPipeline
class that inherits from the BasicEmbeddingPipeline
class. We override the transform_chunks
method to include readability scoring logic using the textstat
library. The readability score is added to the chunk metadata.
To use the fully customized embedding pipeline, we create an instance of the E2EPipelineFactory
and pass the CustomEmbeddingPipeline
as the embedding_pipeline_impl
parameter.
By doing this, the pipeline will use the custom processing logic defined in the CustomEmbeddingPipeline
for all documents.
This approach allows for more flexibility and control over the embedding process within the pipeline, enabling you to modify the behavior for specific steps or add custom transformations to the text chunks.