# SpiffWorkflow

## Introduction

SpiffWorkflow is a flexible, Python-based workflow execution library designed to provide robust workflow and business process management capabilities. The library serves as the workflow engine underlying Spiff Arena and focuses on executing BPMN 2.0 (Business Process Model and Notation) workflows with full state management. It enables developers to parse BPMN XML files, execute complex workflows with multiple task types, manage long-running processes with serialization/deserialization, and integrate custom Python scripting for business logic execution.

The library implements a comprehensive state-driven execution model where workflows are parsed into specifications and then instantiated as running workflows with task trees. It supports all major BPMN elements, including user tasks, script tasks, service tasks, gateways (parallel, exclusive, inclusive, event-based), events (start, end, boundary, intermediate), subprocesses, and call activities. Additionally, it provides DMN (Decision Model and Notation) integration for business rules, customizable script engines for Python code execution within tasks, event-based inter-task communication, and comprehensive serialization for workflow persistence and recovery.

## API Reference

### Parse BPMN Workflow Files

Parse BPMN XML files into executable workflow specifications with validation.

```python
from SpiffWorkflow.bpmn.parser import BpmnParser, BpmnValidator

# Create parser with optional validator
validator = BpmnValidator()
parser = BpmnParser(validator=validator)

# Add single BPMN file
parser.add_bpmn_file('path/to/process.bpmn')

# Add multiple files
parser.add_bpmn_files(['process1.bpmn', 'process2.bpmn'])

# Add files by glob pattern
parser.add_bpmn_files_by_glob('processes/**/*.bpmn')

# Get specific process specification
spec = parser.get_spec('Process_1')

# Get subprocess specifications
subprocess_specs = parser.get_subprocess_specs('Process_1')

# Get all available process specifications
all_specs = parser.find_all_specs()
for process_id, spec in all_specs.items():
    print(f"Process: {process_id} - {spec.description}")

# Get collaboration (for processes with message passing)
collaboration, spec_map = parser.get_collaboration('Collaboration_1')
```

### Execute BPMN Workflows

Create and execute workflow instances with automatic task progression and manual task handling.

```python
from SpiffWorkflow.bpmn import BpmnWorkflow
from SpiffWorkflow.bpmn.parser import BpmnParser
from SpiffWorkflow import TaskState

# Parse BPMN and get specifications
parser = BpmnParser()
parser.add_bpmn_file('approval_process.bpmn')
spec = parser.get_spec('approval_process')
subprocess_specs = parser.get_subprocess_specs('approval_process')

# Create workflow instance
workflow = BpmnWorkflow(spec, subprocess_specs)

# Set initial workflow data
workflow.data['request_amount'] = 5000
workflow.data['requester'] = 'john.doe@example.com'

# Main execution loop
while not workflow.is_completed():
    # Execute all automated tasks (gateways, script tasks, etc)
    workflow.do_engine_steps()

    # Get ready manual tasks (user tasks)
    ready_tasks = workflow.get_tasks(state=TaskState.READY, manual=True)

    if ready_tasks:
        for task in ready_tasks:
            print(f"Task: {task.task_spec.name}")
            print(f"Description: {task.task_spec.description}")

            # Collect user input (from form, API, etc)
            if task.task_spec.name == 'manager_approval':
                task.set_data(approved=True, comments='Approved by manager')

            # Complete the task
            task.run()
    else:
        # Handle waiting tasks (timers, message events)
        workflow.refresh_waiting_tasks()
        break

# Access final results
if workflow.is_completed():
    print("Workflow completed successfully")
    print(f"Final data: {workflow.data}")
    print(f"Total tasks: {len(workflow.get_tasks(state=TaskState.COMPLETED))}")
```

### Custom Script Engine

Extend the Python script engine with custom functions and services for use in BPMN script tasks.

