Conditional Workflow

Execute steps based on content analysis or business logic.

This example demonstrates the Workflows 2.0 conditional execution pattern. It shows how to execute steps conditionally based on content analysis, so that the workflow intelligently selects which steps to run according to the actual data being processed.

When to use: when steps must be selected intelligently based on content analysis or other business logic, rather than on simple input parameters. Ideal for quality gates, content-specific processing, or adaptive workflows that respond to intermediate results.

1from kern.agent import Agent
2from kern.tools.hackernews import HackerNewsTools
3from kern.workflow.condition import Condition
4from kern.workflow.step import Step
5from kern.workflow.types import StepInput
6from kern.workflow.workflow import Workflow
7
# === BASIC AGENTS ===
# Researcher: gathers findings on the topic, with HackerNews tools attached.
researcher = Agent(
    name="Researcher",
    tools=[HackerNewsTools()],
    instructions="Research the given topic and provide detailed findings.",
)

# Summarizer: condenses the research output; no tools attached.
summarizer = Agent(
    name="Summarizer",
    instructions="Create a clear summary of the research findings.",
)

# Fact checker: verifies claims, with its own HackerNews tools instance.
fact_checker = Agent(
    name="Fact Checker",
    tools=[HackerNewsTools()],
    instructions="Verify facts and check for accuracy in the research.",
)

# Writer: produces the final article; no tools attached.
writer = Agent(
    name="Writer",
    instructions=(
        "Write a comprehensive article based on all available research and verification."
    ),
)
30
# === CONDITION EVALUATOR ===
32
33
def needs_fact_checking(step_input: StepInput) -> bool:
    """Determine if the research contains claims that need fact-checking.

    Args:
        step_input: Input handed to the condition; only its
            ``previous_step_content`` attribute is read.

    Returns:
        True when the previous step's content contains at least one of the
        fact-indicator keywords (case-insensitive substring match),
        otherwise False. Missing or empty content yields False.
    """
    content = step_input.previous_step_content or ""

    # Nothing in this file guarantees the content is text; stringify
    # anything else so the keyword scan cannot raise AttributeError.
    if not isinstance(content, str):
        content = str(content)

    # Lower-case once up front instead of once per indicator.
    summary = content.lower()

    # Keywords that suggest factual claims
    fact_indicators = (
        "study shows",
        "research indicates",
        "according to",
        "statistics",
        "data shows",
        "survey",
        "report",
        "million",
        "billion",
        "percent",
        "%",
        "increase",
        "decrease",
    )

    return any(indicator in summary for indicator in fact_indicators)
56
57
# === WORKFLOW STEPS ===
# Each step binds one of the agents to a named stage of the workflow.
research_step = Step(
    agent=researcher,
    name="research",
    description="Research the topic",
)

summarize_step = Step(
    agent=summarizer,
    name="summarize",
    description="Summarize research findings",
)

# Conditional fact-checking step
fact_check_step = Step(
    agent=fact_checker,
    name="fact_check",
    description="Verify facts and claims",
)

write_article = Step(
    agent=writer,
    name="write_article",
    description="Write final article",
)
83
# === BASIC LINEAR WORKFLOW ===
# Steps are listed in order; the Condition wraps the fact-check step and is
# given needs_fact_checking as its evaluator.
basic_workflow = Workflow(
    name="Basic Linear Workflow",
    description="Research -> Summarize -> Condition(Fact Check) -> Write Article",
    steps=[
        research_step,
        summarize_step,
        Condition(
            name="fact_check_condition",
            description="Check if fact-checking is needed",
            steps=[fact_check_step],
            evaluator=needs_fact_checking,
        ),
        write_article,
    ],
)
100
if __name__ == "__main__":
    # Banner: title line followed by a 50-character rule.
    print("Running Basic Linear Workflow Example", "=" * 50, sep="\n")

    try:
        basic_workflow.print_response(
            stream=True,
            input="Recent breakthroughs in quantum computing",
        )
    except Exception as e:
        # Script-level boundary: report the error and the full traceback.
        import traceback

        print(f"Error: {e}")
        traceback.print_exc()