Try Live
Add Docs
Rankings
Pricing
Docs
Install
Install
Docs
Pricing
More...
More...
Try Live
Rankings
Enterprise
Create API Key
Add Docs
Symfony Doctrine Messenger
https://github.com/symfony/doctrine-messenger
Admin
Provides Doctrine integration for Symfony Messenger, enabling message persistence and retrieval
...
Tokens:
3,178
Snippets:
11
Trust Score:
9.3
Update:
5 months ago
Context
Skills
Chat
Benchmark
37.5
Suggestions
Latest
Show doc for...
Code
Info
Show Results
Context Summary (auto-generated)
Raw
Copy
Link
# Symfony Doctrine Messenger Bridge The Symfony Doctrine Messenger Bridge provides a database-backed transport layer for the Symfony Messenger component, enabling reliable asynchronous message processing using Doctrine DBAL. This bridge allows applications to queue messages in a database table and process them asynchronously by workers, supporting multiple database platforms including MySQL, PostgreSQL, Oracle, and more. It offers features like delayed message delivery, message redelivery on timeout, and PostgreSQL-specific optimizations using LISTEN/NOTIFY for real-time message pushing. The bridge implements a complete transport system with sender and receiver components, automatic table setup, message acknowledgment/rejection, keepalive support for long-running handlers, and transaction management. It integrates seamlessly with Symfony's Messenger component and supports various database operations through Doctrine DBAL connections, making it ideal for building robust, database-backed message queues in PHP applications. ## APIs and Key Functions ### Transport Factory Creation The `DoctrineTransportFactory` creates transport instances from DSN configurations, automatically selecting PostgreSQL optimizations when available. ```php <?php use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransportFactory; use Doctrine\Persistence\ConnectionRegistry; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; // Create transport factory with Doctrine connection registry $transportFactory = new DoctrineTransportFactory($registry); // Create transport with DSN - basic usage $transport = $transportFactory->createTransport( 'doctrine://default', ['table_name' => 'messenger_messages', 'queue_name' => 'async'], new PhpSerializer() ); // Create transport with PostgreSQL LISTEN/NOTIFY disabled $transport = $transportFactory->createTransport( 'doctrine://default', ['use_notify' => false, 'queue_name' => 'emails'], new PhpSerializer() ); // Check if DSN is supported if ($transportFactory->supports('doctrine://default', [])) { // Create and use transport } ``` ### Sending Messages to Queue The `DoctrineSender::send()` method serializes and stores messages in the database with optional delays. ```php <?php use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Stamp\DelayStamp; use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineSender; use Symfony\Component\Messenger\Bridge\Doctrine\Transport\Connection; // Create sender with connection and serializer $sender = new DoctrineSender($connection, $serializer); // Send immediate message $message = new MyMessage('data'); $envelope = new Envelope($message); $envelope = $sender->send($envelope); // Get the assigned message ID $messageId = $envelope->last(TransportMessageIdStamp::class)->getId(); // Send delayed message (5 second delay) $delayedEnvelope = (new Envelope($message))->with(new DelayStamp(5000)); $delayedEnvelope = $sender->send($delayedEnvelope); // Send to specific queue (configured in Connection) try { $result = $sender->send($envelope); echo "Message sent with ID: " . $result->last(TransportMessageIdStamp::class)->getId(); } catch (TransportException $e) { echo "Failed to send: " . $e->getMessage(); } ``` ### Receiving Messages from Queue The `DoctrineReceiver::get()` method retrieves and locks messages for processing with automatic retry handling. ```php <?php use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineReceiver; use Symfony\Component\Messenger\Bridge\Doctrine\Transport\Connection; // Create receiver $receiver = new DoctrineReceiver($connection, $serializer); // Get next available message $envelopes = $receiver->get(); foreach ($envelopes as $envelope) { try { // Process the message $message = $envelope->getMessage(); processMessage($message); // Acknowledge successful processing $receiver->ack($envelope); } catch (\Exception $e) { // Reject message (will be deleted or marked) $receiver->reject($envelope); echo "Failed: " . $e->getMessage(); } } // Get message count in queue $count = $receiver->getMessageCount(); echo "Messages pending: $count"; // List all messages (up to 10) foreach ($receiver->all(10) as $envelope) { echo "Message ID: " . $envelope->last(TransportMessageIdStamp::class)->getId(); } // Find specific message by ID $envelope = $receiver->find(123); if ($envelope) { echo "Found message: " . get_class($envelope->getMessage()); } ``` ### Database Connection Configuration The `Connection::buildConfiguration()` method parses DSN strings into configuration arrays for database connections. ```php <?php use Symfony\Component\Messenger\Bridge\Doctrine\Transport\Connection; use Doctrine\DBAL\Connection as DBALConnection; // Build configuration from DSN $config = Connection::buildConfiguration( 'doctrine://default?queue_name=emails&redeliver_timeout=7200', ['table_name' => 'my_messages'] ); // Create connection with configuration $connection = new Connection($config, $dbalConnection); // Send message $messageId = $connection->send( json_encode(['email' => 'test@example.com']), ['type' => 'email'], 5000 // 5 second delay ); // Get next message (with locking) $message = $connection->get(); if ($message) { echo "ID: {$message['id']}, Body: {$message['body']}"; // Acknowledge or reject $connection->ack($message['id']); // or $connection->reject($message['id']); } // Configuration options $defaultConfig = [ 'table_name' => 'messenger_messages', 'queue_name' => 'default', 'redeliver_timeout' => 3600, // 1 hour in seconds 'auto_setup' => true ]; ``` ### Automatic Table Setup The `Connection::setup()` method automatically creates the required database table and indexes with platform-specific optimizations. ```php <?php use Symfony\Component\Messenger\Bridge\Doctrine\Transport\Connection; use Doctrine\DBAL\DriverManager; // Create DBAL connection $dbalConnection = DriverManager::getConnection([ 'url' => 'mysql://user:pass@localhost/dbname' ]); // Create connection with auto_setup enabled $connection = new Connection([ 'table_name' => 'messenger_messages', 'queue_name' => 'default', 'auto_setup' => true ], $dbalConnection); // Manual setup (creates table if not exists) $connection->setup(); // Setup is automatically called on first send/get if auto_setup is true $messageId = $connection->send('{"data": "value"}', ['type' => 'notification']); // Table structure created: // - id (BIGINT, auto-increment, primary key) // - body (TEXT) // - headers (TEXT, JSON encoded) // - queue_name (VARCHAR 190, indexed) // - created_at (DATETIME, immutable) // - available_at (DATETIME, immutable, indexed) // - delivered_at (DATETIME, immutable, nullable, indexed) ``` ### PostgreSQL LISTEN/NOTIFY Support The `PostgreSqlConnection` extends basic connection with real-time message notifications using PostgreSQL's LISTEN/NOTIFY mechanism. ```php <?php use Symfony\Component\Messenger\Bridge\Doctrine\Transport\PostgreSqlConnection; use Doctrine\DBAL\DriverManager; // Create PostgreSQL connection $dbalConnection = DriverManager::getConnection([ 'url' => 'postgresql://user:pass@localhost/dbname' ]); // Create PostgreSQL-optimized connection $connection = new PostgreSqlConnection([ 'table_name' => 'messenger_messages', 'queue_name' => 'default', 'check_delayed_interval' => 60000, // Check delayed messages every 60s 'get_notify_timeout' => 0 // Milliseconds to wait for notification ], $dbalConnection); // Setup creates triggers for notifications $connection->setup(); // Workers will receive instant notifications when messages arrive // Instead of polling, it uses LISTEN/NOTIFY for efficiency $message = $connection->get(); // Blocks efficiently until message arrives // Trigger SQL automatically created: // - Function: notify_messenger_messages() // - Trigger: notify_trigger on INSERT/UPDATE // - NOTIFY channel: table name // - Payload: queue name ``` ### Keepalive for Long-Running Handlers The `Connection::keepalive()` method extends the processing timeout for long-running message handlers. ```php <?php use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineReceiver; $receiver = new DoctrineReceiver($connection, $serializer); // Get message $envelopes = $receiver->get(); foreach ($envelopes as $envelope) { // Long-running process for ($i = 0; $i < 10; $i++) { // Process chunk processChunk($i); // Update delivered_at timestamp every 30 seconds // Prevents message from being redelivered to another worker $receiver->keepalive($envelope, 30); sleep(30); } // Complete processing $receiver->ack($envelope); } // Keepalive validates timeout try { // This will throw exception if keepalive > redeliver_timeout $receiver->keepalive($envelope, 5000); // 5000 seconds } catch (TransportException $e) { echo "Keepalive interval too large: " . $e->getMessage(); } ``` ### Complete Transport Integration The `DoctrineTransport` provides a unified interface combining sender and receiver functionality with setup capabilities. ```php <?php use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransport; use Symfony\Component\Messenger\Bridge\Doctrine\Transport\Connection; use Symfony\Component\Messenger\Envelope; // Create transport $transport = new DoctrineTransport($connection, $serializer); // Setup database table $transport->setup(); // Send message $message = new OrderProcessedMessage(orderId: 123); $envelope = new Envelope($message); $sentEnvelope = $transport->send($envelope); // Receive messages $receivedEnvelopes = $transport->get(); foreach ($receivedEnvelopes as $envelope) { try { $message = $envelope->getMessage(); handleMessage($message); // Acknowledge $transport->ack($envelope); } catch (\Exception $e) { // Reject $transport->reject($envelope); } } // Query operations $messageCount = $transport->getMessageCount(); $allMessages = $transport->all(50); // Get up to 50 messages $specificMessage = $transport->find(123); // Find by ID // Schema configuration for migrations use Doctrine\DBAL\Schema\Schema; $schema = new Schema(); $transport->configureSchema($schema, $dbalConnection, function() { return true; // Same database check }); ``` ### Message Redelivery on Timeout Messages not acknowledged within the redeliver timeout are automatically made available for reprocessing. ```php <?php // Configure connection with custom redeliver timeout $connection = new Connection([ 'table_name' => 'messenger_messages', 'queue_name' => 'default', 'redeliver_timeout' => 1800 // 30 minutes in seconds ], $dbalConnection); // Message processing scenario: // 1. Worker A gets message at 10:00:00 (delivered_at set) // 2. Worker A crashes or hangs // 3. At 10:30:00, message becomes available again // 4. Worker B can now get the message $receiver = new DoctrineReceiver($connection, $serializer); // Worker loop while (true) { $envelopes = $receiver->get(); foreach ($envelopes as $envelope) { $startTime = time(); try { processMessage($envelope->getMessage()); $receiver->ack($envelope); } catch (\Exception $e) { if (time() - $startTime > 1800) { // Timeout reached, message will be redelivered echo "Processing timeout, message will be redelivered"; } else { $receiver->reject($envelope); } } } sleep(1); } ``` ## Summary and Integration The Symfony Doctrine Messenger Bridge serves as a reliable, database-backed message queue system suitable for applications requiring persistent message storage and guaranteed delivery. Primary use cases include asynchronous task processing (email sending, image processing, report generation), background job execution, event-driven architectures, and distributed systems requiring message persistence. The bridge handles scenarios where Redis or RabbitMQ may be unavailable or unnecessary, leveraging existing database infrastructure. It excels in applications needing transactional message insertion alongside business logic, ensuring messages are queued atomically within database transactions. Integration patterns include configuring the transport in `config/packages/messenger.yaml` with DSN strings like `doctrine://default?queue_name=async`, creating worker processes using `php bin/console messenger:consume doctrine`, and implementing custom message handlers. The bridge supports multiple queues by configuring different queue names, allows delayed message delivery through DelayStamp, and integrates with Symfony's retry mechanism for failed messages. For PostgreSQL deployments, enabling LISTEN/NOTIFY provides real-time message delivery without polling overhead. The transport automatically handles database schema creation, supports message inspection through the ListableReceiverInterface, and provides keepalive functionality for long-running handlers, making it a comprehensive solution for asynchronous messaging in Symfony applications.