```python
from SpiffWorkflow.bpmn import BpmnWorkflow
from SpiffWorkflow.bpmn.parser import BpmnParser
from SpiffWorkflow.bpmn.script_engine import PythonScriptEngine, TaskDataEnvironment
import datetime

# Define custom functions
def calculate_discount(amount, customer_tier):
    """Custom business logic for discount calculation"""
    discounts = {'gold': 0.2, 'silver': 0.1, 'bronze': 0.05}
    return amount * (1 - discounts.get(customer_tier, 0))

def format_currency(amount):
    """Format amount as currency"""
    return f"${amount:,.2f}"

def send_notification(recipient, message):
    """Send notification (mock implementation)"""
    print(f"Notification to {recipient}: {message}")
    return True

# Create custom environment with functions
custom_env = TaskDataEnvironment({
    'calculate_discount': calculate_discount,
    'format_currency': format_currency,
    'send_notification': send_notification,
    'datetime': datetime,
    'timedelta': datetime.timedelta,
})

# Create custom script engine
script_engine = PythonScriptEngine(environment=custom_env)

# Parse workflow
parser = BpmnParser()
parser.add_bpmn_file('order_process.bpmn')
spec = parser.get_spec('order_process')
subprocess_specs = parser.get_subprocess_specs('order_process')

# Create workflow with custom script engine
workflow = BpmnWorkflow(spec, subprocess_specs, script_engine=script_engine)

# Set workflow data
workflow.data['order_amount'] = 1000
workflow.data['customer_tier'] = 'gold'
workflow.data['customer_email'] = 'customer@example.com'

# Execute workflow - script tasks can now use custom functions
workflow.do_engine_steps()

# Example BPMN script task would contain:
#   final_amount = calculate_discount(order_amount, customer_tier)
#   formatted = format_currency(final_amount)
#   send_notification(customer_email, f"Your order total is {formatted}")
```

### Serialize and Restore Workflows

Save workflow state to storage and restore for long-running processes.

```python
from SpiffWorkflow.bpmn import BpmnWorkflow
from SpiffWorkflow.bpmn.parser import BpmnParser
from SpiffWorkflow.bpmn.serializer import BpmnWorkflowSerializer
from SpiffWorkflow import TaskState
import json

# Create serializer
serializer = BpmnWorkflowSerializer()

# Parse and create workflow
parser = BpmnParser()
parser.add_bpmn_file('multi_step_process.bpmn')
spec = parser.get_spec('multi_step_process')
subprocess_specs = parser.get_subprocess_specs('multi_step_process')

workflow = BpmnWorkflow(spec, subprocess_specs)
workflow.data['process_id'] = 'PRO-12345'
workflow.data['start_time'] = '2025-01-15T10:00:00'

# Execute some steps
workflow.do_engine_steps()
ready_tasks = workflow.get_tasks(state=TaskState.READY, manual=True)
if ready_tasks:
    first_task = ready_tasks[0]
    first_task.set_data(step1_complete=True, user_input='Initial data')
    first_task.run()

# Serialize workflow to dictionary
workflow_dict = serializer.to_dict(workflow)

# Convert to JSON for storage (database, file, Redis, etc)
workflow_json = json.dumps(workflow_dict, indent=2)

# Save to file
with open('workflow_state.json', 'w') as f:
    f.write(workflow_json)

# Save to database (pseudo-code)
# db.save_workflow_state(workflow_id='PRO-12345', state=workflow_json)

print(f"Workflow serialized: {len(workflow_json)} bytes")

# Later: restore workflow from storage
with open('workflow_state.json', 'r') as f:
    restored_dict = json.loads(f.read())

# Deserialize workflow
restored_workflow = serializer.from_dict(restored_dict)

# Continue execution from saved state
restored_workflow.do_engine_steps()
next_tasks = restored_workflow.get_tasks(state=TaskState.READY, manual=True)
print(f"Restored workflow has {len(next_tasks)} tasks ready")
print(f"Workflow data preserved: {restored_workflow.data}")
```

### Task Filtering and Navigation

Query and navigate workflow tasks using various filters.

