WebSocket Reconnect
Tests reconnect behavior for a running workflow: initial subscription, disconnection, reconnect, and missed-event catch-up.
"""
WebSocket Reconnect
===================

Tests reconnect behavior for a running workflow: initial subscription, disconnection, reconnect, and missed-event catch-up.
"""

import asyncio
import json
import sys
from collections import Counter
from typing import Any, Dict, List, Optional

# ---------------------------------------------------------------------------
# Setup
# ---------------------------------------------------------------------------
try:
    import websockets
    # Load the submodule explicitly so websockets.exceptions.ConnectionClosedOK
    # is always resolvable in WorkflowWebSocketTester._recv.
    import websockets.exceptions
except ImportError:
    # Report the missing dependency from the __main__ entry point instead of
    # exiting at import time, so importing this module never kills the caller.
    websockets = None

# Event types both phases treat as "the workflow run has ended".
TERMINAL_EVENTS = ("WorkflowCompleted", "WorkflowError")


# ---------------------------------------------------------------------------
# Define Helpers
# ---------------------------------------------------------------------------
def parse_sse_message(message: str) -> Dict[str, Any]:
    """Decode one server frame that is either SSE-framed or bare JSON.

    If the frame contains ``data:`` lines (Server-Sent Events framing), their
    payloads are joined with ``"\\n"`` -- how the SSE format combines a
    multi-line data field -- and decoded as JSON. Otherwise the whole frame is
    decoded as JSON.

    Args:
        message: Raw text frame received from the WebSocket.

    Returns:
        The decoded JSON value (a dict for the messages this test expects).

    Raises:
        json.JSONDecodeError: If the payload is not valid JSON.
    """
    # Split only on CR/LF (the SSE line terminators). str.splitlines() would
    # also split on U+2028 etc., which may legally appear raw inside JSON strings.
    normalized = message.strip().replace("\r\n", "\n").replace("\r", "\n")

    data_lines = []
    for line in normalized.split("\n"):
        if not line.startswith("data:"):
            continue
        payload = line[len("data:"):]
        # The SSE format removes a single optional space after the colon.
        if payload.startswith(" "):
            payload = payload[1:]
        data_lines.append(payload)

    if data_lines:
        return json.loads("\n".join(data_lines))
    return json.loads(message)


