Building Custom Providers
Create your own context provider for any data source.
When the built-in providers don't fit, subclass ContextProvider. The base class handles tool wrapping, name derivation, and error shaping.
Minimal Example
```python
from kern.agent import Agent
from kern.context import Answer, ContextProvider, Status

FAQ = {"pricing": "See kern.ndx.rocks/pricing", "support": "Email help@kern.ndx.rocks"}

class FAQContextProvider(ContextProvider):
    def status(self) -> Status:
        return Status(ok=True, detail=f"{len(FAQ)} entries")

    async def astatus(self) -> Status:
        return self.status()

    def query(self, question: str, *, run_context=None) -> Answer:
        key = next((k for k in FAQ if k in question.lower()), None)
        return Answer(text=FAQ[key] if key else "No FAQ entry matches that.")

    async def aquery(self, question: str, *, run_context=None) -> Answer:
        return self.query(question, run_context=run_context)

faq = FAQContextProvider(id="faq")
agent = Agent(model=..., tools=faq.get_tools())
```

The agent now has a `query_faq` tool. Same shape as every built-in provider.
Required Methods
You must implement these four abstract methods:
| Method | Purpose |
|---|---|
| `query(question, *, run_context=None) -> Answer` | Sync read |
| `aquery(question, *, run_context=None) -> Answer` | Async read |
| `status() -> Status` | Sync health check |
| `astatus() -> Status` | Async health check |
Answer
Answer is what query() returns:
```python
from kern.context import Answer, Document

# Text-only answer
return Answer(text="The weather is sunny.")

# Answer with source documents
return Answer(
    text="Found 3 matching policies.",
    results=[
        Document(id="doc1", name="Refund Policy", uri="/policies/refund.md", snippet="..."),
        Document(id="doc2", name="Privacy Policy", uri="/policies/privacy.md", snippet="..."),
    ]
)
```

Status
Status reports provider health:
```python
from kern.context import Status

# Healthy
return Status(ok=True, detail="Connected to database")

# Unhealthy
return Status(ok=False, detail="API key invalid")
```

Optional Methods
Override these to customize behavior:
| Method | Default | Override when |
|---|---|---|
| `update()` / `aupdate()` | Raises `NotImplementedError` | Provider supports writes |
| `asetup()` | No-op | Need async init (MCP sessions, cache priming) |
| `aclose()` | No-op | Hold long-lived state (watches, connections) |
| `instructions()` | Generic guidance | Want source-specific usage hints |
Adding Write Support
Override update(), aupdate(), and _default_tools() for writable providers:
```python
class NotesContextProvider(ContextProvider):
    def __init__(self, id: str, storage: dict):
        super().__init__(id)
        self.storage = storage

    def query(self, question: str, *, run_context=None) -> Answer:
        matches = [v for k, v in self.storage.items() if question.lower() in k.lower()]
        return Answer(text="\n".join(matches) if matches else "No matching notes.")

    async def aquery(self, question: str, *, run_context=None) -> Answer:
        return self.query(question, run_context=run_context)

    def update(self, instruction: str, *, run_context=None) -> Answer:
        if instruction.startswith("save note:"):
            parts = instruction[10:].split(" - ", 1)
            if len(parts) == 2:
                self.storage[parts[0].strip()] = parts[1].strip()
                return Answer(text=f"Saved note: {parts[0].strip()}")
        return Answer(text="Could not parse instruction. Use: save note: <title> - <content>")

    async def aupdate(self, instruction: str, *, run_context=None) -> Answer:
        return self.update(instruction, run_context=run_context)

    def _default_tools(self) -> list:
        return self._read_write_tools()  # Exposes both query and update tools

    def status(self) -> Status:
        return Status(ok=True, detail=f"{len(self.storage)} notes")

    async def astatus(self) -> Status:
        return self.status()
```

Now the agent has both `query_notes` and `update_notes` tools.
Async Lifecycle
For providers that need setup and teardown:
```python
class StreamingAPIContextProvider(ContextProvider):
    def __init__(self, id: str, api_url: str):
        super().__init__(id)
        self.api_url = api_url
        self.session = None

    async def asetup(self) -> None:
        import aiohttp
        self.session = aiohttp.ClientSession()

    async def aclose(self) -> None:
        if self.session:
            await self.session.close()

    async def aquery(self, question: str, *, run_context=None) -> Answer:
        async with self.session.get(f"{self.api_url}/search", params={"q": question}) as resp:
            data = await resp.json()
            return Answer(text=data.get("answer", "No answer found."))

    # ... implement query, status, astatus
```

Custom Instructions
Override instructions() to provide source-specific guidance:
```python
def instructions(self) -> str:
    return """
    Use query_jira for:
    - Finding issues by key (e.g., "PROJ-123")
    - Searching by assignee, status, or labels
    - Getting sprint information

    Use update_jira for:
    - Changing issue status
    - Adding comments
    - Updating assignee
    """
```

Using RunContext
The run_context parameter carries caller state. Use it for per-user behavior:
```python
def query(self, question: str, *, run_context=None) -> Answer:
    user_id = run_context.user_id if run_context else None

    if user_id:
        # Fetch user-specific data
        user_docs = self.get_docs_for_user(user_id)
        return Answer(text=self.search(question, user_docs))

    # Fall back to global search
    return Answer(text=self.search(question, self.all_docs))
```

Available on `run_context`:
- `user_id` — identifies the caller
- `session_id` — identifies the conversation
- `metadata` — arbitrary dict passed through the call chain
- `dependencies` — values injected via the agent's `dependencies` parameter
Wrapping External APIs
Pattern for wrapping a REST API:
```python
import httpx
from kern.context import Answer, ContextProvider, Status

class WeatherContextProvider(ContextProvider):
    def __init__(self, id: str, api_key: str):
        super().__init__(id, write=False)  # Read-only
        self.api_key = api_key
        self.client = httpx.Client()

    def query(self, question: str, *, run_context=None) -> Answer:
        # Extract city from question (simplified)
        city = question.replace("weather in", "").strip()

        resp = self.client.get(
            "https://api.weather.com/v1/current",
            params={"city": city, "key": self.api_key}
        )
        data = resp.json()

        return Answer(text=f"Weather in {city}: {data['temp']}F, {data['condition']}")

    async def aquery(self, question: str, *, run_context=None) -> Answer:
        # Use async client for async version
        async with httpx.AsyncClient() as client:
            city = question.replace("weather in", "").strip()
            resp = await client.get(
                "https://api.weather.com/v1/current",
                params={"city": city, "key": self.api_key}
            )
            data = resp.json()
            return Answer(text=f"Weather in {city}: {data['temp']}F, {data['condition']}")

    def status(self) -> Status:
        try:
            self.client.get("https://api.weather.com/health")
            return Status(ok=True, detail="API reachable")
        except Exception as e:
            return Status(ok=False, detail=str(e))

    async def astatus(self) -> Status:
        return self.status()
```