```python
from SpiffWorkflow.bpmn import BpmnWorkflow
from SpiffWorkflow.bpmn.parser import BpmnParser
from SpiffWorkflow import TaskState

parser = BpmnParser()
parser.add_bpmn_file('complex_process.bpmn')
spec = parser.get_spec('complex_process')
workflow = BpmnWorkflow(spec)
workflow.do_engine_steps()

# Get all ready manual tasks
ready_tasks = workflow.get_tasks(state=TaskState.READY, manual=True)
print(f"Ready manual tasks: {len(ready_tasks)}")

# Get tasks in specific lane
customer_tasks = workflow.get_tasks(
    state=TaskState.READY,
    manual=True,
    lane='Customer'
)
for task in customer_tasks:
    print(f"Customer task: {task.task_spec.name}")

# Get all completed tasks
completed_tasks = workflow.get_tasks(state=TaskState.COMPLETED)
print(f"Completed: {len(completed_tasks)} tasks")

# Get waiting tasks (events, timers)
waiting_tasks = workflow.get_tasks(state=TaskState.WAITING)
for task in waiting_tasks:
    print(f"Waiting: {task.task_spec.name}")

# Get next ready automated task
next_task = workflow.get_next_task(state=TaskState.READY, manual=False)
if next_task:
    print(f"Next automated task: {next_task.task_spec.name}")
    next_task.run()

# Get tasks by spec name
specific_tasks = [t for t in workflow.get_tasks() if t.task_spec.name == 'approval_task']

# Iterate through subprocesses
for task_id, subprocess in workflow.subprocesses.items():
    parent_task = workflow.get_task_from_id(task_id)
    subprocess_tasks = subprocess.get_tasks()
    print(f"Subprocess {parent_task.task_spec.name}: {len(subprocess_tasks)} tasks")

# Check workflow status
print(f"Workflow completed: {workflow.is_completed()}")
print(f"Workflow success: {workflow.is_completed() and not workflow.cancelled}")

# Access last completed task
if workflow.last_task:
    print(f"Last task: {workflow.last_task.task_spec.name}")
    print(f"Last task data: {workflow.last_task.data}")
```

### Event Handling and Messaging

Send and receive events for inter-process communication and external triggers.

```python
from SpiffWorkflow.bpmn import BpmnWorkflow, BpmnEvent
from SpiffWorkflow.bpmn.parser import BpmnParser
from SpiffWorkflow.bpmn.specs.event_definitions.message import MessageEventDefinition
from SpiffWorkflow.bpmn.specs.event_definitions.signal import SignalEventDefinition
from SpiffWorkflow import TaskState

# Parse workflow with message/signal events
parser = BpmnParser()
parser.add_bpmn_file('order_fulfillment.bpmn')
spec = parser.get_spec('order_fulfillment')
workflow = BpmnWorkflow(spec)

# Start workflow and run to waiting state
workflow.data['order_id'] = 'ORD-12345'
workflow.do_engine_steps()

# Create and send message event
message_def = MessageEventDefinition('payment_received')
message_event = BpmnEvent(
    message_def,
    payload={
        'order_id': 'ORD-12345',
        'payment_amount': 599.99,
        'payment_method': 'credit_card',
        'transaction_id': 'TXN-98765'
    }
)

# Send event to workflow
try:
    workflow.send_event(message_event)
    workflow.refresh_waiting_tasks()
    workflow.do_engine_steps()
    print("Payment event processed successfully")
except Exception as e:
    print(f"Event not handled: {e}")

# Send signal event (broadcasts to all processes)
signal_def = SignalEventDefinition('order_shipped')
signal_event = BpmnEvent(
    signal_def,
    payload={
        'order_id': 'ORD-12345',
        'tracking_number': 'TRK-ABC123',
        'carrier': 'FedEx'
    }
)
workflow.catch(signal_event)
workflow.refresh_waiting_tasks()
workflow.do_engine_steps()

# Get uncaught external events
external_events = workflow.get_events()
for event in external_events:
    print(f"Uncaught event: {event.event_definition.name}")
    print(f"Event payload: {event.payload}")
    # Forward to external system or another workflow

# Check for waiting events
waiting_event_details = workflow.waiting_events()
for event_detail in waiting_event_details:
    print(f"Waiting for event: {event_detail['event_type']}")
    print(f"Task: {event_detail['task_name']}")

# Multi-instance correlation
workflow.correlations = {
    'order_correlation': {'order_id': 'ORD-12345'}
}

# Check if workflow is waiting for events
waiting_tasks = workflow.get_tasks(state=TaskState.WAITING)
for task in waiting_tasks:
    if hasattr(task.task_spec, 'event_definition'):
        print(f"Waiting for: {task.task_spec.event_definition.name}")
```