# ---------------------------------------------------------------------------
# Create WebSocket Tester
# ---------------------------------------------------------------------------
class WorkflowWebSocketTester:
    """Start a workflow over WebSocket, drop the connection, then reconnect.

    Phase 1 starts the workflow and reads up to ``max_initial_events`` events
    before closing the socket. Phase 2 reconnects with the recorded ``run_id``
    and ``last_event_index`` and labels each workflow event as replayed
    ("MISSED") or live ("NEW"). A summary then checks ``event_index`` values
    for gaps and duplicates.
    """

    def __init__(
        self,
        ws_url: str = "ws://localhost:7777/workflows/ws",
        workflow_id: str = "content-creation-workflow",
        session_id: str = "test-session-123",
        workflow_message: str = "Research and create content plan for AI agents",
        max_initial_events: int = 20,
        disconnect_seconds: float = 3.0,
        receive_timeout: float = 30.0,
    ):
        """
        Args:
            ws_url: WebSocket endpoint of the workflow server.
            workflow_id: Workflow to start and later reconnect to.
            session_id: Session id sent with both the start and reconnect actions.
            workflow_message: Input message for the started workflow.
            max_initial_events: Phase 1 disconnects after this many events.
            disconnect_seconds: Pause between the two phases.
            receive_timeout: Seconds to wait for any single frame before giving up.
        """
        self.ws_url = ws_url
        self.workflow_id = workflow_id
        self.session_id = session_id
        self.workflow_message = workflow_message
        self.max_initial_events = max_initial_events
        self.disconnect_seconds = disconnect_seconds
        self.receive_timeout = receive_timeout

        self.run_id: Optional[str] = None
        # Newest event_index seen; sent back on reconnect as the resume point.
        self.last_event_index: Optional[int] = None
        # Every decoded frame (control messages included), in arrival order.
        self.received_events: List[Dict[str, Any]] = []

    async def test_workflow_execution_with_reconnection(self) -> None:
        """Run phase 1, pause to simulate the user leaving, run phase 2, summarize."""
        print("\n" + "=" * 80)
        print("WebSocket Reconnection Test")
        print("=" * 80)

        print("\nPhase 1: Starting workflow and receiving initial events...")
        await self._phase1_start_workflow()

        print(f"\nSimulating user leaving page for {self.disconnect_seconds:g} seconds...")
        await asyncio.sleep(self.disconnect_seconds)

        print("\nPhase 2: Reconnecting to workflow...")
        await self._phase2_reconnect()

        print("\nTest completed")
        self._print_summary()

    async def _recv(self, websocket: Any) -> Optional[str]:
        """Wait for the next frame, bounded by ``receive_timeout``.

        Returns:
            The received frame, or ``None`` once the server has closed the
            connection normally -- the same point at which ``async for`` over a
            websockets connection would stop.

        Raises:
            asyncio.TimeoutError: If no frame arrives within ``receive_timeout``.
        """
        try:
            return await asyncio.wait_for(websocket.recv(), timeout=self.receive_timeout)
        except websockets.exceptions.ConnectionClosedOK:
            return None

    async def _read_greeting(self, websocket: Any) -> None:
        """Consume and print the first frame the server sends after connecting."""
        response = await self._recv(websocket)
        if response is None:
            raise RuntimeError("Server closed the connection before sending a greeting")
        data = parse_sse_message(response)
        print(f"[OK] Server: {data.get('message', 'Connected')}")

    def _record(self, data: Dict[str, Any]) -> None:
        """Store a decoded frame and advance ``last_event_index`` if it has one."""
        event_index = data.get("event_index")
        if event_index is not None:
            self.last_event_index = event_index
        self.received_events.append(data)

    async def _phase1_start_workflow(self) -> None:
        """Start the workflow and read events until it finishes or the cap is hit.

        Sets ``run_id`` from the first frame that carries one, so phase 2 can
        resume the same run.

        Raises:
            asyncio.TimeoutError: If no frame arrives within ``receive_timeout``.
            Exception: Any connection or decoding error, logged then re-raised.
        """
        try:
            async with websockets.connect(self.ws_url) as websocket:
                print(f"[OK] Connected to {self.ws_url}")
                await self._read_greeting(websocket)

                print("\nSending: start-workflow action")
                await websocket.send(
                    json.dumps(
                        {
                            "action": "start-workflow",
                            "workflow_id": self.workflow_id,
                            "message": self.workflow_message,
                            "session_id": self.session_id,
                        }
                    )
                )

                event_count = 0
                print("\nReceiving initial events:")
                while True:
                    message = await self._recv(websocket)
                    if message is None:  # server closed the connection normally
                        break

                    data = parse_sse_message(message)
                    event_type = data.get("event")
                    if data.get("run_id") and not self.run_id:
                        self.run_id = data["run_id"]
                    self._record(data)
                    event_count += 1

                    event_index = data.get("event_index", "N/A")
                    print(f"  [{event_count}] event_index={event_index}, event={event_type}")

                    if event_type in TERMINAL_EVENTS:
                        print(f"\nWorkflow finished during initial connection: {event_type}")
                        break

                    if event_count >= self.max_initial_events:
                        # Leaving the `async with` closes the socket: this is the
                        # simulated disconnect.
                        print(
                            f"\nSimulating disconnect after {event_count} events "
                            f"(last_event_index={self.last_event_index})"
                        )
                        break

        except asyncio.TimeoutError:
            print(f"Error in Phase 1: no message received within {self.receive_timeout:g}s")
            raise
        except Exception as e:
            print(f"Error in Phase 1: {e}")
            raise

    async def _phase2_reconnect(self) -> None:
        """Reconnect to the run from phase 1 and label replayed vs. live events.

        Timeouts are reported and swallowed (the workflow may still be running);
        any other error is logged and re-raised.
        """
        if not self.run_id:
            print("No run_id found, cannot reconnect")
            return

        try:
            async with websockets.connect(self.ws_url) as websocket:
                print(f"[OK] Reconnected to {self.ws_url}")
                await self._read_greeting(websocket)

                print(
                    f"\nSending: reconnect action (run_id={self.run_id}, "
                    f"last_event_index={self.last_event_index})"
                )
                await websocket.send(
                    json.dumps(
                        {
                            "action": "reconnect",
                            "run_id": self.run_id,
                            "last_event_index": self.last_event_index,
                            "workflow_id": self.workflow_id,
                            "session_id": self.session_id,
                        }
                    )
                )

                print("\nReceiving events after reconnection:")
                # Counts workflow events only; control frames must not shift the
                # MISSED/NEW boundary.
                workflow_event_count = 0
                missed_events_count = 0

                while True:
                    message = await self._recv(websocket)
                    if message is None:  # server closed the connection normally
                        break

                    data = parse_sse_message(message)
                    event_type = data.get("event")
                    self._record(data)

                    if event_type == "catch_up":
                        missed_events_count = data.get("missed_events") or 0
                        print(f"catch_up: {missed_events_count} missed events")
                        print(
                            f"status={data.get('status')}, current_event_count={data.get('current_event_count')}"
                        )
                        continue
                    if event_type == "replay":
                        print(
                            f"replay: status={data.get('status')}, total_events={data.get('total_events')}"
                        )
                        print(f"message={data.get('message')}")
                        continue
                    if event_type == "subscribed":
                        print(f"subscribed: status={data.get('status')}")
                        print(f"current_event_count={data.get('current_event_count')}")
                        print("\nNow listening for NEW events as workflow continues...")
                        continue
                    if event_type == "error":
                        print(f"ERROR: {data.get('error', 'Unknown error')}")
                        print(f"Full data: {data}")
                        continue

                    workflow_event_count += 1
                    # Assumes the server replays missed events before any live
                    # ones, so the first `missed_events_count` workflow events
                    # are the replays.
                    marker = "MISSED" if workflow_event_count <= missed_events_count else "NEW"
                    event_index = data.get("event_index", "N/A")
                    print(
                        f"  [{workflow_event_count}] {marker} event_index={event_index}, event={event_type}"
                    )

                    if event_type in TERMINAL_EVENTS:
                        print(f"\nWorkflow finished: {event_type}")
                        break

                print("\nWebSocket connection closed (workflow may have completed)")

        except asyncio.TimeoutError:
            print(
                f"\nTimeout waiting for events ({self.receive_timeout:g}s). "
                "Workflow may still be running."
            )
        except Exception as e:
            print(f"Error in Phase 2: {e}")
            raise

    def _print_summary(self) -> None:
        """Print run info, per-type counts, and event_index gap/duplicate checks."""
        print("\n" + "=" * 80)
        print("Test Summary")
        print("=" * 80)
        print(f"Run ID: {self.run_id}")
        print(f"Last Event Index: {self.last_event_index}")
        print(f"Total Events Received: {len(self.received_events)}")

        # `or "unknown"` also covers an explicit None, which would otherwise
        # make the sort below compare None with str.
        event_types = Counter(
            str(event.get("event") or "unknown") for event in self.received_events
        )
        print("\nEvent Type Breakdown:")
        for event_type, count in sorted(event_types.items()):
            print(f"  {event_type}: {count}")

        print("\nEvent Index Validation:")
        event_indices = [
            e["event_index"]
            for e in self.received_events
            if isinstance(e.get("event_index"), int)
        ]
        if event_indices:
            lowest, highest = min(event_indices), max(event_indices)
            print(f"  First event_index: {lowest}")
            print(f"  Last event_index: {highest}")
            print(f"  Total with event_index: {len(event_indices)}")

            gaps = set(range(lowest, highest + 1)) - set(event_indices)
            if gaps:
                print(f"Gaps in event_index: {sorted(gaps)}")
            else:
                print("No gaps in event_index (all events received)")

            # A reconnect that replays from the resume point inclusively would
            # deliver some events twice; the gap check alone cannot see that.
            duplicates = sorted(
                idx for idx, seen in Counter(event_indices).items() if seen > 1
            )
            if duplicates:
                print(f"Duplicate event_index: {duplicates}")
        else:
            print("No events with event_index found")

        print("=" * 80)


