Streaming Retrieval API

Using real-time streaming for RAG and agent interactions

R2R provides powerful streaming capabilities for its retrieval services, including RAG responses and agent interactions. These streaming features allow for real-time updates as information is retrieved and processed, enhancing user experience for applications that benefit from immediate feedback.

Streaming Events

When using streaming in R2R, various event types are emitted during the retrieval and generation process:

Event TypeDescription
SearchResultsEventContains initial search results from documents
MessageEventStreams partial tokens of the response as they’re generated
CitationEventIndicates when a citation is added to the response
ThinkingEventContains the model’s step-by-step reasoning (for agents)
ToolCallEventIndicates when the model calls a tool (for agents)
ToolResultEventContains results from tool calls (for agents)
FinalAnswerEventContains the complete generated answer with citations

Streaming RAG

Basic Streaming RAG

To use streaming with basic RAG functionality:

1from r2r import (
2 CitationEvent,
3 FinalAnswerEvent,
4 MessageEvent,
5 SearchResultsEvent,
6 R2RClient,
7)
8
9client = R2RClient("http://localhost:7272")
10
11result_stream = client.retrieval.rag(
12 query="What is DeepSeek R1?",
13 search_settings={"limit": 25},
14 rag_generation_config={"stream": True},
15)
16
17for event in result_stream:
18 if isinstance(event, SearchResultsEvent):
19 print("Search results:", event.data)
20 elif isinstance(event, MessageEvent):
21 print("Partial message:", event.data.delta)
22 elif isinstance(event, CitationEvent):
23 print("New citation detected:", event.data)
24 elif isinstance(event, FinalAnswerEvent):
25 print("Final answer:", event.data.generated_answer)
26 print("Citations:", event.data.citations)

To include web search in your streaming RAG:

1result_stream = client.retrieval.rag(
2 query="What are the latest developments with DeepSeek R1?",
3 rag_generation_config={"stream": True},
4 include_web_search=True
5)
6
7for event in result_stream:
8 # Process events as shown in previous example
9 pass

Streaming Agent

R2R provides a powerful streaming agent mode that supports complex interactions with both document-based knowledge and web resources.

Basic Streaming Agent

1from r2r import (
2 ThinkingEvent,
3 ToolCallEvent,
4 ToolResultEvent,
5 CitationEvent,
6 MessageEvent,
7 FinalAnswerEvent,
8)
9
10agent_stream = client.retrieval.agent(
11 query="What does DeepSeek R1 imply for the future of AI?",
12 generation_config={"stream": True},
13 mode="research"
14)
15
16for event in agent_stream:
17 if isinstance(event, ThinkingEvent):
18 print(f"🧠 Thinking: {event.data.delta.content[0].payload.value}")
19 elif isinstance(event, ToolCallEvent):
20 print(f"🔧 Tool call: {event.data.name}({event.data.arguments})")
21 elif isinstance(event, ToolResultEvent):
22 print(f"📊 Tool result: {event.data.content[:60]}...")
23 elif isinstance(event, CitationEvent):
24 print(f"📑 Citation: {event.data}")
25 elif isinstance(event, MessageEvent):
26 print(f"💬 Message: {event.data.delta.content[0].payload.value}")
27 elif isinstance(event, FinalAnswerEvent):
28 print(f"✅ Final answer: {event.data.generated_answer[:100]}...")
29 print(f" Citations: {len(event.data.citations)} sources referenced")

Advanced Research Agent with Tools

R2R’s agent mode can leverage multiple tools to perform in-depth research:

1agent_stream = client.retrieval.agent(
2 query="Analyze DeepSeek R1's performance compared to other models",
3 generation_config={
4 "model": "anthropic/claude-3-7-sonnet-20250219",
5 "extended_thinking": True,
6 "thinking_budget": 4096,
7 "temperature": 1,
8 "max_tokens_to_sample": 16000,
9 "stream": True
10 },
11 mode="research",
12 rag_tools=["web_search", "web_scrape"]
13)
14
15# Process events as shown in previous example

Streaming Citations

R2R’s streaming citations provide detailed attribution information that links specific parts of the response to source documents:

1{
2 "event": "citation",
3 "data": {
4 "id": "abc123",
5 "object": "citation",
6 "raw_index": 1,
7 "index": 1,
8 "start_index": 305,
9 "end_index": 308,
10 "source_type": "chunk",
11 "source_id": "e760bb76-1c6e-52eb-910d-0ce5b567011b",
12 "document_id": "e43864f5-a36f-548e-aacd-6f8d48b30c7f",
13 "source_title": "DeepSeek_R1.pdf"
14 }
15}

Each citation includes:

  • id: Unique identifier for the citation
  • index: The display index (e.g., [1], [2])
  • start_index and end_index: Character positions in the response
  • source_type: The type of source (chunk, graph, web)
  • source_id: ID of the specific chunk/node
  • document_id: ID of the parent document
  • source_title: Title of the source document

Implementing Streaming UI

To create a responsive UI with streaming RAG, consider these patterns:

Frontend Implementation

