Distributed RAG with PgVector

This example demonstrates how multiple specialized agents coordinate to provide comprehensive RAG responses using distributed PostgreSQL vector databases with pgvector for scalable, production-ready retrieval. The team includes vector retrieval, hybrid search, data validation, and response composition specialists.

Code

"""
This example demonstrates how multiple specialized agents coordinate to provide
comprehensive RAG responses using distributed PostgreSQL vector databases with
pgvector for scalable, production-ready retrieval.

Team Composition:
- Vector Retriever: Specialized in vector similarity search using pgvector
- Hybrid Searcher: Combines vector and text search for comprehensive results
- Data Validator: Validates retrieved data quality and relevance
- Response Composer: Composes final responses with proper source attribution

Setup:
1. Run: `./cookbook/run_pgvector.sh` to start a postgres container with pgvector
2. Run: `uv pip install openai sqlalchemy 'psycopg[binary]' pgvector kern-ai`
3. Run this script to see distributed PgVector RAG in action
"""
17
18import asyncio # noqa: F401
19
20from kern.agent import Agent
21from kern.knowledge.embedder.openai import OpenAIEmbedder
22from kern.knowledge.knowledge import Knowledge
23from kern.models.openai import OpenAIResponses
24from kern.team.team import Team
25from kern.vectordb.pgvector import PgVector, SearchType
26
# Connection URL (psycopg driver) shared by both pgvector-backed tables below
db_url = "postgresql+psycopg://ai:ai@localhost:5532/ai"

# Knowledge base stored in the "recipes_vector" table and queried with
# SearchType.vector
vector_knowledge = Knowledge(
    vector_db=PgVector(
        db_url=db_url,
        table_name="recipes_vector",
        embedder=OpenAIEmbedder(id="text-embedding-3-small"),
        search_type=SearchType.vector,
    ),
)

# Knowledge base stored in the "recipes_hybrid" table and queried with
# SearchType.hybrid
hybrid_knowledge = Knowledge(
    vector_db=PgVector(
        db_url=db_url,
        table_name="recipes_hybrid",
        embedder=OpenAIEmbedder(id="text-embedding-3-small"),
        search_type=SearchType.hybrid,
    ),
)
49
# Team member wired to the vector-search knowledge base (vector_knowledge)
vector_retriever = Agent(
    name="Vector Retriever",
    role="Retrieve information using vector similarity search in PostgreSQL",
    model=OpenAIResponses(id="gpt-5.2"),
    knowledge=vector_knowledge,
    search_knowledge=True,
    markdown=True,
    instructions=[
        "Use vector similarity search to find semantically related content.",
        "Focus on finding information that matches the semantic meaning of queries.",
        "Leverage pgvector's efficient similarity search capabilities.",
        "Retrieve content that has high semantic relevance to the user's query.",
    ],
)
65
# Team member wired to the hybrid-search knowledge base (hybrid_knowledge)
hybrid_searcher = Agent(
    name="Hybrid Searcher",
    role="Perform hybrid search combining vector and text search",
    model=OpenAIResponses(id="gpt-5.2"),
    knowledge=hybrid_knowledge,
    search_knowledge=True,
    markdown=True,
    instructions=[
        "Combine vector similarity and text search for comprehensive results.",
        "Find information that matches both semantic and lexical criteria.",
        "Use PostgreSQL's hybrid search capabilities for best coverage.",
        "Ensure retrieval of both conceptually and textually relevant content.",
    ],
)
81
# Team member with no knowledge base of its own; its instructions cover
# checking the quality and relevance of what was retrieved
data_validator = Agent(
    name="Data Validator",
    role="Validate retrieved data quality and relevance",
    model=OpenAIResponses(id="gpt-5.2"),
    markdown=True,
    instructions=[
        "Assess the quality and relevance of retrieved information.",
        "Check for consistency across different search results.",
        "Identify the most reliable and accurate information.",
        "Filter out any irrelevant or low-quality content.",
        "Ensure data integrity and relevance to the user's query.",
    ],
)
96
# Team member with no knowledge base of its own; its instructions cover
# assembling the final, attributed answer
response_composer = Agent(
    name="Response Composer",
    role="Compose comprehensive responses with proper source attribution",
    model=OpenAIResponses(id="gpt-5.2"),
    markdown=True,
    instructions=[
        "Combine validated information from all team members.",
        "Create well-structured, comprehensive responses.",
        "Include proper source attribution and data provenance.",
        "Ensure clarity and coherence in the final response.",
        "Format responses for optimal user experience.",
    ],
)
111
# Team that groups the four agents above; the instructions spell out the
# intended order: vector search, hybrid search, validation, composition
distributed_pgvector_team = Team(
    name="Distributed PgVector RAG Team",
    members=[vector_retriever, hybrid_searcher, data_validator, response_composer],
    model=OpenAIResponses(id="gpt-5.2"),
    show_members_responses=True,
    markdown=True,
    instructions=[
        "Work together to provide comprehensive RAG responses using PostgreSQL pgvector.",
        "Vector Retriever: First perform vector similarity search.",
        "Hybrid Searcher: Then perform hybrid search for comprehensive coverage.",
        "Data Validator: Validate and filter the retrieved information quality.",
        "Response Composer: Compose the final response with proper attribution.",
        "Leverage PostgreSQL's scalability and pgvector's performance.",
        "Ensure enterprise-grade reliability and accuracy.",
    ],
)
129
130
async def async_pgvector_rag_demo():
    """Run the distributed PgVector RAG team through its async API.

    Inserts the Thai recipes PDF into both knowledge bases (vector first,
    then hybrid) and asks the team a recipe question. Any exception is
    caught and reported on stdout together with a hint about starting the
    pgvector container; nothing is re-raised.
    """
    recipes_url = "https://kern-public.s3.amazonaws.com/recipes/ThaiRecipes.pdf"
    question = "How do I make chicken and galangal in coconut milk soup? What are the key ingredients and techniques?"

    print("🐘 Async Distributed PgVector RAG Demo")
    print("=" * 40)

    try:
        # Load the same document into each knowledge base, one after the other
        for knowledge_base in (vector_knowledge, hybrid_knowledge):
            await knowledge_base.ainsert_many(url=recipes_url)
        # Hand the question to the whole team
        await distributed_pgvector_team.aprint_response(input=question)
    except Exception as exc:
        print(f"❌ Error: {exc}")
        print("💡 Make sure PostgreSQL with pgvector is running!")
        print(" Run: ./cookbook/run_pgvector.sh")