# ---------------------------------------------------------------------------
# Run Workflow
# ---------------------------------------------------------------------------
async def main() -> None:
    """Print prerequisites, run the reconnect test, and report failures."""
    print("\nStarting WebSocket Reconnection Test")
    print("Prerequisites:")
    print("  1. AgentOS server should be running at http://localhost:7777")
    print("  2. Run: python cookbook/agent_os/workflow/basic_workflow.py")
    print("\nStarting test in 2 seconds...")
    await asyncio.sleep(2)

    tester = WorkflowWebSocketTester()
    try:
        await tester.test_workflow_execution_with_reconnection()
    except ConnectionRefusedError:
        print("\nConnection refused. Is the AgentOS server running?")
        print("  Start it with: python cookbook/agent_os/workflow/basic_workflow.py")
    except asyncio.TimeoutError:
        # Must precede OSError: on Python 3.11+ asyncio.TimeoutError is the
        # builtin TimeoutError, which subclasses OSError.
        print(f"\nTest failed: no message from the server within {tester.receive_timeout:g}s")
    except OSError as e:
        # When the host resolves to several addresses (e.g. ::1 and 127.0.0.1)
        # and every attempt fails, asyncio raises a plain OSError
        # ("Multiple exceptions: ...") rather than ConnectionRefusedError.
        print(f"\nCould not connect to {tester.ws_url}: {e}")
        print("Is the AgentOS server running?")
        print("  Start it with: python cookbook/agent_os/workflow/basic_workflow.py")
    except Exception as e:
        print(f"\nTest failed: {e}")
        import traceback

        traceback.print_exc()


if __name__ == "__main__":
    if websockets is None:
        print("websockets library not installed. Install with: uv pip install websockets")
        sys.exit(1)
    asyncio.run(main())
# Clone and setup repo
git clone https://github.com/kern-ai/kern.git
cd kern/cookbook/04_workflows/06_advanced_concepts/long_running

# Create and activate virtual environment
./scripts/demo_setup.sh
source .venvs/demo/bin/activate

# The test connects to an AgentOS server at localhost:7777; start that server
# first (the script prints the exact prerequisites when it launches).
python websocket_reconnect.py