Store Events and Events to Skip in a Workflow
This example demonstrates **Workflows 2.0** event storage capabilities
This example demonstrates Workflows 2.0 event storage capabilities, showing how to:
- Store execution events for debugging/auditing (
store_events=True) - Filter noisy events (
events_to_skip) to focus on critical workflow milestones - Access stored events post-execution via
workflow.run_response.events
Key Features:
- Selective Storage: Skip verbose events (e.g.,
step_started) while retaining key milestones. - Debugging/Audit: Capture execution flow for analysis without manual logging.
- Performance Optimization: Reduce storage overhead by filtering non-essential events.
1from kern.agent import Agent2from kern.db.sqlite import SqliteDb3from kern.models.openai import OpenAIResponses4from kern.run.agent import (5 RunContentEvent,6 ToolCallCompletedEvent,7 ToolCallStartedEvent,8)9from kern.run.workflow import WorkflowRunEvent, WorkflowRunOutput10from kern.tools.hackernews import HackerNewsTools11from kern.run.agent import RunEvent12from kern.workflow.parallel import Parallel13from kern.workflow.step import Step14from kern.workflow.workflow import Workflow1516# Define agents for different tasks17news_agent = Agent(18 name="News Agent",19 model=OpenAIResponses(id="gpt-5.2"),20 tools=[HackerNewsTools()],21 instructions="You are a news researcher. Get the latest tech news and summarize key points.",22)2324search_agent = Agent(25 name="Search Agent",26 model=OpenAIResponses(id="gpt-5.2"),27 instructions="You are a search specialist. Find relevant information on given topics.",28)2930analysis_agent = Agent(31 name="Analysis Agent",32 model=OpenAIResponses(id="gpt-5.2"),33 instructions="You are an analyst. Analyze the provided information and give insights.",34)3536summary_agent = Agent(37 name="Summary Agent",38 model=OpenAIResponses(id="gpt-5.2"),39 instructions="You are a summarizer. Create concise summaries of the provided content.",40)4142research_step = Step(43 name="Research Step",44 agent=news_agent,45)4647search_step = Step(48 name="Search Step",49 agent=search_agent,50)515253def print_stored_events(run_response: WorkflowRunOutput, example_name):54 """Helper function to print stored events in a nice format"""55 print(f"\n--- {example_name} - Stored Events ---")56 if run_response.events:57 print(f"Total stored events: {len(run_response.events)}")58 for i, event in enumerate(run_response.events, 1):59 print(f" {i}. {event.event}")60 else:61 print("No events stored")62 print()636465print("=== Simple Step Workflow with Event Storage ===")66step_workflow = Workflow(67 name="Simple Step Workflow",68 description="Basic workflow demonstrating step event storage",69 db=SqliteDb(70 session_table="workflow_session",71 db_file="tmp/workflow.db",72 ),73 steps=[research_step, search_step],74 store_events=True,75 events_to_skip=[76 WorkflowRunEvent.step_started,77 WorkflowRunEvent.workflow_completed,78 RunEvent.run_content,79 RunEvent.run_started,80 RunEvent.run_completed,81 ], # Skip step started events to reduce noise82)8384print("Running Step workflow with streaming...")85for event in step_workflow.run(86 input="AI trends in 2024",87 stream=True,88 stream_events=True,89):90 # Filter out RunContentEvent from printing to reduce noise91 if not isinstance(92 event, (RunContentEvent, ToolCallStartedEvent, ToolCallCompletedEvent)93 ):94 print(95 f"Event: {event.event if hasattr(event, 'event') else type(event).__name__}"96 )97run_response = step_workflow.get_last_run_output()9899print("\nStep workflow completed!")100print(101 f"Total events stored: {len(run_response.events) if run_response and run_response.events else 0}"102)103104# Print stored events in a nice format105print_stored_events(run_response, "Simple Step Workflow")106107# ------------------------------------------------------------------------------------------------ #108# ------------------------------------------------------------------------------------------------ #109110# Example 2: Parallel Primitive with Event Storage111print("=== 2. Parallel Example ===")112parallel_workflow = Workflow(113 name="Parallel Research Workflow",114 steps=[115 Parallel(116 Step(name="News Research", agent=news_agent),117 Step(name="Web Search", agent=search_agent),118 name="Parallel Research",119 ),120 Step(name="Combine Results", agent=analysis_agent),121 ],122 db=SqliteDb(123 session_table="workflow_parallel",124 db_file="tmp/workflow_parallel.db",125 ),126 store_events=True,127 events_to_skip=[128 WorkflowRunEvent.parallel_execution_started,129 WorkflowRunEvent.parallel_execution_completed,130 ],131)132133print("Running Parallel workflow...")134for event in parallel_workflow.run(135 input="Research machine learning developments",136 stream=True,137 stream_events=True,138):139 # Filter out RunContentEvent from printing140 if not isinstance(event, RunContentEvent):141 print(142 f"Event: {event.event if hasattr(event, 'event') else type(event).__name__}"143 )144145run_response = parallel_workflow.get_last_run_output()146print(f"Parallel workflow stored {len(run_response.events)} events")147print_stored_events(run_response, "Parallel Workflow")148print()