Workflow Run Cancellation

Cancel a running workflow execution from another thread.

This example demonstrates how to cancel a running workflow execution by starting a workflow run in a separate thread and cancelling it from another thread. It shows proper handling of cancelled responses and thread management.

Example

1"""
2Example demonstrating how to cancel a running workflow execution.
3
4This example shows how to:
51. Start a workflow run in a separate thread
62. Cancel the run from another thread
73. Handle the cancelled response
8"""
9
10import threading
11import time
12
13from kern.agent import Agent
14from kern.models.openai import OpenAIResponses
15from kern.run.agent import RunEvent
16from kern.run.base import RunStatus
17from kern.run.workflow import WorkflowRunEvent
18from kern.tools.hackernews import HackerNewsTools
19from kern.workflow.step import Step
20from kern.workflow.workflow import Workflow
21
22
def long_running_task(workflow: Workflow, run_id_container: dict):
    """
    Stream a long-running workflow run and record how it ended.

    Nothing is returned. The function is meant to be used as a thread target,
    so the outcome is written into ``run_id_container`` instead.

    Args:
        workflow: The workflow to run
        run_id_container: Dictionary shared with the caller. ``"run_id"`` is
            stored as soon as a streamed chunk carries one (so another thread
            can cancel the run) and ``"result"`` is stored when the run ends.
            ``"result"`` always has the keys ``status``, ``run_id``,
            ``cancelled`` and ``content``; on failure it also has ``error``.
    """
    content_pieces = []

    def content_preview(empty_message: str) -> str:
        # First 200 characters of everything streamed so far.
        if not content_pieces:
            return empty_message
        return "".join(content_pieces)[:200] + "..."

    try:
        final_response = None
        # Both events are handled identically, so test for them together.
        cancel_events = (RunEvent.run_cancelled, WorkflowRunEvent.workflow_cancelled)

        # Start the workflow run - this simulates a long task
        for chunk in workflow.run(
            "Write a very long story about a dragon who learns to code. "
            "Make it at least 2000 words with detailed descriptions and dialogue. "
            "Take your time and be very thorough.",
            stream=True,
        ):
            # Publish the run_id once so the cancelling thread can pick it up.
            if "run_id" not in run_id_container and chunk.run_id:
                print(f"Workflow run started: {chunk.run_id}")
                run_id_container["run_id"] = chunk.run_id

            if chunk.event == RunEvent.run_content:
                # Skip empty chunks and store text only, so the join in
                # content_preview cannot fail on None or non-string content.
                if chunk.content is not None:
                    text = str(chunk.content)
                    print(text, end="", flush=True)
                    content_pieces.append(text)
            elif chunk.event in cancel_events:
                print(f"\nWorkflow run was cancelled: {chunk.run_id}")
                run_id_container["result"] = {
                    "status": "cancelled",
                    "run_id": chunk.run_id,
                    "cancelled": True,
                    "content": content_preview("No content before cancellation"),
                }
                return
            elif getattr(chunk, "status", None) == RunStatus.completed:
                final_response = chunk

        # If we get here, the stream ended without a cancellation event
        if final_response:
            run_id_container["result"] = {
                "status": final_response.status.value
                if final_response.status
                else "completed",
                "run_id": final_response.run_id,
                "cancelled": final_response.status == RunStatus.cancelled,
                "content": content_preview("No content"),
            }
        else:
            run_id_container["result"] = {
                "status": "unknown",
                "run_id": run_id_container.get("run_id"),
                "cancelled": False,
                "content": content_preview("No content"),
            }

    except Exception as e:
        print(f"\nException in run: {str(e)}")
        run_id_container["result"] = {
            "status": "error",
            "error": str(e),
            "run_id": run_id_container.get("run_id"),
            # A failure is not a cancellation; reporting True here would make
            # the caller announce a successful cancel for any error.
            "cancelled": False,
            "content": "Error occurred",
        }
