### Install @boringnode/bus Package

Source: https://github.com/boringnode/bus/blob/0.x/README.md

Installs the @boringnode/bus package using npm. This is the first step to integrate the service bus into your Node.js project.

```bash
npm install @boringnode/bus
```

--------------------------------

### Subscribe and Publish Messages (TypeScript)

Source: https://github.com/boringnode/bus/blob/0.x/README.md

Shows how to subscribe to a channel and publish messages using the BusManager. This example covers basic message handling within the service bus.

```typescript
manager.subscribe('channel', (message) => {
  console.log('Received message', message)
})

manager.publish('channel', 'Hello world')
```

--------------------------------

### Configure MQTT Transport for BoringNode Bus

Source: https://context7.com/boringnode/bus/llms.txt

Shows how to set up the MQTT transport for IoT and lightweight messaging, supporting MQTT, MQTTS, and WebSocket protocols. Includes examples for basic and secure connections, and integration with BusManager.

```typescript
import { BusManager } from '@boringnode/bus'
import { MqttTransport, mqtt } from '@boringnode/bus/transports/mqtt'
import { MqttProtocol } from '@boringnode/bus/types/main'

// Basic MQTT connection
const transport = new MqttTransport({
  host: 'broker.hivemq.com',
  port: 1883,
  protocol: MqttProtocol.MQTT,
})

// Secure MQTT with options
const secureTransport = new MqttTransport({
  host: 'secure-broker.example.com',
  port: 8883,
  protocol: MqttProtocol.MQTTS,
  options: {
    username: 'user',
    password: 'pass',
    clientId: 'my-client-id',
    clean: true,
    keepalive: 60,
  },
})

// WebSocket connection (for browser compatibility)
const wsTransport = new MqttTransport({
  host: 'broker.example.com',
  port: 8080,
  protocol: MqttProtocol.WS,
})

// Using with BusManager
const manager = new BusManager({
  default: 'iot',
  transports: {
    iot: {
      transport: mqtt({
        host: 'localhost',
        port: 1883,
        protocol: MqttProtocol.MQTT,
      }),
    },
  },
})

// Publish sensor data
interface SensorReading {
  sensorId: string
  value: number
  unit: string
}

await manager.subscribe('sensors/temperature', (reading) => {
  console.log(`Sensor ${reading.sensorId}: ${reading.value}${reading.unit}`)
})

await manager.publish('sensors/temperature', {
  sensorId: 'temp-001',
  value: 23.5,
  unit: '°C',
})

await manager.disconnect()
```

--------------------------------

### Initialize BusManager with Transports (TypeScript)

Source: https://github.com/boringnode/bus/blob/0.x/README.md

Demonstrates how to initialize the BusManager with different transports: memory, Redis, and MQTT. It shows the configuration for each transport, including connection details for Redis and MQTT.

```typescript
import { BusManager } from '@boringnode/bus'
import { redis } from '@boringnode/bus/transports/redis'
import { mqtt } from '@boringnode/bus/transports/mqtt'
import { memory } from '@boringnode/bus/transports/memory'

const manager = new BusManager({
  default: 'main',
  transports: {
    main: {
      transport: memory(),
    },
    redis: {
      transport: redis({
        host: 'localhost',
        port: 6379,
      }),
    },
    mqtt: {
      transport: mqtt({
        host: 'localhost',
        port: 1883,
      }),
    },
  },
})
```

--------------------------------

### Configure and Use BusManager with Multiple Transports (TypeScript)

Source: https://context7.com/boringnode/bus/llms.txt

Demonstrates how to set up and use the `BusManager` to manage multiple transport configurations (Redis, MQTT, Memory). It shows subscribing, publishing, selecting a specific transport, unsubscribing, and disconnecting.

```typescript
import { BusManager } from '@boringnode/bus'
import { redis } from '@boringnode/bus/transports/redis'
import { mqtt } from '@boringnode/bus/transports/mqtt'
import { memory } from '@boringnode/bus/transports/memory'

// Create a manager with multiple transports
const manager = new BusManager({
  default: 'redis',
  transports: {
    memory: {
      transport: memory(),
    },
    redis: {
      transport: redis({
        host: 'localhost',
        port: 6379,
      }),
      retryQueue: {
        enabled: true,
        retryInterval: '100ms',
        maxSize: 1000,
        removeDuplicates: true,
      },
    },
    mqtt: {
      transport: mqtt({
        host: 'localhost',
        port: 1883,
      }),
    },
  },
})

// Subscribe to a channel using the default transport (redis)
await manager.subscribe('notifications', (message) => {
  console.log('Received notification:', message)
})

// Publish using the default transport
await manager.publish('notifications', {
  type: 'alert',
  content: 'New message!',
})

// Use a specific transport
await manager.use('mqtt').publish('sensors', { temperature: 22.5 })

// Unsubscribe from a channel
await manager.unsubscribe('notifications')

// Disconnect
await manager.disconnect()
```

--------------------------------

### DefineConfig Helper for Type-Safe Bus Configuration (TypeScript)

Source: https://context7.com/boringnode/bus/llms.txt

The `defineConfig` helper improves TypeScript type inference for the BusManager configuration, providing autocomplete and type checking for transport names. Each named transport can carry its own connection details and retry queue settings.

```typescript
import { BusManager, defineConfig } from '@boringnode/bus'
import { redis } from '@boringnode/bus/transports/redis'
import { memory } from '@boringnode/bus/transports/memory'

// Define configuration with full type inference
const config = defineConfig({
  default: 'production',
  transports: {
    production: {
      transport: redis({
        host: process.env.REDIS_HOST || 'localhost',
        port: parseInt(process.env.REDIS_PORT || '6379', 10),
      }),
      retryQueue: {
        retryInterval: '1s',
        maxSize: 10000,
      },
    },
    test: {
      transport: memory(),
    },
  },
})

// Create manager with typed configuration
const manager = new BusManager(config)

// TypeScript knows valid transport names: 'production' | 'test'
manager.use('production').publish('events', { data: 'hello' })
manager.use('test').subscribe('events', console.log)
```

--------------------------------

### Create Bus Directly with Redis Transport (TypeScript)

Source: https://github.com/boringnode/bus/blob/0.x/README.md

Demonstrates creating a single `Bus` instance directly, without the `BusManager`. This approach is suitable when only one bus instance is needed. The example configures it with a Redis transport and retry queue options.

```typescript
import { Bus } from '@boringnode/bus'
import { RedisTransport } from '@boringnode/bus/transports/redis'

const transport = new RedisTransport({
  host: 'localhost',
  port: 6379,
})

const bus = new Bus(transport, {
  retryQueue: {
    retryInterval: '100ms',
  },
})
```

--------------------------------

### Using Memory Transport for Testing with BusManager and Direct Bus (TypeScript)

Source: https://context7.com/boringnode/bus/llms.txt

Shows how to use the in-memory transport for testing, both with `BusManager` and by instantiating `MemoryTransport` and `Bus` directly. It demonstrates message passing between separate bus instances and accessing received messages.

```typescript
import { Bus, BusManager } from '@boringnode/bus'
import { MemoryTransport, memory } from '@boringnode/bus/transports/memory'

// Using the factory function with BusManager
const manager = new BusManager({
  default: 'test',
  transports: {
    test: {
      transport: memory(),
    },
  },
})

// Or using the class directly
const transport1 = new MemoryTransport()
const transport2 = new MemoryTransport()

const bus1 = new Bus(transport1)
const bus2 = new Bus(transport2)

// Messages from bus2 are received by bus1
await bus1.subscribe('chat', (message) => {
  console.log('Bus1 received:', message)
})

await bus2.publish('chat', 'Hello from bus2!')
// Output: Bus1 received: Hello from bus2!

// Access received messages (useful for testing)
console.log(transport1.receivedMessages)
// Output: ['Hello from bus2!']

await bus1.disconnect()
await bus2.disconnect()
```

--------------------------------

### Configure Redis Transport for BoringNode Bus

Source: https://context7.com/boringnode/bus/llms.txt

Demonstrates setting up the Redis transport for message passing using a configuration object, a connection string, or an existing ioredis instance. The example also shows subscribing and publishing with a typed message shape.

```typescript
import { BusManager } from '@boringnode/bus'
import { RedisTransport, redis } from '@boringnode/bus/transports/redis'
import Redis from 'ioredis'

// Option 1: Using configuration object
const transport1 = new RedisTransport({
  host: 'localhost',
  port: 6379,
  password: 'secret',
  db: 0,
  useMessageBuffer: false, // Set to true for binary data
})

// Option 2: Using connection string
const transport2 = new RedisTransport('redis://:secret@localhost:6379/0')

// Option 3: Using existing ioredis instance
const existingClient = new Redis({ host: 'localhost', port: 6379 })
const transport3 = new RedisTransport(existingClient)

// Option 4: Using factory with BusManager
const manager = new BusManager({
  default: 'main',
  transports: {
    main: {
      transport: redis({
        host: 'localhost',
        port: 6379,
        password: 'secret',
      }),
      retryQueue: {
        retryInterval: '1s',
      },
    },
  },
})

// Example with type-safe messages
interface OrderEvent {
  orderId: string
  status: 'created' | 'shipped' | 'delivered'
  timestamp: number
}

await manager.subscribe('orders', (event) => {
  console.log(`Order ${event.orderId} is now ${event.status}`)
})

await manager.publish('orders', {
  orderId: 'order-456',
  status: 'shipped',
  timestamp: Date.now(),
})

await manager.disconnect()
```

--------------------------------

### Use Specific Transport for Publishing (TypeScript)

Source: https://github.com/boringnode/bus/blob/0.x/README.md

Illustrates how to explicitly select a transport (e.g., Redis or MQTT) for publishing messages using the `use` method of the BusManager. The message is then sent through the named transport instead of the default one.

```typescript
manager.use('redis').publish('channel', 'Hello world')
manager.use('mqtt').publish('channel', 'Hello world')
```

--------------------------------

### Configure BusManager with Retry Queue (TypeScript)

Source: https://github.com/boringnode/bus/blob/0.x/README.md

Shows how to configure the `BusManager` with a retry queue for a specific transport.
This ensures that messages that fail to publish are retried at a defined interval.

```typescript
import { BusManager } from '@boringnode/bus'
import { redis } from '@boringnode/bus/transports/redis'

const manager = new BusManager({
  default: 'main',
  transports: {
    main: {
      transport: redis({
        host: 'localhost',
        port: 6379,
      }),
      retryQueue: {
        retryInterval: '100ms',
      },
    },
  },
})

// 'main' is the only transport registered in this configuration
manager.use('main').publish('channel', 'Hello World')
```

--------------------------------

### Direct Bus Usage with Redis Transport and Retry Queue (TypeScript)

Source: https://context7.com/boringnode/bus/llms.txt

Illustrates direct usage of the `Bus` class with a Redis transport, including configuring a retry queue for failed messages. It covers subscribing, publishing, manual retry processing, and disconnecting.

```typescript
import { Bus } from '@boringnode/bus'
import { RedisTransport } from '@boringnode/bus/transports/redis'

// Create a Redis transport
const transport = new RedisTransport({
  host: 'localhost',
  port: 6379,
})

// Create a bus instance with retry queue
const bus = new Bus(transport, {
  retryQueue: {
    enabled: true,
    retryInterval: '500ms',
    maxSize: 500,
    removeDuplicates: true,
  },
})

// Subscribe to receive messages
await bus.subscribe<{ userId: string; action: string }>('user-events', (payload) => {
  console.log(`User ${payload.userId} performed ${payload.action}`)
})

// Publish a message
const success = await bus.publish('user-events', {
  userId: 'user-123',
  action: 'login',
})
console.log('Message published:', success) // true if sent, false if queued for retry

// Manually process retry queue
await bus.processErrorRetryQueue()

// Get retry queue status
const queueSize = bus.getRetryQueue().size()
console.log('Messages in retry queue:', queueSize)

// Unsubscribe from channel
await bus.unsubscribe('user-events')

// Disconnect from transport
await bus.disconnect()
```

--------------------------------

### Simulate Bus Errors with ChaosTransport

Source: https://github.com/boringnode/bus/blob/0.x/README.md

Demonstrates how to use ChaosTransport to create a bus that intermittently throws
errors, useful for testing application resilience. It wraps an existing transport (like MemoryTransport) and can be configured to always throw errors.

```typescript
import { Bus } from '@boringnode/bus'
import { MemoryTransport } from '@boringnode/bus/transports/memory'
import { ChaosTransport } from '@boringnode/bus/test_helpers'

const buggyTransport = new ChaosTransport(new MemoryTransport())
const bus = new Bus(buggyTransport)

/**
 * From now on, every time you try to publish a message, the transport
 * will throw an error.
 */
buggyTransport.alwaysThrow()
```

--------------------------------

### Configure Retry Queue for BoringNode Bus

Source: https://context7.com/boringnode/bus/llms.txt

Explains how to configure the retry queue for automatic handling of failed message deliveries. Covers enabling/disabling, duplicate removal, maximum size, and retry intervals, along with manual processing and information retrieval.

```typescript
import { Bus } from '@boringnode/bus'
import { MemoryTransport } from '@boringnode/bus/transports/memory'

// Full retry queue configuration
const bus = new Bus(new MemoryTransport(), {
  retryQueue: {
    // Enable or disable the retry queue (default: true)
    enabled: true,

    // Remove duplicate messages from the queue (default: true)
    removeDuplicates: true,

    // Maximum number of messages in the queue (default: null = unlimited)
    maxSize: 1000,

    // Automatic retry interval (default: false = manual only)
    // Accepts milliseconds or duration strings: '100ms', '1s', '5m'
    retryInterval: '500ms',
  },
})

// Manually trigger retry queue processing
await bus.processErrorRetryQueue()

// Get retry queue information
const retryQueue = bus.getRetryQueue()
console.log('Queue size:', retryQueue.size())
console.log('Queue options:', retryQueue.getOptions())
// Output: { enabled: true, maxSize: 1000, removeDuplicates: true }

await bus.disconnect()
```

--------------------------------

### Redis Transport

Source: https://context7.com/boringnode/bus/llms.txt

The Redis transport leverages Redis pub/sub for distributed message passing.
It allows connections through configuration options, connection strings, or existing ioredis instances, automatically managing separate publisher and subscriber connections.

````APIDOC
## Redis Transport

### Description
Uses Redis pub/sub for distributed message passing. Supports connection via configuration options, connection strings, or existing ioredis instances. Creates separate publisher and subscriber connections automatically.

### Method
`new RedisTransport(options | connectionString | ioredisInstance)`

### Endpoint
N/A (Transport Layer)

### Parameters
#### Path Parameters
None

#### Query Parameters
None

#### Request Body
None

### Request Example
```typescript
import { RedisTransport } from '@boringnode/bus/transports/redis'
import Redis from 'ioredis'

// Option 1: Using configuration object
const transport1 = new RedisTransport({
  host: 'localhost',
  port: 6379,
  password: 'secret',
  db: 0,
  useMessageBuffer: false
})

// Option 2: Using connection string
const transport2 = new RedisTransport('redis://:secret@localhost:6379/0')

// Option 3: Using existing ioredis instance
const existingClient = new Redis({ host: 'localhost', port: 6379 })
const transport3 = new RedisTransport(existingClient)
```

### Response
#### Success Response (200)
N/A (Transport Layer)

#### Response Example
N/A
````

--------------------------------

### MQTT Transport

Source: https://context7.com/boringnode/bus/llms.txt

The MQTT transport connects to MQTT brokers and is suited to IoT and lightweight messaging. It supports MQTT, MQTTS, WebSocket, and secure WebSocket connections.

````APIDOC
## MQTT Transport

### Description
Connects to MQTT brokers for IoT and lightweight messaging scenarios. Supports MQTT, MQTTS, WebSocket, and secure WebSocket connections.

### Method
`new MqttTransport(options)`

### Endpoint
N/A (Transport Layer)

### Parameters
#### Path Parameters
None

#### Query Parameters
None

#### Request Body
None

### Request Example
```typescript
import { MqttTransport } from '@boringnode/bus/transports/mqtt'
import { MqttProtocol } from '@boringnode/bus/types/main'

// Basic MQTT connection
const transport = new MqttTransport({
  host: 'broker.hivemq.com',
  port: 1883,
  protocol: MqttProtocol.MQTT
})

// Secure MQTT with options
const secureTransport = new MqttTransport({
  host: 'secure-broker.example.com',
  port: 8883,
  protocol: MqttProtocol.MQTTS,
  options: {
    username: 'user',
    password: 'pass',
    clientId: 'my-client-id',
    clean: true,
    keepalive: 60
  }
})

// WebSocket connection
const wsTransport = new MqttTransport({
  host: 'broker.example.com',
  port: 8080,
  protocol: MqttProtocol.WS
})
```

### Response
#### Success Response (200)
N/A (Transport Layer)

#### Response Example
N/A
````

--------------------------------

### Retry Queue Configuration

Source: https://context7.com/boringnode/bus/llms.txt

The retry queue automatically manages failed message deliveries by queuing messages and retrying them at configurable intervals. It includes options for duplicate detection and maximum queue size, and can be enabled or disabled per transport.

````APIDOC
## Retry Queue Configuration

### Description
Automatically handles failed message deliveries by queuing messages and retrying at configurable intervals. Supports duplicate detection, maximum queue size limits, and can be enabled or disabled per transport.

### Method
`new Bus(transport, options)` where `options.retryQueue` is configured.

### Endpoint
N/A (Configuration Option)

### Parameters
#### Path Parameters
None

#### Query Parameters
None

#### Request Body
None

### Request Example
```typescript
import { Bus } from '@boringnode/bus'
import { MemoryTransport } from '@boringnode/bus/transports/memory'

const bus = new Bus(new MemoryTransport(), {
  retryQueue: {
    enabled: true,
    removeDuplicates: true,
    maxSize: 1000,
    retryInterval: '500ms'
  }
})

// Manually trigger retry queue processing
await bus.processErrorRetryQueue()

// Get retry queue information
const retryQueue = bus.getRetryQueue()
console.log('Queue size:', retryQueue.size())
console.log('Queue options:', retryQueue.getOptions())
```

### Response
#### Success Response (200)
N/A (Configuration Option)

#### Response Example
N/A
````

--------------------------------

### ChaosTransport for Testing Retry Logic (TypeScript)

Source: https://context7.com/boringnode/bus/llms.txt

The `ChaosTransport` is a test utility that wraps any transport and injects failures, so that retry logic and error handling can be tested. It is useful for simulating network issues or broker downtime. Failure injection can be switched on with `alwaysThrow()` and off again with `neverThrow()`.

```typescript
import { Bus } from '@boringnode/bus'
import { MemoryTransport } from '@boringnode/bus/transports/memory'
import { ChaosTransport } from '@boringnode/bus/test_helpers'

// Wrap a transport with chaos injection
const innerTransport = new MemoryTransport()
const chaosTransport = new ChaosTransport(innerTransport)

const bus = new Bus(chaosTransport, {
  retryQueue: {
    enabled: true,
    retryInterval: '100ms',
  },
})

// Make all publish operations fail
chaosTransport.alwaysThrow()

// This will fail and be added to retry queue
const success = await bus.publish('channel', 'test message')
console.log('Published:', success) // false

// Check retry queue
console.log('Queue size:', bus.getRetryQueue().size()) // 1

// Stop injecting failures
chaosTransport.neverThrow()

// Wait for retry interval to process queue
await new Promise((resolve) => setTimeout(resolve, 150))

// Message should now be delivered
console.log('Queue size after retry:', bus.getRetryQueue().size()) // 0

// Access the inner transport for assertions
const memTransport = chaosTransport.getInnerTransport()
console.log('Received messages:', memTransport.receivedMessages)

await bus.disconnect()
```
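
--------------------------------

### Retry Queue Behavior Sketch (TypeScript)

The retry queue options described in this document (`removeDuplicates`, `maxSize`) can be easier to reason about with a small standalone model. The following is only a sketch and does not use or reproduce the library's internals: `SketchRetryQueue`, `SketchOptions`, and `QueuedMessage` are hypothetical names, duplicates are detected by comparing JSON-serialized messages, and dropping the oldest entry when the queue is full is an assumption that may differ from what @boringnode/bus actually does.

```typescript
// Hypothetical model for illustration only; not the @boringnode/bus RetryQueue.
interface QueuedMessage {
  channel: string
  payload: unknown
}

interface SketchOptions {
  maxSize: number | null // null = unlimited
  removeDuplicates: boolean
}

class SketchRetryQueue {
  private items: QueuedMessage[] = []
  private options: SketchOptions

  constructor(options: SketchOptions) {
    this.options = options
  }

  size(): number {
    return this.items.length
  }

  // Returns false when the message was skipped as a duplicate.
  enqueue(message: QueuedMessage): boolean {
    const key = JSON.stringify(message)
    if (
      this.options.removeDuplicates &&
      this.items.some((item) => JSON.stringify(item) === key)
    ) {
      return false
    }

    // Assumption: when the queue is full, the oldest entry is dropped.
    if (this.options.maxSize !== null && this.items.length >= this.options.maxSize) {
      this.items.shift()
    }

    this.items.push(message)
    return true
  }

  // Removes and returns the oldest queued message, if any.
  dequeue(): QueuedMessage | undefined {
    return this.items.shift()
  }
}

const sketchQueue = new SketchRetryQueue({ maxSize: 2, removeDuplicates: true })
sketchQueue.enqueue({ channel: 'orders', payload: 1 })
sketchQueue.enqueue({ channel: 'orders', payload: 1 }) // skipped: duplicate
sketchQueue.enqueue({ channel: 'orders', payload: 2 })
sketchQueue.enqueue({ channel: 'orders', payload: 3 }) // queue full: payload 1 is dropped
console.log(sketchQueue.size()) // 2
```

In this model a retry pass would repeatedly call `dequeue()` and attempt to publish each message again, re-enqueueing it on failure. For real applications, rely on `bus.processErrorRetryQueue()` and the `retryInterval` option shown in the snippets above rather than on this sketch.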