Confirmation Required with Async Streaming
This example demonstrates human-in-the-loop functionality with asynchronous streaming responses. It shows how to handle user confirmation during tool execution in an async environment while maintaining real-time streaming.
Create a Python file named confirmation_required_stream_async.py
1import asyncio2import json34import httpx5from kern.agent import Agent6from kern.db.sqlite import SqliteDb7from kern.models.openai import OpenAIResponses8from kern.tools import tool9from rich.console import Console10from rich.prompt import Prompt1112console = Console()131415@tool(requires_confirmation=True)16def get_top_hackernews_stories(num_stories: int) -> str:17 """Fetch top stories from Hacker News.1819 Args:20 num_stories (int): Number of stories to retrieve2122 Returns:23 str: JSON string containing story details24 """25 # Fetch top story IDs26 response = httpx.get("https://hacker-news.firebaseio.com/v0/topstories.json")27 story_ids = response.json()2829 # Yield story details30 all_stories = []31 for story_id in story_ids[:num_stories]:32 story_response = httpx.get(33 f"https://hacker-news.firebaseio.com/v0/item/{story_id}.json"34 )35 story = story_response.json()36 if "text" in story:37 story.pop("text", None)38 all_stories.append(story)39 return json.dumps(all_stories)404142agent = Agent(43 model=OpenAIResponses(id="gpt-5.2"),44 tools=[get_top_hackernews_stories],45 db=SqliteDb(session_table="test_session", db_file="tmp/example.db"),46 markdown=True,47)484950async def main():51 async for run_event in agent.arun(52 "Fetch the top 2 hackernews stories", stream=True53 ):54 if run_event.is_paused:55 for requirement in run_event.active_requirements:56 if requirement.needs_confirmation:57 # Ask for confirmation58 console.print(59 f"Tool name [bold blue]{requirement.tool_execution.tool_name}({requirement.tool_execution.tool_args})[/] requires confirmation."60 )61 message = (62 Prompt.ask(63 "Do you want to continue?", choices=["y", "n"], default="y"64 )65 .strip()66 .lower()67 )6869 if message == "n":70 requirement.reject()71 else:72 requirement.confirm()7374 async for resp in agent.acontinue_run( # type: ignore75 run_id=run_event.run_id, updated_tools=run_event.tools, stream=True76 ):77 print(resp.content, end="")7879 # Or for simple debug flow80 # await 
agent.aprint_response("Fetch the top 2 hackernews stories", stream=True)818283if __name__ == "__main__":84 asyncio.run(main())Set up your virtual environment
Mac/Linux:
uv venv --python 3.12
source .venv/bin/activate

Windows:
uv venv --python 3.12
.venv\Scripts\activate

Install dependencies
uv pip install -U kern-ai openai httpx rich

Export your OpenAI API key
Mac/Linux:
export OPENAI_API_KEY="your_openai_api_key_here"

Windows (PowerShell):
$Env:OPENAI_API_KEY="your_openai_api_key_here"

Run Agent
python confirmation_required_stream_async.py