Step with custom function streaming on AgentOS
This example demonstrates how to use named steps with custom function executors and streaming on AgentOS.
This example demonstrates how to use Step objects with custom function executors and how to stream their responses through AgentOS.
An agent or team running inside the custom function step can also stream its results directly to AgentOS.
from typing import AsyncIterator, Union

from kern.agent.agent import Agent
from kern.db.in_memory import InMemoryDb
from kern.db.sqlite import SqliteDb
from kern.models.openai import OpenAIResponses
from kern.os import AgentOS
from kern.team import Team
from kern.tools.hackernews import HackerNewsTools
from kern.tools.yfinance import YFinanceTools
from kern.workflow.step import Step, StepInput, StepOutput, WorkflowRunOutputEvent
from kern.workflow.workflow import Workflow

# Define agents
hackernews_agent = Agent(
    name="Hackernews Agent",
    model=OpenAIResponses(id="gpt-5.2"),
    tools=[HackerNewsTools()],
    instructions="Extract key insights and content from Hackernews posts",
)

finance_agent = Agent(
    name="Finance Agent",
    model=OpenAIResponses(id="gpt-5.2"),
    tools=[YFinanceTools()],
    instructions="Get stock prices and financial data",
)

# Define research team for complex analysis
research_team = Team(
    name="Research Team",
    members=[hackernews_agent, finance_agent],
    instructions="Analyze content and create comprehensive social media strategy",
)

content_planner = Agent(
    name="Content Planner",
    model=OpenAIResponses(id="gpt-5.2"),
    instructions=[
        "Plan a content schedule over 4 weeks for the provided topic and research content",
        "Ensure that I have posts for 3 posts per week",
    ],
    db=InMemoryDb(),
)


async def custom_content_planning_function(
    step_input: StepInput,
) -> AsyncIterator[Union[WorkflowRunOutputEvent, StepOutput]]:
    """Stream a content plan built from the workflow input and prior research.

    The function builds a planning prompt from ``step_input.input`` and the
    first 500 characters of ``step_input.previous_step_content``, runs
    ``content_planner`` in streaming mode, and re-yields every event the agent
    produces. Once the stream is exhausted it yields one final ``StepOutput``
    that wraps the agent's content in a formatted plan.

    Note: This function calls content_planner.arun() internally, and all events
    from that agent call will automatically get workflow context injected by
    the workflow execution system - no manual intervention required!

    Args:
        step_input: Input handed to the step by the workflow; only its
            ``input`` and ``previous_step_content`` attributes are read.

    Yields:
        Each event streamed by ``content_planner.arun(...)``, followed by a
        final ``StepOutput``. If anything raises while running the agent or
        building the plan, the final ``StepOutput`` has ``success=False`` and
        carries the error message instead of the plan.
    """
    message = step_input.input
    previous_step_content = step_input.previous_step_content

    # The previous step's content is not guaranteed to be a string here, so
    # coerce it before slicing; str() is a no-op for content that already is
    # one. Only a 500-character excerpt is embedded to keep the prompt bounded.
    research_excerpt = (
        str(previous_step_content)[:500]
        if previous_step_content
        else "No research results"
    )

    # Create intelligent planning prompt
    planning_prompt = f"""
        STRATEGIC CONTENT PLANNING REQUEST:

        Core Topic: {message}

        Research Results: {research_excerpt}

        Planning Requirements:
        1. Create a comprehensive content strategy based on the research
        2. Leverage the research findings effectively
        3. Identify content formats and channels
        4. Provide timeline and priority recommendations
        5. Include engagement and distribution strategies

        Please create a detailed, actionable content plan.
    """

    try:
        response_iterator = content_planner.arun(
            planning_prompt, stream=True, stream_events=True
        )
        # Forward the agent's events as they arrive so the caller can stream them.
        async for event in response_iterator:
            yield event

        response = content_planner.get_last_run_output()
        # NOTE(review): assumes get_last_run_output() may return None when no
        # run was recorded - confirm against the Agent API. Failing explicitly
        # gives a clearer message than an AttributeError on `.content`.
        if response is None:
            raise RuntimeError("content planner produced no run output")

        research_integration = (
            "Research-based" if previous_step_content else "No research foundation"
        )
        integration_level = "High" if previous_step_content else "Baseline"

        enhanced_content = f"""
            ## Strategic Content Plan

            **Planning Topic:** {message}

            **Research Integration:** {research_integration}

            **Content Strategy:**
            {response.content}

            **Custom Planning Enhancements:**
            - Research Integration: {integration_level}
            - Strategic Alignment: Optimized for multi-channel distribution
            - Execution Ready: Detailed action items included
        """.strip()

        yield StepOutput(content=enhanced_content)

    except Exception as e:
        # Step boundary: report the failure as a StepOutput instead of letting
        # the exception escape the generator.
        yield StepOutput(
            content=f"Custom content planning failed: {str(e)}",
            success=False,
        )


# Define steps using different executor types

research_step = Step(
    name="Research Step",
    team=research_team,
)

content_planning_step = Step(
    name="Content Planning Step",
    executor=custom_content_planning_function,
)

streaming_content_workflow = Workflow(
    name="Streaming Content Creation Workflow",
    description="Automated content creation with streaming custom execution functions",
    db=SqliteDb(
        session_table="workflow_session",
        db_file="tmp/workflow.db",
    ),
    steps=[
        research_step,
        content_planning_step,
    ],
)


# Initialize the AgentOS with the workflows
agent_os = AgentOS(
    description="Example OS setup",
    workflows=[streaming_content_workflow],
)
app = agent_os.get_app()

if __name__ == "__main__":
    # The import string assumes this file is saved as
    # workflow_with_custom_function_stream.py - adjust it if the module is renamed.
    agent_os.serve(app="workflow_with_custom_function_stream:app", reload=True)