Symfony Messenger Component
https://github.com/symfony/messenger
# Symfony Messenger Component

The Symfony Messenger component provides a robust message bus implementation for PHP applications, enabling them to send and receive messages to/from other applications or via message queues. It supports synchronous and asynchronous message handling, making it ideal for implementing CQRS (Command Query Responsibility Segregation) patterns, event-driven architectures, and background job processing.

The component features a flexible middleware architecture that allows customization of message handling, including validation, transaction management, and retry strategies. It integrates seamlessly with various message brokers through transport abstraction, supports batch processing, message deduplication, and provides comprehensive error handling with automatic retry mechanisms. The component requires PHP 8.4+ and integrates with the broader Symfony ecosystem.

## MessageBus - Core Message Dispatcher

The `MessageBus` is the central component that dispatches messages through a chain of middleware handlers. It accepts any object as a message and optionally wraps it in an `Envelope` with stamps that control middleware behavior. The bus returns an `Envelope` containing the message and any stamps added during processing.

```php
<?php

use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\Handler\HandlersLocator;
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\HandledStamp;

// Define a message class
class SendEmailNotification
{
    public function __construct(
        public readonly string $recipient,
        public readonly string $subject,
        public readonly string $body
    ) {}
}

// Create a handler for the message
$emailHandler = function (SendEmailNotification $message): string {
    // Process the email
    mail($message->recipient, $message->subject, $message->body);

    return 'Email sent to ' . $message->recipient;
};

// Configure the handlers locator
$handlersLocator = new HandlersLocator([
    SendEmailNotification::class => [$emailHandler],
]);

// Create the message bus with middleware
$bus = new MessageBus([
    new HandleMessageMiddleware($handlersLocator),
]);

// Dispatch a message synchronously
$envelope = $bus->dispatch(new SendEmailNotification(
    recipient: 'user@example.com',
    subject: 'Welcome!',
    body: 'Thank you for signing up.'
));

// Dispatch with stamps (e.g., delay delivery by 5 seconds)
$envelope = $bus->dispatch(
    new SendEmailNotification('user@example.com', 'Reminder', 'Your trial ends soon.'),
    [new DelayStamp(5000)] // 5000ms delay
);

// Access handler results via HandledStamp
$handledStamp = $envelope->last(HandledStamp::class);
$result = $handledStamp?->getResult(); // 'Email sent to user@example.com'
```

## Envelope - Message Wrapper with Stamps

The `Envelope` class wraps messages with stamps (metadata) that control how middleware processes the message. Envelopes are immutable - methods like `with()` and `withoutAll()` return new instances. Stamps can be used for routing, delays, validation groups, and tracking handler results.
```php
<?php

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\TransportNamesStamp;
use Symfony\Component\Messenger\Stamp\ValidationStamp;
use Symfony\Component\Messenger\Stamp\HandledStamp;

class OrderPlaced
{
    public function __construct(public readonly int $orderId) {}
}

$message = new OrderPlaced(orderId: 12345);

// Create envelope with stamps
$envelope = new Envelope($message, [
    new DelayStamp(10000),              // Delay 10 seconds
    new TransportNamesStamp(['async']), // Route to 'async' transport
    new ValidationStamp(['strict']),    // Use 'strict' validation group
]);

// Or use the static wrap method (handles both messages and envelopes)
$envelope = Envelope::wrap($message, [new DelayStamp(5000)]);

// Add stamps to existing envelope (returns new instance)
$envelope = $envelope->with(
    new TransportNamesStamp(['high-priority']),
    new ValidationStamp(['Default'])
);

// Remove stamps by type
$envelope = $envelope->withoutAll(DelayStamp::class);
$envelope = $envelope->withoutStampsOfType(DelayStamp::class); // Also removes subclasses

// Retrieve stamps
$delayStamp = $envelope->last(DelayStamp::class);    // Get last stamp of type
$allDelayStamps = $envelope->all(DelayStamp::class); // Get all stamps of type
$allStamps = $envelope->all();                       // Get all stamps grouped by type

// Get the wrapped message
$originalMessage = $envelope->getMessage(); // OrderPlaced instance

// Check handler results after dispatch
if ($handledStamp = $envelope->last(HandledStamp::class)) {
    $result = $handledStamp->getResult();
    $handlerName = $handledStamp->getHandlerName();
}
```

## AsMessageHandler Attribute - Handler Configuration

The `#[AsMessageHandler]` attribute configures classes or methods as message handlers with options for bus selection, transport filtering, priority, and more. The handler method receives the message object and can return a result that's captured in a `HandledStamp`.
```php
<?php

use Symfony\Component\Messenger\Attribute\AsMessageHandler;

class UserRegistered
{
    public function __construct(
        public readonly int $userId,
        public readonly string $email
    ) {}
}

class OrderCreated
{
    public function __construct(public readonly int $orderId) {}
}

// Simple handler - with no method given, __invoke is used
#[AsMessageHandler]
class SendWelcomeEmailHandler
{
    public function __invoke(UserRegistered $event): void
    {
        // Send welcome email to $event->email
        echo "Sending welcome email to user {$event->userId}";
    }
}

// Handler with configuration options
#[AsMessageHandler(
    bus: 'event.bus',               // Only handle from this bus
    fromTransport: 'high-priority', // Only handle from this transport
    priority: 10,                   // Higher priority = handled first
    handles: UserRegistered::class, // Explicit message type
    method: 'handleRegistration'    // Custom method name
)]
class NotificationHandler
{
    public function handleRegistration(UserRegistered $event): void
    {
        // Process the event
    }
}

// Multiple handlers in one class using repeatable attribute
#[AsMessageHandler(handles: UserRegistered::class, method: 'onUserRegistered')]
#[AsMessageHandler(handles: OrderCreated::class, method: 'onOrderCreated')]
class AuditLogHandler
{
    public function onUserRegistered(UserRegistered $event): void
    {
        // Log user registration
    }

    public function onOrderCreated(OrderCreated $event): void
    {
        // Log order creation
    }
}

// Method-level attribute: the annotated method is the handler,
// so the class does not need to be invokable
class InvoiceHandler
{
    #[AsMessageHandler]
    public function processInvoice(OrderCreated $order): string
    {
        // Generate invoice
        return "Invoice generated for order {$order->orderId}";
    }
}
```

## AsMessage Attribute - Message Routing Configuration

The `#[AsMessage]` attribute configures message routing to specific transports and serialization options directly on message classes. This provides a declarative way to define where messages should be sent without external configuration.
```php
<?php

use Symfony\Component\Messenger\Attribute\AsMessage;

// Route to a single transport
#[AsMessage(transport: 'async')]
class ProcessPayment
{
    public function __construct(
        public readonly string $paymentId,
        public readonly float $amount
    ) {}
}

// Route to multiple transports
#[AsMessage(transport: ['async', 'audit-log'])]
class OrderShipped
{
    public function __construct(
        public readonly int $orderId,
        public readonly string $trackingNumber
    ) {}
}

// Custom serialized type name for cross-system compatibility
#[AsMessage(
    transport: 'external-api',
    serializedTypeName: 'com.example.events.UserCreated'
)]
class UserCreated
{
    public function __construct(
        public readonly int $userId,
        public readonly string $email,
        public readonly \DateTimeImmutable $createdAt
    ) {}
}

// Multiple transports with serialization config
#[AsMessage(transport: ['async', 'analytics'], serializedTypeName: 'checkout.completed.v1')]
class CheckoutCompleted
{
    public function __construct(
        public readonly string $sessionId,
        public readonly array $items,
        public readonly float $total
    ) {}
}
```

## HandleTrait - Synchronous Query/Command Bus

The `HandleTrait` provides a convenient `handle()` method for dispatching messages that expect exactly one handler and need the handler's return value. This is ideal for query buses where you dispatch a query and expect a single result.
```php
<?php

use Symfony\Component\Messenger\HandleTrait;
use Symfony\Component\Messenger\MessageBusInterface;

// Query message
class GetUserById
{
    public function __construct(public readonly int $userId) {}
}

// Query result
class UserDto
{
    public function __construct(
        public readonly int $id,
        public readonly string $name,
        public readonly string $email
    ) {}
}

// Service using HandleTrait for query bus
class UserQueryService
{
    use HandleTrait;

    public function __construct(
        private MessageBusInterface $messageBus
    ) {}

    public function findUser(int $userId): UserDto
    {
        // Dispatches message and returns handler result directly
        // Throws LogicException if zero or multiple handlers respond
        return $this->handle(new GetUserById($userId));
    }
}

// Controller using the query service
class UserController
{
    public function __construct(
        private UserQueryService $queryService
    ) {}

    public function show(int $id): array
    {
        try {
            $user = $this->queryService->findUser($id);

            return [
                'id' => $user->id,
                'name' => $user->name,
                'email' => $user->email,
            ];
        } catch (\Symfony\Component\Messenger\Exception\LogicException $e) {
            // Handle case: no handler or multiple handlers
            throw new \RuntimeException('Failed to fetch user: ' . $e->getMessage());
        }
    }
}

// Example handler
class GetUserByIdHandler
{
    public function __invoke(GetUserById $query): UserDto
    {
        // Fetch from database
        return new UserDto(
            id: $query->userId,
            name: 'John Doe',
            email: 'john@example.com'
        );
    }
}
```

## Worker - Async Message Consumer

The `Worker` class consumes messages from transports (queues) and dispatches them to the message bus. It supports multiple receivers, rate limiting, and provides events for monitoring and controlling the consumption process.
```php
<?php

use Symfony\Component\Messenger\Worker;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport;
use Symfony\Component\Messenger\Handler\HandlersLocator;
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
use Psr\EventDispatcher\EventDispatcherInterface;

class ProcessImageUpload
{
    public function __construct(
        public readonly string $imageId,
        public readonly string $path
    ) {}
}

// Create transport
$transport = new InMemoryTransport();

// Create bus with handlers
$handlersLocator = new HandlersLocator([
    ProcessImageUpload::class => [function (ProcessImageUpload $message) {
        echo "Processing image: {$message->imageId}\n";
        // Resize, optimize, generate thumbnails...
        return "Processed: {$message->path}";
    }],
]);

$bus = new MessageBus([
    new HandleMessageMiddleware($handlersLocator),
]);

// Create worker with receivers
$worker = new Worker(
    receivers: ['async' => $transport],
    bus: $bus,
    eventDispatcher: null, // Optional EventDispatcherInterface
    logger: null,          // Optional LoggerInterface
    rateLimiters: null     // Optional rate limiters per transport
);

// Run the worker (blocks until stopped)
$worker->run([
    'sleep' => 1000000,   // Microseconds to sleep when no messages (1 second)
    'queues' => null,     // Specific queue names or null for all
    'time_limit' => 3600, // Stop after N seconds
    'fetch_size' => 10,   // Number of messages to fetch per iteration
]);

// Worker can be stopped programmatically
$worker->stop();

// Send keepalive for long-running tasks
$worker->keepalive(seconds: 30);

// Access worker metadata
$metadata = $worker->getMetadata();
$transportNames = $metadata->getTransportNames();
```

## HandlersLocator - Handler Registry

The `HandlersLocator` maps message types to their handlers. It supports mapping by class name, parent classes, interfaces, and wildcard patterns. Handlers are returned as `HandlerDescriptor` objects which contain the callable and options.
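To make the lookup by class, parent, interface, and wildcard more concrete, the following standalone sketch lists the keys a locator of this kind could try for a given message, from most to least specific. It does not use the component at all: `sketchCandidateTypes()` and the `Sketch*` classes are hypothetical names introduced here, and the ordering shown (exact class, parent classes, interfaces, namespace wildcards, then `*`) is an assumption worth verifying against the installed version.

```php
<?php

declare(strict_types=1);

// Hypothetical message hierarchy, used only for this illustration
interface SketchDomainEvent {}
class SketchUserEvent implements SketchDomainEvent {}
class SketchUserRegistered extends SketchUserEvent {}

/**
 * Hypothetical helper (not part of symfony/messenger): returns the lookup
 * keys for a message, most specific first.
 *
 * @return list<string>
 */
function sketchCandidateTypes(object $message): array
{
    $class = $message::class;

    // Exact class, then parent classes, then implemented interfaces
    $types = array_merge(
        [$class],
        array_values(class_parents($message)),
        array_values(class_implements($message))
    );

    // Namespace wildcards such as 'App\Message\*', walking up one level at a time
    $namespace = $class;
    while (false !== ($pos = strrpos($namespace, '\\'))) {
        $namespace = substr($namespace, 0, $pos);
        $types[] = $namespace . '\\*';
    }

    // Catch-all key comes last
    $types[] = '*';

    return $types;
}

// The demo classes live in the global namespace, so no namespace wildcard is produced
print_r(sketchCandidateTypes(new SketchUserRegistered()));
```

Handlers registered under an earlier key would be considered before handlers registered under a later one, which is why a catch-all handler under `*` behaves like a fallback.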
```php
<?php

use Symfony\Component\Messenger\Handler\HandlersLocator;
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
use Symfony\Component\Messenger\MessageBus;

// Message hierarchy
interface DomainEvent {}
class UserEvent implements DomainEvent {}
class UserRegistered extends UserEvent {}
class UserUpdated extends UserEvent {}

// Handlers
$sendWelcomeEmail = function (UserRegistered $event) {
    echo "Welcome email sent\n";
};

$logAllEvents = function (DomainEvent $event) {
    echo "Event logged: " . $event::class . "\n";
};

$auditHandler = new class {
    public function handle(UserEvent $event): void
    {
        echo "Audit: User event recorded\n";
    }
};

// Configure handlers with options using HandlerDescriptor
$handlersLocator = new HandlersLocator([
    // Exact class match
    UserRegistered::class => [
        $sendWelcomeEmail,
        new HandlerDescriptor(
            $sendWelcomeEmail,
            ['from_transport' => 'high-priority', 'alias' => 'welcome-mailer']
        ),
    ],
    // Interface-based handler (matches all DomainEvent implementations)
    DomainEvent::class => [$logAllEvents],
    // Parent class handler (matches UserRegistered, UserUpdated)
    UserEvent::class => [[$auditHandler, 'handle']],
    // Wildcard handler for entire namespace
    'App\Message\*' => [function (object $message) {
        echo "Handling message from App\Message namespace\n";
    }],
    // Catch-all handler
    '*' => [function (object $message) {
        echo "Fallback handler for: " . $message::class . "\n";
    }],
]);

// Use with HandleMessageMiddleware
$bus = new MessageBus([
    new HandleMessageMiddleware($handlersLocator),
]);

// Matching handlers are called in lookup order: exact class first, then
// parent classes, interfaces, namespace wildcards, and finally the '*' catch-all
$bus->dispatch(new UserRegistered());
```

## DelayStamp - Delayed Message Delivery

The `DelayStamp` schedules message delivery for a future time. It supports specifying delays in milliseconds, by DateInterval, or until a specific DateTime. The transport uses this stamp to delay processing.

```php
<?php

use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\TransportNamesStamp;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;

class SendReminder
{
    public function __construct(
        public readonly int $userId,
        public readonly string $message
    ) {}
}

// Create bus (configuration omitted for brevity)
/** @var MessageBusInterface $bus */

// Delay by milliseconds
$envelope = $bus->dispatch(
    new SendReminder(1, 'Your subscription expires tomorrow'),
    [new DelayStamp(60000)] // 60 seconds
);

// Delay using DateInterval
$inOneHour = DelayStamp::delayFor(new \DateInterval('PT1H'));
$envelope = $bus->dispatch(
    new SendReminder(2, 'Weekly report ready'),
    [$inOneHour]
);

// Delay by days
$inThreeDays = DelayStamp::delayFor(new \DateInterval('P3D'));
$envelope = $bus->dispatch(
    new SendReminder(3, 'Trial ending soon'),
    [$inThreeDays]
);

// Delay until specific time
$tomorrow9am = new \DateTimeImmutable('tomorrow 9:00');
$untilTomorrow = DelayStamp::delayUntil($tomorrow9am);
$envelope = $bus->dispatch(
    new SendReminder(4, 'Good morning! Check your tasks'),
    [$untilTomorrow]
);

// Combine with other stamps
$envelope = $bus->dispatch(
    new SendReminder(5, 'Follow up on order'),
    [
        new DelayStamp(3600000),                    // 1 hour delay
        new TransportNamesStamp(['notifications']), // Specific transport
    ]
);

// Retrieve delay from envelope
$stamp = $envelope->last(DelayStamp::class);
$delayMs = $stamp?->getDelay(); // milliseconds
```

## TransportNamesStamp - Override Message Routing

The `TransportNamesStamp` allows overriding the default transport routing configured in the Messenger routing configuration. This is useful for dynamically routing messages based on runtime conditions.
```php
<?php

use Symfony\Component\Messenger\Stamp\TransportNamesStamp;
use Symfony\Component\Messenger\MessageBusInterface;

class ProcessOrder
{
    public function __construct(
        public readonly int $orderId,
        public readonly float $amount,
        public readonly bool $isPriority
    ) {}
}

/** @var MessageBusInterface $bus */

$order = new ProcessOrder(
    orderId: 12345,
    amount: 999.99,
    isPriority: true
);

// Route to single transport
$bus->dispatch($order, [
    new TransportNamesStamp('high-priority'),
]);

// Route to multiple transports
$bus->dispatch($order, [
    new TransportNamesStamp(['async', 'audit-log', 'analytics']),
]);

// Dynamic routing based on business logic
function dispatchOrder(ProcessOrder $order, MessageBusInterface $bus): void
{
    $transports = ['async'];

    if ($order->isPriority) {
        $transports = ['high-priority'];
    }

    if ($order->amount > 1000) {
        $transports[] = 'fraud-check';
    }

    $transports[] = 'audit-log'; // Always log

    $bus->dispatch($order, [
        new TransportNamesStamp(array_unique($transports)),
    ]);
}

// Retrieve transport names from envelope
$envelope = $bus->dispatch($order, [
    new TransportNamesStamp(['async', 'notifications']),
]);
$stamp = $envelope->last(TransportNamesStamp::class);
$names = $stamp?->getTransportNames(); // ['async', 'notifications']
```

## DispatchAfterCurrentBusStamp - Deferred Dispatch

The `DispatchAfterCurrentBusStamp` marks messages to be handled after the current message handler completes. This ensures that sub-dispatched messages only run after the main transaction commits, preventing issues with database consistency.
```php
<?php

use Symfony\Component\Messenger\Stamp\DispatchAfterCurrentBusStamp;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware;
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
use Symfony\Component\Messenger\Handler\HandlersLocator;
use Symfony\Component\Messenger\MessageBus;

// Messages
class CreateOrder
{
    public function __construct(
        public readonly array $items,
        public readonly string $customerId
    ) {}
}

class SendOrderConfirmation
{
    public function __construct(public readonly int $orderId) {}
}

class UpdateInventory
{
    public function __construct(public readonly array $items) {}
}

// Handler that dispatches secondary messages
class CreateOrderHandler
{
    public function __construct(
        private MessageBusInterface $bus
    ) {}

    public function __invoke(CreateOrder $command): int
    {
        // Create order in database (inside transaction)
        $orderId = $this->createOrderInDatabase($command);

        // These messages will be handled AFTER this handler completes
        // and the database transaction is committed
        $this->bus->dispatch(
            new SendOrderConfirmation($orderId),
            [new DispatchAfterCurrentBusStamp()]
        );

        $this->bus->dispatch(
            new UpdateInventory($command->items),
            [new DispatchAfterCurrentBusStamp()]
        );

        return $orderId;
    }

    private function createOrderInDatabase(CreateOrder $command): int
    {
        // Database insert logic
        return 12345;
    }
}

// Configure bus with DispatchAfterCurrentBusMiddleware.
// The handler needs the bus, but the bus is built from the locator,
// so $bus is captured by reference and assigned below.
$bus = null;

$handlersLocator = new HandlersLocator([
    CreateOrder::class => [function (CreateOrder $command) use (&$bus): int {
        return (new CreateOrderHandler($bus))($command);
    }],
    SendOrderConfirmation::class => [function ($msg) { echo "Confirmation sent\n"; }],
    UpdateInventory::class => [function ($msg) { echo "Inventory updated\n"; }],
]);

$bus = new MessageBus([
    // Must be before any transaction middleware
    new DispatchAfterCurrentBusMiddleware(),
    new HandleMessageMiddleware($handlersLocator),
]);

// Dispatch - secondary messages handled after main handler completes
$bus->dispatch(new CreateOrder(['item1', 'item2'], 'customer-123'));

// Output order:
// 1. CreateOrderHandler runs, returns orderId
// 2. SendOrderConfirmation handled
// 3. UpdateInventory handled
```

## DeduplicateStamp - Message Deduplication

The `DeduplicateStamp` prevents duplicate messages from being processed using a lock-based mechanism. It requires the Symfony Lock component and supports configurable TTL and queue-only deduplication modes.

```php
<?php

use Symfony\Component\Messenger\Stamp\DeduplicateStamp;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Lock\Key;

class SyncExternalData
{
    public function __construct(
        public readonly string $resourceId,
        public readonly string $source
    ) {}
}

class ProcessWebhook
{
    public function __construct(
        public readonly string $webhookId,
        public readonly array $payload
    ) {}
}

/** @var MessageBusInterface $bus */

// Simple deduplication with string key
$bus->dispatch(
    new SyncExternalData('resource-123', 'api'),
    [new DeduplicateStamp('sync-resource-123')]
);

// Custom TTL (lock expires after 600 seconds)
$bus->dispatch(
    new SyncExternalData('resource-456', 'api'),
    [new DeduplicateStamp(
        key: 'sync-resource-456',
        ttl: 600.0 // seconds
    )]
);

// Deduplication only while message is in queue (not during processing)
$bus->dispatch(
    new ProcessWebhook('webhook-789', ['event' => 'updated']),
    [new DeduplicateStamp(
        key: 'webhook-789',
        ttl: 300.0,
        onlyDeduplicateInQueue: true
    )]
);

// Using Lock Key object for advanced scenarios
$lockKey = new Key('custom-lock-key');
$bus->dispatch(
    new SyncExternalData('resource-999', 'webhook'),
    [new DeduplicateStamp($lockKey, ttl: 120.0)]
);

// Dynamic key generation based on message content
function dispatchWithDeduplication(
    SyncExternalData $message,
    MessageBusInterface $bus
): void {
    $key = sprintf('sync-%s-%s', $message->source, $message->resourceId);

    $bus->dispatch($message, [
        new DeduplicateStamp($key, ttl: 300.0),
    ]);
}

// Retrieve stamp info
$envelope = $bus->dispatch(
    new SyncExternalData('test', 'api'),
    [new DeduplicateStamp('test-key', 180.0)]
);
$stamp = $envelope->last(DeduplicateStamp::class);
$key = $stamp?->getKey();                       // Lock\Key instance
$ttl = $stamp?->getTtl();                       // 180.0
$queueOnly = $stamp?->onlyDeduplicateInQueue(); // false
```

## MultiplierRetryStrategy - Exponential Backoff Retry

The `MultiplierRetryStrategy` implements retry logic with configurable exponential backoff, jitter, and maximum delay. It determines whether a message should be retried and calculates the waiting time between attempts.

```php
<?php

use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;

// Default strategy: 3 retries, 1 second initial delay, no multiplier
$defaultStrategy = new MultiplierRetryStrategy();

// Exponential backoff: delays of 1s, 2s, 4s, 8s...
$exponentialStrategy = new MultiplierRetryStrategy(
    maxRetries: 5,
    delayMilliseconds: 1000,     // 1 second initial
    multiplier: 2.0,             // Double delay each retry
    maxDelayMilliseconds: 60000, // Cap at 60 seconds
    jitter: 0.1                  // +/- 10% randomness
);

// Aggressive retry for critical operations
$aggressiveStrategy = new MultiplierRetryStrategy(
    maxRetries: 10,
    delayMilliseconds: 500,
    multiplier: 1.5,
    maxDelayMilliseconds: 30000,
    jitter: 0.2
);

// Constant delay (no backoff)
$constantStrategy = new MultiplierRetryStrategy(
    maxRetries: 3,
    delayMilliseconds: 5000,
    multiplier: 1.0, // No increase
    jitter: 0.0      // No randomness
);

// Example usage
class PaymentProcessingFailed extends \Exception {}

$envelope = new Envelope(new \stdClass());

// Simulate retries
$exception = new PaymentProcessingFailed('Gateway timeout');

// Check if message should be retried
if ($exponentialStrategy->isRetryable($envelope, $exception)) {
    $waitTime = $exponentialStrategy->getWaitingTime($envelope, $exception);
    echo "Retry in {$waitTime}ms\n"; // ~1000ms for first retry
}

// After adding RedeliveryStamp (simulating retry count)
$envelopeAfterRetry = $envelope->with(new RedeliveryStamp(1)); // 1st retry done
$waitTime = $exponentialStrategy->getWaitingTime($envelopeAfterRetry, $exception);
echo "Wait time after 1 retry: {$waitTime}ms\n"; // ~2000ms

$envelopeAfter3Retries = $envelope->with(new RedeliveryStamp(3));
$waitTime = $exponentialStrategy->getWaitingTime($envelopeAfter3Retries, $exception);
echo "Wait time after 3 retries: {$waitTime}ms\n"; // ~8000ms

// Check retry exhaustion
$envelopeAfter5Retries = $envelope->with(new RedeliveryStamp(5));
$canRetry = $exponentialStrategy->isRetryable($envelopeAfter5Retries, $exception);
echo "Can retry after 5 attempts: " . ($canRetry ? 'yes' : 'no') . "\n"; // no
```

## InMemoryTransport - Testing Transport

The `InMemoryTransport` is a transport implementation that stores messages in memory, making it ideal for testing. It tracks sent, acknowledged, and rejected messages, and supports delayed message simulation.

```php
<?php

use Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Clock\MockClock;

class OrderCreated
{
    public function __construct(public readonly int $orderId) {}
}

class EmailSent
{
    public function __construct(public readonly string $email) {}
}

// Basic usage
$transport = new InMemoryTransport();

// Send messages
$envelope1 = $transport->send(new Envelope(new OrderCreated(1)));
$envelope2 = $transport->send(new Envelope(new OrderCreated(2)));
$envelope3 = $transport->send(new Envelope(new EmailSent('test@example.com')));

// Messages get auto-assigned TransportMessageIdStamp
$messageId = $envelope1->last(TransportMessageIdStamp::class)?->getId();
echo "Message ID: {$messageId}\n"; // 1

// Retrieve messages (simulates consuming from queue)
$messages = $transport->get(); // Returns array of Envelopes
echo "Messages in queue: " . count(iterator_to_array($messages)) . "\n"; // 3

// Acknowledge and reject
foreach ($transport->get() as $envelope) {
    if ($envelope->getMessage() instanceof OrderCreated) {
        $transport->ack($envelope); // Mark as successfully processed
    } else {
        $transport->reject($envelope); // Mark as failed
    }
}

// Inspect transport state for assertions
$sent = $transport->getSent();                 // All sent messages
$acknowledged = $transport->getAcknowledged(); // Successfully processed
$rejected = $transport->getRejected();         // Failed messages

echo "Sent: " . count($sent) . "\n";                 // 3
echo "Acknowledged: " . count($acknowledged) . "\n"; // 2 (OrderCreated)
echo "Rejected: " . count($rejected) . "\n";         // 1 (EmailSent)

// Testing delayed messages with MockClock
$clock = new MockClock('2024-01-15 10:00:00');
$transportWithClock = new InMemoryTransport(clock: $clock);

$transportWithClock->send(new Envelope(
    new OrderCreated(100),
    [new DelayStamp(60000)] // 60 second delay
));

// Message not available yet
$available = $transportWithClock->get();
echo "Available now: " . count(iterator_to_array($available)) . "\n"; // 0

// Advance time
$clock->modify('+2 minutes');

// Now available
$available = $transportWithClock->get();
echo "Available after 2 min: " . count(iterator_to_array($available)) . "\n"; // 1

// Reset transport between tests
$transport->reset();
echo "After reset: " . count($transport->getSent()) . "\n"; // 0
```

## ReceiverInterface and SenderInterface - Transport Contracts

The `ReceiverInterface` and `SenderInterface` define the contracts for receiving messages from and sending messages to transports. These interfaces allow creating custom transport implementations for different message brokers.
```php
<?php

use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Exception\TransportException;

// Custom transport implementation
class RedisTransport implements TransportInterface
{
    private \Redis $redis;
    private string $queue;

    public function __construct(\Redis $redis, string $queue = 'messages')
    {
        $this->redis = $redis;
        $this->queue = $queue;
    }

    /**
     * Receive messages from the transport.
     *
     * @param int $fetchSize Hint about how many messages to fetch
     * @return iterable<Envelope>
     * @throws TransportException
     */
    public function get(int $fetchSize = 1): iterable
    {
        try {
            for ($i = 0; $i < $fetchSize; $i++) {
                $data = $this->redis->lPop($this->queue);
                if (!$data) {
                    break;
                }

                $decoded = json_decode($data, true);
                $message = unserialize($decoded['body']);
                $id = $decoded['id'];

                yield new Envelope($message, [
                    new TransportMessageIdStamp($id),
                ]);
            }
        } catch (\RedisException $e) {
            throw new TransportException('Failed to receive message', 0, $e);
        }
    }

    /**
     * Acknowledge message was successfully handled.
     */
    public function ack(Envelope $envelope): void
    {
        // In Redis LPOP, message is already removed
        // For other patterns, you'd remove from processing queue
        $id = $envelope->last(TransportMessageIdStamp::class)?->getId();
        $this->redis->hDel('processing', (string) $id);
    }

    /**
     * Reject message (won't be retried).
     */
    public function reject(Envelope $envelope): void
    {
        $id = $envelope->last(TransportMessageIdStamp::class)?->getId();

        // Move to dead letter queue
        $this->redis->lPush('dead-letters', json_encode([
            'id' => $id,
            'body' => serialize($envelope->getMessage()),
            'rejected_at' => time(),
        ]));
        $this->redis->hDel('processing', (string) $id);
    }

    /**
     * Send message to the transport.
     *
     * @return Envelope With TransportMessageIdStamp added
     */
    public function send(Envelope $envelope): Envelope
    {
        $id = uniqid('msg_', true);

        try {
            $this->redis->rPush($this->queue, json_encode([
                'id' => $id,
                'body' => serialize($envelope->getMessage()),
                'sent_at' => time(),
            ]));
        } catch (\RedisException $e) {
            throw new TransportException('Failed to send message', 0, $e);
        }

        return $envelope->with(new TransportMessageIdStamp($id));
    }
}

// Usage
$redis = new \Redis();
$redis->connect('localhost');
$transport = new RedisTransport($redis, 'my-app-queue');

// Send a message
$envelope = $transport->send(new Envelope(new \stdClass()));
$messageId = $envelope->last(TransportMessageIdStamp::class)->getId();

// Receive and process
foreach ($transport->get(10) as $receivedEnvelope) {
    try {
        // Process message...
        $transport->ack($receivedEnvelope);
    } catch (\Exception $e) {
        $transport->reject($receivedEnvelope);
    }
}
```

## HandlerFailedException - Error Handling

The `HandlerFailedException` is thrown when one or more handlers fail while processing a message. It wraps all handler exceptions and provides access to the envelope for retry logic and error recovery.
```php
<?php

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Handler\HandlersLocator;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
use Symfony\Component\Messenger\Stamp\HandledStamp;

class ProcessPayment
{
    public function __construct(
        public readonly string $orderId,
        public readonly float $amount
    ) {}
}

// Multiple handlers that might fail. Invokable classes give each handler
// its own name in HandledStamp and in getWrappedExceptions().
class ChargeCardHandler
{
    public function __invoke(ProcessPayment $payment): string
    {
        if ($payment->amount > 1000) {
            throw new \RuntimeException('Amount exceeds limit');
        }

        return 'Card charged';
    }
}

class SendReceiptHandler
{
    public function __invoke(ProcessPayment $payment): void
    {
        throw new \RuntimeException('Email service unavailable');
    }
}

class UpdateInventoryHandler
{
    public function __invoke(ProcessPayment $payment): string
    {
        return 'Inventory updated';
    }
}

$handlersLocator = new HandlersLocator([
    ProcessPayment::class => [
        new ChargeCardHandler(),
        new SendReceiptHandler(),
        new UpdateInventoryHandler(),
    ],
]);

$bus = new MessageBus([
    new HandleMessageMiddleware($handlersLocator),
]);

try {
    $bus->dispatch(new ProcessPayment('order-123', 500.00));
} catch (HandlerFailedException $e) {
    // Get the envelope with partial results
    $envelope = $e->getEnvelope();

    // Get all exceptions (keyed by handler name)
    $exceptions = $e->getWrappedExceptions();

    echo "Message: {$e->getMessage()}\n";
    // "Handling "ProcessPayment" failed: Email service unavailable"

    foreach ($exceptions as $handlerName => $exception) {
        echo "Handler '{$handlerName}' failed: {$exception->getMessage()}\n";
    }

    // Check which handlers succeeded via HandledStamp
    $handledStamps = $envelope->all(HandledStamp::class);
    foreach ($handledStamps as $stamp) {
        echo "Handler '{$stamp->getHandlerName()}' succeeded with: {$stamp->getResult()}\n";
    }

    // Retry logic based on exception type
    foreach ($exceptions as $handlerName => $exception) {
        if ($exception instanceof \RuntimeException) {
            // Log and potentially reschedule
            error_log("Retryable failure in {$handlerName}: {$exception->getMessage()}");
        }
    }

    // getPrevious() returns the first wrapped exception;
    // iterate getWrappedExceptions() to see all of them
    $firstException = $e->getPrevious();

    // Re-throw if critical
    if (count($exceptions) > 1) {
        throw $e; // Multiple failures - escalate
    }
}
```

## RedispatchMessage - Cross-Transport Message Routing

The `RedispatchMessage` allows re-routing a message to different transports at runtime. This is useful for forwarding messages between queues or implementing saga patterns where messages need to be sent to specific transports based on processing results.

```php
<?php

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Message\RedispatchMessage;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;

class ProcessOrder
{
    public function __construct(
        public readonly int $orderId,
        public readonly string $region
    ) {}
}

class ShipOrder
{
    public function __construct(
        public readonly int $orderId,
        public readonly string $warehouse
    ) {}
}

/** @var MessageBusInterface $bus */

// Redispatch to a single transport
$order = new ProcessOrder(123, 'EU');
$bus->dispatch(new RedispatchMessage(
    envelope: $order,
    transportNames: 'eu-processing'
));

// Redispatch to multiple transports
$bus->dispatch(new RedispatchMessage(
    envelope: new ShipOrder(123, 'warehouse-A'),
    transportNames: ['shipping-queue', 'tracking-queue', 'notifications']
));

// Redispatch an envelope with existing stamps
$envelope = new Envelope(
    new ProcessOrder(456, 'US'),
    [new DelayStamp(5000)]
);
$bus->dispatch(new RedispatchMessage(
    envelope: $envelope,
    transportNames: 'us-processing'
));

// Saga pattern: route based on processing result
class OrderProcessedHandler
{
    public function __construct(
        private MessageBusInterface $bus
    ) {}

    public function __invoke(ProcessOrder $command): void
    {
        // Process the order
        $result = $this->processOrder($command);

        // Route to appropriate transport based on result
        $transport = match ($result['status']) {
            'approved' => 'fulfillment-queue',
            'review' => 'manual-review-queue',
            'rejected' => 'rejection-notifications',
        };

        $this->bus->dispatch(new RedispatchMessage(
            new ShipOrder($command->orderId, $result['warehouse']),
            $transport
        ));
    }

    private function processOrder(ProcessOrder $command): array
    {
        // Business logic
        return ['status' => 'approved', 'warehouse' => 'warehouse-B'];
    }
}

// String representation for logging
$redispatch = new RedispatchMessage(
    new ShipOrder(789, 'warehouse-C'),
    ['queue-a', 'queue-b']
);
echo (string) $redispatch; // "ShipOrder via queue-a, queue-b"
```

## Summary

The Symfony Messenger component provides a comprehensive solution for building message-driven applications in PHP. Its core use cases include asynchronous job processing, where time-consuming tasks like email sending, image processing, or API calls are offloaded to background workers. The component suits CQRS implementations through its flexible bus architecture: a command bus for write operations and a query bus using `HandleTrait` for reads. Event-driven architectures benefit from the handlers locator's support for interface-based and wildcard handlers, allowing multiple listeners to react to domain events.

Integration patterns typically involve configuring the `MessageBus` with appropriate middleware (validation, transaction handling, send/handle), setting up transports for different message queues (AMQP, Redis, Doctrine), and running workers to consume messages asynchronously. The retry strategy and failure transport mechanisms provide robust error handling for production environments. For testing, the `InMemoryTransport` allows complete isolation without external dependencies. The component integrates with Symfony's service container through the `AsMessageHandler` and `AsMessage` attributes, enabling automatic handler discovery and routing configuration.
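Common patterns include using `DispatchAfterCurrentBusStamp` to defer follow-up messages until the current message has been handled (for example, once a surrounding database transaction has committed), `DeduplicateStamp` for idempotent message handling, and `DelayStamp` for scheduled tasks and rate limiting.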
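
As a minimal sketch of the testing point above, the following script sends a message through an `InMemoryTransport`, inspects what was recorded, and acknowledges it the way a worker would. The `GenerateReport` class is hypothetical and exists only for this illustration, and the `require` line assumes `symfony/messenger` was installed with Composer in the same directory.

```php
<?php

// Assumption: symfony/messenger is installed via Composer next to this script.
require __DIR__ . '/vendor/autoload.php';

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport;

// Hypothetical message class, used only for this illustration.
final class GenerateReport
{
    public function __construct(public readonly int $reportId) {}
}

$transport = new InMemoryTransport();

// Sending stores the envelope in memory instead of contacting a broker.
$transport->send(new Envelope(new GenerateReport(42)));

// A test can inspect everything that was sent.
$sent = $transport->getSent();
printf("sent=%d reportId=%d\n", count($sent), $sent[0]->getMessage()->reportId);

// Simulate a worker: fetch the queued envelopes and acknowledge each one.
foreach ($transport->get() as $received) {
    $transport->ack($received);
}

printf("acknowledged=%d\n", count($transport->getAcknowledged()));
```

Because nothing leaves the process, a test can assert on `getSent()`, `getAcknowledged()`, and `getRejected()` directly, without a running broker.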