Step HITL
Pause individual steps for confirmation, user input, or output review.
Steps support three HITL modes: confirmation (approve/reject before execution), user input (collect parameters before execution), and output review (review/edit/reject after execution).
All HITL settings are configured via HumanReview.
Confirmation
Pause before executing a step. The user confirms to proceed or rejects to skip/cancel.
1from kern.workflow import Workflow, OnReject2from kern.workflow.step import Step3from kern.workflow.types import HumanReview4from kern.db.sqlite import SqliteDb56workflow = Workflow(7 name="data_pipeline",8 db=SqliteDb(db_file="workflow.db"),9 steps=[10 Step(name="fetch_data", agent=fetch_agent),11 Step(12 name="process_data",13 agent=process_agent,14 human_review=HumanReview(15 requires_confirmation=True,16 confirmation_message="Process sensitive data?",17 on_reject=OnReject.skip,18 ),19 ),20 Step(name="save_results", agent=save_agent),21 ],22)2324run_output = workflow.run("Process user data")2526if run_output.is_paused:27 for req in run_output.steps_requiring_confirmation:28 print(f"Step: {req.step_name}")29 print(f"Message: {req.confirmation_message}")30 31 if input("Confirm? (y/n): ").lower() == "y":32 req.confirm()33 else:34 req.reject()35 36 run_output = workflow.continue_run(run_output)3738print(run_output.content)Parameters
| Field | Type | Description |
|---|---|---|
requires_confirmation | bool | Pause for user confirmation before execution |
confirmation_message | str | Message shown to the user |
on_reject | OnReject | Action when rejected: skip (default), cancel |
OnReject Options
| Value | Behavior |
|---|---|
OnReject.skip | Skip this step and continue with the next (default) |
OnReject.cancel | Cancel the entire workflow |
OnReject.retry | Re-execute the step. Used with output review |
User Input
Collect parameters from the user before step execution. Input values are passed to the step via step_input.additional_data["user_input"].
1from kern.workflow import Workflow2from kern.workflow.step import Step3from kern.workflow.types import HumanReview, StepInput, StepOutput, UserInputField4from kern.db.sqlite import SqliteDb56def process_with_params(step_input: StepInput) -> StepOutput:7 user_input = step_input.additional_data.get("user_input", {})8 threshold = user_input.get("threshold", 0.5)9 mode = user_input.get("mode", "fast")10 11 return StepOutput(content=f"Processed with threshold={threshold}, mode={mode}")1213workflow = Workflow(14 name="configurable_pipeline",15 db=SqliteDb(db_file="workflow.db"),16 steps=[17 Step(name="analyze", agent=analyze_agent),18 Step(19 name="process",20 executor=process_with_params,21 human_review=HumanReview(22 requires_user_input=True,23 user_input_message="Configure processing:",24 user_input_schema=[25 UserInputField(26 name="threshold",27 field_type="float",28 description="Processing threshold (0.0-1.0)",29 required=True,30 ),31 UserInputField(32 name="mode",33 field_type="str",34 description="Mode: 'fast' or 'accurate'",35 required=True,36 ),37 UserInputField(38 name="batch_size",39 field_type="int",40 description="Records per batch",41 required=False,42 ),43 ],44 ),45 ),46 Step(name="report", agent=report_agent),47 ],48)4950run_output = workflow.run("Process Q4 data")5152if run_output.is_paused:53 for req in run_output.steps_requiring_user_input:54 print(f"Step: {req.step_name}")55 print(f"Message: {req.user_input_message}")56 57 values = {}58 for field in req.user_input_schema:59 marker = "*" if field.required else ""60 prompt = f"{field.name}{marker} ({field.field_type}): "61 value = input(prompt)62 63 if value:64 if field.field_type == "int":65 values[field.name] = int(value)66 elif field.field_type == "float":67 values[field.name] = float(value)68 elif field.field_type == "bool":69 values[field.name] = value.lower() in ("true", "yes", "1")70 else:71 values[field.name] = value72 73 req.set_user_input(**values)74 75 run_output = workflow.continue_run(run_output)7677print(run_output.content)Parameters
| Field | Type | Description |
|---|---|---|
requires_user_input | bool | Pause to collect user input before execution |
user_input_message | str | Message shown to the user |
user_input_schema | List[UserInputField] | Schema defining expected input fields |
UserInputField
| Field | Type | Description |
|---|---|---|
name | str | Field name (key in user input dict) |
field_type | str | Type: str, int, float, bool |
description | str | Description shown to user |
required | bool | Whether field is required (default: True) |
allowed_values | List[Any] | Optional list of valid values |
Accessing User Input
User input is available in the step function via step_input.additional_data["user_input"]:
1def my_step(step_input: StepInput) -> StepOutput:2 user_input = step_input.additional_data.get("user_input", {})3 4 threshold = user_input.get("threshold")5 mode = user_input.get("mode")6 7 return StepOutput(content=f"Done with {threshold}, {mode}")For agent-based steps, user input is automatically appended to the message.
Output Review
Pause after a step executes so the user can review the output before the workflow continues. If the user rejects, the step re-executes (up to max_retries). See the dedicated Output Review page for full details including editing output and reject with feedback.
1from kern.workflow import Workflow, OnReject2from kern.workflow.step import Step3from kern.workflow.types import HumanReview4from kern.db.sqlite import SqliteDb56workflow = Workflow(7 name="content_pipeline",8 db=SqliteDb(db_file="workflow.db"),9 steps=[10 Step(name="research", agent=research_agent),11 Step(12 name="draft",13 agent=draft_agent,14 human_review=HumanReview(15 requires_output_review=True,16 output_review_message="Review the draft. Approve to continue or reject to regenerate.",17 on_reject=OnReject.retry,18 max_retries=3,19 ),20 ),21 Step(name="publish", agent=publish_agent),22 ],23)2425run_output = workflow.run("Write a blog post about AI agents")2627while run_output.is_paused:28 for req in run_output.steps_requiring_output_review:29 print(f"Output: {req.step_output.content}")30 print(f"Message: {req.output_review_message}")31 32 if input("Approve? (y/n): ").lower() == "y":33 req.confirm()34 else:35 feedback = input("Feedback (optional): ").strip()36 if feedback:37 req.reject(feedback=feedback)38 else:39 req.reject()40 41 run_output = workflow.continue_run(run_output)4243print(run_output.content)Parameters
| Field | Type | Description |
|---|---|---|
requires_output_review | bool | Pause after execution for user review |
output_review_message | str | Message shown during review |
on_reject | OnReject | Action when rejected. Use OnReject.retry to re-execute |
max_retries | int | Max re-executions on rejection (default: 3) |
Timeout
Set a deadline for the user to respond. If it expires, on_timeout fires automatically. See the dedicated Timeout page for full details.
1from kern.workflow import OnTimeout23Step(4 name="review_report",5 agent=report_agent,6 human_review=HumanReview(7 requires_output_review=True,8 output_review_message="Review the report.",9 on_reject=OnReject.retry,10 max_retries=2,11 timeout=120, # 2 minutes12 on_timeout=OnTimeout.approve,13 ),14)OnTimeout Value | Behavior |
|---|---|
OnTimeout.approve | Auto-approve and continue |
OnTimeout.reject | Auto-reject (triggers retry if on_reject=OnReject.retry) |
OnTimeout.cancel | Cancel the workflow |
The @pause Decorator
Use the @pause decorator to configure HITL directly on functions:
1from kern.workflow.decorators import pause2from kern.workflow.types import StepInput, StepOutput, UserInputField34@pause(5 requires_confirmation=True,6 confirmation_message="Execute this step?",7)8def step_with_confirmation(step_input: StepInput) -> StepOutput:9 return StepOutput(content="Executed after confirmation")1011@pause(12 requires_user_input=True,13 user_input_message="Enter parameters:",14 user_input_schema=[15 UserInputField(name="value", field_type="str", required=True),16 ],17)18def step_with_input(step_input: StepInput) -> StepOutput:19 value = step_input.additional_data["user_input"]["value"]20 return StepOutput(content=f"Received: {value}")2122# Decorator config is auto-detected when used in a Step23workflow = Workflow(24 steps=[25 Step(name="confirm_step", executor=step_with_confirmation),26 Step(name="input_step", executor=step_with_input),27 ],28 ...29)Streaming
Handle HITL in streaming workflows:
1from kern.run.workflow import StepPausedEvent23for event in workflow.run("input", stream=True, stream_events=True):4 if isinstance(event, StepPausedEvent):5 print(f"Paused at: {event.step_name}")67session = workflow.get_session()8run_output = session.runs[-1]910while run_output.is_paused:11 for req in run_output.steps_requiring_confirmation:12 req.confirm()13 14 for event in workflow.continue_run(run_output, stream=True, stream_events=True):15 pass16 17 session = workflow.get_session()18 run_output = session.runs[-1]