# Symfony Messenger Component The Symfony Messenger component provides a robust message bus implementation for PHP applications, enabling them to send and receive messages to/from other applications or via message queues. It supports synchronous and asynchronous message handling, making it ideal for implementing CQRS (Command Query Responsibility Segregation) patterns, event-driven architectures, and background job processing. The component features a flexible middleware architecture that allows customization of message handling, including validation, transaction management, and retry strategies. It integrates seamlessly with various message brokers through transport abstraction, supports batch processing, message deduplication, and provides comprehensive error handling with automatic retry mechanisms. The component requires PHP 8.4+ and integrates with the broader Symfony ecosystem. ## MessageBus - Core Message Dispatcher The `MessageBus` is the central component that dispatches messages through a chain of middleware handlers. It accepts any object as a message and optionally wraps it in an `Envelope` with stamps that control middleware behavior. The bus returns an `Envelope` containing the message and any stamps added during processing. ```php recipient, $message->subject, $message->body); return 'Email sent to ' . $message->recipient; }; // Configure the handlers locator $handlersLocator = new HandlersLocator([ SendEmailNotification::class => [$emailHandler], ]); // Create the message bus with middleware $bus = new MessageBus([ new HandleMessageMiddleware($handlersLocator), ]); // Dispatch a message synchronously $envelope = $bus->dispatch(new SendEmailNotification( recipient: 'user@example.com', subject: 'Welcome!', body: 'Thank you for signing up.' )); // Dispatch with stamps (e.g., delay delivery by 5 seconds) $envelope = $bus->dispatch( new SendEmailNotification('user@example.com', 'Reminder', 'Your trial ends soon.'), [new DelayStamp(5000)] // 5000ms delay ); // Access handler results via HandledStamp use Symfony\Component\Messenger\Stamp\HandledStamp; $handledStamp = $envelope->last(HandledStamp::class); $result = $handledStamp?->getResult(); // 'Email sent to user@example.com' ``` ## Envelope - Message Wrapper with Stamps The `Envelope` class wraps messages with stamps (metadata) that control how middleware processes the message. Envelopes are immutable - methods like `with()` and `withoutAll()` return new instances. Stamps can be used for routing, delays, validation groups, and tracking handler results. ```php with( new TransportNamesStamp(['high-priority']), new ValidationStamp(['Default']) ); // Remove stamps by type $envelope = $envelope->withoutAll(DelayStamp::class); $envelope = $envelope->withoutStampsOfType(DelayStamp::class); // Also removes subclasses // Retrieve stamps $delayStamp = $envelope->last(DelayStamp::class); // Get last stamp of type $allDelayStamps = $envelope->all(DelayStamp::class); // Get all stamps of type $allStamps = $envelope->all(); // Get all stamps grouped by type // Get the wrapped message $originalMessage = $envelope->getMessage(); // OrderPlaced instance // Check handler results after dispatch if ($handledStamp = $envelope->last(HandledStamp::class)) { $result = $handledStamp->getResult(); $handlerName = $handledStamp->getHandlerName(); } ``` ## AsMessageHandler Attribute - Handler Configuration The `#[AsMessageHandler]` attribute configures classes or methods as message handlers with options for bus selection, transport filtering, priority, and more. The handler method receives the message object and can return a result that's captured in a `HandledStamp`. ```php email echo "Sending welcome email to user {$event->userId}"; } } // Handler with configuration options #[AsMessageHandler( bus: 'event.bus', // Only handle from this bus fromTransport: 'high-priority', // Only handle from this transport priority: 10, // Higher priority = handled first handles: UserRegistered::class, // Explicit message type method: 'handleRegistration' // Custom method name )] class NotificationHandler { public function handleRegistration(UserRegistered $event): void { // Process the event } } // Multiple handlers in one class using repeatable attribute #[AsMessageHandler(handles: UserRegistered::class, method: 'onUserRegistered')] #[AsMessageHandler(handles: OrderCreated::class, method: 'onOrderCreated')] class AuditLogHandler { public function onUserRegistered(UserRegistered $event): void { // Log user registration } public function onOrderCreated(OrderCreated $event): void { // Log order creation } } // Method-level attribute for invokable classes class InvoiceHandler { #[AsMessageHandler] public function processInvoice(OrderCreated $order): string { // Generate invoice return "Invoice generated for order {$order->orderId}"; } } ``` ## AsMessage Attribute - Message Routing Configuration The `#[AsMessage]` attribute configures message routing to specific transports and serialization options directly on message classes. This provides a declarative way to define where messages should be sent without external configuration. ```php handle(new GetUserById($userId)); } } // Controller using the query service class UserController { public function __construct( private UserQueryService $queryService ) {} public function show(int $id): array { try { $user = $this->queryService->findUser($id); return [ 'id' => $user->id, 'name' => $user->name, 'email' => $user->email, ]; } catch (\Symfony\Component\Messenger\Exception\LogicException $e) { // Handle case: no handler or multiple handlers throw new \RuntimeException('Failed to fetch user: ' . $e->getMessage()); } } } // Example handler class GetUserByIdHandler { public function __invoke(GetUserById $query): UserDto { // Fetch from database return new UserDto( id: $query->userId, name: 'John Doe', email: 'john@example.com' ); } } ``` ## Worker - Async Message Consumer The `Worker` class consumes messages from transports (queues) and dispatches them to the message bus. It supports multiple receivers, rate limiting, and provides events for monitoring and controlling the consumption process. ```php [function (ProcessImageUpload $message) { echo "Processing image: {$message->imageId}\n"; // Resize, optimize, generate thumbnails... return "Processed: {$message->path}"; }], ]); $bus = new MessageBus([ new HandleMessageMiddleware($handlersLocator), ]); // Create worker with receivers $worker = new Worker( receivers: ['async' => $transport], bus: $bus, eventDispatcher: null, // Optional EventDispatcherInterface logger: null, // Optional LoggerInterface rateLimiters: null // Optional rate limiters per transport ); // Run the worker (blocks until stopped) $worker->run([ 'sleep' => 1000000, // Microseconds to sleep when no messages (1 second) 'queues' => null, // Specific queue names or null for all 'time_limit' => 3600, // Stop after N seconds 'fetch_size' => 10, // Number of messages to fetch per iteration ]); // Worker can be stopped programmatically $worker->stop(); // Send keepalive for long-running tasks $worker->keepalive(seconds: 30); // Access worker metadata $metadata = $worker->getMetadata(); $transportNames = $metadata->getTransportNames(); ``` ## HandlersLocator - Handler Registry The `HandlersLocator` maps message types to their handlers. It supports mapping by class name, parent classes, interfaces, and wildcard patterns. Handlers are returned as `HandlerDescriptor` objects which contain the callable and options. ```php [ $sendWelcomeEmail, new HandlerDescriptor( $sendWelcomeEmail, ['from_transport' => 'high-priority', 'alias' => 'welcome-mailer'] ), ], // Interface-based handler (matches all DomainEvent implementations) DomainEvent::class => [$logAllEvents], // Parent class handler (matches UserRegistered, UserUpdated) UserEvent::class => [[$auditHandler, 'handle']], // Wildcard handler for entire namespace 'App\Message\*' => [function (object $message) { echo "Handling message from App\Message namespace\n"; }], // Catch-all handler '*' => [function (object $message) { echo "Fallback handler for: " . $message::class . "\n"; }], ]); // Use with HandleMessageMiddleware use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware; use Symfony\Component\Messenger\MessageBus; $bus = new MessageBus([ new HandleMessageMiddleware($handlersLocator), ]); // All matching handlers will be called in order $bus->dispatch(new UserRegistered()); // Output: // Welcome email sent // Event logged: UserRegistered // Audit: User event recorded ``` ## DelayStamp - Delayed Message Delivery The `DelayStamp` schedules message delivery for a future time. It supports specifying delays in milliseconds, by DateInterval, or until a specific DateTime. The transport uses this stamp to delay processing. ```php dispatch( new SendReminder(1, 'Your subscription expires tomorrow'), [new DelayStamp(60000)] // 60 seconds ); // Delay using DateInterval $inOneHour = DelayStamp::delayFor(new \DateInterval('PT1H')); $envelope = $bus->dispatch( new SendReminder(2, 'Weekly report ready'), [$inOneHour] ); // Delay by days $inThreeDays = DelayStamp::delayFor(new \DateInterval('P3D')); $envelope = $bus->dispatch( new SendReminder(3, 'Trial ending soon'), [$inThreeDays] ); // Delay until specific time $tomorrow9am = new \DateTimeImmutable('tomorrow 9:00'); $untilTomorrow = DelayStamp::delayUntil($tomorrow9am); $envelope = $bus->dispatch( new SendReminder(4, 'Good morning! Check your tasks'), [$untilTomorrow] ); // Combine with other stamps use Symfony\Component\Messenger\Stamp\TransportNamesStamp; $envelope = $bus->dispatch( new SendReminder(5, 'Follow up on order'), [ new DelayStamp(3600000), // 1 hour delay new TransportNamesStamp(['notifications']), // Specific transport ] ); // Retrieve delay from envelope $stamp = $envelope->last(DelayStamp::class); $delayMs = $stamp?->getDelay(); // milliseconds ``` ## TransportNamesStamp - Override Message Routing The `TransportNamesStamp` allows overriding the default transport routing configured in the Messenger routing configuration. This is useful for dynamically routing messages based on runtime conditions. ```php dispatch($order, [ new TransportNamesStamp('high-priority'), ]); // Route to multiple transports $bus->dispatch($order, [ new TransportNamesStamp(['async', 'audit-log', 'analytics']), ]); // Dynamic routing based on business logic function dispatchOrder(ProcessOrder $order, MessageBusInterface $bus): void { $transports = ['async']; if ($order->isPriority) { $transports = ['high-priority']; } if ($order->amount > 1000) { $transports[] = 'fraud-check'; } $transports[] = 'audit-log'; // Always log $bus->dispatch($order, [ new TransportNamesStamp(array_unique($transports)), ]); } // Retrieve transport names from envelope $envelope = $bus->dispatch($order, [ new TransportNamesStamp(['async', 'notifications']), ]); $stamp = $envelope->last(TransportNamesStamp::class); $names = $stamp?->getTransportNames(); // ['async', 'notifications'] ``` ## DispatchAfterCurrentBusStamp - Deferred Dispatch The `DispatchAfterCurrentBusStamp` marks messages to be handled after the current message handler completes. This ensures that sub-dispatched messages only run after the main transaction commits, preventing issues with database consistency. ```php createOrderInDatabase($command); // These messages will be handled AFTER this handler completes // and the database transaction is committed $this->bus->dispatch( new SendOrderConfirmation($orderId), [new DispatchAfterCurrentBusStamp()] ); $this->bus->dispatch( new UpdateInventory($command->items), [new DispatchAfterCurrentBusStamp()] ); return $orderId; } private function createOrderInDatabase(CreateOrder $command): int { // Database insert logic return 12345; } } // Configure bus with DispatchAfterCurrentBusMiddleware $handlersLocator = new HandlersLocator([ CreateOrder::class => [new CreateOrderHandler($bus)], SendOrderConfirmation::class => [function ($msg) { echo "Confirmation sent\n"; }], UpdateInventory::class => [function ($msg) { echo "Inventory updated\n"; }], ]); $bus = new MessageBus([ // Must be before any transaction middleware new DispatchAfterCurrentBusMiddleware(), new HandleMessageMiddleware($handlersLocator), ]); // Dispatch - secondary messages handled after main handler completes $bus->dispatch(new CreateOrder(['item1', 'item2'], 'customer-123')); // Output order: // 1. CreateOrderHandler runs, returns orderId // 2. SendOrderConfirmation handled // 3. UpdateInventory handled ``` ## DeduplicateStamp - Message Deduplication The `DeduplicateStamp` prevents duplicate messages from being processed using a lock-based mechanism. It requires the Symfony Lock component and supports configurable TTL and queue-only deduplication modes. ```php dispatch( new SyncExternalData('resource-123', 'api'), [new DeduplicateStamp('sync-resource-123')] ); // Custom TTL (lock expires after 600 seconds) $bus->dispatch( new SyncExternalData('resource-456', 'api'), [new DeduplicateStamp( key: 'sync-resource-456', ttl: 600.0 // seconds )] ); // Deduplication only while message is in queue (not during processing) $bus->dispatch( new ProcessWebhook('webhook-789', ['event' => 'updated']), [new DeduplicateStamp( key: 'webhook-789', ttl: 300.0, onlyDeduplicateInQueue: true )] ); // Using Lock Key object for advanced scenarios $lockKey = new Key('custom-lock-key'); $bus->dispatch( new SyncExternalData('resource-999', 'webhook'), [new DeduplicateStamp($lockKey, ttl: 120.0)] ); // Dynamic key generation based on message content function dispatchWithDeduplication( SyncExternalData $message, MessageBusInterface $bus ): void { $key = sprintf('sync-%s-%s', $message->source, $message->resourceId); $bus->dispatch($message, [ new DeduplicateStamp($key, ttl: 300.0), ]); } // Retrieve stamp info $envelope = $bus->dispatch( new SyncExternalData('test', 'api'), [new DeduplicateStamp('test-key', 180.0)] ); $stamp = $envelope->last(DeduplicateStamp::class); $key = $stamp?->getKey(); // Lock\Key instance $ttl = $stamp?->getTtl(); // 180.0 $queueOnly = $stamp?->onlyDeduplicateInQueue(); // false ``` ## MultiplierRetryStrategy - Exponential Backoff Retry The `MultiplierRetryStrategy` implements retry logic with configurable exponential backoff, jitter, and maximum delay. It determines whether a message should be retried and calculates the waiting time between attempts. ```php isRetryable($envelope, $exception)) { $waitTime = $exponentialStrategy->getWaitingTime($envelope, $exception); echo "Retry in {$waitTime}ms\n"; // ~1000ms for first retry } // After adding RedeliveryStamp (simulating retry count) $envelopeAfterRetry = $envelope->with(new RedeliveryStamp(1)); // 1st retry done $waitTime = $exponentialStrategy->getWaitingTime($envelopeAfterRetry, $exception); echo "Wait time after 1 retry: {$waitTime}ms\n"; // ~2000ms $envelopeAfter3Retries = $envelope->with(new RedeliveryStamp(3)); $waitTime = $exponentialStrategy->getWaitingTime($envelopeAfter3Retries, $exception); echo "Wait time after 3 retries: {$waitTime}ms\n"; // ~8000ms // Check retry exhaustion $envelopeAfter5Retries = $envelope->with(new RedeliveryStamp(5)); $canRetry = $exponentialStrategy->isRetryable($envelopeAfter5Retries, $exception); echo "Can retry after 5 attempts: " . ($canRetry ? 'yes' : 'no') . "\n"; // no ``` ## InMemoryTransport - Testing Transport The `InMemoryTransport` is a transport implementation that stores messages in memory, making it ideal for testing. It tracks sent, acknowledged, and rejected messages, and supports delayed message simulation. ```php send(new Envelope(new OrderCreated(1))); $envelope2 = $transport->send(new Envelope(new OrderCreated(2))); $envelope3 = $transport->send(new Envelope(new EmailSent('test@example.com'))); // Messages get auto-assigned TransportMessageIdStamp $messageId = $envelope1->last(TransportMessageIdStamp::class)?->getId(); echo "Message ID: {$messageId}\n"; // 1 // Retrieve messages (simulates consuming from queue) $messages = $transport->get(); // Returns array of Envelopes echo "Messages in queue: " . count(iterator_to_array($messages)) . "\n"; // 3 // Acknowledge and reject foreach ($transport->get() as $envelope) { if ($envelope->getMessage() instanceof OrderCreated) { $transport->ack($envelope); // Mark as successfully processed } else { $transport->reject($envelope); // Mark as failed } } // Inspect transport state for assertions $sent = $transport->getSent(); // All sent messages $acknowledged = $transport->getAcknowledged(); // Successfully processed $rejected = $transport->getRejected(); // Failed messages echo "Sent: " . count($sent) . "\n"; // 3 echo "Acknowledged: " . count($acknowledged) . "\n"; // 2 (OrderCreated) echo "Rejected: " . count($rejected) . "\n"; // 1 (EmailSent) // Testing delayed messages with MockClock $clock = new MockClock('2024-01-15 10:00:00'); $transportWithClock = new InMemoryTransport(clock: $clock); $transportWithClock->send(new Envelope( new OrderCreated(100), [new DelayStamp(60000)] // 60 second delay )); // Message not available yet $available = $transportWithClock->get(); echo "Available now: " . count(iterator_to_array($available)) . "\n"; // 0 // Advance time $clock->modify('+2 minutes'); // Now available $available = $transportWithClock->get(); echo "Available after 2 min: " . count(iterator_to_array($available)) . "\n"; // 1 // Reset transport between tests $transport->reset(); echo "After reset: " . count($transport->getSent()) . "\n"; // 0 ``` ## ReceiverInterface and SenderInterface - Transport Contracts The `ReceiverInterface` and `SenderInterface` define the contracts for receiving messages from and sending messages to transports. These interfaces allow creating custom transport implementations for different message brokers. ```php redis = $redis; $this->queue = $queue; } /** * Receive messages from the transport. * * @param int $fetchSize Hint about how many messages to fetch * @return iterable * @throws TransportException */ public function get(int $fetchSize = 1): iterable { try { for ($i = 0; $i < $fetchSize; $i++) { $data = $this->redis->lPop($this->queue); if (!$data) { break; } $decoded = json_decode($data, true); $message = unserialize($decoded['body']); $id = $decoded['id']; yield new Envelope($message, [ new TransportMessageIdStamp($id), ]); } } catch (\RedisException $e) { throw new TransportException('Failed to receive message', 0, $e); } } /** * Acknowledge message was successfully handled. */ public function ack(Envelope $envelope): void { // In Redis LPOP, message is already removed // For other patterns, you'd remove from processing queue $id = $envelope->last(TransportMessageIdStamp::class)?->getId(); $this->redis->hDel('processing', (string) $id); } /** * Reject message (won't be retried). */ public function reject(Envelope $envelope): void { $id = $envelope->last(TransportMessageIdStamp::class)?->getId(); // Move to dead letter queue $this->redis->lPush('dead-letters', json_encode([ 'id' => $id, 'body' => serialize($envelope->getMessage()), 'rejected_at' => time(), ])); $this->redis->hDel('processing', (string) $id); } /** * Send message to the transport. * * @return Envelope With TransportMessageIdStamp added */ public function send(Envelope $envelope): Envelope { $id = uniqid('msg_', true); try { $this->redis->rPush($this->queue, json_encode([ 'id' => $id, 'body' => serialize($envelope->getMessage()), 'sent_at' => time(), ])); } catch (\RedisException $e) { throw new TransportException('Failed to send message', 0, $e); } return $envelope->with(new TransportMessageIdStamp($id)); } } // Usage $redis = new \Redis(); $redis->connect('localhost'); $transport = new RedisTransport($redis, 'my-app-queue'); // Send a message $envelope = $transport->send(new Envelope(new \stdClass())); $messageId = $envelope->last(TransportMessageIdStamp::class)->getId(); // Receive and process foreach ($transport->get(10) as $receivedEnvelope) { try { // Process message... $transport->ack($receivedEnvelope); } catch (\Exception $e) { $transport->reject($receivedEnvelope); } } ``` ## HandlerFailedException - Error Handling The `HandlerFailedException` is thrown when one or more handlers fail while processing a message. It wraps all handler exceptions and provides access to the envelope for retry logic and error recovery. ```php amount > 1000) { throw new \RuntimeException('Amount exceeds limit'); } return 'Card charged'; }; $sendReceipt = function (ProcessPayment $payment) { throw new \RuntimeException('Email service unavailable'); }; $updateInventory = function (ProcessPayment $payment) { return 'Inventory updated'; }; $handlersLocator = new HandlersLocator([ ProcessPayment::class => [$chargeCard, $sendReceipt, $updateInventory], ]); $bus = new MessageBus([ new HandleMessageMiddleware($handlersLocator), ]); try { $bus->dispatch(new ProcessPayment('order-123', 500.00)); } catch (HandlerFailedException $e) { // Get the envelope with partial results $envelope = $e->getEnvelope(); // Get all exceptions (keyed by handler name) $exceptions = $e->getWrappedExceptions(); echo "Message: {$e->getMessage()}\n"; // "Handling "ProcessPayment" failed: Email service unavailable" foreach ($exceptions as $handlerName => $exception) { echo "Handler '{$handlerName}' failed: {$exception->getMessage()}\n"; } // Check which handlers succeeded via HandledStamp use Symfony\Component\Messenger\Stamp\HandledStamp; $handledStamps = $envelope->all(HandledStamp::class); foreach ($handledStamps as $stamp) { echo "Handler '{$stamp->getHandlerName()}' succeeded with: {$stamp->getResult()}\n"; } // Retry logic based on exception type foreach ($e->getWrappedExceptions() as $handlerName => $exception) { if ($exception instanceof \RuntimeException) { // Log and potentially reschedule error_log("Retryable failure in {$handlerName}: {$exception->getMessage()}"); } } // Get nested exceptions (returns first or can iterate) $firstException = $e->getPrevious(); // Re-throw if critical if (count($exceptions) > 1) { throw $e; // Multiple failures - escalate } } ``` ## RedispatchMessage - Cross-Transport Message Routing The `RedispatchMessage` allows re-routing a message to different transports at runtime. This is useful for forwarding messages between queues or implementing saga patterns where messages need to be sent to specific transports based on processing results. ```php dispatch(new RedispatchMessage( envelope: $order, transportNames: 'eu-processing' )); // Redispatch to multiple transports $bus->dispatch(new RedispatchMessage( envelope: new ShipOrder(123, 'warehouse-A'), transportNames: ['shipping-queue', 'tracking-queue', 'notifications'] )); // Redispatch an envelope with existing stamps $envelope = new Envelope( new ProcessOrder(456, 'US'), [new \Symfony\Component\Messenger\Stamp\DelayStamp(5000)] ); $bus->dispatch(new RedispatchMessage( envelope: $envelope, transportNames: 'us-processing' )); // Saga pattern: route based on processing result class OrderProcessedHandler { public function __construct( private MessageBusInterface $bus ) {} public function __invoke(ProcessOrder $command): void { // Process the order $result = $this->processOrder($command); // Route to appropriate transport based on result $transport = match ($result['status']) { 'approved' => 'fulfillment-queue', 'review' => 'manual-review-queue', 'rejected' => 'rejection-notifications', }; $this->bus->dispatch(new RedispatchMessage( new ShipOrder($command->orderId, $result['warehouse']), $transport )); } private function processOrder(ProcessOrder $command): array { // Business logic return ['status' => 'approved', 'warehouse' => 'warehouse-B']; } } // String representation for logging $redispatch = new RedispatchMessage( new ShipOrder(789, 'warehouse-C'), ['queue-a', 'queue-b'] ); echo (string) $redispatch; // "ShipOrder via queue-a, queue-b" ``` ## Summary The Symfony Messenger component provides a comprehensive solution for building message-driven applications in PHP. Its core use cases include implementing asynchronous job processing where time-consuming tasks like email sending, image processing, or API calls are offloaded to background workers. The component excels at CQRS implementations through its flexible bus architecture - a command bus for write operations and a query bus using HandleTrait for reads. Event-driven architectures benefit from the handlers locator's support for interface-based and wildcard handlers, allowing multiple listeners to react to domain events. Integration patterns typically involve configuring the MessageBus with appropriate middleware (validation, transaction handling, send/handle), setting up transports for different message queues (AMQP, Redis, Doctrine), and running workers to consume messages asynchronously. The retry strategy and failure transport mechanisms provide robust error handling for production environments. For testing, the InMemoryTransport allows complete isolation without external dependencies. The component integrates seamlessly with Symfony's service container through the AsMessageHandler and AsMessage attributes, enabling automatic handler discovery and routing configuration. Common patterns include using DispatchAfterCurrentBusStamp for transactional outbox patterns, DeduplicateStamp for idempotent message handling, and DelayStamp for scheduled tasks and rate limiting.