User Input Required Stream Async

This example demonstrates how to use the requires_user_input parameter with async streaming responses. It shows how to collect specific user input fields in an asynchronous environment while maintaining real-time streaming.

Create a Python file

"""Collect user-provided tool arguments during an async streaming agent run."""

import asyncio
from typing import List

from kern.agent import Agent
from kern.db.sqlite import SqliteDb
from kern.models.openai import OpenAIResponses
from kern.tools import tool
from kern.tools.function import UserInputField


# You can either specify the user_input_fields, or leave them empty for all
# fields to be provided by the user.
@tool(requires_user_input=True, user_input_fields=["to_address"])
def send_email(subject: str, body: str, to_address: str) -> str:
    """
    Send an email.

    Args:
        subject (str): The subject of the email.
        body (str): The body of the email.
        to_address (str): The address to send the email to.
    """
    return f"Sent email to {to_address} with subject {subject} and body {body}"


agent = Agent(
    model=OpenAIResponses(id="gpt-5.2"),
    tools=[send_email],
    markdown=True,
    db=SqliteDb(session_table="test_session", db_file="tmp/example.db"),
)


async def main():
    """Stream a run, prompt for any missing user-input fields, then continue the run."""
    async for run_event in agent.arun(
        "Send an email with the subject 'Hello' and the body 'Hello, world!'",
        stream=True,
    ):
        if run_event.is_paused:  # Or agent.run_response.is_paused
            for requirement in run_event.active_requirements:
                if requirement.needs_user_input:
                    input_schema: List[UserInputField] = requirement.user_input_schema  # type: ignore

                    for field in input_schema:
                        # Display field information to the user
                        print(f"\nField: {field.name}")
                        print(f"Description: {field.description}")
                        print(f"Type: {field.field_type}")

                        if field.value is None:
                            # Only fields without a value are requested from the user
                            field.value = input(
                                f"Please enter a value for {field.name}: "
                            )
                        else:
                            # Field already has a value; show it and keep it
                            print(f"Value: {field.value}")

            # Resume the paused run with the updated fields and stream the rest
            async for resp in agent.acontinue_run(  # type: ignore
                run_id=run_event.run_id,
                updated_tools=run_event.tools,
                stream=True,
            ):
                print(resp.content, end="")

    # Or for simple debug flow
    # agent.aprint_response("Send an email with the subject 'Hello' and the body 'Hello, world!'")


if __name__ == "__main__":
    asyncio.run(main())

Set up your virtual environment

macOS/Linux:
uv venv --python 3.12
source .venv/bin/activate

Windows:
uv venv --python 3.12
.venv\Scripts\activate

Install dependencies

uv pip install -U kern-ai openai

Export your OpenAI API key

macOS/Linux:
export OPENAI_API_KEY="your_openai_api_key_here"

Windows (PowerShell):
$Env:OPENAI_API_KEY="your_openai_api_key_here"

Run Agent

python user_input_required_stream_async.py