Overview

R2R uses a pipeline architecture to process data efficiently and flexibly. Pipelines in R2R are composed of a series of pipes, each responsible for a specific task in the data processing workflow. This modular approach allows for easy customization, extension, and maintenance of the system.

Key Concepts

Pipes

Pipes are the fundamental building blocks of R2R pipelines. Each pipe is an instance of the AsyncPipe class, which defines a standard interface for asynchronous data processing. Key features of pipes include the following (illustrated in the sketch after this list):

  • Asynchronous Processing: Pipes use Python’s async/await syntax for efficient I/O operations.
  • Typed Inputs: Each pipe defines its input structure using Pydantic models.
  • State Management: Pipes can read from and write to a shared AsyncState object.
  • Configurability: Pipes have a PipeConfig for customization.
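
To make these features concrete, here is a minimal sketch of a custom pipe that counts words while passing items through. The pipe name and the update-style helper on AsyncState are illustrative assumptions rather than verbatim R2R APIs:

from typing import Any, AsyncGenerator

from r2r import AsyncPipe, AsyncState


class WordCountPipe(AsyncPipe):
    async def _run_logic(
        self, input: AsyncPipe.Input, state: AsyncState, *args: Any, **kwargs: Any
    ) -> AsyncGenerator[Any, None]:
        total_words = 0
        # input.message is the pipe's typed input stream (a Pydantic-modelled field).
        async for chunk in input.message:
            total_words += len(str(chunk).split())
            yield chunk  # pass each chunk downstream unchanged

        # Record a summary in the shared state for downstream pipes
        # (assumes an update-style helper on AsyncState).
        await state.update("word_count_pipe", {"total_words": total_words})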

LoggableAsyncPipe

R2R recommends using the LoggableAsyncPipe class, which extends AsyncPipe with logging capabilities:

class LoggableAsyncPipe(AsyncPipe):
    def __init__(
        self,
        pipe_logger: Optional[KVLoggingSingleton] = None,
        type: PipeType = PipeType.OTHER,
        config: Optional[AsyncPipe.PipeConfig] = None,
        run_manager: Optional[RunManager] = None,
        *args,
        **kwargs,
    ):
        # ... initialization code ...

    async def log_worker(self):
        # ... log worker implementation ...

    async def enqueue_log(self, run_id: uuid.UUID, key: str, value: str):
        # ... log enqueuing logic ...

    async def run(
        self,
        input: AsyncPipe.Input,
        state: AsyncState,
        run_manager: Optional[RunManager] = None,
        *args: Any,
        **kwargs: Any,
    ) -> AsyncGenerator[Any, None]:
        # ... run method with logging capabilities ...

This class provides additional logging functionality, including a log worker and methods for enqueueing logs.
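
As a hedged sketch of how these hooks might be used, the subclass below enqueues a log entry for each item it processes. The import path, the log key name, and the inline run_id are illustrative assumptions; in practice the run identifier is normally supplied by the surrounding RunManager context rather than generated inside the pipe:

import uuid
from typing import Any, AsyncGenerator

from r2r import AsyncPipe, AsyncState, LoggableAsyncPipe  # import path assumed


class LoggedEchoPipe(LoggableAsyncPipe):
    async def _run_logic(
        self, input: AsyncPipe.Input, state: AsyncState, *args: Any, **kwargs: Any
    ) -> AsyncGenerator[Any, None]:
        # Illustrative run identifier; normally provided by the RunManager.
        run_id = uuid.uuid4()
        async for item in input.message:
            # Queue a log entry recording that the item passed through this pipe.
            await self.enqueue_log(run_id=run_id, key="echoed_item", value=str(item))
            yield item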

Pipelines

Pipelines in R2R are sequences of pipes working in concert to accomplish complex tasks. The Pipeline class manages the execution of these sequences, handling the flow of data from one pipe to the next, and is responsible for several key aspects of pipeline operation:

  • Flexible Composition: Pipelines can be composed of any number and type of pipes.
  • Data Flow: Output from one pipe becomes input for the next.
  • Parallel Processing: Some pipelines support parallel execution of pipes.
  • Error Handling: Pipelines manage errors and exceptions across the entire process.

Pipeline

R2R recommends using the Pipeline class to build pipelines:

class Pipeline:
    def __init__(
        self,
        pipe_logger: Optional[KVLoggingSingleton] = None,
        run_manager: Optional[RunManager] = None,
    ):
        self.pipes: list[AsyncPipe] = []
        self.upstream_outputs: list[list[dict[str, str]]] = []
        self.pipe_logger = pipe_logger or KVLoggingSingleton()
        self.run_manager = run_manager or RunManager(self.pipe_logger)
        self.futures = {}

    def add_pipe(
        self,
        pipe: AsyncPipe,
        add_upstream_outputs: Optional[list[dict[str, str]]] = None,
        *args,
        **kwargs,
    ) -> None:
        # Add a pipe to the pipeline

    async def run(
        self,
        input: Any,
        state: Optional[AsyncState] = None,
        stream: bool = False,
        run_manager: Optional[RunManager] = None,
        log_run_info: bool = True,
        *args: Any,
        **kwargs: Any,
    ):
        # Run the pipeline

Example: Creating a Custom Pipeline

Here’s a simple example of how to create and run a custom pipeline in R2R. The two pipes below are minimal placeholders that simply pass items through; replace their _run_logic bodies with real processing:

from typing import Any, AsyncGenerator

from r2r import Pipeline, AsyncPipe, AsyncState

class CustomPipe(AsyncPipe):
    async def _run_logic(
        self, input: AsyncPipe.Input, state: AsyncState, *args: Any, **kwargs: Any
    ) -> AsyncGenerator[Any, None]:
        async for item in input.message:
            processed_item = item  # replace with your own processing logic
            yield processed_item

class AnotherCustomPipe(AsyncPipe):
    async def _run_logic(
        self, input: AsyncPipe.Input, state: AsyncState, *args: Any, **kwargs: Any
    ) -> AsyncGenerator[Any, None]:
        async for item in input.message:
            yield item  # e.g. post-process or enrich the item here

custom_pipeline = Pipeline()
custom_pipeline.add_pipe(CustomPipe())
custom_pipeline.add_pipe(AnotherCustomPipe())

result = await custom_pipeline.run(input_data)
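
Because Pipeline.run is a coroutine, the final await above must execute inside an async context. In a standalone script, the pipeline would typically be driven with asyncio, for example:

import asyncio

async def main():
    result = await custom_pipeline.run(input_data)
    print(result)

asyncio.run(main())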

Types of Pipelines in R2R

R2R implements several specialized pipeline types (a composition sketch follows the list below):

  1. Ingestion Pipeline: Processes and stores input documents.

  2. Search Pipeline: Retrieves relevant information from stored data.

  3. RAG (Retrieval-Augmented Generation) Pipeline: Combines search results with language generation.

  4. Evaluation Pipeline: Assesses the quality and performance of other pipelines.
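
As a rough illustration of how such a pipeline might be composed, a RAG-style pipeline could chain a retrieval pipe with a generation pipe. The pipe class names below are hypothetical placeholders, not R2R's actual implementations:

# Hypothetical composition of a RAG-style pipeline; the pipe classes
# are illustrative stand-ins for R2R's real search and generation pipes.
rag_pipeline = Pipeline()
rag_pipeline.add_pipe(QueryTransformPipe())   # rewrite or expand the user query
rag_pipeline.add_pipe(VectorSearchPipe())     # retrieve relevant chunks
rag_pipeline.add_pipe(RAGGenerationPipe())    # generate an answer from the retrieved context

answer = await rag_pipeline.run("What does R2R's ingestion pipeline do?")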

Why Pipelines in R2R?

R2R uses pipelines for several reasons:

  1. Modularity: Pipelines break complex processes into manageable, reusable components.
  2. Flexibility: Easy to add, remove, or rearrange pipes to create new workflows.
  3. Scalability: Asynchronous design allows for efficient handling of large-scale data processing.
  4. Maintainability: Modular structure makes it easier to update or debug specific parts of the system.
  5. Extensibility: Developers can easily add new pipes to extend system functionality.

Example: R2R Pipelines Mermaid Diagram

[Mermaid diagram illustrating the flow of data through the R2R pipelines]

Strengths of R2R Pipelines

The pipeline architecture of R2R offers several significant strengths. Its asynchronous processing capabilities enable efficient handling of I/O-bound operations, crucial for maintaining performance in data-intensive tasks. The use of typed inputs and Pydantic models ensures type safety throughout the pipeline, reducing errors and improving code quality. The shared state management allows for complex, multi-step processes, while the high degree of configurability enables adaptation to a wide range of use cases.

However, working with R2R pipelines also requires careful consideration of certain factors. Developers need to have a solid understanding of asynchronous programming concepts and the specifics of the AsyncPipe base class. Performance monitoring, careful state management, and robust error handling are all critical aspects of working effectively with R2R pipelines.
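
As one hedged illustration of the error-handling point, a pipe can guard its per-item work so that a single malformed item is logged and skipped rather than aborting the entire run. The processing step and skip-on-error policy below are illustrative choices:

import logging
from typing import Any, AsyncGenerator

from r2r import AsyncPipe, AsyncState

logger = logging.getLogger(__name__)


class ResilientPipe(AsyncPipe):
    async def _run_logic(
        self, input: AsyncPipe.Input, state: AsyncState, *args: Any, **kwargs: Any
    ) -> AsyncGenerator[Any, None]:
        async for item in input.message:
            try:
                processed = str(item).strip().lower()  # placeholder per-item processing
            except Exception as exc:
                # Skip the offending item instead of failing the whole pipeline.
                logger.warning("Skipping item due to processing error: %s", exc)
                continue
            yield processed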

Conclusion

R2R’s pipeline architecture provides a powerful and flexible way to process data and generate responses. By understanding the core concepts and best practices, developers can leverage this system to create efficient, scalable, and maintainable applications.

For more detailed information on specific pipeline types and advanced usage, refer to the individual pipeline documentation pages linked throughout this guide.