Background Execution
Run agents in the background. Reconnect to in-progress streams via SSE.
Run agents, teams, and workflows in the background by passing background=True to .arun(). Execution continues even if the client disconnects. The behavior depends on whether you also set stream=True.
Execution Modes
| background | stream | Behavior |
|---|---|---|
| False | True | Default streaming. Runs inline. Client disconnect cancels the run. |
| False | False | Non-streaming. Returns full response. |
| True | False | Fire-and-forget. Returns PENDING immediately. Poll for results. |
| True | True | Resumable streaming. Runs in a detached task. Events are buffered. Reconnect via /resume. |
Background execution requires a database (db) on the agent, team, or workflow for persisting run state.
Fire-and-Forget
Start a background run and poll for the result. Works identically for agents, teams, and workflows.
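The polling pattern itself does not depend on the library. As a minimal sketch, the helper below polls an arbitrary async `fetch` callable until a caller-supplied predicate accepts the result, and raises `TimeoutError` if the deadline passes first. The names `poll_until`, `fetch`, and `is_done` are hypothetical and are not part of the kern API.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def poll_until(
    fetch: Callable[[], Awaitable[Any]],
    is_done: Callable[[Any], bool],
    interval: float = 1.0,
    timeout: float = 60.0,
) -> Any:
    """Call fetch() repeatedly until is_done(result) is true or the timeout elapses.

    Hypothetical helper for illustration; not a kern function.
    """
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        result = await fetch()
        # A missing result (None) means the run state is not readable yet.
        if result is not None and is_done(result):
            return result
        if loop.time() >= deadline:
            raise TimeoutError(f"no finished result after {timeout} seconds")
        await asyncio.sleep(interval)
```

With the agent example in this section, `fetch` could wrap `agent.aget_run_output(run_id=..., session_id=...)` and `is_done` could compare `result.status` to `RunStatus.completed`. Whether to treat other statuses as terminal is left to the caller.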
Agent:

```python
import asyncio

from kern.agent import Agent
from kern.db.postgres import PostgresDb
from kern.models.openai import OpenAIResponses
from kern.run.base import RunStatus

db = PostgresDb(
    db_url="postgresql+psycopg://ai:ai@localhost:5532/ai",
    session_table="background_exec_sessions",
)

agent = Agent(
    name="BackgroundAgent",
    model=OpenAIResponses(id="gpt-5-mini"),
    db=db,
)

async def main():
    # Returns immediately with PENDING status
    run_output = await agent.arun(
        "Write a short analysis of quantum computing trends.",
        background=True,
    )
    print(f"Run ID: {run_output.run_id}, Status: {run_output.status}")

    # Poll until complete
    for _ in range(60):
        await asyncio.sleep(1)
        result = await agent.aget_run_output(
            run_id=run_output.run_id,
            session_id=run_output.session_id,
        )
        if result and result.status == RunStatus.completed:
            print(f"Done: {result.content}")
            break

asyncio.run(main())
```

Team:

```python
import asyncio

from kern.agent import Agent
from kern.db.postgres import PostgresDb
from kern.models.openai import OpenAIResponses
from kern.run.base import RunStatus
from kern.team import Team
from kern.tools.hackernews import HackerNewsTools
from kern.tools.yfinance import YFinanceTools

db = PostgresDb(
    db_url="postgresql+psycopg://ai:ai@localhost:5532/ai",
    session_table="background_team_sessions",
)

research_agent = Agent(
    name="Researcher",
    model=OpenAIResponses(id="gpt-5-mini"),
    tools=[HackerNewsTools()],
    role="Research tech trends",
)
finance_agent = Agent(
    name="Finance",
    model=OpenAIResponses(id="gpt-5-mini"),
    tools=[YFinanceTools()],
    role="Get stock data",
)

team = Team(
    name="Research Team",
    members=[research_agent, finance_agent],
    db=db,
)

async def main():
    run_output = await team.arun(
        "Short research on AI trends and related stocks.",
        background=True,
    )
    print(f"Run ID: {run_output.run_id}, Status: {run_output.status}")

    for _ in range(60):
        await asyncio.sleep(2)
        result = await team.aget_run_output(
            run_id=run_output.run_id,
            session_id=run_output.session_id,
        )
        if result and result.status == RunStatus.completed:
            print(f"Done: {result.content}")
            break

asyncio.run(main())
```

Workflow:

```python
import asyncio

from kern.agent import Agent
from kern.db.sqlite import SqliteDb
from kern.models.openai import OpenAIResponses
from kern.tools.hackernews import HackerNewsTools
from kern.workflow.step import Step
from kern.workflow.workflow import Workflow

hackernews_agent = Agent(
    name="HackerNews Agent",
    model=OpenAIResponses(id="gpt-5.2"),
    tools=[HackerNewsTools()],
    role="Extract key insights from HackerNews posts",
)

content_planner = Agent(
    name="Content Planner",
    model=OpenAIResponses(id="gpt-5.2"),
    instructions=["Plan a content schedule for the provided topic"],
)

workflow = Workflow(
    name="Content Creation Workflow",
    db=SqliteDb(session_table="workflow_session", db_file="tmp/workflow.db"),
    steps=[
        Step(name="Research Step", agent=hackernews_agent),
        Step(name="Planning Step", agent=content_planner),
    ],
)

async def main():
    bg_response = await workflow.arun(input="AI trends", background=True)
    print(f"Run ID: {bg_response.run_id}, Status: {bg_response.status}")

    while True:
        result = workflow.get_run(bg_response.run_id)
        if result and result.has_completed():
            print(f"Done: {result.content}")
            break
        await asyncio.sleep(5)

asyncio.run(main())
```

Resumable Streaming (SSE)
Combine background=True with stream=True for resumable SSE streaming. The run executes in a detached asyncio.Task that survives client disconnects. Events are buffered with sequential event_index values so clients can reconnect without losing events.
How It Works
```
Client connects    → StreamingResponse reads from queue   ← Background task runs
Client disconnects → StreamingResponse cancelled          ← Background task keeps running
Client reconnects  → /resume reads from subscriber queue  ← Background task still publishing
```

- The run persists RUNNING status in the database
- A detached asyncio.Task executes and publishes events to an in-memory buffer
- The client receives SSE events, each containing an event_index and run_id
- On disconnect, the client records last_event_index
- On reconnect, the client calls /resume with last_event_index to catch up on missed events
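To make the mechanism above concrete, here is a minimal, self-contained sketch of an in-memory event buffer fed by a detached asyncio task. It is an illustration under assumptions, not the library's implementation: `EventBuffer`, `produce`, `resume`, and the `None` end-of-run sentinel are all hypothetical names and choices. Only the `event_index`, `run_id`, and `last_event_index` fields come from the description above.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class EventBuffer:
    """Hypothetical in-memory buffer: stores every event and fans out to subscribers."""

    events: list = field(default_factory=list)
    subscribers: list = field(default_factory=list)
    done: bool = False

    def publish(self, run_id: str, payload: dict) -> None:
        # event_index is simply the position of the event in the buffer.
        event = {"run_id": run_id, "event_index": len(self.events), **payload}
        self.events.append(event)
        for queue in self.subscribers:
            queue.put_nowait(event)

    def close(self) -> None:
        self.done = True
        for queue in self.subscribers:
            queue.put_nowait(None)  # sentinel: no more events (an assumption of this sketch)

    def replay_after(self, last_event_index: Optional[int]) -> list:
        start = 0 if last_event_index is None else last_event_index + 1
        return self.events[start:]

    def subscribe(self) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self.subscribers.append(queue)
        return queue


async def produce(buffer: EventBuffer, run_id: str, count: int) -> None:
    """Stands in for the background run: publishes events regardless of any client."""
    for i in range(count):
        buffer.publish(run_id, {"event": "RunContent", "content": f"chunk {i}"})
        await asyncio.sleep(0)
    buffer.close()


async def resume(buffer: EventBuffer, last_event_index: Optional[int]) -> list:
    """Replay missed events, then tail live events until the run ends."""
    # No await between the snapshot and the subscription, so no event is lost or duplicated.
    received = list(buffer.replay_after(last_event_index))
    queue = None if buffer.done else buffer.subscribe()
    while queue is not None:
        event = await queue.get()
        if event is None:
            break
        received.append(event)
    return received


async def demo():
    buffer = EventBuffer()
    task = asyncio.create_task(produce(buffer, "run-1", 5))  # detached from the "client"
    await asyncio.sleep(0)  # the first client sees some events, then disconnects
    last_seen = buffer.events[-1]["event_index"]
    resumed = await resume(buffer, last_seen)
    await task
    return last_seen, [event["event_index"] for event in resumed]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The key design point is that the producer never depends on a reader: a disconnect only drops a subscriber, while the buffer keeps the full ordered history needed for a later replay.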
Starting a Resumable Stream
Resumable streaming requires a running AgentOS server. Pass background=true and stream=true in the request. The pattern is the same for agents, teams, and workflows. Only the URL path differs.
Workflows also support WebSocket-based reconnection. See the WebSocket reconnect example.
```python
import asyncio
import json
import httpx

BASE_URL = "http://localhost:7777"

async def start_resumable_stream():
    async with httpx.AsyncClient(base_url=BASE_URL, timeout=60) as client:
        # Use /agents, /teams, or /workflows
        agents = (await client.get("/agents")).json()
        agent_id = agents[0]["id"]

        form_data = {
            "message": "Write a detailed story about a brave knight.",
            "stream": "true",
            "background": "true",
        }

        run_id = None
        session_id = None
        last_event_index = None

        async with client.stream("POST", f"/agents/{agent_id}/runs", data=form_data) as response:
            buffer = ""
            async for chunk in response.aiter_text():
                buffer += chunk
                while "\n\n" in buffer:
                    event_str, buffer = buffer.split("\n\n", 1)
                    for line in event_str.strip().split("\n"):
                        if not line.startswith("data: "):
                            continue
                        data = json.loads(line[6:])

                        # Track identifiers for reconnection
                        if data.get("run_id") and not run_id:
                            run_id = data["run_id"]
                        if data.get("session_id") and not session_id:
                            session_id = data["session_id"]
                        if data.get("event_index") is not None:
                            last_event_index = data["event_index"]

                        print(f"[{data.get('event_index')}] {data.get('event')}: {str(data.get('content', ''))[:60]}")

        return run_id, session_id, last_event_index

asyncio.run(start_resumable_stream())
```

Each SSE event includes:

- event_index: Sequential integer for ordering and resumption
- run_id: The run identifier for reconnection
- session_id: The session identifier
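The parsing loop in the example above can be factored into a small helper that is testable without a server. The sketch below assumes the same wire format the example relies on (events separated by a blank line, payload on a `data: ` line containing JSON). `parse_sse_chunk` and `StreamCursor` are hypothetical names introduced for illustration.

```python
import json
from typing import List, Optional, Tuple


def parse_sse_chunk(buffer: str, chunk: str) -> Tuple[List[dict], str]:
    """Append chunk to buffer and return (decoded payloads, leftover partial text)."""
    buffer += chunk
    payloads = []
    # A blank line terminates an event; anything after the last one is incomplete.
    while "\n\n" in buffer:
        event_str, buffer = buffer.split("\n\n", 1)
        for line in event_str.strip().split("\n"):
            if line.startswith("data: "):
                payloads.append(json.loads(line[len("data: "):]))
    return payloads, buffer


class StreamCursor:
    """Remembers the identifiers a client needs to call /resume later."""

    def __init__(self) -> None:
        self.run_id: Optional[str] = None
        self.session_id: Optional[str] = None
        self.last_event_index: Optional[int] = None

    def update(self, data: dict) -> None:
        # Keep the first run_id and session_id seen; always advance the index.
        self.run_id = self.run_id or data.get("run_id")
        self.session_id = self.session_id or data.get("session_id")
        if data.get("event_index") is not None:
            self.last_event_index = data["event_index"]
```

Inside the streaming loop, each chunk would go through `parse_sse_chunk(buffer, chunk)` and each returned payload through `cursor.update(data)`. Network chunks can split an event anywhere, which is why the helper returns the leftover text for the next call.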
Reconnecting via /resume
On disconnect (page refresh, network loss), reconnect to /resume with the last event_index:
```python
# Same imports and BASE_URL as the previous example
import json
import httpx

BASE_URL = "http://localhost:7777"

async def resume_stream(agent_id: str, run_id: str, session_id: str, last_event_index: int):
    form_data = {"last_event_index": str(last_event_index)}
    if session_id:
        form_data["session_id"] = session_id

    async with httpx.AsyncClient(base_url=BASE_URL, timeout=120) as client:
        async with client.stream(
            "POST", f"/agents/{agent_id}/runs/{run_id}/resume", data=form_data
        ) as response:
            buffer = ""
            async for chunk in response.aiter_text():
                buffer += chunk
                while "\n\n" in buffer:
                    event_str, buffer = buffer.split("\n\n", 1)
                    for line in event_str.strip().split("\n"):
                        if not line.startswith("data: "):
                            continue
                        data = json.loads(line[6:])
                        event_type = data.get("event")

                        # Meta events describe the resume state; all others are run events
                        if event_type in ("catch_up", "replay", "subscribed"):
                            print(f"[META] {event_type}: {data}")
                        else:
                            print(f"[{data.get('event_index')}] {event_type}: {str(data.get('content', ''))[:60]}")
```

Resume Endpoints
The resume endpoint follows the same pattern for agents, teams, and workflows:
```
POST /agents/{agent_id}/runs/{run_id}/resume
POST /teams/{team_id}/runs/{run_id}/resume
POST /workflows/{workflow_id}/runs/{run_id}/resume

Content-Type: multipart/form-data
last_event_index=N&session_id=S
```

Resume behavior depends on run state:
| Scenario | Condition | Behavior |
|---|---|---|
| Catch up + live | Run still active in buffer | Replays missed events, then streams live events |
| Replay | Run completed, still in buffer | Replays all missed events |
| DB fallback | Buffer expired (30 min) | Falls back to database |
Meta Events
The /resume stream includes meta events before data events:
| Event | Meaning |
|---|---|
| catch_up | Run still active. Missed events follow, then live events. |
| replay | Run already completed. All missed events follow. |
| subscribed | Catch-up complete. Now receiving live events. |
| error | Run not found or other issue. |
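On the client side, the meta events can drive a small state tracker. The sketch below is one possible way to do that, assuming meta events are identified by the `event` field as in the table above. `ResumeTracker`, its phase names, and the duplicate filter (dropping any event whose `event_index` is not greater than the last one seen) are assumptions added for illustration, not documented behavior.

```python
from typing import List, Optional

# Meta event name -> client phase (phase names are hypothetical).
PHASE_BY_META_EVENT = {
    "catch_up": "catching_up",
    "replay": "replaying",
    "subscribed": "live",
    "error": "failed",
}


class ResumeTracker:
    """Separates meta events from run events and keeps last_event_index current."""

    def __init__(self, last_event_index: Optional[int]) -> None:
        self.last_event_index = last_event_index
        self.phase = "connecting"
        self.accepted: List[dict] = []

    def handle(self, data: dict) -> bool:
        """Process one decoded SSE payload. Returns True if it was a new run event."""
        event_type = data.get("event")
        if event_type in PHASE_BY_META_EVENT:
            self.phase = PHASE_BY_META_EVENT[event_type]
            return False
        index = data.get("event_index")
        if index is not None:
            # Defensive: ignore anything already seen before the disconnect.
            if self.last_event_index is not None and index <= self.last_event_index:
                return False
            self.last_event_index = index
        self.accepted.append(data)
        return True
```

Keeping `last_event_index` updated during the resumed stream means a second disconnect can be recovered the same way as the first.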
Multi-Container Deployments
The detached task and event buffer live in-process on the instance that started the run. In a multi-replica setup, a /resume request that lands on a different instance misses the buffer and falls back to the database (no live tail until the run completes).
Route /resume requests by run_id from the URL path (sticky session / consistent hashing at the load balancer) so they reach the originating instance. Only /resume needs affinity. The initial run-start request can hit any instance.
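As a rough sketch of run_id-based affinity, the router below extracts run_id from a /resume path and sends the request to the instance recorded as the run's owner. It assumes the routing layer can learn which instance started each run (for example from the run_id in the start response). That mechanism, and the names `AffinityRouter`, `record_owner`, and `run_id_from_resume_path`, are hypothetical. A real load balancer would express the same idea in its own configuration.

```python
import re
from typing import Dict, List, Optional

# Matches the three resume endpoints and captures run_id from the path.
RESUME_PATH = re.compile(
    r"^/(?:agents|teams|workflows)/[^/]+/runs/(?P<run_id>[^/]+)/resume$"
)


def run_id_from_resume_path(path: str) -> Optional[str]:
    match = RESUME_PATH.match(path)
    return match.group("run_id") if match else None


class AffinityRouter:
    """Round-robin for everything except /resume, which follows the run's owner."""

    def __init__(self, instances: List[str]) -> None:
        self.instances = list(instances)
        self.owner_by_run: Dict[str, str] = {}
        self._next = 0

    def route_any(self) -> str:
        # The run-start request (and other traffic) can hit any instance.
        instance = self.instances[self._next % len(self.instances)]
        self._next += 1
        return instance

    def record_owner(self, run_id: str, instance: str) -> None:
        # Assumption: called once the run_id of a started run is known.
        self.owner_by_run[run_id] = instance

    def route(self, path: str) -> str:
        run_id = run_id_from_resume_path(path)
        if run_id is not None and run_id in self.owner_by_run:
            return self.owner_by_run[run_id]
        # Unknown run: any instance will do, at the cost of the database fallback.
        return self.route_any()
```

If the owner is unknown or gone, the request still succeeds on another instance, but only through the database fallback described above.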