Distributed RAG with LanceDB

This example demonstrates how multiple specialized agents coordinate to provide comprehensive RAG responses using distributed knowledge bases and specialized retrieval strategies with LanceDB. The team consists of four agents that handle primary retrieval, context expansion, answer synthesis, and quality validation.

Code

1"""
2This example demonstrates how multiple specialized agents coordinate to provide
3comprehensive RAG responses using distributed knowledge bases and specialized
4retrieval strategies with LanceDB.
5
6Team Composition:
7- Primary Retriever: Handles primary document retrieval from main knowledge base
8- Context Expander: Expands context by finding related information
9- Answer Synthesizer: Synthesizes retrieved information into comprehensive answers
10- Quality Validator: Validates answer quality and suggests improvements
11
12Setup:
131. Run: `uv pip install openai lancedb tantivy pypdf sqlalchemy kern-ai`
142. Run this script to see distributed RAG in action
15"""
16
17import asyncio
18
19from kern.agent import Agent
20from kern.knowledge.embedder.openai import OpenAIEmbedder
21from kern.knowledge.knowledge import Knowledge
22from kern.models.openai import OpenAIResponses
23from kern.team.team import Team
24from kern.vectordb.lancedb import LanceDb, SearchType
25
# Knowledge base used for main retrieval: the "recipes_primary" LanceDB table,
# queried with vector search.
primary_knowledge = Knowledge(
    vector_db=LanceDb(
        uri="tmp/lancedb",
        table_name="recipes_primary",
        embedder=OpenAIEmbedder(id="text-embedding-3-small"),
        search_type=SearchType.vector,
    ),
)
35
# Knowledge base used for context expansion: the "recipes_context" LanceDB
# table, queried with hybrid search.
context_knowledge = Knowledge(
    vector_db=LanceDb(
        uri="tmp/lancedb",
        table_name="recipes_context",
        embedder=OpenAIEmbedder(id="text-embedding-3-small"),
        search_type=SearchType.hybrid,
    ),
)
45
# Team member 1: searches primary_knowledge for the core documents.
primary_retriever = Agent(
    name="Primary Retriever",
    role="Retrieve primary documents and core information from knowledge base",
    model=OpenAIResponses(id="gpt-5.2"),
    instructions=[
        "Search the knowledge base for directly relevant information to the user's query.",
        "Focus on retrieving the most relevant and specific documents first.",
        "Provide detailed information with proper context.",
        "Ensure accuracy and completeness of retrieved information.",
    ],
    knowledge=primary_knowledge,
    search_knowledge=True,
    markdown=True,
)
61
# Team member 2: searches context_knowledge for related, supplementary material.
context_expander = Agent(
    name="Context Expander",
    role="Expand context by finding related and supplementary information",
    model=OpenAIResponses(id="gpt-5.2"),
    instructions=[
        "Find related information that complements the primary retrieval.",
        "Look for background context, related topics, and supplementary details.",
        "Search for information that helps understand the broader context.",
        "Identify connections between different pieces of information.",
    ],
    knowledge=context_knowledge,
    search_knowledge=True,
    markdown=True,
)
77
# Team member 3: has no knowledge base of its own; it combines what the two
# retrieval agents found into one answer.
answer_synthesizer = Agent(
    name="Answer Synthesizer",
    role="Synthesize retrieved information into comprehensive answers",
    model=OpenAIResponses(id="gpt-5.2"),
    markdown=True,
    instructions=[
        "Combine information from the Primary Retriever and Context Expander.",
        "Create a comprehensive, well-structured response.",
        "Ensure logical flow and coherence in the final answer.",
        "Include relevant details while maintaining clarity.",
        "Organize information in a user-friendly format.",
    ],
)
92
# Team member 4: has no knowledge base of its own; it reviews the synthesized
# answer and proposes improvements.
quality_validator = Agent(
    name="Quality Validator",
    role="Validate answer quality and suggest improvements",
    model=OpenAIResponses(id="gpt-5.2"),
    markdown=True,
    instructions=[
        "Review the synthesized answer for accuracy and completeness.",
        "Check if the answer fully addresses the user's query.",
        "Identify any gaps or areas that need clarification.",
        "Suggest improvements or additional information if needed.",
        "Ensure the response meets high quality standards.",
    ],
)
107
# Assemble the four agents into one team. The team-level instructions spell out
# the intended order: retrieve, expand, synthesize, validate.
distributed_rag_team = Team(
    name="Distributed RAG Team",
    members=[
        primary_retriever,
        context_expander,
        answer_synthesizer,
        quality_validator,
    ],
    model=OpenAIResponses(id="gpt-5.2"),
    instructions=[
        "Work together to provide comprehensive, high-quality RAG responses.",
        "Primary Retriever: First retrieve core relevant information.",
        "Context Expander: Then expand with related context and background.",
        "Answer Synthesizer: Synthesize all information into a comprehensive answer.",
        "Quality Validator: Finally validate and suggest any improvements.",
        "Ensure all responses are accurate, complete, and well-structured.",
    ],
    markdown=True,
    show_members_responses=True,
)
129
130
async def async_distributed_rag_demo():
    """Run the distributed RAG team asynchronously on one recipe question.

    Prints a banner, adds the Thai recipes PDF to both knowledge bases
    (primary first, then context), and prints the team's response.
    """
    recipes_pdf_url = "https://kern-public.s3.amazonaws.com/recipes/ThaiRecipes.pdf"
    query = "How do I make chicken and galangal in coconut milk soup? Include cooking tips and variations."

    print("📚 Async Distributed RAG with LanceDB Demo")
    print("=" * 50)

    # Populate both knowledge bases from the same document, one after the other.
    for knowledge_base in (primary_knowledge, context_knowledge):
        await knowledge_base.ainsert_many(url=recipes_pdf_url)

    # The example also carried a disabled streaming variant of this call:
    # aprint_response(query, stream=True).
    await distributed_rag_team.aprint_response(input=query)
