Loop with Parallel Steps Workflow

This example demonstrates the most sophisticated **Workflows 2.0** pattern, combining loop execution with parallel processing and real-time streaming.

This example shows how to create iterative workflows that execute multiple independent tasks simultaneously within each iteration, optimizing both quality and performance.

When to use: Use this pattern when you need iterative quality improvement with parallel task execution in each iteration. It is ideal for comprehensive research workflows where multiple independent tasks contribute to overall quality and the work must repeat until quality thresholds are met.

from typing import List

from kern.agent import Agent
from kern.tools.hackernews import HackerNewsTools
from kern.tools.yfinance import YFinanceTools
from kern.workflow import Loop, Parallel, Step, Workflow
from kern.workflow.types import StepOutput

# Create the agents used by the workflow steps.

# Research agent: the only agent configured with tools (HackerNews and
# YFinance).
research_agent = Agent(
    name="Research Agent",
    role="Research specialist",
    tools=[HackerNewsTools(), YFinanceTools()],
    instructions="You are a research specialist. Research the given topic thoroughly.",
    markdown=True,
)

# Analysis agent: no tools; instructed to analyze and summarize findings.
analysis_agent = Agent(
    name="Analysis Agent",
    role="Data analyst",
    instructions="You are a data analyst. Analyze and summarize research findings.",
    markdown=True,
)

# Content agent: no tools; instructed to turn research into content.
content_agent = Agent(
    name="Content Agent",
    role="Content creator",
    instructions="You are a content creator. Create engaging content based on research.",
    markdown=True,
)

# Create research steps. Both are backed by the same research_agent.
research_hackernews_step = Step(
    name="Research HackerNews",
    agent=research_agent,
    description="Research trending topics on HackerNews",
)

# NOTE(review): research_agent is configured only with HackerNews and
# YFinance tools; confirm that is sufficient for general web research.
research_web_step = Step(
    name="Research Web",
    agent=research_agent,
    description="Research additional information from web sources",
)

# Create analysis steps. Both are backed by the same analysis_agent.
trend_analysis_step = Step(
    name="Trend Analysis",
    agent=analysis_agent,
    description="Analyze trending patterns in the research",
)

sentiment_analysis_step = Step(
    name="Sentiment Analysis",
    agent=analysis_agent,
    description="Analyze sentiment and opinions from the research",
)

# Content-creation step, placed after the research loop in the workflow.
content_step = Step(
    name="Create Content",
    agent=content_agent,
    description="Create content based on research findings",
)


# End condition function
def research_evaluator(outputs: List[StepOutput]) -> bool:
    """Decide whether the research results are substantial enough to stop.

    Intended for use as a loop ``end_condition``.

    Args:
        outputs: Step outputs handed to the end condition (presumably those
            of the most recent loop iteration -- confirm against the
            workflow library). Each item must expose a ``content`` attribute.

    Returns:
        True to break the loop when the combined content is longer than
        500 characters; False to continue (no outputs, or too little
        content).
    """
    # Nothing was produced: keep iterating.
    if not outputs:
        return False

    # Calculate total content length from all outputs. Missing or empty
    # content counts as zero. Non-string content is measured by its string
    # form so that unsized objects do not raise TypeError and containers are
    # counted in characters (matching the messages below), not in items.
    total_content_length = 0
    for output in outputs:
        content = output.content
        if not content:
            continue
        text = content if isinstance(content, str) else str(content)
        total_content_length += len(text)

    # Check if we have substantial content (more than 500 chars total)
    if total_content_length > 500:
        print(
            f"Research evaluation passed - found substantial content ({total_content_length} chars total)"
        )
        return True

    print(
        f"Research evaluation failed - need more substantial research (current: {total_content_length} chars)"
    )
    return False


# Create workflow with loop containing parallel steps.
# Top-level order: the research Loop first, then content_step.
workflow = Workflow(
    name="Advanced Research and Content Workflow",
    description="Research topics with parallel execution in a loop until conditions are met, then create content",
    steps=[
        Loop(
            name="Research Loop with Parallel Execution",
            steps=[
                # Two research steps and the trend analysis are grouped in
                # one Parallel block; sentiment analysis follows it.
                Parallel(
                    research_hackernews_step,
                    research_web_step,
                    trend_analysis_step,
                    name="Parallel Research & Analysis",
                    description="Execute research and analysis in parallel for efficiency",
                ),
                sentiment_analysis_step,
            ],
            # research_evaluator returns True to break the loop.
            end_condition=research_evaluator,
            max_iterations=3,  # Maximum 3 iterations
        ),
        content_step,
    ],
)

if __name__ == "__main__":
    # Run the workflow on a sample prompt with streaming enabled.
    workflow.print_response(
        input="Research the latest trends in AI and machine learning, then create a summary",
        stream=True,
    )