Human-in-the-Loop in Workflows

Pause workflow execution to collect user confirmation, input, decisions, or tool approvals. State is persisted to the database so you can resume after the user responds.

Workflows pause for user action at three levels:

LevelWhat pausesConfigured onGuide
Step-levelThe workflow primitive, before or after it executesStep, Loop, Router, Condition, StepsStep HITL
Executor-levelA tool call inside the agent or team running the stepThe @tool decoratorExecutor HITL
NestedBoth gates on the same step, in sequenceBoth of the aboveNested HITL

Pause state is persisted to the workflow's database. When the user resolves the requirement, call workflow.continue_run(run_output) to resume from where it left off.

1from kern.workflow import Workflow, OnReject
2from kern.workflow.step import Step
3from kern.workflow.types import HumanReview
4from kern.db.sqlite import SqliteDb
5
6workflow = Workflow(
7 name="data_pipeline",
8 db=SqliteDb(db_file="workflow.db"), # Required for HITL
9 steps=[
10 Step(name="fetch_data", agent=fetch_agent),
11 Step(
12 name="process_data",
13 agent=process_agent,
14 human_review=HumanReview(
15 requires_confirmation=True,
16 confirmation_message="Process sensitive data?",
17 on_reject=OnReject.skip,
18 ),
19 ),
20 ],
21)
22
23run_output = workflow.run("Process user data")
24
25if run_output.is_paused:
26 for req in run_output.steps_requiring_confirmation:
27 req.confirm() # or req.reject()
28 run_output = workflow.continue_run(run_output)

Pick a Level

You want to...Use
Approve a whole step before it runsStep confirmation
Collect parameters before a step runsStep user input
Review a step's output after it runsOutput review
Let the user pick which Router branch executesRouter selection
Review each iteration of a LoopLoop iteration review
Pause a specific tool call inside an agentExecutor confirmation
Have an agent's tool ask the user for argument valuesExecutor user input
Defer a tool's execution to an external systemExternal execution
Combine a step gate and a tool gate on the same stepNested HITL
Pause when a step errorsError handling
Auto-resolve a pause if the user is slowTimeout

Step-Level vs Executor-Level

AspectStep-LevelExecutor-Level
Configured onThe workflow primitive (Step, Loop, etc.)The tool, via @tool(...)
When it pausesBefore or after the step executesDuring the step, when the agent calls the tool
pause_kind"step""executor"
Pause eventStepPausedEvent, RouterPausedEvent, StepOutputReviewEventStepExecutorPausedEvent
Continue eventStepContinuedEventStepExecutorContinuedEvent
Use caseGate the whole stepGate a specific tool call
Resolved byreq.confirm(), req.set_user_input(...), req.select(...)executor_req.confirm(), executor_req.provide_user_input(...)

Supported Primitives

PrimitiveConfirmationUser InputOutput ReviewIteration ReviewRoute SelectionExecutor HITL
Step-- (when agent / team is set)
Steps---- (via inner Step)
Condition---- (via inner Step)
Loop-- (via inner Step)
Router-- (via selected branch)
Note

User input is currently supported on Step (to collect parameters) and Router (to select routes). Other primitives support confirmation only.

Requirements

HITL workflows need a database to persist state between pauses.

1from kern.db.sqlite import SqliteDb # SQLite for development
2from kern.db.postgres import PostgresDb # PostgreSQL for production
3
4workflow = Workflow(db=SqliteDb(db_file="workflow.db"), ...)
5workflow = Workflow(db=PostgresDb(db_url="postgresql://..."), ...)

Run Output Properties

When a workflow pauses, inspect WorkflowRunOutput:

PropertyDescription
is_pausedTrue if waiting for user action
pause_kind"step" or "executor"
paused_step_nameName of the step where the pause occurred
step_requirementsList of pending requirements (last entry is the active one)
steps_requiring_confirmationFilter: steps needing confirm/reject
steps_requiring_user_inputFilter: steps needing user input values
steps_requiring_output_reviewFilter: steps needing output review
steps_requiring_routeFilter: routers needing route selection
steps_with_errorsSteps that failed with on_error="pause"

For nested HITL, always read the last entry in step_requirements to determine the current pause type. Earlier entries are history. See Nested HITL.

For the full structure of every pause object (WorkflowRunOutput, StepRequirement, and executor requirements) and the methods that resolve each, see Pause Anatomy.

OnReject Behavior

on_reject controls what happens when a user rejects a step.

ValueBehavior
OnReject.skipSkip the step and continue (default for most primitives)
OnReject.cancelCancel the entire workflow
OnReject.retryRe-execute the step. Pair with reject(feedback=...) to send feedback. See Output Review
OnReject.else_branchFor Condition only: execute else_steps (default for Condition)

Timeout

Every HITL pause can have a deadline. If the user does not respond in time, on_timeout decides what happens.

1from kern.workflow import OnTimeout
2from kern.workflow.types import HumanReview
3
4Step(
5 name="approve_deploy",
6 agent=deploy_agent,
7 human_review=HumanReview(
8 requires_confirmation=True,
9 confirmation_message="Approve deployment?",
10 timeout=300, # 5 minutes
11 on_timeout=OnTimeout.approve, # approve | reject | cancel
12 ),
13)

See Timeout for the full set of behaviors and per-primitive details.

Streaming

HITL works with streaming. Watch for pause events; read the paused run from the session.

1from kern.run.workflow import StepPausedEvent, StepExecutorPausedEvent
2
3for event in workflow.run("input", stream=True):
4 if isinstance(event, (StepPausedEvent, StepExecutorPausedEvent)):
5 pass
6
7session = workflow.get_session()
8run_output = session.runs[-1]
9
10if run_output.is_paused:
11 # resolve requirements...
12 for event in workflow.continue_run(run_output, stream=True):
13 pass

The full WorkflowRunOutput (including step_requirements) is persisted to the database, not yielded as a stream event.

The @pause Decorator

Mark custom function steps with HITL configuration.

1from kern.workflow.decorators import pause
2from kern.workflow.types import UserInputField
3
4@pause(
5 requires_user_input=True,
6 user_input_message="Enter parameters:",
7 user_input_schema=[
8 UserInputField(name="threshold", field_type="float", required=True),
9 ],
10)
11def process_data(step_input: StepInput) -> StepOutput:
12 threshold = step_input.additional_data["user_input"]["threshold"]
13 return StepOutput(content=f"Processed with threshold {threshold}")
14
15Step(name="process", executor=process_data)

Guides

Developer Resources