1import { useState, useEffect } from 'react';
2
3function RAGComponent() {
4 const [messages, setMessages] = useState([]);
5 const [currentMessage, setCurrentMessage] = useState('');
6 const [citations, setCitations] = useState([]);
7 const [isLoading, setIsLoading] = useState(false);
8
9 const handleSubmit = async (query) => {
10 setIsLoading(true);
11 setCurrentMessage('');
12 setCitations([]);
13
14 try {
15 const response = await fetch('/api/rag', {
16 method: 'POST',
17 headers: { 'Content-Type': 'application/json' },
18 body: JSON.stringify({
19 query,
20 stream: true
21 })
22 });
23
24 const reader = response.body.getReader();
25 const decoder = new TextDecoder();
26
27 while (true) {
28 const { done, value } = await reader.read();
29 if (done) break;
30
31 const chunk = decoder.decode(value);
32 const events = chunk.split('\n\n').filter(Boolean);
33
34 for (const eventText of events) {
35 if (!eventText.startsWith('data: ')) continue;
36
37 const eventData = JSON.parse(eventText.slice(6));
38
39 switch (eventData.event) {
40 case 'message':
41 setCurrentMessage(prev => prev + eventData.data.delta.content[0].payload.value);
42 break;
43 case 'citation':
44 setCitations(prev => [...prev, eventData.data]);
45 break;
46 case 'final_answer':
47 setMessages(prev => [...prev, {
48 role: 'assistant',
49 content: eventData.data.generated_answer,
50 citations: eventData.data.citations
51 }]);
52 break;
53 }
54 }
55 }
56 } catch (error) {
57 console.error('Error with streaming RAG:', error);
58 } finally {
59 setIsLoading(false);
60 }
61 };
62
63 return (
64 <div className="rag-container">
65 {/* UI implementation */}
66 {isLoading && <div className="typing-indicator">{currentMessage}</div>}
67 {/* Display messages and citations */}
68 </div>
69 );
70}

Best Practices

Optimizing Streaming RAG

  1. Handle Event Types Properly

    • Process each event type according to its purpose
    • Update UI incrementally as events arrive
    • Cache search results to improve perceived performance
  2. Error Handling

    • Implement robust error handling for stream interruptions
    • Provide fallback mechanisms for connection issues
    • Consider retry logic for temporary failures
  3. UI Considerations

    • Display typing indicators during generation
    • Highlight citations as they appear
    • Show search results separately from generated content
  4. Performance

    • Monitor stream processing performance
    • Optimize rendering for large responses
    • Consider batching UI updates for efficiency

Example Implementation

Here’s a complete example of RAG with hybrid search, web integration, and streaming:

1from r2r import R2RClient, CitationEvent, MessageEvent, SearchResultsEvent, FinalAnswerEvent
2
3client = R2RClient("http://localhost:7272")
4
5# Configure the streaming RAG request
6stream = client.retrieval.rag(
7 query="What are the key capabilities of DeepSeek R1 for reasoning tasks?",
8 search_settings={
9 "use_hybrid_search": True,
10 "hybrid_settings": {
11 "full_text_weight": 1.0,
12 "semantic_weight": 3.0
13 },
14 "limit": 30
15 },
16 rag_generation_config={
17 "model": "anthropic/claude-3-5-sonnet-20241022",
18 "temperature": 0.7,
19 "stream": True
20 },
21 include_web_search=True
22)
23
24# Process the streaming events
25search_results = None
26message_buffer = ""
27citations = []
28
29for event in stream:
30 if isinstance(event, SearchResultsEvent):
31 search_results = event.data
32 print(f"Retrieved {len(event.data.chunk_search_results)} chunks")
33
34 if event.data.web_search_results:
35 print(f"Retrieved {len(event.data.web_search_results)} web results")
36
37 elif isinstance(event, MessageEvent):
38 delta = event.data.delta.content[0].payload.value
39 message_buffer += delta
40 print(delta, end="", flush=True)
41
42 elif isinstance(event, CitationEvent):
43 citations.append(event.data)
44 print(f"\n[Citation {event.data.index}]", end="", flush=True)
45
46 elif isinstance(event, FinalAnswerEvent):
47 print("\n\nFinal answer complete with", len(event.data.citations), "citations")

Advanced Configuration

Customizing Streaming Behavior

1# Custom timeout and chunk size for streaming
2client = R2RClient(
3 base_url="http://localhost:7272",
4 timeout=120.0,
5 stream_chunk_size=4096
6)
7
8# Configure retrieval and generation parameters
9stream = client.retrieval.rag(
10 query="Complex query requiring detailed analysis",
11 search_settings={
12 "limit": 50,
13 "use_hybrid_search": True
14 },
15 rag_generation_config={
16 "model": "anthropic/claude-3-7-sonnet-20250219",
17 "stream": True,
18 "temperature": 0.2,
19 "max_tokens_to_sample": 4000
20 }
21)

Limitations and Considerations

  • Stream connections require stable network connectivity
  • Processing streams requires more client-side logic than non-streaming requests
  • Citation indices may not be finalized until the entire response is generated
  • Some LLM providers may have different streaming behaviors or limitations

For more information on RAG and retrieval capabilities, see the Search and RAG and Retrieval API Reference documentation.