String Selector

The simplest approach - return the step name and Router resolves it from choices:

1from typing import List, Union
2
3from kern.agent import Agent
4from kern.models.openai import OpenAIChat
5from kern.workflow.router import Router
6from kern.workflow.step import Step
7from kern.workflow.types import StepInput
8from kern.workflow.workflow import Workflow
9
10tech_expert = Agent(
11 name="tech_expert",
12 model=OpenAIChat(id="gpt-4o-mini"),
13 instructions="You are a tech expert. Provide technical analysis.",
14)
15
16biz_expert = Agent(
17 name="biz_expert",
18 model=OpenAIChat(id="gpt-4o-mini"),
19 instructions="You are a business expert. Provide business insights.",
20)
21
22generalist = Agent(
23 name="generalist",
24 model=OpenAIChat(id="gpt-4o-mini"),
25 instructions="You are a generalist. Provide general information.",
26)
27
28tech_step = Step(name="Tech Research", agent=tech_expert)
29business_step = Step(name="Business Research", agent=biz_expert)
30general_step = Step(name="General Research", agent=generalist)
31
32
33def route_by_topic(step_input: StepInput) -> Union[str, Step, List[Step]]:
34 """Selector can return step name as string - Router resolves it."""
35 topic = step_input.input.lower()
36
37 if "tech" in topic or "ai" in topic or "software" in topic:
38 return "Tech Research" # Return name as string
39 elif "business" in topic or "market" in topic or "finance" in topic:
40 return "Business Research" # Return name as string
41 else:
42 return "General Research" # Return name as string
43
44
45workflow = Workflow(
46 name="Expert Routing (String Selector)",
47 steps=[
48 Router(
49 name="Topic Router",
50 selector=route_by_topic,
51 choices=[tech_step, business_step, general_step],
52 ),
53 ],
54)
55
56if __name__ == "__main__":
57 workflow.print_response("Tell me about AI trends", stream=True)