SSE Reconnection
Reconnect to a background team stream after disconnection using the /resume endpoint.
Test SSE stream reconnection for team runs using background=True, stream=True. The team runs in a detached task that survives client disconnections. Events are buffered so the client can reconnect via /resume and catch up on missed events.
Prerequisites
- An AgentOS server running on
http://localhost:7777with at least one team registered (e.g.,python cookbook/05_agent_os/basic.py). - The team must have a
dbconfigured. Background runs require persistent storage. - Run the script.
1import asyncio2import json3from typing import Optional45import httpx67BASE_URL = "http://localhost:7777"8EVENTS_BEFORE_DISCONNECT = 69DISCONNECT_DURATION = 3101112def parse_sse_line(line: str) -> Optional[dict]:13 if line.startswith("data: "):14 try:15 return json.loads(line[6:])16 except json.JSONDecodeError:17 return None18 return None192021async def test_team_sse_reconnection():22 print("=" * 70)23 print("Team SSE Reconnection Test")24 print("=" * 70)2526 # Discover a team27 async with httpx.AsyncClient(base_url=BASE_URL, timeout=30) as client:28 resp = await client.get("/teams")29 resp.raise_for_status()30 teams = resp.json()31 if not teams:32 print("No teams available")33 return34 team_id = teams[0]["id"]35 print(f"Using team: {team_id}")3637 # Phase 1: Start streaming, disconnect after a few events38 run_id: Optional[str] = None39 session_id: Optional[str] = None40 last_event_index: Optional[int] = None41 events_phase1: list[dict] = []4243 print(f"\nPhase 1: Starting SSE stream, will disconnect after {EVENTS_BEFORE_DISCONNECT} events...")4445 async with httpx.AsyncClient(base_url=BASE_URL, timeout=60) as client:46 form_data = {47 "message": "Tell me a detailed story about a brave knight. Make it at least 5 paragraphs.",48 "stream": "true",49 "background": "true",50 }51 async with client.stream("POST", f"/teams/{team_id}/runs", data=form_data) as response:52 event_count = 053 buffer = ""54 async for chunk in response.aiter_text():55 buffer += chunk56 while "\n\n" in buffer:57 event_str, buffer = buffer.split("\n\n", 1)58 for line in event_str.strip().split("\n"):59 data = parse_sse_line(line)60 if data is None:61 continue62 if data.get("run_id") and not run_id:63 run_id = data["run_id"]64 if data.get("session_id") and not session_id:65 session_id = data["session_id"]66 if data.get("event_index") is not None:67 last_event_index = data["event_index"]6869 events_phase1.append(data)70 event_count += 171 print(f" [{event_count}] event={data.get('event')} index={data.get('event_index')}")7273 if event_count >= EVENTS_BEFORE_DISCONNECT:74 break75 if event_count >= EVENTS_BEFORE_DISCONNECT:76 break77 if event_count >= EVENTS_BEFORE_DISCONNECT:78 break7980 print(f"\n[DISCONNECT] run_id={run_id}, last_event_index={last_event_index}")8182 if not run_id:83 print("Could not determine run_id")84 return8586 print(f"\nSimulating disconnect for {DISCONNECT_DURATION}s...")87 await asyncio.sleep(DISCONNECT_DURATION)8889 # Phase 2: Resume via /resume endpoint90 print("\nPhase 2: Reconnecting via /resume...")91 events_phase2: list[dict] = []9293 form_data: dict = {}94 if last_event_index is not None:95 form_data["last_event_index"] = str(last_event_index)96 if session_id:97 form_data["session_id"] = session_id9899 async with httpx.AsyncClient(base_url=BASE_URL, timeout=120) as client:100 async with client.stream(101 "POST", f"/teams/{team_id}/runs/{run_id}/resume", data=form_data102 ) as response:103 buffer = ""104 async for chunk in response.aiter_text():105 buffer += chunk106 while "\n\n" in buffer:107 event_str, buffer = buffer.split("\n\n", 1)108 for line in event_str.strip().split("\n"):109 data = parse_sse_line(line)110 if data is None:111 continue112 events_phase2.append(data)113 event_type = data.get("event")114115 if event_type in ("catch_up", "replay", "subscribed"):116 print(f" [META] {event_type}")117 else:118 print(f" [RESUME] event={event_type} index={data.get('event_index')}")119120 data_events = [e for e in events_phase2 if e.get("event") not in ("catch_up", "replay", "subscribed", "error")]121 print(f"\nPhase 1: {len(events_phase1)} events | Phase 2: {len(data_events)} data events")122 print(f"Total: {len(events_phase1) + len(data_events)} events (no events lost)")123124125if __name__ == "__main__":126 asyncio.run(test_team_sse_reconnection())Run the Example
1git clone https://github.com/kern-ai/kern.git2cd kern34uv pip install -U kern-ai httpx56python cookbook/05_agent_os/client/11_team_sse_reconnect.py