Steps HITL
Confirm before executing a pipeline of grouped steps.
The Steps component groups multiple steps into a pipeline. It supports confirmation-based human-in-the-loop (HITL) review: the workflow pauses before the entire pipeline executes.
All HITL settings are configured via HumanReview. Steps only supports requires_confirmation. Passing unsupported fields (like requires_output_review) raises a ValueError.
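The validation rule above can be pictured with a small stand-alone sketch. It does not use kern itself: `ReviewSketch`, `UNSUPPORTED_FOR_STEPS`, and `check_steps_review` are hypothetical stand-ins, and the error message text is an assumption. Only the behavior (unsupported fields raise a ValueError) comes from this page.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ReviewSketch:
    """Hypothetical stand-in for HumanReview (not the kern class)."""
    requires_confirmation: bool = False
    confirmation_message: Optional[str] = None
    requires_output_review: bool = False


# Review modes a Steps-like component refuses, per the text above.
UNSUPPORTED_FOR_STEPS = ("requires_output_review",)


def check_steps_review(review: ReviewSketch) -> None:
    """Raise ValueError if an unsupported review mode is enabled."""
    enabled = [name for name in UNSUPPORTED_FOR_STEPS if getattr(review, name)]
    if enabled:
        raise ValueError(f"Steps does not support: {', '.join(enabled)}")


# Confirmation-only settings pass the check.
check_steps_review(ReviewSketch(requires_confirmation=True))

# Enabling an unsupported mode fails fast, before anything runs.
try:
    check_steps_review(ReviewSketch(requires_output_review=True))
except ValueError as exc:
    print(f"Rejected: {exc}")
```

In practice this means a misconfigured pipeline should fail when the workflow is defined, not in the middle of a run.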
Pipeline Confirmation
When requires_confirmation=True, the pipeline pauses before executing any of its steps:
- Confirm: Execute all steps in the pipeline
- Reject: Skip the entire pipeline
```python
from kern.workflow import Workflow
from kern.workflow.step import Step
from kern.workflow.steps import Steps
from kern.workflow.types import HumanReview, StepInput, StepOutput
from kern.db.sqlite import SqliteDb


# Placeholder executor for the "collect" step so the example is complete.
def collect_data(step_input: StepInput) -> StepOutput:
    return StepOutput(content="Collection: Raw data loaded")


def validate_data(step_input: StepInput) -> StepOutput:
    return StepOutput(content="Validation: Schema verified")


def transform_data(step_input: StepInput) -> StepOutput:
    return StepOutput(content="Transform: Data normalized")


def enrich_data(step_input: StepInput) -> StepOutput:
    return StepOutput(content="Enrichment: External data merged")


# Placeholder executor for the "report" step so the example is complete.
def generate_report(step_input: StepInput) -> StepOutput:
    return StepOutput(content="Report: Summary generated")


workflow = Workflow(
    name="data_pipeline",
    db=SqliteDb(db_file="workflow.db"),
    steps=[
        Step(name="collect", executor=collect_data),
        # The grouped steps run only after the user confirms.
        Steps(
            name="advanced_processing",
            steps=[
                Step(name="validate", executor=validate_data),
                Step(name="transform", executor=transform_data),
                Step(name="enrich", executor=enrich_data),
            ],
            human_review=HumanReview(
                requires_confirmation=True,
                confirmation_message="Run advanced processing pipeline?",
            ),
        ),
        Step(name="report", executor=generate_report),
    ],
)

run_output = workflow.run("Process data")

if run_output.is_paused:
    for req in run_output.steps_requiring_confirmation:
        print(f"Pipeline: {req.step_name}")
        print(f"Message: {req.confirmation_message}")

        if input("Run pipeline? (y/n): ").lower() == "y":
            req.confirm()
            print("Executing pipeline")
        else:
            req.reject()
            print("Skipping pipeline")

    # Resume the run once every pending request has a decision.
    run_output = workflow.continue_run(run_output)

print(run_output.content)
```
Parameters
| Parameter | Type | Description |
|---|---|---|
| requires_confirmation | bool | Pause before executing the pipeline |
| confirmation_message | str | Message shown to the user |
| on_reject | OnReject | Action when rejected: skip (default), cancel |
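To make the effect of on_reject concrete, here is a toy model of a workflow with a single gated pipeline. It does not use kern: `RejectAction` and `run_with_gate` are hypothetical names. Treating cancel as "stop the whole run" is an assumption, since this page only names the option.

```python
from enum import Enum
from typing import List, Tuple


class RejectAction(Enum):
    """Hypothetical stand-in for OnReject."""
    SKIP = "skip"
    CANCEL = "cancel"


def run_with_gate(
    before: List[str],
    pipeline: List[str],
    after: List[str],
    confirmed: bool,
    on_reject: RejectAction = RejectAction.SKIP,
) -> Tuple[List[str], str]:
    """Return (executed step names, final status) for one gated pipeline."""
    executed = list(before)
    if confirmed:
        # Confirm: every grouped step runs, in order.
        executed += pipeline
    elif on_reject is RejectAction.CANCEL:
        # Assumption: cancel ends the whole run at the gate.
        return executed, "cancelled"
    # Confirmed, or rejected with skip: the workflow continues with later steps.
    executed += after
    return executed, "completed"


grouped = ["validate", "transform", "enrich"]
print(run_with_gate(["collect"], grouped, ["report"], confirmed=True))
print(run_with_gate(["collect"], grouped, ["report"], confirmed=False))
print(run_with_gate(["collect"], grouped, ["report"], confirmed=False,
                    on_reject=RejectAction.CANCEL))
```

With the default skip, a rejection drops only the grouped steps and the report step still runs. Under the cancel assumption, nothing after the gate runs.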
Pipeline Behavior
The confirmation happens once before the pipeline starts. Individual steps within the pipeline do not pause for confirmation (unless they have their own HITL configuration).
| User Action | Result |
|---|---|
| Confirm | Execute all steps in sequence |
| Reject | Skip all steps in the pipeline |
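Because inner steps can carry their own HITL configuration, a run may pause more than once. The helper below is a minimal sketch of a resume loop that uses only the attributes and methods shown in the examples on this page (`is_paused`, `steps_requiring_confirmation`, `confirm()`, `reject()`, `continue_run()`). The names `resolve_confirmations` and `decide` are hypothetical, and the assumption that `continue_run` can return another paused run is inferred from the streaming example.

```python
from typing import Any, Callable


def resolve_confirmations(
    workflow: Any,
    run_output: Any,
    decide: Callable[[Any], bool],
) -> Any:
    """Answer every pending confirmation until the run is no longer paused.

    `decide` receives each pending request and returns True to confirm
    or False to reject it.
    """
    while run_output.is_paused:
        for req in run_output.steps_requiring_confirmation:
            if decide(req):
                req.confirm()
            else:
                req.reject()
        # Resume; the returned run may be paused again at a later gate.
        run_output = workflow.continue_run(run_output)
    return run_output
```

A call such as `resolve_confirmations(workflow, run_output, lambda req: req.step_name == "advanced_processing")` would confirm the pipeline gate and reject any other request; whether that policy is appropriate depends on the workflow.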
Streaming
Handle pipeline HITL in streaming workflows:
```python
from kern.run.workflow import StepPausedEvent

# Stream the run and report where it pauses.
for event in workflow.run("input", stream=True, stream_events=True):
    if isinstance(event, StepPausedEvent):
        print(f"Paused at: {event.step_name}")

# Fetch the latest run from the session to inspect its paused state.
session = workflow.get_session()
run_output = session.runs[-1]

while run_output.is_paused:
    for req in run_output.steps_requiring_confirmation:
        req.confirm()

    # Resume in streaming mode and drain the remaining events.
    for event in workflow.continue_run(run_output, stream=True, stream_events=True):
        pass

    # Reload the run; it may have paused again at a later gate.
    session = workflow.get_session()
    run_output = session.runs[-1]
```