Workflow Run Cancellation
Cancel a running workflow execution from another thread.
This example demonstrates how to cancel a running workflow execution by starting a workflow run in a separate thread and cancelling it from another thread. It shows proper handling of cancelled responses and thread management.
Example
1"""2Example demonstrating how to cancel a running workflow execution.34This example shows how to:51. Start a workflow run in a separate thread62. Cancel the run from another thread73. Handle the cancelled response8"""910import threading11import time1213from kern.agent import Agent14from kern.models.openai import OpenAIResponses15from kern.run.agent import RunEvent16from kern.run.base import RunStatus17from kern.run.workflow import WorkflowRunEvent18from kern.tools.hackernews import HackerNewsTools19from kern.workflow.step import Step20from kern.workflow.workflow import Workflow212223def long_running_task(workflow: Workflow, run_id_container: dict):24 """25 Simulate a long-running workflow task that can be cancelled.2627 Args:28 workflow: The workflow to run29 run_id_container: Dictionary to store the run_id for cancellation3031 Returns:32 Dictionary with run results and status33 """34 try:35 # Start the workflow run - this simulates a long task36 final_response = None37 content_pieces = []3839 for chunk in workflow.run(40 "Write a very long story about a dragon who learns to code. "41 "Make it at least 2000 words with detailed descriptions and dialogue. "42 "Take your time and be very thorough.",43 stream=True,44 ):45 if "run_id" not in run_id_container and chunk.run_id:46 print(f"Workflow run started: {chunk.run_id}")47 run_id_container["run_id"] = chunk.run_id4849 if chunk.event in [RunEvent.run_content]:50 print(chunk.content, end="", flush=True)51 content_pieces.append(chunk.content)52 elif chunk.event == RunEvent.run_cancelled:53 print(f"\nWorkflow run was cancelled: {chunk.run_id}")54 run_id_container["result"] = {55 "status": "cancelled",56 "run_id": chunk.run_id,57 "cancelled": True,58 "content": "".join(content_pieces)[:200] + "..."59 if content_pieces60 else "No content before cancellation",61 }62 return63 elif chunk.event == WorkflowRunEvent.workflow_cancelled:64 print(f"\nWorkflow run was cancelled: {chunk.run_id}")65 run_id_container["result"] = {66 "status": "cancelled",67 "run_id": chunk.run_id,68 "cancelled": True,69 "content": "".join(content_pieces)[:200] + "..."70 if content_pieces71 else "No content before cancellation",72 }73 return74 elif hasattr(chunk, "status") and chunk.status == RunStatus.completed:75 final_response = chunk7677 # If we get here, the run completed successfully78 if final_response:79 run_id_container["result"] = {80 "status": final_response.status.value81 if final_response.status82 else "completed",83 "run_id": final_response.run_id,84 "cancelled": final_response.status == RunStatus.cancelled,85 "content": ("".join(content_pieces)[:200] + "...")86 if content_pieces87 else "No content",88 }89 else:90 run_id_container["result"] = {91 "status": "unknown",92 "run_id": run_id_container.get("run_id"),93 "cancelled": False,94 "content": ("".join(content_pieces)[:200] + "...")95 if content_pieces96 else "No content",97 }9899 except Exception as e:100 print(f"\nException in run: {str(e)}")101 run_id_container["result"] = {102 "status": "error",103 "error": str(e),104 "run_id": run_id_container.get("run_id"),105 "cancelled": True,106 "content": "Error occurred",107 }108109110def cancel_after_delay(111 workflow: Workflow, run_id_container: dict, delay_seconds: int = 3112):113 """114 Cancel the workflow run after a specified delay.115116 Args:117 workflow: The workflow whose run should be cancelled118 run_id_container: Dictionary containing the run_id to cancel119 delay_seconds: How long to wait before cancelling120 """121 print(f"Will cancel workflow run in {delay_seconds} seconds...")122 time.sleep(delay_seconds)123124 run_id = run_id_container.get("run_id")125 if run_id:126 print(f"Cancelling workflow run: {run_id}")127 success = workflow.cancel_run(run_id)128 if success:129 print(f"Workflow run {run_id} marked for cancellation")130 else:131 print(132 f"Failed to cancel workflow run {run_id} (may not exist or already completed)"133 )134 else:135 print("No run_id found to cancel")136137138def main():139 """Main function demonstrating workflow run cancellation."""140141 # Create workflow agents142 researcher = Agent(143 name="Research Agent",144 model=OpenAIResponses(id="gpt-5.2"),145 tools=[HackerNewsTools()],146 instructions="Research the given topic and provide key facts and insights.",147 )148149 writer = Agent(150 name="Writing Agent",151 model=OpenAIResponses(id="gpt-5.2"),152 instructions="Write a comprehensive article based on the research provided. Make it engaging and well-structured.",153 )154 research_step = Step(155 name="research",156 agent=researcher,157 description="Research the topic and gather information",158 )159160 writing_step = Step(161 name="writing",162 agent=writer,163 description="Write an article based on the research",164 )165166 # Create a Steps sequence that chains these above steps together167 article_workflow = Workflow(168 description="Automated article creation from research to writing",169 steps=[research_step, writing_step],170 debug_mode=True,171 )172173 print("Starting workflow run cancellation example...")174 print("=" * 50)175176 # Container to share run_id between threads177 run_id_container = {}178179 # Start the workflow run in a separate thread180 workflow_thread = threading.Thread(181 target=lambda: long_running_task(article_workflow, run_id_container),182 name="WorkflowRunThread",183 )184185 # Start the cancellation thread186 cancel_thread = threading.Thread(187 target=cancel_after_delay,188 args=(article_workflow, run_id_container, 8), # Cancel after 8 seconds189 name="CancelThread",190 )191192 # Start both threads193 print("Starting workflow run thread...")194 workflow_thread.start()195196 print("Starting cancellation thread...")197 cancel_thread.start()198199 # Wait for both threads to complete200 print("Waiting for threads to complete...")201 workflow_thread.join()202 cancel_thread.join()203204 # Print the results205 print("\n" + "=" * 50)206 print("RESULTS:")207 print("=" * 50)208209 result = run_id_container.get("result")210 if result:211 print(f"Status: {result['status']}")212 print(f"Run ID: {result['run_id']}")213 print(f"Was Cancelled: {result['cancelled']}")214215 if result.get("error"):216 print(f"Error: {result['error']}")217 else:218 print(f"Content Preview: {result['content']}")219220 if result["cancelled"]:221 print("\nSUCCESS: Workflow run was successfully cancelled!")222 else:223 print("\nWARNING: Workflow run completed before cancellation")224 else:225 print("No result obtained - check if cancellation happened during streaming")226227 print("\nWorkflow cancellation example completed!")228229230if __name__ == "__main__":231 # Run the main example232 main()API Endpoint
Workflow runs can be cancelled via the AgentOS API:
1POST /workflows/{workflow_id}/runs/{run_id}/cancelExample:
1curl --location 'http://localhost:7777/workflows/analysis-workflow/runs/789/cancel' \2 --request POSTReference: Cancel Workflow Run API