Team Run Cancellation

Cancel a running team execution from another thread.

This example demonstrates how to cancel a running team execution by starting a team run in a separate thread and cancelling it from another thread. It shows proper handling of cancelled responses and thread management.

Example

1"""
2Example demonstrating how to cancel a running team execution.
3
4This example shows how to:
51. Start a team run in a separate thread
62. Cancel the run from another thread
73. Handle the cancelled response
8"""
9
10import threading
11import time
12
13from kern.agent import Agent
14from kern.models.openai import OpenAIResponses
15from kern.run.agent import RunEvent
16from kern.run.base import RunStatus
17from kern.run.team import TeamRunEvent
18from kern.team import Team
19
20
21def long_running_task(team: Team, run_id_container: dict):
22 """
23 Simulate a long-running team task that can be cancelled.
24
25 Args:
26 team: The team to run
27 run_id_container: Dictionary to store the run_id for cancellation
28
29 Returns:
30 Dictionary with run results and status
31 """
32 try:
33 # Start the team run - this simulates a long task
34 final_response = None
35 content_pieces = []
36
37 for chunk in team.run(
38 "Write a very long story about a dragon who learns to code. "
39 "Make it at least 2000 words with detailed descriptions and dialogue. "
40 "Take your time and be very thorough.",
41 stream=True,
42 ):
43 if "run_id" not in run_id_container and chunk.run_id:
44 print(f"Team run started: {chunk.run_id}")
45 run_id_container["run_id"] = chunk.run_id
46
47 if chunk.event in [TeamRunEvent.run_content, RunEvent.run_content]:
48 print(chunk.content, end="", flush=True)
49 content_pieces.append(chunk.content)
50 elif chunk.event == RunEvent.run_cancelled:
51 print(f"\nMember run was cancelled: {chunk.run_id}")
52 run_id_container["result"] = {
53 "status": "cancelled",
54 "run_id": chunk.run_id,
55 "cancelled": True,
56 "content": "".join(content_pieces)[:200] + "..."
57 if content_pieces
58 else "No content before cancellation",
59 }
60 return
61 elif chunk.event == TeamRunEvent.run_cancelled:
62 print(f"\nTeam run was cancelled: {chunk.run_id}")
63 run_id_container["result"] = {
64 "status": "cancelled",
65 "run_id": chunk.run_id,
66 "cancelled": True,
67 "content": "".join(content_pieces)[:200] + "..."
68 if content_pieces
69 else "No content before cancellation",
70 }
71 return
72 elif hasattr(chunk, "status") and chunk.status == RunStatus.completed:
73 final_response = chunk
74
75 # If we get here, the run completed successfully
76 if final_response:
77 run_id_container["result"] = {
78 "status": final_response.status.value
79 if final_response.status
80 else "completed",
81 "run_id": final_response.run_id,
82 "cancelled": final_response.status == RunStatus.cancelled,
83 "content": ("".join(content_pieces)[:200] + "...")
84 if content_pieces
85 else "No content",
86 }
87 else:
88 run_id_container["result"] = {
89 "status": "unknown",
90 "run_id": run_id_container.get("run_id"),
91 "cancelled": False,
92 "content": ("".join(content_pieces)[:200] + "...")
93 if content_pieces
94 else "No content",
95 }
96
97 except Exception as e:
98 print(f"\nException in run: {str(e)}")
99 run_id_container["result"] = {
100 "status": "error",
101 "error": str(e),
102 "run_id": run_id_container.get("run_id"),
103 "cancelled": True,
104 "content": "Error occurred",
105 }
106
107
108def cancel_after_delay(team: Team, run_id_container: dict, delay_seconds: int = 3):
109 """
110 Cancel the team run after a specified delay.
111
112 Args:
113 team: The team whose run should be cancelled
114 run_id_container: Dictionary containing the run_id to cancel
115 delay_seconds: How long to wait before cancelling
116 """
117 print(f"Will cancel team run in {delay_seconds} seconds...")
118 time.sleep(delay_seconds)
119
120 run_id = run_id_container.get("run_id")
121 if run_id:
122 print(f"Cancelling team run: {run_id}")
123 success = team.cancel_run(run_id)
124 if success:
125 print(f"Team run {run_id} marked for cancellation")
126 else:
127 print(
128 f"Failed to cancel team run {run_id} (may not exist or already completed)"
129 )
130 else:
131 print("No run_id found to cancel")
132
133
134def main():
135 """Main function demonstrating team run cancellation."""
136
137 # Create team members
138 storyteller_agent = Agent(
139 name="StorytellerAgent",
140 model=OpenAIResponses(id="gpt-5.2"),
141 description="An agent that writes creative stories",
142 )
143
144 editor_agent = Agent(
145 name="EditorAgent",
146 model=OpenAIResponses(id="gpt-5.2"),
147 description="An agent that reviews and improves stories",
148 )
149
150 # Initialize the team with agents
151 team = Team(
152 name="Storytelling Team",
153 members=[storyteller_agent, editor_agent],
154 model=OpenAIResponses(id="gpt-5.2"), # Team leader model
155 description="A team that collaborates to write detailed stories",
156 )
157
158 print("Starting team run cancellation example...")
159 print("=" * 50)
160
161 # Container to share run_id between threads
162 run_id_container = {}
163
164 # Start the team run in a separate thread
165 team_thread = threading.Thread(
166 target=lambda: long_running_task(team, run_id_container), name="TeamRunThread"
167 )
168
169 # Start the cancellation thread
170 cancel_thread = threading.Thread(
171 target=cancel_after_delay,
172 args=(team, run_id_container, 8), # Cancel after 8 seconds
173 name="CancelThread",
174 )
175
176 # Start both threads
177 print("Starting team run thread...")
178 team_thread.start()
179
180 print("Starting cancellation thread...")
181 cancel_thread.start()
182
183 # Wait for both threads to complete
184 print("Waiting for threads to complete...")
185 team_thread.join()
186 cancel_thread.join()
187
188 # Print the results
189 print("\n" + "=" * 50)
190 print("RESULTS:")
191 print("=" * 50)
192
193 result = run_id_container.get("result")
194 if result:
195 print(f"Status: {result['status']}")
196 print(f"Run ID: {result['run_id']}")
197 print(f"Was Cancelled: {result['cancelled']}")
198
199 if result.get("error"):
200 print(f"Error: {result['error']}")
201 else:
202 print(f"Content Preview: {result['content']}")
203
204 if result["cancelled"]:
205 print("\nSUCCESS: Team run was successfully cancelled!")
206 else:
207 print("\nWARNING: Team run completed before cancellation")
208 else:
209 print("No result obtained - check if cancellation happened during streaming")
210
211 print("\nTeam cancellation example completed!")
212
213
214if __name__ == "__main__":
215 # Run the main example
216 main()

API Endpoint

Team runs can be cancelled via the AgentOS API:

1POST /teams/{team_id}/runs/{run_id}/cancel

Example:

1curl --location 'http://localhost:7777/teams/storytelling-team/runs/456/cancel' \
2 --request POST

Reference: Cancel Team Run API