Error Handling HITL
Pause on step failures to let users retry or skip.
A step can pause the workflow when it raises an error, so that a human in the loop (HITL) can decide whether to retry or skip the failed step.
Error Pause Mode
Set on_error=OnError.pause to pause when a step fails:
```python
from kern.workflow import Workflow, OnError
from kern.workflow.step import Step
from kern.workflow.types import StepInput, StepOutput
from kern.db.sqlite import SqliteDb
import random

def unreliable_api_call(step_input: StepInput) -> StepOutput:
    if random.random() < 0.7:  # 70% failure rate
        raise Exception("API call failed: Connection timeout")
    return StepOutput(content="API call succeeded")

def process_data(step_input: StepInput) -> StepOutput:
    return StepOutput(content=f"Processed: {step_input.previous_step_content}")

workflow = Workflow(
    name="api_workflow",
    db=SqliteDb(db_file="workflow.db"),
    steps=[
        Step(
            name="fetch_data",
            executor=unreliable_api_call,
            on_error=OnError.pause,
        ),
        Step(name="process", executor=process_data),
    ],
)

run_output = workflow.run("Fetch and process")

while run_output.is_paused:
    for req in run_output.steps_with_errors:
        print(f"Step '{req.step_name}' failed")
        print(f"Error: {req.error_message}")
        print(f"Retry count: {req.retry_count}")

        choice = input("Retry or skip? (r/s): ").lower()
        if choice == "r":
            req.retry()
        else:
            req.skip()

    run_output = workflow.continue_run(run_output)

print(run_output.content)
```
OnError Options
| Value | Behavior |
|---|---|
| OnError.fail | Fail the workflow immediately (default) |
| OnError.skip | Skip the step and continue |
| OnError.pause | Pause for user decision (retry or skip) |
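To make the difference between the three modes concrete, here is a minimal sketch of the control flow the table describes. It is a toy model, not kern's implementation: the OnError enum below is a local stand-in, and run_steps and the step triples are hypothetical names introduced for illustration. Clearing the content after a skipped step is an assumption carried over from the skip behavior described later on this page.

```python
from enum import Enum
from typing import Callable, List, Optional, Tuple


class OnError(Enum):
    # Local stand-in for kern's OnError; member names follow the table above.
    fail = "fail"
    skip = "skip"
    pause = "pause"


# In this toy model a step is a (name, function, on_error) triple.
ToyStep = Tuple[str, Callable[[Optional[str]], str], OnError]


def run_steps(steps: List[ToyStep]) -> Tuple[str, Optional[str]]:
    """Run steps in order and return (status, content of the last completed step)."""
    content: Optional[str] = None
    for name, func, on_error in steps:
        try:
            content = func(content)
        except Exception:
            if on_error is OnError.fail:
                raise  # the whole run fails immediately
            if on_error is OnError.skip:
                content = None  # assumption: a skipped step yields no content
                continue
            # OnError.pause: stop here and hand the decision to the user
            return (f"paused at {name}", content)
    return ("completed", content)
```

With a failing first step, OnError.skip lets the second step run with no previous content, OnError.pause stops the run at the failing step, and OnError.fail propagates the exception to the caller.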
ErrorRequirement Properties
When a step fails with on_error=OnError.pause, an ErrorRequirement is created:
| Property | Type | Description |
|---|---|---|
| step_name | str | Name of the failed step |
| error_message | str | The exception message |
| error_type | str | Exception class name (e.g., "ValueError") |
| retry_count | int | Number of retry attempts so far |
ErrorRequirement Methods
| Method | Description |
|---|---|
| req.retry() | Retry the failed step |
| req.skip() | Skip the step and continue |
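The shape of such an object can be pictured with a small stand-in class. This is a sketch for reasoning about the properties and methods listed above, not kern's actual ErrorRequirement: the class name ErrorRequirementSketch, the decision field, and the from_exception helper are all hypothetical.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ErrorRequirementSketch:
    """Hypothetical stand-in mirroring the documented properties."""

    step_name: str
    error_message: str
    error_type: str
    retry_count: int = 0
    decision: Optional[str] = None  # assumed bookkeeping field, not documented

    @classmethod
    def from_exception(
        cls, step_name: str, exc: Exception, retry_count: int = 0
    ) -> "ErrorRequirementSketch":
        # error_message is the exception text, error_type its class name
        return cls(step_name, str(exc), type(exc).__name__, retry_count)

    def retry(self) -> None:
        # Record that the user wants the step executed again
        self.decision = "retry"

    def skip(self) -> None:
        # Record that the user wants to move on without this step
        self.decision = "skip"
```

In this model, calling retry() or skip() only records the decision; nothing runs until the workflow is continued, which matches how the examples on this page call workflow.continue_run after resolving each requirement.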
Retry Behavior
When you call req.retry():
- The step executes again with the same input
- retry_count increments
- If it fails again, the workflow pauses again
- You can retry indefinitely or skip after some attempts
```python
for req in run_output.steps_with_errors:
    if req.retry_count < 3:
        print(f"Retrying (attempt {req.retry_count + 1}/3)")
        req.retry()
    else:
        print("Max retries reached, skipping")
        req.skip()
```
Skip Behavior
When you call req.skip():
- The step is marked as skipped (not failed)
- The workflow continues with the next step
- step_input.previous_step_content will be None for the next step
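Because the next step receives None after a skip, a downstream executor may want to guard against it instead of formatting None into its output. A minimal sketch, using a hypothetical StepInputSketch dataclass in place of kern's StepInput and returning a plain string so the example runs on its own:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class StepInputSketch:
    # Stand-in for StepInput; only the field discussed above is modelled.
    previous_step_content: Optional[str] = None


def process_data(step_input: StepInputSketch) -> str:
    previous = step_input.previous_step_content
    if previous is None:
        # The upstream step was skipped, so there is nothing to process
        return "No upstream data (previous step was skipped)"
    return f"Processed: {previous}"
```

Whether the downstream step should fall back to a default, return an explanatory message, or raise its own error depends on how critical the skipped data is.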
Combining with Confirmation
A step can have both error handling and confirmation:
```python
Step(
    name="risky_operation",
    executor=risky_function,
    requires_confirmation=True,
    confirmation_message="Execute risky operation?",
    on_error=OnError.pause,
)
```
The confirmation happens first. If confirmed and the step fails, the error pause activates.
Streaming
Handle error HITL in streaming workflows:
```python
from kern.run.workflow import StepPausedEvent

for event in workflow.run("input", stream=True, stream_events=True):
    if isinstance(event, StepPausedEvent):
        print(f"Paused at: {event.step_name}")

session = workflow.get_session()
run_output = session.runs[-1]

while run_output.is_paused:
    for req in run_output.steps_with_errors:
        print(f"Error: {req.error_message}")
        req.retry()  # or req.skip()

    for event in workflow.continue_run(run_output, stream=True, stream_events=True):
        pass

    session = workflow.get_session()
    run_output = session.runs[-1]
```
Error Types
Common error scenarios and handling:
| Scenario | Recommended Action |
|---|---|
| Network timeout | Retry a few times, then skip |
| Rate limit | Retry after delay |
| Invalid input | Skip (retry won't help) |
| Resource unavailable | Retry or skip based on criticality |
```python
import time

for req in run_output.steps_with_errors:
    message = req.error_message.lower()
    if "timeout" in message:
        # Transient failure: retry a few times, then give up
        if req.retry_count < 3:
            req.retry()
        else:
            req.skip()
    elif "rate limit" in message:
        time.sleep(5)  # Wait before retry
        req.retry()
    else:
        req.skip()  # Unknown error, skip
```
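One way to keep this policy testable is to separate the decision from the side effects. The sketch below is an illustration under stated assumptions, not part of kern: decide and its constants are hypothetical names, and the doubling delay for rate limits is a swapped-in exponential backoff rather than the fixed 5-second wait used above.

```python
from typing import Tuple

MAX_TIMEOUT_RETRIES = 3      # assumed cap, matching the example above
BASE_DELAY_SECONDS = 5.0     # first wait after a rate-limit error
MAX_DELAY_SECONDS = 60.0     # assumed upper bound on any single wait


def decide(error_message: str, retry_count: int) -> Tuple[str, float]:
    """Return (action, delay_seconds), where action is 'retry' or 'skip'."""
    message = error_message.lower()
    if "timeout" in message:
        if retry_count < MAX_TIMEOUT_RETRIES:
            return ("retry", 0.0)
        return ("skip", 0.0)
    if "rate limit" in message:
        # Exponential backoff: 5s, 10s, 20s, ... capped at MAX_DELAY_SECONDS
        delay = min(BASE_DELAY_SECONDS * (2 ** retry_count), MAX_DELAY_SECONDS)
        return ("retry", delay)
    return ("skip", 0.0)  # unknown error: retrying is unlikely to help
```

Inside the resolution loop, the result would be applied by calling decide(req.error_message, req.retry_count), sleeping for the returned delay, and then calling req.retry() or req.skip() according to the returned action.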