Background Execution

Run agents in the background. Reconnect to in-progress streams via SSE.

Run agents, teams, and workflows in the background by passing background=True to .arun(). Execution continues even if the client disconnects. The behavior depends on whether you also set stream=True.

Execution Modes

| background | stream | Behavior |
| --- | --- | --- |
| False | True | Default streaming. Runs inline. A client disconnect cancels the run. |
| False | False | Non-streaming. Returns the full response. |
| True | False | Fire-and-forget. Returns immediately with PENDING status. Poll for results. |
| True | True | Resumable streaming. Runs in a detached task. Events are buffered. Reconnect via /resume. |
Note: Background execution requires a database (db) on the agent, team, or workflow to persist run state.

Fire-and-Forget

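Start a background run and poll for the result. The pattern is the same for agents, teams, and workflows: agents and teams poll with aget_run_output(), while the workflow example polls with get_run().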

Agent:

```python
import asyncio

from kern.agent import Agent
from kern.db.postgres import PostgresDb
from kern.models.openai import OpenAIResponses
from kern.run.base import RunStatus

db = PostgresDb(
    db_url="postgresql+psycopg://ai:ai@localhost:5532/ai",
    session_table="background_exec_sessions",
)

agent = Agent(
    name="BackgroundAgent",
    model=OpenAIResponses(id="gpt-5-mini"),
    db=db,
)

async def main():
    # Returns immediately with PENDING status
    run_output = await agent.arun(
        "Write a short analysis of quantum computing trends.",
        background=True,
    )
    print(f"Run ID: {run_output.run_id}, Status: {run_output.status}")

    # Poll until complete
    for _ in range(60):
        await asyncio.sleep(1)
        result = await agent.aget_run_output(
            run_id=run_output.run_id,
            session_id=run_output.session_id,
        )
        if result and result.status == RunStatus.completed:
            print(f"Done: {result.content}")
            break

asyncio.run(main())
```

Team:

```python
import asyncio

from kern.agent import Agent
from kern.db.postgres import PostgresDb
from kern.models.openai import OpenAIResponses
from kern.run.base import RunStatus
from kern.team import Team
from kern.tools.hackernews import HackerNewsTools
from kern.tools.yfinance import YFinanceTools

db = PostgresDb(
    db_url="postgresql+psycopg://ai:ai@localhost:5532/ai",
    session_table="background_team_sessions",
)

research_agent = Agent(
    name="Researcher",
    model=OpenAIResponses(id="gpt-5-mini"),
    tools=[HackerNewsTools()],
    role="Research tech trends",
)
finance_agent = Agent(
    name="Finance",
    model=OpenAIResponses(id="gpt-5-mini"),
    tools=[YFinanceTools()],
    role="Get stock data",
)

team = Team(
    name="Research Team",
    members=[research_agent, finance_agent],
    db=db,
)

async def main():
    run_output = await team.arun(
        "Short research on AI trends and related stocks.",
        background=True,
    )
    print(f"Run ID: {run_output.run_id}, Status: {run_output.status}")

    for _ in range(60):
        await asyncio.sleep(2)
        result = await team.aget_run_output(
            run_id=run_output.run_id,
            session_id=run_output.session_id,
        )
        if result and result.status == RunStatus.completed:
            print(f"Done: {result.content}")
            break

asyncio.run(main())
```

Workflow:

```python
import asyncio

from kern.agent import Agent
from kern.db.sqlite import SqliteDb
from kern.models.openai import OpenAIResponses
from kern.tools.hackernews import HackerNewsTools
from kern.workflow.step import Step
from kern.workflow.workflow import Workflow

hackernews_agent = Agent(
    name="HackerNews Agent",
    model=OpenAIResponses(id="gpt-5.2"),
    tools=[HackerNewsTools()],
    role="Extract key insights from HackerNews posts",
)

content_planner = Agent(
    name="Content Planner",
    model=OpenAIResponses(id="gpt-5.2"),
    instructions=["Plan a content schedule for the provided topic"],
)

workflow = Workflow(
    name="Content Creation Workflow",
    db=SqliteDb(session_table="workflow_session", db_file="tmp/workflow.db"),
    steps=[
        Step(name="Research Step", agent=hackernews_agent),
        Step(name="Planning Step", agent=content_planner),
    ],
)

async def main():
    bg_response = await workflow.arun(input="AI trends", background=True)
    print(f"Run ID: {bg_response.run_id}, Status: {bg_response.status}")

    while True:
        result = workflow.get_run(bg_response.run_id)
        if result and result.has_completed():
            print(f"Done: {result.content}")
            break
        await asyncio.sleep(5)

asyncio.run(main())
```
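The polling loops above check a fixed number of times at a fixed interval. A reusable helper can instead enforce an overall deadline and back off between attempts. This is a minimal sketch, assuming only that fetch is an async callable (for example a lambda wrapping agent.aget_run_output); poll_until is a hypothetical helper, not part of kern.

```python
import asyncio
import time
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")


async def poll_until(
    fetch: Callable[[], Awaitable[Optional[T]]],
    is_done: Callable[[T], bool],
    timeout: float = 60.0,
    interval: float = 1.0,
    max_interval: float = 10.0,
) -> Optional[T]:
    """Await fetch() until is_done(result) is true; return None after timeout seconds."""
    deadline = time.monotonic() + timeout
    delay = interval
    while True:
        result = await fetch()
        if result is not None and is_done(result):
            return result
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return None
        # Sleep no longer than the time left, then double the delay up to max_interval.
        await asyncio.sleep(min(delay, remaining))
        delay = min(delay * 2, max_interval)
```

For an agent run, the call might look like poll_until(lambda: agent.aget_run_output(run_id=run_output.run_id, session_id=run_output.session_id), lambda r: r.status == RunStatus.completed, timeout=120). Extending is_done with any failure statuses your version of RunStatus defines keeps a failed run from waiting out the full timeout.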

Resumable Streaming (SSE)

Combine background=True with stream=True for resumable SSE streaming. The run executes in a detached asyncio.Task that survives client disconnects. Events are buffered with sequential event_index values so clients can reconnect without losing events.

How It Works

| Step | Client | SSE response | Background task |
| --- | --- | --- | --- |
| 1 | Connects | StreamingResponse reads from the queue | Runs |
| 2 | Disconnects | StreamingResponse is cancelled | Keeps running |
| 3 | Reconnects | /resume reads from a subscriber queue | Still publishing |

  1. The run's RUNNING status is persisted to the database.
  2. A detached asyncio.Task executes the run and publishes events to an in-memory buffer.
  3. The client receives SSE events, each containing an event_index and run_id.
  4. On disconnect, the client keeps the last event_index it received (last_event_index).
  5. On reconnect, the client calls /resume with last_event_index to catch up on missed events.
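The steps above can be modeled with a self-contained asyncio sketch of the buffer: sequential event_index values, a replay slice for missed events, and a per-subscriber queue for live events. RunEventBuffer, resume(), and produce() are hypothetical names for illustration, not AgentOS internals, and the sketch omits database persistence and the 30-minute buffer expiry.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List


class RunEventBuffer:
    """Hypothetical in-process buffer for the events of a single run."""

    def __init__(self) -> None:
        self.events: List[Dict[str, Any]] = []
        self.subscribers: List[asyncio.Queue] = []
        self.completed = False

    def publish(self, event: Dict[str, Any]) -> None:
        # Stamp the next sequential event_index, keep it for replay, fan out live.
        stamped = {**event, "event_index": len(self.events)}
        self.events.append(stamped)
        for queue in self.subscribers:
            queue.put_nowait(stamped)

    def close(self) -> None:
        # Mark the run finished and wake every live subscriber with a sentinel.
        self.completed = True
        for queue in self.subscribers:
            queue.put_nowait(None)

    async def resume(self, last_event_index: int) -> AsyncIterator[Dict[str, Any]]:
        # Snapshot missed events and register the queue with no await in between,
        # so a concurrently published event lands in exactly one of the two.
        missed = self.events[last_event_index + 1:]
        if self.completed:
            yield {"event": "replay"}
            for event in missed:
                yield event
            return
        queue: asyncio.Queue = asyncio.Queue()
        self.subscribers.append(queue)
        try:
            yield {"event": "catch_up"}
            for event in missed:
                yield event
            yield {"event": "subscribed"}
            while (event := await queue.get()) is not None:
                yield event
        finally:
            self.subscribers.remove(queue)


async def produce(buffer: RunEventBuffer, start: int, count: int) -> None:
    # Stands in for the detached task: it keeps publishing whether or not anyone listens.
    for i in range(start, start + count):
        await asyncio.sleep(0.01)
        buffer.publish({"event": "RunContent", "content": f"chunk {i}"})
    buffer.close()
```

Because asyncio runs on a single thread, taking the snapshot and registering the queue in one synchronous step is enough to avoid both gaps and duplicates between the catch-up and live phases.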

Starting a Resumable Stream

Resumable streaming requires a running AgentOS server. Send background=true and stream=true as form fields in the run request. The pattern is the same for agents, teams, and workflows; only the URL path differs.

Note: Workflows also support WebSocket-based reconnection. See the WebSocket reconnect example.

```python
import asyncio
import json
import httpx

BASE_URL = "http://localhost:7777"

async def start_resumable_stream():
    async with httpx.AsyncClient(base_url=BASE_URL, timeout=60) as client:
        # Use /agents, /teams, or /workflows
        agents = (await client.get("/agents")).json()
        agent_id = agents[0]["id"]

        form_data = {
            "message": "Write a detailed story about a brave knight.",
            "stream": "true",
            "background": "true",
        }

        run_id = None
        session_id = None
        last_event_index = None

        async with client.stream("POST", f"/agents/{agent_id}/runs", data=form_data) as response:
            buffer = ""
            async for chunk in response.aiter_text():
                buffer += chunk
                while "\n\n" in buffer:
                    event_str, buffer = buffer.split("\n\n", 1)
                    for line in event_str.strip().split("\n"):
                        if not line.startswith("data: "):
                            continue
                        data = json.loads(line[6:])

                        # Track identifiers for reconnection
                        if data.get("run_id") and not run_id:
                            run_id = data["run_id"]
                        if data.get("session_id") and not session_id:
                            session_id = data["session_id"]
                        if data.get("event_index") is not None:
                            last_event_index = data["event_index"]

                        print(f"[{data.get('event_index')}] {data.get('event')}: {str(data.get('content', ''))[:60]}")

    return run_id, session_id, last_event_index

asyncio.run(start_resumable_stream())
```

Each SSE event includes:

  • event_index: Sequential integer for ordering and resumption
  • run_id: The run identifier for reconnection
  • session_id: The session identifier
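Both client examples parse SSE inline. The same logic can be factored into a small async generator that yields each event's JSON payload. This sketch follows the SSE wire format (an event ends with a blank line, and multiple data: lines in one event are joined with newlines) and assumes, as in the examples, that each event's data is a single JSON object; iter_sse_json is a hypothetical helper name.

```python
import json
from typing import Any, AsyncIterator, Dict, List


async def iter_sse_json(chunks: AsyncIterator[str]) -> AsyncIterator[Dict[str, Any]]:
    """Yield the parsed JSON data of each SSE event from a stream of text chunks."""
    buffer = ""
    async for chunk in chunks:
        # Normalize CRLF on the whole buffer so a "\r\n" split across chunks is caught.
        buffer = (buffer + chunk).replace("\r\n", "\n")
        while "\n\n" in buffer:
            raw_event, buffer = buffer.split("\n\n", 1)
            data_lines: List[str] = []
            for line in raw_event.split("\n"):
                # Ignore event:, id:, retry:, and ":" comment lines.
                if line.startswith("data:"):
                    value = line[5:]
                    # The SSE format strips one optional space after the colon.
                    data_lines.append(value[1:] if value.startswith(" ") else value)
            if data_lines:
                yield json.loads("\n".join(data_lines))
```

Inside the client.stream(...) block, the parsing loop then reduces to async for data in iter_sse_json(response.aiter_text()): followed by the per-event handling.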

Reconnecting via /resume

On disconnect (page refresh, network loss), reconnect to /resume with the last event_index:

```python
import json

import httpx

BASE_URL = "http://localhost:7777"

async def resume_stream(agent_id: str, run_id: str, session_id: str, last_event_index: int):
    form_data = {"last_event_index": str(last_event_index)}
    if session_id:
        form_data["session_id"] = session_id

    async with httpx.AsyncClient(base_url=BASE_URL, timeout=120) as client:
        async with client.stream(
            "POST", f"/agents/{agent_id}/runs/{run_id}/resume", data=form_data
        ) as response:
            buffer = ""
            async for chunk in response.aiter_text():
                buffer += chunk
                while "\n\n" in buffer:
                    event_str, buffer = buffer.split("\n\n", 1)
                    for line in event_str.strip().split("\n"):
                        if not line.startswith("data: "):
                            continue
                        data = json.loads(line[6:])
                        event_type = data.get("event")

                        if event_type in ("catch_up", "replay", "subscribed"):
                            # Meta events describe the resume state, not run output
                            print(f"[META] {event_type}: {data}")
                        else:
                            if data.get("event_index") is not None:
                                # Keep the latest index so a later disconnect can resume again
                                last_event_index = data["event_index"]
                            print(f"[{data.get('event_index')}] {event_type}: {str(data.get('content', ''))[:60]}")

    return last_event_index
```
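A client that should survive repeated drops can wrap any resume call in a retry loop that advances last_event_index as events arrive. This is a sketch under stated assumptions: open_resume is a hypothetical callable returning an async iterator of parsed data dicts from the /resume endpoint for a given last_event_index; a cleanly closed stream is treated as the end of the run; and with httpx you would pass retry_on=(httpx.TransportError,), because httpx network errors do not subclass the built-in ConnectionError.

```python
import asyncio
from typing import Any, AsyncIterator, Callable, Dict, Tuple, Type

META_EVENTS = {"catch_up", "replay", "subscribed"}


async def follow_run(
    open_resume: Callable[[int], AsyncIterator[Dict[str, Any]]],
    last_event_index: int,
    on_event: Callable[[Dict[str, Any]], None],
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError,),
    max_retries: int = 5,
    base_delay: float = 0.5,
) -> int:
    """Keep calling /resume until the stream ends cleanly; return the last event_index."""
    attempt = 0
    while True:
        try:
            async for data in open_resume(last_event_index):
                event_type = data.get("event")
                if event_type == "error":
                    raise RuntimeError(f"Resume failed: {data}")
                if event_type in META_EVENTS:
                    continue  # resume-state markers, not run output
                index = data.get("event_index")
                if index is not None:
                    if index <= last_event_index:
                        continue  # already delivered before the disconnect
                    last_event_index = index
                on_event(data)
                attempt = 0  # progress was made, so reset the retry budget
            # Assumption: a cleanly closed stream means the run has finished.
            return last_event_index
        except retry_on:
            attempt += 1
            if attempt > max_retries:
                raise
            # Exponential backoff: base_delay, then 2x, 4x, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
```

Skipping indices at or below last_event_index is a defensive choice; it relies on event_index increasing sequentially within a run, as described above.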

Resume Endpoints

The resume endpoint follows the same pattern for agents, teams, and workflows:

```
POST /agents/{agent_id}/runs/{run_id}/resume
POST /teams/{team_id}/runs/{run_id}/resume
POST /workflows/{workflow_id}/runs/{run_id}/resume

Content-Type: multipart/form-data
last_event_index=N&session_id=S
```

Resume behavior depends on run state:

| Scenario | Condition | Behavior |
| --- | --- | --- |
| Catch up + live | Run still active, events in buffer | Replays missed events, then streams live events |
| Replay | Run completed, events still in buffer | Replays all missed events |
| DB fallback | Buffer expired (after 30 min) | Falls back to the database |
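The three rows amount to a small decision made when a /resume request arrives. The sketch below encodes that decision as a pure function. The parameter names are hypothetical, and measuring expiry by buffer age is an assumption, since the table only states a 30-minute limit. A buffer that is missing entirely, as on a different replica (see Multi-Container Deployments), takes the same database path.

```python
from enum import Enum

# From the table: buffered events are kept for 30 minutes.
BUFFER_TTL_SECONDS = 30 * 60


class ResumeMode(Enum):
    CATCH_UP_AND_LIVE = "catch_up"
    REPLAY = "replay"
    DB_FALLBACK = "db_fallback"


def choose_resume_mode(in_buffer: bool, run_active: bool, buffer_age_seconds: float) -> ResumeMode:
    """Hypothetical mapping of the resume-behavior table onto run state."""
    if not in_buffer or buffer_age_seconds > BUFFER_TTL_SECONDS:
        return ResumeMode.DB_FALLBACK  # nothing usable in memory: read the database
    if run_active:
        return ResumeMode.CATCH_UP_AND_LIVE  # replay missed events, then stream live
    return ResumeMode.REPLAY  # run finished: replay missed events and close
```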

Meta Events

The /resume stream sends meta events that mark where replayed events start and where live events begin:

| Event | Meaning |
| --- | --- |
| catch_up | Run is still active. Missed events follow, then live events. |
| replay | Run has already completed. All missed events follow. |
| subscribed | Catch-up is complete. Live events follow. |
| error | Run not found, or another error occurred. |

Multi-Container Deployments

The detached task and event buffer live in-process on the instance that started the run. In a multi-replica setup, a /resume request that lands on a different instance misses the buffer and falls back to the database (no live tail until the run completes).

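Route /resume requests to the instance that started the run, for example with sticky sessions at the load balancer. Consistent hashing on the run_id in the URL path only works if a run_id is guaranteed to map to the instance that created it, because the run-start request carries no run_id in its path. Only /resume needs affinity; the initial run-start request can hit any instance.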

Developer Resources