Workflow Cancellation
This example demonstrates **Workflows 2.0** support for cancelling running workflow executions, including thread-based cancellation and handling cancelled responses.
This example shows how to cancel a running workflow execution in real time. It demonstrates:
- Thread-based Execution: Running workflows in separate threads for non-blocking operation
- Dynamic Cancellation: Cancelling workflows while they're actively running
- Cancellation Events: Handling and responding to cancellation events
- Status Tracking: Monitoring workflow status throughout execution and cancellation
import threading
import time

from kern.agent import Agent
from kern.models.openai import OpenAIResponses
from kern.run.agent import RunEvent
from kern.run.base import RunStatus
from kern.run.workflow import WorkflowRunEvent
from kern.tools.hackernews import HackerNewsTools
from kern.workflow.step import Step
from kern.workflow.workflow import Workflow

# Maximum number of streamed characters kept in a result's content preview.
_PREVIEW_CHARS = 200


def _content_preview(content_pieces: list, empty_message: str) -> str:
    """Return a truncated preview of the streamed text, or ``empty_message`` if there is none."""
    if not content_pieces:
        return empty_message
    return "".join(content_pieces)[:_PREVIEW_CHARS] + "..."


def long_running_task(workflow: Workflow, run_id_container: dict) -> None:
    """
    Stream a long workflow run and record its outcome in ``run_id_container``.

    The function returns nothing. It communicates with other threads through
    the shared dictionary:

    - ``run_id_container["run_id"]`` is set from the first streamed chunk that
      carries a run id, so that another thread can request cancellation.
    - ``run_id_container["result"]`` is set before returning to a dictionary
      with the keys ``status``, ``run_id``, ``cancelled`` and ``content``
      (plus ``error`` when an exception was raised).

    Args:
        workflow: The workflow to run
        run_id_container: Dictionary shared with the cancelling thread; receives
            the run id and the final result
    """
    try:
        final_response = None
        content_pieces = []

        # The prompt asks for a long answer so the run is still streaming
        # when the cancellation request arrives.
        for chunk in workflow.run(
            "Write a very long story about a dragon who learns to code. "
            "Make it at least 2000 words with detailed descriptions and dialogue. "
            "Take your time and be very thorough.",
            stream=True,
        ):
            # Publish the run id once, as soon as any chunk provides it.
            if "run_id" not in run_id_container and chunk.run_id:
                print(f"> Workflow run started: {chunk.run_id}")
                run_id_container["run_id"] = chunk.run_id

            if chunk.event == RunEvent.run_content:
                # Skip empty chunks and coerce everything else to text: a None
                # or non-string piece would make the "".join() in the preview
                # raise and turn a good run into an "error" result.
                if chunk.content is not None:
                    text = str(chunk.content)
                    print(text, end="", flush=True)
                    content_pieces.append(text)
            elif chunk.event in (
                RunEvent.run_cancelled,
                WorkflowRunEvent.workflow_cancelled,
            ):
                # Both cancellation events are reported the same way.
                print(f"\n> Workflow run was cancelled: {chunk.run_id}")
                run_id_container["result"] = {
                    "status": "cancelled",
                    "run_id": chunk.run_id,
                    "cancelled": True,
                    "content": _content_preview(
                        content_pieces, "No content before cancellation"
                    ),
                }
                return
            elif hasattr(chunk, "status") and chunk.status == RunStatus.completed:
                # Remember the most recent chunk that reports a completed status.
                final_response = chunk

        # The stream ended without a cancellation event.
        if final_response:
            run_id_container["result"] = {
                "status": final_response.status.value
                if final_response.status
                else "completed",
                "run_id": final_response.run_id,
                "cancelled": final_response.status == RunStatus.cancelled,
                "content": _content_preview(content_pieces, "No content"),
            }
        else:
            run_id_container["result"] = {
                "status": "unknown",
                "run_id": run_id_container.get("run_id"),
                "cancelled": False,
                "content": _content_preview(content_pieces, "No content"),
            }

    except Exception as e:
        # Thread boundary: record the failure instead of letting it vanish with
        # the thread. A failure is not a cancellation, so "cancelled" is False;
        # otherwise the caller would report a successful cancellation.
        print(f"\n> Exception in run: {str(e)}")
        run_id_container["result"] = {
            "status": "error",
            "error": str(e),
            "run_id": run_id_container.get("run_id"),
            "cancelled": False,
            "content": "Error occurred",
        }


