Migrating to Workflows 2.0

Migrate your Workflows from 1.0 to 2.0.

Migrating from Workflows 1.0

Workflows 2.0 is a completely new approach to agent automation, and requires an upgrade from the Workflows 1.0 implementation. It introduces a new, more flexible and powerful way to build workflows.

Key Differences

Workflows 1.0Workflows 2.0Migration Path
Linear onlyMultiple patternsAdd Parallel/Condition as needed
Agent-focusedMixed componentsConvert functions to Steps
Limited branchingSmart routingReplace if/else with Router
Manual loopsBuilt-in LoopUse Loop component

Migration Steps

  1. Assess current workflow: Identify parallel opportunities
  2. Add conditions: Convert if/else logic to Condition components
  3. Extract functions: Move custom logic to function-based steps
  4. Enable streaming: For event-based information
  5. Add state management: Use workflow_session_state for data sharing

Example of Blog Post Generator Workflow

Lets take an example that demonstrates how to build a sophisticated blog post generator that combines web research capabilities with professional writing expertise. The workflow uses a multi-stage approach:

  1. Intelligent web research and source gathering
  2. Content extraction and processing
  3. Professional blog post writing with proper citations

Here's the code for the blog post generator in Workflows 1.0:

1import json
2from textwrap import dedent
3from typing import Dict, Iterator, Optional
4
5from kern.agent import Agent
6from kern.models.openai import OpenAIChat
7from kern.run.workflow import WorkflowCompletedEvent
8from kern.storage.sqlite import SqliteDb
9from kern.tools.duckduckgo import DuckDuckGoTools
10from kern.tools.newspaper4k import Newspaper4kTools
11from kern.utils.log import logger
12from kern.utils.pprint import pprint_run_response
13from kern.workflow import RunOutput, Workflow
14from pydantic import BaseModel, Field
15
16
17class NewsArticle(BaseModel):
18 title: str = Field(..., description="Title of the article.")
19 url: str = Field(..., description="Link to the article.")
20 summary: Optional[str] = Field(
21 ..., description="Summary of the article if available."
22 )
23
24
25class SearchResults(BaseModel):
26 articles: list[NewsArticle]
27
28
29class ScrapedArticle(BaseModel):
30 title: str = Field(..., description="Title of the article.")
31 url: str = Field(..., description="Link to the article.")
32 summary: Optional[str] = Field(
33 ..., description="Summary of the article if available."
34 )
35 content: Optional[str] = Field(
36 ...,
37 description="Full article content in markdown format. None if content is unavailable.",
38 )
39
40
41class BlogPostGenerator(Workflow):
42 """Advanced workflow for generating professional blog posts with proper research and citations."""
43
44 description: str = dedent("""\
45 An intelligent blog post generator that creates engaging, well-researched content.
46 This workflow orchestrates multiple AI agents to research, analyze, and craft
47 compelling blog posts that combine journalistic rigor with engaging storytelling.
48 The system excels at creating content that is both informative and optimized for
49 digital consumption.
50 """)
51
52 # Search Agent: Handles intelligent web searching and source gathering
53 searcher: Agent = Agent(
54 model=OpenAIChat(id="gpt-5-mini"),
55 tools=[DuckDuckGoTools()],
56 description=dedent("""\
57 You are BlogResearch-X, an elite research assistant specializing in discovering
58 high-quality sources for compelling blog content. Your expertise includes:
59
60 - Finding authoritative and trending sources
61 - Evaluating content credibility and relevance
62 - Identifying diverse perspectives and expert opinions
63 - Discovering unique angles and insights
64 - Ensuring comprehensive topic coverage\
65 """),
66 instructions=dedent("""\
67 1. Search Strategy
68 - Find 10-15 relevant sources and select the 5-7 best ones
69 - Prioritize recent, authoritative content
70 - Look for unique angles and expert insights
71 2. Source Evaluation
72 - Verify source credibility and expertise
73 - Check publication dates for timeliness
74 - Assess content depth and uniqueness
75 3. Diversity of Perspectives
76 - Include different viewpoints
77 - Gather both mainstream and expert opinions
78 - Find supporting data and statistics\
79 """),
80 output_schema=SearchResults,
81 )
82
83 # Content Scraper: Extracts and processes article content
84 article_scraper: Agent = Agent(
85 model=OpenAIChat(id="gpt-5-mini"),
86 tools=[Newspaper4kTools()],
87 description=dedent("""\
88 You are ContentBot-X, a specialist in extracting and processing digital content
89 for blog creation. Your expertise includes:
90
91 - Efficient content extraction
92 - Smart formatting and structuring
93 - Key information identification
94 - Quote and statistic preservation
95 - Maintaining source attribution\
96 """),
97 instructions=dedent("""\
98 1. Content Extraction
99 - Extract content from the article
100 - Preserve important quotes and statistics
101 - Maintain proper attribution
102 - Handle paywalls gracefully
103 2. Content Processing
104 - Format text in clean markdown
105 - Preserve key information
106 - Structure content logically
107 3. Quality Control
108 - Verify content relevance
109 - Ensure accurate extraction
110 - Maintain readability\
111 """),
112 output_schema=ScrapedArticle,
113 )
114
115 # Content Writer Agent: Crafts engaging blog posts from research
116 writer: Agent = Agent(
117 model=OpenAIChat(id="gpt-5-mini"),
118 description=dedent("""\
119 You are BlogMaster-X, an elite content creator combining journalistic excellence
120 with digital marketing expertise. Your strengths include:
121
122 - Crafting viral-worthy headlines
123 - Writing engaging introductions
124 - Structuring content for digital consumption
125 - Incorporating research seamlessly
126 - Optimizing for SEO while maintaining quality
127 - Creating shareable conclusions\
128 """),
129 instructions=dedent("""\
130 1. Content Strategy
131 - Craft attention-grabbing headlines
132 - Write compelling introductions
133 - Structure content for engagement
134 - Include relevant subheadings
135 2. Writing Excellence
136 - Balance expertise with accessibility
137 - Use clear, engaging language
138 - Include relevant examples
139 - Incorporate statistics naturally
140 3. Source Integration
141 - Cite sources properly
142 - Include expert quotes
143 - Maintain factual accuracy
144 4. Digital Optimization
145 - Structure for scanability
146 - Include shareable takeaways
147 - Optimize for SEO
148 - Add engaging subheadings\
149 """),
150 expected_output=dedent("""\
151 # {Viral-Worthy Headline}
152
153 ## Introduction
154 {Engaging hook and context}
155
156 ## {Compelling Section 1}
157 {Key insights and analysis}
158 {Expert quotes and statistics}
159
160 ## {Engaging Section 2}
161 {Deeper exploration}
162 {Real-world examples}
163
164 ## {Practical Section 3}
165 {Actionable insights}
166 {Expert recommendations}
167
168 ## Key Takeaways
169 - {Shareable insight 1}
170 - {Practical takeaway 2}
171 - {Notable finding 3}
172
173 ## Sources
174 {Properly attributed sources with links}\
175 """),
176 markdown=True,
177 )
178
179 def run(
180 self,
181 topic: str,
182 use_search_cache: bool = True,
183 use_scrape_cache: bool = True,
184 use_cached_report: bool = True,
185 ) -> Iterator[RunOutputEvent]:
186 logger.info(f"Generating a blog post on: {topic}")
187
188 # Use the cached blog post if use_cache is True
189 if use_cached_report:
190 cached_blog_post = self.get_cached_blog_post(topic)
191 if cached_blog_post:
192 yield WorkflowCompletedEvent(
193 run_id=self.run_id,
194 content=cached_blog_post,
195 )
196 return
197
198 # Search the web for articles on the topic
199 search_results: Optional[SearchResults] = self.get_search_results(
200 topic, use_search_cache
201 )
202 # If no search_results are found for the topic, end the workflow
203 if search_results is None or len(search_results.articles) == 0:
204 yield WorkflowCompletedEvent(
205 run_id=self.run_id,
206 content=f"Sorry, could not find any articles on the topic: {topic}",
207 )
208 return
209
210 # Scrape the search results
211 scraped_articles: Dict[str, ScrapedArticle] = self.scrape_articles(
212 topic, search_results, use_scrape_cache
213 )
214
215 # Prepare the input for the writer
216 writer_input = {
217 "topic": topic,
218 "articles": [v.model_dump() for v in scraped_articles.values()],
219 }
220
221 # Run the writer and yield the response
222 yield from self.writer.run(json.dumps(writer_input, indent=4), stream=True)
223
224 # Save the blog post in the cache
225 self.add_blog_post_to_cache(topic, self.writer.run_response.content)
226
227 def get_cached_blog_post(self, topic: str) -> Optional[str]:
228 logger.info("Checking if cached blog post exists")
229
230 return self.session_state.get("blog_posts", {}).get(topic)
231
232 def add_blog_post_to_cache(self, topic: str, blog_post: str):
233 logger.info(f"Saving blog post for topic: {topic}")
234 self.session_state.setdefault("blog_posts", {})
235 self.session_state["blog_posts"][topic] = blog_post
236
237 def get_cached_search_results(self, topic: str) -> Optional[SearchResults]:
238 logger.info("Checking if cached search results exist")
239 search_results = self.session_state.get("search_results", {}).get(topic)
240 return (
241 SearchResults.model_validate(search_results)
242 if search_results and isinstance(search_results, dict)
243 else search_results
244 )
245
246 def add_search_results_to_cache(self, topic: str, search_results: SearchResults):
247 logger.info(f"Saving search results for topic: {topic}")
248 self.session_state.setdefault("search_results", {})
249 self.session_state["search_results"][topic] = search_results
250
251 def get_cached_scraped_articles(
252 self, topic: str
253 ) -> Optional[Dict[str, ScrapedArticle]]:
254 logger.info("Checking if cached scraped articles exist")
255 scraped_articles = self.session_state.get("scraped_articles", {}).get(topic)
256 return (
257 ScrapedArticle.model_validate(scraped_articles)
258 if scraped_articles and isinstance(scraped_articles, dict)
259 else scraped_articles
260 )
261
262 def add_scraped_articles_to_cache(
263 self, topic: str, scraped_articles: Dict[str, ScrapedArticle]
264 ):
265 logger.info(f"Saving scraped articles for topic: {topic}")
266 self.session_state.setdefault("scraped_articles", {})
267 self.session_state["scraped_articles"][topic] = scraped_articles
268
269 def get_search_results(
270 self, topic: str, use_search_cache: bool, num_attempts: int = 3
271 ) -> Optional[SearchResults]:
272 # Get cached search_results from the session state if use_search_cache is True
273 if use_search_cache:
274 try:
275 search_results_from_cache = self.get_cached_search_results(topic)
276 if search_results_from_cache is not None:
277 search_results = SearchResults.model_validate(
278 search_results_from_cache
279 )
280 logger.info(
281 f"Found {len(search_results.articles)} articles in cache."
282 )
283 return search_results
284 except Exception as e:
285 logger.warning(f"Could not read search results from cache: {e}")
286
287 # If there are no cached search_results, use the searcher to find the latest articles
288 for attempt in range(num_attempts):
289 try:
290 searcher_response: RunOutput = self.searcher.run(topic)
291 if (
292 searcher_response is not None
293 and searcher_response.content is not None
294 and isinstance(searcher_response.content, SearchResults)
295 ):
296 article_count = len(searcher_response.content.articles)
297 logger.info(
298 f"Found {article_count} articles on attempt {attempt + 1}"
299 )
300 # Cache the search results
301 self.add_search_results_to_cache(topic, searcher_response.content)
302 return searcher_response.content
303 else:
304 logger.warning(
305 f"Attempt {attempt + 1}/{num_attempts} failed: Invalid response type"
306 )
307 except Exception as e:
308 logger.warning(f"Attempt {attempt + 1}/{num_attempts} failed: {str(e)}")
309
310 logger.error(f"Failed to get search results after {num_attempts} attempts")
311 return None
312
313 def scrape_articles(
314 self, topic: str, search_results: SearchResults, use_scrape_cache: bool
315 ) -> Dict[str, ScrapedArticle]:
316 scraped_articles: Dict[str, ScrapedArticle] = {}
317
318 # Get cached scraped_articles from the session state if use_scrape_cache is True
319 if use_scrape_cache:
320 try:
321 scraped_articles_from_cache = self.get_cached_scraped_articles(topic)
322 if scraped_articles_from_cache is not None:
323 scraped_articles = scraped_articles_from_cache
324 logger.info(
325 f"Found {len(scraped_articles)} scraped articles in cache."
326 )
327 return scraped_articles
328 except Exception as e:
329 logger.warning(f"Could not read scraped articles from cache: {e}")
330
331 # Scrape the articles that are not in the cache
332 for article in search_results.articles:
333 if article.url in scraped_articles:
334 logger.info(f"Found scraped article in cache: {article.url}")
335 continue
336
337 article_scraper_response: RunOutput = self.article_scraper.run(
338 article.url
339 )
340 if (
341 article_scraper_response is not None
342 and article_scraper_response.content is not None
343 and isinstance(article_scraper_response.content, ScrapedArticle)
344 ):
345 scraped_articles[article_scraper_response.content.url] = (
346 article_scraper_response.content
347 )
348 logger.info(f"Scraped article: {article_scraper_response.content.url}")
349
350 # Save the scraped articles in the session state
351 self.add_scraped_articles_to_cache(topic, scraped_articles)
352 return scraped_articles
353
354
355# Run the workflow if the script is executed directly
356if __name__ == "__main__":
357 import random
358
359 from rich.prompt import Prompt
360
361 # Fun example prompts to showcase the generator's versatility
362 example_prompts = [
363 "Why Cats Secretly Run the Internet",
364 "The Science Behind Why Pizza Tastes Better at 2 AM",
365 "Time Travelers' Guide to Modern Social Media",
366 "How Rubber Ducks Revolutionized Software Development",
367 "The Secret Society of Office Plants: A Survival Guide",
368 "Why Dogs Think We're Bad at Smelling Things",
369 "The Underground Economy of Coffee Shop WiFi Passwords",
370 "A Historical Analysis of Dad Jokes Through the Ages",
371 ]
372
373 # Get topic from user
374 topic = Prompt.ask(
375 "[bold]Enter a blog post topic[/bold] (or press Enter for a random example)\n✨",
376 default=random.choice(example_prompts),
377 )
378
379 # Convert the topic to a URL-safe string for use in session_id
380 url_safe_topic = topic.lower().replace(" ", "-")
381
382 # Initialize the blog post generator workflow
383 # - Creates a unique session ID based on the topic
384 # - Sets up SQLite storage for caching results
385 generate_blog_post = BlogPostGenerator(
386 session_id=f"generate-blog-post-on-{url_safe_topic}",
387 db=SqliteDb(
388 db_file="tmp/agno_workflows.db",
389 ),
390 debug_mode=True,
391 )
392
393 # Execute the workflow with caching enabled
394 # Returns an iterator of RunOutput objects containing the generated content
395 blog_post: Iterator[RunOutputEvent] = generate_blog_post.run(
396 topic=topic,
397 use_search_cache=True,
398 use_scrape_cache=True,
399 use_cached_report=True,
400 )
401
402 # Print the response
403 pprint_run_response(blog_post, markdown=True)

