Building Custom Providers

Create your own context provider for any data source.

When the built-in providers don't fit, subclass ContextProvider. The base class handles tool wrapping, name derivation, and error shaping.

Minimal Example

from kern.agent import Agent
from kern.context import Answer, ContextProvider, Status

# Static lookup table: topic keyword -> canned reply.
FAQ = {"pricing": "See kern.ndx.rocks/pricing", "support": "Email help@kern.ndx.rocks"}


class FAQContextProvider(ContextProvider):
    """Provider that answers questions from the in-memory FAQ table."""

    def status(self) -> Status:
        """Report the provider as healthy, with the number of FAQ entries."""
        entry_count = len(FAQ)
        return Status(ok=True, detail=f"{entry_count} entries")

    async def astatus(self) -> Status:
        """Async health check; delegates to status()."""
        return self.status()

    def query(self, question: str, *, run_context=None) -> Answer:
        """Return the reply for the first FAQ topic mentioned in the question.

        The question is lower-cased and each topic key is tested as a
        substring, in the table's insertion order. run_context is accepted
        but not used.
        """
        lowered = question.lower()
        for topic, reply in FAQ.items():
            if topic in lowered:
                return Answer(text=reply)
        return Answer(text="No FAQ entry matches that.")

    async def aquery(self, question: str, *, run_context=None) -> Answer:
        """Async read; delegates to query()."""
        return self.query(question, run_context=run_context)


faq = FAQContextProvider(id="faq")
agent = Agent(model=..., tools=faq.get_tools())

The agent now has a query_faq tool, with the same shape as the tools exposed by every built-in provider.

Required Methods

You must implement these four abstract methods:

| Method | Purpose |
| --- | --- |
| query(question, *, run_context=None) -> Answer | Sync read |
| aquery(question, *, run_context=None) -> Answer | Async read |
| status() -> Status | Sync health check |
| astatus() -> Status | Async health check |

Answer

Answer is what query() returns:

1from kern.context import Answer, Document
2
3# Text-only answer
4return Answer(text="The weather is sunny.")
5
6# Answer with source documents
7return Answer(
8 text="Found 3 matching policies.",
9 results=[
10 Document(id="doc1", name="Refund Policy", uri="/policies/refund.md", snippet="..."),
11 Document(id="doc2", name="Privacy Policy", uri="/policies/privacy.md", snippet="..."),
12 ]
13)

Status

Status reports provider health:

1from kern.context import Status
2
3# Healthy
4return Status(ok=True, detail="Connected to database")
5
6# Unhealthy
7return Status(ok=False, detail="API key invalid")

Optional Methods

Override these to customize behavior:

| Method | Default | Override when |
| --- | --- | --- |
| update() / aupdate() | Raises NotImplementedError | Provider supports writes |
| asetup() | No-op | Need async init (MCP sessions, cache priming) |
| aclose() | No-op | Hold long-lived state (watches, connections) |
| instructions() | Generic guidance | Want source-specific usage hints |

Adding Write Support

Override update(), aupdate(), and _default_tools() for writable providers:

class NotesContextProvider(ContextProvider):
    """Writable provider backed by a caller-supplied dict of note title -> content."""

    # Command prefix understood by update(); matched case-insensitively.
    _SAVE_PREFIX = "save note:"

    def __init__(self, id: str, storage: dict):
        super().__init__(id)
        self.storage = storage

    def query(self, question: str, *, run_context=None) -> Answer:
        """Return the content of every note whose title contains the question.

        The comparison is a case-insensitive substring test against note
        titles; matching contents are joined with newlines. run_context is
        accepted but not used.
        """
        needle = question.lower()  # hoisted: invariant across all titles
        matches = [content for title, content in self.storage.items() if needle in title.lower()]
        return Answer(text="\n".join(matches) if matches else "No matching notes.")

    async def aquery(self, question: str, *, run_context=None) -> Answer:
        """Async read; delegates to query()."""
        return self.query(question, run_context=run_context)

    def update(self, instruction: str, *, run_context=None) -> Answer:
        """Save a note described by ``save note: <title> - <content>``.

        Returns an Answer confirming the saved title, or an Answer explaining
        the expected format when the instruction cannot be parsed. Nothing is
        stored in the latter case.
        """
        prefix = self._SAVE_PREFIX
        # Ignore leading whitespace and letter case so "Save note: ..." is
        # accepted as well as the exact lowercase form.
        command = instruction.lstrip()
        if command[:len(prefix)].lower() == prefix:
            title, separator, content = command[len(prefix):].partition(" - ")
            title = title.strip()
            # Require the " - " separator and a non-empty title; an empty
            # title would otherwise create a note stored under "".
            if separator and title:
                self.storage[title] = content.strip()
                return Answer(text=f"Saved note: {title}")
        return Answer(text="Could not parse instruction. Use: save note: <title> - <content>")

    async def aupdate(self, instruction: str, *, run_context=None) -> Answer:
        """Async write; delegates to update()."""
        return self.update(instruction, run_context=run_context)

    def _default_tools(self) -> list:
        return self._read_write_tools()  # Exposes both query and update tools

    def status(self) -> Status:
        """Report the provider as healthy, with the number of stored notes."""
        return Status(ok=True, detail=f"{len(self.storage)} notes")

    async def astatus(self) -> Status:
        """Async health check; delegates to status()."""
        return self.status()

Now the agent has both query_notes and update_notes tools.

Async Lifecycle

For providers that need setup and teardown:

