Executor HITL
Pause workflow execution when an agent or team inside a step calls a tool that requires human approval or input.
Executor-level HITL pauses a workflow during a step, when the agent or team running inside the step calls a tool marked with requires_confirmation, requires_user_input, or external_execution. The pause propagates from the agent/team up to the workflow, and the user resolves it the same way they would on a standalone agent run.
1from kern.agent import Agent2from kern.db.sqlite import SqliteDb3from kern.models.openai import OpenAIResponses4from kern.tools import tool5from kern.workflow.step import Step6from kern.workflow.workflow import Workflow789@tool(requires_confirmation=True)10def get_the_weather(city: str) -> str:11 return f"It is 70 degrees and cloudy in {city}"121314weather_agent = Agent(15 name="WeatherAgent",16 model=OpenAIResponses(id="gpt-5.4"),17 tools=[get_the_weather],18 db=SqliteDb(db_file="workflow.db"),19)2021workflow = Workflow(22 name="WeatherWorkflow",23 db=SqliteDb(db_file="workflow.db"),24 steps=[Step(name="get_weather", agent=weather_agent)],25)2627response = workflow.run("What is the weather in Tokyo?")2829if response.is_paused:30 for step_req in response.step_requirements or []:31 if step_req.requires_executor_input:32 for executor_req in step_req.executor_requirements or []:33 executor_req.confirm() # or executor_req.reject(note="...")3435 response = workflow.continue_run(response)3637print(response.content)When to Use
| Use Case | Level |
|---|---|
| Approve a specific tool call (DB write, payment, email send) | Executor |
| Collect user-provided values for a tool argument | Executor |
| Defer tool execution to an external system | Executor |
| Gate the whole step before the agent runs | Step-level |
| Review the step's final output | Output Review |
| Pick which branch a router takes | Router HITL |
Use executor-level HITL when the gate is on the tool, not on the step. The agent can call other tools freely; only the marked tool pauses execution.
Tool Decorators
Three decorator flags trigger executor-level pauses. They behave the same as on standalone agents.
| Decorator | Pauses For | Resolve With |
|---|---|---|
@tool(requires_confirmation=True) | User approval | executor_req.confirm() or executor_req.reject(note=...) |
@tool(requires_user_input=True, user_input_fields=[...]) | User-supplied argument values | executor_req.provide_user_input({...}) |
@tool(external_execution=True) | Out-of-process execution; user supplies the result | Set tool_execution.result on the requirement |
1from kern.tools import tool234@tool(requires_confirmation=True)5def delete_user(user_id: str) -> str:6 ...789@tool(requires_user_input=True, user_input_fields=["recipient"])10def send_money(amount: float, recipient: str, note: str) -> str:11 ...121314@tool(external_execution=True)15def run_query(sql: str) -> str:16 ...The Pause Object
An executor pause sets run_output.pause_kind == "executor". The matching StepRequirement has requires_executor_input == True and holds the paused tool calls in executor_requirements:
1step_req.executor_name # "WeatherAgent"2step_req.executor_type # "agent" | "team"3executor_req = step_req.executor_requirements[0]4executor_req.tool_execution.tool_name # "get_the_weather"5executor_req.tool_execution.tool_args # {"city": "Tokyo"}6executor_req.needs_confirmation # True for requires_confirmation7executor_req.needs_user_input # True for requires_user_input8executor_req.needs_external_execution # True for external_executionSee Pause Anatomy for the full structure of every pause object and its fields.
Resolving Each Type
Confirmation
1for step_req in response.step_requirements or []:2 if step_req.requires_executor_input:3 for executor_req in step_req.executor_requirements or []:4 if executor_req.needs_confirmation:5 if user_approves():6 executor_req.confirm()7 else:8 executor_req.reject(note="User declined")910response = workflow.continue_run(response)When rejected, the agent receives the rejection note and decides what to do next (it may try another tool, or surface the rejection in its response).
User Input
1for step_req in response.step_requirements or []:2 if step_req.requires_executor_input:3 for executor_req in step_req.executor_requirements or []:4 if executor_req.needs_user_input:5 values = {6 field.name: prompt(field.name)7 for field in executor_req.user_input_schema or []8 }9 executor_req.provide_user_input(values)1011response = workflow.continue_run(response)The values overwrite the corresponding tool arguments before the tool runs.
External Execution
@tool(external_execution=True) means the agent never runs the tool itself. The workflow pauses, hands you the tool name and arguments, and you run it however you want — call a backend service, dispatch a job, hit a privileged API. Set tool_execution.result to whatever the tool was supposed to return, and the agent resumes with that result.
1for step_req in response.step_requirements or []:2 if step_req.requires_executor_input:3 for executor_req in step_req.executor_requirements or []:4 if executor_req.needs_external_execution:5 tool_name = executor_req.tool_execution.tool_name6 tool_args = executor_req.tool_execution.tool_args78 # Replace this with your own dispatch logic, for example:9 # result = my_backend.run(tool_name, **tool_args)10 # result = subprocess.run(...).stdout11 # result = await microservice.call(tool_name, tool_args)12 result = my_dispatcher(tool_name, tool_args)1314 executor_req.tool_execution.result = result1516response = workflow.continue_run(response)Streaming
In streaming mode the workflow yields a StepExecutorPausedEvent when the agent's tool call pauses, and a StepExecutorContinuedEvent when execution resumes.
1from kern.run.workflow import (2 StepExecutorPausedEvent,3 StepExecutorContinuedEvent,4 WorkflowCompletedEvent,5)67for event in workflow.run("What is the weather in Tokyo?", stream=True):8 if isinstance(event, StepExecutorPausedEvent):9 print(f"Paused: {event.executor_name} ({event.executor_type})")10 elif isinstance(event, StepExecutorContinuedEvent):11 print(f"Resumed: {event.executor_name}")12 elif isinstance(event, WorkflowCompletedEvent):13 print("Done")1415# The full WorkflowRunOutput is persisted to the session, not yielded in the stream.16session = workflow.get_session()17run_output = session.runs[-1] if session and session.runs else None1819if run_output and run_output.is_paused:20 for step_req in run_output.step_requirements or []:21 for executor_req in step_req.executor_requirements or []:22 executor_req.confirm()2324 for event in workflow.continue_run(run_output, stream=True):25 if hasattr(event, "content") and event.content:26 print(event.content, end="", flush=True)After a streaming run pauses, read the paused run from workflow.get_session().runs[-1] — step_requirements is persisted to the database, not emitted as a stream event.
Composite Steps
Executor HITL works inside any composite step that contains a Step with an agent or team. The pause propagates up through the wrapping primitive.
| Primitive | Behavior |
|---|---|
Step | Pauses when the agent/team inside the step calls a HITL tool |
Steps | Pauses on the inner Step whose agent/team calls a HITL tool |
Condition | Pauses if the chosen branch contains a Step whose agent/team calls a HITL tool |
Loop | Pauses on the iteration where the inner Step's agent/team calls a HITL tool |
Router | Pauses if the selected branch contains a Step whose agent/team calls a HITL tool |
1from kern.workflow.condition import Condition2from kern.workflow.step import Step34workflow = Workflow(5 name="ConditionExecutorHITL",6 db=db,7 steps=[8 Step(name="gather_data", executor=gather_data),9 Condition(10 name="analysis_decision",11 evaluator=lambda step_input: True,12 steps=[Step(name="detailed_analysis", agent=analysis_agent)],13 else_steps=[Step(name="quick_summary", executor=quick_summary)],14 ),15 Step(name="report", executor=generate_report),16 ],17)The resolution code does not change: walk step_requirements, find the entry with requires_executor_input, and resolve its executor_requirements.
Sync, Async, and Continue
| Method | Sync | Async |
|---|---|---|
| Run | workflow.run(input) | await workflow.arun(input) |
| Run streaming | workflow.run(input, stream=True) | workflow.arun(input, stream=True) |
| Continue | workflow.continue_run(run_output) | await workflow.acontinue_run(run_output) |
| Continue streaming | workflow.continue_run(run_output, stream=True) | workflow.acontinue_run(run_output, stream=True) |
Events
| Event | Emitted When |
|---|---|
StepStartedEvent | Step begins executing |
StepExecutorPausedEvent | Agent/team pauses on a HITL tool call |
StepExecutorContinuedEvent | Agent/team resumes after the requirement is resolved |
StepCompletedEvent | Step finishes (after resume and any further tool calls) |
WorkflowCompletedEvent | Workflow finishes |
Cookbooks
Runnable examples in cookbook/04_workflows/08_human_in_the_loop/executor_hitl/:
| File | Demonstrates |
|---|---|
01_agent_confirmation.py | Agent tool with requires_confirmation |
02_agent_confirmation_stream.py | Same, streaming |
03_team_in_step.py | Team executor with HITL tool |
04_agent_confirmation_in_condition_step.py | Executor HITL inside a Condition |
05_agent_confirmation_in_loop_step.py | Executor HITL inside a Loop |
06_agent_confirmation_in_steps_container.py | Executor HITL inside Steps |
07_agent_confirmation_in_router_step.py | Executor HITL inside a Router branch |
08_agent_user_input_step.py | Tool with requires_user_input |
09_executor_continued_event.py | StepExecutorContinuedEvent lifecycle |