Async Events Streaming

This example demonstrates how to stream events from a workflow.

Code

1import asyncio
2from textwrap import dedent
3from typing import AsyncIterator
4
5from kern.agent import Agent
6from kern.db.sqlite import SqliteDb
7from kern.models.openai import OpenAIResponses
8from kern.run.workflow import WorkflowRunOutputEvent, WorkflowRunEvent
9from kern.team import Team
10from kern.tools.hackernews import HackerNewsTools
11from kern.tools.yfinance import YFinanceTools
12from kern.workflow.types import StepInput, StepOutput
13from kern.workflow.workflow import Workflow
14
15# Define agents
16hackernews_agent = Agent(
17 name="Hackernews Agent",
18 model=OpenAIResponses(id="gpt-5.2"),
19 tools=[HackerNewsTools()],
20 role="Extract key insights and content from Hackernews posts",
21)
22finance_agent = Agent(
23 name="Finance Agent",
24 model=OpenAIResponses(id="gpt-5.2"),
25 tools=[YFinanceTools()],
26 role="Get stock prices and financial data",
27)
28
29writer_agent = Agent(
30 name="Writer Agent",
31 model=OpenAIResponses(id="gpt-5.2"),
32 instructions="Write a blog post on the topic",
33)
34
35
36async def prepare_input_for_web_search(
37 step_input: StepInput,
38) -> AsyncIterator[StepOutput]:
39 """Generator function that yields StepOutput"""
40 topic = step_input.input
41
42 # Create proper StepOutput content
43 content = dedent(f"""\
44 I'm writing a blog post on the topic
45 <topic>
46 {topic}
47 </topic>
48
49 Search the web for atleast 10 articles\
50 """)
51
52 # Yield a StepOutput as the final result
53 yield StepOutput(content=content)
54
55
56async def prepare_input_for_writer(step_input: StepInput) -> AsyncIterator[StepOutput]:
57 """Generator function that yields StepOutput"""
58 topic = step_input.input
59 research_team_output = step_input.previous_step_content
60
61 # Create proper StepOutput content
62 content = dedent(f"""\
63 I'm writing a blog post on the topic:
64 <topic>
65 {topic}
66 </topic>
67
68 Here is information from the web:
69 <research_results>
70 {research_team_output}
71 </research_results>\
72 """)
73
74 # Yield a StepOutput as the final result
75 yield StepOutput(content=content)
76
77
78# Define research team for complex analysis
79research_team = Team(
80 name="Research Team",
81 members=[hackernews_agent, finance_agent],
82 instructions="Research tech topics from Hackernews and the web",
83)
84
85
86async def main():
87 content_creation_workflow = Workflow(
88 name="Blog Post Workflow",
89 description="Automated blog post creation from Hackernews and the web",
90 steps=[
91 prepare_input_for_web_search,
92 research_team,
93 prepare_input_for_writer,
94 writer_agent,
95 ],
96 db=SqliteDb(
97 session_table="workflow_session",
98 db_file="tmp/workflow.db",
99 ),
100 )
101
102 resp: AsyncIterator[WorkflowRunOutputEvent] = content_creation_workflow.arun(
103 input="AI trends in 2024",
104 markdown=True,
105 stream=True,
106 stream_events=True,
107 )
108 async for event in resp:
109 if event.event == WorkflowRunEvent.condition_execution_started.value:
110 print(event)
111 print()
112 elif event.event == WorkflowRunEvent.condition_execution_completed.value:
113 print(event)
114 print()
115 elif event.event == WorkflowRunEvent.workflow_started.value:
116 print(event)
117 print()
118 elif event.event == WorkflowRunEvent.step_started.value:
119 print(event)
120 print()
121 elif event.event == WorkflowRunEvent.step_completed.value:
122 print(event)
123 print()
124 elif event.event == WorkflowRunEvent.workflow_completed.value:
125 print(event)
126 print()
127
128
129if __name__ == "__main__":
130 asyncio.run(main())