Structured I/O

Type-safe data flow between agents, teams, and custom functions using Pydantic models.

Demonstrates type-safe data flow between agents, teams, and custom Python functions. Each step:

  1. Receives structured input (a Pydantic model, list, dict, or raw string)
  2. Produces structured output (e.g., ResearchFindings, ContentStrategy)

You can also use this pattern to create a custom function that can be used in any step. Such a function can:

  1. Inspect incoming data types (raw strings or Pydantic models).
  2. Analyze structured outputs from previous steps.
  3. Generate reports while preserving type safety.
1from typing import List
2
3from kern.agent import Agent
4from kern.models.openai import OpenAIResponses
5from kern.tools.hackernews import HackerNewsTools
6from kern.tools.yfinance import YFinanceTools
7from kern.workflow.step import Step
8from kern.workflow.workflow import Workflow
9from pydantic import BaseModel, Field
10
11
# Define structured models for each step
class ResearchFindings(BaseModel):
    """Structured research findings with key insights.

    Declared as the ``output_schema`` of ``research_agent``. The
    ``description`` on each field documents what the field should hold.
    """

    topic: str = Field(description="The research topic")
    # List-size floors use ``min_length``, which replaces the deprecated
    # ``min_items`` keyword in Pydantic v2.
    # NOTE(review): assumes the project runs on Pydantic v2 - confirm.
    key_insights: List[str] = Field(
        description="Main insights discovered", min_length=3
    )
    trending_technologies: List[str] = Field(
        description="Technologies that are trending", min_length=2
    )
    market_impact: str = Field(description="Potential market impact analysis")
    sources_count: int = Field(description="Number of sources researched")
    # ge/le restrict the score to the closed interval [0.0, 1.0].
    confidence_score: float = Field(
        description="Confidence in findings (0.0-1.0)", ge=0.0, le=1.0
    )
26
27
class ContentStrategy(BaseModel):
    """Structured content strategy based on research.

    Declared as the ``output_schema`` of ``strategy_agent``. The
    ``description`` on each field documents what the field should hold.
    """

    target_audience: str = Field(description="Primary target audience")
    # List-size floors use ``min_length``, which replaces the deprecated
    # ``min_items`` keyword in Pydantic v2.
    # NOTE(review): assumes the project runs on Pydantic v2 - confirm.
    content_pillars: List[str] = Field(
        description="Main content themes", min_length=3
    )
    # No minimum size is imposed on the schedule.
    posting_schedule: List[str] = Field(description="Recommended posting schedule")
    key_messages: List[str] = Field(
        description="Core messages to communicate", min_length=3
    )
    hashtags: List[str] = Field(description="Recommended hashtags", min_length=5)
    engagement_tactics: List[str] = Field(
        description="Ways to increase engagement", min_length=2
    )
41
42
class FinalContentPlan(BaseModel):
    """Final content plan with specific deliverables.

    Declared as the ``output_schema`` of ``planning_agent``. The
    ``description`` on each field documents what the field should hold.
    """

    campaign_name: str = Field(description="Name for the content campaign")
    # List-size floors use ``min_length``, which replaces the deprecated
    # ``min_items`` keyword in Pydantic v2.
    # NOTE(review): assumes the project runs on Pydantic v2 - confirm.
    content_calendar: List[str] = Field(
        description="Specific content pieces planned", min_length=6
    )
    success_metrics: List[str] = Field(
        description="How to measure success", min_length=3
    )
    # Budget and timeline are free-form strings, not numeric/date types.
    budget_estimate: str = Field(description="Estimated budget range")
    timeline: str = Field(description="Implementation timeline")
    risk_factors: List[str] = Field(
        description="Potential risks and mitigation", min_length=2
    )
58
59
# Define agents, each with the output schema its response must follow
research_agent = Agent(
    name="AI Research Specialist",
    role="Research AI trends and extract structured insights",
    model=OpenAIResponses(id="gpt-5.2"),
    # The only agent in this file that is given tools.
    tools=[HackerNewsTools(), YFinanceTools()],
    instructions=[
        "Research the given topic thoroughly using available tools",
        "Provide structured findings with confidence scores",
        "Focus on recent developments and market trends",
        "Make sure to structure your response according to the ResearchFindings model",
    ],
    # Declares ResearchFindings as the schema for this agent's output.
    output_schema=ResearchFindings,
)
74
# Second agent: no tools, works from what the previous step hands over.
strategy_agent = Agent(
    name="Content Strategy Expert",
    role="Create content strategies based on research findings",
    model=OpenAIResponses(id="gpt-5.2"),
    instructions=[
        "Analyze the research findings provided from the previous step",
        "Create a comprehensive content strategy based on the structured research data",
        "Focus on audience engagement and brand building",
        "Structure your response according to the ContentStrategy model",
    ],
    # Declares ContentStrategy as the schema for this agent's output.
    output_schema=ContentStrategy,
)
87
# Third agent: no tools, turns the strategy into a concrete plan.
planning_agent = Agent(
    name="Content Planning Specialist",
    role="Create detailed content plans and calendars",
    model=OpenAIResponses(id="gpt-5.2"),
    instructions=[
        "Use the content strategy from the previous step to create a detailed implementation plan",
        "Include specific timelines and success metrics",
        "Consider budget and resource constraints",
        "Structure your response according to the FinalContentPlan model",
    ],
    # Declares FinalContentPlan as the schema for this agent's output.
    output_schema=FinalContentPlan,
)
100
# Define steps: one named Step wrapping each agent
research_step = Step(agent=research_agent, name="research_insights")

strategy_step = Step(agent=strategy_agent, name="content_strategy")

planning_step = Step(agent=planning_agent, name="final_planning")
116
# Create workflow; steps are listed as research -> strategy -> planning
structured_workflow = Workflow(
    steps=[research_step, strategy_step, planning_step],
    name="Structured Content Creation Pipeline",
    description="AI-powered content creation with structured data flow",
)
123
if __name__ == "__main__":
    banner = "=== Testing Structured Output Flow Between Steps ==="
    print(banner)

    # Test with simple string input: a plain string is all the workflow gets
    topic_prompt = "Latest developments in artificial intelligence and machine learning"
    structured_workflow.print_response(input=topic_prompt)