Async MySQL for Workflows

Kern supports using MySQL asynchronously, with the AsyncMySQLDb class.

Usage

Run MySQL

Install docker desktop and run MySQL on port 3306 using:

1docker run -d \
2 --name mysql \
3 -e MYSQL_ROOT_PASSWORD=ai \
4 -e MYSQL_DATABASE=ai \
5 -e MYSQL_USER=ai \
6 -e MYSQL_PASSWORD=ai \
7 -p 3306:3306 \
8 -d mysql:8
1"""Use Async MySQL as the database for a workflow.
2Run `uv pip install openai sqlalchemy asyncmy kern-ai` to install dependencies.
3"""
4
5import asyncio
6import uuid
7from typing import List
8
9from kern.agent import Agent
10from kern.db.base import SessionType
11from kern.db.mysql import AsyncMySQLDb
12from kern.tools.hackernews import HackerNewsTools
13from kern.workflow.types import WorkflowExecutionInput
14from kern.workflow.workflow import Workflow
15from pydantic import BaseModel
16
17db_url = "mysql+asyncmy://ai:ai@localhost:3306/ai"
18db = AsyncMySQLDb(db_url=db_url)
19
20class ResearchTopic(BaseModel):
21 topic: str
22 key_points: List[str]
23 summary: str
24
25# Create researcher agent
26researcher = Agent(
27 name="Researcher",
28 tools=[HackerNewsTools()],
29 instructions="Research the given topic thoroughly and provide key insights",
30 output_schema=ResearchTopic,
31)
32
33# Create writer agent
34writer = Agent(
35 name="Writer",
36 instructions="Write a well-structured blog post based on the research provided",
37)
38
39# Define the workflow
40async def blog_workflow(workflow: Workflow, execution_input: WorkflowExecutionInput):
41 """
42 A workflow that researches a topic and writes a blog post about it.
43 """
44 topic = execution_input.input
45
46 # Step 1: Research the topic
47 research_result = await researcher.arun(f"Research this topic: {topic}")
48
49 # Step 2: Write the blog post
50 if research_result and research_result.content:
51 blog_result = await writer.arun(
52 f"Write a blog post about {topic}. Use this research: {research_result.content.model_dump_json()}"
53 )
54 return blog_result.content
55
56 return "Failed to complete workflow"
57
58# Create and run the workflow
59workflow = Workflow(
60 name="Blog Generator",
61 steps=blog_workflow,
62 db=db,
63)
64
65async def main():
66 """Run the workflow with a sample topic"""
67 session_id = str(uuid.uuid4())
68
69 await workflow.aprint_response(
70 input="The future of artificial intelligence",
71 session_id=session_id,
72 markdown=True,
73 )
74 session_data = await db.get_session(
75 session_id=session_id, session_type=SessionType.WORKFLOW
76 )
77 print("\n=== SESSION DATA ===")
78 print(session_data.to_dict())
79
80if __name__ == "__main__":
81 asyncio.run(main())

Params

ParameterTypeDefaultDescription
idOptional[str]-The ID of the database instance. UUID by default.
db_urlOptional[str]-The database URL to connect to.
db_engineOptional[AsyncEngine]-The SQLAlchemy async database engine to use.
db_schemaOptional[str]-The database schema to use.
session_tableOptional[str]-Name of the table to store Agent, Team and Workflow sessions.
memory_tableOptional[str]-Name of the table to store memories.
metrics_tableOptional[str]-Name of the table to store metrics.
eval_tableOptional[str]-Name of the table to store evaluation runs data.
knowledge_tableOptional[str]-Name of the table to store knowledge content.