### Task Data Management

Manage task and workflow data with proper scoping and inheritance.

```python
from SpiffWorkflow.bpmn import BpmnWorkflow
from SpiffWorkflow.bpmn.parser import BpmnParser
from SpiffWorkflow import TaskState

parser = BpmnParser()
parser.add_bpmn_file('data_flow_process.bpmn')
spec = parser.get_spec('data_flow_process')
workflow = BpmnWorkflow(spec)

# Set workflow-level data (global scope)
workflow.data['company_name'] = 'Acme Corp'
workflow.data['fiscal_year'] = 2025
workflow.data['department'] = 'Sales'

workflow.do_engine_steps()

# Get ready task
ready_tasks = workflow.get_tasks(state=TaskState.READY, manual=True)
if ready_tasks:
    task = ready_tasks[0]

    # Task inherits workflow data
    print(f"Company: {task.data.get('company_name')}")

    # Set task-specific data
    task.set_data(
        employee_id='EMP-123',
        submission_date='2025-01-15',
        expense_amount=1250.50
    )

    # Update multiple fields
    task.update_data({
        'manager_email': 'manager@acme.com',
        'category': 'travel',
        'receipt_attached': True
    })

    # Access individual fields
    amount = task.get_data('expense_amount', default=0)
    print(f"Expense amount: ${amount}")

    # Set internal data (not inherited by child tasks)
    task.internal_data['processing_notes'] = 'Requires manager approval'
    task.internal_data['attempt_count'] = 1

    # Complete task
    task.run()

# Data flows to next task
workflow.do_engine_steps()
next_task = workflow.get_next_task(state=TaskState.READY)
if next_task:
    # Next task has access to previous task's data
    print(f"Next task sees employee_id: {next_task.data.get('employee_id')}")
    # But not internal data
    print(f"Internal data not inherited: {next_task.data.get('processing_notes')}")

# Access data from completed tasks
completed = workflow.get_tasks(state=TaskState.COMPLETED)
for task in completed:
    print(f"Task {task.task_spec.name} data: {task.data}")

# Data objects (BPMN data stores)
if 'data_objects' in workflow.data:
    print(f"Data objects: {workflow.data['data_objects']}")
```

### Custom Parser and Task Types

Extend the parser with custom task types and parsers.

```python
from copy import deepcopy

from SpiffWorkflow import TaskState
from SpiffWorkflow.bpmn import BpmnWorkflow
from SpiffWorkflow.bpmn.parser import BpmnParser, TaskParser
from SpiffWorkflow.bpmn.parser.util import full_tag
from SpiffWorkflow.bpmn.specs.defaults import UserTask
from SpiffWorkflow.bpmn.serializer import BpmnWorkflowSerializer, DEFAULT_CONFIG
from SpiffWorkflow.bpmn.serializer.helpers.bpmn_converter import BpmnConverter

# Define custom task type
class ApprovalTask(UserTask):
    """Custom task with approval-specific logic"""

    def __init__(self, wf_spec, name, approval_level='standard', **kwargs):
        super().__init__(wf_spec, name, **kwargs)
        self.approval_level = approval_level
        self.requires_justification = approval_level in ['high', 'critical']

    def _on_ready_hook(self, task):
        """Called when task becomes ready"""
        super()._on_ready_hook(task)
        task.data['approval_level'] = self.approval_level
        task.data['requires_justification'] = self.requires_justification
        task.data['approval_status'] = 'pending'

# Define custom parser for the task
class ApprovalTaskParser(TaskParser):
    """Parser for custom approval task"""

    def parse_node(self):
        """Extract custom attributes from the BPMN XML"""
        # Build the spec with the standard attributes first
        task_spec = super().parse_node()
        # Extract the custom attribute from the element
        task_spec.approval_level = self.node.get('approvalLevel', 'standard')
        return task_spec

# Create custom parser class
class CustomBpmnParser(BpmnParser):
    """Parser with custom task type support"""

    OVERRIDE_PARSER_CLASSES = {
        full_tag('userTask'): (ApprovalTaskParser, ApprovalTask),
        # Add more custom mappings as needed
    }

# Configure serializer for custom task
class ApprovalTaskConverter(BpmnConverter):
    """Serializer for approval task"""

    def to_dict(self, spec):
        dct = super().to_dict(spec)
        dct['approval_level'] = spec.approval_level
        return dct

    def from_dict(self, dct):
        return ApprovalTask(**dct)

# Create custom serializer configuration
CUSTOM_CONFIG = deepcopy(DEFAULT_CONFIG)
CUSTOM_CONFIG[ApprovalTask] = ApprovalTaskConverter

# Register custom serializer
registry = BpmnWorkflowSerializer.configure(CUSTOM_CONFIG)
serializer = BpmnWorkflowSerializer(registry)

# Use custom parser
parser = CustomBpmnParser()
parser.add_bpmn_file('approval_workflow.bpmn')
spec = parser.get_spec('approval_process')

workflow = BpmnWorkflow(spec)
workflow.do_engine_steps()

# Custom tasks carry their custom attributes
ready_tasks = workflow.get_tasks(state=TaskState.READY, manual=True)
for task in ready_tasks:
    if isinstance(task.task_spec, ApprovalTask):
        print(f"Approval level: {task.data['approval_level']}")
        print(f"Requires justification: {task.data['requires_justification']}")
```