151
152
def sync_distributed_rag_demo():
    """Run the distributed RAG team synchronously on one recipe question.

    Prints a banner, adds the Thai recipes PDF to both knowledge bases
    (primary first, then context), and prints the team's response.
    """
    recipes_pdf_url = "https://kern-public.s3.amazonaws.com/recipes/ThaiRecipes.pdf"
    query = "How do I make chicken and galangal in coconut milk soup? Include cooking tips and variations."

    print("📚 Distributed RAG with LanceDB Demo")
    print("=" * 40)

    # Populate both knowledge bases from the same document.
    for knowledge_base in (primary_knowledge, context_knowledge):
        knowledge_base.insert_many(url=recipes_pdf_url)

    distributed_rag_team.print_response(input=query)
170
171
def multi_course_meal_demo():
    """Run the distributed RAG team on a multi-part meal-planning request.

    Prints a banner, adds the Thai recipes PDF to both knowledge bases
    (primary first, then context), and prints the team's response to a
    three-course menu question.
    """
    recipes_pdf_url = "https://kern-public.s3.amazonaws.com/recipes/ThaiRecipes.pdf"

    print("🍽️ Multi-Course Meal Planning with Distributed RAG")
    print("=" * 55)

    query = """Hi, I want to make a 3 course Thai meal. Can you recommend some recipes?
    I'd like to start with a soup, then a thai curry for the main course and finish with a dessert.
    Please include cooking techniques and any special tips."""

    # Populate both knowledge bases from the same document.
    for knowledge_base in (primary_knowledge, context_knowledge):
        knowledge_base.insert_many(url=recipes_pdf_url)

    distributed_rag_team.print_response(input=query)
190
191
if __name__ == "__main__":
    # The async demo runs by default. To try one of the other demos, swap the
    # call below for multi_course_meal_demo() or sync_distributed_rag_demo().
    asyncio.run(async_distributed_rag_demo())

Usage

Set up your virtual environment

Mac/Linux:

uv venv --python 3.12
source .venv/bin/activate

Windows:

uv venv --python 3.12
.venv\Scripts\activate

Install required libraries

uv pip install kern-ai openai lancedb tantivy pypdf sqlalchemy

Set environment variables

export OPENAI_API_KEY=****

Run the agent

python cookbook/02_examples/teams/distributed_rag/02_distributed_rag_lancedb.py