Running Workflows

Execute workflows with Workflow.run() and process their output.

The Workflow.run() function runs the workflow and returns either a single WorkflowRunOutput object or, when streaming, an iterator of WorkflowRunOutputEvent objects.

Many of our examples use workflow.print_response(), a helper that prints the response in the terminal. It calls workflow.run() under the hood.

Running your Workflow

Here's how to run your workflow. The result is captured in the response variable.

from kern.agent import Agent
from kern.models.openai import OpenAIResponses
from kern.db.sqlite import SqliteDb
from kern.team import Team
from kern.tools.hackernews import HackerNewsTools
from kern.tools.yfinance import YFinanceTools
from kern.workflow import Step, Workflow
from kern.run.workflow import WorkflowRunOutput
from kern.utils.pprint import pprint_run_response

# Define agents
hackernews_agent = Agent(
    name="Hackernews Agent",
    model=OpenAIResponses(id="gpt-5.2"),
    tools=[HackerNewsTools()],
    role="Extract key insights and content from Hackernews posts",
)
finance_agent = Agent(
    name="Finance Agent",
    model=OpenAIResponses(id="gpt-5.2"),
    tools=[YFinanceTools()],
    role="Get stock prices and financial data",
)

# Define research team for complex analysis
research_team = Team(
    name="Research Team",
    members=[hackernews_agent, finance_agent],
    instructions="Research tech topics and related stocks",
)

content_planner = Agent(
    name="Content Planner",
    model=OpenAIResponses(id="gpt-5.2"),
    instructions=[
        "Plan a content schedule over 4 weeks for the provided topic and research content",
        "Ensure that I have posts for 3 posts per week",
    ],
)

content_creation_workflow = Workflow(
    name="Content Creation Workflow",
    description="Automated content creation from blog posts to social media",
    db=SqliteDb(db_file="tmp/workflow.db"),
    steps=[research_team, content_planner],
)

# Create and use workflow
if __name__ == "__main__":
    response: WorkflowRunOutput = content_creation_workflow.run(
        input="AI trends in 2024",
        markdown=True,
    )

    pprint_run_response(response, markdown=True)

Note

The Workflow.run() function returns a WorkflowRunOutput object when not streaming. See the WorkflowRunOutput documentation for details.

Async Execution

The Workflow.arun() function is the async version of Workflow.run().

Here is an example of how to use it:

from typing import AsyncIterator
import asyncio

from kern.agent import Agent
from kern.tools.hackernews import HackerNewsTools
from kern.workflow import Condition, Step, Workflow, StepInput
from kern.run.workflow import WorkflowRunOutput, WorkflowRunOutputEvent, WorkflowRunEvent
from kern.utils.pprint import pprint_run_response

# === BASIC AGENTS ===
researcher = Agent(
    name="Researcher",
    instructions="Research the given topic and provide detailed findings.",
    tools=[HackerNewsTools()],
)

summarizer = Agent(
    name="Summarizer",
    instructions="Create a clear summary of the research findings.",
)

fact_checker = Agent(
    name="Fact Checker",
    instructions="Verify facts and check for accuracy in the research.",
    tools=[HackerNewsTools()],
)

writer = Agent(
    name="Writer",
    instructions="Write a comprehensive article based on all available research and verification.",
)

# === CONDITION EVALUATOR ===
def needs_fact_checking(step_input: StepInput) -> bool:
    """Determine if the research contains claims that need fact-checking"""
    summary = step_input.previous_step_content or ""

    # Look for keywords that suggest factual claims
    fact_indicators = [
        "study shows",
        "breakthroughs",
        "research indicates",
        "according to",
        "statistics",
        "data shows",
        "survey",
        "report",
        "million",
        "billion",
        "percent",
        "%",
        "increase",
        "decrease",
    ]

    return any(indicator in summary.lower() for indicator in fact_indicators)


# === WORKFLOW STEPS ===
research_step = Step(
    name="research",
    description="Research the topic",
    agent=researcher,
)

summarize_step = Step(
    name="summarize",
    description="Summarize research findings",
    agent=summarizer,
)

# Conditional fact-checking step
fact_check_step = Step(
    name="fact_check",
    description="Verify facts and claims",
    agent=fact_checker,
)

write_article = Step(
    name="write_article",
    description="Write final article",
    agent=writer,
)

# === BASIC LINEAR WORKFLOW ===
basic_workflow = Workflow(
    name="Basic Linear Workflow",
    description="Research -> Summarize -> Condition(Fact Check) -> Write Article",
    steps=[
        research_step,
        summarize_step,
        Condition(
            name="fact_check_condition",
            description="Check if fact-checking is needed",
            evaluator=needs_fact_checking,
            steps=[fact_check_step],
        ),
        write_article,
    ],
)

async def main():
    try:
        response: WorkflowRunOutput = await basic_workflow.arun(
            input="Recent breakthroughs in quantum computing",
        )
        pprint_run_response(response, markdown=True)
    except Exception as e:
        print(f"Error: {e}")

if __name__ == "__main__":
    asyncio.run(main())
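
Because the condition evaluator is a plain function, it can be checked in isolation before it is wired into a workflow. The sketch below is a minimal, self-contained version of that idea: FakeStepInput is a hypothetical stand-in that only mimics the previous_step_content attribute used above, and the indicator list is shortened for brevity.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeStepInput:
    """Hypothetical stand-in for StepInput; only the attribute the evaluator reads."""

    previous_step_content: Optional[str] = None


# Shortened copy of the indicator list from the example above
FACT_INDICATORS = ["study shows", "according to", "percent", "%", "billion"]


def needs_fact_checking(step_input) -> bool:
    """Return True when the previous step's text contains a factual-claim keyword."""
    summary = (step_input.previous_step_content or "").lower()
    return any(indicator in summary for indicator in FACT_INDICATORS)


with_claim = FakeStepInput("According to a survey, usage grew 40 percent.")
without_claim = FakeStepInput("Quantum computers use qubits.")
empty = FakeStepInput(None)

print(needs_fact_checking(with_claim))
print(needs_fact_checking(without_claim))
print(needs_fact_checking(empty))
```

Matching is substring-based, so a keyword such as "increase" also matches "increased"; broad keywords will trigger the fact-checking branch more often.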

Streaming Responses

To enable streaming, set stream=True when calling run(). This will return an iterator of WorkflowRunOutputEvent objects instead of a single response.

from typing import Iterator

from kern.run.workflow import WorkflowRunOutputEvent

# Define your agents/team
...

content_creation_workflow = Workflow(
    name="Content Creation Workflow",
    description="Automated content creation from blog posts to social media",
    db=SqliteDb(db_file="tmp/workflow.db"),
    steps=[research_team, content_planner],
)

# Create and use workflow
if __name__ == "__main__":
    response: Iterator[WorkflowRunOutputEvent] = content_creation_workflow.run(
        input="AI trends in 2024",
        markdown=True,
        stream=True,
    )

    pprint_run_response(response, markdown=True)
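
A streamed result is typed as an Iterator, so it is best treated like any other Python iterator: events arrive one at a time and the stream can be walked only once. The following sketch illustrates this with a hypothetical generator, fake_stream, standing in for the workflow call; the FakeEvent class, its content field, and the event names are assumptions for illustration, not the library's types.

```python
from dataclasses import dataclass
from typing import Iterator, List


@dataclass
class FakeEvent:
    """Hypothetical stand-in for a streamed workflow event."""

    event: str
    content: str = ""


def fake_stream() -> Iterator[FakeEvent]:
    """Stand-in for a streaming run call; yields events lazily."""
    yield FakeEvent("WorkflowStarted")
    yield FakeEvent("StepCompleted", content="research notes")
    yield FakeEvent("WorkflowCompleted", content="final plan")


stream = fake_stream()

# First pass: handle each event as it arrives and keep a copy for later use
seen: List[FakeEvent] = []
for event in stream:
    print(event.event)
    seen.append(event)

# A second pass over the same iterator yields nothing: the stream is exhausted
leftover = list(stream)

final_content = seen[-1].content
print(final_content)
```

If the events are needed after they have been printed, collect them into a list during the first pass, as done here with seen.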

Streaming all events

By default, when you stream a response, only the WorkflowStartedEvent and WorkflowCompletedEvent events will be streamed (together with all the Agent and Team events).

You can also stream all events by setting stream_events=True.

This will provide real-time updates about the workflow's internal processes:

import traceback
from typing import Iterator

from kern.agent import Agent
from kern.tools.hackernews import HackerNewsTools
from kern.workflow import Condition, Step, Workflow, StepInput
from kern.run.workflow import WorkflowRunOutput, WorkflowRunOutputEvent, WorkflowRunEvent

# === BASIC AGENTS ===
researcher = Agent(
    name="Researcher",
    instructions="Research the given topic and provide detailed findings.",
    tools=[HackerNewsTools()],
)

summarizer = Agent(
    name="Summarizer",
    instructions="Create a clear summary of the research findings.",
)

fact_checker = Agent(
    name="Fact Checker",
    instructions="Verify facts and check for accuracy in the research.",
    tools=[HackerNewsTools()],
)

writer = Agent(
    name="Writer",
    instructions="Write a comprehensive article based on all available research and verification.",
)

# === CONDITION EVALUATOR ===
def needs_fact_checking(step_input: StepInput) -> bool:
    """Determine if the research contains claims that need fact-checking"""
    summary = step_input.previous_step_content or ""

    # Look for keywords that suggest factual claims
    fact_indicators = [
        "study shows",
        "breakthroughs",
        "research indicates",
        "according to",
        "statistics",
        "data shows",
        "survey",
        "report",
        "million",
        "billion",
        "percent",
        "%",
        "increase",
        "decrease",
    ]

    return any(indicator in summary.lower() for indicator in fact_indicators)


# === WORKFLOW STEPS ===
research_step = Step(
    name="research",
    description="Research the topic",
    agent=researcher,
)

summarize_step = Step(
    name="summarize",
    description="Summarize research findings",
    agent=summarizer,
)

# Conditional fact-checking step
fact_check_step = Step(
    name="fact_check",
    description="Verify facts and claims",
    agent=fact_checker,
)

write_article = Step(
    name="write_article",
    description="Write final article",
    agent=writer,
)

# === BASIC LINEAR WORKFLOW ===
basic_workflow = Workflow(
    name="Basic Linear Workflow",
    description="Research -> Summarize -> Condition(Fact Check) -> Write Article",
    steps=[
        research_step,
        summarize_step,
        Condition(
            name="fact_check_condition",
            description="Check if fact-checking is needed",
            evaluator=needs_fact_checking,
            steps=[fact_check_step],
        ),
        write_article,
    ],
)

if __name__ == "__main__":
    try:
        response: Iterator[WorkflowRunOutputEvent] = basic_workflow.run(
            input="Recent breakthroughs in quantum computing",
            stream=True,
            stream_events=True,
        )
        for event in response:
            if event.event == WorkflowRunEvent.condition_execution_started.value:
                print(event)
                print()
            elif event.event == WorkflowRunEvent.condition_execution_completed.value:
                print(event)
                print()
            elif event.event == WorkflowRunEvent.workflow_started.value:
                print(event)
                print()
            elif event.event == WorkflowRunEvent.step_started.value:
                print(event)
                print()
            elif event.event == WorkflowRunEvent.step_completed.value:
                print(event)
                print()
            elif event.event == WorkflowRunEvent.workflow_completed.value:
                print(event)
                print()
    except Exception as e:
        print(f"Error: {e}")
        traceback.print_exc()
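
When each event type needs different handling, a lookup table can be easier to maintain than a long if/elif chain. The sketch below shows the pattern on plain strings. FakeEvent, its step_name field, the handler functions, and the event names are hypothetical stand-ins; with the real library, the keys would presumably be the WorkflowRunEvent values compared in the loop above.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Iterable, List


@dataclass
class FakeEvent:
    """Hypothetical stand-in for a streamed workflow event."""

    event: str
    step_name: str = ""


log: List[str] = []


def on_step_started(e: FakeEvent) -> None:
    log.append(f"start {e.step_name}")


def on_step_completed(e: FakeEvent) -> None:
    log.append(f"done {e.step_name}")


def on_workflow_completed(e: FakeEvent) -> None:
    log.append("workflow finished")


# Map each event name to the function that handles it
HANDLERS: Dict[str, Callable[[FakeEvent], None]] = {
    "StepStarted": on_step_started,
    "StepCompleted": on_step_completed,
    "WorkflowCompleted": on_workflow_completed,
}


def dispatch(events: Iterable[FakeEvent]) -> None:
    """Call the matching handler for each event; ignore events with no handler."""
    for event in events:
        handler = HANDLERS.get(event.event)
        if handler is not None:
            handler(event)


dispatch(
    [
        FakeEvent("WorkflowStarted"),
        FakeEvent("StepStarted", "research"),
        FakeEvent("StepCompleted", "research"),
        FakeEvent("WorkflowCompleted"),
    ]
)
print(log)
```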

Streaming Executor Events

The events from Agents and Teams used inside your workflow are automatically yielded during the streaming of a Workflow. You can choose not to stream these executor events by setting stream_executor_events=False.

The following Workflow events will be streamed in all cases:

  • WorkflowStarted
  • WorkflowCompleted
  • StepStarted
  • StepCompleted

See the following example:

from kern.agent import Agent
from kern.models.openai import OpenAIResponses
from kern.workflow.step import Step
from kern.workflow.workflow import Workflow

agent = Agent(
    name="ResearchAgent",
    model=OpenAIResponses(id="gpt-5.2"),
    instructions="You are a helpful research assistant. Be concise.",
)

workflow = Workflow(
    name="Research Workflow",
    steps=[Step(name="Research", agent=agent)],
    stream=True,
    stream_executor_events=False,  # <- Filter out internal executor events
)

print("\n" + "=" * 70)
print("Workflow Streaming Example: stream_executor_events=False")
print("=" * 70)
print(
    "\nThis will show only workflow and step events and will not yield RunContent and TeamRunContent events"
)
print("filtering out internal agent/team events for cleaner output.\n")

# Run workflow and display events
for event in workflow.run(
    "What is Python?",
    stream=True,
    stream_events=True,
):
    event_name = event.event if hasattr(event, "event") else type(event).__name__
    print(f" → {event_name}")
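
If executor events are left enabled but one consumer only cares about workflow-level progress, a similar narrowing can be approximated on the receiving side. The sketch below filters a stream by event name. The names are plain strings taken from the list above, and FakeEvent and the sample stream are hypothetical stand-ins rather than library objects.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator

# Workflow-level event names, copied from the list above
WORKFLOW_LEVEL_EVENTS = frozenset(
    {"WorkflowStarted", "WorkflowCompleted", "StepStarted", "StepCompleted"}
)


@dataclass
class FakeEvent:
    """Hypothetical stand-in for a streamed event."""

    event: str


def workflow_events_only(events: Iterable[FakeEvent]) -> Iterator[FakeEvent]:
    """Lazily yield only the events whose name is in WORKFLOW_LEVEL_EVENTS."""
    for e in events:
        if e.event in WORKFLOW_LEVEL_EVENTS:
            yield e


mixed = [
    FakeEvent(name)
    for name in (
        "WorkflowStarted",
        "StepStarted",
        "RunContent",
        "RunContent",
        "StepCompleted",
        "WorkflowCompleted",
    )
]

kept = [e.event for e in workflow_events_only(mixed)]
print(kept)
```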

Async Streaming

Workflow.arun(stream=True) returns an async iterator of WorkflowRunOutputEvent objects instead of a single response. To consume the stream, iterate over it with async for:

import asyncio
import traceback
from typing import AsyncIterator

from kern.run.workflow import WorkflowRunOutputEvent, WorkflowRunEvent

# Define your workflow
...

async def main():
    try:
        response: AsyncIterator[WorkflowRunOutputEvent] = basic_workflow.arun(
            input="Recent breakthroughs in quantum computing",
            stream=True,
            stream_events=True,
        )
        async for event in response:
            if event.event == WorkflowRunEvent.condition_execution_started.value:
                print(event)
                print()
            elif event.event == WorkflowRunEvent.condition_execution_completed.value:
                print(event)
                print()
            elif event.event == WorkflowRunEvent.workflow_started.value:
                print(event)
                print()
            elif event.event == WorkflowRunEvent.step_started.value:
                print(event)
                print()
            elif event.event == WorkflowRunEvent.step_completed.value:
                print(event)
                print()
            elif event.event == WorkflowRunEvent.workflow_completed.value:
                print(event)
                print()
    except Exception as e:
        print(f"Error: {e}")
        traceback.print_exc()

if __name__ == "__main__":
    asyncio.run(main())

See the Async Streaming example for more details.
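
The async for pattern can be tried without any model calls. In this sketch, fake_arun is a hypothetical async generator standing in for the streaming arun call, and the event names are illustrative strings; only the asyncio usage is standard.

```python
import asyncio
from typing import AsyncIterator, List


async def fake_arun() -> AsyncIterator[str]:
    """Hypothetical stand-in for a streaming arun call."""
    for name in ("WorkflowStarted", "StepStarted", "StepCompleted", "WorkflowCompleted"):
        await asyncio.sleep(0)  # hand control back to the event loop, as real I/O would
        yield name


async def collect() -> List[str]:
    received: List[str] = []
    # No await on the call itself: an async generator is iterated, not awaited
    async for name in fake_arun():
        received.append(name)
    return received


received = asyncio.run(collect())
print(received)
```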

Event Types

The following events are yielded by the Workflow.run() and Workflow.arun() functions depending on the workflow's configuration:

Core Events

| Event Type | Description |
| --- | --- |
| WorkflowStarted | Indicates the start of a workflow run |
| WorkflowCompleted | Signals successful completion of the workflow run |
| WorkflowError | Indicates an error occurred during the workflow run |

Step Events

| Event Type | Description |
| --- | --- |
| StepStarted | Indicates the start of a step |
| StepCompleted | Signals successful completion of a step |
| StepError | Indicates an error occurred during a step |

Step Output Events (For custom functions)

| Event Type | Description |
| --- | --- |
| StepOutput | Indicates the output of a step |

Parallel Execution Events

| Event Type | Description |
| --- | --- |
| ParallelExecutionStarted | Indicates the start of a parallel step |
| ParallelExecutionCompleted | Signals successful completion of a parallel step |

Condition Execution Events

| Event Type | Description |
| --- | --- |
| ConditionExecutionStarted | Indicates the start of a condition |
| ConditionExecutionCompleted | Signals successful completion of a condition |

Loop Execution Events

| Event Type | Description |
| --- | --- |
| LoopExecutionStarted | Indicates the start of a loop |
| LoopIterationStartedEvent | Indicates the start of a loop iteration |
| LoopIterationCompletedEvent | Signals successful completion of a loop iteration |
| LoopExecutionCompleted | Signals successful completion of a loop |

Router Execution Events

| Event Type | Description |
| --- | --- |
| RouterExecutionStarted | Indicates the start of a router |
| RouterExecutionCompleted | Signals successful completion of a router |

Steps Execution Events

| Event Type | Description |
| --- | --- |
| StepsExecutionStarted | Indicates the start of Steps being executed |
| StepsExecutionCompleted | Signals successful completion of Steps execution |

See detailed documentation in the WorkflowRunOutputEvent documentation.

Storing Events

Workflows can automatically store all execution events for analysis, debugging, and audit purposes. Filter specific event types to reduce noise and storage overhead while maintaining essential execution records.

Access stored events via workflow.run_response.events and in the runs column of your workflow's session database (SQLite, PostgreSQL, etc.).

  • store_events=True: Automatically stores all workflow events in the database
  • events_to_skip=[]: Filter out specific event types to reduce storage and noise

Available Events to Skip:

from kern.run.workflow import WorkflowRunEvent

# Common events you might want to skip
events_to_skip = [
    WorkflowRunEvent.workflow_started,
    WorkflowRunEvent.workflow_completed,
    WorkflowRunEvent.workflow_cancelled,
    WorkflowRunEvent.step_started,
    WorkflowRunEvent.step_completed,
    WorkflowRunEvent.parallel_execution_started,
    WorkflowRunEvent.parallel_execution_completed,
    WorkflowRunEvent.condition_execution_started,
    WorkflowRunEvent.condition_execution_completed,
    WorkflowRunEvent.loop_execution_started,
    WorkflowRunEvent.loop_execution_completed,
    WorkflowRunEvent.router_execution_started,
    WorkflowRunEvent.router_execution_completed,
]

Use Cases

  • Debugging: Store all events to analyze workflow execution flow
  • Audit Trails: Keep records of all workflow activities for compliance
  • Performance Analysis: Analyze timing and execution patterns
  • Error Investigation: Review event sequences leading to failures
  • Noise Reduction: Skip verbose events like step_started to focus on results
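
For error investigation, one simple check over a recorded event sequence is to find steps that started but never completed. The sketch below works on plain dictionaries. The event and step_name keys and the sample data are assumptions for illustration, so the field access would need adapting to the actual stored event objects.

```python
from typing import Dict, Iterable, List


def unfinished_steps(events: Iterable[Dict[str, str]]) -> List[str]:
    """Return names of steps with a StepStarted event but no later StepCompleted."""
    open_steps: List[str] = []
    for e in events:
        if e["event"] == "StepStarted":
            open_steps.append(e["step_name"])
        elif e["event"] == "StepCompleted" and e["step_name"] in open_steps:
            open_steps.remove(e["step_name"])
    return open_steps


# Hypothetical recorded sequence: the second step fails before completing
recorded = [
    {"event": "WorkflowStarted", "step_name": ""},
    {"event": "StepStarted", "step_name": "research"},
    {"event": "StepCompleted", "step_name": "research"},
    {"event": "StepStarted", "step_name": "summarize"},
    {"event": "StepError", "step_name": "summarize"},
]

stuck = unfinished_steps(recorded)
print(stuck)
```

A check like this depends on both the started and the completed events being stored, so it would not work if step_started were listed in events_to_skip.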

Configuration Examples

from kern.workflow import Workflow
from kern.run.workflow import WorkflowRunEvent

# Store everything
debug_workflow = Workflow(
    name="Debug Workflow",
    store_events=True,
    steps=[...],
)

# Store only important events
production_workflow = Workflow(
    name="Production Workflow",
    store_events=True,
    events_to_skip=[
        WorkflowRunEvent.step_started,
        WorkflowRunEvent.parallel_execution_started,
        # keep step_completed and workflow_completed
    ],
    steps=[...],
)

# No event storage
fast_workflow = Workflow(
    name="Fast Workflow",
    store_events=False,
    steps=[...],
)

Tip

See this example for more information.

Kern Telemetry

Kern logs which model a workflow used so we can prioritize updates to the most popular providers. You can disable this by setting AGNO_TELEMETRY=false in your environment or by setting telemetry=False on the workflow.

export AGNO_TELEMETRY=false

or:

workflow = Workflow(..., telemetry=False)

See the Workflow class reference for more details.
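
The environment variable can also be set from Python, for example in a test harness. This is a minimal sketch using only the standard library. It assumes the variable is read after this line runs, which is why it is placed before any workflow is imported or created.

```python
import os

# Set before importing or constructing any workflow
# (assumption: the library reads the value later, not at interpreter start)
os.environ["AGNO_TELEMETRY"] = "false"

telemetry_setting = os.environ.get("AGNO_TELEMETRY")
print(telemetry_setting)
```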

Developer Resources