To convert this into Workflows 2.0 structure, either we can break down the workflow into smaller steps and follow the development guide. Or for simplicity we can directly replace the run method to a single custom function executor as mentioned here.

It will look like this:

1import asyncio
2import json
3from textwrap import dedent
4from typing import Dict, Optional
5
6from kern.agent import Agent
7from kern.db.sqlite import SqliteDb
8from kern.models.openai import OpenAIChat
9from kern.tools.duckduckgo import DuckDuckGoTools
10from kern.tools.newspaper4k import Newspaper4kTools
11from kern.utils.log import logger
12from kern.utils.pprint import pprint_run_response
13from kern.workflow.workflow import Workflow
14from pydantic import BaseModel, Field
15
16
17# --- Response Models ---
18class NewsArticle(BaseModel):
19 title: str = Field(..., description="Title of the article.")
20 url: str = Field(..., description="Link to the article.")
21 summary: Optional[str] = Field(
22 ..., description="Summary of the article if available."
23 )
24
25
26class SearchResults(BaseModel):
27 articles: list[NewsArticle]
28
29
30class ScrapedArticle(BaseModel):
31 title: str = Field(..., description="Title of the article.")
32 url: str = Field(..., description="Link to the article.")
33 summary: Optional[str] = Field(
34 ..., description="Summary of the article if available."
35 )
36 content: Optional[str] = Field(
37 ...,
38 description="Full article content in markdown format. None if content is unavailable.",
39 )
40
41
42# --- Agents ---
43research_agent = Agent(
44 name="Blog Research Agent",
45 model=OpenAIChat(id="gpt-5-mini"),
46 tools=[DuckDuckGoTools()],
47 description=dedent("""\
48 You are BlogResearch-X, an elite research assistant specializing in discovering
49 high-quality sources for compelling blog content. Your expertise includes:
50
51 - Finding authoritative and trending sources
52 - Evaluating content credibility and relevance
53 - Identifying diverse perspectives and expert opinions
54 - Discovering unique angles and insights
55 - Ensuring comprehensive topic coverage
56 """),
57 instructions=dedent("""\
58 1. Search Strategy
59 - Find 10-15 relevant sources and select the 5-7 best ones
60 - Prioritize recent, authoritative content
61 - Look for unique angles and expert insights
62 2. Source Evaluation
63 - Verify source credibility and expertise
64 - Check publication dates for timeliness
65 - Assess content depth and uniqueness
66 3. Diversity of Perspectives
67 - Include different viewpoints
68 - Gather both mainstream and expert opinions
69 - Find supporting data and statistics
70 """),
71 output_schema=SearchResults,
72)
73
74content_scraper_agent = Agent(
75 name="Content Scraper Agent",
76 model=OpenAIChat(id="gpt-5-mini"),
77 tools=[Newspaper4kTools()],
78 description=dedent("""\
79 You are ContentBot-X, a specialist in extracting and processing digital content
80 for blog creation. Your expertise includes:
81
82 - Efficient content extraction
83 - Smart formatting and structuring
84 - Key information identification
85 - Quote and statistic preservation
86 - Maintaining source attribution
87 """),
88 instructions=dedent("""\
89 1. Content Extraction
90 - Extract content from the article
91 - Preserve important quotes and statistics
92 - Maintain proper attribution
93 - Handle paywalls gracefully
94 2. Content Processing
95 - Format text in clean markdown
96 - Preserve key information
97 - Structure content logically
98 3. Quality Control
99 - Verify content relevance
100 - Ensure accurate extraction
101 - Maintain readability
102 """),
103 output_schema=ScrapedArticle,
104)
105
106blog_writer_agent = Agent(
107 name="Blog Writer Agent",
108 model=OpenAIChat(id="gpt-5-mini"),
109 description=dedent("""\
110 You are BlogMaster-X, an elite content creator combining journalistic excellence
111 with digital marketing expertise. Your strengths include:
112
113 - Crafting viral-worthy headlines
114 - Writing engaging introductions
115 - Structuring content for digital consumption
116 - Incorporating research seamlessly
117 - Optimizing for SEO while maintaining quality
118 - Creating shareable conclusions
119 """),
120 instructions=dedent("""\
121 1. Content Strategy
122 - Craft attention-grabbing headlines
123 - Write compelling introductions
124 - Structure content for engagement
125 - Include relevant subheadings
126 2. Writing Excellence
127 - Balance expertise with accessibility
128 - Use clear, engaging language
129 - Include relevant examples
130 - Incorporate statistics naturally
131 3. Source Integration
132 - Cite sources properly
133 - Include expert quotes
134 - Maintain factual accuracy
135 4. Digital Optimization
136 - Structure for scanability
137 - Include shareable takeaways
138 - Optimize for SEO
139 - Add engaging subheadings
140
141 Format your blog post with this structure:
142 # {Viral-Worthy Headline}
143
144 ## Introduction
145 {Engaging hook and context}
146
147 ## {Compelling Section 1}
148 {Key insights and analysis}
149 {Expert quotes and statistics}
150
151 ## {Engaging Section 2}
152 {Deeper exploration}
153 {Real-world examples}
154
155 ## {Practical Section 3}
156 {Actionable insights}
157 {Expert recommendations}
158
159 ## Key Takeaways
160 - {Shareable insight 1}
161 - {Practical takeaway 2}
162 - {Notable finding 3}
163
164 ## Sources
165 {Properly attributed sources with links}
166 """),
167 markdown=True,
168)
169
170
171# --- Helper Functions ---
172def get_cached_blog_post(session_state, topic: str) -> Optional[str]:
173 """Get cached blog post from workflow session state"""
174 logger.info("Checking if cached blog post exists")
175 return session_state.get("blog_posts", {}).get(topic)
176
177
178def cache_blog_post(session_state, topic: str, blog_post: str):
179 """Cache blog post in workflow session state"""
180 logger.info(f"Saving blog post for topic: {topic}")
181 if "blog_posts" not in session_state:
182 session_state["blog_posts"] = {}
183 session_state["blog_posts"][topic] = blog_post
184
185
186def get_cached_search_results(session_state, topic: str) -> Optional[SearchResults]:
187 """Get cached search results from workflow session state"""
188 logger.info("Checking if cached search results exist")
189 search_results = session_state.get("search_results", {}).get(topic)
190 if search_results and isinstance(search_results, dict):
191 try:
192 return SearchResults.model_validate(search_results)
193 except Exception as e:
194 logger.warning(f"Could not validate cached search results: {e}")
195 return search_results if isinstance(search_results, SearchResults) else None
196
197
198def cache_search_results(session_state, topic: str, search_results: SearchResults):
199 """Cache search results in workflow session state"""
200 logger.info(f"Saving search results for topic: {topic}")
201 if "search_results" not in session_state:
202 session_state["search_results"] = {}
203 session_state["search_results"][topic] = search_results.model_dump()
204
205
206def get_cached_scraped_articles(
207 session_state, topic: str
208) -> Optional[Dict[str, ScrapedArticle]]:
209 """Get cached scraped articles from workflow session state"""
210 logger.info("Checking if cached scraped articles exist")
211 scraped_articles = session_state.get("scraped_articles", {}).get(topic)
212 if scraped_articles and isinstance(scraped_articles, dict):
213 try:
214 return {
215 url: ScrapedArticle.model_validate(article)
216 for url, article in scraped_articles.items()
217 }
218 except Exception as e:
219 logger.warning(f"Could not validate cached scraped articles: {e}")
220 return scraped_articles if isinstance(scraped_articles, dict) else None
221
222
223def cache_scraped_articles(
224 session_state, topic: str, scraped_articles: Dict[str, ScrapedArticle]
225):
226 """Cache scraped articles in workflow session state"""
227 logger.info(f"Saving scraped articles for topic: {topic}")
228 if "scraped_articles" not in session_state:
229 session_state["scraped_articles"] = {}
230 session_state["scraped_articles"][topic] = {
231 url: article.model_dump() for url, article in scraped_articles.items()
232 }
233
234
235async def get_search_results(
236 session_state, topic: str, use_cache: bool = True, num_attempts: int = 3
237) -> Optional[SearchResults]:
238 """Get search results with caching support"""
239
240 # Check cache first
241 if use_cache:
242 cached_results = get_cached_search_results(session_state, topic)
243 if cached_results:
244 logger.info(f"Found {len(cached_results.articles)} articles in cache.")
245 return cached_results
246
247 # Search for new results
248 for attempt in range(num_attempts):
249 try:
250 print(
251 f"🔍 Searching for articles about: {topic} (attempt {attempt + 1}/{num_attempts})"
252 )
253 response = await research_agent.arun(topic)
254
255 if (
256 response
257 and response.content
258 and isinstance(response.content, SearchResults)
259 ):
260 article_count = len(response.content.articles)
261 logger.info(f"Found {article_count} articles on attempt {attempt + 1}")
262 print(f"✅ Found {article_count} relevant articles")
263
264 # Cache the results
265 cache_search_results(session_state, topic, response.content)
266 return response.content
267 else:
268 logger.warning(
269 f"Attempt {attempt + 1}/{num_attempts} failed: Invalid response type"
270 )
271
272 except Exception as e:
273 logger.warning(f"Attempt {attempt + 1}/{num_attempts} failed: {str(e)}")
274
275 logger.error(f"Failed to get search results after {num_attempts} attempts")
276 return None
277
278
279async def scrape_articles(
280 session_state,
281 topic: str,
282 search_results: SearchResults,
283 use_cache: bool = True,
284) -> Dict[str, ScrapedArticle]:
285 """Scrape articles with caching support"""
286
287 # Check cache first
288 if use_cache:
289 cached_articles = get_cached_scraped_articles(session_state, topic)
290 if cached_articles:
291 logger.info(f"Found {len(cached_articles)} scraped articles in cache.")
292 return cached_articles
293
294 scraped_articles: Dict[str, ScrapedArticle] = {}
295
296 print(f"📄 Scraping {len(search_results.articles)} articles...")
297
298 for i, article in enumerate(search_results.articles, 1):
299 try:
300 print(
301 f"📖 Scraping article {i}/{len(search_results.articles)}: {article.title[:50]}..."
302 )
303 response = await content_scraper_agent.arun(article.url)
304
305 if (
306 response
307 and response.content
308 and isinstance(response.content, ScrapedArticle)
309 ):
310 scraped_articles[response.content.url] = response.content
311 logger.info(f"Scraped article: {response.content.url}")
312 print(f"✅ Successfully scraped: {response.content.title[:50]}...")
313 else:
314 print(f"❌ Failed to scrape: {article.title[:50]}...")
315
316 except Exception as e:
317 logger.warning(f"Failed to scrape {article.url}: {str(e)}")
318 print(f"❌ Error scraping: {article.title[:50]}...")
319
320 # Cache the scraped articles
321 cache_scraped_articles(session_state, topic, scraped_articles)
322 return scraped_articles
323
324
325# --- Main Execution Function ---
326async def blog_generation_execution(
327 session_state,
328 topic: str = None,
329 use_search_cache: bool = True,
330 use_scrape_cache: bool = True,
331 use_blog_cache: bool = True,
332) -> str:
333 """
334 Blog post generation workflow execution function.
335
336 Args:
337 session_state: The shared session state
338 topic: Blog post topic (if not provided, uses execution_input.input)
339 use_search_cache: Whether to use cached search results
340 use_scrape_cache: Whether to use cached scraped articles
341 use_blog_cache: Whether to use cached blog posts
342 """
343
344 blog_topic = topic
345
346 if not blog_topic:
347 return "❌ No blog topic provided. Please specify a topic."
348
349 print(f"🎨 Generating blog post about: {blog_topic}")
350 print("=" * 60)
351
352 # Check for cached blog post first
353 if use_blog_cache:
354 cached_blog = get_cached_blog_post(session_state, blog_topic)
355 if cached_blog:
356 print("📋 Found cached blog post!")
357 return cached_blog
358
359 # Phase 1: Research and gather sources
360 print("\n🔍 PHASE 1: RESEARCH & SOURCE GATHERING")
361 print("=" * 50)
362
363 search_results = await get_search_results(
364 session_state, blog_topic, use_search_cache
365 )
366
367 if not search_results or len(search_results.articles) == 0:
368 return f"❌ Sorry, could not find any articles on the topic: {blog_topic}"
369
370 print(f"📊 Found {len(search_results.articles)} relevant sources:")
371 for i, article in enumerate(search_results.articles, 1):
372 print(f" {i}. {article.title[:60]}...")
373
374 # Phase 2: Content extraction
375 print("\n📄 PHASE 2: CONTENT EXTRACTION")
376 print("=" * 50)
377
378 scraped_articles = await scrape_articles(
379 session_state, blog_topic, search_results, use_scrape_cache
380 )
381
382 if not scraped_articles:
383 return f"❌ Could not extract content from any articles for topic: {blog_topic}"
384
385 print(f"📖 Successfully extracted content from {len(scraped_articles)} articles")
386
387 # Phase 3: Blog post writing
388 print("\n✍️ PHASE 3: BLOG POST CREATION")
389 print("=" * 50)
390
391 # Prepare input for the writer
392 writer_input = {
393 "topic": blog_topic,
394 "articles": [article.model_dump() for article in scraped_articles.values()],
395 }
396
397 print("🤖 AI is crafting your blog post...")
398 writer_response = await blog_writer_agent.arun(json.dumps(writer_input, indent=2))
399
400 if not writer_response or not writer_response.content:
401 return f"❌ Failed to generate blog post for topic: {blog_topic}"
402
403 blog_post = writer_response.content
404
405 # Cache the blog post
406 cache_blog_post(session_state, blog_topic, blog_post)
407
408 print("✅ Blog post generated successfully!")
409 print(f"📝 Length: {len(blog_post)} characters")
410 print(f"📚 Sources: {len(scraped_articles)} articles")
411
412 return blog_post
413
414
415# --- Workflow Definition ---
416blog_generator_workflow = Workflow(
417 name="Blog Post Generator",
418 description="Advanced blog post generator with research and content creation capabilities",
419 db=SqliteDb(
420 session_table="workflow_session",
421 db_file="tmp/blog_generator.db",
422 ),
423 steps=blog_generation_execution,
424 session_state={}, # Initialize empty session state for caching
425)
426
427
428if __name__ == "__main__":
429 import random
430
431 async def main():
432 # Fun example topics to showcase the generator's versatility
433 example_topics = [
434 "The Rise of Artificial General Intelligence: Latest Breakthroughs",
435 "How Quantum Computing is Revolutionizing Cybersecurity",
436 "Sustainable Living in 2024: Practical Tips for Reducing Carbon Footprint",
437 "The Future of Work: AI and Human Collaboration",
438 "Space Tourism: From Science Fiction to Reality",
439 "Mindfulness and Mental Health in the Digital Age",
440 "The Evolution of Electric Vehicles: Current State and Future Trends",
441 "Why Cats Secretly Run the Internet",
442 "The Science Behind Why Pizza Tastes Better at 2 AM",
443 "How Rubber Ducks Revolutionized Software Development",
444 ]
445
446 # Test with a random topic
447 topic = random.choice(example_topics)
448
449 print("🧪 Testing Blog Post Generator v2.0")
450 print("=" * 60)
451 print(f"📝 Topic: {topic}")
452 print()
453
454 # Generate the blog post
455 resp = await blog_generator_workflow.arun(
456 topic=topic,
457 use_search_cache=True,
458 use_scrape_cache=True,
459 use_blog_cache=True,
460 )
461
462 pprint_run_response(resp, markdown=True, show_time=True)
463
464 asyncio.run(main())

For more examples and advanced patterns, see here. Each file demonstrates a specific pattern with detailed comments and real-world use cases.