class StreamingAPIContextProvider(ContextProvider):
    """Provider that queries an HTTP search endpoint over a long-lived aiohttp session."""

    def __init__(self, id: str, api_url: str):
        super().__init__(id)
        self.api_url = api_url
        # Created by asetup(); None means "not set up yet" or "already closed".
        self.session = None

    async def asetup(self) -> None:
        """Open the HTTP session. Calling this again while a session is live is a no-op."""
        import aiohttp

        # Reuse a live session instead of overwriting it, which would leave
        # the previous session open with no reference left to close it.
        if self.session is None or self.session.closed:
            self.session = aiohttp.ClientSession()

    async def aclose(self) -> None:
        """Close the HTTP session if one is open, and drop the reference to it."""
        session, self.session = self.session, None
        if session is not None:
            await session.close()

    async def aquery(self, question: str, *, run_context=None) -> Answer:
        """GET ``{api_url}/search?q=<question>`` and return the response's "answer" field.

        Falls back to "No answer found." when the JSON body has no "answer"
        key. Raises aiohttp's ClientResponseError for 4xx/5xx responses.
        run_context is accepted but not used.
        """
        # Set up lazily so a query issued before asetup() (or after aclose())
        # does not fail on a missing or closed session.
        if self.session is None or self.session.closed:
            await self.asetup()
        async with self.session.get(f"{self.api_url}/search", params={"q": question}) as resp:
            # Surface HTTP errors rather than reporting them as "No answer found."
            resp.raise_for_status()
            data = await resp.json()
            return Answer(text=data.get("answer", "No answer found."))

    # ... implement query, status, astatus

Custom Instructions

Override instructions() to provide source-specific guidance:

def instructions(self) -> str:
    """Return usage guidance describing when to call query_jira and update_jira."""
    # Built from explicit lines so the text carries no source-code
    # indentation and no leading or trailing blank line.
    guidance = [
        "Use query_jira for:",
        '- Finding issues by key (e.g., "PROJ-123")',
        "- Searching by assignee, status, or labels",
        "- Getting sprint information",
        "",
        "Use update_jira for:",
        "- Changing issue status",
        "- Adding comments",
        "- Updating assignee",
    ]
    return "\n".join(guidance)

Using RunContext

The run_context parameter carries caller state. Use it for per-user behavior:

def query(self, question: str, *, run_context=None) -> Answer:
    """Search the caller's own documents when a user is known, else all documents.

    The user is taken from run_context.user_id; a missing run_context or a
    falsy user_id selects the global document set.
    """
    user_id = None
    if run_context:
        user_id = run_context.user_id

    # Pick the corpus first, then run a single search over it.
    if user_id:
        corpus = self.get_docs_for_user(user_id)
    else:
        corpus = self.all_docs
    return Answer(text=self.search(question, corpus))

Available on run_context:

  • user_id — identifies the caller
  • session_id — identifies the conversation
  • metadata — arbitrary dict passed through the call chain
  • dependencies — values injected via the agent's dependencies parameter

Wrapping External APIs

Pattern for wrapping a REST API:

import httpx
from kern.context import Answer, ContextProvider, Status


class WeatherContextProvider(ContextProvider):
    """Read-only provider that wraps a REST weather API."""

    CURRENT_URL = "https://api.weather.com/v1/current"
    HEALTH_URL = "https://api.weather.com/health"

    def __init__(self, id: str, api_key: str):
        super().__init__(id, write=False)  # Read-only
        self.api_key = api_key
        # Long-lived sync client; released in aclose().
        self.client = httpx.Client()

    @staticmethod
    def _extract_city(question: str) -> str:
        """Extract the city from the question (simplified)."""
        return question.replace("weather in", "").strip()

    def _request_params(self, city: str) -> dict:
        """Build the query-string parameters for the current-weather endpoint."""
        return {"city": city, "key": self.api_key}

    @staticmethod
    def _to_answer(city: str, resp) -> Answer:
        """Turn a current-weather response into an Answer.

        Raises httpx.HTTPStatusError for 4xx/5xx responses so an error
        payload is not parsed as if it were weather data.
        """
        resp.raise_for_status()
        data = resp.json()
        return Answer(text=f"Weather in {city}: {data['temp']}F, {data['condition']}")

    def query(self, question: str, *, run_context=None) -> Answer:
        """Fetch current weather for the city named in the question (sync)."""
        city = self._extract_city(question)
        resp = self.client.get(self.CURRENT_URL, params=self._request_params(city))
        return self._to_answer(city, resp)

    async def aquery(self, question: str, *, run_context=None) -> Answer:
        """Fetch current weather for the city named in the question (async)."""
        city = self._extract_city(question)
        # Use async client for async version
        async with httpx.AsyncClient() as client:
            resp = await client.get(self.CURRENT_URL, params=self._request_params(city))
            return self._to_answer(city, resp)

    def status(self) -> Status:
        """Probe the health endpoint; never raises, failures become ok=False."""
        try:
            # A 4xx/5xx reply means the API is not healthy, so treat it as a failure.
            self.client.get(self.HEALTH_URL).raise_for_status()
        except Exception as e:  # health checks report problems instead of raising
            return Status(ok=False, detail=str(e))
        return Status(ok=True, detail="API reachable")

    async def astatus(self) -> Status:
        """Async health probe; uses an async client so the event loop is not blocked."""
        try:
            async with httpx.AsyncClient() as client:
                resp = await client.get(self.HEALTH_URL)
                resp.raise_for_status()
        except Exception as e:  # health checks report problems instead of raising
            return Status(ok=False, detail=str(e))
        return Status(ok=True, detail="API reachable")

    async def aclose(self) -> None:
        """Release the connections held by the long-lived sync client."""
        self.client.close()

Next