Background Workflow Execution

How to execute workflows as non-blocking background tasks

Execute workflows as non-blocking background tasks by passing background=True to Workflow.arun(). The workflow continues running even if the client disconnects. Combine with stream=True for resumable SSE streaming with automatic event buffering and reconnection.

See Background Execution for the full guide covering polling, resumable streaming, and the /resume endpoint.

Note

Background execution requires async workflows using .arun(). Poll for results using workflow.get_run(run_id) and check completion status with .has_completed().

Example

1import asyncio
2
3from kern.agent import Agent
4from kern.db.sqlite import SqliteDb
5from kern.models.openai import OpenAIResponses
6from kern.team import Team
7from kern.tools.hackernews import HackerNewsTools
8from kern.tools.yfinance import YFinanceTools
9from kern.utils.pprint import pprint_run_response
10from kern.workflow.step import Step
11from kern.workflow.workflow import Workflow
12
13# Define agents
14hackernews_agent = Agent(
15 name="Hackernews Agent",
16 model=OpenAIResponses(id="gpt-5.2"),
17 tools=[HackerNewsTools()],
18 role="Extract key insights and content from Hackernews posts",
19)
20finance_agent = Agent(
21 name="Finance Agent",
22 model=OpenAIResponses(id="gpt-5.2"),
23 tools=[YFinanceTools()],
24 role="Get stock prices and financial data",
25)
26
27# Define research team for complex analysis
28research_team = Team(
29 name="Research Team",
30 members=[hackernews_agent, finance_agent],
31 instructions="Research tech topics and related stocks",
32)
33
34content_planner = Agent(
35 name="Content Planner",
36 model=OpenAIResponses(id="gpt-5.2"),
37 instructions=[
38 "Plan a content schedule over 4 weeks for the provided topic and research content",
39 "Ensure that I have posts for 3 posts per week",
40 ],
41)
42
43# Define steps
44research_step = Step(
45 name="Research Step",
46 team=research_team,
47)
48
49content_planning_step = Step(
50 name="Content Planning Step",
51 agent=content_planner,
52)
53
54content_creation_workflow = Workflow(
55 name="Content Creation Workflow",
56 description="Automated content creation from blog posts to social media",
57 db=SqliteDb(
58 session_table="workflow_session",
59 db_file="tmp/workflow.db",
60 ),
61 steps=[research_step, content_planning_step],
62)
63
64
65async def main():
66 print("Starting Async Background Workflow Test")
67
68 # Start background execution (async)
69 bg_response = await content_creation_workflow.arun(
70 input="AI trends in 2024", background=True
71 )
72 print(f"Initial Response: {bg_response.status} - {bg_response.content}")
73 print(f"Run ID: {bg_response.run_id}")
74
75 # Poll every 5 seconds until completion
76 poll_count = 0
77
78 while True:
79 poll_count += 1
80 print(f"\nPoll #{poll_count} (every 5s)")
81
82 result = content_creation_workflow.get_run(bg_response.run_id)
83
84 if result is None:
85 print("Workflow not found yet, still waiting...")
86 if poll_count > 50:
87 print(f"Timeout after {poll_count} attempts")
88 break
89 await asyncio.sleep(5)
90 continue
91
92 if result.has_completed():
93 break
94
95 if poll_count > 200:
96 print(f"Timeout after {poll_count} attempts")
97 break
98
99 await asyncio.sleep(5)
100
101 final_result = content_creation_workflow.get_run(bg_response.run_id)
102
103 print("\nFinal Result:")
104 print("=" * 50)
105 pprint_run_response(final_result, markdown=True)
106
107
108if __name__ == "__main__":
109 asyncio.run(main())
Note

You can also use WebSocket for background workflows. See the Workflow Websocket example.

Developer Resources