### Install NestJS RabbitMQ Module

Source: https://golevelup.github.io/nestjs/modules/rabbitmq.html

Install the `@golevelup/nestjs-rabbitmq` package using npm, yarn, or pnpm.

```bash
npm install --save @golevelup/nestjs-rabbitmq
```

```bash
yarn add @golevelup/nestjs-rabbitmq
```

```bash
pnpm add @golevelup/nestjs-rabbitmq
```

--------------------------------

### Select Specific Channel for Handler

Source: https://golevelup.github.io/nestjs/modules/rabbitmq.html

Assign a handler to a specific channel by setting `queueOptions.channel` to the desired channel name. The channel must be configured in the module setup. If no channel is specified, or the named channel doesn't exist, the handler falls back to the default channel.

```typescript
import { RabbitSubscribe, RabbitRPC } from '@golevelup/nestjs-rabbitmq';
import { Injectable } from '@nestjs/common';

@Injectable()
export class MessagingService {
  @RabbitRPC({
    exchange: 'exchange1',
    routingKey: 'subscribe-route',
    queue: 'subscribe-queue',
    queueOptions: {
      channel: 'channel-2',
    },
  })
  public async rpcHandler(msg: {}) {
    console.log(`Received rpc message: ${JSON.stringify(msg)}`);

    return { message: 'hi' };
  }

  @RabbitSubscribe({
    exchange: 'exchange1',
    routingKey: 'subscribe-route-2',
    queue: 'subscribe-queue-2',
  })
  public async pubSubHandler(msg: {}) {
    console.log(`Received pub/sub message: ${JSON.stringify(msg)}`);
  }
}
```

--------------------------------

### Initialize RabbitMQ Module with Exchanges and Channels

Source: https://golevelup.github.io/nestjs/modules/rabbitmq.html

Import `RabbitMQModule` into your module's `imports` array. Configure exchanges and channels, including prefetch counts, for message handling. If no channels are defined, a default one is created.
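The channel fallback rule described above can be modelled without the library. The following is only a sketch under stated assumptions: `ChannelConfig`, `resolveChannel`, and the `IMPLICIT_DEFAULT` placeholder name are hypothetical and do not reflect the library's internals.

```typescript
// Hypothetical, library-free model of the channel fallback rule.
interface ChannelConfig {
  prefetchCount?: number;
  default?: boolean;
}

// Assumed placeholder for the channel created when none is flagged as default;
// the real internal name may differ.
const IMPLICIT_DEFAULT = 'implicit-default';

function resolveChannel(
  channels: Record<string, ChannelConfig>,
  requested?: string,
): string {
  // A handler that names a configured channel gets that channel.
  if (
    requested !== undefined &&
    Object.prototype.hasOwnProperty.call(channels, requested)
  ) {
    return requested;
  }
  // Otherwise fall back to the channel flagged `default: true`, if any.
  const flagged = Object.keys(channels).find(
    (name) => channels[name]?.default === true,
  );
  return flagged ?? IMPLICIT_DEFAULT;
}

const channels: Record<string, ChannelConfig> = {
  'channel-1': { prefetchCount: 15, default: true },
  'channel-2': { prefetchCount: 2 },
};

console.log(resolveChannel(channels, 'channel-2')); // channel-2
console.log(resolveChannel(channels)); // channel-1
console.log(resolveChannel(channels, 'missing')); // channel-1
```

The module configuration that declares channels like these follows.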
```typescript
import { RabbitMQModule } from '@golevelup/nestjs-rabbitmq';
import { Module } from '@nestjs/common';
import { MessagingController } from './messaging/messaging.controller';
import { MessagingService } from './messaging/messaging.service';

@Module({
  imports: [
    RabbitMQModule.forRoot({
      exchanges: [
        {
          name: 'exchange1',
          type: 'topic',
        },
      ],
      uri: 'amqp://rabbitmq:rabbitmq@localhost:5672',
      channels: {
        'channel-1': {
          prefetchCount: 15,
          default: true,
        },
        'channel-2': {
          prefetchCount: 2,
        },
      },
    }),
  ],
  providers: [MessagingService],
  controllers: [MessagingController],
})
export class RabbitExampleModule {}
```

--------------------------------

### Configure Connection Resiliency

Source: https://golevelup.github.io/nestjs/modules/rabbitmq.html

Enable connection resiliency by setting `connectionInitOptions` so that startup does not wait for a connection. This allows the application to bootstrap even if the RabbitMQ broker is unavailable.

```typescript
import { RabbitMQModule } from '@golevelup/nestjs-rabbitmq';
import { Module } from '@nestjs/common';

@Module({
  imports: [
    RabbitMQModule.forRoot({
      exchanges: [
        {
          name: 'exchange1',
          type: 'topic',
        },
      ],
      uri: 'amqp://rabbitmq:rabbitmq@localhost:5672',
      connectionInitOptions: { wait: false },
    }),
  ],
})
export class RabbitExampleModule {}
```

--------------------------------

### Configure Consumer-Side Message Batching

Source: https://golevelup.github.io/nestjs/modules/rabbitmq.html

Configure message batching for consumers by setting batch options such as size, timeout, and an error handler. Ensure the channel's `prefetchCount` is at least the batch size so that batches can fill.
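To make the size and timeout options concrete, here is a minimal library-free sketch of size-or-timeout accumulation. `MessageBatcher` is a hypothetical name, and starting the timer on the first message of a batch is an assumption, not a statement about how the library schedules its timer.

```typescript
// Library-free sketch of size-or-timeout batching; all names are hypothetical.
class MessageBatcher<T> {
  private pending: T[] = [];
  private timer: ReturnType<typeof setTimeout> | undefined;
  private readonly size: number;
  private readonly timeoutMs: number;
  private readonly onBatch: (batch: T[]) => void;

  constructor(size: number, timeoutMs: number, onBatch: (batch: T[]) => void) {
    this.size = size;
    this.timeoutMs = timeoutMs;
    this.onBatch = onBatch;
  }

  add(message: T): void {
    this.pending.push(message);
    if (this.pending.length >= this.size) {
      // Size limit reached: hand over the batch immediately.
      this.flush();
    } else if (this.timer === undefined) {
      // Assumption: the first message of a new batch starts the timer.
      this.timer = setTimeout(() => this.flush(), this.timeoutMs);
    }
  }

  flush(): void {
    if (this.timer !== undefined) {
      clearTimeout(this.timer);
      this.timer = undefined;
    }
    if (this.pending.length === 0) return;
    const batch = this.pending;
    this.pending = [];
    this.onBatch(batch);
  }
}

const batches: number[][] = [];
const batcher = new MessageBatcher<number>(3, 200, (batch) => {
  batches.push(batch);
});

batcher.add(1);
batcher.add(2);
batcher.add(3); // size reached, flushed right away
batcher.add(4); // would wait for the timer
batcher.flush(); // stands in for the timer expiring

console.log(JSON.stringify(batches)); // [[1,2,3],[4]]
```

If the broker only delivers fewer unacknowledged messages than `size` (a low prefetch count), the size branch can never trigger and every batch waits for the timer, which is why the prefetch requirement matters. The `@RabbitSubscribe` configuration for the same behaviour follows.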
```typescript
import { RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
import { Injectable } from '@nestjs/common';

// Called when handling a batch fails
const batchErrorHandler = (channel, messages, error) => {
  console.error(
    `Failed to process message batch of length: ${messages.length}`,
    error,
  );
};

@Injectable()
export class MessagingService {
  @RabbitSubscribe({
    exchange: 'exchange1',
    routingKey: 'batch-route',
    queue: 'batch-queue',
    batchOptions: {
      size: 10,
      timeout: 200,
      errorHandler: batchErrorHandler,
    },
  })
  public async batchHandler(messages) {
    console.log(`Received message batch of length: ${messages.length}`);
  }
}
```

--------------------------------

### Mix Top-Level Exchange/Routing Key with Bindings

Source: https://golevelup.github.io/nestjs/modules/rabbitmq.html

Combine the top-level `exchange` and `routingKey` options with the `bindings` array in `@RabbitSubscribe` to handle messages from both a primary source and additional binding sources.

```typescript
@RabbitSubscribe({
  exchange: 'exchange1',
  routingKey: 'route.c',
  queue: 'my-queue',
  bindings: [
    { exchange: 'exchange2', routingKey: 'route.d' },
  ],
})
public async mixedHandler(msg: {}) {
  console.log(`Received message: ${JSON.stringify(msg)}`);
}
```

--------------------------------

### Mock AmqpConnection Provider for Testing

Source: https://golevelup.github.io/nestjs/modules/rabbitmq.html

In tests, mock the `AmqpConnection` provider so that publishers injecting it receive a mocked instance. Create a mock module that provides a mocked `AmqpConnection`, then override the module that sets up `RabbitMQModule` (here `RabbitExampleModule`) with the mock.

```typescript
// In your test setup...
import { Module } from '@nestjs/common';
import { AmqpConnection } from '@golevelup/nestjs-rabbitmq';
import { createMock } from '@golevelup/ts-jest';
import { Test } from '@nestjs/testing';
import { AppModule } from 'where your root module is located';
import { RabbitExampleModule } from 'where your rabbitmq module is located';

// Create a valid mock module
@Module({
  providers: [
    {
      provide: AmqpConnection,
      useValue: createMock<AmqpConnection>(),
    },
  ],
  exports: [AmqpConnection],
})
class MockRabbitExampleModule {}

// Then override the real `RabbitExampleModule` with the mocked one
beforeAll(async () => {
  const moduleFixture = await Test.createTestingModule({
    imports: [AppModule],
  })
    .overrideModule(RabbitExampleModule)
    .useModule(MockRabbitExampleModule)
    .compile();

  const app = moduleFixture.createNestApplication();
  await app.init();
});
```

--------------------------------

### Publish Message with AmqpConnection

Source: https://golevelup.github.io/nestjs/modules/rabbitmq.html

Use the `publish` method of `AmqpConnection` to send a message to a RabbitMQ exchange. Options, such as message persistence, can be passed as the fourth argument.

```typescript
amqpConnection.publish('some-exchange', 'routing-key', { msg: 'hello world' });
```

```typescript
amqpConnection.publish(
  'some-exchange',
  'routing-key',
  { msg: 'hello world' },
  { persistent: true },
);
```

--------------------------------

### Enable Controller Discovery in RabbitMQModule

Source: https://golevelup.github.io/nestjs/modules/rabbitmq.html

Configure the `RabbitMQModule` to enable controller discovery by setting `enableControllerDiscovery: true`. This allows NestJS controllers to be used as message handlers.
```typescript
import { RabbitMQModule } from '@golevelup/nestjs-rabbitmq';
import { Module } from '@nestjs/common';
import { MessagingController } from './messaging/messaging.controller';
import { MessagingService } from './messaging/messaging.service';

@Module({
  imports: [
    RabbitMQModule.forRoot({
      exchanges: [
        {
          name: 'exchange1',
          type: 'topic',
        },
      ],
      uri: 'amqp://rabbitmq:rabbitmq@localhost:5672',
      enableControllerDiscovery: true,
    }),
  ],
  // The controller is also listed as a provider so its handlers are discovered
  providers: [MessagingService, MessagingController],
  controllers: [MessagingController],
})
export class RabbitExampleModule {}
```

--------------------------------

### Configure Default Publish Options

Source: https://golevelup.github.io/nestjs/modules/rabbitmq.html

Set default publish options, such as message persistence, in the `RabbitMQModule` configuration to apply them to all published messages.

```typescript
RabbitMQModule.forRoot(RabbitMQModule, {
  exchanges: [{ name: 'some-exchange', type: 'topic' }],
  uri: 'amqp://rabbitmq:rabbitmq@localhost:5672',
  defaultPublishOptions: {
    persistent: true,
  },
});
```

--------------------------------

### Consumer-Side Message Batching

Source: https://golevelup.github.io/nestjs/modules/rabbitmq.html

Configure consumers to receive messages in batches, improving efficiency by processing multiple messages at once. Batching can be controlled by size and timeout, with an optional error handler for batch-level errors.

```APIDOC
## Consumer-Side Message Batching

### Description

Messages can be presented as a batch to the handler. This works by accumulating messages on the consumer-side until either a batch size limit is reached or the batch timer expires. After handling, all messages in the batch will be acked (or nacked) automatically.

:::note
For batching to work correctly, the channel's `prefetchCount` must be set to a value greater than or equal to the configured batch `size`. If the prefetch count is lower than the batch size, RabbitMQ will not deliver enough messages at once to fill a batch. You can configure this either globally via `RabbitMQConfig.prefetchCount` or per-channel via `RabbitMQChannelConfig.prefetchCount`. See the Configuration section for details.
:::

This behaviour is configured in the `batchOptions` property:

```typescript
import { RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
import { Injectable } from '@nestjs/common';

// Called when handling a batch fails
const batchErrorHandler = (channel, messages, error) => {
  console.error(
    `Failed to process message batch of length: ${messages.length}`,
    error,
  );
};

@Injectable()
export class MessagingService {
  @RabbitSubscribe({
    exchange: 'exchange1',
    routingKey: 'batch-route',
    queue: 'batch-queue',
    batchOptions: {
      size: 10,
      timeout: 200,
      errorHandler: batchErrorHandler,
    },
  })
  public async batchHandler(messages) {
    console.log(`Received message batch of length: ${messages.length}`);
  }
}
```

An error handler may be provided here if your error handling logic needs to be aware of the batch; otherwise it will fall back to either the top-level `errorHandler` or the default error handling behaviour.
```

--------------------------------

### Publishing Messages (Fire and Forget)

Source: https://golevelup.github.io/nestjs/modules/rabbitmq.html

Publish messages to RabbitMQ exchanges using the `publish` method of the `AmqpConnection` class. Supports per-call options for message persistence as well as module-wide default publish options.

```APIDOC
## Publishing Messages (Fire and Forget)

### Inject the AmqpConnection

All RabbitMQ interactions go through the `AmqpConnection` object. Assuming you installed and configured the `RabbitMQModule`, the object can be obtained through Nest's dependency injection system. Simply require it as a constructor parameter in a Nest Controller or Service.

```typescript
@Controller()
export class AppController {
  constructor(private readonly amqpConnection: AmqpConnection) {}

  ...
}
```

### Publishing Messages (Fire and Forget)

If you just want to publish a message onto a RabbitMQ exchange, use the `publish` method of the `AmqpConnection`, which has the following signature:

```typescript
public publish(
  exchange: string,
  routingKey: string,
  message: any,
  options?: amqplib.Options.Publish
)
```

For example:

```typescript
amqpConnection.publish('some-exchange', 'routing-key', { msg: 'hello world' });
```

To mark published messages as persistent on the broker, pass `{ persistent: true }` as the options argument:

```typescript
amqpConnection.publish(
  'some-exchange',
  'routing-key',
  { msg: 'hello world' },
  { persistent: true },
);
```

Note: Marking messages as persistent is not sufficient on its own. To be retained across a RabbitMQ broker restart, they must also be published to queues (and exchanges) that are declared as `durable`, and the broker must have successfully flushed them to disk.

Alternatively, configure `defaultPublishOptions` in the module configuration to apply the message persistence flag to **all** published messages by default:

```typescript
RabbitMQModule.forRoot(RabbitMQModule, {
  exchanges: [{ name: 'some-exchange', type: 'topic' }],
  uri: 'amqp://rabbitmq:rabbitmq@localhost:5672',
  defaultPublishOptions: {
    persistent: true,
  },
});
```

Per-call options passed to `publish()` are merged on top of `defaultPublishOptions`, so individual calls can still override specific properties.
```

--------------------------------

### Apply Pipe and Error Handling to RabbitMQ RPC Method

Source: https://golevelup.github.io/nestjs/modules/rabbitmq.html

Decorate a `@RabbitRPC` method with `@UsePipes` to apply a validation pipe, and configure `errorBehavior` and `errorHandler` to control what happens when processing fails. Ensure the controller is imported as a provider.
```typescript
@RabbitRPC({
  routingKey: 'intercepted-rpc-2',
  exchange: 'exchange2',
  queue: 'intercepted-rpc-2',
  errorBehavior: MessageHandlerErrorBehavior.ACK,
  errorHandler: ReplyErrorCallback,
})
@UsePipes(ValidationPipe)
interceptedRpc(@RabbitPayload() message: messageDto) {
  return {
    message: 42,
  };
}
```

--------------------------------

### Bind Queue to Multiple Exchanges

Source: https://golevelup.github.io/nestjs/modules/rabbitmq.html

Use the `bindings` option in `@RabbitSubscribe` to bind a single queue to routing keys from multiple exchanges. This allows a handler to receive messages from different sources.

```typescript
import { RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
import { Injectable } from '@nestjs/common';

@Injectable()
export class MessagingService {
  @RabbitSubscribe({
    queue: 'my-queue',
    bindings: [
      { exchange: 'exchange1', routingKey: 'route.a' },
      { exchange: 'exchange2', routingKey: 'route.b' },
    ],
  })
  public async multiExchangeHandler(msg: {}) {
    console.log(`Received message: ${JSON.stringify(msg)}`);
  }
}
```

--------------------------------

### Competing Consumers Pattern

Source: https://golevelup.github.io/nestjs/modules/rabbitmq.html

Implement the competing consumers pattern by specifying a named queue for `@RabbitSubscribe`. This ensures that only one handler processes a given message when multiple instances of the application are running.
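The difference between a shared named queue and one queue per instance can be simulated without a broker. This is only an illustrative sketch: `FakeQueue` is a hypothetical stand-in, round-robin dispatch is a simplification, and the per-instance case assumes (as the handler name `messagePerInstanceHandler` in the snippet suggests) that omitting `queue` gives each instance its own queue.

```typescript
// Library-free simulation; the class and variable names are hypothetical.
type Handler = (msg: string) => void;

class FakeQueue {
  private consumers: Handler[] = [];
  private next = 0;

  subscribe(handler: Handler): void {
    this.consumers.push(handler);
  }

  // A queue hands each message to exactly one of its consumers
  // (round-robin here, as a simplification).
  deliver(msg: string): void {
    if (this.consumers.length === 0) return;
    const handler = this.consumers[this.next % this.consumers.length];
    this.next += 1;
    if (handler) handler(msg);
  }
}

const sharedA: string[] = [];
const sharedB: string[] = [];
const ownA: string[] = [];
const ownB: string[] = [];

// Named queue: both application instances consume from the same queue.
const sharedQueue = new FakeQueue();
sharedQueue.subscribe((m) => { sharedA.push(m); });
sharedQueue.subscribe((m) => { sharedB.push(m); });

// Assumed per-instance queues: the exchange copies each message to both.
const queueA = new FakeQueue();
const queueB = new FakeQueue();
queueA.subscribe((m) => { ownA.push(m); });
queueB.subscribe((m) => { ownB.push(m); });

for (const msg of ['m1', 'm2']) {
  sharedQueue.deliver(msg);
  queueA.deliver(msg);
  queueB.deliver(msg);
}

console.log(sharedA.length + sharedB.length); // 2: each message handled once
console.log(ownA.length, ownB.length); // 2 2: every instance sees every message
```

The decorator options for both cases follow.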
```typescript
import { RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
import { Injectable } from '@nestjs/common';

@Injectable()
export class MessagingService {
  // Named queue: instances compete, so each message is handled once
  @RabbitSubscribe({
    exchange: 'exchange1',
    routingKey: 'subscribe-route1',
    queue: 'subscribe-queue',
  })
  public async competingPubSubHandler(msg: {}) {
    console.log(`Received message: ${JSON.stringify(msg)}`);
  }

  // No queue name: each instance receives the message
  @RabbitSubscribe({
    exchange: 'exchange1',
    routingKey: 'subscribe-route2',
  })
  public async messagePerInstanceHandler(msg: {}) {
    console.log(`Received message: ${JSON.stringify(msg)}`);
  }
}
```

--------------------------------

### Expose Pub/Sub Handler with RabbitSubscribe Decorator

Source: https://golevelup.github.io/nestjs/modules/rabbitmq.html

Use the `@RabbitSubscribe` decorator on a service method to create a Pub/Sub handler. The method is invoked whenever a message matching the specified exchange and routing key is received, allowing for asynchronous message processing.

```typescript
import { RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
import { Injectable } from '@nestjs/common';

@Injectable()
export class MessagingService {
  @RabbitSubscribe({
    exchange: 'exchange1',
    routingKey: 'subscribe-route',
    queue: 'subscribe-queue',
  })
  public async pubSubHandler(msg: {}) {
    console.log(`Received message: ${JSON.stringify(msg)}`);
  }
}
```

--------------------------------

### Requesting Data from an RPC

Source: https://golevelup.github.io/nestjs/modules/rabbitmq.html

Make RPC requests to other services using the `request` method of the `AmqpConnection` class. The method accepts a generic type parameter for the expected response and an optional timeout.

```APIDOC
## Requesting Data from an RPC

If you'd like to request data from another RPC handler that's been set up using this library, you can use the `request` method of the `AmqpConnection`.

For example:

```typescript
const response = await amqpConnection.request<ExpectedReturnType>({
  exchange: 'exchange1',
  routingKey: 'rpc',
  payload: {
    request: 'val',
  },
  // optional timeout for how long the request
  // should wait before failing if no response is received
  timeout: 10000,
});
```

### Type Inference

The generic parameter used with the `request` method lets you specify the _expected_ return type of the RPC response. This is useful for getting intellisense in your editor, but no validation of the object actually received is done on your behalf. This means that you are required to provide your own object validation logic if you need to make runtime guarantees about message structure.

### Interop with other RPC Servers

The RPC functionality included in `@golevelup/nestjs-rabbitmq` is based on the Direct Reply-To Queue functionality of RabbitMQ. It is possible that because of this, the client library (`AmqpConnection.request`) could be used to interact with an RPC server implemented using a different language or framework. However, this functionality has not been verified.
```

--------------------------------

### Custom Message Deserialization and Serialization

Source: https://golevelup.github.io/nestjs/modules/rabbitmq.html

Configure custom `deserializer` and `serializer` functions within `RabbitMQModule.forRoot` to handle messages that are not in JSON format or to implement custom serialization logic.

```typescript
import { RabbitMQModule } from '@golevelup/nestjs-rabbitmq';
import { Module } from '@nestjs/common';
import { MessagingController } from './messaging/messaging.controller';
import { MessagingService } from './messaging/messaging.service';
import { ConsumeMessage } from 'amqplib';

@Module({
  imports: [
    RabbitMQModule.forRoot({
      // ...
      deserializer: (message: Buffer, msg: ConsumeMessage) => {
        // `message` is the raw body; `msg` carries the AMQP properties
        const decodedMessage = myCustomDeserializer(
          message.toString(),
          msg.properties.headers,
        );
        return decodedMessage;
      },
      serializer: (msg: any) => {
        const encodedMessage = myCustomSerializer(msg);
        return Buffer.from(encodedMessage);
      },
    }),
  ],
  // ...
})
export class RabbitExampleModule {}
```

--------------------------------

### Request Data using AmqpConnection.request

Source: https://golevelup.github.io/nestjs/modules/rabbitmq.html

Use the `request` method of `AmqpConnection` to send a request to an RPC handler and receive a typed response. An optional timeout can be specified.

```typescript
const response = await amqpConnection.request<ExpectedReturnType>({
  exchange: 'exchange1',
  routingKey: 'rpc',
  payload: {
    request: 'val',
  },
  // optional timeout for how long the request
  // should wait before failing if no response is received
  timeout: 10000,
});
```

--------------------------------

### Configure Custom Queue Creation Error Handler

Source: https://golevelup.github.io/nestjs/modules/rabbitmq.html

The `assertQueueErrorHandler` property in `@RabbitSubscribe` allows for custom handling of errors that occur while the queue is being asserted, that is, created or verified against the broker.

```typescript
@RabbitSubscribe({
  exchange: 'exchange1',
  routingKey: 'subscribe-route1',
  queue: 'subscribe-queue',
  assertQueueErrorHandler: myErrorHandler
})
```

--------------------------------

### Identify RabbitMQ Context with Utility Function

Source: https://golevelup.github.io/nestjs/modules/rabbitmq.html

The `isRabbitContext` utility function provides an alternative way to determine whether the current execution context comes from RabbitMQ. This is useful for conditional logic within enhancers.
```typescript
import { isRabbitContext } from '@golevelup/nestjs-rabbitmq';
import {
  CallHandler,
  ExecutionContext,
  Injectable,
  NestInterceptor,
} from '@nestjs/common';

@Injectable()
class ExampleInterceptor implements NestInterceptor {
  intercept(context: ExecutionContext, next: CallHandler) {
    // Do nothing if this is a RabbitMQ event
    if (isRabbitContext(context)) {
      return next.handle();
    }

    // Execute custom interceptor logic for HTTP request/response
    return next.handle();
  }
}
```

--------------------------------

### Access Original AMQP Message in Consumer

Source: https://golevelup.github.io/nestjs/modules/rabbitmq.html

To access the original AMQP message, declare `amqplib.ConsumeMessage` as the second parameter of your consumer method. This gives access to the fields and properties of the raw message.

```typescript
import { RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
import { Injectable } from '@nestjs/common';
import { ConsumeMessage } from 'amqplib';

@Injectable()
export class MessagingService {
  @RabbitSubscribe({
    exchange: 'exchange1',
    routingKey: 'subscribe-route',
    queue: 'subscribe-queue',
  })
  public async pubSubHandler(msg: {}, amqpMsg: ConsumeMessage) {
    console.log(`Correlation id: ${amqpMsg.properties.correlationId}`);
  }
}
```

--------------------------------

### Configure Per-Handler Consumer Tag

Source: https://golevelup.github.io/nestjs/modules/rabbitmq.html

Override the global consumer tag by setting `consumerTag` within `queueOptions.consumerOptions` on an individual handler decorator. This tag takes precedence over the global configuration. Ensure tags are unique per channel.
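The precedence and uniqueness rules above can be checked with a small helper before deployment. This is a hypothetical sketch: `HandlerTagInfo`, `effectiveTag`, and `duplicateTags` are illustrative names, not part of the library.

```typescript
// Hypothetical shapes for illustration; these are not the library's types.
interface HandlerTagInfo {
  channel: string;
  queueTag?: string; // tag from the module-level queue configuration
  handlerTag?: string; // tag from queueOptions.consumerOptions.consumerTag
}

// The per-handler tag wins over the queue-level tag.
function effectiveTag(handler: HandlerTagInfo): string | undefined {
  return handler.handlerTag ?? handler.queueTag;
}

// Returns the "channel/tag" keys used by more than one handler.
function duplicateTags(handlers: HandlerTagInfo[]): string[] {
  const counts: Record<string, number> = {};
  for (const handler of handlers) {
    const tag = effectiveTag(handler);
    if (tag === undefined) continue; // no explicit tag configured
    const key = `${handler.channel}/${tag}`;
    counts[key] = (counts[key] ?? 0) + 1;
  }
  return Object.keys(counts).filter((key) => (counts[key] ?? 0) > 1);
}

const handlers: HandlerTagInfo[] = [
  { channel: 'channel-1', queueTag: 'my-global-consumer-tag', handlerTag: 'worker' },
  { channel: 'channel-1', handlerTag: 'worker' },
  { channel: 'channel-2', handlerTag: 'worker' },
];

console.log(duplicateTags(handlers)); // [ 'channel-1/worker' ]
```

The handler-level configuration follows.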
```typescript
import { RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
import { Injectable } from '@nestjs/common';

@Injectable()
export class MessagingService {
  @RabbitSubscribe({
    exchange: 'exchange1',
    routingKey: 'subscribe-route',
    queue: 'my-queue',
    queueOptions: {
      consumerOptions: {
        consumerTag: 'my-handler-consumer-tag',
      },
    },
  })
  public async pubSubHandler(msg: {}) {
    console.log(`Received message: ${JSON.stringify(msg)}`);
  }
}
```

--------------------------------

### Expose RPC Handler with RabbitRPC Decorator

Source: https://golevelup.github.io/nestjs/modules/rabbitmq.html

Apply the `@RabbitRPC` decorator to a service method to expose it as an RPC handler. The method receives messages matching the specified exchange and routing key, and its return value is sent back as the reply.

```typescript
import { RabbitRPC } from '@golevelup/nestjs-rabbitmq';
import { Injectable } from '@nestjs/common';

@Injectable()
export class MessagingService {
  @RabbitRPC({
    exchange: 'exchange1',
    routingKey: 'rpc-route',
    queue: 'rpc-queue',
  })
  public async rpcHandler(msg: {}) {
    return {
      response: 42,
    };
  }
}
```

--------------------------------

### Configure Global Consumer Tag

Source: https://golevelup.github.io/nestjs/modules/rabbitmq.html

Set a default `consumerTag` for a queue in the module's `queues` configuration. This tag will be used by all handlers subscribed to that queue unless overridden at the handler level. Ensure tags are unique per channel.

```typescript
RabbitMQModule.forRoot({
  uri: 'amqp://localhost:5672',
  queues: [
    {
      name: 'my-queue',
      consumerTag: 'my-global-consumer-tag',
    },
  ],
});
```

--------------------------------

### Apply Interceptor to RabbitMQ RPC Method

Source: https://golevelup.github.io/nestjs/modules/rabbitmq.html

Use the `@UseInterceptors` decorator on a method decorated with `@RabbitRPC` to apply an interceptor to that specific RPC handler. Ensure the controller is imported as a provider.
```typescript
@RabbitRPC({
  routingKey: 'intercepted-rpc-2',
  exchange: 'exchange2',
  queue: 'intercepted-rpc-2',
})
@UseInterceptors(TransformInterceptor)
interceptedRpc() {
  return {
    message: 42,
  };
}
```

--------------------------------

### Conditionally Requeue Message Based on Retry Count

Source: https://golevelup.github.io/nestjs/modules/rabbitmq.html

Use a retry counter in the message headers to conditionally requeue a message. Return `new Nack(true)` to requeue for up to 3 retries, then `new Nack(false)` to dead-letter it. Ensure your retry infrastructure increments the counter.

```typescript
import { RabbitSubscribe, Nack } from '@golevelup/nestjs-rabbitmq';
import { Injectable } from '@nestjs/common';
import { ConsumeMessage } from 'amqplib';

@Injectable()
export class MessagingService {
  @RabbitSubscribe({
    exchange: 'exchange1',
    routingKey: 'subscribe-route',
    queue: 'subscribe-queue',
  })
  public async pubSubHandler(msg: {}, amqpMsg: ConsumeMessage) {
    // x-retry-count must be set by your retry infrastructure when republishing
    const retryCount = (amqpMsg.properties.headers?.['x-retry-count'] ??
      0) as number;

    try {
      // ... process message ...
    } catch (e) {
      console.error('Failed to process message', e);

      if (retryCount < 3) {
        // Requeue for retry; make sure you have a retry counter to avoid infinite loops
        return new Nack(true);
      }

      // Exhausted retries: nack without requeue (routes to DLQ if configured)
      return new Nack(false);
    }
  }
}
```

--------------------------------

### Inject AmqpConnection in NestJS Controller

Source: https://golevelup.github.io/nestjs/modules/rabbitmq.html

Obtain the `AmqpConnection` object via dependency injection in a NestJS Controller or Service to interact with RabbitMQ.

```typescript
@Controller()
export class AppController {
  constructor(private readonly amqpConnection: AmqpConnection) {}

  ...
}
```

--------------------------------

### Prevent RabbitMQ Connection in Tests

Source: https://golevelup.github.io/nestjs/modules/rabbitmq.html

To avoid connecting to a RabbitMQ broker during tests, pass `undefined` as the configuration object to `RabbitMQModule.forRoot`. This disables the connection process for the test environment.

```typescript
import { RabbitMQModule } from '@golevelup/nestjs-rabbitmq';
import { Module } from '@nestjs/common';

@Module({
  imports: [
    RabbitMQModule.forRoot(
      RabbitMQModule,
      /** Omitting the config object causes the connection process to be skipped */
      process.env.NODE_ENV !== 'test'
        ? {
            exchanges: [
              {
                name: 'exchange1',
                type: 'topic',
              },
            ],
            uri: 'amqp://rabbitmq:rabbitmq@localhost:5672',
            connectionInitOptions: { wait: false },
          }
        : undefined,
    ),
  ],
})
export class RabbitExampleModule {}
```

--------------------------------

### Configure Custom Message Error Handler

Source: https://golevelup.github.io/nestjs/modules/rabbitmq.html

Use the `errorHandler` property within `@RabbitSubscribe` to define a custom function for handling message processing errors. This provides full control over error resolution, overriding the default `errorBehavior`.

```typescript
@RabbitSubscribe({
  exchange: 'exchange1',
  routingKey: 'subscribe-route1',
  queue: 'subscribe-queue',
  errorHandler: myErrorHandler
})
```

--------------------------------

### Allow Non-JSON Messages

Source: https://golevelup.github.io/nestjs/modules/rabbitmq.html

Set the `allowNonJsonMessages` flag on a consumer to receive the raw message buffer when a message cannot be parsed as JSON, instead of applying the default JSON parsing behavior.

```typescript
// Conceptual example: set the flag in the consumer decorator options.
// @RabbitSubscribe({ ..., allowNonJsonMessages: true })
```

--------------------------------

### Create Distributed System Transaction

Source: https://golevelup.github.io/nestjs/modules/rabbitmq.html

Use a correlation ID to group multiple requests into a single transaction in a distributed system. Each request within the transaction can have its own unique request ID.

```typescript
import { randomUUID } from 'node:crypto';

const correlationId = randomUUID();

const response = await amqpConnection.request({
  exchange: 'exchange1',
  routingKey: 'rpc',
  correlationId,
  // Each request in the transaction has its own requestId
  headers: { 'X-Request-ID': randomUUID() },
  payload: {
    request: 'val',
  },
});
```

--------------------------------

### Identify RabbitMQ Context in Interceptor

Source: https://golevelup.github.io/nestjs/modules/rabbitmq.html

Use `RABBIT_CONTEXT_TYPE_KEY` to check whether the current execution context comes from RabbitMQ. This prevents global enhancers from unintentionally affecting RabbitMQ message handlers.

```typescript
import { RABBIT_CONTEXT_TYPE_KEY } from '@golevelup/nestjs-rabbitmq';
import {
  CallHandler,
  ExecutionContext,
  Injectable,
  NestInterceptor,
} from '@nestjs/common';

@Injectable()
class ExampleInterceptor implements NestInterceptor {
  intercept(context: ExecutionContext, next: CallHandler) {
    const contextType = context.getType<
      'http' | typeof RABBIT_CONTEXT_TYPE_KEY
    >();

    // Do nothing if this is a RabbitMQ event
    if (contextType === RABBIT_CONTEXT_TYPE_KEY) {
      return next.handle();
    }

    // Execute custom interceptor logic for HTTP request/response
    return next.handle();
  }
}
```

--------------------------------

### Handle RPC Timeout Error

Source: https://golevelup.github.io/nestjs/modules/rabbitmq.html

Catch `RpcTimeoutError` specifically to handle the case where an RPC request exceeds its timeout. This allows for custom error logging and fallback mechanisms.
```typescript
import { AmqpConnection, RpcTimeoutError } from '@golevelup/nestjs-rabbitmq';

try {
  const response = await amqpConnection.request({
    exchange: 'exchange1',
    routingKey: 'rpc',
    payload: { request: 'val' },
    timeout: 5000,
  });
} catch (error) {
  if (error instanceof RpcTimeoutError) {
    console.error(`RPC timed out after ${error.timeout}ms`);
    console.error(
      `Exchange: ${error.exchange}, Routing Key: ${error.routingKey}`,
    );
    // Handle timeout specifically
  } else {
    // Handle other errors
    throw error;
  }
}
```

--------------------------------

### Nack Message Without Requeue

Source: https://golevelup.github.io/nestjs/modules/rabbitmq.html

Return `new Nack(false)` from a handler to negatively acknowledge a message without requeuing it. This is useful for non-recoverable errors, typically sending the message to a dead-letter exchange.

```typescript
import { RabbitSubscribe, Nack } from '@golevelup/nestjs-rabbitmq';
import { Injectable } from '@nestjs/common';

@Injectable()
export class MessagingService {
  @RabbitSubscribe({
    exchange: 'exchange1',
    routingKey: 'subscribe-route',
    queue: 'subscribe-queue',
  })
  public async pubSubHandler(msg: {}) {
    try {
      // ... process message ...
      // returning void/undefined here causes the library to auto-ack the message
    } catch (e) {
      // Log the error so it's not silently swallowed
      console.error('Failed to process message', e);

      // Nack without requeue: message goes to the dead-letter exchange (if configured)
      return new Nack(false);
    }
  }
}
```
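The choice between acking, requeuing, and dead-lettering can be kept in one pure function so it is easy to unit test apart from the handler. The sketch below is library-free and hypothetical: `Outcome`, `Decision`, and `decide` are illustrative names, and the mapping to `new Nack(true)` / `new Nack(false)` is indicated only in comments.

```typescript
// Library-free sketch; these types are hypothetical stand-ins.
type Outcome = { ok: true } | { ok: false; recoverable: boolean };
type Decision = 'ack' | 'nack-requeue' | 'nack-dead-letter';

function decide(
  outcome: Outcome,
  retryCount: number,
  maxRetries: number,
): Decision {
  // Success: returning nothing from the handler lets the message be acked.
  if (outcome.ok) return 'ack';
  // Non-recoverable failure: corresponds to `new Nack(false)`.
  if (!outcome.recoverable) return 'nack-dead-letter';
  // Recoverable failure: requeue (`new Nack(true)`) until retries run out.
  return retryCount < maxRetries ? 'nack-requeue' : 'nack-dead-letter';
}

console.log(decide({ ok: true }, 0, 3)); // ack
console.log(decide({ ok: false, recoverable: false }, 0, 3)); // nack-dead-letter
console.log(decide({ ok: false, recoverable: true }, 1, 3)); // nack-requeue
console.log(decide({ ok: false, recoverable: true }, 3, 3)); // nack-dead-letter
```

A handler could call such a function in its `catch` block and translate the result into the corresponding return value.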