Workflow Cancellation

This example demonstrates **Workflows 2.0** support for cancelling running workflow executions, including thread-based cancellation and handling cancelled responses.

This example shows how to cancel a running workflow execution in real-time. It demonstrates:

  1. Thread-based Execution: Running workflows in separate threads for non-blocking operation
  2. Dynamic Cancellation: Cancelling workflows while they're actively running
  3. Cancellation Events: Handling and responding to cancellation events
  4. Status Tracking: Monitoring workflow status throughout execution and cancellation
1import threading
2import time
3
4from kern.agent import Agent
5from kern.models.openai import OpenAIResponses
6from kern.run.agent import RunEvent
7from kern.run.base import RunStatus
8from kern.run.workflow import WorkflowRunEvent
9from kern.tools.hackernews import HackerNewsTools
10from kern.workflow.step import Step
11from kern.workflow.workflow import Workflow
12
13
14def long_running_task(workflow: Workflow, run_id_container: dict):
15 """
16 Simulate a long-running workflow task that can be cancelled.
17
18 Args:
19 workflow: The workflow to run
20 run_id_container: Dictionary to store the run_id for cancellation
21
22 Returns:
23 Dictionary with run results and status
24 """
25 try:
26 # Start the workflow run - this simulates a long task
27 final_response = None
28 content_pieces = []
29
30 for chunk in workflow.run(
31 "Write a very long story about a dragon who learns to code. "
32 "Make it at least 2000 words with detailed descriptions and dialogue. "
33 "Take your time and be very thorough.",
34 stream=True,
35 ):
36 if "run_id" not in run_id_container and chunk.run_id:
37 print(f"> Workflow run started: {chunk.run_id}")
38 run_id_container["run_id"] = chunk.run_id
39
40 if chunk.event in [RunEvent.run_content]:
41 print(chunk.content, end="", flush=True)
42 content_pieces.append(chunk.content)
43 elif chunk.event == RunEvent.run_cancelled:
44 print(f"\n> Workflow run was cancelled: {chunk.run_id}")
45 run_id_container["result"] = {
46 "status": "cancelled",
47 "run_id": chunk.run_id,
48 "cancelled": True,
49 "content": "".join(content_pieces)[:200] + "..."
50 if content_pieces
51 else "No content before cancellation",
52 }
53 return
54 elif chunk.event == WorkflowRunEvent.workflow_cancelled:
55 print(f"\n> Workflow run was cancelled: {chunk.run_id}")
56 run_id_container["result"] = {
57 "status": "cancelled",
58 "run_id": chunk.run_id,
59 "cancelled": True,
60 "content": "".join(content_pieces)[:200] + "..."
61 if content_pieces
62 else "No content before cancellation",
63 }
64 return
65 elif hasattr(chunk, "status") and chunk.status == RunStatus.completed:
66 final_response = chunk
67
68 # If we get here, the run completed successfully
69 if final_response:
70 run_id_container["result"] = {
71 "status": final_response.status.value
72 if final_response.status
73 else "completed",
74 "run_id": final_response.run_id,
75 "cancelled": final_response.status == RunStatus.cancelled,
76 "content": ("".join(content_pieces)[:200] + "...")
77 if content_pieces
78 else "No content",
79 }
80 else:
81 run_id_container["result"] = {
82 "status": "unknown",
83 "run_id": run_id_container.get("run_id"),
84 "cancelled": False,
85 "content": ("".join(content_pieces)[:200] + "...")
86 if content_pieces
87 else "No content",
88 }
89
90 except Exception as e:
91 print(f"\n> Exception in run: {str(e)}")
92 run_id_container["result"] = {
93 "status": "error",
94 "error": str(e),
95 "run_id": run_id_container.get("run_id"),
96 "cancelled": True,
97 "content": "Error occurred",
98 }
99
100
101def cancel_after_delay(
102 workflow: Workflow, run_id_container: dict, delay_seconds: int = 3
103):
104 """
105 Cancel the workflow run after a specified delay.
106
107 Args:
108 workflow: The workflow whose run should be cancelled
109 run_id_container: Dictionary containing the run_id to cancel
110 delay_seconds: How long to wait before cancelling
111 """
112 print(f"> Will cancel workflow run in {delay_seconds} seconds...")
113 time.sleep(delay_seconds)
114
115 run_id = run_id_container.get("run_id")
116 if run_id:
117 print(f"> Cancelling workflow run: {run_id}")
118 success = workflow.cancel_run(run_id)
119 if success:
120 print(f"> Workflow run {run_id} marked for cancellation")
121 else:
122 print(
123 f"> Failed to cancel workflow run {run_id} (may not exist or already completed)"
124 )
125 else:
126 print("> No run_id found to cancel")
127
128
129def main():
130 """Main function demonstrating workflow run cancellation."""
131
132 # Create workflow agents
133 researcher = Agent(
134 name="Research Agent",
135 model=OpenAIResponses(id="gpt-5.2"),
136 tools=[HackerNewsTools()],
137 instructions="Research the given topic and provide key facts and insights.",
138 )
139
140 writer = Agent(
141 name="Writing Agent",
142 model=OpenAIResponses(id="gpt-5.2"),
143 instructions="Write a comprehensive article based on the research provided. Make it engaging and well-structured.",
144 )
145 research_step = Step(
146 name="research",
147 agent=researcher,
148 description="Research the topic and gather information",
149 )
150
151 writing_step = Step(
152 name="writing",
153 agent=writer,
154 description="Write an article based on the research",
155 )
156
157 # Create a Steps sequence that chains these above steps together
158 article_workflow = Workflow(
159 description="Automated article creation from research to writing",
160 steps=[research_step, writing_step],
161 debug_mode=True,
162 )
163
164 print("> Starting workflow run cancellation example...")
165 print("=" * 50)
166
167 # Container to share run_id between threads
168 run_id_container = {}
169
170 # Start the workflow run in a separate thread
171 workflow_thread = threading.Thread(
172 target=lambda: long_running_task(article_workflow, run_id_container),
173 name="WorkflowRunThread",
174 )
175
176 # Start the cancellation thread
177 cancel_thread = threading.Thread(
178 target=cancel_after_delay,
179 args=(article_workflow, run_id_container, 8), # Cancel after 8 seconds
180 name="CancelThread",
181 )
182
183 # Start both threads
184 print("> Starting workflow run thread...")
185 workflow_thread.start()
186
187 print("> Starting cancellation thread...")
188 cancel_thread.start()
189
190 # Wait for both threads to complete
191 print("> Waiting for threads to complete...")
192 workflow_thread.join()
193 cancel_thread.join()
194
195 # Print the results
196 print("\n" + "=" * 50)
197 print("> RESULTS:")
198 print("=" * 50)
199
200 result = run_id_container.get("result")
201 if result:
202 print(f"Status: {result['status']}")
203 print(f"Run ID: {result['run_id']}")
204 print(f"Was Cancelled: {result['cancelled']}")
205
206 if result.get("error"):
207 print(f"Error: {result['error']}")
208 else:
209 print(f"Content Preview: {result['content']}")
210
211 if result["cancelled"]:
212 print("\n> SUCCESS: Workflow run was successfully cancelled!")
213 else:
214 print("\n> WARNING: Workflow run completed before cancellation")
215 else:
216 print("> No result obtained - check if cancellation happened during streaming")
217
218 print("\n> Workflow cancellation example completed!")
219
220
221if __name__ == "__main__":
222 # Run the main example
223 main()