WebSocket Reconnect

Tests reconnect behavior for a running workflow: initial subscription, disconnection, reconnect, and missed-event catch-up.

1"""
2WebSocket Reconnect
3===================
4
5Tests reconnect behavior for a running workflow: initial subscription, disconnection, reconnect, and missed-event catch-up.
6"""
7
8import asyncio
9import json
10from typing import Optional
11
# ---------------------------------------------------------------------------
# Setup
# ---------------------------------------------------------------------------
try:
    import websockets
except ImportError:
    # SystemExit with a string argument prints it to stderr and exits with
    # status 1; unlike the site-provided exit() helper it does not depend on
    # the `site` module being loaded.
    raise SystemExit(
        "websockets library not installed. Install with: uv pip install websockets"
    )


# ---------------------------------------------------------------------------
# Define Helpers
# ---------------------------------------------------------------------------
def parse_sse_message(message: str) -> dict:
    """Decode one WebSocket message into a dict.

    The server may send either a Server-Sent-Events style frame
    (``event: ...`` / ``data: {...}`` lines) or a bare JSON document.

    For SSE frames, the ``data`` field values of the first event are joined
    with newlines and decoded as JSON. Following the SSE format, one optional
    space after the colon is stripped and a blank line ends the event. If no
    ``data`` field is present, the whole message is decoded as JSON.

    Raises:
        json.JSONDecodeError: if the payload is not valid JSON.
    """
    data_lines = []
    for raw_line in message.strip().split("\n"):
        # SSE allows CRLF line endings; drop the trailing CR if present.
        line = raw_line.rstrip("\r")
        if not line:
            if data_lines:
                # A blank line dispatches the event; ignore anything after it.
                break
            continue
        field, _, value = line.partition(":")
        if field == "data":
            # Exactly one leading space after the colon is not part of the value.
            data_lines.append(value[1:] if value.startswith(" ") else value)

    if data_lines:
        return json.loads("\n".join(data_lines))
    return json.loads(message)


