Nested Selector

When `choices` contains a nested list, that list becomes a `Steps` container whose steps run sequentially; the selector receives the container as a single entry in `step_choices`:

from typing import List, Union

from kern.agent import Agent
from kern.models.openai import OpenAIChat
from kern.workflow.router import Router
from kern.workflow.step import Step
from kern.workflow.types import StepInput
from kern.workflow.workflow import Workflow

# Three minimal agents; each one is used as a routable choice by the Router.
step_a = Agent(name="step_a", model=OpenAIChat(id="gpt-4o-mini"), instructions="Step A")
step_b = Agent(name="step_b", model=OpenAIChat(id="gpt-4o-mini"), instructions="Step B")
step_c = Agent(name="step_c", model=OpenAIChat(id="gpt-4o-mini"), instructions="Step C")


def nested_selector(step_input: StepInput, step_choices: list) -> Union[str, Step, List[Step]]:
    """Select either the single step or the nested sequence from the user's input.

    When choices contains nested lists like ``[step_a, [step_b, step_c]]``,
    the nested list becomes a Steps container in ``step_choices``:

    * ``step_choices[0]`` = Step for step_a
    * ``step_choices[1]`` = Steps container with [step_b, step_c]

    Args:
        step_input: The workflow input; only its ``input`` attribute is read.
        step_choices: The router's choices, indexed in the order shown above.

    Returns:
        ``step_choices[0]`` when the input text contains "single"
        (case-insensitive); otherwise ``step_choices[1]``.
    """
    raw_input = step_input.input
    # NOTE(review): the input may be None or a non-string payload (confirm
    # against StepInput). Anything that is not text is treated as "no keyword"
    # instead of raising AttributeError on .lower().
    user_input = raw_input.lower() if isinstance(raw_input, str) else ""

    if "single" in user_input:
        return step_choices[0]  # Just step_a
    return step_choices[1]  # Steps container with step_b -> step_c


# A workflow with one Router step. The selector chooses between step_a alone
# and the nested [step_b, step_c] sequence.
workflow = Workflow(
    name="Nested Choices Routing",
    steps=[
        Router(
            name="Nested Router",
            selector=nested_selector,
            choices=[step_a, [step_b, step_c]],  # Nested list becomes Steps container
        ),
    ],
)

if __name__ == "__main__":
    # This prompt does not contain "single", so nested_selector returns
    # step_choices[1], the nested step_b -> step_c sequence.
    workflow.print_response("Run the sequence", stream=True)