Async Events Streaming
This example demonstrates how to stream events from a workflow.
Code
"""Stream workflow run events asynchronously.

Builds a four-step blog-post workflow (prompt preparation -> research team ->
prompt preparation -> writer agent), runs it with ``arun(stream=True,
stream_events=True)`` and prints a selected subset of the emitted events.
"""

import asyncio
from textwrap import dedent
from typing import AsyncIterator

from kern.agent import Agent
from kern.db.sqlite import SqliteDb
from kern.models.openai import OpenAIResponses
from kern.run.workflow import WorkflowRunOutputEvent, WorkflowRunEvent
from kern.team import Team
from kern.tools.hackernews import HackerNewsTools
from kern.tools.yfinance import YFinanceTools
from kern.workflow.types import StepInput, StepOutput
from kern.workflow.workflow import Workflow

# Define agents
hackernews_agent = Agent(
    name="Hackernews Agent",
    model=OpenAIResponses(id="gpt-5.2"),
    tools=[HackerNewsTools()],
    role="Extract key insights and content from Hackernews posts",
)
finance_agent = Agent(
    name="Finance Agent",
    model=OpenAIResponses(id="gpt-5.2"),
    tools=[YFinanceTools()],
    role="Get stock prices and financial data",
)

writer_agent = Agent(
    name="Writer Agent",
    model=OpenAIResponses(id="gpt-5.2"),
    instructions="Write a blog post on the topic",
)

# Prompt templates are dedented ONCE, before any value is substituted.
# Dedenting after interpolation is unreliable: a multi-line value contributes
# lines that start at column 0, which removes the common margin and turns
# ``dedent`` into a no-op, leaving the template's source indentation in place.
_WEB_SEARCH_PROMPT_TEMPLATE = dedent("""\
    I'm writing a blog post on the topic
    <topic>
    {topic}
    </topic>

    Search the web for atleast 10 articles""")

_WRITER_PROMPT_TEMPLATE = dedent("""\
    I'm writing a blog post on the topic:
    <topic>
    {topic}
    </topic>

    Here is information from the web:
    <research_results>
    {research_team_output}
    </research_results>""")

# Event names that ``main`` echoes to stdout; any other event is ignored.
_PRINTED_EVENTS = (
    WorkflowRunEvent.condition_execution_started.value,
    WorkflowRunEvent.condition_execution_completed.value,
    WorkflowRunEvent.workflow_started.value,
    WorkflowRunEvent.step_started.value,
    WorkflowRunEvent.step_completed.value,
    WorkflowRunEvent.workflow_completed.value,
)


async def prepare_input_for_web_search(
    step_input: StepInput,
) -> AsyncIterator[StepOutput]:
    """Yield a single StepOutput holding the research prompt.

    The prompt wraps ``step_input.input`` in ``<topic>`` tags and asks for
    articles on it.
    """
    topic = step_input.input

    # Substitute into the already-dedented template so the layout of the
    # prompt does not depend on the shape of ``topic``.
    content = _WEB_SEARCH_PROMPT_TEMPLATE.format(topic=topic)

    # Yield a StepOutput as the final result
    yield StepOutput(content=content)


async def prepare_input_for_writer(step_input: StepInput) -> AsyncIterator[StepOutput]:
    """Yield a single StepOutput holding the writer prompt.

    The prompt combines ``step_input.input`` (the topic) with
    ``step_input.previous_step_content`` (the preceding step's output).
    """
    topic = step_input.input
    research_team_output = step_input.previous_step_content

    # The research output is typically multi-line, which is exactly the case
    # where dedenting after interpolation would silently do nothing.
    content = _WRITER_PROMPT_TEMPLATE.format(
        topic=topic,
        research_team_output=research_team_output,
    )

    # Yield a StepOutput as the final result
    yield StepOutput(content=content)


# Define research team for complex analysis
research_team = Team(
    name="Research Team",
    members=[hackernews_agent, finance_agent],
    instructions="Research tech topics from Hackernews and the web",
)


async def main():
    """Run the blog-post workflow and print selected streamed events."""
    content_creation_workflow = Workflow(
        name="Blog Post Workflow",
        description="Automated blog post creation from Hackernews and the web",
        steps=[
            prepare_input_for_web_search,
            research_team,
            prepare_input_for_writer,
            writer_agent,
        ],
        db=SqliteDb(
            session_table="workflow_session",
            db_file="tmp/workflow.db",
        ),
    )

    resp: AsyncIterator[WorkflowRunOutputEvent] = content_creation_workflow.arun(
        input="AI trends in 2024",
        markdown=True,
        stream=True,
        stream_events=True,
    )
    async for event in resp:
        # Every selected event is handled identically, so one membership test
        # replaces the per-event if/elif chain. A tuple keeps the same
        # ``==`` comparison semantics as the original chain.
        if event.event in _PRINTED_EVENTS:
            print(event)
            print()


if __name__ == "__main__":
    asyncio.run(main())