SSE Reconnection

Reconnect to a background team stream after disconnection using the /resume endpoint.

Test SSE stream reconnection for team runs using background=True, stream=True. The team runs in a detached task that survives client disconnections. Events are buffered so the client can reconnect via /resume and catch up on missed events.

Prerequisites

  1. An AgentOS server running on http://localhost:7777 with at least one team registered (e.g., python cookbook/05_agent_os/basic.py).
  2. The team must have a db configured. Background runs require persistent storage.
  3. Run the script.
import asyncio
import json
from typing import Optional

import httpx
6
# Base URL of the running AgentOS server that every request is sent to.
BASE_URL = "http://localhost:7777"
# Number of SSE events to consume in phase 1 before dropping the connection.
EVENTS_BEFORE_DISCONNECT = 6
# Seconds to stay disconnected before reconnecting through /resume.
DISCONNECT_DURATION = 3
10
11
def parse_sse_line(line: str) -> Optional[dict]:
    """Decode the JSON object carried by a single SSE ``data`` line.

    Args:
        line: One line of an SSE event block, without its trailing newline.

    Returns:
        The decoded JSON object, or ``None`` when the line is not a ``data``
        field, its payload is not valid JSON, or the payload is valid JSON
        but not an object.
    """
    prefix = "data:"
    if not line.startswith(prefix):
        return None
    # The space after the colon is optional in SSE; json.loads skips leading
    # whitespace, so both "data: {...}" and "data:{...}" decode the same way.
    try:
        payload = json.loads(line[len(prefix):])
    except json.JSONDecodeError:
        return None
    # Callers use dict methods on the result, so arrays and scalars are
    # rejected here rather than failing later with an AttributeError.
    return payload if isinstance(payload, dict) else None
19
20
def _extract_sse_events(buffer: str) -> tuple[list[dict], str]:
    """Split complete SSE event blocks off *buffer* and decode their payloads.

    Args:
        buffer: Text received so far that has not been parsed yet.

    Returns:
        A pair of (decoded ``data`` payloads in arrival order, the trailing
        text that is not yet terminated by a blank line and must be kept
        for the next chunk).
    """
    events: list[dict] = []
    while "\n\n" in buffer:
        block, buffer = buffer.split("\n\n", 1)
        for line in block.strip().split("\n"):
            data = parse_sse_line(line)
            if data is not None:
                events.append(data)
    return events, buffer


async def test_team_sse_reconnection():
    """Stream a background team run, drop the connection, then resume it.

    Phase 1 starts a run with ``stream`` and ``background`` enabled and stops
    reading after ``EVENTS_BEFORE_DISCONNECT`` events. After sleeping for
    ``DISCONNECT_DURATION`` seconds, phase 2 posts to the run's ``/resume``
    endpoint with the last seen event index and prints every event it gets.

    Raises:
        httpx.HTTPStatusError: If team discovery or either streaming request
            returns an error status.
    """
    # Control events printed as [META] and left out of the data-event count.
    meta_events = ("catch_up", "replay", "subscribed")

    print("=" * 70)
    print("Team SSE Reconnection Test")
    print("=" * 70)

    # Discover a team
    async with httpx.AsyncClient(base_url=BASE_URL, timeout=30) as client:
        resp = await client.get("/teams")
        resp.raise_for_status()
        teams = resp.json()
        if not teams:
            print("No teams available")
            return
        team_id = teams[0]["id"]
        print(f"Using team: {team_id}")

    # Phase 1: Start streaming, disconnect after a few events
    run_id: Optional[str] = None
    session_id: Optional[str] = None
    last_event_index: Optional[int] = None
    events_phase1: list[dict] = []

    print(f"\nPhase 1: Starting SSE stream, will disconnect after {EVENTS_BEFORE_DISCONNECT} events...")

    async with httpx.AsyncClient(base_url=BASE_URL, timeout=60) as client:
        form_data = {
            "message": "Tell me a detailed story about a brave knight. Make it at least 5 paragraphs.",
            "stream": "true",
            "background": "true",
        }
        async with client.stream("POST", f"/teams/{team_id}/runs", data=form_data) as response:
            # Fail loudly on 4xx/5xx instead of silently reading no events.
            response.raise_for_status()
            buffer = ""
            async for chunk in response.aiter_text():
                buffer += chunk
                events, buffer = _extract_sse_events(buffer)
                for data in events:
                    # Keep the first run/session id seen; track the newest index.
                    if data.get("run_id") and not run_id:
                        run_id = data["run_id"]
                    if data.get("session_id") and not session_id:
                        session_id = data["session_id"]
                    if data.get("event_index") is not None:
                        last_event_index = data["event_index"]

                    events_phase1.append(data)
                    print(f" [{len(events_phase1)}] event={data.get('event')} index={data.get('event_index')}")

                    if len(events_phase1) >= EVENTS_BEFORE_DISCONNECT:
                        break
                # Leaving the stream context here is what drops the connection.
                if len(events_phase1) >= EVENTS_BEFORE_DISCONNECT:
                    break

    print(f"\n[DISCONNECT] run_id={run_id}, last_event_index={last_event_index}")

    if not run_id:
        print("Could not determine run_id")
        return

    print(f"\nSimulating disconnect for {DISCONNECT_DURATION}s...")
    await asyncio.sleep(DISCONNECT_DURATION)

    # Phase 2: Resume via /resume endpoint
    print("\nPhase 2: Reconnecting via /resume...")
    events_phase2: list[dict] = []

    resume_data: dict[str, str] = {}
    if last_event_index is not None:
        resume_data["last_event_index"] = str(last_event_index)
    if session_id:
        resume_data["session_id"] = session_id

    async with httpx.AsyncClient(base_url=BASE_URL, timeout=120) as client:
        async with client.stream(
            "POST", f"/teams/{team_id}/runs/{run_id}/resume", data=resume_data
        ) as response:
            # Fail loudly on 4xx/5xx instead of reporting an empty resume.
            response.raise_for_status()
            buffer = ""
            async for chunk in response.aiter_text():
                buffer += chunk
                events, buffer = _extract_sse_events(buffer)
                for data in events:
                    events_phase2.append(data)
                    event_type = data.get("event")

                    if event_type in meta_events:
                        print(f" [META] {event_type}")
                    else:
                        print(f" [RESUME] event={event_type} index={data.get('event_index')}")

    # Control and error events are not counted as run data.
    excluded = meta_events + ("error",)
    data_events = [e for e in events_phase2 if e.get("event") not in excluded]
    print(f"\nPhase 1: {len(events_phase1)} events | Phase 2: {len(data_events)} data events")
    print(f"Total: {len(events_phase1) + len(data_events)} events (no events lost)")
123
124
if __name__ == "__main__":
    # Run the reconnection scenario end to end when executed as a script.
    asyncio.run(test_team_sse_reconnection())

Run the Example

git clone https://github.com/kern-ai/kern.git
cd kern

uv pip install -U kern-ai httpx

python cookbook/05_agent_os/client/11_team_sse_reconnect.py