Error Handling HITL

Pause on step failures to let users retry or skip.

Human-in-the-loop (HITL) error handling lets a step pause the workflow when it raises an exception, so a user can decide whether to retry the failed step or skip it.

Error Pause Mode

Set on_error=OnError.pause to pause when a step fails:

    from kern.workflow import Workflow, OnError
    from kern.workflow.step import Step
    from kern.workflow.types import StepInput, StepOutput
    from kern.db.sqlite import SqliteDb
    import random

    def unreliable_api_call(step_input: StepInput) -> StepOutput:
        # Simulate a flaky dependency
        if random.random() < 0.7:  # 70% failure rate
            raise Exception("API call failed: Connection timeout")
        return StepOutput(content="API call succeeded")

    def process_data(step_input: StepInput) -> StepOutput:
        return StepOutput(content=f"Processed: {step_input.previous_step_content}")

    workflow = Workflow(
        name="api_workflow",
        db=SqliteDb(db_file="workflow.db"),
        steps=[
            Step(
                name="fetch_data",
                executor=unreliable_api_call,
                on_error=OnError.pause,  # pause instead of failing the workflow
            ),
            Step(name="process", executor=process_data),
        ],
    )

    run_output = workflow.run("Fetch and process")

    # Resolve every failed step, then resume; repeat while the run is still paused
    while run_output.is_paused:
        for req in run_output.steps_with_errors:
            print(f"Step '{req.step_name}' failed")
            print(f"Error: {req.error_message}")
            print(f"Retry count: {req.retry_count}")

            choice = input("Retry or skip? (r/s): ").lower()
            if choice == "r":
                req.retry()
            else:
                req.skip()

        run_output = workflow.continue_run(run_output)

    print(run_output.content)

OnError Options

| Value | Behavior |
| --- | --- |
| OnError.fail | Fail the workflow immediately (default) |
| OnError.skip | Skip the step and continue |
| OnError.pause | Pause for user decision (retry or skip) |
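
To make the difference between the three modes concrete, here is a minimal, self-contained simulation. It does not use the library: the OnError enum below is a stand-in that only borrows the member names from the table, and run_steps is a hypothetical toy runner, not how the workflow engine is implemented.

```python
from enum import Enum
from typing import Callable, List, Optional, Tuple


class OnError(Enum):
    # Stand-in enum for illustration; only the member names come from the table above.
    fail = "fail"
    skip = "skip"
    pause = "pause"


StepSpec = Tuple[str, Callable[[Optional[str]], str], OnError]


def run_steps(steps: List[StepSpec]) -> Tuple[str, List[str]]:
    """Run (name, function, on_error) triples in order and report how the run ended."""
    previous: Optional[str] = None
    log: List[str] = []
    for name, func, on_error in steps:
        try:
            previous = func(previous)
            log.append(f"{name}: ok")
        except Exception as exc:
            if on_error is OnError.fail:
                log.append(f"{name}: failed ({exc})")
                return "failed", log
            if on_error is OnError.pause:
                # A real workflow would wait here for a retry/skip decision.
                log.append(f"{name}: paused ({exc})")
                return "paused", log
            # OnError.skip: record the skip and pass None to the next step.
            log.append(f"{name}: skipped ({exc})")
            previous = None
    return "completed", log


def broken(_previous: Optional[str]) -> str:
    raise RuntimeError("boom")


def ok(previous: Optional[str]) -> str:
    return f"got {previous!r}"


for mode in OnError:
    status, log = run_steps([("first", broken, mode), ("second", ok, OnError.fail)])
    print(mode.name, "->", status, log)
```

Only the skip mode lets the second step run in this toy model; fail and pause both stop at the first step, and pause differs in that the run can later be resumed.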

ErrorRequirement Properties

When a step fails with on_error=OnError.pause, an ErrorRequirement is created:

| Property | Type | Description |
| --- | --- | --- |
| step_name | str | Name of the failed step |
| error_message | str | The exception message |
| error_type | str | Exception class name (e.g., "ValueError") |
| retry_count | int | Number of retry attempts so far |
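
As a rough illustration of what these fields hold, the sketch below builds a similar record from a caught exception. ErrorInfo and describe_failure are hypothetical names used only here, and deriving the values with str(exc) and type(exc).__name__ is an assumption based on the descriptions in the table, not a statement about the library's internals.

```python
from dataclasses import dataclass


@dataclass
class ErrorInfo:
    # Hypothetical stand-in that mirrors the documented property names.
    step_name: str
    error_message: str
    error_type: str
    retry_count: int = 0


def describe_failure(step_name: str, exc: BaseException, retry_count: int = 0) -> ErrorInfo:
    """Capture the exception message and class name for a failed step."""
    return ErrorInfo(
        step_name=step_name,
        error_message=str(exc),           # the exception message
        error_type=type(exc).__name__,    # the exception class name
        retry_count=retry_count,
    )


try:
    int("not a number")
except ValueError as exc:
    info = describe_failure("parse_input", exc)

print(info.step_name, info.error_type, info.retry_count)
print(info.error_message)
```

Branching on error_type is usually less brittle than matching substrings of error_message, since message wording can change between library versions.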

ErrorRequirement Methods

| Method | Description |
| --- | --- |
| req.retry() | Retry the failed step |
| req.skip() | Skip the step and continue |

Retry Behavior

When you call req.retry():

  1. The step executes again with the same input
  2. retry_count increments
  3. If it fails again, the workflow pauses again
  4. You can retry indefinitely or skip after some attempts

For example, to cap retries at three attempts and then skip:

    for req in run_output.steps_with_errors:
        if req.retry_count < 3:
            print(f"Retrying (attempt {req.retry_count + 1}/3)")
            req.retry()
        else:
            print("Max retries reached, skipping")
            req.skip()

Skip Behavior

When you call req.skip():

  1. The step is marked as skipped (not failed)
  2. The workflow continues with the next step
  3. step_input.previous_step_content will be None for the next step
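
Because a skipped step leaves previous_step_content as None, a downstream executor that formats it directly would produce output like "Processed: None". A minimal sketch of a defensive check follows. FakeStepInput is a hypothetical stand-in so the example runs without the library; in a real workflow the function would take a StepInput and return a StepOutput as in the earlier example.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeStepInput:
    # Hypothetical stand-in for StepInput; only previous_step_content is modeled.
    previous_step_content: Optional[str] = None


def process_data(step_input: FakeStepInput) -> str:
    """Handle the case where the previous step was skipped."""
    if step_input.previous_step_content is None:
        # Nothing arrived from upstream: fall back instead of formatting None.
        return "No upstream data (previous step was skipped)"
    return f"Processed: {step_input.previous_step_content}"


print(process_data(FakeStepInput("API call succeeded")))
print(process_data(FakeStepInput()))
```

Whether to fall back to a default, return an explanatory message, or raise depends on how critical the skipped step's output is to the rest of the workflow.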

Combining with Confirmation

A step can have both error handling and confirmation:

    Step(
        name="risky_operation",
        executor=risky_function,
        requires_confirmation=True,
        confirmation_message="Execute risky operation?",
        on_error=OnError.pause,
    )

The confirmation happens first. If confirmed and the step fails, the error pause activates.

Streaming

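In streaming workflows, watch for StepPausedEvent while consuming the stream, then load the paused run from the session and resolve its errors: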

    from kern.run.workflow import StepPausedEvent

    for event in workflow.run("input", stream=True, stream_events=True):
        if isinstance(event, StepPausedEvent):
            print(f"Paused at: {event.step_name}")

    # After the stream ends, load the latest run from the session
    session = workflow.get_session()
    run_output = session.runs[-1]

    while run_output.is_paused:
        for req in run_output.steps_with_errors:
            print(f"Error: {req.error_message}")
            req.retry()  # or req.skip()

        # Drain the resumed stream; handle events here if needed
        for event in workflow.continue_run(run_output, stream=True, stream_events=True):
            pass

        # Reload the run to check whether it paused again
        session = workflow.get_session()
        run_output = session.runs[-1]

Error Types

Common error scenarios and handling:

| Scenario | Recommended Action |
| --- | --- |
| Network timeout | Retry a few times, then skip |
| Rate limit | Retry after delay |
| Invalid input | Skip (retry won't help) |
| Resource unavailable | Retry or skip based on criticality |

For example, branching on the error message:

    import time

    for req in run_output.steps_with_errors:
        message = req.error_message.lower()
        if "timeout" in message:
            # Transient failure: retry up to three times, then give up
            if req.retry_count < 3:
                req.retry()
            else:
                req.skip()
        elif "rate limit" in message:
            time.sleep(5)  # Wait before retry
            req.retry()
        else:
            req.skip()  # Unknown error, skip
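
The branching above can also be pulled into a small pure function, which keeps the policy testable without running a workflow. The sketch below is one possible shape: choose_action, max_retries, and base_delay are hypothetical names, and two choices are assumptions that go beyond the snippet above, namely capping rate-limit retries and doubling the delay on each attempt.

```python
from typing import Tuple


def choose_action(
    error_message: str,
    retry_count: int,
    max_retries: int = 3,
    base_delay: float = 5.0,
) -> Tuple[str, float]:
    """Return ("retry" or "skip", seconds to wait before acting)."""
    message = error_message.lower()
    if "timeout" in message:
        # Transient failure: retry immediately until the cap is reached.
        return ("retry", 0.0) if retry_count < max_retries else ("skip", 0.0)
    if "rate limit" in message:
        if retry_count >= max_retries:
            return ("skip", 0.0)
        # Exponential backoff: base_delay, then 2x, then 4x, and so on.
        return ("retry", base_delay * (2 ** retry_count))
    # Unknown or non-retryable error: skip.
    return ("skip", 0.0)


for count in range(4):
    print(count, choose_action("Rate limit exceeded", count))
```

Inside the pause loop, the result would drive the decision: sleep for the returned delay, then call req.retry() when the action is "retry" and req.skip() otherwise.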

Developer Resources