# ---------------------------------------------------------------------------
# Create WebSocket Tester
# ---------------------------------------------------------------------------
class WorkflowWebSocketTester:
    """Drive a workflow over the AgentOS WebSocket endpoint and exercise reconnect.

    Phase 1 starts a workflow and reads a limited number of events, then drops
    the connection. After a pause, phase 2 opens a new connection and sends a
    ``reconnect`` action with the saved ``run_id`` and ``last_event_index``.
    It then reports the messages that arrive.

    Every message received after the start/reconnect request (from both
    phases) is kept in ``received_events`` for the final summary.
    """

    def __init__(
        self,
        ws_url: str = "ws://localhost:7777/workflows/ws",
        workflow_id: str = "content-creation-workflow",
        session_id: str = "test-session-123",
        message: str = "Research and create content plan for AI agents",
        max_initial_events: int = 20,
        disconnect_seconds: float = 3.0,
        recv_timeout: Optional[float] = 30.0,
    ):
        """
        Args:
            ws_url: WebSocket endpoint to connect to.
            workflow_id: Workflow started in phase 1 and resumed in phase 2.
            session_id: Session id sent with both the start and reconnect actions.
            message: Input message for the workflow run.
            max_initial_events: Phase 1 disconnects after this many messages.
            disconnect_seconds: Pause between the two phases.
            recv_timeout: Seconds to wait for each message in phase 2;
                ``None`` waits indefinitely.
        """
        self.ws_url = ws_url
        self.workflow_id = workflow_id
        self.session_id = session_id
        self.message = message
        self.max_initial_events = max_initial_events
        self.disconnect_seconds = disconnect_seconds
        self.recv_timeout = recv_timeout
        self.run_id: Optional[str] = None
        # Most recently seen event_index; sent back with the reconnect action.
        self.last_event_index: Optional[int] = None
        # Decoded messages from both phases, control messages included.
        self.received_events: list = []

    async def test_workflow_execution_with_reconnection(self) -> None:
        """Run phase 1, pause, run phase 2, then print the summary."""
        print("\n" + "=" * 80)
        print("WebSocket Reconnection Test")
        print("=" * 80)

        print("\nPhase 1: Starting workflow and receiving initial events...")
        await self._phase1_start_workflow()

        print(
            f"\nSimulating user leaving page for {self.disconnect_seconds:g} seconds..."
        )
        await asyncio.sleep(self.disconnect_seconds)

        print("\nPhase 2: Reconnecting to workflow...")
        await self._phase2_reconnect()

        print("\nTest completed")
        self._print_summary()

    async def _phase1_start_workflow(self) -> None:
        """Start the workflow and record events until a disconnect is simulated.

        The loop stops in any of these cases:
        - ``max_initial_events`` messages have been received;
        - the workflow reports completion or an error;
        - the server closes the connection normally;
        - an ``error`` message arrives before any ``run_id`` is known.

        ``run_id`` and the latest ``event_index`` are saved for phase 2.
        Any exception is printed and re-raised.
        """
        try:
            async with websockets.connect(self.ws_url) as websocket:
                print(f"[OK] Connected to {self.ws_url}")

                greeting = parse_sse_message(await websocket.recv())
                print(f"[OK] Server: {greeting.get('message', 'Connected')}")

                print("\nSending: start-workflow action")
                await websocket.send(
                    json.dumps(
                        {
                            "action": "start-workflow",
                            "workflow_id": self.workflow_id,
                            "message": self.message,
                            "session_id": self.session_id,
                        }
                    )
                )

                event_count = 0
                print("\nReceiving initial events:")
                async for message in websocket:
                    data = parse_sse_message(message)
                    event_type = data.get("event")

                    if "run_id" in data and not self.run_id:
                        self.run_id = data["run_id"]
                    if "event_index" in data:
                        self.last_event_index = data["event_index"]

                    self.received_events.append(data)
                    event_count += 1

                    event_index = data.get("event_index", "N/A")
                    print(
                        f" [{event_count}] event_index={event_index}, event={event_type}"
                    )

                    if event_type == "error" and not self.run_id:
                        # No run_id means phase 2 has nothing to reconnect to;
                        # presumably the workflow failed to start, so stop
                        # waiting instead of blocking on further messages.
                        print(f"ERROR: {data.get('error', 'Unknown error')}")
                        print(f"Full data: {data}")
                        break

                    if event_type in ("WorkflowCompleted", "WorkflowError"):
                        print(
                            f"\nWorkflow finished during initial connection: {event_type}"
                        )
                        break

                    if event_count >= self.max_initial_events:
                        print(
                            f"\nSimulating disconnect after {event_count} events "
                            f"(last_event_index={self.last_event_index})"
                        )
                        break

        except Exception as e:
            print(f"Error in Phase 1: {e}")
            raise

    async def _phase2_reconnect(self) -> None:
        """Reconnect to the saved run and report the messages that arrive.

        Control messages (``catch_up``, ``replay``, ``subscribed``, ``error``)
        are printed but not counted. Other events are counted and labelled:
        ``MISSED`` while the count is within ``catch_up``'s ``missed_events``,
        ``NEW`` afterwards. This assumes the server replays missed events
        right after ``catch_up``; confirm against the server.

        Each receive waits at most ``recv_timeout`` seconds. A timeout is
        reported and ends the phase. Any other exception is printed and
        re-raised.
        """
        if not self.run_id:
            print("No run_id found, cannot reconnect")
            return

        try:
            async with websockets.connect(self.ws_url) as websocket:
                print(f"[OK] Reconnected to {self.ws_url}")

                greeting = parse_sse_message(
                    await asyncio.wait_for(websocket.recv(), timeout=self.recv_timeout)
                )
                print(f"[OK] Server: {greeting.get('message', 'Connected')}")

                print(
                    f"\nSending: reconnect action (run_id={self.run_id}, "
                    f"last_event_index={self.last_event_index})"
                )
                await websocket.send(
                    json.dumps(
                        {
                            "action": "reconnect",
                            "run_id": self.run_id,
                            "last_event_index": self.last_event_index,
                            "workflow_id": self.workflow_id,
                            "session_id": self.session_id,
                        }
                    )
                )

                print("\nReceiving events after reconnection:")
                missed_events_count = 0
                # Counts workflow events only. Control messages must not shift
                # the MISSED/NEW boundary.
                workflow_event_count = 0

                while True:
                    try:
                        message = await asyncio.wait_for(
                            websocket.recv(), timeout=self.recv_timeout
                        )
                    except websockets.ConnectionClosedOK:
                        # Normal close: end the loop as `async for` would.
                        break

                    data = parse_sse_message(message)
                    event_type = data.get("event")

                    if "event_index" in data:
                        self.last_event_index = data["event_index"]
                    self.received_events.append(data)

                    if event_type == "catch_up":
                        missed_events_count = data.get("missed_events", 0)
                        print(f"catch_up: {missed_events_count} missed events")
                        print(
                            f"status={data.get('status')}, current_event_count={data.get('current_event_count')}"
                        )
                        continue
                    if event_type == "replay":
                        print(
                            f"replay: status={data.get('status')}, total_events={data.get('total_events')}"
                        )
                        print(f"message={data.get('message')}")
                        continue
                    if event_type == "subscribed":
                        print(f"subscribed: status={data.get('status')}")
                        print(f"current_event_count={data.get('current_event_count')}")
                        print("\nNow listening for NEW events as workflow continues...")
                        continue
                    if event_type == "error":
                        print(f"ERROR: {data.get('error', 'Unknown error')}")
                        print(f"Full data: {data}")
                        continue

                    workflow_event_count += 1
                    event_index = data.get("event_index", "N/A")
                    marker = (
                        "MISSED" if workflow_event_count <= missed_events_count else "NEW"
                    )
                    print(
                        f" [{workflow_event_count}] {marker} event_index={event_index}, event={event_type}"
                    )

                    if event_type in ("WorkflowCompleted", "WorkflowError"):
                        print(f"\nWorkflow finished: {event_type}")
                        break

                print("\nWebSocket connection closed (workflow may have completed)")

        except asyncio.TimeoutError:
            print(
                f"\nTimeout waiting for events ({self.recv_timeout:g}s). Workflow may still be running."
            )
        except Exception as e:
            print(f"Error in Phase 2: {e}")
            raise

    def _print_summary(self) -> None:
        """Print totals, a per-event-type breakdown and event_index consistency checks."""
        print("\n" + "=" * 80)
        print("Test Summary")
        print("=" * 80)
        print(f"Run ID: {self.run_id}")
        print(f"Last Event Index: {self.last_event_index}")
        print(f"Total Events Received: {len(self.received_events)}")

        event_types: dict = {}
        for event in self.received_events:
            event_type = event.get("event", "unknown")
            event_types[event_type] = event_types.get(event_type, 0) + 1

        print("\nEvent Type Breakdown:")
        # str() key keeps sorting from failing if an "event" value is None.
        for event_type, count in sorted(
            event_types.items(), key=lambda item: str(item[0])
        ):
            print(f" {event_type}: {count}")

        print("\nEvent Index Validation:")
        # Only integer indices can be range-checked; None or other values are skipped.
        event_indices = [
            e["event_index"]
            for e in self.received_events
            if isinstance(e.get("event_index"), int)
        ]
        if not event_indices:
            print("No events with event_index found")
        else:
            first, last = min(event_indices), max(event_indices)
            print(f" First event_index: {first}")
            print(f" Last event_index: {last}")
            print(f" Total with event_index: {len(event_indices)}")

            index_counts: dict = {}
            for idx in event_indices:
                index_counts[idx] = index_counts.get(idx, 0) + 1

            gaps = sorted(set(range(first, last + 1)) - index_counts.keys())
            if gaps:
                print(f"Gaps in event_index: {gaps}")
            else:
                print("No gaps in event_index (all events received)")

            # Indices received more than once, e.g. an event seen in phase 1
            # and replayed again after the reconnect.
            duplicates = sorted(idx for idx, n in index_counts.items() if n > 1)
            if duplicates:
                print(f"Duplicate event_index: {duplicates}")

        print("=" * 80)


