Example demonstrating background execution with structured output.

Combines background execution (non-blocking, async) with Pydantic output_schema so the completed run returns typed, structured data.

1"""
2Example demonstrating background execution with structured output.
3
4Combines background execution (non-blocking, async) with Pydantic output_schema
5so the completed run returns typed, structured data.
6
7Requirements:
8- PostgreSQL running (./cookbook/scripts/run_pgvector.sh)
9- OPENAI_API_KEY set
10
11Usage:
12 .venvs/demo/bin/python cookbook/02_agents/other/background_execution_structured.py
13"""
14
15import asyncio
16from typing import List
17
18from kern.agent import Agent
19from kern.db.postgres import PostgresDb
20from kern.models.openai import OpenAIResponses
21from kern.run.base import RunStatus
22from pydantic import BaseModel, Field
23
24# ---------------------------------------------------------------------------
25# Output Schema
26# ---------------------------------------------------------------------------
27
28
# Structured record describing a single city. All four fields are required
# (`...` default); each `description` is metadata attached to the field.
# NOTE: documented with comments rather than a class docstring so the model
# definition itself stays untouched.
class CityFact(BaseModel):
    city: str = Field(..., description="Name of the city")
    country: str = Field(..., description="Country the city is in")
    # Declared as text, not a number, so approximate values are accepted as-is.
    population: str = Field(..., description="Approximate population")
    fun_fact: str = Field(..., description="An interesting fact about the city")
34
35
# Top-level schema handed to the agent as `output_schema` in the structured
# background example; wraps the per-city records in one required `cities` list.
class CityFactsResponse(BaseModel):
    cities: List[CityFact] = Field(..., description="List of city facts")
38
39
40# ---------------------------------------------------------------------------
41# Config
42# ---------------------------------------------------------------------------
43
# Shared database handle passed to both example agents (`db=db`). The URL
# targets a local PostgreSQL instance: user/password/database "ai", port 5532.
# NOTE(review): `session_table` presumably names the table where sessions and
# their runs are persisted — confirm against PostgresDb.
db = PostgresDb(
    db_url="postgresql+psycopg://ai:ai@localhost:5532/ai",
    session_table="bg_structured_sessions",
)
48
49
50# ---------------------------------------------------------------------------
51# Create and Run Background Examples
52# ---------------------------------------------------------------------------
53
54
async def example_structured_background_run():
    """Start one background run with an output schema and print the parsed result.

    Launches the agent with ``background=True`` and
    ``output_schema=CityFactsResponse``, then polls ``aget_run_output`` once per
    second (at most 30 times) until the run reports ``RunStatus.completed`` or
    ``RunStatus.error``. On completion the stored content is decoded (if it is a
    JSON string) and validated against ``CityFactsResponse``; when decoding or
    validation fails, the raw content is printed instead.

    Returns:
        None. Everything is written to stdout.

    Raises:
        AssertionError: If the initial status returned by ``arun`` is not
            ``RunStatus.pending``.
    """
    # Imported once at function scope instead of on every poll iteration.
    import json

    print("=" * 60)
    print("Background Execution with Structured Output")
    print("=" * 60)

    agent = Agent(
        name="CityFactsAgent",
        model=OpenAIResponses(id="gpt-5-mini"),
        description="An agent that provides structured facts about cities.",
        db=db,
    )

    # Start a background run with structured output
    run_output = await agent.arun(
        "Give me facts about Tokyo, Paris, and New York.",
        output_schema=CityFactsResponse,
        background=True,
    )

    print(f"Run ID: {run_output.run_id}")
    print(f"Status: {run_output.status}")
    assert run_output.status == RunStatus.pending

    # Poll for completion
    print("\nPolling for completion...")
    for i in range(30):
        await asyncio.sleep(1)
        result = await agent.aget_run_output(
            run_id=run_output.run_id,
            session_id=run_output.session_id,
        )
        if result is None:
            print(f" [{i + 1}s] Not in DB yet")
            continue

        print(f" [{i + 1}s] Status: {result.status}")

        if result.status == RunStatus.completed:
            print("\nCompleted! Structured output:")

            # Only decoding/validation is guarded; printing happens in `else`
            # so an unrelated failure is not mistaken for a parse problem.
            try:
                content = result.content
                if isinstance(content, str):
                    content = json.loads(content)
                parsed = CityFactsResponse.model_validate(content)
            except (ValueError, TypeError):
                # json.JSONDecodeError and pydantic's ValidationError are both
                # ValueError subclasses; fall back to showing the raw content.
                print(f" Raw content: {result.content}")
            else:
                for city_fact in parsed.cities:
                    print(f"\n {city_fact.city}, {city_fact.country}")
                    print(f" Population: {city_fact.population}")
                    print(f" Fun fact: {city_fact.fun_fact}")
            break

        if result.status == RunStatus.error:
            print(f"\nFailed: {result.content}")
            break
    else:
        # Loop exhausted without reaching a terminal status.
        print("\nTimed out waiting for completion")