### Workflow Callbacks and Hooks

Implement callbacks for workflow events and task lifecycle.

```python
from SpiffWorkflow.bpmn import BpmnWorkflow
from SpiffWorkflow.bpmn.parser import BpmnParser
from SpiffWorkflow import TaskState
import logging

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Define callback functions
def will_complete_task_callback(workflow, task):
    """Called before a task completes"""
    logger.info(f"About to complete task: {task.task_spec.name}")
    logger.info(f"Task data: {task.data}")

    # Validation before completion
    if task.task_spec.name == 'submit_order':
        if not task.data.get('customer_email'):
            raise ValueError("Customer email is required")

    # Log to external system
    # audit_log.record_task_start(task.id, task.task_spec.name)

def did_complete_task_callback(workflow, task):
    """Called after a task completes"""
    logger.info(f"Completed task: {task.task_spec.name}")
    logger.info(f"Final task data: {task.data}")

    # Send notifications
    if task.task_spec.name == 'approve_request':
        approved = task.data.get('approved', False)
        if approved:
            logger.info("Sending approval notification")
            # send_notification(task.data['requester_email'], 'Request approved')

    # Update external systems
    # external_system.update_task_status(task.id, 'completed')

def will_refresh_task_callback(workflow, task):
    """Called before refreshing a waiting task"""
    logger.info(f"Refreshing task: {task.task_spec.name}")

    # Check external conditions
    if hasattr(task.task_spec, 'event_definition'):
        logger.info(f"Checking for event: {task.task_spec.event_definition.name}")

def did_refresh_task_callback(workflow, task):
    """Called after refreshing a waiting task"""
    if task.state == TaskState.READY:
        logger.info(f"Task {task.task_spec.name} is now ready")

# Parse and create workflow
parser = BpmnParser()
parser.add_bpmn_file('order_process.bpmn')
spec = parser.get_spec('order_process')
workflow = BpmnWorkflow(spec)

# Set initial data
workflow.data['order_id'] = 'ORD-12345'
workflow.data['customer_email'] = 'customer@example.com'

# Execute with callbacks
try:
    workflow.do_engine_steps(
        will_complete_task=will_complete_task_callback,
        did_complete_task=did_complete_task_callback
    )

    # Handle manual tasks
    ready_tasks = workflow.get_tasks(state=TaskState.READY, manual=True)
    for task in ready_tasks:
        logger.info(f"Processing manual task: {task.task_spec.name}")

        # Collect input
        task.set_data(approved=True)

        # Complete with callbacks
        will_complete_task_callback(workflow, task)
        task.run()
        did_complete_task_callback(workflow, task)

    # Continue automated execution
    workflow.do_engine_steps(
        will_complete_task=will_complete_task_callback,
        did_complete_task=did_complete_task_callback
    )

    # Refresh waiting tasks with callbacks
    workflow.refresh_waiting_tasks(
        will_refresh_task=will_refresh_task_callback,
        did_refresh_task=did_refresh_task_callback
    )

except Exception as e:
    logger.error(f"Workflow error: {e}")

    # Handle error tasks
    error_tasks = workflow.get_tasks(state=TaskState.ERROR)
    for task in error_tasks:
        logger.error(f"Task in error state: {task.task_spec.name}")
        logger.error(f"Error details: {task.data.get('error_message')}")
```

