Running Workflows
Execute workflows with Workflow.run() and process their output.
The Workflow.run() function runs the agent and generates a response, either as a WorkflowRunOutput object or a stream of WorkflowRunOutput objects.
Many of our examples use workflow.print_response() which is a helper utility to print the response in the terminal. This uses workflow.run() under the hood.
Running your Workflow
Here's how to run your workflow. The response is captured in the response.
1from kern.agent import Agent2from kern.models.openai import OpenAIResponses3from kern.db.sqlite import SqliteDb4from kern.team import Team5from kern.tools.hackernews import HackerNewsTools6from kern.tools.yfinance import YFinanceTools7from kern.workflow import Step, Workflow8from kern.run.workflow import WorkflowRunOutput9from kern.utils.pprint import pprint_run_response1011# Define agents12hackernews_agent = Agent(13 name="Hackernews Agent",14 model=OpenAIResponses(id="gpt-5.2"),15 tools=[HackerNewsTools()],16 role="Extract key insights and content from Hackernews posts",17)18finance_agent = Agent(19 name="Finance Agent",20 model=OpenAIResponses(id="gpt-5.2"),21 tools=[YFinanceTools()],22 role="Get stock prices and financial data",23)2425# Define research team for complex analysis26research_team = Team(27 name="Research Team",28 members=[hackernews_agent, finance_agent],29 instructions="Research tech topics and related stocks",30)3132content_planner = Agent(33 name="Content Planner",34 model=OpenAIResponses(id="gpt-5.2"),35 instructions=[36 "Plan a content schedule over 4 weeks for the provided topic and research content",37 "Ensure that I have posts for 3 posts per week",38 ],39)4041content_creation_workflow = Workflow(42 name="Content Creation Workflow",43 description="Automated content creation from blog posts to social media",44 db=SqliteDb(db_file="tmp/workflow.db"),45 steps=[research_team, content_planner],46)4748# Create and use workflow49if __name__ == "__main__":50 response: WorkflowRunOutput = content_creation_workflow.run(51 input="AI trends in 2024",52 markdown=True,53 )5455 pprint_run_response(response, markdown=True)The Workflow.run() function returns a WorkflowRunOutput object when not
streaming. Here is detailed documentation for
WorkflowRunOutput.
Async Execution
The Workflow.arun() function is the async version of Workflow.run().
Here is an example of how to use it:
1from typing import AsyncIterator2import asyncio34from kern.agent import Agent5from kern.tools.hackernews import HackerNewsTools6from kern.workflow import Condition, Step, Workflow, StepInput7from kern.run.workflow import WorkflowRunOutput, WorkflowRunOutputEvent, WorkflowRunEvent89# === BASIC AGENTS ===10researcher = Agent(11 name="Researcher",12 instructions="Research the given topic and provide detailed findings.",13 tools=[HackerNewsTools()],14)1516summarizer = Agent(17 name="Summarizer",18 instructions="Create a clear summary of the research findings.",19)2021fact_checker = Agent(22 name="Fact Checker",23 instructions="Verify facts and check for accuracy in the research.",24 tools=[HackerNewsTools()],25)2627writer = Agent(28 name="Writer",29 instructions="Write a comprehensive article based on all available research and verification.",30)3132# === CONDITION EVALUATOR ===33def needs_fact_checking(step_input: StepInput) -> bool:34 """Determine if the research contains claims that need fact-checking"""35 summary = step_input.previous_step_content or ""3637 # Look for keywords that suggest factual claims38 fact_indicators = [39 "study shows",40 "breakthroughs",41 "research indicates",42 "according to",43 "statistics",44 "data shows",45 "survey",46 "report",47 "million",48 "billion",49 "percent",50 "%",51 "increase",52 "decrease",53 ]5455 return any(indicator in summary.lower() for indicator in fact_indicators)565758# === WORKFLOW STEPS ===59research_step = Step(60 name="research",61 description="Research the topic",62 agent=researcher,63)6465summarize_step = Step(66 name="summarize",67 description="Summarize research findings",68 agent=summarizer,69)7071# Conditional fact-checking step72fact_check_step = Step(73 name="fact_check",74 description="Verify facts and claims",75 agent=fact_checker,76)7778write_article = Step(79 name="write_article",80 description="Write final article",81 agent=writer,82)8384# === BASIC LINEAR WORKFLOW ===85basic_workflow = Workflow(86 name="Basic Linear Workflow",87 description="Research -> Summarize -> Condition(Fact Check) -> Write Article",88 steps=[89 research_step,90 summarize_step,91 Condition(92 name="fact_check_condition",93 description="Check if fact-checking is needed",94 evaluator=needs_fact_checking,95 steps=[fact_check_step],96 ),97 write_article,98 ],99)100101async def main():102 try:103 response: WorkflowRunOutput = await basic_workflow.arun(104 input="Recent breakthroughs in quantum computing",105 )106 pprint_run_response(response, markdown=True)107 except Exception as e:108 print(f"Error: {e}")109110if __name__ == "__main__":111 asyncio.run(main())Streaming Responses
To enable streaming, set stream=True when calling run(). This will return an iterator of WorkflowRunOutputEvent objects instead of a single response.
1# Define your agents/team2...34content_creation_workflow = Workflow(5 name="Content Creation Workflow",6 description="Automated content creation from blog posts to social media",7 db=SqliteDb(db_file="tmp/workflow.db"),8 steps=[research_team, content_planner],9)1011# Create and use workflow12if __name__ == "__main__":13 response: Iterator[WorkflowRunOutputEvent] = content_creation_workflow.run(14 input="AI trends in 2024",15 markdown=True,16 stream=True,17 )1819 pprint_run_response(response, markdown=True)Streaming all events
By default, when you stream a response, only the WorkflowStartedEvent and WorkflowCompletedEvent events will be streamed (together with all the Agent and Team events).
You can also stream all events by setting stream_events=True.
This will provide real-time updates about the workflow's internal processes:
1from typing import Iterator23from kern.agent import Agent4from kern.tools.hackernews import HackerNewsTools5from kern.workflow import Condition, Step, Workflow, StepInput6from kern.run.workflow import WorkflowRunOutput, WorkflowRunOutputEvent, WorkflowRunEvent78# === BASIC AGENTS ===9researcher = Agent(10 name="Researcher",11 instructions="Research the given topic and provide detailed findings.",12 tools=[HackerNewsTools()],13)1415summarizer = Agent(16 name="Summarizer",17 instructions="Create a clear summary of the research findings.",18)1920fact_checker = Agent(21 name="Fact Checker",22 instructions="Verify facts and check for accuracy in the research.",23 tools=[HackerNewsTools()],24)2526writer = Agent(27 name="Writer",28 instructions="Write a comprehensive article based on all available research and verification.",29)3031# === CONDITION EVALUATOR ===32def needs_fact_checking(step_input: StepInput) -> bool:33 """Determine if the research contains claims that need fact-checking"""34 summary = step_input.previous_step_content or ""3536 # Look for keywords that suggest factual claims37 fact_indicators = [38 "study shows",39 "breakthroughs",40 "research indicates",41 "according to",42 "statistics",43 "data shows",44 "survey",45 "report",46 "million",47 "billion",48 "percent",49 "%",50 "increase",51 "decrease",52 ]5354 return any(indicator in summary.lower() for indicator in fact_indicators)555657# === WORKFLOW STEPS ===58research_step = Step(59 name="research",60 description="Research the topic",61 agent=researcher,62)6364summarize_step = Step(65 name="summarize",66 description="Summarize research findings",67 agent=summarizer,68)6970# Conditional fact-checking step71fact_check_step = Step(72 name="fact_check",73 description="Verify facts and claims",74 agent=fact_checker,75)7677write_article = Step(78 name="write_article",79 description="Write final article",80 agent=writer,81)8283# === BASIC LINEAR WORKFLOW ===84basic_workflow = Workflow(85 name="Basic Linear Workflow",86 description="Research -> Summarize -> Condition(Fact Check) -> Write Article",87 steps=[88 research_step,89 summarize_step,90 Condition(91 name="fact_check_condition",92 description="Check if fact-checking is needed",93 evaluator=needs_fact_checking,94 steps=[fact_check_step],95 ),96 write_article,97 ],98)99100if __name__ == "__main__":101 try:102 response: Iterator[WorkflowRunOutputEvent] = basic_workflow.run(103 input="Recent breakthroughs in quantum computing",104 stream=True,105 stream_events=True,106 )107 for event in response:108 if event.event == WorkflowRunEvent.condition_execution_started.value:109 print(event)110 print()111 elif event.event == WorkflowRunEvent.condition_execution_completed.value:112 print(event)113 print()114 elif event.event == WorkflowRunEvent.workflow_started.value:115 print(event)116 print()117 elif event.event == WorkflowRunEvent.step_started.value:118 print(event)119 print()120 elif event.event == WorkflowRunEvent.step_completed.value:121 print(event)122 print()123 elif event.event == WorkflowRunEvent.workflow_completed.value:124 print(event)125 print()126 except Exception as e:127 print(f"Error: {e}")128 import traceback129130 traceback.print_exc()Streaming Executor Events
The events from Agents and Teams used inside your workflow are automatically yielded during the streaming of a Workflow. You can choose not to stream these executor events by setting stream_executor_events=False.
The following Workflow events will be streamed in all cases:
WorkflowStartedWorkflowCompletedStepStartedStepCompleted
See the following example:
1from kern.agent import Agent2from kern.models.openai import OpenAIResponses3from kern.workflow.step import Step4from kern.workflow.workflow import Workflow56agent = Agent(7 name="ResearchAgent",8 model=OpenAIResponses(id="gpt-5.2"),9 instructions="You are a helpful research assistant. Be concise.",10)1112workflow = Workflow(13 name="Research Workflow",14 steps=[Step(name="Research", agent=agent)],15 stream=True,16 stream_executor_events=False, # <- Filter out internal executor events17)1819print("\n" + "=" * 70)20print("Workflow Streaming Example: stream_executor_events=False")21print("=" * 70)22print(23 "\nThis will show only workflow and step events and will not yield RunContent and TeamRunContent events"24)25print("filtering out internal agent/team events for cleaner output.\n")2627# Run workflow and display events28for event in workflow.run(29 "What is Python?",30 stream=True,31 stream_events=True,32):33 event_name = event.event if hasattr(event, "event") else type(event).__name__34 print(f" → {event_name}")Async Streaming
The Workflow.arun(stream=True) returns an async iterator of WorkflowRunOutputEvent objects instead of a single response.
So for example, if you want to stream the response, you can do the following:
1# Define your workflow2...34async def main():5 try:6 response: AsyncIterator[WorkflowRunOutputEvent] = basic_workflow.arun(7 message="Recent breakthroughs in quantum computing",8 stream=True,9 stream_events=True,10 )11 async for event in response:12 if event.event == WorkflowRunEvent.condition_execution_started.value:13 print(event)14 print()15 elif event.event == WorkflowRunEvent.condition_execution_completed.value:16 print(event)17 print()18 elif event.event == WorkflowRunEvent.workflow_started.value:19 print(event)20 print()21 elif event.event == WorkflowRunEvent.step_started.value:22 print(event)23 print()24 elif event.event == WorkflowRunEvent.step_completed.value:25 print(event)26 print()27 elif event.event == WorkflowRunEvent.workflow_completed.value:28 print(event)29 print()30 except Exception as e:31 print(f"Error: {e}")32 import traceback33 traceback.print_exc()3435if __name__ == "__main__":36 asyncio.run(main())See the Async Streaming example for more details.
Event Types
The following events are yielded by the Workflow.run() and Workflow.arun() functions depending on the workflow's configuration:
Core Events
| Event Type | Description |
|---|---|
WorkflowStarted | Indicates the start of a workflow run |
WorkflowCompleted | Signals successful completion of the workflow run |
WorkflowError | Indicates an error occurred during the workflow run |
Step Events
| Event Type | Description |
|---|---|
StepStarted | Indicates the start of a step |
StepCompleted | Signals successful completion of a step |
StepError | Indicates an error occurred during a step |
Step Output Events (For custom functions)
| Event Type | Description |
|---|---|
StepOutput | Indicates the output of a step |
Parallel Execution Events
| Event Type | Description |
|---|---|
ParallelExecutionStarted | Indicates the start of a parallel step |
ParallelExecutionCompleted | Signals successful completion of a parallel step |
Condition Execution Events
| Event Type | Description |
|---|---|
ConditionExecutionStarted | Indicates the start of a condition |
ConditionExecutionCompleted | Signals successful completion of a condition |
Loop Execution Events
| Event Type | Description |
|---|---|
LoopExecutionStarted | Indicates the start of a loop |
LoopIterationStartedEvent | Indicates the start of a loop iteration |
LoopIterationCompletedEvent | Signals successful completion of a loop iteration |
LoopExecutionCompleted | Signals successful completion of a loop |
Router Execution Events
| Event Type | Description |
|---|---|
RouterExecutionStarted | Indicates the start of a router |
RouterExecutionCompleted | Signals successful completion of a router |
Steps Execution Events
| Event Type | Description |
|---|---|
StepsExecutionStarted | Indicates the start of Steps being executed |
StepsExecutionCompleted | Signals successful completion of Steps execution |
See detailed documentation in the WorkflowRunOutputEvent documentation.
Storing Events
Workflows can automatically store all execution events for analysis, debugging, and audit purposes. Filter specific event types to reduce noise and storage overhead while maintaining essential execution records.
Access stored events via workflow.run_response.events and in the runs column of your workflow's session database (SQLite, PostgreSQL, etc.).
store_events=True: Automatically stores all workflow events in the databaseevents_to_skip=[]: Filter out specific event types to reduce storage and noise
Access all stored events via workflow.run_response.events
Available Events to Skip:
1from kern.run.workflow import WorkflowRunEvent23# Common events you might want to skip4events_to_skip = [5 WorkflowRunEvent.workflow_started,6 WorkflowRunEvent.workflow_completed,7 WorkflowRunEvent.workflow_cancelled,8 WorkflowRunEvent.step_started,9 WorkflowRunEvent.step_completed,10 WorkflowRunEvent.parallel_execution_started,11 WorkflowRunEvent.parallel_execution_completed,12 WorkflowRunEvent.condition_execution_started,13 WorkflowRunEvent.condition_execution_completed,14 WorkflowRunEvent.loop_execution_started,15 WorkflowRunEvent.loop_execution_completed,16 WorkflowRunEvent.router_execution_started,17 WorkflowRunEvent.router_execution_completed,18]Use Cases
- Debugging: Store all events to analyze workflow execution flow
- Audit Trails: Keep records of all workflow activities for compliance
- Performance Analysis: Analyze timing and execution patterns
- Error Investigation: Review event sequences leading to failures
- Noise Reduction: Skip verbose events like
step_startedto focus on results
Configuration Examples
1# store everything2debug_workflow = Workflow(3 name="Debug Workflow",4 store_events=True,5 steps=[...]6)78# store only important events9production_workflow = Workflow(10 name="Production Workflow",11 store_events=True,12 events_to_skip=[13 WorkflowRunEvent.step_started,14 WorkflowRunEvent.parallel_execution_started,15 # keep step_completed and workflow_completed16 ],17 steps=[...]18)1920# No event storage21fast_workflow = Workflow(22 name="Fast Workflow",23 store_events=False,24 steps=[...]25)See this example for more information.
Kern Telemetry
Kern logs which model an workflow used so we can prioritize updates to the most popular providers. You can disable this by setting AGNO_TELEMETRY=false in your environment or by setting telemetry=False on the workflow.
1export AGNO_TELEMETRY=falseor:
1workflow = Workflow(..., telemetry=False)See the Workflow class reference for more details.