# Async MySQL for Workflows
Kern supports asynchronous access to MySQL through the `AsyncMySQLDb` class.
## Usage

### Run MySQL
Install Docker Desktop and run MySQL on port 3306 using:
```shell
docker run -d \
  --name mysql \
  -e MYSQL_ROOT_PASSWORD=ai \
  -e MYSQL_DATABASE=ai \
  -e MYSQL_USER=ai \
  -e MYSQL_PASSWORD=ai \
  -p 3306:3306 \
  -d mysql:8
```

```python
"""Use Async MySQL as the database for a workflow.
Run `uv pip install openai sqlalchemy asyncmy kern-ai` to install dependencies.
"""

import asyncio
import uuid
from typing import List

from kern.agent import Agent
from kern.db.base import SessionType
from kern.db.mysql import AsyncMySQLDb
from kern.tools.hackernews import HackerNewsTools
from kern.workflow.types import WorkflowExecutionInput
from kern.workflow.workflow import Workflow
from pydantic import BaseModel

db_url = "mysql+asyncmy://ai:ai@localhost:3306/ai"
db = AsyncMySQLDb(db_url=db_url)

class ResearchTopic(BaseModel):
    topic: str
    key_points: List[str]
    summary: str

# Create researcher agent
researcher = Agent(
    name="Researcher",
    tools=[HackerNewsTools()],
    instructions="Research the given topic thoroughly and provide key insights",
    output_schema=ResearchTopic,
)

# Create writer agent
writer = Agent(
    name="Writer",
    instructions="Write a well-structured blog post based on the research provided",
)

# Define the workflow
async def blog_workflow(workflow: Workflow, execution_input: WorkflowExecutionInput):
    """
    A workflow that researches a topic and writes a blog post about it.
    """
    topic = execution_input.input

    # Step 1: Research the topic
    research_result = await researcher.arun(f"Research this topic: {topic}")

    # Step 2: Write the blog post
    if research_result and research_result.content:
        blog_result = await writer.arun(
            f"Write a blog post about {topic}. Use this research: {research_result.content.model_dump_json()}"
        )
        return blog_result.content

    return "Failed to complete workflow"

# Create and run the workflow
workflow = Workflow(
    name="Blog Generator",
    steps=blog_workflow,
    db=db,
)

async def main():
    """Run the workflow with a sample topic"""
    session_id = str(uuid.uuid4())

    await workflow.aprint_response(
        input="The future of artificial intelligence",
        session_id=session_id,
        markdown=True,
    )
    session_data = await db.get_session(
        session_id=session_id, session_type=SessionType.WORKFLOW
    )
    print("\n=== SESSION DATA ===")
    print(session_data.to_dict())

if __name__ == "__main__":
    asyncio.run(main())
```

## Params

| Parameter | Type | Default | Description |
|---|---|---|---|
| `id` | `Optional[str]` | - | The ID of the database instance. UUID by default. |
| `db_url` | `Optional[str]` | - | The database URL to connect to. |
| `db_engine` | `Optional[AsyncEngine]` | - | The SQLAlchemy async database engine to use. |
| `db_schema` | `Optional[str]` | - | The database schema to use. |
| `session_table` | `Optional[str]` | - | Name of the table to store Agent, Team and Workflow sessions. |
| `memory_table` | `Optional[str]` | - | Name of the table to store memories. |
| `metrics_table` | `Optional[str]` | - | Name of the table to store metrics. |
| `eval_table` | `Optional[str]` | - | Name of the table to store evaluation runs data. |
| `knowledge_table` | `Optional[str]` | - | Name of the table to store knowledge content. |