Confirmation Required with Async Streaming

This example demonstrates human-in-the-loop functionality with asynchronous streaming responses. It shows how to handle user confirmation during tool execution in an async environment while maintaining real-time streaming.

Create a Python file

1import asyncio
2import json
3
4import httpx
5from kern.agent import Agent
6from kern.db.sqlite import SqliteDb
7from kern.models.openai import OpenAIResponses
8from kern.tools import tool
9from rich.console import Console
10from rich.prompt import Prompt
11
12console = Console()
13
14
15@tool(requires_confirmation=True)
16def get_top_hackernews_stories(num_stories: int) -> str:
17 """Fetch top stories from Hacker News.
18
19 Args:
20 num_stories (int): Number of stories to retrieve
21
22 Returns:
23 str: JSON string containing story details
24 """
25 # Fetch top story IDs
26 response = httpx.get("https://hacker-news.firebaseio.com/v0/topstories.json")
27 story_ids = response.json()
28
29 # Yield story details
30 all_stories = []
31 for story_id in story_ids[:num_stories]:
32 story_response = httpx.get(
33 f"https://hacker-news.firebaseio.com/v0/item/{story_id}.json"
34 )
35 story = story_response.json()
36 if "text" in story:
37 story.pop("text", None)
38 all_stories.append(story)
39 return json.dumps(all_stories)
40
41
42agent = Agent(
43 model=OpenAIResponses(id="gpt-5.2"),
44 tools=[get_top_hackernews_stories],
45 db=SqliteDb(session_table="test_session", db_file="tmp/example.db"),
46 markdown=True,
47)
48
49
50async def main():
51 async for run_event in agent.arun(
52 "Fetch the top 2 hackernews stories", stream=True
53 ):
54 if run_event.is_paused:
55 for requirement in run_event.active_requirements:
56 if requirement.needs_confirmation:
57 # Ask for confirmation
58 console.print(
59 f"Tool name [bold blue]{requirement.tool_execution.tool_name}({requirement.tool_execution.tool_args})[/] requires confirmation."
60 )
61 message = (
62 Prompt.ask(
63 "Do you want to continue?", choices=["y", "n"], default="y"
64 )
65 .strip()
66 .lower()
67 )
68
69 if message == "n":
70 requirement.reject()
71 else:
72 requirement.confirm()
73
74 async for resp in agent.acontinue_run( # type: ignore
75 run_id=run_event.run_id, updated_tools=run_event.tools, stream=True
76 ):
77 print(resp.content, end="")
78
79 # Or for simple debug flow
80 # await agent.aprint_response("Fetch the top 2 hackernews stories", stream=True)
81
82
83if __name__ == "__main__":
84 asyncio.run(main())

Set up your virtual environment

1uv venv --python 3.12
2source .venv/bin/activate
1uv venv --python 3.12
2.venv\Scripts\activate

Install dependencies

1uv pip install -U kern-ai openai httpx rich

Export your OpenAI API key

1export OPENAI_API_KEY="your_openai_api_key_here"
1$Env:OPENAI_API_KEY="your_openai_api_key_here"

Run Agent

1python confirmation_required_stream_async.py