SSE Reconnection

Reconnect to a background agent stream after disconnection using the /resume endpoint.

This example tests SSE stream reconnection for agent runs started with background=True and stream=True. The agent runs in a detached task that survives client disconnection, and its events are buffered, so the client can reconnect via /resume and catch up on the events it missed.

Prerequisites

  1. An AgentOS server running on http://localhost:7777 with at least one agent registered (e.g., python cookbook/05_agent_os/basic.py).
  2. The agent must have a db configured. Background runs require persistent storage.
  3. With the server running, run the script below.
1import asyncio
2import json
3from typing import Optional
4
5import httpx
6
# Base URL of the AgentOS server every request in this script is sent to.
BASE_URL: str = "http://localhost:7777"
# Phase 1 drops the connection once this many events have been received.
EVENTS_BEFORE_DISCONNECT: int = 6
# Seconds to stay disconnected before reconnecting through /resume.
DISCONNECT_DURATION: int = 3
10
11
def parse_sse_line(line: str) -> Optional[dict]:
    """Decode the JSON object carried by a single SSE ``data`` line.

    Args:
        line: One line taken from a Server-Sent Events stream.

    Returns:
        The decoded JSON object, or ``None`` when the line is not a ``data``
        field, its payload is not valid JSON, or the payload decodes to
        something other than a JSON object (callers use ``dict.get`` on the
        result, so lists, numbers and strings are rejected here).
    """
    prefix = "data:"
    if not line.startswith(prefix):
        return None

    payload = line[len(prefix):]
    # The SSE format allows, but does not require, one space after the colon;
    # only that single separator space is dropped.
    if payload.startswith(" "):
        payload = payload[1:]

    try:
        decoded = json.loads(payload)
    except json.JSONDecodeError:
        return None
    return decoded if isinstance(decoded, dict) else None
19
20
async def test_sse_reconnection():
    """Exercise SSE reconnection for a background agent run.

    The scenario runs against the server at ``BASE_URL``:

    1. ``GET /agents`` and use the first agent returned.
    2. ``POST /agents/{agent_id}/runs`` with ``stream`` and ``background``
       enabled, read ``EVENTS_BEFORE_DISCONNECT`` events, then drop the
       connection.
    3. Sleep for ``DISCONNECT_DURATION`` seconds.
    4. ``POST /agents/{agent_id}/runs/{run_id}/resume`` with the last seen
       event index (and the session id when one was seen) and read the
       stream until the server closes it.
    5. Print how many events each phase received.

    Returns early, after printing a message, when no agent is registered or
    when no event in phase 1 carried a ``run_id``.

    Raises:
        httpx.HTTPStatusError: If the discovery, run, or resume request is
            answered with a 4xx/5xx status.
    """
    print("=" * 70)
    print("Agent SSE Reconnection Test")
    print("=" * 70)

    # Discover an agent
    async with httpx.AsyncClient(base_url=BASE_URL, timeout=30) as client:
        resp = await client.get("/agents")
        resp.raise_for_status()
        agents = resp.json()
        if not agents:
            print("No agents available")
            return
        agent_id = agents[0]["id"]
        print(f"Using agent: {agent_id}")

    # Phase 1: Start streaming, disconnect after a few events
    run_id: Optional[str] = None
    session_id: Optional[str] = None
    last_event_index: Optional[int] = None
    events_phase1: list[dict] = []

    print(f"\nPhase 1: Starting SSE stream, will disconnect after {EVENTS_BEFORE_DISCONNECT} events...")

    run_form = {
        "message": "Tell me a detailed story about a brave knight. Make it at least 5 paragraphs.",
        "stream": "true",
        "background": "true",
    }
    async with httpx.AsyncClient(base_url=BASE_URL, timeout=60) as client:
        async with client.stream("POST", f"/agents/{agent_id}/runs", data=run_form) as response:
            # Fail loudly on a rejected run instead of reading an error body
            # as an empty event stream.
            response.raise_for_status()
            # aiter_lines() does the line splitting; every parsable data
            # line counts as one event.
            async for line in response.aiter_lines():
                data = parse_sse_line(line)
                if data is None:
                    continue

                # Keep the first run/session id seen, and the most recent index.
                if data.get("run_id") and not run_id:
                    run_id = data["run_id"]
                if data.get("session_id") and not session_id:
                    session_id = data["session_id"]
                if data.get("event_index") is not None:
                    last_event_index = data["event_index"]

                events_phase1.append(data)
                print(f" [{len(events_phase1)}] event={data.get('event')} index={data.get('event_index')}")

                # Leaving the stream context below closes the connection,
                # which is the simulated client disconnect.
                if len(events_phase1) >= EVENTS_BEFORE_DISCONNECT:
                    break

    print(f"\n[DISCONNECT] run_id={run_id}, last_event_index={last_event_index}")

    if not run_id:
        print("Could not determine run_id")
        return

    # Simulate being away
    print(f"\nSimulating disconnect for {DISCONNECT_DURATION}s...")
    await asyncio.sleep(DISCONNECT_DURATION)

    # Phase 2: Resume via /resume endpoint
    print("\nPhase 2: Reconnecting via /resume...")
    events_phase2: list[dict] = []
    meta_events = ("catch_up", "replay", "subscribed")

    # Only send the fields that phase 1 actually observed.
    resume_form: dict = {}
    if last_event_index is not None:
        resume_form["last_event_index"] = str(last_event_index)
    if session_id:
        resume_form["session_id"] = session_id

    async with httpx.AsyncClient(base_url=BASE_URL, timeout=120) as client:
        async with client.stream(
            "POST", f"/agents/{agent_id}/runs/{run_id}/resume", data=resume_form
        ) as response:
            response.raise_for_status()
            async for line in response.aiter_lines():
                data = parse_sse_line(line)
                if data is None:
                    continue
                events_phase2.append(data)
                event_type = data.get("event")

                if event_type in meta_events:
                    print(f" [META] {event_type}")
                else:
                    print(f" [RESUME] event={event_type} index={data.get('event_index')}")

    # Summary
    # NOTE(review): "no events lost" is printed unconditionally; nothing here
    # compares event indices across the two phases -- confirm whether a real
    # continuity check is wanted.
    excluded = meta_events + ("error",)
    data_events = [e for e in events_phase2 if e.get("event") not in excluded]
    print(f"\nPhase 1: {len(events_phase1)} events | Phase 2: {len(data_events)} data events")
    print(f"Total: {len(events_phase1) + len(data_events)} events (no events lost)")
126
if __name__ == "__main__":
    # Script entry point: build the scenario coroutine, then run it to
    # completion with asyncio.run().
    scenario = test_sse_reconnection()
    asyncio.run(scenario)

Run the Example

# Fetch the repository that contains the cookbook script.
git clone https://github.com/kern-ai/kern.git
cd kern

# Install or upgrade the packages used by the example.
uv pip install -U kern-ai httpx

# Run the reconnection example (the AgentOS server must already be running).
python cookbook/05_agent_os/client/10_sse_reconnect.py