Early Stop a Workflow
This example demonstrates **Workflows 2.0** early termination of a running workflow.
This example shows how to create workflows that can terminate gracefully when quality conditions aren't met, preventing downstream processing of invalid or unsafe data.
When to use: When you need safety mechanisms, quality gates, or validation checkpoints that should prevent downstream processing if conditions aren't met. Ideal for data validation pipelines, security checks, quality assurance workflows, or any process where continuing with invalid inputs could cause problems.
1from kern.agent import Agent2from kern.models.openai import OpenAIResponses3from kern.workflow import Workflow4from kern.workflow.types import StepInput, StepOutput56# Create agents with more specific validation criteria7data_validator = Agent(8 name="Data Validator",9 model=OpenAIResponses(id="gpt-5.2"),10 instructions=[11 "You are a data validator. Analyze the provided data and determine if it's valid.",12 "For data to be VALID, it must meet these criteria:",13 "- user_count: Must be a positive number (> 0)",14 "- revenue: Must be a positive number (> 0)",15 "- date: Must be in a reasonable date format (YYYY-MM-DD)",16 "",17 "Return exactly 'VALID' if all criteria are met.",18 "Return exactly 'INVALID' if any criteria fail.",19 "Also briefly explain your reasoning.",20 ],21)2223data_processor = Agent(24 name="Data Processor",25 model=OpenAIResponses(id="gpt-5.2"),26 instructions="Process and transform the validated data.",27)2829report_generator = Agent(30 name="Report Generator",31 model=OpenAIResponses(id="gpt-5.2"),32 instructions="Generate a final report from processed data.",33)343536def early_exit_validator(step_input: StepInput) -> StepOutput:37 """38 Custom function that checks data quality and stops workflow early if invalid39 """40 # Get the validation result from previous step41 validation_result = step_input.previous_step_content or ""4243 if "INVALID" in validation_result.upper():44 return StepOutput(45 content="Data validation failed. Workflow stopped early to prevent processing invalid data.",46 stop=True, # Stop the entire workflow here47 )48 else:49 return StepOutput(50 content="Data validation passed. Continuing with processing...",51 stop=False, # Continue normally52 )535455# Create workflow with conditional early termination56workflow = Workflow(57 name="Data Processing with Early Exit",58 description="Process data but stop early if validation fails",59 steps=[60 data_validator, # Step 1: Validate data61 early_exit_validator, # Step 2: Check validation and possibly stop early62 data_processor, # Step 3: Process data (only if validation passed)63 report_generator, # Step 4: Generate report (only if processing completed)64 ],65)6667if __name__ == "__main__":68 print("\n=== Testing with INVALID data ===")69 workflow.print_response(70 input="Process this data: {'user_count': -50, 'revenue': 'invalid_amount', 'date': 'bad_date'}"71 )7273 print("=== Testing with VALID data ===")74 workflow.print_response(75 input="Process this data: {'user_count': 1000, 'revenue': 50000, 'date': '2024-01-15'}"76 )