Try Live
Add Docs
Rankings
Pricing
Enterprise
Docs
Install
Install
Docs
Pricing
Enterprise
More...
More...
Try Live
Rankings
Add Docs
Durable Workflow
https://github.com/durable-workflow/workflow
Admin
Durable Workflow is a PHP workflow engine that enables developers to define and manage long-running
...
Tokens:
13,827
Snippets:
163
Trust Score:
8.6
Update:
2 weeks ago
Context
Skills
Chat
Benchmark
91.9
Suggestions
Latest
Show doc for...
Code
Info
Show Results
Context Summary (auto-generated)
Raw
Copy
Link
# Durable Workflow Durable Workflow is a powerful durable workflow engine for PHP/Laravel that enables developers to write long-running, persistent, distributed workflows (orchestrations) powered by Laravel Queues. It provides tools for defining complex asynchronous processes such as agentic AI workflows, financial transactions, data pipelines, microservices, job tracking, and user signup flows as sequences of activities that can run in parallel or series. The library is built on Laravel's queue system and database layer to store and manage workflow state using event sourcing. This ensures workflows are scalable, reliable, and can be resumed after failures. Workflows use PHP generators with the `yield` keyword to pause execution and wait for activities to complete, while maintaining determinism through state replay. Activities are individual units of work that perform specific tasks like API calls, data processing, or sending emails. ## Installation Install via Composer and run migrations to set up the workflow tables. ```bash # Install the package composer require laravel-workflow/laravel-workflow # Publish migrations php artisan vendor:publish --provider="Workflow\Providers\WorkflowServiceProvider" --tag="migrations" # Run migrations php artisan migrate # Start queue worker to process workflows php artisan queue:work ``` ## Creating a Workflow Extend the `Workflow` class and implement the `execute()` method as a generator that yields activities. ```php <?php namespace App\Workflows; use Workflow\Workflow; use function Workflow\activity; class OrderProcessingWorkflow extends Workflow { public function execute($orderId, $customerId) { // Execute activities in sequence $order = yield activity(ValidateOrderActivity::class, $orderId); $payment = yield activity(ProcessPaymentActivity::class, $order, $customerId); $shipment = yield activity(CreateShipmentActivity::class, $order); yield activity(SendConfirmationEmailActivity::class, $customerId, $order, $shipment); return [ 'orderId' => $orderId, 'paymentId' => $payment['id'], 'trackingNumber' => $shipment['tracking'], ]; } } ``` ## Creating an Activity Extend the `Activity` class and implement the `execute()` method to perform the actual work. ```php <?php namespace App\Workflows; use Workflow\Activity; use App\Models\Order; class ValidateOrderActivity extends Activity { // Retry configuration public $tries = 3; public $timeout = 60; // Custom backoff strategy (seconds between retries) public function backoff() { return [1, 5, 10]; } public function execute($orderId) { $order = Order::findOrFail($orderId); if ($order->total <= 0) { throw new \InvalidArgumentException('Order total must be positive'); } // Activities can use non-deterministic operations $order->validated_at = now(); $order->save(); return $order->toArray(); } } ``` ## Starting and Monitoring Workflows Use `WorkflowStub` to create, start, monitor, and retrieve workflow results. ```php <?php use Workflow\WorkflowStub; use App\Workflows\OrderProcessingWorkflow; // Create and start a workflow $workflow = WorkflowStub::make(OrderProcessingWorkflow::class); $workflow->start($orderId, $customerId); // Get the workflow ID for tracking $workflowId = $workflow->id(); // Later, load an existing workflow by ID $workflow = WorkflowStub::load($workflowId); // Check workflow status $workflow->running(); // true if still executing $workflow->completed(); // true if finished successfully $workflow->failed(); // true if failed $workflow->status(); // Returns status class name // Wait for completion (in tests or scripts) while ($workflow->running()) { sleep(1); $workflow->fresh(); } // Get the final output $result = $workflow->output(); // ['orderId' => 123, 'paymentId' => 'pay_xxx', 'trackingNumber' => 'TRACK123'] // Resume a failed workflow after fixing the issue $workflow->resume(); ``` ## Parallel Activity Execution Execute multiple activities concurrently using `all()` to wait for all results. ```php <?php namespace App\Workflows; use Workflow\Workflow; use function Workflow\{activity, all}; class DataEnrichmentWorkflow extends Workflow { public function execute($userId) { // Start all activities immediately (they run in parallel) $profilePromise = activity(FetchUserProfileActivity::class, $userId); $ordersPromise = activity(FetchUserOrdersActivity::class, $userId); $recommendationsPromise = activity(GenerateRecommendationsActivity::class, $userId); // Wait for all to complete and collect results $results = yield all([ $profilePromise, $ordersPromise, $recommendationsPromise, ]); return [ 'profile' => $results[0], 'orders' => $results[1], 'recommendations' => $results[2], ]; } } ``` ## Mixing Series and Parallel Execution Combine sequential and parallel execution patterns for complex workflows. ```php <?php namespace App\Workflows; use Workflow\Workflow; use function Workflow\{activity, all}; class ECommerceCheckoutWorkflow extends Workflow { public function execute($cartId, $userId) { // Step 1: Validate cart (must complete first) $cart = yield activity(ValidateCartActivity::class, $cartId); // Step 2: Run inventory check and fraud detection in parallel [$inventory, $fraudCheck] = yield all([ activity(CheckInventoryActivity::class, $cart), activity(FraudDetectionActivity::class, $userId, $cart), ]); // Step 3: Process payment (after validations pass) $payment = yield activity(ProcessPaymentActivity::class, $cart, $userId); // Step 4: Run fulfillment tasks in parallel [$shipment, $notification, $analytics] = yield all([ activity(CreateShipmentActivity::class, $cart, $payment), activity(SendNotificationActivity::class, $userId, $payment), activity(UpdateAnalyticsActivity::class, $cart, $payment), ]); return [ 'paymentId' => $payment['id'], 'shipmentId' => $shipment['id'], ]; } } ``` ## Signals for External Events Use signals to trigger events from outside the workflow and `await()` to pause until conditions are met. ```php <?php namespace App\Workflows; use Workflow\Workflow; use Workflow\SignalMethod; use function Workflow\{activity, await}; class ApprovalWorkflow extends Workflow { private bool $approved = false; private ?string $approverComment = null; #[SignalMethod] public function approve(string $comment = ''): void { $this->approved = true; $this->approverComment = $comment; } #[SignalMethod] public function reject(string $reason): void { $this->approved = false; $this->approverComment = $reason; } public function execute($requestId, $requesterId) { // Send approval request notification yield activity(SendApprovalRequestActivity::class, $requestId, $requesterId); // Wait until approved or rejected signal is received yield await(fn() => $this->approverComment !== null); if ($this->approved) { yield activity(ProcessApprovedRequestActivity::class, $requestId); return ['status' => 'approved', 'comment' => $this->approverComment]; } yield activity(NotifyRejectionActivity::class, $requestId, $this->approverComment); return ['status' => 'rejected', 'reason' => $this->approverComment]; } } // Sending a signal from outside $workflow = WorkflowStub::load($workflowId); $workflow->approve('Looks good, approved!'); // or $workflow->reject('Budget exceeded'); ``` ## Queries for Workflow State Use queries to retrieve workflow state without affecting execution. ```php <?php namespace App\Workflows; use Workflow\Workflow; use Workflow\QueryMethod; use Workflow\SignalMethod; use function Workflow\{activity, await}; class FileProcessingWorkflow extends Workflow { private int $processedCount = 0; private int $totalFiles = 0; private string $currentFile = ''; #[QueryMethod] public function getProgress(): array { return [ 'processed' => $this->processedCount, 'total' => $this->totalFiles, 'current' => $this->currentFile, 'percentage' => $this->totalFiles > 0 ? round(($this->processedCount / $this->totalFiles) * 100) : 0, ]; } #[SignalMethod] public function cancel(): void { $this->canceled = true; } private bool $canceled = false; public function execute(array $files) { $this->totalFiles = count($files); foreach ($files as $file) { if ($this->canceled) { return ['status' => 'canceled', 'processed' => $this->processedCount]; } $this->currentFile = $file; yield activity(ProcessFileActivity::class, $file); $this->processedCount++; } return ['status' => 'completed', 'processed' => $this->processedCount]; } } // Query workflow state from outside $workflow = WorkflowStub::load($workflowId); $progress = $workflow->getProgress(); // ['processed' => 5, 'total' => 10, 'current' => 'file6.txt', 'percentage' => 50] ``` ## Timers and Delays Suspend workflow execution for a specified duration using timers. ```php <?php namespace App\Workflows; use Workflow\Workflow; use function Workflow\{activity, timer, minutes, hours, days}; class SubscriptionRenewalWorkflow extends Workflow { public function execute($subscriptionId, $userId) { // Send reminder 3 days before expiration yield activity(SendRenewalReminderActivity::class, $userId); // Wait for 2 days yield days(2); // Send final reminder yield activity(SendFinalReminderActivity::class, $userId); // Wait for 1 day yield timer(86400); // 86400 seconds = 1 day // Process automatic renewal $result = yield activity(ProcessRenewalActivity::class, $subscriptionId); // Wait 1 hour before sending confirmation yield hours(1); yield activity(SendConfirmationActivity::class, $userId, $result); return $result; } } ``` ## Await with Timeout Wait for a signal or condition with a timeout, continuing if the timeout expires. ```php <?php namespace App\Workflows; use Workflow\Workflow; use Workflow\SignalMethod; use function Workflow\{activity, awaitWithTimeout, minutes}; class PaymentConfirmationWorkflow extends Workflow { private bool $paymentConfirmed = false; private ?array $paymentDetails = null; #[SignalMethod] public function confirmPayment(array $details): void { $this->paymentConfirmed = true; $this->paymentDetails = $details; } public function execute($orderId, $amount) { // Create pending payment $payment = yield activity(CreatePendingPaymentActivity::class, $orderId, $amount); // Wait up to 30 minutes for payment confirmation $confirmed = yield awaitWithTimeout(1800, fn() => $this->paymentConfirmed); if ($confirmed) { // Payment was confirmed within timeout yield activity(FulfillOrderActivity::class, $orderId, $this->paymentDetails); return ['status' => 'confirmed', 'payment' => $this->paymentDetails]; } // Timeout reached - cancel the payment yield activity(CancelPaymentActivity::class, $payment['id']); yield activity(NotifyPaymentTimeoutActivity::class, $orderId); return ['status' => 'timeout', 'paymentId' => $payment['id']]; } } ``` ## Child Workflows Compose workflows by calling child workflows for modular, reusable workflow components. ```php <?php namespace App\Workflows; use Workflow\Workflow; use function Workflow\{activity, child, all}; class OrderFulfillmentChildWorkflow extends Workflow { public function execute($orderId) { $inventory = yield activity(ReserveInventoryActivity::class, $orderId); $package = yield activity(PackageOrderActivity::class, $orderId, $inventory); $shipment = yield activity(ShipPackageActivity::class, $package); return $shipment; } } class MasterOrderWorkflow extends Workflow { public function execute(array $orderIds) { // Process payment first yield activity(ValidatePaymentActivity::class, $orderIds); // Execute child workflows in parallel for each order $childWorkflows = array_map( fn($orderId) => child(OrderFulfillmentChildWorkflow::class, $orderId), $orderIds ); $shipments = yield all($childWorkflows); // Send combined notification yield activity(SendOrderConfirmationActivity::class, $orderIds, $shipments); return [ 'orderIds' => $orderIds, 'shipments' => $shipments, ]; } } ``` ## Saga Pattern with Compensation Implement the saga pattern with compensation logic for distributed transactions. ```php <?php namespace App\Workflows; use Workflow\Workflow; use function Workflow\activity; class BookingWorkflow extends Workflow { public function execute($userId, $flightId, $hotelId, $carId) { try { // Book flight and register compensation $flight = yield activity(BookFlightActivity::class, $userId, $flightId); $this->addCompensation(fn() => activity(CancelFlightActivity::class, $flight['bookingId'])); // Book hotel and register compensation $hotel = yield activity(BookHotelActivity::class, $userId, $hotelId); $this->addCompensation(fn() => activity(CancelHotelActivity::class, $hotel['bookingId'])); // Book car and register compensation $car = yield activity(BookCarActivity::class, $userId, $carId); $this->addCompensation(fn() => activity(CancelCarActivity::class, $car['bookingId'])); // Charge payment $payment = yield activity(ChargePaymentActivity::class, $userId, $flight, $hotel, $car); return [ 'status' => 'success', 'flight' => $flight, 'hotel' => $hotel, 'car' => $car, 'payment' => $payment, ]; } catch (\Throwable $e) { // Run compensations in reverse order yield from $this->compensate(); return [ 'status' => 'failed', 'error' => $e->getMessage(), ]; } } } // Configure compensation behavior class ParallelCompensationWorkflow extends Workflow { public function execute() { // Run compensations in parallel instead of sequentially $this->setParallelCompensation(true); // Continue running compensations even if one fails $this->setContinueWithError(true); // ... rest of workflow } } ``` ## Continue As New Prevent unbounded event history by continuing as a new workflow execution. ```php <?php namespace App\Workflows; use Workflow\Workflow; use function Workflow\{activity, continueAsNew, timer}; class LongRunningPollerWorkflow extends Workflow { public function execute(int $iteration = 0, int $maxIterations = 100) { // Poll for new data $data = yield activity(PollForDataActivity::class); if ($data) { yield activity(ProcessDataActivity::class, $data); } // Wait before next poll yield timer(60); // Wait 1 minute $iteration++; // Continue as new workflow after max iterations to prevent history growth if ($iteration >= $maxIterations) { // Start fresh with reset iteration counter return yield continueAsNew(0, $maxIterations); } // Continue as new with incremented counter return yield continueAsNew($iteration, $maxIterations); } } ``` ## Side Effects Execute non-deterministic operations that should only run once using `sideEffect()`. ```php <?php namespace App\Workflows; use Workflow\Workflow; use Illuminate\Support\Str; use function Workflow\{activity, sideEffect, now}; class OrderCreationWorkflow extends Workflow { public function execute($customerId, array $items) { // Generate a unique order ID (only runs once, cached on replay) $orderId = yield sideEffect(fn() => 'ORD-' . Str::uuid()); // Get deterministic timestamp (use WorkflowStub::now() instead of Carbon::now()) $createdAt = now(); // Another side effect - random confirmation code $confirmationCode = yield sideEffect(fn() => strtoupper(Str::random(8))); yield activity(CreateOrderActivity::class, $orderId, $customerId, $items, $createdAt); yield activity(SendConfirmationActivity::class, $customerId, $confirmationCode); return [ 'orderId' => $orderId, 'confirmationCode' => $confirmationCode, 'createdAt' => $createdAt->toIso8601String(), ]; } } ``` ## Async Closures Execute anonymous functions as child workflows for quick inline operations. ```php <?php namespace App\Workflows; use Workflow\Workflow; use function Workflow\{activity, async}; class ReportGenerationWorkflow extends Workflow { public function execute($reportId) { // Execute a closure as a child workflow $results = yield async(function () use ($reportId) { // This closure runs as a separate child workflow $data = yield activity(FetchReportDataActivity::class, $reportId); $processed = yield activity(ProcessReportDataActivity::class, $data); $formatted = yield activity(FormatReportActivity::class, $processed); return $formatted; }); yield activity(SaveReportActivity::class, $reportId, $results); return $results; } } ``` ## Workflow Versioning Handle workflow code changes safely with versioning to support running workflows. ```php <?php namespace App\Workflows; use Workflow\Workflow; use Workflow\WorkflowStub; use function Workflow\{activity, getVersion}; class EvolvingWorkflow extends Workflow { public function execute($orderId) { // Get version for this change point (min: -1/default, max: 2) $version = yield getVersion('payment-processing', WorkflowStub::DEFAULT_VERSION, 2); // Route to appropriate activity based on version $payment = match ($version) { WorkflowStub::DEFAULT_VERSION => yield activity(LegacyPaymentActivity::class, $orderId), 1 => yield activity(PaymentActivityV1::class, $orderId), 2 => yield activity(PaymentActivityV2::class, $orderId), }; // Another versioned change point $notificationVersion = yield getVersion('notification-system', WorkflowStub::DEFAULT_VERSION, 1); if ($notificationVersion === WorkflowStub::DEFAULT_VERSION) { yield activity(SendEmailActivity::class, $orderId); } else { yield activity(SendMultiChannelNotificationActivity::class, $orderId); } return $payment; } } ``` ## Webhooks Expose workflows and signals as HTTP endpoints for external integrations. ```php <?php namespace App\Workflows; use Workflow\Workflow; use Workflow\Webhook; use Workflow\SignalMethod; use function Workflow\{activity, await}; #[Webhook] // Enable webhook for starting this workflow class PaymentWebhookWorkflow extends Workflow { private ?array $webhookPayload = null; #[SignalMethod] #[Webhook] // Enable webhook for this signal public function handlePaymentCallback(array $payload): void { $this->webhookPayload = $payload; } public function execute($orderId, $amount) { yield activity(InitiatePaymentActivity::class, $orderId, $amount); // Wait for webhook callback yield await(fn() => $this->webhookPayload !== null); return $this->webhookPayload; } } // Register webhook routes in routes/web.php or routes/api.php use Workflow\Webhooks; Webhooks::routes(); // Webhook endpoints are automatically created: // POST /webhooks/start/payment-webhook-workflow // POST /webhooks/signal/payment-webhook-workflow/{workflowId}/handle-payment-callback ``` ```bash # Start workflow via webhook curl -X POST http://your-app.com/webhooks/start/payment-webhook-workflow \ -H "Content-Type: application/json" \ -d '{"orderId": 123, "amount": 99.99}' # Send signal via webhook curl -X POST http://your-app.com/webhooks/signal/payment-webhook-workflow/1/handle-payment-callback \ -H "Content-Type: application/json" \ -d '{"status": "success", "transactionId": "txn_abc123"}' ``` ## Testing Workflows Use the built-in fake and mock system for unit testing workflows. ```php <?php namespace Tests\Feature; use Tests\TestCase; use Workflow\WorkflowStub; use App\Workflows\OrderProcessingWorkflow; use App\Workflows\ValidateOrderActivity; use App\Workflows\ProcessPaymentActivity; use App\Workflows\CreateShipmentActivity; class OrderWorkflowTest extends TestCase { public function test_order_processing_workflow() { // Enable fake mode WorkflowStub::fake(); // Mock activity results WorkflowStub::mock(ValidateOrderActivity::class, [ 'id' => 123, 'total' => 99.99, ]); WorkflowStub::mock(ProcessPaymentActivity::class, [ 'id' => 'pay_123', 'status' => 'succeeded', ]); WorkflowStub::mock(CreateShipmentActivity::class, [ 'tracking' => 'TRACK123', ]); // Create and run workflow $workflow = WorkflowStub::make(OrderProcessingWorkflow::class); $workflow->start(123, 456); // Assert activities were dispatched WorkflowStub::assertDispatched(ValidateOrderActivity::class); WorkflowStub::assertDispatched(ProcessPaymentActivity::class); WorkflowStub::assertDispatched(CreateShipmentActivity::class, 1); // Exactly once // Assert workflow completed with expected output $this->assertTrue($workflow->completed()); $this->assertEquals([ 'orderId' => 123, 'paymentId' => 'pay_123', 'trackingNumber' => 'TRACK123', ], $workflow->output()); } public function test_activity_dispatched_with_arguments() { WorkflowStub::fake(); WorkflowStub::mock(ValidateOrderActivity::class, function ($context, $orderId) { // Dynamic mock based on arguments return ['id' => $orderId, 'validated' => true]; }); $workflow = WorkflowStub::make(OrderProcessingWorkflow::class); $workflow->start(999, 1); // Assert with argument validation WorkflowStub::assertDispatched(ValidateOrderActivity::class, function ($orderId) { return $orderId === 999; }); } } ``` ## Configuration Publish and customize the workflow configuration file. ```bash php artisan vendor:publish --provider="Workflow\Providers\WorkflowServiceProvider" --tag="config" ``` ```php <?php // config/workflows.php return [ // Folder where workflow classes are stored 'workflows_folder' => 'Workflows', // Custom model classes (extend for custom database connection) 'stored_workflow_model' => Workflow\Models\StoredWorkflow::class, 'stored_workflow_exception_model' => Workflow\Models\StoredWorkflowException::class, 'stored_workflow_log_model' => Workflow\Models\StoredWorkflowLog::class, 'stored_workflow_signal_model' => Workflow\Models\StoredWorkflowSignal::class, 'stored_workflow_timer_model' => Workflow\Models\StoredWorkflowTimer::class, // Serializer for workflow data 'serializer' => Workflow\Serializers\Y::class, // Auto-prune completed workflows older than this 'prune_age' => '1 month', // Webhook configuration 'webhooks_route' => env('WORKFLOW_WEBHOOKS_ROUTE', 'webhooks'), 'webhook_auth' => [ 'method' => env('WORKFLOW_WEBHOOKS_AUTH_METHOD', 'none'), // none, signature, token, custom 'signature' => [ 'header' => env('WORKFLOW_WEBHOOKS_SIGNATURE_HEADER', 'X-Signature'), 'secret' => env('WORKFLOW_WEBHOOKS_SECRET'), ], 'token' => [ 'header' => env('WORKFLOW_WEBHOOKS_TOKEN_HEADER', 'Authorization'), 'token' => env('WORKFLOW_WEBHOOKS_TOKEN'), ], 'custom' => [ 'class' => env('WORKFLOW_WEBHOOKS_CUSTOM_AUTH_CLASS', null), ], ], ]; ``` ## Activity Options Configure retry behavior, timeouts, queues, and connections for activities. ```php <?php namespace App\Workflows; use Workflow\Activity; class RobustActivity extends Activity { // Number of retry attempts (0 = infinite) public $tries = 5; // Maximum execution time in seconds (0 = no limit) public $timeout = 120; // Queue connection public $connection = 'redis'; // Queue name public $queue = 'high-priority'; // Custom backoff strategy (seconds between retries) public function backoff() { return [1, 5, 15, 30, 60]; // Escalating delays } public function execute($data) { // Access the workflow ID from within an activity $workflowId = $this->workflowId(); // Send heartbeat for long-running activities $this->heartbeat(); // Get webhook URLs for external callbacks $signalUrl = $this->webhookUrl('paymentReceived'); return $this->processData($data); } } ``` ## Summary Durable Workflow provides a comprehensive solution for building reliable, long-running business processes in Laravel applications. Common use cases include multi-step order processing with payment and fulfillment, approval workflows with human-in-the-loop interactions, scheduled data pipelines and ETL processes, distributed transactions with saga compensation, subscription management and billing workflows, AI agent orchestration, and any process requiring durability across failures. Integration follows a clean pattern: define workflows as generator classes that yield activities, implement activities as discrete units of work, use signals for external events, queries for state inspection, and timers for delays. The library integrates seamlessly with Laravel's queue system (supporting Redis, SQS, database, and Beanstalkd), provides built-in testing utilities, webhook endpoints for external systems, and a monitoring UI called Waterline. Workflows can be composed using child workflows, handle failures gracefully with automatic retries and saga compensation, and evolve safely with versioning support for running workflows.