# ---------------------------------------------------------------------------
# Run Workflow
# ---------------------------------------------------------------------------
async def main() -> int:
    """Print prerequisites, run the reconnect test and return a process exit code.

    Returns:
        0 if the test ran to completion, 1 if it failed, so the script's exit
        status reflects the outcome.
    """
    print("\nStarting WebSocket Reconnection Test")
    print("Prerequisites:")
    print(" 1. AgentOS server should be running at http://localhost:7777")
    print(" 2. Run: python cookbook/agent_os/workflow/basic_workflow.py")
    print("\nStarting test in 2 seconds...")
    await asyncio.sleep(2)

    tester = WorkflowWebSocketTester()
    try:
        await tester.test_workflow_execution_with_reconnection()
    except ConnectionRefusedError:
        print("\nConnection refused. Is the AgentOS server running?")
        print(" Start it with: python cookbook/agent_os/workflow/basic_workflow.py")
        return 1
    except OSError as e:
        # "localhost" may resolve to both IPv6 and IPv4. When every attempt
        # fails with different messages, asyncio raises a plain
        # OSError("Multiple exceptions: ...") instead of ConnectionRefusedError.
        print(f"\nCould not connect to the AgentOS server: {e}")
        print(" Start it with: python cookbook/agent_os/workflow/basic_workflow.py")
        return 1
    except Exception as e:
        print(f"\nTest failed: {e}")
        import traceback

        traceback.print_exc()
        return 1
    return 0


if __name__ == "__main__":
    raise SystemExit(asyncio.run(main()))

Run the Example

# Clone and set up the repo
git clone https://github.com/kern-ai/kern.git
cd kern/cookbook/04_workflows/06_advanced_concepts/long_running

# Create and activate virtual environment
./scripts/demo_setup.sh
source .venvs/demo/bin/activate

# Requires the websockets package and an AgentOS server running at http://localhost:7777
python websocket_reconnect.py