Example demonstrating a custom cancellation manager.

Shows how to extend BaseRunCancellationManager to implement your own cancellation backend (e.g., a database, a message queue, an API, etc.).

"""
Example demonstrating a custom cancellation manager.

Shows how to extend BaseRunCancellationManager to implement your own
cancellation backend (e.g., a database, a message queue, an API, etc.).

This example creates a file-based cancellation manager that persists
cancellation state to a JSON file, which could be shared across processes
via a network filesystem.

Usage:
    .venvs/demo/bin/python cookbook/02_agents/other/custom_cancellation_manager.py
"""
14
15import json
16import tempfile
17import threading
18import time
19from pathlib import Path
20from typing import Dict
21
22from kern.agent import Agent
23from kern.exceptions import RunCancelledException
24from kern.models.openai import OpenAIResponses
25from kern.run.agent import RunEvent
26from kern.run.cancel import set_cancellation_manager
27from kern.run.cancellation_management.base import BaseRunCancellationManager
28
29# ---------------------------------------------------------------------------
30# Create Custom Cancellation Manager
31# ---------------------------------------------------------------------------
32
33
class FileBasedCancellationManager(BaseRunCancellationManager):
    """A cancellation manager that persists state to a JSON file.

    This is a simple example showing how to build a custom backend.
    In production, you might use a database, Redis, or an API instead.

    The file holds one JSON object mapping ``run_id`` to a boolean that is
    ``True`` once the run has been cancelled. Every public read and write
    made through this instance takes the same ``threading.Lock``, so a
    reader in another thread cannot observe the file while a writer is in
    the middle of truncating and rewriting it.

    NOTE: the lock is in-process only. Separate processes pointing at the
    same file are not coordinated by it.
    """

    def __init__(self, file_path: str):
        """Bind the manager to ``file_path``, creating an empty state file
        if none exists yet.

        Args:
            file_path: Location of the JSON state file.
        """
        self._file_path = Path(file_path)
        self._lock = threading.Lock()
        # Initialize file if it doesn't exist
        if not self._file_path.exists():
            self._write_state({})

    def _read_state(self) -> Dict[str, bool]:
        """Read the cancellation state from the file.

        A missing file, invalid JSON, or JSON that is not an object is
        treated as empty state. Callers are expected to hold ``self._lock``.
        """
        try:
            state = json.loads(self._file_path.read_text(encoding="utf-8"))
        except (json.JSONDecodeError, FileNotFoundError):
            return {}
        # Anything other than a JSON object cannot be used as run state;
        # handle it the same way as unparsable content.
        return state if isinstance(state, dict) else {}

    def _write_state(self, state: Dict[str, bool]) -> None:
        """Write the cancellation state to the file, replacing its contents."""
        self._file_path.write_text(json.dumps(state, indent=2), encoding="utf-8")

    def register_run(self, run_id: str) -> None:
        """Record ``run_id`` as known and not cancelled, unless it already
        has an entry."""
        with self._lock:
            state = self._read_state()
            # Use setdefault to preserve cancel-before-start intent
            state.setdefault(run_id, False)
            self._write_state(state)

    async def aregister_run(self, run_id: str) -> None:
        """Async variant; delegates to the synchronous :meth:`register_run`."""
        self.register_run(run_id)

    def cancel_run(self, run_id: str) -> bool:
        """Mark ``run_id`` as cancelled.

        The flag is stored even when the run was never registered.

        Returns:
            ``True`` if ``run_id`` already had an entry, ``False`` otherwise.
        """
        with self._lock:
            state = self._read_state()
            was_registered = run_id in state
            state[run_id] = True
            self._write_state(state)
            return was_registered

    async def acancel_run(self, run_id: str) -> bool:
        """Async variant; delegates to the synchronous :meth:`cancel_run`."""
        return self.cancel_run(run_id)

    def is_cancelled(self, run_id: str) -> bool:
        """Return the stored cancellation flag for ``run_id`` (``False`` if
        the run has no entry)."""
        # Take the lock so this read cannot land between a writer's truncate
        # and write, which would otherwise look like "no runs cancelled".
        with self._lock:
            state = self._read_state()
        return state.get(run_id, False)

    async def ais_cancelled(self, run_id: str) -> bool:
        """Async variant; delegates to the synchronous :meth:`is_cancelled`."""
        return self.is_cancelled(run_id)

    def cleanup_run(self, run_id: str) -> None:
        """Remove the entry for ``run_id``; a no-op if it has none."""
        with self._lock:
            state = self._read_state()
            state.pop(run_id, None)
            self._write_state(state)

    async def acleanup_run(self, run_id: str) -> None:
        """Async variant; delegates to the synchronous :meth:`cleanup_run`."""
        self.cleanup_run(run_id)

    def raise_if_cancelled(self, run_id: str) -> None:
        """Raise if ``run_id`` is flagged as cancelled.

        Raises:
            RunCancelledException: When :meth:`is_cancelled` is true for
                ``run_id``.
        """
        if self.is_cancelled(run_id):
            raise RunCancelledException(f"Run {run_id} was cancelled")

    async def araise_if_cancelled(self, run_id: str) -> None:
        """Async variant; delegates to the synchronous
        :meth:`raise_if_cancelled`."""
        self.raise_if_cancelled(run_id)

    def get_active_runs(self) -> Dict[str, bool]:
        """Return the full ``run_id -> cancelled`` mapping currently stored
        in the file."""
        # Same reasoning as is_cancelled: never read while a write is in flight.
        with self._lock:
            return self._read_state()

    async def aget_active_runs(self) -> Dict[str, bool]:
        """Async variant; delegates to the synchronous
        :meth:`get_active_runs`."""
        return self.get_active_runs()
