Access Multiple Previous Steps Output
This example demonstrates **Workflows 2.0** advanced data flow capabilities
This example demonstrates Workflows 2.0 shows how to:
- Access outputs from specific named steps (
get_step_content()) - Aggregate all previous outputs (
get_all_previous_content()) - Create comprehensive reports by combining multiple research sources
Key Features:
- Step Output Access: Retrieve data from any previous step by name or collectively.
- Custom Reporting: Combine and analyze outputs from parallel or sequential steps.
- Streaming Support: Real-time updates during execution.
1from kern.agent.agent import Agent2from kern.tools.hackernews import HackerNewsTools3from kern.tools.yfinance import YFinanceTools4from kern.workflow.step import Step5from kern.workflow.types import StepInput, StepOutput6from kern.workflow.workflow import Workflow78# Define the research agents9hackernews_agent = Agent(10 name="HackerNews Researcher",11 instructions="You are a researcher specializing in finding the latest tech news and discussions from Hacker News. Focus on startup trends, programming topics, and tech industry insights.",12 tools=[HackerNewsTools()],13)1415finance_agent = Agent(16 name="Finance Researcher",17 instructions="You are a finance researcher. Search for stock data, market trends, and financial information to gather detailed insights.",18 tools=[YFinanceTools()],19)2021reasoning_agent = Agent(22 name="Reasoning Agent",23 instructions="You are an expert analyst who creates comprehensive reports by analyzing and synthesizing information from multiple sources. Create well-structured, insightful reports.",24)2526# Create the research steps27research_hackernews = Step(28 name="research_hackernews",29 agent=hackernews_agent,30 description="Research latest tech trends from Hacker News",31)3233research_finance = Step(34 name="research_finance",35 agent=finance_agent,36 description="Research financial data and market trends",37)3839# Custom function step that has access to ALL previous step outputs404142def create_comprehensive_report(step_input: StepInput) -> StepOutput:43 """44 Custom function that creates a report using data from multiple previous steps.45 This function has access to ALL previous step outputs and the original workflow message.46 """4748 # Access original workflow input49 original_topic = step_input.input or ""5051 # Access specific step outputs by name52 hackernews_data = step_input.get_step_content("research_hackernews") or ""53 finance_data = step_input.get_step_content("research_finance") or ""5455 # Or access ALL previous content56 _ = step_input.get_all_previous_content()5758 # Create a comprehensive report combining all sources59 report = f"""60 # Comprehensive Research Report: {original_topic}6162 ## Executive Summary63 Based on research from HackerNews and web sources, here's a comprehensive analysis of {original_topic}.6465 ## HackerNews Insights66 {hackernews_data[:500]}...6768 ## Finance Research Findings69 {finance_data[:500]}...70 """7172 return StepOutput(73 step_name="comprehensive_report", content=report.strip(), success=True74 )757677comprehensive_report_step = Step(78 name="comprehensive_report",79 executor=create_comprehensive_report,80 description="Create comprehensive report from all research sources",81)8283# Final reasoning step using reasoning agent84reasoning_step = Step(85 name="final_reasoning",86 agent=reasoning_agent,87 description="Apply reasoning to create final insights and recommendations",88)8990workflow = Workflow(91 name="Enhanced Research Workflow",92 description="Multi-source research with custom data flow and reasoning",93 steps=[94 research_hackernews,95 research_finance,96 comprehensive_report_step, # Has access to both previous steps97 reasoning_step, # Gets the last step output (comprehensive report)98 ],99)100101if __name__ == "__main__":102 workflow.print_response(103 "Latest developments in artificial intelligence and machine learning",104 stream=True,105 )