# Symfony Doctrine Messenger Bridge

The Symfony Doctrine Messenger Bridge provides a database-backed transport for the Symfony Messenger component, enabling reliable asynchronous message processing through Doctrine DBAL. Applications queue messages in a database table and workers process them asynchronously. The bridge supports multiple database platforms, including MySQL, PostgreSQL, and Oracle. It offers delayed message delivery, message redelivery on timeout, and a PostgreSQL-specific optimization that uses LISTEN/NOTIFY to push messages to workers in real time.

The bridge implements a complete transport with sender and receiver components, automatic table setup, message acknowledgment and rejection, keepalive support for long-running handlers, and transaction management. It plugs into Symfony's Messenger component and works over any Doctrine DBAL connection, which makes it a good fit for database-backed message queues in PHP applications.

## APIs and Key Functions

### Transport Factory Creation

The `DoctrineTransportFactory` creates transport instances from DSN configurations. On PostgreSQL it automatically selects the LISTEN/NOTIFY-based connection unless `use_notify` is disabled.

```php
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransportFactory;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;

// $registry is assumed to be an existing Doctrine connection registry
$transportFactory = new DoctrineTransportFactory($registry);

// Create transport for the "default" Doctrine connection
$transport = $transportFactory->createTransport(
    'doctrine://default',
    ['table_name' => 'messenger_messages', 'queue_name' => 'async'],
    new PhpSerializer()
);

// Create transport with PostgreSQL LISTEN/NOTIFY disabled
$transport = $transportFactory->createTransport(
    'doctrine://default',
    ['use_notify' => false, 'queue_name' => 'emails'],
    new PhpSerializer()
);

// Check if DSN is supported
if ($transportFactory->supports('doctrine://default', [])) {
    // Create and use transport
}
```

### Sending Messages to Queue

The `DoctrineSender::send()` method serializes messages and stores them in the database, with an optional delay.

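```php
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineSender;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;

// $connection is assumed to be a configured bridge Connection;
// $message is any application message object
$sender = new DoctrineSender($connection, new PhpSerializer());

// Send message immediately
$envelope = new Envelope($message);
$envelope = $sender->send($envelope);

// Get the assigned message ID
$messageId = $envelope->last(TransportMessageIdStamp::class)->getId();

// Send delayed message (DelayStamp takes milliseconds: 5000 ms = 5 seconds)
$delayedEnvelope = (new Envelope($message))->with(new DelayStamp(5000));
$delayedEnvelope = $sender->send($delayedEnvelope);

// The target queue is the queue_name configured in the Connection.
// Database failures surface as TransportException.
try {
    $result = $sender->send($envelope);
    echo "Message sent with ID: " . $result->last(TransportMessageIdStamp::class)->getId();
} catch (TransportException $e) {
    echo "Failed to send: " . $e->getMessage();
}
```

### Receiving Messages from Queue

The `DoctrineReceiver::get()` method retrieves and locks messages for processing with automatic retry handling.

```php
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineReceiver;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;

// $connection is assumed to be a configured bridge Connection
$receiver = new DoctrineReceiver($connection, new PhpSerializer());

// Fetch and lock the next available message
$envelopes = $receiver->get();

foreach ($envelopes as $envelope) {
    try {
        // Process the message (processMessage() is an application function)
        $message = $envelope->getMessage();
        processMessage($message);

        // Acknowledge successful processing
        $receiver->ack($envelope);
    } catch (\Exception $e) {
        // Reject message (will be deleted or marked)
        $receiver->reject($envelope);
        echo "Failed: " . $e->getMessage();
    }
}

// Get message count in queue
$count = $receiver->getMessageCount();
echo "Messages pending: $count";

// List all messages (up to 10)
foreach ($receiver->all(10) as $envelope) {
    echo "Message ID: " . $envelope->last(TransportMessageIdStamp::class)->getId();
}

// Find specific message by ID
$envelope = $receiver->find(123);
if ($envelope) {
    echo "Found message: " . get_class($envelope->getMessage());
}
```

### Database Connection Configuration

The `Connection::buildConfiguration()` method parses a DSN string and an options array into the configuration array used by `Connection`.
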
```php
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\Connection;

// Parse DSN and options into a configuration array
$config = Connection::buildConfiguration(
    'doctrine://default?queue_name=async',
    ['table_name' => 'my_messages']
);

// Create connection with configuration
// ($dbalConnection is an existing Doctrine\DBAL\Connection)
$connection = new Connection($config, $dbalConnection);

// Send message
$messageId = $connection->send(
    json_encode(['email' => 'test@example.com']),
    ['type' => 'email'],
    5000 // delay in milliseconds (5 seconds)
);

// Get next message (with locking)
$message = $connection->get();
if ($message) {
    echo "ID: {$message['id']}, Body: {$message['body']}";

    // Acknowledge or reject
    $connection->ack($message['id']);
    // or: $connection->reject($message['id']);
}

// Configuration options
$defaultConfig = [
    'table_name' => 'messenger_messages',
    'queue_name' => 'default',
    'redeliver_timeout' => 3600, // 1 hour in seconds
    'auto_setup' => true
];
```

### Automatic Table Setup

The `Connection::setup()` method creates the required database table and indexes with platform-specific optimizations. With `auto_setup` enabled, it runs automatically on first use.

```php
use Doctrine\DBAL\DriverManager;
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\Connection;

// Create the DBAL connection. The 'url' key is assumed here;
// DBAL 4 dropped it in favor of DsnParser.
$dbalConnection = DriverManager::getConnection([
    'url' => 'mysql://user:pass@localhost/dbname'
]);

// Create connection with auto_setup enabled
$connection = new Connection([
    'table_name' => 'messenger_messages',
    'queue_name' => 'default',
    'auto_setup' => true
], $dbalConnection);

// Manual setup (creates table if not exists)
$connection->setup();

// Setup is automatically called on first send/get if auto_setup is true
$messageId = $connection->send('{"data": "value"}', ['type' => 'notification']);

// Table structure created:
// - id (BIGINT, auto-increment, primary key)
// - body (TEXT)
// - headers (TEXT, JSON encoded)
// - queue_name (VARCHAR 190, indexed)
// - created_at (DATETIME, immutable)
// - available_at (DATETIME, immutable, indexed)
// - delivered_at (DATETIME, immutable, nullable, indexed)
```

### PostgreSQL LISTEN/NOTIFY Support

The `PostgreSqlConnection` extends the basic connection with real-time message notifications using PostgreSQL's LISTEN/NOTIFY mechanism.

```php
use Doctrine\DBAL\DriverManager;
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\PostgreSqlConnection;

// Create the DBAL connection. The 'url' key is assumed here;
// DBAL 4 dropped it in favor of DsnParser.
$dbalConnection = DriverManager::getConnection([
    'url' => 'postgresql://user:pass@localhost/dbname'
]);

// Create PostgreSQL-optimized connection
$connection = new PostgreSqlConnection([
    'table_name' => 'messenger_messages',
    'queue_name' => 'default',
    'check_delayed_interval' => 60000, // Check delayed messages every 60s
    'get_notify_timeout' => 0 // Milliseconds to wait for notification
], $dbalConnection);

// Setup creates triggers for notifications
$connection->setup();

// Workers are notified when messages arrive instead of polling the table.
// When the queue is empty, get() waits up to get_notify_timeout
// milliseconds for a notification before returning.
$message = $connection->get();

// Trigger SQL automatically created:
// - Function: notify_messenger_messages()
// - Trigger: notify_trigger on INSERT/UPDATE
// - NOTIFY channel: table name
// - Payload: queue name
```

### Keepalive for Long-Running Handlers

The `Connection::keepalive()` method extends the processing timeout for long-running message handlers by refreshing the message's `delivered_at` timestamp.

```php
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineReceiver;
use Symfony\Component\Messenger\Exception\TransportException;

// $connection and $serializer are assumed to exist
$receiver = new DoctrineReceiver($connection, $serializer);

$envelopes = $receiver->get();

foreach ($envelopes as $envelope) {
    // Long-running process
    for ($i = 0; $i < 10; $i++) {
        // Process chunk (processChunk() is an application function)
        processChunk($i);

        // Update delivered_at timestamp every 30 seconds
        // Prevents message from being redelivered to another worker
        $receiver->keepalive($envelope, 30);
        sleep(30);
    }

    // Complete processing
    $receiver->ack($envelope);
}

// Keepalive validates timeout
try {
    // This will throw exception if keepalive > redeliver_timeout
    $receiver->keepalive($envelope, 5000); // 5000 seconds
} catch (TransportException $e) {
    echo "Keepalive interval too large: " . $e->getMessage();
}
```

### Complete Transport Integration

The `DoctrineTransport` provides a unified interface that combines sender and receiver functionality with setup capabilities.

```php
use Doctrine\DBAL\Schema\Schema;
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransport;
use Symfony\Component\Messenger\Envelope;

// $connection (bridge Connection), $serializer and $dbalConnection are assumed to exist
$transport = new DoctrineTransport($connection, $serializer);

// Setup database table
$transport->setup();

// Send message (OrderProcessedMessage is an application-defined class)
$message = new OrderProcessedMessage(orderId: 123);
$envelope = new Envelope($message);
$sentEnvelope = $transport->send($envelope);

// Receive messages
$receivedEnvelopes = $transport->get();

foreach ($receivedEnvelopes as $envelope) {
    try {
        $message = $envelope->getMessage();
        handleMessage($message); // application function

        // Acknowledge
        $transport->ack($envelope);
    } catch (\Exception $e) {
        // Reject
        $transport->reject($envelope);
    }
}

// Query operations
$messageCount = $transport->getMessageCount();
$allMessages = $transport->all(50); // Get up to 50 messages
$specificMessage = $transport->find(123); // Find by ID

// Schema configuration for migrations
$schema = new Schema();
$transport->configureSchema($schema, $dbalConnection, function () {
    return true; // Same database check
});
```

### Message Redelivery on Timeout

Messages not acknowledged within the redeliver timeout are automatically made available for reprocessing.

```php
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\Connection;
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineReceiver;

// $dbalConnection and $serializer are assumed to exist
$connection = new Connection([
    'table_name' => 'messenger_messages',
    'queue_name' => 'default',
    'redeliver_timeout' => 1800 // 30 minutes in seconds
], $dbalConnection);

// Message processing scenario:
// 1. Worker A gets message at 10:00:00 (delivered_at set)
// 2. Worker A crashes or hangs
// 3. At 10:30:00, message becomes available again
// 4. Worker B can now get the message

$receiver = new DoctrineReceiver($connection, $serializer);

// Worker loop
while (true) {
    $envelopes = $receiver->get();

    foreach ($envelopes as $envelope) {
        $startTime = time();

        try {
            processMessage($envelope->getMessage()); // application function
            $receiver->ack($envelope);
        } catch (\Exception $e) {
            if (time() - $startTime > 1800) {
                // Timeout reached, message will be redelivered
                echo "Processing timeout, message will be redelivered";
            } else {
                $receiver->reject($envelope);
            }
        }
    }

    sleep(1);
}
```

## Summary and Integration

The Symfony Doctrine Messenger Bridge serves as a reliable, database-backed message queue for applications that need persistent message storage and redelivery of unacknowledged messages. Primary use cases include asynchronous task processing (email sending, image processing, report generation), background job execution, event-driven architectures, and distributed systems requiring message persistence. The bridge covers scenarios where Redis or RabbitMQ is unavailable or unnecessary by reusing existing database infrastructure. It is especially useful when messages must be inserted transactionally alongside business logic: because messages are written through a DBAL connection, they can be queued atomically within the application's database transaction.

Integration patterns include configuring the transport in `config/packages/messenger.yaml` with DSN strings like `doctrine://default?queue_name=async`, running worker processes with `php bin/console messenger:consume doctrine` (where `doctrine` is the configured transport name), and implementing custom message handlers. The bridge supports multiple queues through different queue names, allows delayed message delivery through `DelayStamp`, and integrates with Symfony's retry mechanism for failed messages. For PostgreSQL deployments, enabling LISTEN/NOTIFY provides real-time message delivery without polling overhead.
The transport creates its database table automatically when `auto_setup` is enabled, supports message inspection through the `ListableReceiverInterface`, and provides keepalive functionality for long-running handlers, so it covers the main needs of asynchronous messaging in Symfony applications.
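
The configuration described above might look like the following minimal sketch. It assumes a transport named `doctrine` (so that `messenger:consume doctrine` picks it up) and a hypothetical message class `App\Message\SendWelcomeEmail`. Neither name comes from the bridge itself, and the retry values are illustrative rather than recommended.

```yaml
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            # "doctrine" is an assumed transport name; any name works
            doctrine:
                # "default" refers to the Doctrine connection name
                dsn: 'doctrine://default?queue_name=async'
                options:
                    table_name: messenger_messages
                    redeliver_timeout: 3600
                retry_strategy:
                    max_retries: 3
                    delay: 1000      # milliseconds before the first retry
                    multiplier: 2
        routing:
            # Hypothetical message class routed to the transport above
            'App\Message\SendWelcomeEmail': doctrine
```

With this in place, a worker started with `php bin/console messenger:consume doctrine` would read from the `async` queue in the `messenger_messages` table.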