Example demonstrating a custom cancellation manager.
Shows how to extend BaseRunCancellationManager to implement your own cancellation backend (e.g., a database, a message queue, an API, etc.).
"""
Example demonstrating a custom cancellation manager.

Shows how to extend BaseRunCancellationManager to implement your own
cancellation backend (e.g., a database, a message queue, an API, etc.).

This example creates a file-based cancellation manager that persists
cancellation state to a JSON file, which could be shared across processes
via a network filesystem.

Usage:
    .venvs/demo/bin/python cookbook/02_agents/other/custom_cancellation_manager.py
"""

import json
import tempfile
import threading
import time
from pathlib import Path
from typing import Dict

from kern.agent import Agent
from kern.exceptions import RunCancelledException
from kern.models.openai import OpenAIResponses
from kern.run.agent import RunEvent
from kern.run.cancel import set_cancellation_manager
from kern.run.cancellation_management.base import BaseRunCancellationManager

# ---------------------------------------------------------------------------
# Create Custom Cancellation Manager
# ---------------------------------------------------------------------------


class FileBasedCancellationManager(BaseRunCancellationManager):
    """A cancellation manager that persists state to a JSON file.

    The file holds a single JSON object mapping ``run_id`` to a boolean
    "cancelled" flag.

    This is a simple example showing how to build a custom backend.
    In production, you might use a database, Redis, or an API instead.

    Note:
        ``self._lock`` is a ``threading.Lock``: it serializes the
        read-modify-write cycles of threads inside *this* process only.
        Writers in other processes are not coordinated with each other;
        the atomic file replacement in ``_write_state`` only guarantees
        that readers never observe a half-written file.
    """

    def __init__(self, file_path: str):
        """Bind the manager to ``file_path``, creating an empty state file if absent."""
        self._file_path = Path(file_path)
        self._lock = threading.Lock()
        # Initialize file if it doesn't exist
        if not self._file_path.exists():
            self._write_state({})

    def _read_state(self) -> Dict[str, bool]:
        """Read the cancellation state from the file.

        Returns:
            The ``run_id -> cancelled`` mapping, or an empty dict when the
            file is missing, is not valid JSON, or does not contain a JSON
            object.
        """
        try:
            state = json.loads(self._file_path.read_text())
        except (json.JSONDecodeError, FileNotFoundError):
            return {}
        # Valid JSON that is not an object (e.g. a list) would break the
        # dict operations used by callers; treat it like a corrupt file.
        return state if isinstance(state, dict) else {}

    def _write_state(self, state: Dict[str, bool]) -> None:
        """Write the cancellation state to the file atomically.

        The payload is written to a temporary file in the same directory and
        then moved over the target, so a concurrent reader sees either the
        previous or the new contents, never a truncated file.
        """
        payload = json.dumps(state, indent=2)
        with tempfile.NamedTemporaryFile(
            mode="w",
            dir=self._file_path.parent,
            prefix=f"{self._file_path.name}.",
            suffix=".tmp",
            delete=False,
        ) as tmp:
            tmp.write(payload)
            tmp_path = Path(tmp.name)
        try:
            tmp_path.replace(self._file_path)
        except OSError:
            # Do not leave stray temp files behind if the swap fails.
            tmp_path.unlink(missing_ok=True)
            raise

    def register_run(self, run_id: str) -> None:
        """Record ``run_id`` as not cancelled unless it already has an entry."""
        with self._lock:
            state = self._read_state()
            # Use setdefault to preserve cancel-before-start intent
            state.setdefault(run_id, False)
            self._write_state(state)

    async def aregister_run(self, run_id: str) -> None:
        """Async wrapper that delegates to :meth:`register_run`."""
        self.register_run(run_id)

    def cancel_run(self, run_id: str) -> bool:
        """Mark ``run_id`` as cancelled.

        The flag is stored even when the run has no entry yet, so a later
        ``register_run`` keeps it.

        Returns:
            True if the run already had an entry before this call.
        """
        with self._lock:
            state = self._read_state()
            was_registered = run_id in state
            state[run_id] = True
            self._write_state(state)
            return was_registered

    async def acancel_run(self, run_id: str) -> bool:
        """Async wrapper that delegates to :meth:`cancel_run`."""
        return self.cancel_run(run_id)

    def is_cancelled(self, run_id: str) -> bool:
        """Return the stored cancelled flag for ``run_id`` (False when absent)."""
        state = self._read_state()
        return state.get(run_id, False)

    async def ais_cancelled(self, run_id: str) -> bool:
        """Async wrapper that delegates to :meth:`is_cancelled`."""
        return self.is_cancelled(run_id)

    def cleanup_run(self, run_id: str) -> None:
        """Remove the entry for ``run_id``; a missing entry is ignored."""
        with self._lock:
            state = self._read_state()
            state.pop(run_id, None)
            self._write_state(state)

    async def acleanup_run(self, run_id: str) -> None:
        """Async wrapper that delegates to :meth:`cleanup_run`."""
        self.cleanup_run(run_id)

    def raise_if_cancelled(self, run_id: str) -> None:
        """Raise ``RunCancelledException`` if ``run_id`` is marked cancelled."""
        if self.is_cancelled(run_id):
            raise RunCancelledException(f"Run {run_id} was cancelled")

    async def araise_if_cancelled(self, run_id: str) -> None:
        """Async wrapper that delegates to :meth:`raise_if_cancelled`."""
        self.raise_if_cancelled(run_id)

    def get_active_runs(self) -> Dict[str, bool]:
        """Return the full ``run_id -> cancelled`` mapping currently stored."""
        return self._read_state()

    async def aget_active_runs(self) -> Dict[str, bool]:
        """Async wrapper that delegates to :meth:`get_active_runs`."""
        return self.get_active_runs()


