Custom Functions in Workflows

How to use custom functions in workflows

Custom functions provide maximum flexibility by allowing you to define specific logic for step execution. Use them to preprocess inputs, orchestrate agents and teams, and postprocess outputs with complete programmatic control.

Key Capabilities

  • Custom Logic: Implement complex business rules and data transformations
  • Agent Integration: Call agents and teams within your custom processing logic
  • Data Flow Control: Transform outputs between steps for optimal data handling

Implementation Pattern Define a Step with a custom function as the executor. The function must accept a StepInput object and return a StepOutput object, ensuring seamless integration with the workflow system.

Custom function step workflow diagram

Example

1content_planning_step = Step(
2 name="Content Planning Step",
3 executor=custom_content_planning_function,
4)
5
6def custom_content_planning_function(step_input: StepInput) -> StepOutput:
7 """
8 Custom function that does intelligent content planning with context awareness
9 """
10 message = step_input.input
11 previous_step_content = step_input.previous_step_content
12
13 # Create intelligent planning prompt
14 planning_prompt = f"""
15 STRATEGIC CONTENT PLANNING REQUEST:
16
17 Core Topic: {message}
18
19 Research Results: {previous_step_content[:500] if previous_step_content else "No research results"}
20
21 Planning Requirements:
22 1. Create a comprehensive content strategy based on the research
23 2. Leverage the research findings effectively
24 3. Identify content formats and channels
25 4. Provide timeline and priority recommendations
26 5. Include engagement and distribution strategies
27
28 Please create a detailed, actionable content plan.
29 """
30
31 try:
32 response = content_planner.run(planning_prompt)
33
34 enhanced_content = f"""
35 ## Strategic Content Plan
36
37 **Planning Topic:** {message}
38
39 **Research Integration:** {"✓ Research-based" if previous_step_content else "✗ No research foundation"}
40
41 **Content Strategy:**
42 {response.content}
43
44 **Custom Planning Enhancements:**
45 - Research Integration: {"High" if previous_step_content else "Baseline"}
46 - Strategic Alignment: Optimized for multi-channel distribution
47 - Execution Ready: Detailed action items included
48 """.strip()
49
50 return StepOutput(content=enhanced_content)
51
52 except Exception as e:
53 return StepOutput(
54 content=f"Custom content planning failed: {str(e)}",
55 success=False,
56 )

Standard Pattern All custom functions follow this consistent structure:

1def custom_content_planning_function(step_input: StepInput) -> StepOutput:
2 # 1. Custom preprocessing
3 # 2. Call agents/teams as needed
4 # 3. Custom postprocessing
5 return StepOutput(content=enhanced_content)

Class-based executor

You can also use a class-based executor by defining a class that implements the __call__ method.

1class CustomExecutor:
2 def __call__(self, step_input: StepInput) -> StepOutput:
3 # 1. Custom preprocessing
4 # 2. Call agents/teams as needed
5 # 3. Custom postprocessing
6 return StepOutput(content=enhanced_content)
7
8content_planning_step = Step(
9 name="Content Planning Step",
10 executor=CustomExecutor(),
11)

When is this useful?:

  • Configuration at initialization: Pass in settings, API keys, or behavior flags when creating the executor
  • Stateful execution: Maintain counters, caches, or track information across multiple workflow runs
  • Reusable components: Create configured executor instances that can be shared across multiple workflows
1class CustomExecutor:
2 def __init__(self, max_retries: int = 3, use_cache: bool = True):
3 # Configuration passed during instantiation
4 self.max_retries = max_retries
5 self.use_cache = use_cache
6 self.call_count = 0 # Stateful tracking
7
8 def __call__(self, step_input: StepInput) -> StepOutput:
9 self.call_count += 1
10
11 # Access instance configuration and state
12 if self.use_cache and self.call_count > 1:
13 return StepOutput(content="Using cached result")
14
15 # Your custom logic with access to self.max_retries, etc.
16 return StepOutput(content=enhanced_content)
17
18# Instantiate with specific configuration
19content_planning_step = Step(
20 name="Content Planning Step",
21 executor=CustomExecutor(max_retries=5, use_cache=False),
22)

Also supports async execution by defining the __call__ method to be an async function.

1class CustomExecutor:
2 async def __call__(self, step_input: StepInput) -> StepOutput:
3 # 1. Custom preprocessing
4 # 2. Call agents/teams as needed
5 # 3. Custom postprocessing
6 return StepOutput(content=enhanced_content)
7
8content_planning_step = Step(
9 name="Content Planning Step",
10 executor=CustomExecutor(),
11)

For a detailed example see Class-based Executor.

Streaming execution with custom function step on AgentOS:

If you are running an agent or team within the custom function step, you can enable streaming on the AgentOS chat page by setting stream=True and stream_events=True when calling run() or arun() and yielding the events.

Note

Using the AgentOS, runs will be asynchronous and responses will be streamed. This means you must keep the custom function step asynchronous, by using .arun() instead of .run() to run your Agents or Teams.

1content_planner = Agent(
2 name="Content Planner",
3 model=OpenAIResponses(id="gpt-5.2"),
4 instructions=[
5 "Plan a content schedule over 4 weeks for the provided topic and research content",
6 "Ensure that I have posts for 3 posts per week",
7 ],
8 db=InMemoryDb(),
9)
10
11async def custom_content_planning_function(
12 step_input: StepInput,
13) -> AsyncIterator[Union[WorkflowRunOutputEvent, StepOutput]]:
14 """
15 Custom function that does intelligent content planning with context awareness.
16
17 Note: This function calls content_planner.arun() internally, and all events
18 from that agent call will automatically get workflow context injected by
19 the workflow execution system - no manual intervention required!
20 """
21 message = step_input.input
22 previous_step_content = step_input.previous_step_content
23
24 # Create intelligent planning prompt
25 planning_prompt = f"""
26 STRATEGIC CONTENT PLANNING REQUEST:
27
28 Core Topic: {message}
29
30 Research Results: {previous_step_content[:500] if previous_step_content else "No research results"}
31
32 Planning Requirements:
33 1. Create a comprehensive content strategy based on the research
34 2. Leverage the research findings effectively
35 3. Identify content formats and channels
36 4. Provide timeline and priority recommendations
37 5. Include engagement and distribution strategies
38
39 Please create a detailed, actionable content plan.
40 """
41
42 try:
43 response_iterator = content_planner.arun(
44 planning_prompt, stream=True, stream_events=True
45 )
46 async for event in response_iterator:
47 yield event
48
49 response = content_planner.get_last_run_output()
50
51 enhanced_content = f"""
52 ## Strategic Content Plan
53
54 **Planning Topic:** {message}
55
56 **Research Integration:** {"✓ Research-based" if previous_step_content else "✗ No research foundation"}
57
58 **Content Strategy:**
59 {response.content}
60
61 **Custom Planning Enhancements:**
62 - Research Integration: {"High" if previous_step_content else "Baseline"}
63 - Strategic Alignment: Optimized for multi-channel distribution
64 - Execution Ready: Detailed action items included
65 """.strip()
66
67 yield StepOutput(content=enhanced_content)
68
69 except Exception as e:
70 yield StepOutput(
71 content=f"Custom content planning failed: {str(e)}",
72 success=False,
73 )
Note

Streaming in case of a class-based executor also works the same way by defining the __call__ method to yield the events.

Developer Resources