Nested Workflow with Condition

Inner workflow with a `Condition` step for conditional fact-checking before passing results to the outer workflow.

from kern.agent import Agent
from kern.models.openai import OpenAIChat
from kern.workflow import Condition
from kern.workflow.step import Step
from kern.workflow.types import StepInput, StepOutput
from kern.workflow.workflow import Workflow


def needs_fact_check(step_input: StepInput) -> bool:
    """Return True when the previous step's content contains at least one digit.

    Serves as the evaluator of the ``fact_check_gate`` condition: digits are
    treated as a cheap signal that the text mentions dates, statistics, or
    other numbers worth verifying.

    Args:
        step_input: The step input; only ``previous_step_content`` is read.

    Returns:
        ``True`` if any character of the content is a digit, ``False``
        otherwise (including when the content is missing or empty).
    """
    prev = step_input.previous_step_content or ""
    # Non-string content (e.g. a dict or other structured value) is inspected
    # through its string form so digits inside it are still detected instead
    # of being missed or raising on items that lack ``isdigit``.
    text = prev if isinstance(prev, str) else str(prev)
    return any(char.isdigit() for char in text)


def format_for_writer(step_input: StepInput) -> StepOutput:
    """Forward the available content unchanged as a ``StepOutput``.

    Serves as the executor of the ``pass_through`` step, which runs in the
    ``else_steps`` branch of the fact-check condition.

    Args:
        step_input: The step input; ``previous_step_content`` and ``input``
            are read.

    Returns:
        A ``StepOutput`` whose content is the previous step's content, or
        ``step_input.input`` when the previous content is missing or falsy.
    """
    prev = step_input.previous_step_content or step_input.input
    return StepOutput(content=prev)


# Inner workflow: research with conditional fact-checking

# Agent for the first inner step. Its instructions ask for dates and numbers,
# which are the digits the fact-check evaluator looks for.
researcher = Agent(
    name="Researcher",
    model=OpenAIChat(id="gpt-4o-mini"),
    instructions="Research the topic. Include specific dates and numbers where relevant.",
)

# Agent used by the "fact_check" step inside the condition.
fact_checker = Agent(
    name="Fact Checker",
    model=OpenAIChat(id="gpt-4o-mini"),
    instructions="Verify the facts in the provided text. Correct any inaccuracies.",
)

# Two-stage inner workflow: research first, then a condition gate that picks
# between fact-checking and a plain pass-through step.
inner_workflow = Workflow(
    name="Research with Fact Check",
    description="Researches a topic and conditionally fact-checks the results",
    steps=[
        Step(name="research", agent=researcher),
        Condition(
            name="fact_check_gate",
            description="Fact-check if content contains numbers",
            evaluator=needs_fact_check,
            # Branch intended for research output that contains digits.
            steps=[Step(name="fact_check", agent=fact_checker)],
            # Fallback branch: forward the research output unchanged.
            else_steps=[Step(name="pass_through", executor=format_for_writer)],
        ),
    ],
)

# Outer workflow

# Agent for the final step: turns the inner workflow's result into prose.
writer = Agent(
    name="Writer",
    model=OpenAIChat(id="gpt-4o-mini"),
    instructions="Write a polished paragraph from the research provided.",
)

# The inner workflow is embedded as a single step ("research_phase"), followed
# by the writing step.
outer_workflow = Workflow(
    name="Research, Check, and Write",
    description="Researches, conditionally fact-checks, then writes",
    steps=[
        Step(name="research_phase", workflow=inner_workflow),
        Step(name="writing_phase", agent=writer),
    ],
)


if __name__ == "__main__":
    # Run the full nested workflow on a sample question with streaming enabled.
    outer_workflow.print_response(
        input="What are the key milestones in space exploration?",
        stream=True,
    )

Run the Example

git clone https://github.com/kern-ai/kern.git
cd kern/cookbook/04_workflows/06_advanced_concepts/workflow_as_a_step

pip install kern-ai openai

python nested_workflow_with_condition.py