116
117
async def example_multiple_background_runs():
    """Start several background runs, one per question, and print each outcome.

    Each question is launched with its own freshly generated session id. The
    runs are then polled together, once per second for at most 30 rounds, until
    every run has reached ``RunStatus.completed`` or ``RunStatus.error``.
    Runs that never reach one of those states are reported as still running.
    """
    from uuid import uuid4

    terminal_states = (RunStatus.completed, RunStatus.error)
    banner = "=" * 60

    print()
    print(banner)
    print("Multiple Concurrent Background Runs")
    print(banner)

    questions = [
        "What is the tallest mountain in the world? Answer in one sentence.",
        "What is the deepest ocean trench? Answer in one sentence.",
        "What is the longest river in the world? Answer in one sentence.",
    ]

    agent = Agent(
        name="QuizAgent",
        model=OpenAIResponses(id="gpt-5-mini"),
        description="An agent that answers trivia questions.",
        db=db,
    )

    # One session per run so the launches do not conflict with each other.
    runs = []
    for question in questions:
        run_output = await agent.arun(
            question,
            background=True,
            session_id=str(uuid4()),
        )
        runs.append(run_output)
        print(f"Launched: {run_output.run_id} - {question[:50]}...")

    # Keep polling the unfinished runs until every run has a stored result.
    print("\nWaiting for all runs to complete...")
    results = {}
    for _ in range(30):
        await asyncio.sleep(1)
        for run in runs:
            if run.run_id in results:
                continue
            result = await agent.aget_run_output(
                run_id=run.run_id,
                session_id=run.session_id,
            )
            if result and result.status in terminal_states:
                results[run.run_id] = result

        if all(run.run_id in results for run in runs):
            break

    # Report every question, whether or not its run finished.
    print(f"\nCompleted {len(results)}/{len(runs)} runs:")
    for i, run in enumerate(runs):
        result = results.get(run.run_id)
        print(f"\n Q: {questions[i]}")
        if not result:
            print(" Status: Still running or not found")
            continue
        print(f" A: {result.content}")
        print(f" Status: {result.status}")
180
181
async def main():
    """Run both background-execution examples in order, then print a closing line."""
    for example in (
        example_structured_background_run,
        example_multiple_background_runs,
    ):
        await example()
    print("\nAll examples completed!")
186
187
# Script entry point: only when executed directly, run the async `main`
# coroutine to completion via asyncio.run.
if __name__ == "__main__":
    asyncio.run(main())

Run the Example

1# Clone and setup repo
2git clone https://github.com/kern-ai/kern.git
3cd kern/cookbook/02_agents/14_advanced
4
5# Create and activate virtual environment
6./scripts/demo_setup.sh
7source .venvs/demo/bin/activate
8
9# Optional: Run PgVector (needs docker)
10./cookbook/scripts/run_pgvector.sh
11
12# Export relevant API keys
13export OPENAI_API_KEY="***"
14
15python background_execution_structured.py