Migrating to Workflows 2.0
Migrate your Workflows from 1.0 to 2.0.
Migrating from Workflows 1.0
Workflows 2.0 is a completely new approach to agent automation, and requires an upgrade from the Workflows 1.0 implementation. It introduces a new, more flexible and powerful way to build workflows.
Key Differences
| Workflows 1.0 | Workflows 2.0 | Migration Path |
|---|---|---|
| Linear only | Multiple patterns | Add Parallel/Condition as needed |
| Agent-focused | Mixed components | Convert functions to Steps |
| Limited branching | Smart routing | Replace if/else with Router |
| Manual loops | Built-in Loop | Use Loop component |
Migration Steps
- Assess current workflow: Identify parallel opportunities
- Add conditions: Convert if/else logic to Condition components
- Extract functions: Move custom logic to function-based steps
- Enable streaming: For event-based information
- Add state management: Use
workflow_session_statefor data sharing
Example of Blog Post Generator Workflow
Lets take an example that demonstrates how to build a sophisticated blog post generator that combines web research capabilities with professional writing expertise. The workflow uses a multi-stage approach:
- Intelligent web research and source gathering
- Content extraction and processing
- Professional blog post writing with proper citations
Here's the code for the blog post generator in Workflows 1.0:
1import json2from textwrap import dedent3from typing import Dict, Iterator, Optional45from kern.agent import Agent6from kern.models.openai import OpenAIChat7from kern.run.workflow import WorkflowCompletedEvent8from kern.storage.sqlite import SqliteDb9from kern.tools.duckduckgo import DuckDuckGoTools10from kern.tools.newspaper4k import Newspaper4kTools11from kern.utils.log import logger12from kern.utils.pprint import pprint_run_response13from kern.workflow import RunOutput, Workflow14from pydantic import BaseModel, Field151617class NewsArticle(BaseModel):18 title: str = Field(..., description="Title of the article.")19 url: str = Field(..., description="Link to the article.")20 summary: Optional[str] = Field(21 ..., description="Summary of the article if available."22 )232425class SearchResults(BaseModel):26 articles: list[NewsArticle]272829class ScrapedArticle(BaseModel):30 title: str = Field(..., description="Title of the article.")31 url: str = Field(..., description="Link to the article.")32 summary: Optional[str] = Field(33 ..., description="Summary of the article if available."34 )35 content: Optional[str] = Field(36 ...,37 description="Full article content in markdown format. None if content is unavailable.",38 )394041class BlogPostGenerator(Workflow):42 """Advanced workflow for generating professional blog posts with proper research and citations."""4344 description: str = dedent("""\45 An intelligent blog post generator that creates engaging, well-researched content.46 This workflow orchestrates multiple AI agents to research, analyze, and craft47 compelling blog posts that combine journalistic rigor with engaging storytelling.48 The system excels at creating content that is both informative and optimized for49 digital consumption.50 """)5152 # Search Agent: Handles intelligent web searching and source gathering53 searcher: Agent = Agent(54 model=OpenAIChat(id="gpt-5-mini"),55 tools=[DuckDuckGoTools()],56 description=dedent("""\57 You are BlogResearch-X, an elite research assistant specializing in discovering58 high-quality sources for compelling blog content. Your expertise includes:5960 - Finding authoritative and trending sources61 - Evaluating content credibility and relevance62 - Identifying diverse perspectives and expert opinions63 - Discovering unique angles and insights64 - Ensuring comprehensive topic coverage\65 """),66 instructions=dedent("""\67 1. Search Strategy ��68 - Find 10-15 relevant sources and select the 5-7 best ones69 - Prioritize recent, authoritative content70 - Look for unique angles and expert insights71 2. Source Evaluation ��72 - Verify source credibility and expertise73 - Check publication dates for timeliness74 - Assess content depth and uniqueness75 3. Diversity of Perspectives ��76 - Include different viewpoints77 - Gather both mainstream and expert opinions78 - Find supporting data and statistics\79 """),80 output_schema=SearchResults,81 )8283 # Content Scraper: Extracts and processes article content84 article_scraper: Agent = Agent(85 model=OpenAIChat(id="gpt-5-mini"),86 tools=[Newspaper4kTools()],87 description=dedent("""\88 You are ContentBot-X, a specialist in extracting and processing digital content89 for blog creation. Your expertise includes:9091 - Efficient content extraction92 - Smart formatting and structuring93 - Key information identification94 - Quote and statistic preservation95 - Maintaining source attribution\96 """),97 instructions=dedent("""\98 1. Content Extraction ��99 - Extract content from the article100 - Preserve important quotes and statistics101 - Maintain proper attribution102 - Handle paywalls gracefully103 2. Content Processing ��104 - Format text in clean markdown105 - Preserve key information106 - Structure content logically107 3. Quality Control ✅108 - Verify content relevance109 - Ensure accurate extraction110 - Maintain readability\111 """),112 output_schema=ScrapedArticle,113 )114115 # Content Writer Agent: Crafts engaging blog posts from research116 writer: Agent = Agent(117 model=OpenAIChat(id="gpt-5-mini"),118 description=dedent("""\119 You are BlogMaster-X, an elite content creator combining journalistic excellence120 with digital marketing expertise. Your strengths include:121122 - Crafting viral-worthy headlines123 - Writing engaging introductions124 - Structuring content for digital consumption125 - Incorporating research seamlessly126 - Optimizing for SEO while maintaining quality127 - Creating shareable conclusions\128 """),129 instructions=dedent("""\130 1. Content Strategy ��131 - Craft attention-grabbing headlines132 - Write compelling introductions133 - Structure content for engagement134 - Include relevant subheadings135 2. Writing Excellence ✍️136 - Balance expertise with accessibility137 - Use clear, engaging language138 - Include relevant examples139 - Incorporate statistics naturally140 3. Source Integration ��141 - Cite sources properly142 - Include expert quotes143 - Maintain factual accuracy144 4. Digital Optimization ��145 - Structure for scanability146 - Include shareable takeaways147 - Optimize for SEO148 - Add engaging subheadings\149 """),150 expected_output=dedent("""\151 # {Viral-Worthy Headline}152153 ## Introduction154 {Engaging hook and context}155156 ## {Compelling Section 1}157 {Key insights and analysis}158 {Expert quotes and statistics}159160 ## {Engaging Section 2}161 {Deeper exploration}162 {Real-world examples}163164 ## {Practical Section 3}165 {Actionable insights}166 {Expert recommendations}167168 ## Key Takeaways169 - {Shareable insight 1}170 - {Practical takeaway 2}171 - {Notable finding 3}172173 ## Sources174 {Properly attributed sources with links}\175 """),176 markdown=True,177 )178179 def run(180 self,181 topic: str,182 use_search_cache: bool = True,183 use_scrape_cache: bool = True,184 use_cached_report: bool = True,185 ) -> Iterator[RunOutputEvent]:186 logger.info(f"Generating a blog post on: {topic}")187188 # Use the cached blog post if use_cache is True189 if use_cached_report:190 cached_blog_post = self.get_cached_blog_post(topic)191 if cached_blog_post:192 yield WorkflowCompletedEvent(193 run_id=self.run_id,194 content=cached_blog_post,195 )196 return197198 # Search the web for articles on the topic199 search_results: Optional[SearchResults] = self.get_search_results(200 topic, use_search_cache201 )202 # If no search_results are found for the topic, end the workflow203 if search_results is None or len(search_results.articles) == 0:204 yield WorkflowCompletedEvent(205 run_id=self.run_id,206 content=f"Sorry, could not find any articles on the topic: {topic}",207 )208 return209210 # Scrape the search results211 scraped_articles: Dict[str, ScrapedArticle] = self.scrape_articles(212 topic, search_results, use_scrape_cache213 )214215 # Prepare the input for the writer216 writer_input = {217 "topic": topic,218 "articles": [v.model_dump() for v in scraped_articles.values()],219 }220221 # Run the writer and yield the response222 yield from self.writer.run(json.dumps(writer_input, indent=4), stream=True)223224 # Save the blog post in the cache225 self.add_blog_post_to_cache(topic, self.writer.run_response.content)226227 def get_cached_blog_post(self, topic: str) -> Optional[str]:228 logger.info("Checking if cached blog post exists")229230 return self.session_state.get("blog_posts", {}).get(topic)231232 def add_blog_post_to_cache(self, topic: str, blog_post: str):233 logger.info(f"Saving blog post for topic: {topic}")234 self.session_state.setdefault("blog_posts", {})235 self.session_state["blog_posts"][topic] = blog_post236237 def get_cached_search_results(self, topic: str) -> Optional[SearchResults]:238 logger.info("Checking if cached search results exist")239 search_results = self.session_state.get("search_results", {}).get(topic)240 return (241 SearchResults.model_validate(search_results)242 if search_results and isinstance(search_results, dict)243 else search_results244 )245246 def add_search_results_to_cache(self, topic: str, search_results: SearchResults):247 logger.info(f"Saving search results for topic: {topic}")248 self.session_state.setdefault("search_results", {})249 self.session_state["search_results"][topic] = search_results250251 def get_cached_scraped_articles(252 self, topic: str253 ) -> Optional[Dict[str, ScrapedArticle]]:254 logger.info("Checking if cached scraped articles exist")255 scraped_articles = self.session_state.get("scraped_articles", {}).get(topic)256 return (257 ScrapedArticle.model_validate(scraped_articles)258 if scraped_articles and isinstance(scraped_articles, dict)259 else scraped_articles260 )261262 def add_scraped_articles_to_cache(263 self, topic: str, scraped_articles: Dict[str, ScrapedArticle]264 ):265 logger.info(f"Saving scraped articles for topic: {topic}")266 self.session_state.setdefault("scraped_articles", {})267 self.session_state["scraped_articles"][topic] = scraped_articles268269 def get_search_results(270 self, topic: str, use_search_cache: bool, num_attempts: int = 3271 ) -> Optional[SearchResults]:272 # Get cached search_results from the session state if use_search_cache is True273 if use_search_cache:274 try:275 search_results_from_cache = self.get_cached_search_results(topic)276 if search_results_from_cache is not None:277 search_results = SearchResults.model_validate(278 search_results_from_cache279 )280 logger.info(281 f"Found {len(search_results.articles)} articles in cache."282 )283 return search_results284 except Exception as e:285 logger.warning(f"Could not read search results from cache: {e}")286287 # If there are no cached search_results, use the searcher to find the latest articles288 for attempt in range(num_attempts):289 try:290 searcher_response: RunOutput = self.searcher.run(topic)291 if (292 searcher_response is not None293 and searcher_response.content is not None294 and isinstance(searcher_response.content, SearchResults)295 ):296 article_count = len(searcher_response.content.articles)297 logger.info(298 f"Found {article_count} articles on attempt {attempt + 1}"299 )300 # Cache the search results301 self.add_search_results_to_cache(topic, searcher_response.content)302 return searcher_response.content303 else:304 logger.warning(305 f"Attempt {attempt + 1}/{num_attempts} failed: Invalid response type"306 )307 except Exception as e:308 logger.warning(f"Attempt {attempt + 1}/{num_attempts} failed: {str(e)}")309310 logger.error(f"Failed to get search results after {num_attempts} attempts")311 return None312313 def scrape_articles(314 self, topic: str, search_results: SearchResults, use_scrape_cache: bool315 ) -> Dict[str, ScrapedArticle]:316 scraped_articles: Dict[str, ScrapedArticle] = {}317318 # Get cached scraped_articles from the session state if use_scrape_cache is True319 if use_scrape_cache:320 try:321 scraped_articles_from_cache = self.get_cached_scraped_articles(topic)322 if scraped_articles_from_cache is not None:323 scraped_articles = scraped_articles_from_cache324 logger.info(325 f"Found {len(scraped_articles)} scraped articles in cache."326 )327 return scraped_articles328 except Exception as e:329 logger.warning(f"Could not read scraped articles from cache: {e}")330331 # Scrape the articles that are not in the cache332 for article in search_results.articles:333 if article.url in scraped_articles:334 logger.info(f"Found scraped article in cache: {article.url}")335 continue336337 article_scraper_response: RunOutput = self.article_scraper.run(338 article.url339 )340 if (341 article_scraper_response is not None342 and article_scraper_response.content is not None343 and isinstance(article_scraper_response.content, ScrapedArticle)344 ):345 scraped_articles[article_scraper_response.content.url] = (346 article_scraper_response.content347 )348 logger.info(f"Scraped article: {article_scraper_response.content.url}")349350 # Save the scraped articles in the session state351 self.add_scraped_articles_to_cache(topic, scraped_articles)352 return scraped_articles353354355# Run the workflow if the script is executed directly356if __name__ == "__main__":357 import random358359 from rich.prompt import Prompt360361 # Fun example prompts to showcase the generator's versatility362 example_prompts = [363 "Why Cats Secretly Run the Internet",364 "The Science Behind Why Pizza Tastes Better at 2 AM",365 "Time Travelers' Guide to Modern Social Media",366 "How Rubber Ducks Revolutionized Software Development",367 "The Secret Society of Office Plants: A Survival Guide",368 "Why Dogs Think We're Bad at Smelling Things",369 "The Underground Economy of Coffee Shop WiFi Passwords",370 "A Historical Analysis of Dad Jokes Through the Ages",371 ]372373 # Get topic from user374 topic = Prompt.ask(375 "[bold]Enter a blog post topic[/bold] (or press Enter for a random example)\n✨",376 default=random.choice(example_prompts),377 )378379 # Convert the topic to a URL-safe string for use in session_id380 url_safe_topic = topic.lower().replace(" ", "-")381382 # Initialize the blog post generator workflow383 # - Creates a unique session ID based on the topic384 # - Sets up SQLite storage for caching results385 generate_blog_post = BlogPostGenerator(386 session_id=f"generate-blog-post-on-{url_safe_topic}",387 db=SqliteDb(388 db_file="tmp/agno_workflows.db",389 ),390 debug_mode=True,391 )392393 # Execute the workflow with caching enabled394 # Returns an iterator of RunOutput objects containing the generated content395 blog_post: Iterator[RunOutputEvent] = generate_blog_post.run(396 topic=topic,397 use_search_cache=True,398 use_scrape_cache=True,399 use_cached_report=True,400 )401402 # Print the response403 pprint_run_response(blog_post, markdown=True)To convert this into Workflows 2.0 structure, either we can break down the workflow into smaller steps and follow the development guide. Or for simplicity we can directly replace the run method to a single custom function executor as mentioned here.
It will look like this:
1import asyncio2import json3from textwrap import dedent4from typing import Dict, Optional56from kern.agent import Agent7from kern.db.sqlite import SqliteDb8from kern.models.openai import OpenAIChat9from kern.tools.duckduckgo import DuckDuckGoTools10from kern.tools.newspaper4k import Newspaper4kTools11from kern.utils.log import logger12from kern.utils.pprint import pprint_run_response13from kern.workflow.workflow import Workflow14from pydantic import BaseModel, Field151617# --- Response Models ---18class NewsArticle(BaseModel):19 title: str = Field(..., description="Title of the article.")20 url: str = Field(..., description="Link to the article.")21 summary: Optional[str] = Field(22 ..., description="Summary of the article if available."23 )242526class SearchResults(BaseModel):27 articles: list[NewsArticle]282930class ScrapedArticle(BaseModel):31 title: str = Field(..., description="Title of the article.")32 url: str = Field(..., description="Link to the article.")33 summary: Optional[str] = Field(34 ..., description="Summary of the article if available."35 )36 content: Optional[str] = Field(37 ...,38 description="Full article content in markdown format. None if content is unavailable.",39 )404142# --- Agents ---43research_agent = Agent(44 name="Blog Research Agent",45 model=OpenAIChat(id="gpt-5-mini"),46 tools=[DuckDuckGoTools()],47 description=dedent("""\48 You are BlogResearch-X, an elite research assistant specializing in discovering49 high-quality sources for compelling blog content. Your expertise includes:5051 - Finding authoritative and trending sources52 - Evaluating content credibility and relevance53 - Identifying diverse perspectives and expert opinions54 - Discovering unique angles and insights55 - Ensuring comprehensive topic coverage56 """),57 instructions=dedent("""\58 1. Search Strategy ��59 - Find 10-15 relevant sources and select the 5-7 best ones60 - Prioritize recent, authoritative content61 - Look for unique angles and expert insights62 2. Source Evaluation ��63 - Verify source credibility and expertise64 - Check publication dates for timeliness65 - Assess content depth and uniqueness66 3. Diversity of Perspectives ��67 - Include different viewpoints68 - Gather both mainstream and expert opinions69 - Find supporting data and statistics70 """),71 output_schema=SearchResults,72)7374content_scraper_agent = Agent(75 name="Content Scraper Agent",76 model=OpenAIChat(id="gpt-5-mini"),77 tools=[Newspaper4kTools()],78 description=dedent("""\79 You are ContentBot-X, a specialist in extracting and processing digital content80 for blog creation. Your expertise includes:8182 - Efficient content extraction83 - Smart formatting and structuring84 - Key information identification85 - Quote and statistic preservation86 - Maintaining source attribution87 """),88 instructions=dedent("""\89 1. Content Extraction ��90 - Extract content from the article91 - Preserve important quotes and statistics92 - Maintain proper attribution93 - Handle paywalls gracefully94 2. Content Processing ��95 - Format text in clean markdown96 - Preserve key information97 - Structure content logically98 3. Quality Control ✅99 - Verify content relevance100 - Ensure accurate extraction101 - Maintain readability102 """),103 output_schema=ScrapedArticle,104)105106blog_writer_agent = Agent(107 name="Blog Writer Agent",108 model=OpenAIChat(id="gpt-5-mini"),109 description=dedent("""\110 You are BlogMaster-X, an elite content creator combining journalistic excellence111 with digital marketing expertise. Your strengths include:112113 - Crafting viral-worthy headlines114 - Writing engaging introductions115 - Structuring content for digital consumption116 - Incorporating research seamlessly117 - Optimizing for SEO while maintaining quality118 - Creating shareable conclusions119 """),120 instructions=dedent("""\121 1. Content Strategy ��122 - Craft attention-grabbing headlines123 - Write compelling introductions124 - Structure content for engagement125 - Include relevant subheadings126 2. Writing Excellence ✍️127 - Balance expertise with accessibility128 - Use clear, engaging language129 - Include relevant examples130 - Incorporate statistics naturally131 3. Source Integration ��132 - Cite sources properly133 - Include expert quotes134 - Maintain factual accuracy135 4. Digital Optimization ��136 - Structure for scanability137 - Include shareable takeaways138 - Optimize for SEO139 - Add engaging subheadings140141 Format your blog post with this structure:142 # {Viral-Worthy Headline}143144 ## Introduction145 {Engaging hook and context}146147 ## {Compelling Section 1}148 {Key insights and analysis}149 {Expert quotes and statistics}150151 ## {Engaging Section 2}152 {Deeper exploration}153 {Real-world examples}154155 ## {Practical Section 3}156 {Actionable insights}157 {Expert recommendations}158159 ## Key Takeaways160 - {Shareable insight 1}161 - {Practical takeaway 2}162 - {Notable finding 3}163164 ## Sources165 {Properly attributed sources with links}166 """),167 markdown=True,168)169170171# --- Helper Functions ---172def get_cached_blog_post(session_state, topic: str) -> Optional[str]:173 """Get cached blog post from workflow session state"""174 logger.info("Checking if cached blog post exists")175 return session_state.get("blog_posts", {}).get(topic)176177178def cache_blog_post(session_state, topic: str, blog_post: str):179 """Cache blog post in workflow session state"""180 logger.info(f"Saving blog post for topic: {topic}")181 if "blog_posts" not in session_state:182 session_state["blog_posts"] = {}183 session_state["blog_posts"][topic] = blog_post184185186def get_cached_search_results(session_state, topic: str) -> Optional[SearchResults]:187 """Get cached search results from workflow session state"""188 logger.info("Checking if cached search results exist")189 search_results = session_state.get("search_results", {}).get(topic)190 if search_results and isinstance(search_results, dict):191 try:192 return SearchResults.model_validate(search_results)193 except Exception as e:194 logger.warning(f"Could not validate cached search results: {e}")195 return search_results if isinstance(search_results, SearchResults) else None196197198def cache_search_results(session_state, topic: str, search_results: SearchResults):199 """Cache search results in workflow session state"""200 logger.info(f"Saving search results for topic: {topic}")201 if "search_results" not in session_state:202 session_state["search_results"] = {}203 session_state["search_results"][topic] = search_results.model_dump()204205206def get_cached_scraped_articles(207 session_state, topic: str208) -> Optional[Dict[str, ScrapedArticle]]:209 """Get cached scraped articles from workflow session state"""210 logger.info("Checking if cached scraped articles exist")211 scraped_articles = session_state.get("scraped_articles", {}).get(topic)212 if scraped_articles and isinstance(scraped_articles, dict):213 try:214 return {215 url: ScrapedArticle.model_validate(article)216 for url, article in scraped_articles.items()217 }218 except Exception as e:219 logger.warning(f"Could not validate cached scraped articles: {e}")220 return scraped_articles if isinstance(scraped_articles, dict) else None221222223def cache_scraped_articles(224 session_state, topic: str, scraped_articles: Dict[str, ScrapedArticle]225):226 """Cache scraped articles in workflow session state"""227 logger.info(f"Saving scraped articles for topic: {topic}")228 if "scraped_articles" not in session_state:229 session_state["scraped_articles"] = {}230 session_state["scraped_articles"][topic] = {231 url: article.model_dump() for url, article in scraped_articles.items()232 }233234235async def get_search_results(236 session_state, topic: str, use_cache: bool = True, num_attempts: int = 3237) -> Optional[SearchResults]:238 """Get search results with caching support"""239240 # Check cache first241 if use_cache:242 cached_results = get_cached_search_results(session_state, topic)243 if cached_results:244 logger.info(f"Found {len(cached_results.articles)} articles in cache.")245 return cached_results246247 # Search for new results248 for attempt in range(num_attempts):249 try:250 print(251 f"🔍 Searching for articles about: {topic} (attempt {attempt + 1}/{num_attempts})"252 )253 response = await research_agent.arun(topic)254255 if (256 response257 and response.content258 and isinstance(response.content, SearchResults)259 ):260 article_count = len(response.content.articles)261 logger.info(f"Found {article_count} articles on attempt {attempt + 1}")262 print(f"✅ Found {article_count} relevant articles")263264 # Cache the results265 cache_search_results(session_state, topic, response.content)266 return response.content267 else:268 logger.warning(269 f"Attempt {attempt + 1}/{num_attempts} failed: Invalid response type"270 )271272 except Exception as e:273 logger.warning(f"Attempt {attempt + 1}/{num_attempts} failed: {str(e)}")274275 logger.error(f"Failed to get search results after {num_attempts} attempts")276 return None277278279async def scrape_articles(280 session_state,281 topic: str,282 search_results: SearchResults,283 use_cache: bool = True,284) -> Dict[str, ScrapedArticle]:285 """Scrape articles with caching support"""286287 # Check cache first288 if use_cache:289 cached_articles = get_cached_scraped_articles(session_state, topic)290 if cached_articles:291 logger.info(f"Found {len(cached_articles)} scraped articles in cache.")292 return cached_articles293294 scraped_articles: Dict[str, ScrapedArticle] = {}295296 print(f"📄 Scraping {len(search_results.articles)} articles...")297298 for i, article in enumerate(search_results.articles, 1):299 try:300 print(301 f"📖 Scraping article {i}/{len(search_results.articles)}: {article.title[:50]}..."302 )303 response = await content_scraper_agent.arun(article.url)304305 if (306 response307 and response.content308 and isinstance(response.content, ScrapedArticle)309 ):310 scraped_articles[response.content.url] = response.content311 logger.info(f"Scraped article: {response.content.url}")312 print(f"✅ Successfully scraped: {response.content.title[:50]}...")313 else:314 print(f"❌ Failed to scrape: {article.title[:50]}...")315316 except Exception as e:317 logger.warning(f"Failed to scrape {article.url}: {str(e)}")318 print(f"❌ Error scraping: {article.title[:50]}...")319320 # Cache the scraped articles321 cache_scraped_articles(session_state, topic, scraped_articles)322 return scraped_articles323324325# --- Main Execution Function ---326async def blog_generation_execution(327 session_state,328 topic: str = None,329 use_search_cache: bool = True,330 use_scrape_cache: bool = True,331 use_blog_cache: bool = True,332) -> str:333 """334 Blog post generation workflow execution function.335336 Args:337 session_state: The shared session state338 topic: Blog post topic (if not provided, uses execution_input.input)339 use_search_cache: Whether to use cached search results340 use_scrape_cache: Whether to use cached scraped articles341 use_blog_cache: Whether to use cached blog posts342 """343344 blog_topic = topic345346 if not blog_topic:347 return "❌ No blog topic provided. Please specify a topic."348349 print(f"🎨 Generating blog post about: {blog_topic}")350 print("=" * 60)351352 # Check for cached blog post first353 if use_blog_cache:354 cached_blog = get_cached_blog_post(session_state, blog_topic)355 if cached_blog:356 print("📋 Found cached blog post!")357 return cached_blog358359 # Phase 1: Research and gather sources360 print("\n🔍 PHASE 1: RESEARCH & SOURCE GATHERING")361 print("=" * 50)362363 search_results = await get_search_results(364 session_state, blog_topic, use_search_cache365 )366367 if not search_results or len(search_results.articles) == 0:368 return f"❌ Sorry, could not find any articles on the topic: {blog_topic}"369370 print(f"📊 Found {len(search_results.articles)} relevant sources:")371 for i, article in enumerate(search_results.articles, 1):372 print(f" {i}. {article.title[:60]}...")373374 # Phase 2: Content extraction375 print("\n📄 PHASE 2: CONTENT EXTRACTION")376 print("=" * 50)377378 scraped_articles = await scrape_articles(379 session_state, blog_topic, search_results, use_scrape_cache380 )381382 if not scraped_articles:383 return f"❌ Could not extract content from any articles for topic: {blog_topic}"384385 print(f"📖 Successfully extracted content from {len(scraped_articles)} articles")386387 # Phase 3: Blog post writing388 print("\n✍️ PHASE 3: BLOG POST CREATION")389 print("=" * 50)390391 # Prepare input for the writer392 writer_input = {393 "topic": blog_topic,394 "articles": [article.model_dump() for article in scraped_articles.values()],395 }396397 print("🤖 AI is crafting your blog post...")398 writer_response = await blog_writer_agent.arun(json.dumps(writer_input, indent=2))399400 if not writer_response or not writer_response.content:401 return f"❌ Failed to generate blog post for topic: {blog_topic}"402403 blog_post = writer_response.content404405 # Cache the blog post406 cache_blog_post(session_state, blog_topic, blog_post)407408 print("✅ Blog post generated successfully!")409 print(f"📝 Length: {len(blog_post)} characters")410 print(f"📚 Sources: {len(scraped_articles)} articles")411412 return blog_post413414415# --- Workflow Definition ---416blog_generator_workflow = Workflow(417 name="Blog Post Generator",418 description="Advanced blog post generator with research and content creation capabilities",419 db=SqliteDb(420 session_table="workflow_session",421 db_file="tmp/blog_generator.db",422 ),423 steps=blog_generation_execution,424 session_state={}, # Initialize empty session state for caching425)426427428if __name__ == "__main__":429 import random430431 async def main():432 # Fun example topics to showcase the generator's versatility433 example_topics = [434 "The Rise of Artificial General Intelligence: Latest Breakthroughs",435 "How Quantum Computing is Revolutionizing Cybersecurity",436 "Sustainable Living in 2024: Practical Tips for Reducing Carbon Footprint",437 "The Future of Work: AI and Human Collaboration",438 "Space Tourism: From Science Fiction to Reality",439 "Mindfulness and Mental Health in the Digital Age",440 "The Evolution of Electric Vehicles: Current State and Future Trends",441 "Why Cats Secretly Run the Internet",442 "The Science Behind Why Pizza Tastes Better at 2 AM",443 "How Rubber Ducks Revolutionized Software Development",444 ]445446 # Test with a random topic447 topic = random.choice(example_topics)448449 print("🧪 Testing Blog Post Generator v2.0")450 print("=" * 60)451 print(f"📝 Topic: {topic}")452 print()453454 # Generate the blog post455 resp = await blog_generator_workflow.arun(456 topic=topic,457 use_search_cache=True,458 use_scrape_cache=True,459 use_blog_cache=True,460 )461462 pprint_run_response(resp, markdown=True, show_time=True)463464 asyncio.run(main())For more examples and advanced patterns, see here. Each file demonstrates a specific pattern with detailed comments and real-world use cases.