### Nuke Broker Setup (Async/Await) Source: https://github.com/onebeyond/rascal/blob/master/README.md Asynchronously nuke the broker setup using broker.nuke with async/await. This provides a modern and readable way to handle asynchronous cleanup operations. ```javascript afterEach(async () => { await broker.nuke(); }); ``` -------------------------------- ### Nuke Broker Setup (Callback) Source: https://github.com/onebeyond/rascal/blob/master/README.md Completely tear down and reinitialize the broker setup using broker.nuke with a callback. This is useful for cleaning up between test runs to ensure a fresh state. ```javascript afterEach((done) => { broker.nuke(done); }); ``` -------------------------------- ### Basic Broker Setup with Callbacks Source: https://github.com/onebeyond/rascal/blob/master/README.md Demonstrates initializing a Rascal broker using callbacks, publishing a message, and setting up a subscription to consume messages. Ensure error handlers are attached to the broker, publication, and subscription. ```javascript const Broker = require('rascal').Broker; const config = require('./config'); Broker.create(config, (err, broker) => { if (err) throw err; broker.on('error', console.error); // Publish a message broker.publish('demo_publication', 'Hello World!', (err, publication) => { if (err) throw err; publication.on('error', console.error); }); // Consume a message broker.subscribe('demo_subscription', (err, subscription) => { if (err) throw err; subscription .on('message', (message, content, ackOrNack) => { console.log(content); ackOrNack(); }) .on('error', console.error); }); }); ``` -------------------------------- ### Async/Await Example for Rascal Broker Source: https://github.com/onebeyond/rascal/blob/master/README.md Demonstrates creating a Rascal broker using async/await, publishing a message, and setting up a subscription to consume messages. Includes basic error handling for the broker, publications, and subscriptions. 
```javascript
const Broker = require('rascal').BrokerAsPromised;
const config = require('./config');

(async () => {
  try {
    const broker = await Broker.create(config);
    broker.on('error', console.error);

    // Publish a message
    const publication = await broker.publish('demo_publication', 'Hello World!');
    publication.on('error', console.error);

    // Consume a message
    const subscription = await broker.subscribe('demo_subscription');
    subscription
      .on('message', (message, content, ackOrNack) => {
        console.log(content);
        ackOrNack();
      })
      .on('error', console.error);
  } catch (err) {
    console.error(err);
  }
})();
```

--------------------------------

### Bounce Broker (Callback)

Source: https://github.com/onebeyond/rascal/blob/master/README.md

Restart the broker by disconnecting and reinitializing it using broker.bounce with a callback. This is useful for resetting the broker's state during test setups.

```javascript
beforeEach((done) => {
  broker.bounce(done);
});
```

--------------------------------

### Handle Publication Pausing and Aborting

Source: https://github.com/onebeyond/rascal/blob/master/README.md

Listen for the 'paused' event on a publication and call `publication.abort()` to drop messages if the connection is temporarily lost. This example uses callbacks.

```javascript
broker.publish('p1', 'some message', (err, publication) => {
  if (err) throw err; // publication didn't exist
  publication
    .on('success', (messageId) => {
      console.log('Message id was: ', messageId);
    })
    .on('error', (err, messageId) => {
      console.error('Error was: ', err.message);
    })
    .on('paused', (messageId) => {
      console.warn('Publication was paused. Aborting message: ', messageId);
      publication.abort();
    });
});
```

--------------------------------

### Rascal Error Recovery Strategy Examples

Source: https://context7.com/onebeyond/rascal/llms.txt

Illustrates various ways to acknowledge messages using different error recovery strategies, including republish, forward, nack with requeue, nack without requeue (dead letter), and chaining multiple strategies.

```javascript
// Recovery strategy options:

// 1. Republish to same queue with retry count tracking
ackOrNack(err, { strategy: 'republish', defer: 5000, attempts: 10 });

// 2. Forward to different publication
ackOrNack(err, { strategy: 'forward', publication: 'error_queue', attempts: 3 });

// 3. Nack with requeue
ackOrNack(err, { strategy: 'nack', defer: 1000, requeue: true });

// 4. Nack without requeue (dead letter)
ackOrNack(err, { strategy: 'nack' });

// 5. Chain strategies with fallback
ackOrNack(err, [
  { strategy: 'republish', defer: 1000, attempts: 3 },
  { strategy: 'forward', publication: 'dlq', attempts: 1 },
  { strategy: 'nack' }
]);
```

--------------------------------

### Get Rascal Managed Connections

Source: https://github.com/onebeyond/rascal/blob/master/README.md

Retrieve a list of Rascal-managed connections by calling `broker.getConnections()`. The output shows vhost and connection URL details.

```json
[
  {
    "vhost": "/",
    "connectionUrl": "amqp://guest:***@localhost:5672?heartbeat=50&connection_timeout=10000&channelMax=100"
  },
  {
    "vhost": "other",
    "connectionUrl": "amqp://guest:***@localhost:5672/other?heartbeat=50&connection_timeout=10000&channelMax=100"
  }
]
```

--------------------------------

### Create Rascal Broker Instance (Async/Await)

Source: https://context7.com/onebeyond/rascal/llms.txt

Use `Rascal.BrokerAsPromised.create` to instantiate a broker with default configuration and async/await support. Listen for 'error' and 'vhost_initialised' events.
```javascript const Rascal = require('rascal'); // Configuration with vhosts, exchanges, queues, bindings, publications, and subscriptions const config = { vhosts: { '/': { connection: { url: 'amqp://guest:guest@localhost:5672/', retry: { min: 1000, max: 60000, factor: 2, strategy: 'exponential' } }, exchanges: ['demo_ex'], queues: ['demo_q'], bindings: ['demo_ex[a.b.c] -> demo_q'], publications: { demo_pub: { exchange: 'demo_ex', routingKey: 'a.b.c', confirm: true, timeout: 10000 } }, subscriptions: { demo_sub: { queue: 'demo_q', prefetch: 10 } } } } }; // Using async/await (BrokerAsPromised) const broker = await Rascal.BrokerAsPromised.create(Rascal.withDefaultConfig(config)); broker.on('error', console.error); broker.on('vhost_initialised', ({ vhost, connectionUrl }) => { console.log(`Vhost ${vhost} connected via ${connectionUrl}`); }); ``` -------------------------------- ### Configure Channel Pooling Source: https://github.com/onebeyond/rascal/blob/master/README.md Adjust channel pool settings like max/min size, eviction intervals, and idle timeouts. Set autostart to true to create pools on first use. ```json { "vhosts": { "v1": { "publicationChannelPools": { "regularPool": { "max": 10, "min": 5, "evictionRunIntervalMillis": 10000, "idleTimeoutMillis": 60000, "autostart": true }, "confirmPool": { "max": 10, "min": 5, "evictionRunIntervalMillis": 10000, "idleTimeoutMillis": 60000, "autostart": true } } } } } ``` -------------------------------- ### Handle Redelivery Exceeded Event (Promises API) Source: https://github.com/onebeyond/rascal/blob/master/README.md Subscribe to messages using the promises API and attach event listeners for 'message', 'error', and 'redeliveries_exceeded'. Ensure to catch potential errors during subscription setup. 
```javascript
try {
  const subscription = await broker.subscribe('s1');
  subscription
    .on('message', (message, content, ackOrNack) => {
      // Do stuff with message
    })
    .on('error', (err) => {
      console.error('Subscriber error', err);
    })
    .on('redeliveries_exceeded', (err, message, ackOrNack) => {
      console.error('Redeliveries exceeded', err);
      ackOrNack(err);
    });
} catch (err) {
  // subscription didn't exist
}
```

--------------------------------

### Connect to RabbitMQ vhost (Callback)

Source: https://github.com/onebeyond/rascal/blob/master/README.md

Establish a connection to a RabbitMQ vhost using the broker.connect method with a callback. Errors are reported through the callback's first argument and must be handled explicitly.

```javascript
broker.connect('/', (err, connection) => {
  if (err) throw new Error(`Connection error: ${err.message}`);
  // profit
});
```

--------------------------------

### Configure Publication to Queue

Source: https://github.com/onebeyond/rascal/blob/master/README.md

Define a publication named 'p1' that targets queue 'q1' in vhost 'v1'.

```json
{
  "publications": {
    "p1": {
      "vhost": "v1",
      "queue": "q1"
    }
  }
}
```

--------------------------------

### Connect to RabbitMQ vhost (Promise)

Source: https://github.com/onebeyond/rascal/blob/master/README.md

Establish a connection to a RabbitMQ vhost using the broker.connect method, which returns a Promise. Await the promise inside a try-catch block so that connection failures are caught.

```javascript
try {
  // await is required; without it the rejection bypasses the catch block
  const connection = await broker.connect('/');
  // profit
} catch (err) {
  throw new Error(`Connection error: ${err.message}`);
}
```

--------------------------------

### Configure Connections with Management URL

Source: https://github.com/onebeyond/rascal/blob/master/README.md

Provide both AMQP connection URLs and management API URLs for each connection in a list. This allows for explicit configuration of both aspects.
```json { "vhosts": { "v1": { "connections": [ { "url": "amqp://guest:guest@broker1.example.com:5672/v1?heartbeat=10", "management": "http://guest:guest@broker1.example.com:15672" }, { "url": "amqp://guest:guest@broker2.example.com:5672/v1?heartbeat=10", "management": "http://guest:guest@broker2.example.com:15672" }, { "url": "amqp://guest:guest@broker3.example.com:5672/v1?heartbeat=10", "management": "http://guest:guest@broker3.example.com:15672" } ] } } } ``` -------------------------------- ### Initialize and Manage Rascal Broker Source: https://context7.com/onebeyond/rascal/llms.txt Sets up a Rascal broker with a default configuration, including virtual hosts, exchanges, queues, bindings, publications, and subscriptions. It also demonstrates error handling, message subscription, and graceful shutdown. ```javascript const Rascal = require('rascal'); const config = { vhosts: { '/': { // Assert vhost exists (requires management plugin) assert: true, check: true, exchanges: ['app_ex'], queues: { 'app_q': { // Purge queue on initialization (useful for testing) purge: false } }, bindings: ['app_ex[#] -> app_q'], publications: { app_pub: { exchange: 'app_ex' } }, subscriptions: { app_sub: { queue: 'app_q', prefetch: 10 } } } } }; const broker = await Rascal.BrokerAsPromised.create(Rascal.withDefaultConfig(config)); broker.on('error', console.error); // Subscribe to messages const subscription = await broker.subscribe('app_sub'); subscription.on('message', (message, content, ackOrNack) => { console.log('Received:', content); ackOrNack(); }); // Graceful shutdown - cancels subscriptions, waits for unacked messages, closes connections process.on('SIGTERM', async () => { console.log('Shutting down...'); await broker.shutdown(); process.exit(0); }); ``` -------------------------------- ### Configure Publication to Exchange Source: https://github.com/onebeyond/rascal/blob/master/README.md Define a publication named 'p1' that targets exchange 'e1' in vhost 'v1' with routing 
key 'foo'. ```json { "publications": { "p1": { "vhost": "v1", "exchange": "e1", "routingKey": "foo" } } } ``` -------------------------------- ### Configure Concurrency for Vhost Initialization Source: https://github.com/onebeyond/rascal/blob/master/README.md Set the `concurrency` attribute per vhost to specify the number of channels to create during initialization. This can improve startup time for a high number of exchanges, queues, and bindings. ```json { "vhosts": { "v1": { "concurrency": 10 } } } ``` -------------------------------- ### Enable Vhost Assertion Source: https://github.com/onebeyond/rascal/blob/master/README.md Set `assert` to `true` to have Rascal create the vhost using the RabbitMQ management API if it does not exist. Requires the management plugin and appropriate permissions. ```json { "vhosts": { "v1": { "assert": true } } } ``` -------------------------------- ### Publish Message Asynchronously with Options (Async/Await) Source: https://github.com/onebeyond/rascal/blob/master/README.md Publish a message using the 'p1' publication with async/await, providing an options object for routing key and message options. ```javascript await broker.publish('p1', 'some message', { routingKey: 'some.routing.key', options: { messageId: 'foo', expiration: 5000 }, }); ``` -------------------------------- ### Purge and UnsubscribeAll (Async/Await) Source: https://github.com/onebeyond/rascal/blob/master/README.md Asynchronously unsubscribe all consumers and purge queues using await. This offers a cleaner syntax for managing sequential asynchronous cleanup tasks. ```javascript afterEach(async () => { await broker.unsubscribeAll(); await broker.purge(); }); after(async () => { await broker.nuke(); }); ``` -------------------------------- ### Bounce Broker (Async/Await) Source: https://github.com/onebeyond/rascal/blob/master/README.md Asynchronously restart the broker using broker.bounce with async/await. 
This provides a modern syntax for handling the broker restart operation.

```javascript
beforeEach(async () => {
  await broker.bounce();
});
```

--------------------------------

### Subscribe with Options (Callback)

Source: https://github.com/onebeyond/rascal/blob/master/README.md

Subscribes to 's1' with specific options for prefetch count and retry behavior, using a callback function.

```javascript
broker.subscribe('s1', { prefetch: 10, retry: false }, callback);
```

--------------------------------

### Purge and UnsubscribeAll (Callback)

Source: https://github.com/onebeyond/rascal/blob/master/README.md

Perform a quicker tear down by unsubscribing all consumers and purging queues using async.series with callbacks. This is an alternative to nuke for faster test execution.

```javascript
// async.series comes from the 'async' npm package
afterEach((done) => {
  async.series([broker.unsubscribeAll, broker.purge], done);
});

after((done) => {
  broker.nuke(done);
});
```

--------------------------------

### Subscribe with Options (Async/Await)

Source: https://github.com/onebeyond/rascal/blob/master/README.md

Subscribes to 's1' using async/await, providing options for prefetch count and retry behavior.

```javascript
const subscription = await broker.subscribe('s1', { prefetch: 10, retry: false });
```

--------------------------------

### Enable Vhost Check

Source: https://github.com/onebeyond/rascal/blob/master/README.md

Set `check` to `true` to have Rascal verify the existence of the vhost using the RabbitMQ management API. Requires the management plugin and appropriate permissions.

```json
{
  "vhosts": {
    "v1": {
      "check": true
    }
  }
}
```

--------------------------------

### Create Rascal Broker Instance (Callbacks)

Source: https://context7.com/onebeyond/rascal/llms.txt

Use `Rascal.Broker.create` with a callback function for traditional Node.js callback-style error handling. The callback receives an error and the broker instance.
```javascript Rascal.Broker.create(Rascal.withDefaultConfig(config), (err, broker) => { if (err) throw err; broker.on('error', console.error); // Use broker... }); ``` -------------------------------- ### Run Rascal Tests Source: https://github.com/onebeyond/rascal/blob/master/README.md Execute the project's tests using npm test. Ensure a local RabbitMQ server is running with default configuration, or use the provided Docker command. ```bash npm test ``` -------------------------------- ### Configure Publication with Confirmation Source: https://github.com/onebeyond/rascal/blob/master/README.md Enable message confirmation for publication 'p1' by setting 'confirm' to true. This ensures messages are confirmed by the broker before the 'success' event is emitted. ```json { "publications": { "p1": { "exchange": "e1", "vhost": "v1", "confirm": true } } } ``` -------------------------------- ### Direct Connection to Vhost Source: https://context7.com/onebeyond/rascal/llms.txt Establishes a direct connection to a specific virtual host for advanced operations, allowing the use of the underlying amqplib connection. ```javascript // Connect directly to vhost for advanced operations const connection = await broker.connect('/'); // Use amqplib connection directly... ``` -------------------------------- ### Basic Subscription Configuration Source: https://github.com/onebeyond/rascal/blob/master/README.md Defines a subscription named 's1' targeting queue 'e1' on vhost 'v1'. This is the fundamental configuration for receiving messages. ```json { "subscriptions": { "s1": { "queue": "e1", "vhost": "v1" } } } ``` -------------------------------- ### Publish Message Asynchronously with Routing Key (Async/Await) Source: https://github.com/onebeyond/rascal/blob/master/README.md Publish a message using the 'p1' publication with async/await, specifying a custom routing key. 
```javascript await broker.publish('p1', 'some message', 'some.routing.key'); ``` -------------------------------- ### Load Rascal Configuration with Default Settings Source: https://github.com/onebeyond/rascal/blob/master/README.md Load Rascal configuration by merging custom definitions with sensible defaults for production environments. ```javascript var rascal = require('rascal'); var definitions = require('./your-config.json'); var config = rascal.withDefaultConfig(definitions); ``` -------------------------------- ### Configure Exchange Type and Options Source: https://github.com/onebeyond/rascal/blob/master/README.md Specify the exchange type (direct, topic, headers, fanout) and additional options like durability. ```json { "vhosts": { "v1": { "exchanges": { "e1": { "type": "fanout", "options": { "durable": false } } } } } } ``` -------------------------------- ### Handle vhost_initialised Event in Rascal Source: https://github.com/onebeyond/rascal/blob/master/README.md Listen for the 'vhost_initialised' event to log vhost name and connection details after a connection error recovery. ```javascript broker.on('vhost_initialised', ({ vhost, connectionUrl }) => { console.log(`Vhost: ${vhost} was initialised using connection: ${connectionUrl}`); }); ``` -------------------------------- ### Configure Multiple Broker Connections with Strategy Source: https://github.com/onebeyond/rascal/blob/master/README.md Specify an array of connection URLs for resilience. Rascal orders them by the connection strategy (e.g., 'random', 'fixed') and attempts to connect until successful. 
```json
{
  "vhosts": {
    "v1": {
      "connectionStrategy": "random",
      "connections": [
        "amqp://guest:guest@broker1.example.com:5672/v1?heartbeat=10",
        "amqp://guest:guest@broker2.example.com:5672/v1?heartbeat=10",
        "amqp://guest:guest@broker3.example.com:5672/v1?heartbeat=10"
      ]
    }
  }
}
```

--------------------------------

### Basic Rascal Configuration Schema

Source: https://github.com/onebeyond/rascal/blob/master/README.md

A minimal Rascal configuration defining vhosts, connection, exchanges, queues, bindings, publications, and subscriptions.

```json
{
  "$schema": "./node_modules/rascal/lib/config/schema.json",
  "vhosts": {
    "/": {
      "connection": {
        "url": "amqp://user:password@broker.example.com:5742/"
      },
      "exchanges": ["demo_ex"],
      "queues": ["demo_q"],
      "bindings": ["demo_ex[a.b.c] -> demo_q"],
      "publications": {
        "demo_pub": {
          "exchange": "demo_ex",
          "routingKey": "a.b.c"
        }
      },
      "subscriptions": {
        "demo_sub": {
          "queue": "demo_q",
          "prefetch": 3
        }
      }
    }
  }
}
```

--------------------------------

### Publish Message with Routing Key and Callback

Source: https://github.com/onebeyond/rascal/blob/master/README.md

Publish a message using the 'p1' publication, specifying a custom routing key and a callback function for handling the result.

```javascript
broker.publish('p1', 'some message', 'some.routing.key', callback);
```

--------------------------------

### Handle Publication Pausing and Aborting (Async/Await)

Source: https://github.com/onebeyond/rascal/blob/master/README.md

Handle publication pausing and aborting using async/await syntax. Listen for 'paused' and call `publication.abort()` to drop messages during connection interruptions.

```javascript
try {
  const publication = await broker.publish('p1', 'some message');
  publication
    .on('success', (messageId) => {
      console.log('Message id was: ', messageId);
    })
    .on('error', (err, messageId) => {
      console.error('Error was: ', err.message);
    })
    .on('paused', (messageId) => {
      console.warn('Publication was paused. Aborting message: ', messageId);
      publication.abort();
    });
} catch (err) {
  // publication didn't exist
}
```

--------------------------------

### Subscribe to All Subscriptions (Async/Await)

Source: https://github.com/onebeyond/rascal/blob/master/README.md

Subscribes to all configured subscriptions using async/await. The promise resolves to an array of subscription objects, so handlers can be attached to each in a single loop.

```javascript
try {
  const subscriptions = await broker.subscribeAll();
  subscriptions.forEach((subscription) => {
    subscription
      .on('message', (message, content, ackOrNack) => {
        // Do stuff with message
      })
      .on('error', (err) => {
        console.error('Subscriber error', err);
      });
  });
} catch (err) {
  // One or more subscriptions didn't exist
}
```

--------------------------------

### Define a Basic Exchange-to-Queue Binding

Source: https://github.com/onebeyond/rascal/blob/master/README.md

Configure a binding between an exchange and a queue. By default, destinationType is 'queue' and bindingKey is '#'.

```json
{
  "vhosts": {
    "v1": {
      "exchanges": {
        "e1": {}
      },
      "queues": {
        "q1": {}
      },
      "bindings": {
        "b1": {
          "source": "e1",
          "destination": "q1",
          "destinationType": "queue",
          "bindingKey": "foo"
        }
      }
    }
  }
}
```

--------------------------------

### Rascal Configuration Shorthand

Source: https://github.com/onebeyond/rascal/blob/master/README.md

Use shorthand notation to simplify Rascal configuration for exchanges, queues, and bindings. This reduces verbosity compared to the full object notation.
```json { "exchanges": { "e1": {}, "e2": {} }, "queues": { "q1": {}, "q2": {} }, "bindings": { "b1": { "source": "e1", "destination": "q1" }, "b2": { "source": "e2", "destination": "q2", "bindingKeys": ["bk1", "bk2"] } } } ``` ```json { "exchanges": ["e1", "e2"], "queues": ["q1", "q2"], "bindings": ["e1 -> q1", "e2[bk1, bk2] -> q2"] } ``` -------------------------------- ### Handle Publication Events with Async/Await Source: https://github.com/onebeyond/rascal/blob/master/README.md Publish a message using async/await and attach event listeners to the returned publication object for success, error, and return events. ```javascript try { const publication = await broker.publish('p1', 'some message'); publication .on('success', (messageId) => { console.log('Message id was: ', messageId); }) .on('error', (err, messageId) => { console.error('Error was: ', err.message); }) .on('return', (message) => { console.warn('Message was returned: ', message.properties.messageId); }); } catch (err) { // publication didn't exist } ``` -------------------------------- ### Publish Message with Options Object Source: https://github.com/onebeyond/rascal/blob/master/README.md Publish a message using the 'p1' publication, providing an options object that includes a custom routing key and additional message options. ```javascript broker.publish('p1', 'some message', { routingKey: 'some.routing.key', options: { messageId: 'foo', expiration: 5000 }, }); ``` -------------------------------- ### Load Rascal Configuration with Test Settings Source: https://github.com/onebeyond/rascal/blob/master/README.md Load Rascal configuration by merging custom definitions with defaults optimized for testing environments. 
```javascript var rascal = require('rascal'); var definitions = require('./your-test-config.json'); var config = rascal.withTestConfig(definitions); ``` -------------------------------- ### Configure Rascal Cluster Connections Source: https://context7.com/onebeyond/rascal/llms.txt Set up multiple broker connections for high availability using 'random' or 'fixed' strategies. This snippet includes connection retry configurations and event monitoring for connection status. ```javascript const Rascal = require('rascal'); const config = { vhosts: { '/': { // Connection strategy: 'random' (default) or 'fixed' (active/passive) connectionStrategy: 'random', // Multiple connection configurations for clustering connections: [ { url: 'amqp://guest:guest@rabbit1.example.com:5672/', management: 'http://guest:guest@rabbit1.example.com:15672' }, { url: 'amqp://guest:guest@rabbit2.example.com:5672/', management: 'http://guest:guest@rabbit2.example.com:15672' }, { url: 'amqp://guest:guest@rabbit3.example.com:5672/', management: 'http://guest:guest@rabbit3.example.com:15672' } ], // Connection retry configuration connection: { retry: { min: 1000, max: 60000, factor: 2, strategy: 'exponential' }, options: { heartbeat: 10, connection_timeout: 10000, channelMax: 100 } }, exchanges: ['events'], queues: ['events_q'], bindings: ['events[#] -> events_q'], publications: { event: { exchange: 'events', confirm: true } }, subscriptions: { process_events: { queue: 'events_q', prefetch: 10 } } } } }; const broker = await Rascal.BrokerAsPromised.create(Rascal.withDefaultConfig(config)); // Monitor connection events broker.on('error', (err, { vhost, connectionUrl }) => { console.error(`Broker error on ${vhost} (${connectionUrl}):`, err.message); }); broker.on('vhost_initialised', ({ vhost, connectionUrl }) => { console.log(`Connected to ${vhost} via ${connectionUrl}`); }); broker.on('blocked', (reason, { vhost, connectionUrl }) => { console.warn(`Connection blocked: ${reason} on ${vhost} 
(${connectionUrl})`); }); broker.on('unblocked', ({ vhost, connectionUrl }) => { console.log(`Connection unblocked on ${vhost} (${connectionUrl})`); }); // Get current connection details const connections = broker.getConnections(); console.log('Active connections:', connections); // [{ vhost: '/', connectionUrl: 'amqp://guest:***@rabbit1.example.com:5672/' }] ``` -------------------------------- ### Configure a Queue as a RabbitMQ Stream Source: https://github.com/onebeyond/rascal/blob/master/README.md Use the 'x-queue-type' argument to configure a queue as a stream. This is suitable for scenarios tolerating occasional message loss and requiring higher throughput. ```json { "queues": { "q1": { "options": { "arguments": { "x-queue-type": "stream" } } } } } ``` -------------------------------- ### Configure Management Connection Details Source: https://github.com/onebeyond/rascal/blob/master/README.md Explicitly specify management connection details for vhosts, including hostname, user, password, and management API specific options. This is useful in test environments. ```json { "vhosts": { "v1": { "connection": { "hostname": "broker.example.com", "user": "bob", "password": "secret", "management": { "protocol": "https", "pathname": "prefix", "user": "admin", "password": "super-secret", "options": { "timeout": 1000 } } } } } } ``` -------------------------------- ### Configure Channel Pools and Flow Control in Rascal Source: https://context7.com/onebeyond/rascal/llms.txt This snippet demonstrates configuring channel pools for high-throughput publishing and setting up flow control event handlers to manage backpressure. It includes configuration for publication channel pools, concurrency, exchanges, queues, bindings, publications, and subscriptions. The code also shows how to handle 'busy' and 'ready' events for flow control and publish messages with backpressure awareness. 
```javascript const Rascal = require('rascal'); const config = { vhosts: { '/': { // Configure channel pools for high-throughput publishing publicationChannelPools: { regularPool: { max: 10, min: 2, evictionRunIntervalMillis: 10000, idleTimeoutMillis: 60000, autostart: false }, confirmPool: { max: 20, min: 5, evictionRunIntervalMillis: 10000, idleTimeoutMillis: 60000, autostart: true // Pre-create channels on startup } }, // Concurrency for exchange/queue/binding assertions concurrency: 10, exchanges: ['high_volume'], queues: ['high_volume_q'], bindings: ['high_volume[#] -> high_volume_q'], publications: { fast_pub: { exchange: 'high_volume', confirm: true, timeout: 5000 } }, subscriptions: { fast_sub: { queue: 'high_volume_q', prefetch: 100 } } } } }; const broker = await Rascal.BrokerAsPromised.create(Rascal.withDefaultConfig(config)); broker.on('error', console.error); // Flow control - handle backpressure let paused = false; broker.on('busy', ({ vhost, mode, queue, size, available, borrowed, min, max }) => { console.warn(`Channel pool busy: ${vhost} (${mode}) - ${borrowed}/${max} borrowed`); paused = true; // Pause upstream data source }); broker.on('ready', ({ vhost, mode, queue, size, available, borrowed, min, max }) => { console.log(`Channel pool ready: ${vhost} (${mode}) - ${available} available`); paused = false; // Resume upstream data source }); // High-throughput publishing with backpressure handling async function publishBatch(messages) { for (const msg of messages) { if (paused) { await new Promise(resolve => broker.once('ready', resolve)); } const publication = await broker.publish('fast_pub', msg); publication .on('paused', (messageId) => { console.warn(`Publication paused for message: ${messageId}`); // Optionally abort: publication.abort(); }) .on('error', (err, messageId) => { console.error(`Publish failed for ${messageId}:`, err.message); }); } } ``` -------------------------------- ### Configure Rascal Connection using URL Source: 
https://github.com/onebeyond/rascal/blob/master/README.md Specify a RabbitMQ connection using a simple AMQP URL, including optional query parameters like heartbeat. ```json { "vhosts": { "v1": { "connection": "amqp://guest:guest@broker.example.com:5672/v1?heartbeat=10" } } } ``` -------------------------------- ### Publish Message with Callback Source: https://github.com/onebeyond/rascal/blob/master/README.md Publish a message using the 'p1' publication and provide a callback function to handle the publication result. The callback receives an error if the publication doesn't exist. ```javascript broker.publish('p1', 'some message', callback); ``` -------------------------------- ### Configure Rascal Connection with Individual Details Source: https://github.com/onebeyond/rascal/blob/master/README.md Define connection parameters individually, including protocol, hostname, user, password, vhost, and options for heartbeat and socket timeouts. ```json { "vhosts": { "v1": { "connection": { "slashes": true, "protocol": "amqp", "hostname": "localhost", "user": "guest", "password": "guest", "port": 5672, "vhost": "v1", "options": { "heartbeat": 5 }, "socketOptions": { "timeout": 10000 } } } } } ``` -------------------------------- ### Rascal Configuration with Mixed Notation Source: https://github.com/onebeyond/rascal/blob/master/README.md Mix string and object notation within configuration to specify parameters for exchanges, queues, or bindings when needed. This allows for concise definitions while enabling specific parameter overrides. 
```json { "exchanges": { "e1": {}, "e2": { "type": "fanout" } } } ``` ```json { "exchanges": [ "e1", { "name": "e2", "type": "fanout" } ] } ``` -------------------------------- ### Define Queue Options with Arguments in Rascal Source: https://github.com/onebeyond/rascal/blob/master/README.md Extend queue configuration by adding specific arguments to the 'options' block, such as message time-to-live ('x-message-ttl') or queue mode ('x-queue-mode'). Refer to amqplib documentation for a comprehensive list of available queue options. ```json { "queues": { "q1": { "options": { "durable": false, "arguments": { "x-message-ttl": 65000, "x-queue-mode": "lazy" } } } } } ``` -------------------------------- ### Publisher Error Handling with Async/Await Source: https://github.com/onebeyond/rascal/blob/master/README.md Implement publisher error handling with async/await. This pattern is useful for managing errors during asynchronous message publishing operations. ```javascript // Async/Await try { const publication = await broker.publish('p1', 'some text'); publication.on('error', (err, messageId) => { console.error('Publisher error', err, messageId); }); } catch (err) { throw new Error(`Rascal config error: ${err.message}`); } ``` -------------------------------- ### Publish Message with Runtime Overrides Source: https://context7.com/onebeyond/rascal/llms.txt Override publication settings like routing key or options at runtime during the `broker.publish` call. Supports publishing plain text or buffer content. 
```javascript
// Publish with runtime routing key override
await broker.publish('new_order', order, 'order.priority.created');

// Publish with options override
await broker.publish('new_order', order, {
  routingKey: 'order.express.created',
  options: {
    messageId: 'custom-id-123',
    expiration: 60000,
    headers: { priority: 'high' }
  }
});

// Publish text message (content-type: text/plain)
await broker.publish('new_order', 'Plain text message');

// Publish buffer (content-type: application/octet-stream)
await broker.publish('new_order', Buffer.from('binary data'));
```

--------------------------------

### Configure and Use Message Encryption

Source: https://context7.com/onebeyond/rascal/llms.txt

Set up AES encryption profiles for automatic message encryption and decryption. Ensure the 'encryption' profile name in the publication matches a defined profile in the configuration.

```javascript
const Rascal = require('rascal');

const config = {
  vhosts: {
    '/': {
      exchanges: ['secure_ex'],
      queues: ['secure_q'],
      bindings: ['secure_ex[#] -> secure_q'],
      publications: {
        secure_pub: {
          exchange: 'secure_ex',
          routingKey: 'secure.message',
          confirm: true,
          encryption: 'aes-256-v1' // Reference to encryption profile
        }
      },
      subscriptions: {
        secure_sub: {
          queue: 'secure_q',
          prefetch: 10
          // No encryption config needed - uses profile from message headers
        }
      }
    }
  },
  encryption: {
    'aes-256-v1': {
      key: 'f81db52a3b2c717fe65d9a3b7dd04d2a08793e1a28e3083db3ea08db56e7c315',
      ivLength: 16,
      algorithm: 'aes-256-cbc'
    },
    // Support multiple encryption versions for key rotation
    'aes-256-v2': {
      key: 'a1b2c3d4e5f6789012345678901234567890123456789012345678901234abcd',
      ivLength: 16,
      algorithm: 'aes-256-cbc'
    }
  }
};

const broker = await Rascal.BrokerAsPromised.create(Rascal.withDefaultConfig(config));
broker.on('error', console.error);

// Publish encrypted message
const sensitiveData = { userId: '12345', ssn: '123-45-6789', creditCard: '4111111111111111' };
const publication = await broker.publish('secure_pub', sensitiveData);
publication.on('success', (messageId) => {
  console.log('Encrypted message published:', messageId);
});

// Subscribe - message is automatically decrypted
const subscription = await broker.subscribe('secure_sub');
subscription
  .on('message', (message, content, ackOrNack) => {
    // content is automatically decrypted and parsed
    console.log('Decrypted content:', content);
    // { userId: '12345', ssn: '123-45-6789', creditCard: '4111111111111111' }

    // Encryption metadata available in headers
    const encryptionProfile = message.properties.headers.rascal.encryption.name;
    console.log('Encrypted with profile:', encryptionProfile);

    ackOrNack();
  })
  .on('error', console.error);
```

--------------------------------

### Publish Message Asynchronously (Async/Await)

Source: https://github.com/onebeyond/rascal/blob/master/README.md

Publish a message using the 'p1' publication with async/await syntax. This is the modern approach for handling asynchronous operations.

```javascript
await broker.publish('p1', 'some message');
```

--------------------------------

### Dynamically Adjust Channel Prefetch

Source: https://github.com/onebeyond/rascal/blob/master/README.md

Use `setChannelPrefetch` to adjust prefetch dynamically after subscription. Ensure regular consumer prefetch is zero to avoid conflicts.
```javascript
broker.subscribe('s1', { prefetch: 0, channelPrefetch: 5 }, (err, subscription) => {
  if (err) throw err;
  subscription.on('message', (message, content, ackOrNack) => {
    ackOrNack();
    const prefetch = tunePrefetch();
    subscription.setChannelPrefetch(prefetch, (err) => {
      if (err) throw err;
    });
  });
});
```

```javascript
const subscription = await broker.subscribe('s1', { prefetch: 0, channelPrefetch: 5 });
// The handler must be async so that it can await setChannelPrefetch
subscription.on('message', async (message, content, ackOrNack) => {
  ackOrNack();
  const prefetch = tunePrefetch();
  await subscription.setChannelPrefetch(prefetch);
});
```

--------------------------------

### Define Default Broker Configuration

Source: https://github.com/onebeyond/rascal/blob/master/README.md

Specify default settings for exchanges, queues, bindings, publications, and subscriptions within the 'defaults' sub-document of the configuration.

```json
{
  "defaults": {
    "vhosts": {
      "exchanges": {
        "assert": true,
        "type": "topic"
      },
      "queues": {
        "assert": true
      },
      "bindings": {
        "destinationType": "queue",
        "bindingKey": "#"
      }
    },
    "publications": {
      "vhost": "/",
      "confirm": true,
      "options": {
        "persistent": true
      }
    },
    "subscriptions": {
      "vhost": "/",
      "prefetch": 10,
      "retry": {
        "delay": 1000
      },
      "redeliveries": {
        "counter": {
          "size": 1000
        },
        "limit": 1000
      }
    }
  }
}
```

--------------------------------

### Define a Binding with Multiple Binding Keys

Source: https://github.com/onebeyond/rascal/blob/master/README.md

Specify an array of binding keys using 'bindingKey' or 'bindingKeys' to bind a destination to the same source with multiple routing keys.
```json
{
  "vhosts": {
    "v1": {
      "exchanges": {
        "e1": {}
      },
      "queues": {
        "q1": {}
      },
      "bindings": {
        "b1": {
          "source": "e1",
          "destination": "q1",
          "destinationType": "queue",
          "bindingKeys": ["foo", "bar"]
        }
      }
    }
  }
}
```

--------------------------------

### Configure Rascal Broker with Error Recovery

Source: https://context7.com/onebeyond/rascal/llms.txt

Sets up a Rascal broker with detailed configuration for exchanges, queues, bindings, publications, subscriptions, and sophisticated error recovery strategies like deferred retries and immediate dead-lettering.

```javascript
const Rascal = require('rascal');

const config = {
  vhosts: {
    '/': {
      exchanges: ['service', 'delay', 'retry', 'dead_letters'],
      queues: {
        'orders_q': {
          options: {
            arguments: {
              'x-dead-letter-exchange': 'dead_letters',
              'x-dead-letter-routing-key': 'orders.dead'
            }
          }
        },
        'delay:5m': {
          options: {
            arguments: {
              'x-message-ttl': 300000,
              'x-dead-letter-exchange': 'retry'
            }
          }
        },
        'dead_letters_q': {}
      },
      bindings: [
        'service[order.#] -> orders_q',
        'delay[delay.5m] -> delay:5m',
        'retry[orders_q.#] -> orders_q',
        'dead_letters[orders.dead] -> dead_letters_q'
      ],
      publications: {
        retry_in_5m: {
          exchange: 'delay',
          options: {
            CC: ['delay.5m']
          }
        }
      },
      subscriptions: {
        process_orders: {
          queue: 'orders_q',
          prefetch: 10
        }
      }
    }
  },
  recovery: {
    deferred_retry: [
      { strategy: 'forward', attempts: 5, publication: 'retry_in_5m', xDeathFix: true },
      { strategy: 'nack' }
    ],
    immediate_dead_letter: [
      { strategy: 'republish', immediateNack: true }
    ]
  }
};

const broker = await Rascal.BrokerAsPromised.create(Rascal.withDefaultConfig(config));
broker.on('error', console.error);
```

--------------------------------

### Enable Namespace for Queues and Exchanges

Source: https://github.com/onebeyond/rascal/blob/master/README.md

Use namespace to prefix queues and exchanges with a UUID for test isolation. Can also specify a custom namespace string.
```json
{
  "vhosts": {
    "v1": {
      "namespace": true
    }
  }
}
```

--------------------------------

### Handle Publication Events with Callback

Source: https://github.com/onebeyond/rascal/blob/master/README.md

Publish a message and attach event listeners to the publication object to handle success, error, and return events. This is used when providing a callback to broker.publish.

```javascript
broker.publish('p1', 'some message', (err, publication) => {
  if (err) throw err; // publication didn't exist
  publication
    .on('success', (messageId) => {
      console.log('Message id was: ', messageId);
    })
    .on('error', (err, messageId) => {
      console.error('Error was: ', err.message);
    })
    .on('return', (message) => {
      console.warn('Message was returned: ', message.properties.messageId);
    });
});
```

--------------------------------

### Subscribe to a Single Subscription (Async/Await)

Source: https://github.com/onebeyond/rascal/blob/master/README.md

Subscribes to a single subscription 's1' using async/await. This approach simplifies asynchronous operations and error handling.

```javascript
try {
  const subscription = await broker.subscribe('s1');
  subscription
    .on('message', (message, content, ackOrNack) => {
      // Do stuff with message
    })
    .on('error', (err) => {
      console.error('Subscriber error', err);
    });
} catch (err) {
  // subscription didn't exist
}
```

--------------------------------

### Configure Exchange Check Mode

Source: https://github.com/onebeyond/rascal/blob/master/README.md

Set assert to false and check to true to validate exchange existence without creating it on initialization.

```json
{
  "vhosts": {
    "v1": {
      "exchanges": {
        "e1": {
          "assert": false,
          "check": true
        }
      }
    }
  }
}
```

--------------------------------

### Subscription Configuration with Content Type

Source: https://github.com/onebeyond/rascal/blob/master/README.md

Configures a subscription 's1' to expect JSON content. Ensure the publisher also sets the 'contentType' to 'application/json' for proper parsing.
```json
{
  "subscriptions": {
    "s1": {
      "queue": "e1",
      "vhost": "v1",
      "contentType": "application/json"
    }
  }
}
```

--------------------------------

### Forward Message with Callback

Source: https://github.com/onebeyond/rascal/blob/master/README.md

Use this method to forward a message to a publication using a callback for handling results. Ensure to handle potential errors, such as the publication not existing.

```javascript
broker.forward('p1', message, overrides, (err, publication) => {
  if (err) throw err; // publication didn't exist
  publication
    .on('success', (messageId) => {
      console.log('Message id was: ', messageId);
    })
    .on('error', (err, messageId) => {
      console.error('Error was: ', err.message);
    })
    .on('return', (message) => {
      console.warn('Message was returned: ', message.properties.messageId);
    });
});
```

--------------------------------

### Subscribe to All Subscriptions (Callback)

Source: https://github.com/onebeyond/rascal/blob/master/README.md

Subscribes to all available subscriptions using a callback. It iterates through each subscription to attach message and error handlers.

```javascript
broker.subscribeAll((err, subscriptions) => {
  if (err) throw err; // one or more subscriptions didn't exist
  subscriptions.forEach((subscription) => {
    subscription
      .on('message', (message, content, ackOrNack) => {
        // Do stuff with message
      })
      .on('error', (err) => {
        console.error('Subscriber error', err);
      });
  });
});
```

--------------------------------

### Subscribe to a RabbitMQ Stream with Offset Tracking

Source: https://github.com/onebeyond/rascal/blob/master/README.md

Subscribe to a stream, loading and updating consumer offsets to handle application restarts and potential message loss. Ensure proper error handling and offset management.
```javascript
const initialOffset = (await loadOffset('/my-queue')) || 'first';

const overrides = {
  options: {
    arguments: {
      'x-stream-offset': initialOffset
    }
  }
};

const subscription = await broker.subscribe('/my-queue', overrides);

subscription.on('message', async (message, content, ackOrNack) => {
  const currentOffset = message.properties.headers['x-stream-offset'];
  try {
    await handleMessage(content);
    await updateOffset('/my-queue', currentOffset);
  } catch (err) {
    await handleError('/my-queue', currentOffset, err);
  } finally {
    ackOrNack(); // Streams do not support nack so do not pass the error argument
  }
});
```
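The stream example calls `loadOffset` and `updateOffset` without defining them. A minimal sketch of those two helpers, assuming a single consumer process and a local JSON file as the store, might look like the following. The file location (`OFFSETS_FILE`) and the `readOffsets` helper are hypothetical names introduced for illustration and are not part of Rascal; a database or key-value store would be the more usual choice in production.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical location for the offsets file (an assumption, not a Rascal setting)
const OFFSETS_FILE = path.join(os.tmpdir(), 'rascal-stream-offsets.json');

// Read the whole offsets map, treating a missing file as "nothing stored yet"
function readOffsets() {
  try {
    return JSON.parse(fs.readFileSync(OFFSETS_FILE, 'utf8'));
  } catch (err) {
    if (err.code === 'ENOENT') return {};
    throw err;
  }
}

// Resolve to the stored offset for a queue, or undefined if none has been saved
async function loadOffset(queue) {
  return readOffsets()[queue];
}

// Save the offset for a queue, writing to a temp file first and renaming it
// so that a crash mid-write does not leave a truncated offsets file
async function updateOffset(queue, offset) {
  const offsets = readOffsets();
  offsets[queue] = offset;
  const tmpFile = `${OFFSETS_FILE}.tmp`;
  fs.writeFileSync(tmpFile, JSON.stringify(offsets));
  fs.renameSync(tmpFile, OFFSETS_FILE);
}
```

With helpers like these, `(await loadOffset('/my-queue')) || 'first'` falls back to `'first'` until an offset has been saved. Because the example stores the offset of the message it has just handled, resuming from that value may redeliver that message after a restart, so it is safest to make `handleMessage` tolerant of duplicates.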