108
109
110# ---------------------------------------------------------------------------
111# Run the Example
112# ---------------------------------------------------------------------------
113
114
def main():
    """Demonstrate the custom file-based cancellation manager.

    Creates a temporary JSON state file, installs a
    ``FileBasedCancellationManager`` on it, streams an agent run on one
    thread and cancels it from a second thread after five seconds. The
    temporary file is removed on the way out even if the demo fails or is
    interrupted.
    """

    # Create a temporary file for cancellation state
    with tempfile.NamedTemporaryFile(suffix=".json", delete=False, mode="w") as f:
        state_file = f.name
        f.write("{}")

    # delete=False means nothing removes the file automatically, so the
    # whole demo runs under try/finally to avoid leaking it on errors.
    try:
        print(f"Cancellation state file: {state_file}")
        print("=" * 50)

        # Set up the custom cancellation manager
        manager = FileBasedCancellationManager(file_path=state_file)
        set_cancellation_manager(manager)
        print("Custom file-based cancellation manager configured\n")

        # Create an agent
        agent = Agent(
            name="StoryAgent",
            model=OpenAIResponses(id="gpt-5-mini"),
            description="An agent that writes stories",
        )

        # Container for sharing state between threads
        run_id_container: dict = {}

        def run_agent():
            """Stream the run, publishing its run_id and final outcome."""
            content_pieces = []
            for chunk in agent.run(
                "Write a long story about a wizard learning Python programming. "
                "Make it detailed with lots of dialogue.",
                stream=True,
            ):
                # Publish the run id once so the cancel thread can target it.
                if "run_id" not in run_id_container and chunk.run_id:
                    run_id_container["run_id"] = chunk.run_id

                if chunk.event == RunEvent.run_content:
                    print(chunk.content, end="", flush=True)
                    content_pieces.append(chunk.content)
                elif chunk.event == RunEvent.run_cancelled:
                    print(f"\n\n[CANCELLED] Run was cancelled: {chunk.run_id}")
                    run_id_container["cancelled"] = True
                    return

            # The stream ended without a cancellation event.
            run_id_container["cancelled"] = False

        def cancel_after_delay():
            """Wait five seconds, then cancel the run if its id is known."""
            time.sleep(5)
            run_id = run_id_container.get("run_id")
            if run_id:
                print(f"\n\n[CANCEL] Cancelling run {run_id} via file-based manager...")

                # Show the state file before cancellation
                state = manager.get_active_runs()
                print(f"[STATE] Before cancel: {state}")

                agent.cancel_run(run_id)

                # Show the state file after cancellation
                state = manager.get_active_runs()
                print(f"[STATE] After cancel: {state}")

        # Start both threads
        agent_thread = threading.Thread(target=run_agent)
        cancel_thread = threading.Thread(target=cancel_after_delay)

        agent_thread.start()
        cancel_thread.start()

        agent_thread.join()
        cancel_thread.join()

        # Final state
        print("\n" + "=" * 50)
        print("RESULTS:")
        print(f"  Was cancelled: {run_id_container.get('cancelled', 'unknown')}")
        print(f"  Final state file contents: {manager.get_active_runs()}")
    finally:
        # Cleanup
        Path(state_file).unlink(missing_ok=True)

    print("\nExample completed!")
196
197
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

Run the Example

# Clone and set up repo
git clone https://github.com/kern-ai/kern.git
cd kern/cookbook/02_agents/14_advanced

# Create and activate virtual environment
./scripts/demo_setup.sh
source .venvs/demo/bin/activate

python custom_cancellation_manager.py