Nested Workflow with Router

Inner workflow with a `Router` step that routes to specialist agents based on the topic.

1from typing import List
2
3from kern.agent import Agent
4from kern.models.openai import OpenAIChat
5from kern.workflow import Router
6from kern.workflow.step import Step
7from kern.workflow.types import StepInput
8from kern.workflow.workflow import Workflow
9
10
11def topic_router(step_input: StepInput) -> List[Step]:
12 """Route to a specialist based on keywords in the input."""
13 text = (step_input.input or "").lower()
14
15 if any(kw in text for kw in ["code", "programming", "software", "api"]):
16 return [Step(name="tech_research", agent=tech_specialist)]
17 elif any(kw in text for kw in ["history", "war", "ancient", "century"]):
18 return [Step(name="history_research", agent=history_specialist)]
19 else:
20 return [Step(name="general_research", agent=general_specialist)]
21
22
23# Specialist agents
24tech_specialist = Agent(
25 name="Tech Specialist",
26 model=OpenAIChat(id="gpt-4o-mini"),
27 instructions="You are a technology expert. Provide detailed technical explanations.",
28)
29
30history_specialist = Agent(
31 name="History Specialist",
32 model=OpenAIChat(id="gpt-4o-mini"),
33 instructions="You are a historian. Provide detailed historical context and analysis.",
34)
35
36general_specialist = Agent(
37 name="General Specialist",
38 model=OpenAIChat(id="gpt-4o-mini"),
39 instructions="You are a general knowledge expert. Provide clear, informative answers.",
40)
41
42# Inner workflow: routed research
43inner_workflow = Workflow(
44 name="Routed Research",
45 description="Routes to the right specialist based on the topic",
46 steps=[
47 Router(
48 name="specialist_router",
49 selector=topic_router,
50 choices=[
51 Step(name="tech_research", agent=tech_specialist),
52 Step(name="history_research", agent=history_specialist),
53 Step(name="general_research", agent=general_specialist),
54 ],
55 ),
56 ],
57)
58
59# Outer workflow
60editor = Agent(
61 name="Editor",
62 model=OpenAIChat(id="gpt-4o-mini"),
63 instructions="You are an editor. Polish and improve the specialist's research into a clear article.",
64)
65
66outer_workflow = Workflow(
67 name="Smart Research and Edit",
68 description="Routes to the right specialist, then edits the result",
69 steps=[
70 Step(name="research_phase", workflow=inner_workflow),
71 Step(name="editing_phase", agent=editor),
72 ],
73)
74
75
76if __name__ == "__main__":
77 outer_workflow.print_response(
78 input="Explain how REST APIs work and best practices for designing them",
79 stream=True,
80 )

Run the Example

1git clone https://github.com/kern-ai/kern.git
2cd kern/cookbook/04_workflows/06_advanced_concepts/workflow_as_a_step
3
4pip install kern-ai openai
5
6python nested_workflow_with_router.py