152
153
def sync_pgvector_rag_demo():
    """Run the distributed PgVector RAG team through its synchronous API.

    Inserts the Thai recipes PDF into both knowledge bases (vector first,
    then hybrid) and asks the team a recipe question. Any exception is
    caught and reported on stdout together with a hint about starting the
    pgvector container; nothing is re-raised.
    """
    recipes_url = "https://kern-public.s3.amazonaws.com/recipes/ThaiRecipes.pdf"
    question = "How do I make chicken and galangal in coconut milk soup? What are the key ingredients and techniques?"

    print("🐘 Distributed PgVector RAG Demo")
    print("=" * 35)

    try:
        # Load the same document into each knowledge base, one after the other
        for knowledge_base in (vector_knowledge, hybrid_knowledge):
            knowledge_base.insert_many(url=recipes_url)
        # Hand the question to the whole team
        distributed_pgvector_team.print_response(input=question)
    except Exception as exc:
        print(f"❌ Error: {exc}")
        print("💡 Make sure PostgreSQL with pgvector is running!")
        print(" Run: ./cookbook/run_pgvector.sh")
175
176
def complex_query_demo():
    """Run the distributed RAG team on a multi-part menu-planning request.

    Inserts the Thai recipes PDF into both knowledge bases and then asks the
    team to plan a dinner-party menu. Any exception is caught and reported on
    stdout together with a hint about starting the pgvector container;
    nothing is re-raised.
    """
    print("👨‍🍳 Complex Culinary Query with Distributed PgVector RAG")
    print("=" * 60)

    # Assemble the prompt from explicit lines. A triple-quoted literal inside
    # an indented function body would carry this file's indentation into
    # every continuation line of the text sent to the model.
    query = "\n".join(
        [
            "I'm planning a Thai dinner party for 8 people. Can you help me plan a complete menu?",
            "I need appetizers, main courses, and desserts. Please include:",
            "- Preparation timeline",
            "- Shopping list",
            "- Cooking techniques for each dish",
            "- Any dietary considerations or alternatives",
        ]
    )

    try:
        # Add content to knowledge bases
        vector_knowledge.insert_many(
            url="https://kern-public.s3.amazonaws.com/recipes/ThaiRecipes.pdf"
        )
        hybrid_knowledge.insert_many(
            url="https://kern-public.s3.amazonaws.com/recipes/ThaiRecipes.pdf"
        )

        distributed_pgvector_team.print_response(input=query)
    except Exception as e:
        print(f"❌ Error: {e}")
        print("💡 Make sure PostgreSQL with pgvector is running!")
        print(" Run: ./cookbook/run_pgvector.sh")
203
204
if __name__ == "__main__":
    # The synchronous demo runs by default. To try a different variant,
    # comment it out and enable one of the alternatives instead:
    #   asyncio.run(async_pgvector_rag_demo())
    #   complex_query_demo()
    sync_pgvector_rag_demo()

Usage

Set up your virtual environment

macOS / Linux:

uv venv --python 3.12
source .venv/bin/activate

Windows:

uv venv --python 3.12
.venv\Scripts\activate

Set up PostgreSQL with pgvector

./cookbook/run_pgvector.sh

Install required libraries

uv pip install kern-ai openai sqlalchemy 'psycopg[binary]' pgvector

Set environment variables

export OPENAI_API_KEY=****

Run the agent

python cookbook/02_examples/teams/distributed_rag/01_distributed_rag_pgvector.py