Store Events and Events to Skip in a Workflow

This example demonstrates **Workflows 2.0** event storage capabilities

This example demonstrates Workflows 2.0 event storage capabilities, showing how to:

  1. Store execution events for debugging/auditing (store_events=True)
  2. Filter noisy events (events_to_skip) to focus on critical workflow milestones
  3. Access stored events post-execution via workflow.run_response.events

Key Features:

  • Selective Storage: Skip verbose events (e.g., step_started) while retaining key milestones.
  • Debugging/Audit: Capture execution flow for analysis without manual logging.
  • Performance Optimization: Reduce storage overhead by filtering non-essential events.
1from kern.agent import Agent
2from kern.db.sqlite import SqliteDb
3from kern.models.openai import OpenAIResponses
4from kern.run.agent import (
5 RunContentEvent,
6 ToolCallCompletedEvent,
7 ToolCallStartedEvent,
8)
9from kern.run.workflow import WorkflowRunEvent, WorkflowRunOutput
10from kern.tools.hackernews import HackerNewsTools
11from kern.run.agent import RunEvent
12from kern.workflow.parallel import Parallel
13from kern.workflow.step import Step
14from kern.workflow.workflow import Workflow
15
16# Define agents for different tasks
17news_agent = Agent(
18 name="News Agent",
19 model=OpenAIResponses(id="gpt-5.2"),
20 tools=[HackerNewsTools()],
21 instructions="You are a news researcher. Get the latest tech news and summarize key points.",
22)
23
24search_agent = Agent(
25 name="Search Agent",
26 model=OpenAIResponses(id="gpt-5.2"),
27 instructions="You are a search specialist. Find relevant information on given topics.",
28)
29
30analysis_agent = Agent(
31 name="Analysis Agent",
32 model=OpenAIResponses(id="gpt-5.2"),
33 instructions="You are an analyst. Analyze the provided information and give insights.",
34)
35
36summary_agent = Agent(
37 name="Summary Agent",
38 model=OpenAIResponses(id="gpt-5.2"),
39 instructions="You are a summarizer. Create concise summaries of the provided content.",
40)
41
42research_step = Step(
43 name="Research Step",
44 agent=news_agent,
45)
46
47search_step = Step(
48 name="Search Step",
49 agent=search_agent,
50)
51
52
53def print_stored_events(run_response: WorkflowRunOutput, example_name):
54 """Helper function to print stored events in a nice format"""
55 print(f"\n--- {example_name} - Stored Events ---")
56 if run_response.events:
57 print(f"Total stored events: {len(run_response.events)}")
58 for i, event in enumerate(run_response.events, 1):
59 print(f" {i}. {event.event}")
60 else:
61 print("No events stored")
62 print()
63
64
65print("=== Simple Step Workflow with Event Storage ===")
66step_workflow = Workflow(
67 name="Simple Step Workflow",
68 description="Basic workflow demonstrating step event storage",
69 db=SqliteDb(
70 session_table="workflow_session",
71 db_file="tmp/workflow.db",
72 ),
73 steps=[research_step, search_step],
74 store_events=True,
75 events_to_skip=[
76 WorkflowRunEvent.step_started,
77 WorkflowRunEvent.workflow_completed,
78 RunEvent.run_content,
79 RunEvent.run_started,
80 RunEvent.run_completed,
81 ], # Skip step started events to reduce noise
82)
83
84print("Running Step workflow with streaming...")
85for event in step_workflow.run(
86 input="AI trends in 2024",
87 stream=True,
88 stream_events=True,
89):
90 # Filter out RunContentEvent from printing to reduce noise
91 if not isinstance(
92 event, (RunContentEvent, ToolCallStartedEvent, ToolCallCompletedEvent)
93 ):
94 print(
95 f"Event: {event.event if hasattr(event, 'event') else type(event).__name__}"
96 )
97run_response = step_workflow.get_last_run_output()
98
99print("\nStep workflow completed!")
100print(
101 f"Total events stored: {len(run_response.events) if run_response and run_response.events else 0}"
102)
103
104# Print stored events in a nice format
105print_stored_events(run_response, "Simple Step Workflow")
106
107# ------------------------------------------------------------------------------------------------ #
108# ------------------------------------------------------------------------------------------------ #
109
110# Example 2: Parallel Primitive with Event Storage
111print("=== 2. Parallel Example ===")
112parallel_workflow = Workflow(
113 name="Parallel Research Workflow",
114 steps=[
115 Parallel(
116 Step(name="News Research", agent=news_agent),
117 Step(name="Web Search", agent=search_agent),
118 name="Parallel Research",
119 ),
120 Step(name="Combine Results", agent=analysis_agent),
121 ],
122 db=SqliteDb(
123 session_table="workflow_parallel",
124 db_file="tmp/workflow_parallel.db",
125 ),
126 store_events=True,
127 events_to_skip=[
128 WorkflowRunEvent.parallel_execution_started,
129 WorkflowRunEvent.parallel_execution_completed,
130 ],
131)
132
133print("Running Parallel workflow...")
134for event in parallel_workflow.run(
135 input="Research machine learning developments",
136 stream=True,
137 stream_events=True,
138):
139 # Filter out RunContentEvent from printing
140 if not isinstance(event, RunContentEvent):
141 print(
142 f"Event: {event.event if hasattr(event, 'event') else type(event).__name__}"
143 )
144
145run_response = parallel_workflow.get_last_run_output()
146print(f"Parallel workflow stored {len(run_response.events)} events")
147print_stored_events(run_response, "Parallel Workflow")
148print()