# Durable Workflow

Durable Workflow is a durable workflow engine for PHP/Laravel that lets developers write long-running, persistent, distributed workflows (orchestrations) powered by Laravel Queues. It provides tools for defining complex asynchronous processes such as agentic AI workflows, financial transactions, data pipelines, microservices, job tracking, and user signup flows as sequences of activities that can run in parallel or in series.

The library is built on Laravel's queue system and database layer and uses event sourcing to store and manage workflow state. This keeps workflows scalable and reliable, and lets them resume after failures. Workflows use PHP generators with the `yield` keyword to pause execution and wait for activities to complete, while maintaining determinism through state replay. Activities are individual units of work that perform specific tasks like API calls, data processing, or sending emails.

## Installation

Install via Composer and run migrations to set up the workflow tables.

```bash
# Install the package
composer require laravel-workflow/laravel-workflow

# Publish migrations
php artisan vendor:publish --provider="Workflow\Providers\WorkflowServiceProvider" --tag="migrations"

# Run migrations
php artisan migrate

# Start queue worker to process workflows
php artisan queue:work
```

## Creating a Workflow

Extend the `Workflow` class and implement the `execute()` method as a generator that yields activities.

```php
// ...
        return [
            'orderId' => $orderId,
            'paymentId' => $payment['id'],
            'trackingNumber' => $shipment['tracking'],
        ];
    }
}
```

## Creating an Activity

Extend the `Activity` class and implement the `execute()` method to perform the actual work.
```php
// ...
        if ($order->total <= 0) {
            throw new \InvalidArgumentException('Order total must be positive');
        }

        // Activities can use non-deterministic operations
        $order->validated_at = now();
        $order->save();

        return $order->toArray();
    }
}
```

## Starting and Monitoring Workflows

Use `WorkflowStub` to create, start, monitor, and retrieve workflow results.

```php
// ...
$workflow = WorkflowStub::make(OrderProcessingWorkflow::class);
$workflow->start($orderId, $customerId);

// Get the workflow ID for tracking
$workflowId = $workflow->id();

// Later, load an existing workflow by ID
$workflow = WorkflowStub::load($workflowId);

// Check workflow status
$workflow->running();   // true if still executing
$workflow->completed(); // true if finished successfully
$workflow->failed();    // true if failed
$workflow->status();    // Returns status class name

// Wait for completion (in tests or scripts)
while ($workflow->running()) {
    sleep(1);
    $workflow->fresh();
}

// Get the final output
$result = $workflow->output();
// ['orderId' => 123, 'paymentId' => 'pay_xxx', 'trackingNumber' => 'TRACK123']

// Resume a failed workflow after fixing the issue
$workflow->resume();
```

## Parallel Activity Execution

Execute multiple activities concurrently using `all()` to wait for all results.

```php
// ...
            $results[0],
            'orders' => $results[1],
            'recommendations' => $results[2],
        ];
    }
}
```

## Mixing Series and Parallel Execution

Combine sequential and parallel execution patterns for complex workflows.

```php
// ...
            $payment['id'],
            'shipmentId' => $shipment['id'],
        ];
    }
}
```

## Signals for External Events

Use signals to trigger events from outside the workflow and `await()` to pause until a condition is met.
```php
// ...
        $this->approved = true;
        $this->approverComment = $comment;
    }

    #[SignalMethod]
    public function reject(string $reason): void
    {
        $this->approved = false;
        $this->approverComment = $reason;
    }

    public function execute($requestId, $requesterId)
    {
        // Send approval request notification
        yield activity(SendApprovalRequestActivity::class, $requestId, $requesterId);

        // Wait until approved or rejected signal is received
        yield await(fn() => $this->approverComment !== null);

        if ($this->approved) {
            yield activity(ProcessApprovedRequestActivity::class, $requestId);

            return ['status' => 'approved', 'comment' => $this->approverComment];
        }

        yield activity(NotifyRejectionActivity::class, $requestId, $this->approverComment);

        return ['status' => 'rejected', 'reason' => $this->approverComment];
    }
}

// Sending a signal from outside
$workflow = WorkflowStub::load($workflowId);
$workflow->approve('Looks good, approved!');
// or
$workflow->reject('Budget exceeded');
```

## Queries for Workflow State

Use queries to retrieve workflow state without affecting execution.

```php
// ...
        return [
            'processed' => $this->processedCount,
            'total' => $this->totalFiles,
            'current' => $this->currentFile,
            'percentage' => $this->totalFiles > 0
                ? round(($this->processedCount / $this->totalFiles) * 100)
                : 0,
        ];
    }

    #[SignalMethod]
    public function cancel(): void
    {
        $this->canceled = true;
    }

    private bool $canceled = false;

    public function execute(array $files)
    {
        $this->totalFiles = count($files);

        foreach ($files as $file) {
            if ($this->canceled) {
                return ['status' => 'canceled', 'processed' => $this->processedCount];
            }

            $this->currentFile = $file;
            yield activity(ProcessFileActivity::class, $file);
            $this->processedCount++;
        }

        return ['status' => 'completed', 'processed' => $this->processedCount];
    }
}

// Query workflow state from outside
$workflow = WorkflowStub::load($workflowId);
$progress = $workflow->getProgress();
// ['processed' => 5, 'total' => 10, 'current' => 'file6.txt', 'percentage' => 50]
```

## Timers and Delays

Suspend workflow execution for a specified duration using timers.

```php
// ...
        $this->paymentConfirmed = true;
        $this->paymentDetails = $details;
    }

    public function execute($orderId, $amount)
    {
        // Create pending payment
        $payment = yield activity(CreatePendingPaymentActivity::class, $orderId, $amount);

        // Wait up to 30 minutes for payment confirmation
        $confirmed = yield awaitWithTimeout(1800, fn() => $this->paymentConfirmed);

        if ($confirmed) {
            // Payment was confirmed within timeout
            yield activity(FulfillOrderActivity::class, $orderId, $this->paymentDetails);

            return ['status' => 'confirmed', 'payment' => $this->paymentDetails];
        }

        // Timeout reached - cancel the payment
        yield activity(CancelPaymentActivity::class, $payment['id']);
        yield activity(NotifyPaymentTimeoutActivity::class, $orderId);

        return ['status' => 'timeout', 'paymentId' => $payment['id']];
    }
}
```

## Child Workflows

Compose workflows by calling child workflows for modular, reusable workflow components.
```php
// ...
        $childWorkflows = array_map(
            fn($orderId) => child(OrderFulfillmentChildWorkflow::class, $orderId),
            $orderIds
        );

        $shipments = yield all($childWorkflows);

        // Send combined notification
        yield activity(SendOrderConfirmationActivity::class, $orderIds, $shipments);

        return [
            'orderIds' => $orderIds,
            'shipments' => $shipments,
        ];
    }
}
```

## Saga Pattern with Compensation

Implement the saga pattern with compensation logic for distributed transactions.

```php
// ...
            $this->addCompensation(fn() => activity(CancelFlightActivity::class, $flight['bookingId']));

            // Book hotel and register compensation
            $hotel = yield activity(BookHotelActivity::class, $userId, $hotelId);
            $this->addCompensation(fn() => activity(CancelHotelActivity::class, $hotel['bookingId']));

            // Book car and register compensation
            $car = yield activity(BookCarActivity::class, $userId, $carId);
            $this->addCompensation(fn() => activity(CancelCarActivity::class, $car['bookingId']));

            // Charge payment
            $payment = yield activity(ChargePaymentActivity::class, $userId, $flight, $hotel, $car);

            return [
                'status' => 'success',
                'flight' => $flight,
                'hotel' => $hotel,
                'car' => $car,
                'payment' => $payment,
            ];
        } catch (\Throwable $e) {
            // Run compensations in reverse order
            yield from $this->compensate();

            return [
                'status' => 'failed',
                'error' => $e->getMessage(),
            ];
        }
    }
}

// Configure compensation behavior
class ParallelCompensationWorkflow extends Workflow
{
    public function execute()
    {
        // Run compensations in parallel instead of sequentially
        $this->setParallelCompensation(true);

        // Continue running compensations even if one fails
        $this->setContinueWithError(true);

        // ... rest of workflow
    }
}
```

## Continue As New

Prevent unbounded event history by continuing as a new workflow execution.
```php
// ...
        if ($iteration >= $maxIterations) {
            // Start fresh with reset iteration counter
            return yield continueAsNew(0, $maxIterations);
        }

        // Continue as new with incremented counter
        return yield continueAsNew($iteration, $maxIterations);
    }
}
```

## Side Effects

Execute non-deterministic operations that should only run once using `sideEffect()`.

```php
// ...
        $orderId = yield sideEffect(fn() => 'ORD-' . Str::uuid());

        // Get deterministic timestamp (use WorkflowStub::now() instead of Carbon::now())
        $createdAt = WorkflowStub::now();

        // Another side effect - random confirmation code
        $confirmationCode = yield sideEffect(fn() => strtoupper(Str::random(8)));

        yield activity(CreateOrderActivity::class, $orderId, $customerId, $items, $createdAt);
        yield activity(SendConfirmationActivity::class, $customerId, $confirmationCode);

        return [
            'orderId' => $orderId,
            'confirmationCode' => $confirmationCode,
            'createdAt' => $createdAt->toIso8601String(),
        ];
    }
}
```

## Async Closures

Execute anonymous functions as child workflows for quick inline operations.

## Versioning

Use `getVersion()` to branch on a recorded version number so that workflows that are already running can evolve safely when the code changes.

```php
// ...
            WorkflowStub::DEFAULT_VERSION => yield activity(LegacyPaymentActivity::class, $orderId),
            1 => yield activity(PaymentActivityV1::class, $orderId),
            2 => yield activity(PaymentActivityV2::class, $orderId),
        };

        // Another versioned change point
        $notificationVersion = yield getVersion('notification-system', WorkflowStub::DEFAULT_VERSION, 1);

        if ($notificationVersion === WorkflowStub::DEFAULT_VERSION) {
            yield activity(SendEmailActivity::class, $orderId);
        } else {
            yield activity(SendMultiChannelNotificationActivity::class, $orderId);
        }

        return $payment;
    }
}
```

## Webhooks

Expose workflows and signals as HTTP endpoints for external integrations.
```php
// ...
        $this->webhookPayload = $payload;
    }

    public function execute($orderId, $amount)
    {
        yield activity(InitiatePaymentActivity::class, $orderId, $amount);

        // Wait for webhook callback
        yield await(fn() => $this->webhookPayload !== null);

        return $this->webhookPayload;
    }
}

// Register webhook routes in routes/web.php or routes/api.php
use Workflow\Webhooks;

Webhooks::routes();

// Webhook endpoints are automatically created:
// POST /webhooks/start/payment-webhook-workflow
// POST /webhooks/signal/payment-webhook-workflow/{workflowId}/handle-payment-callback
```

```bash
# Start workflow via webhook
curl -X POST http://your-app.com/webhooks/start/payment-webhook-workflow \
  -H "Content-Type: application/json" \
  -d '{"orderId": 123, "amount": 99.99}'

# Send signal via webhook
curl -X POST http://your-app.com/webhooks/signal/payment-webhook-workflow/1/handle-payment-callback \
  -H "Content-Type: application/json" \
  -d '{"status": "success", "transactionId": "txn_abc123"}'
```

## Testing Workflows

Use the built-in fake and mock system for unit testing workflows.
```php
// ...
            123,
            'total' => 99.99,
        ]);

        WorkflowStub::mock(ProcessPaymentActivity::class, [
            'id' => 'pay_123',
            'status' => 'succeeded',
        ]);

        WorkflowStub::mock(CreateShipmentActivity::class, [
            'tracking' => 'TRACK123',
        ]);

        // Create and run workflow
        $workflow = WorkflowStub::make(OrderProcessingWorkflow::class);
        $workflow->start(123, 456);

        // Assert activities were dispatched
        WorkflowStub::assertDispatched(ValidateOrderActivity::class);
        WorkflowStub::assertDispatched(ProcessPaymentActivity::class);
        WorkflowStub::assertDispatched(CreateShipmentActivity::class, 1); // Exactly once

        // Assert workflow completed with expected output
        $this->assertTrue($workflow->completed());
        $this->assertEquals([
            'orderId' => 123,
            'paymentId' => 'pay_123',
            'trackingNumber' => 'TRACK123',
        ], $workflow->output());
    }

    public function test_activity_dispatched_with_arguments()
    {
        WorkflowStub::fake();

        WorkflowStub::mock(ValidateOrderActivity::class, function ($context, $orderId) {
            // Dynamic mock based on arguments
            return ['id' => $orderId, 'validated' => true];
        });

        $workflow = WorkflowStub::make(OrderProcessingWorkflow::class);
        $workflow->start(999, 1);

        // Assert with argument validation
        WorkflowStub::assertDispatched(ValidateOrderActivity::class, function ($orderId) {
            return $orderId === 999;
        });
    }
}
```

## Configuration

Publish and customize the workflow configuration file.
```bash
php artisan vendor:publish --provider="Workflow\Providers\WorkflowServiceProvider" --tag="config"
```

```php
// ...
    'Workflows',

    // Custom model classes (extend for custom database connection)
    'stored_workflow_model' => Workflow\Models\StoredWorkflow::class,
    'stored_workflow_exception_model' => Workflow\Models\StoredWorkflowException::class,
    'stored_workflow_log_model' => Workflow\Models\StoredWorkflowLog::class,
    'stored_workflow_signal_model' => Workflow\Models\StoredWorkflowSignal::class,
    'stored_workflow_timer_model' => Workflow\Models\StoredWorkflowTimer::class,

    // Serializer for workflow data
    'serializer' => Workflow\Serializers\Y::class,

    // Auto-prune completed workflows older than this
    'prune_age' => '1 month',

    // Webhook configuration
    'webhooks_route' => env('WORKFLOW_WEBHOOKS_ROUTE', 'webhooks'),

    'webhook_auth' => [
        'method' => env('WORKFLOW_WEBHOOKS_AUTH_METHOD', 'none'), // none, signature, token, custom

        'signature' => [
            'header' => env('WORKFLOW_WEBHOOKS_SIGNATURE_HEADER', 'X-Signature'),
            'secret' => env('WORKFLOW_WEBHOOKS_SECRET'),
        ],

        'token' => [
            'header' => env('WORKFLOW_WEBHOOKS_TOKEN_HEADER', 'Authorization'),
            'token' => env('WORKFLOW_WEBHOOKS_TOKEN'),
        ],

        'custom' => [
            'class' => env('WORKFLOW_WEBHOOKS_CUSTOM_AUTH_CLASS', null),
        ],
    ],
];
```

## Activity Options

Configure retry behavior, timeouts, queues, and connections for activities.

```php
// ...
        $this->workflowId();

        // Send heartbeat for long-running activities
        $this->heartbeat();

        // Get webhook URLs for external callbacks
        $signalUrl = $this->webhookUrl('paymentReceived');

        return $this->processData($data);
    }
}
```

## Summary

Durable Workflow provides a comprehensive solution for building reliable, long-running business processes in Laravel applications.
Common use cases include multi-step order processing with payment and fulfillment, approval workflows with human-in-the-loop interactions, scheduled data pipelines and ETL processes, distributed transactions with saga compensation, subscription management and billing workflows, AI agent orchestration, and any process requiring durability across failures. Integration follows a clean pattern: define workflows as generator classes that yield activities, implement activities as discrete units of work, use signals for external events, queries for state inspection, and timers for delays. The library integrates seamlessly with Laravel's queue system (supporting Redis, SQS, database, and Beanstalkd), provides built-in testing utilities, webhook endpoints for external systems, and a monitoring UI called Waterline. Workflows can be composed using child workflows, handle failures gracefully with automatic retries and saga compensation, and evolve safely with versioning support for running workflows.
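
The generator-and-replay model that underlies this pattern can be illustrated with a small, framework-free sketch. It is not the library's implementation and uses none of its classes: `runWithReplay()`, the `$history` array, and the closures standing in for activities are hypothetical names introduced only for this example. The sketch assumes each yielded step is a plain closure and that recorded results live in an in-memory array rather than a database.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical mini-driver (not part of Durable Workflow): runs a
 * generator-based "workflow", reusing results already recorded in
 * $history and executing only the steps that have no recorded result.
 */
function runWithReplay(callable $workflow, array &$history, array &$executed): mixed
{
    $generator = $workflow();
    $step = 0;

    while ($generator->valid()) {
        // The closure the workflow yielded at this step.
        $task = $generator->current();

        if (!array_key_exists($step, $history)) {
            // First time this step is reached: do the work and record it.
            $history[$step] = $task();
            $executed[] = $step;
        }

        // Feed the recorded result back in; the `yield` expression
        // inside the workflow evaluates to this value.
        $generator->send($history[$step]);
        $step++;
    }

    return $generator->getReturn();
}

// A stand-in workflow: each yielded closure plays the role of an activity.
$workflow = function (): Generator {
    $order = yield fn () => ['id' => 123, 'total' => 99.99];
    $payment = yield fn () => ['id' => 'pay_' . $order['id']];

    return ['orderId' => $order['id'], 'paymentId' => $payment['id']];
};

$history = [];
$executed = [];
$first = runWithReplay($workflow, $history, $executed);

// Simulate a restart: a fresh generator replays against the same history.
$executedOnReplay = [];
$second = runWithReplay($workflow, $history, $executedOnReplay);
```

On the second run no closure is executed, yet the workflow reaches the same return value, because every `yield` is answered from the recorded history. This is why workflow code has to stay deterministic, while non-deterministic work belongs in activities or side effects.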