def cancel_after_delay(
    workflow: Workflow, run_id_container: dict, delay_seconds: int = 3
) -> None:
    """
    Cancel the workflow run after a specified delay.

    Sleeps for ``delay_seconds``, then reads ``run_id_container["run_id"]`` and
    passes it to ``workflow.cancel_run``. If no run id has been published by
    then, nothing is cancelled and a message is printed instead.

    Args:
        workflow: The workflow whose run should be cancelled
        run_id_container: Dictionary containing the run_id to cancel
        delay_seconds: How long to wait before cancelling
    """
    print(f"> Will cancel workflow run in {delay_seconds} seconds...")
    time.sleep(delay_seconds)

    run_id = run_id_container.get("run_id")
    if not run_id:
        print("> No run_id found to cancel")
        return

    print(f"> Cancelling workflow run: {run_id}")
    if workflow.cancel_run(run_id):
        print(f"> Workflow run {run_id} marked for cancellation")
    else:
        print(
            f"> Failed to cancel workflow run {run_id} (may not exist or already completed)"
        )


def main():
    """Run a two-step workflow in one thread and cancel it from another."""

    # Create workflow agents
    researcher = Agent(
        name="Research Agent",
        model=OpenAIResponses(id="gpt-5.2"),
        tools=[HackerNewsTools()],
        instructions="Research the given topic and provide key facts and insights.",
    )

    writer = Agent(
        name="Writing Agent",
        model=OpenAIResponses(id="gpt-5.2"),
        instructions="Write a comprehensive article based on the research provided. Make it engaging and well-structured.",
    )
    research_step = Step(
        name="research",
        agent=researcher,
        description="Research the topic and gather information",
    )

    writing_step = Step(
        name="writing",
        agent=writer,
        description="Write an article based on the research",
    )

    # Create a workflow that runs the research step followed by the writing step
    article_workflow = Workflow(
        description="Automated article creation from research to writing",
        steps=[research_step, writing_step],
        debug_mode=True,
    )

    print("> Starting workflow run cancellation example...")
    print("=" * 50)

    # Container to share run_id (and the final result) between threads
    run_id_container = {}

    # Thread that streams the workflow run
    workflow_thread = threading.Thread(
        target=long_running_task,
        args=(article_workflow, run_id_container),
        name="WorkflowRunThread",
    )

    # Thread that requests cancellation
    cancel_thread = threading.Thread(
        target=cancel_after_delay,
        args=(article_workflow, run_id_container, 8),  # Cancel after 8 seconds
        name="CancelThread",
    )

    # Start both threads
    print("> Starting workflow run thread...")
    workflow_thread.start()

    print("> Starting cancellation thread...")
    cancel_thread.start()

    # Wait for both threads to complete
    print("> Waiting for threads to complete...")
    workflow_thread.join()
    cancel_thread.join()

    # Print the results
    print("\n" + "=" * 50)
    print("> RESULTS:")
    print("=" * 50)

    result = run_id_container.get("result")
    if result:
        print(f"Status: {result['status']}")
        print(f"Run ID: {result['run_id']}")
        print(f"Was Cancelled: {result['cancelled']}")

        # Only the exception path stores an "error" key.
        failed = "error" in result
        if failed:
            print(f"Error: {result['error']}")
        else:
            print(f"Content Preview: {result['content']}")

        if failed:
            print("\n> ERROR: Workflow run failed with an exception")
        elif result["cancelled"]:
            print("\n> SUCCESS: Workflow run was successfully cancelled!")
        else:
            print("\n> WARNING: Workflow run completed before cancellation")
    else:
        print("> No result obtained - check if cancellation happened during streaming")

    print("\n> Workflow cancellation example completed!")


if __name__ == "__main__":
    # Run the main example
    main()