Step HITL

Pause individual steps for confirmation, user input, or output review.

Steps support three HITL modes: confirmation (approve/reject before execution), user input (collect parameters before execution), and output review (review/edit/reject after execution).

All HITL settings are configured via HumanReview.

Confirmation

Pause before executing a step. The user confirms to proceed or rejects to skip/cancel.

1from kern.workflow import Workflow, OnReject
2from kern.workflow.step import Step
3from kern.workflow.types import HumanReview
4from kern.db.sqlite import SqliteDb
5
6workflow = Workflow(
7 name="data_pipeline",
8 db=SqliteDb(db_file="workflow.db"),
9 steps=[
10 Step(name="fetch_data", agent=fetch_agent),
11 Step(
12 name="process_data",
13 agent=process_agent,
14 human_review=HumanReview(
15 requires_confirmation=True,
16 confirmation_message="Process sensitive data?",
17 on_reject=OnReject.skip,
18 ),
19 ),
20 Step(name="save_results", agent=save_agent),
21 ],
22)
23
24run_output = workflow.run("Process user data")
25
26if run_output.is_paused:
27 for req in run_output.steps_requiring_confirmation:
28 print(f"Step: {req.step_name}")
29 print(f"Message: {req.confirmation_message}")
30
31 if input("Confirm? (y/n): ").lower() == "y":
32 req.confirm()
33 else:
34 req.reject()
35
36 run_output = workflow.continue_run(run_output)
37
38print(run_output.content)

Parameters

FieldTypeDescription
requires_confirmationboolPause for user confirmation before execution
confirmation_messagestrMessage shown to the user
on_rejectOnRejectAction when rejected: skip (default), cancel

OnReject Options

ValueBehavior
OnReject.skipSkip this step and continue with the next (default)
OnReject.cancelCancel the entire workflow
OnReject.retryRe-execute the step. Used with output review

User Input

Collect parameters from the user before step execution. Input values are passed to the step via step_input.additional_data["user_input"].

1from kern.workflow import Workflow
2from kern.workflow.step import Step
3from kern.workflow.types import HumanReview, StepInput, StepOutput, UserInputField
4from kern.db.sqlite import SqliteDb
5
6def process_with_params(step_input: StepInput) -> StepOutput:
7 user_input = step_input.additional_data.get("user_input", {})
8 threshold = user_input.get("threshold", 0.5)
9 mode = user_input.get("mode", "fast")
10
11 return StepOutput(content=f"Processed with threshold={threshold}, mode={mode}")
12
13workflow = Workflow(
14 name="configurable_pipeline",
15 db=SqliteDb(db_file="workflow.db"),
16 steps=[
17 Step(name="analyze", agent=analyze_agent),
18 Step(
19 name="process",
20 executor=process_with_params,
21 human_review=HumanReview(
22 requires_user_input=True,
23 user_input_message="Configure processing:",
24 user_input_schema=[
25 UserInputField(
26 name="threshold",
27 field_type="float",
28 description="Processing threshold (0.0-1.0)",
29 required=True,
30 ),
31 UserInputField(
32 name="mode",
33 field_type="str",
34 description="Mode: 'fast' or 'accurate'",
35 required=True,
36 ),
37 UserInputField(
38 name="batch_size",
39 field_type="int",
40 description="Records per batch",
41 required=False,
42 ),
43 ],
44 ),
45 ),
46 Step(name="report", agent=report_agent),
47 ],
48)
49
50run_output = workflow.run("Process Q4 data")
51
52if run_output.is_paused:
53 for req in run_output.steps_requiring_user_input:
54 print(f"Step: {req.step_name}")
55 print(f"Message: {req.user_input_message}")
56
57 values = {}
58 for field in req.user_input_schema:
59 marker = "*" if field.required else ""
60 prompt = f"{field.name}{marker} ({field.field_type}): "
61 value = input(prompt)
62
63 if value:
64 if field.field_type == "int":
65 values[field.name] = int(value)
66 elif field.field_type == "float":
67 values[field.name] = float(value)
68 elif field.field_type == "bool":
69 values[field.name] = value.lower() in ("true", "yes", "1")
70 else:
71 values[field.name] = value
72
73 req.set_user_input(**values)
74
75 run_output = workflow.continue_run(run_output)
76
77print(run_output.content)

Parameters

FieldTypeDescription
requires_user_inputboolPause to collect user input before execution
user_input_messagestrMessage shown to the user
user_input_schemaList[UserInputField]Schema defining expected input fields

UserInputField

FieldTypeDescription
namestrField name (key in user input dict)
field_typestrType: str, int, float, bool
descriptionstrDescription shown to user
requiredboolWhether field is required (default: True)
allowed_valuesList[Any]Optional list of valid values

