Dynamic Selector
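The selector can take `step_choices` as a second parameter — the prepared `Step` objects built from `Router.choices` — and use it to select steps dynamically, returning a step name, a `Step`, or a list of `Step`s: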
from typing import List, Union

from kern.agent import Agent
from kern.models.openai import OpenAIChat
from kern.workflow.router import Router
from kern.workflow.step import Step
from kern.workflow.types import StepInput
from kern.workflow.workflow import Workflow

# Three single-purpose agents. They are handed to the Router as `choices`;
# the selector below refers to them by the names "researcher", "writer" and
# "reviewer".
researcher = Agent(
    name="researcher",
    model=OpenAIChat(id="gpt-4o-mini"),
    instructions="You are a researcher.",
)

writer = Agent(
    name="writer",
    model=OpenAIChat(id="gpt-4o-mini"),
    instructions="You are a writer.",
)

reviewer = Agent(
    name="reviewer",
    model=OpenAIChat(id="gpt-4o-mini"),
    instructions="You are a reviewer.",
)


def dynamic_selector(step_input: StepInput, step_choices: list) -> Union[str, Step, List[Step]]:
    """Pick the next step(s) based on keywords in the workflow input.

    The selector receives ``step_choices`` - the prepared Step objects from
    ``Router.choices`` - and may answer in three ways: a step name (str),
    a Step object, or a list of Steps to run as a chain.

    Args:
        step_input: The input handed to the router. Only ``step_input.input``
            is read; keyword matching is applied when it is a string.
        step_choices: The steps the router can dispatch to.

    Returns:
        ``"researcher"`` when the input mentions "research"; the "writer"
        step when it mentions "write"; the researcher/writer/reviewer chain
        when it mentions "full"; otherwise the first entry of
        ``step_choices``.

    Raises:
        IndexError: If a fallback to the first choice is needed and
            ``step_choices`` is empty.
    """
    # NOTE(review): the input is presumably not guaranteed to be text (it may
    # be None or structured data) - confirm against the StepInput definition.
    # Non-string input simply matches no keyword and gets the default step.
    raw_input = step_input.input
    user_input = raw_input.lower() if isinstance(raw_input, str) else ""

    # Build name map from step_choices, skipping entries without a usable name.
    step_map = {s.name: s for s in step_choices if getattr(s, "name", None)}

    print(f"Available steps: {list(step_map.keys())}")

    # Can return step name as string
    if "research" in user_input:
        return "researcher"

    # Can return Step object directly
    if "write" in user_input:
        if "writer" in step_map:
            return step_map["writer"]
        return step_choices[0]

    # Can return list of Steps for chaining. Only chain the steps that are
    # actually available so a missing choice does not raise KeyError.
    if "full" in user_input:
        chain = [
            step_map[name]
            for name in ("researcher", "writer", "reviewer")
            if name in step_map
        ]
        if chain:
            return chain

    # Default
    return step_choices[0]


workflow = Workflow(
    name="Dynamic Routing (step_choices)",
    steps=[
        Router(
            name="Dynamic Router",
            selector=dynamic_selector,
            choices=[researcher, writer, reviewer],
        ),
    ],
)

if __name__ == "__main__":
    workflow.print_response("I need to research something", stream=True)