Advanced Workflow Patterns

Combine multiple workflow patterns to build sophisticated, production-ready automation systems

Pattern Combination: Conditional Logic + Parallel Execution + Iterative Loops + Custom Processing + Dynamic Routing

This example demonstrates how deterministic patterns can be composed to create complex yet predictable workflows.

from kern.workflow import (
    Condition,
    Loop,
    Parallel,
    Router,
    Step,
    StepOutput,
    Workflow,
)

def research_post_processor(step_input) -> StepOutput:
    """Post-process and consolidate research data from parallel conditions.

    Builds a Markdown report from the previous step's content: a data-quality
    flag derived from the word count, keyword-based coverage flags for
    technical and business content, and the research text itself.

    Args:
        step_input: Step input object. Only its ``previous_step_content``
            attribute is read; a falsy value is treated as an empty string.

    Returns:
        A ``StepOutput`` with ``success=True`` and the report as ``content``.
        If anything raises while the report is being built, a ``StepOutput``
        with ``success=False`` and the exception text in both ``content`` and
        ``error`` is returned instead.
    """
    research_data = step_input.previous_step_content or ""

    try:
        # Analyze research quality and completeness.
        word_count = len(research_data.split())
        # Lower-case once; both keyword scans below reuse the result.
        lowered = research_data.lower()

        # NOTE: these are substring tests, so a short keyword also matches
        # inside longer words (e.g. "ai" inside "available").
        has_tech_content = any(
            keyword in lowered
            for keyword in ("technology", "ai", "software", "tech")
        )
        has_business_content = any(
            keyword in lowered
            for keyword in ("market", "business", "revenue", "strategy")
        )

        quality = "✓ High-quality" if word_count > 200 else "⚠ Limited data"
        tech_status = "✓ Completed" if has_tech_content else "✗ Not available"
        business_status = (
            "✓ Completed" if has_business_content else "✗ Not available"
        )

        # Create enhanced research summary. The report is assembled from
        # explicit lines so the Markdown carries no source-code indentation.
        report_lines = [
            "## Research Analysis Report",
            "",
            f"**Data Quality:** {quality}",
            "",
            "**Content Coverage:**",
            f"- Technical Analysis: {tech_status}",
            f"- Business Analysis: {business_status}",
            "",
            "**Research Findings:**",
            research_data,
        ]
        # strip() also trims trailing whitespace left by the research text.
        enhanced_summary = "\n".join(report_lines).strip()

        return StepOutput(
            content=enhanced_summary,
            success=True,
        )

    except Exception as e:
        # Best-effort step: report the failure as a StepOutput rather than
        # letting the exception propagate to the caller.
        return StepOutput(
            content=f"Research post-processing failed: {e}",
            success=False,
            error=str(e),
        )

# Complex workflow combining multiple patterns.
# NOTE(review): the evaluators, agents, selector and router steps referenced
# below (is_tech_topic, is_business_topic, tech_researcher, market_researcher,
# research_quality_check, content_type_selector, blog_post_step,
# social_media_step, report_step, reviewer) are not defined in this snippet
# and must exist before this code runs.
workflow = Workflow(
    name="Advanced Multi-Pattern Workflow",
    steps=[
        # Phase 1: two Condition-gated research branches grouped in a Parallel.
        Parallel(
            Condition(
                name="Tech Check",
                evaluator=is_tech_topic,
                steps=[Step(name="Tech Research", agent=tech_researcher)],
            ),
            Condition(
                name="Business Check",
                evaluator=is_business_topic,
                steps=[
                    # Market research wrapped in a Loop; presumably it stops
                    # when research_quality_check is satisfied or after
                    # 3 iterations, whichever comes first (confirm against
                    # the Loop documentation).
                    Loop(
                        name="Deep Business Research",
                        steps=[
                            Step(name="Market Research", agent=market_researcher)
                        ],
                        end_condition=research_quality_check,
                        max_iterations=3,
                    )
                ],
            ),
            name="Conditional Research Phase",
        ),
        # Phase 2: custom function step that consolidates the research output.
        Step(
            name="Research Post-Processing",
            executor=research_post_processor,
            description="Consolidate and analyze research findings with quality metrics",
        ),
        # Phase 3: content_type_selector chooses among the listed steps.
        Router(
            name="Content Type Router",
            selector=content_type_selector,
            choices=[blog_post_step, social_media_step, report_step],
        ),
        # Phase 4: final agent review.
        Step(name="Final Review", agent=reviewer),
    ],
)

workflow.print_response(
    "Create a comprehensive analysis of sustainable technology trends "
    "and their business impact for 2024",
    markdown=True,
)

More Examples: