Async Streaming: Confirm Member Agent Tool

Same as confirmation_required_stream.py, but uses the async arun()/acontinue_run() methods.

"""Team HITL Async Streaming: Member agent tool requiring confirmation.

Same as confirmation_required_stream.py, but uses the async arun()/acontinue_run() methods.

Note: When streaming with member agents, use isinstance() with TeamRunPausedEvent
to distinguish the team's pause from member agent pauses.
"""
8
9import asyncio
10
11from kern.agent import Agent
12from kern.db.sqlite import SqliteDb
13from kern.models.openai import OpenAIResponses
14from kern.run.team import RunPausedEvent as TeamRunPausedEvent
15from kern.team.team import Team
16from kern.tools import tool
17from kern.utils import pprint
18
19# ---------------------------------------------------------------------------
20# Setup
21# ---------------------------------------------------------------------------
# SQLite-backed store handed to both the member agent and the team below.
db = SqliteDb(
    db_file="tmp/team_hitl_stream.db",
)
23
24
25# ---------------------------------------------------------------------------
26# Tools
27# ---------------------------------------------------------------------------
@tool(requires_confirmation=True)
def deploy_to_production(app_name: str, version: str) -> str:
    """Deploy an application to production.

    Args:
        app_name (str): Name of the application
        version (str): Version to deploy
    """
    # NOTE(review): docstring kept verbatim -- the @tool decorator presumably
    # exposes it as the tool description; confirm before rewording.
    # No real deployment happens here; the example only reports success.
    confirmation_message = f"Successfully deployed {app_name} v{version} to production"
    return confirmation_message
37
38
39# ---------------------------------------------------------------------------
40# Create Members
41# ---------------------------------------------------------------------------
# Member agent that owns the confirmation-gated deploy_to_production tool.
deploy_agent = Agent(
    name="Deploy Agent",
    role="Handles deployments to production",
    tools=[deploy_to_production],
    model=OpenAIResponses(id="gpt-5.2-mini"),
    db=db,
)
49
50
51# ---------------------------------------------------------------------------
52# Create Team
53# ---------------------------------------------------------------------------
# Team whose only member is the deploy agent; it gets its own model instance
# and uses the same module-level db as the agent.
team = Team(
    name="DevOps Team",
    model=OpenAIResponses(id="gpt-5.2-mini"),
    members=[deploy_agent],
    db=db,
)
60
61
async def main():
    """Stream a team run and confirm the pending tool call when the team pauses.

    Iterates the events yielded by ``team.arun(..., stream=True)``. When a
    team-level pause event arrives, every active requirement that needs
    confirmation is printed and confirmed; the run is then continued with
    ``team.acontinue_run`` and the continued stream is handed to
    ``pprint.apprint_run_response``. All other events are ignored.
    """
    event_stream = team.arun(
        "Deploy the payments app version 2.1 to production", stream=True
    )
    async for event in event_stream:
        # isinstance picks out the team's own pause event, as opposed to a
        # pause event coming from the member agent.
        if not isinstance(event, TeamRunPausedEvent):
            continue

        print("Team paused - requires confirmation")
        for requirement in event.active_requirements:
            if not requirement.needs_confirmation:
                continue
            pending_call = requirement.tool_execution
            print(f" Tool: {pending_call.tool_name}")
            print(f" Args: {pending_call.tool_args}")
            requirement.confirm()

        # Continue the paused run as a stream and let apprint_run_response
        # consume it.
        resumed_stream = team.acontinue_run(
            run_id=event.run_id,
            session_id=event.session_id,
            requirements=event.requirements,
            stream=True,
        )
        await pprint.apprint_run_response(resumed_stream)
83
84
85# ---------------------------------------------------------------------------
86# Run Team
87# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Script entry point: run the main() coroutine to completion with asyncio.
    entry_coroutine = main()
    asyncio.run(entry_coroutine)

Run the Example

# Clone and setup repo
git clone https://github.com/kern-ai/kern.git
cd kern/cookbook/03_teams/human_in_the_loop

# Create and activate virtual environment
./scripts/demo_setup.sh
source .venvs/demo/bin/activate

python confirmation_required_async_stream.py