# ---------------------------------------------------------------------------
# Run the Example
# ---------------------------------------------------------------------------


def main():
    """Demonstrate the custom file-based cancellation manager.

    Starts a streaming agent run in one thread and cancels it from a second
    thread after five seconds, printing the state file contents before and
    after the cancellation.
    """

    # Create a temporary file for cancellation state
    with tempfile.NamedTemporaryFile(suffix=".json", delete=False, mode="w") as f:
        state_file = f.name
        f.write("{}")

    # The file was created with delete=False, so remove it even if the demo
    # fails part-way through.
    try:
        print(f"Cancellation state file: {state_file}")
        print("=" * 50)

        # Set up the custom cancellation manager
        manager = FileBasedCancellationManager(file_path=state_file)
        set_cancellation_manager(manager)
        print("Custom file-based cancellation manager configured\n")

        # Create an agent
        agent = Agent(
            name="StoryAgent",
            model=OpenAIResponses(id="gpt-5-mini"),
            description="An agent that writes stories",
        )

        # Container for sharing state between threads
        run_id_container: dict = {}

        def run_agent():
            """Stream the agent run, recording its run_id and whether it was cancelled."""
            content_pieces = []
            for chunk in agent.run(
                "Write a long story about a wizard learning Python programming. "
                "Make it detailed with lots of dialogue.",
                stream=True,
            ):
                if "run_id" not in run_id_container and chunk.run_id:
                    run_id_container["run_id"] = chunk.run_id

                if chunk.event == RunEvent.run_content:
                    print(chunk.content, end="", flush=True)
                    content_pieces.append(chunk.content)
                elif chunk.event == RunEvent.run_cancelled:
                    print(f"\n\n[CANCELLED] Run was cancelled: {chunk.run_id}")
                    run_id_container["cancelled"] = True
                    return

            run_id_container["cancelled"] = False

        def cancel_after_delay():
            """Wait five seconds, then cancel the run captured by ``run_agent``."""
            time.sleep(5)
            run_id = run_id_container.get("run_id")
            if run_id:
                print(f"\n\n[CANCEL] Cancelling run {run_id} via file-based manager...")

                # Show the state file before cancellation
                state = manager.get_active_runs()
                print(f"[STATE] Before cancel: {state}")

                agent.cancel_run(run_id)

                # Show the state file after cancellation
                state = manager.get_active_runs()
                print(f"[STATE] After cancel: {state}")
            else:
                # Either no chunk carrying a run_id has arrived yet or the
                # run thread failed early; say so instead of doing nothing.
                print("\n\n[CANCEL] No run_id captured yet; nothing to cancel.")

        # Start both threads
        agent_thread = threading.Thread(target=run_agent)
        cancel_thread = threading.Thread(target=cancel_after_delay)

        agent_thread.start()
        cancel_thread.start()

        agent_thread.join()
        cancel_thread.join()

        # Final state
        print("\n" + "=" * 50)
        print("RESULTS:")
        print(f"  Was cancelled: {run_id_container.get('cancelled', 'unknown')}")
        print(f"  Final state file contents: {manager.get_active_runs()}")
    finally:
        # Cleanup
        Path(state_file).unlink(missing_ok=True)

    print("\nExample completed!")


if __name__ == "__main__":
    main()

# Run the Example
# Clone and setup repo
git clone https://github.com/kern-ai/kern.git
cd kern/cookbook/02_agents/14_advanced

# Create and activate virtual environment
./scripts/demo_setup.sh
source .venvs/demo/bin/activate

python custom_cancellation_manager.py