### Subprocess and Call Activity Management

Work with subprocesses and call activities for workflow composition.

```python
from SpiffWorkflow.bpmn import BpmnWorkflow
from SpiffWorkflow.bpmn.parser import BpmnParser
from SpiffWorkflow import TaskState

# Parse workflow with subprocesses
parser = BpmnParser()
parser.add_bpmn_files(['main_process.bpmn', 'reusable_subprocess.bpmn'])

# Get main process and its subprocess specifications
main_spec = parser.get_spec('main_process')
subprocess_specs = parser.get_subprocess_specs('main_process')

# Create workflow
workflow = BpmnWorkflow(main_spec, subprocess_specs)
workflow.data['order_id'] = 'ORD-12345'
workflow.data['items'] = ['item1', 'item2', 'item3']

# Execute workflow
workflow.do_engine_steps()

# Access active subprocesses
active_subprocesses = workflow.get_active_subprocesses()
print(f"Active subprocesses: {len(active_subprocesses)}")

for subprocess in active_subprocesses:
    print(f"Subprocess depth: {subprocess.depth}")
    print(f"Parent workflow: {subprocess.parent_workflow}")
    print(f"Subprocess tasks: {len(subprocess.get_tasks())}")

    # Access subprocess data
    print(f"Subprocess data: {subprocess.data}")

    # Get ready tasks in subprocess
    subprocess_ready = subprocess.get_tasks(state=TaskState.READY)
    for task in subprocess_ready:
        print(f"Subprocess task: {task.task_spec.name}")

# Access all subprocesses (including completed)
for task_id, subprocess in workflow.subprocesses.items():
    parent_task = workflow.get_task_from_id(task_id)
    print(f"Subprocess parent: {parent_task.task_spec.name}")
    print(f"Subprocess completed: {subprocess.is_completed()}")

    # Get subprocess spec name
    subprocess_spec_name = subprocess.spec.name
    print(f"Subprocess spec: {subprocess_spec_name}")

# Navigate from a subprocess task to its parent
subprocess_task = None
for sp in active_subprocesses:
    tasks = sp.get_tasks()
    if tasks:
        subprocess_task = tasks[0]
        break

if subprocess_task:
    # Get parent task in main workflow
    parent_task = workflow.get_task_from_id(subprocess_task.workflow.parent_task_id)
    print(f"Parent task: {parent_task.task_spec.name}")

    # Get top-level workflow
    top_workflow = subprocess_task.workflow.top_workflow
    print(f"Top workflow: {top_workflow.spec.name}")

# Get all tasks across workflow and subprocesses
all_tasks = workflow.get_tasks()
print(f"Total tasks (including subprocesses): {len(all_tasks)}")

# Get tasks only from the main workflow
main_workflow_tasks = workflow.get_tasks(skip_subprocesses=True)
print(f"Main workflow tasks only: {len(main_workflow_tasks)}")
```

## Summary

SpiffWorkflow provides a comprehensive Python library for executing BPMN 2.0 workflows with full state management and persistence. The library excels at long-running business processes that require human interaction, complex approval workflows with multiple decision points, process automation combining automated and manual tasks, integration of business rules through DMN, and event-driven process communication. It is particularly well-suited for applications that need to persist workflow state between executions, handle complex branching and parallel execution paths, integrate custom business logic through script engines, and maintain full audit trails of process execution.

The library follows a clean separation between workflow specifications (parsed from BPMN XML) and workflow instances (runtime execution state), making it easy to version process definitions separately from running workflows.

Integration patterns typically involve parsing BPMN files during application startup, creating workflow instances when business processes are triggered, executing automated tasks immediately with `do_engine_steps()`, presenting manual tasks to users through web interfaces or APIs, serializing workflow state to databases for persistence, and restoring workflows to continue execution after system restarts. The extensible architecture allows customization of task types, parsers, script engines, and data handling to meet specific business requirements while maintaining compatibility with standard BPMN 2.0 notation.