Accessing User Input

User input is available in the step function via step_input.additional_data["user_input"]:

1def my_step(step_input: StepInput) -> StepOutput:
2 user_input = step_input.additional_data.get("user_input", {})
3
4 threshold = user_input.get("threshold")
5 mode = user_input.get("mode")
6
7 return StepOutput(content=f"Done with {threshold}, {mode}")

For agent-based steps, user input is automatically appended to the message.

Output Review

Pause after a step executes so the user can review the output before the workflow continues. If the user rejects, the step re-executes (up to max_retries). See the dedicated Output Review page for full details including editing output and reject with feedback.

1from kern.workflow import Workflow, OnReject
2from kern.workflow.step import Step
3from kern.workflow.types import HumanReview
4from kern.db.sqlite import SqliteDb
5
6workflow = Workflow(
7 name="content_pipeline",
8 db=SqliteDb(db_file="workflow.db"),
9 steps=[
10 Step(name="research", agent=research_agent),
11 Step(
12 name="draft",
13 agent=draft_agent,
14 human_review=HumanReview(
15 requires_output_review=True,
16 output_review_message="Review the draft. Approve to continue or reject to regenerate.",
17 on_reject=OnReject.retry,
18 max_retries=3,
19 ),
20 ),
21 Step(name="publish", agent=publish_agent),
22 ],
23)
24
25run_output = workflow.run("Write a blog post about AI agents")
26
27while run_output.is_paused:
28 for req in run_output.steps_requiring_output_review:
29 print(f"Output: {req.step_output.content}")
30 print(f"Message: {req.output_review_message}")
31
32 if input("Approve? (y/n): ").lower() == "y":
33 req.confirm()
34 else:
35 feedback = input("Feedback (optional): ").strip()
36 if feedback:
37 req.reject(feedback=feedback)
38 else:
39 req.reject()
40
41 run_output = workflow.continue_run(run_output)
42
43print(run_output.content)

Parameters

FieldTypeDescription
requires_output_reviewboolPause after execution for user review
output_review_messagestrMessage shown during review
on_rejectOnRejectAction when rejected. Use OnReject.retry to re-execute
max_retriesintMax re-executions on rejection (default: 3)

Timeout

Set a deadline for the user to respond. If it expires, on_timeout fires automatically. See the dedicated Timeout page for full details.

1from kern.workflow import OnTimeout
2
3Step(
4 name="review_report",
5 agent=report_agent,
6 human_review=HumanReview(
7 requires_output_review=True,
8 output_review_message="Review the report.",
9 on_reject=OnReject.retry,
10 max_retries=2,
11 timeout=120, # 2 minutes
12 on_timeout=OnTimeout.approve,
13 ),
14)
OnTimeout ValueBehavior
OnTimeout.approveAuto-approve and continue
OnTimeout.rejectAuto-reject (triggers retry if on_reject=OnReject.retry)
OnTimeout.cancelCancel the workflow

The @pause Decorator

Use the @pause decorator to configure HITL directly on functions:

1from kern.workflow.decorators import pause
2from kern.workflow.types import StepInput, StepOutput, UserInputField
3
4@pause(
5 requires_confirmation=True,
6 confirmation_message="Execute this step?",
7)
8def step_with_confirmation(step_input: StepInput) -> StepOutput:
9 return StepOutput(content="Executed after confirmation")
10
11@pause(
12 requires_user_input=True,
13 user_input_message="Enter parameters:",
14 user_input_schema=[
15 UserInputField(name="value", field_type="str", required=True),
16 ],
17)
18def step_with_input(step_input: StepInput) -> StepOutput:
19 value = step_input.additional_data["user_input"]["value"]
20 return StepOutput(content=f"Received: {value}")
21
22# Decorator config is auto-detected when used in a Step
23workflow = Workflow(
24 steps=[
25 Step(name="confirm_step", executor=step_with_confirmation),
26 Step(name="input_step", executor=step_with_input),
27 ],
28 ...
29)

Streaming

Handle HITL in streaming workflows:

1from kern.run.workflow import StepPausedEvent
2
3for event in workflow.run("input", stream=True, stream_events=True):
4 if isinstance(event, StepPausedEvent):
5 print(f"Paused at: {event.step_name}")
6
7session = workflow.get_session()
8run_output = session.runs[-1]
9
10while run_output.is_paused:
11 for req in run_output.steps_requiring_confirmation:
12 req.confirm()
13
14 for event in workflow.continue_run(run_output, stream=True, stream_events=True):
15 pass
16
17 session = workflow.get_session()
18 run_output = session.runs[-1]

Developer Resources