108
109
def cancel_after_delay(
    workflow: Workflow, run_id_container: dict, delay_seconds: int = 3
):
    """
    Wait for a fixed delay, then ask the workflow to cancel its run.

    Args:
        workflow: The workflow whose ``cancel_run`` method is called
        run_id_container: Dictionary read for the ``"run_id"`` key once the
            delay has elapsed
        delay_seconds: Seconds to sleep before attempting the cancellation
    """
    print(f"Will cancel workflow run in {delay_seconds} seconds...")
    time.sleep(delay_seconds)

    run_id = run_id_container.get("run_id")

    # Nothing to cancel if no run_id has been stored by now.
    if not run_id:
        print("No run_id found to cancel")
        return

    print(f"Cancelling workflow run: {run_id}")

    # A truthy return from cancel_run is treated as "cancellation accepted".
    if workflow.cancel_run(run_id):
        print(f"Workflow run {run_id} marked for cancellation")
        return

    print(
        f"Failed to cancel workflow run {run_id} (may not exist or already completed)"
    )
136
137
def main():
    """
    Demonstrate workflow run cancellation across two threads.

    Builds a two-step research/writing workflow, streams a run of it on one
    thread, requests cancellation from a second thread after 8 seconds, and
    prints a summary of the outcome stored in the shared container.
    """

    # Create workflow agents
    researcher = Agent(
        name="Research Agent",
        model=OpenAIResponses(id="gpt-5.2"),
        tools=[HackerNewsTools()],
        instructions="Research the given topic and provide key facts and insights.",
    )

    writer = Agent(
        name="Writing Agent",
        model=OpenAIResponses(id="gpt-5.2"),
        instructions="Write a comprehensive article based on the research provided. Make it engaging and well-structured.",
    )

    research_step = Step(
        name="research",
        agent=researcher,
        description="Research the topic and gather information",
    )

    writing_step = Step(
        name="writing",
        agent=writer,
        description="Write an article based on the research",
    )

    # Create a workflow that chains the steps above together
    article_workflow = Workflow(
        description="Automated article creation from research to writing",
        steps=[research_step, writing_step],
        debug_mode=True,
    )

    print("Starting workflow run cancellation example...")
    print("=" * 50)

    # Container to share the run_id and the final result between threads
    run_id_container = {}

    # Thread that streams the workflow run
    workflow_thread = threading.Thread(
        target=long_running_task,
        args=(article_workflow, run_id_container),
        name="WorkflowRunThread",
    )

    # Thread that requests cancellation
    cancel_thread = threading.Thread(
        target=cancel_after_delay,
        args=(article_workflow, run_id_container, 8),  # Cancel after 8 seconds
        name="CancelThread",
    )

    # Start both threads
    print("Starting workflow run thread...")
    workflow_thread.start()

    print("Starting cancellation thread...")
    cancel_thread.start()

    # Wait for both threads to complete
    print("Waiting for threads to complete...")
    workflow_thread.join()
    cancel_thread.join()

    # Print the results
    print("\n" + "=" * 50)
    print("RESULTS:")
    print("=" * 50)

    result = run_id_container.get("result")
    if result:
        print(f"Status: {result['status']}")
        print(f"Run ID: {result['run_id']}")
        print(f"Was Cancelled: {result['cancelled']}")

        if result.get("error"):
            # Report a failed run as a failure, whatever the "cancelled" flag
            # says, so an error is never announced as a successful cancel.
            print(f"Error: {result['error']}")
            print("\nERROR: Workflow run failed with an error")
        else:
            print(f"Content Preview: {result['content']}")

            if result["cancelled"]:
                print("\nSUCCESS: Workflow run was successfully cancelled!")
            else:
                print("\nWARNING: Workflow run completed before cancellation")
    else:
        print("No result obtained - check if cancellation happened during streaming")

    print("\nWorkflow cancellation example completed!")
228
229
if __name__ == "__main__":
    # Only run the demo when this file is executed directly, not on import.
    main()

API Endpoint

Workflow runs can be cancelled via the AgentOS API:

POST /workflows/{workflow_id}/runs/{run_id}/cancel

Example:

curl --location 'http://localhost:7777/workflows/analysis-workflow/runs/789/cancel' \
  --request POST

Reference: Cancel Workflow Run API