### Run Producer Example Source: https://github.com/apache/pulsar-client-node/blob/master/README.md Execute a producer example using Node.js to verify the successful installation and build of the Pulsar Node.js client. A running Pulsar server is required. ```shell node examples/producer ``` -------------------------------- ### Basic Logging Setup Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/logging.md Demonstrates how to configure the Pulsar client with a basic log handler that logs messages to the console. Ensure the 'pulsar-client' package is installed. ```javascript const Pulsar = require('pulsar-client'); const client = new Pulsar.Client({ serviceUrl: 'pulsar://localhost:6650', logLevel: Pulsar.LogLevel.INFO, log: (level, file, line, message) => { const levelName = Pulsar.LogLevel.toString(level); console.log(`[${levelName}][${file}:${line}] ${message}`); } }); // ... use client ... ``` -------------------------------- ### Complete Encryption/Decryption Example Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/encryption.md This example shows how to set up a producer to encrypt messages and a consumer to decrypt them. It includes configuration for encryption keys and crypto failure actions for both producer and consumer. Ensure you have RSA keys generated and placed in the './keys/' directory. ```javascript const Pulsar = require('pulsar-client'); const fs = require('fs'); // Generate RSA keys (for example purposes) // In production, use proper key management const client = new Pulsar.Client({ serviceUrl: 'pulsar://localhost:6650', operationTimeoutSeconds: 30 }); // Producer with encryption (async () => { console.log('=== PRODUCER ==='); const producer = await client.createProducer({ topic: 'persistent://public/default/secure-messages', encryptionKey: 'my-key', publicKeyPath: './keys/public.pem', cryptoFailureAction: 'FAIL' }); for (let i = 0; i < 5; i++) { const message = `Confidential message ${i}`; await producer.send({ data: Buffer.from(message), properties: { 'message-index': i.toString() } }); console.log(`Sent: ${message}`); } await producer.flush(); await producer.close(); console.log('\n=== CONSUMER ==='); // Consumer with decryption const consumer = await client.subscribe({ topic: 'persistent://public/default/secure-messages', subscription: 'secure-consumer', privateKeyPath: './keys/private.pem', cryptoFailureAction: 'FAIL' }); for (let i = 0; i < 5; i++) { const message = await consumer.receive(); const data = message.getData().toString(); const props = message.getProperties(); const context = message.getEncryptionContext(); console.log(`Received: ${data}`); if (context) { console.log(` - Encrypted with: ${context.algorithm}`); console.log(` - Keys: ${context.keys.map(k => k.key).join(', ')}`); } await consumer.acknowledge(message); } await consumer.close(); await client.close(); })(); ``` -------------------------------- ### Pulsar End-to-End Example Source: https://github.com/apache/pulsar-client-node/blob/master/README.md A simple example demonstrating creating a Pulsar client, producer, and consumer, sending a message, receiving it, and acknowledging it. Ensure a Pulsar server is running locally. ```javascript const Pulsar = require('pulsar-client'); (async () => { // Create a client const client = new Pulsar.Client({ serviceUrl: 'pulsar://localhost:6650' }); // Create a producer const producer = await client.createProducer({ topic: 'persistent://public/default/my-topic', }); // Create a consumer const consumer = await client.subscribe({ topic: 'persistent://public/default/my-topic', subscription: 'sub1' }); // Send a message producer.send({ data: Buffer.from("hello") }); // Receive the message const msg = await consumer.receive(); console.log(msg.getData().toString()); consumer.acknowledge(msg); await producer.close(); await consumer.close(); await client.close(); })(); ``` -------------------------------- ### Full Consumer Decryption Example Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/encryption.md This example demonstrates the complete process of creating a Pulsar client, subscribing to a topic with decryption enabled, receiving, decrypting, and acknowledging messages. It includes necessary imports and client/consumer lifecycle management. ```javascript const Pulsar = require('pulsar-client'); const client = new Pulsar.Client({ serviceUrl: 'pulsar://localhost:6650' }); // Create a consumer that can decrypt messages const consumer = await client.subscribe({ topic: 'persistent://public/default/encrypted-topic', subscription: 'decrypted-consumer', privateKeyPath: './keys/private-key.pem', cryptoFailureAction: 'FAIL' }); // Receive and decrypt messages for (let i = 0; i < 10; i++) { const message = await consumer.receive(); const decryptedData = message.getData().toString(); console.log(`Received decrypted message: ${decryptedData}`); await consumer.acknowledge(message); } await consumer.close(); await client.close(); ``` -------------------------------- ### Build Pulsar Node.js Addon Source: https://github.com/apache/pulsar-client-node/blob/master/README.md Install the Node.js dependencies and build the C++ addon after the C++ client has been installed. This command is run from the root of the cloned repository. ```shell npm install ``` -------------------------------- ### Install Pulsar Node.js Client Source: https://github.com/apache/pulsar-client-node/blob/master/README.md Install the Pulsar Node.js client library using npm or yarn. This is the first step to using Pulsar in your Node.js project. ```shell npm install pulsar-client ``` ```shell yarn add pulsar-client ``` -------------------------------- ### Global Logging Setup Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/logging.md Demonstrates setting a global log handler that will be used by all subsequently created Pulsar clients. ```javascript Pulsar.Client.setLogHandler((level, file, line, message) => { // Handler for all clients }); // Both clients will use the global handler const client1 = new Pulsar.Client({ serviceUrl: 'pulsar://localhost:6650', logLevel: Pulsar.LogLevel.DEBUG }); const client2 = new Pulsar.Client({ serviceUrl: 'pulsar://localhost:6651', logLevel: Pulsar.LogLevel.INFO }); ``` -------------------------------- ### Full MessageId Serialization and Deserialization Example Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/messageid.md Demonstrates the complete workflow of receiving a message, serializing its MessageId, storing it, and later deserializing it to seek to that specific position in a topic. This example requires setting up a Pulsar client, subscribing to a topic, and using file system operations to save and load the serialized MessageId. ```javascript const Pulsar = require('pulsar-client'); const fs = require('fs'); const client = new Pulsar.Client({ serviceUrl: 'pulsar://localhost:6650' }); const consumer = await client.subscribe({ topic: 'persistent://public/default/my-topic', subscription: 'my-sub' }); // Receive a message const message = await consumer.receive(); const messageId = message.getMessageId(); console.log(`Received message ID: ${messageId.toString()}`); // Serialize and store const serialized = messageId.serialize(); fs.writeFileSync('last_processed.bin', serialized); await consumer.acknowledge(message); await consumer.close(); // Later, restore from saved ID const savedBuffer = fs.readFileSync('last_processed.bin'); const restoredId = Pulsar.MessageId.deserialize(savedBuffer); const consumer2 = await client.subscribe({ topic: 'persistent://public/default/my-topic', subscription: 'my-sub' }); // Seek to the saved position await consumer2.seek(restoredId); const nextMessage = await consumer2.receive(); console.log(`Next message after saved ID: ${nextMessage.getMessageId().toString()}`); await consumer2.close(); await client.close(); ``` -------------------------------- ### Generate API Documentation Source: https://github.com/apache/pulsar-client-node/blob/master/README.md Install dependencies and generate API documentation using TypeDoc. The documentation will be placed in the './apidocs' directory. ```shell npm install npx typedoc ``` -------------------------------- ### Initialize Pulsar Client, Producer, and Consumer Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/README.md This snippet demonstrates the basic setup for connecting to a Pulsar service, creating a producer for a topic, and subscribing a consumer to the same topic. It also shows how to configure token-based authentication. ```javascript const Pulsar = require('pulsar-client'); // Create client const client = new Pulsar.Client({ serviceUrl: 'pulsar://localhost:6650' }); // Create producer const producer = await client.createProducer({ topic: 'persistent://public/default/my-topic' }); // Create consumer const consumer = await client.subscribe({ topic: 'persistent://public/default/my-topic', subscription: 'my-subscription' }); // Use authentication const auth = new Pulsar.AuthenticationToken({ token: 'my-token' }); ``` -------------------------------- ### Complete Pulsar Node.js Reader Example Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/reader.md Demonstrates how to create, read messages from, and close a Pulsar reader. Ensure the Pulsar client and reader are properly initialized before use. ```javascript const Pulsar = require('pulsar-client'); (async () => { const client = new Pulsar.Client({ serviceUrl: 'pulsar://localhost:6650' }); const reader = await client.createReader({ topic: 'persistent://public/default/my-topic', startMessageId: Pulsar.MessageId.earliest() }); let messageCount = 0; while (reader.hasNext() && messageCount < 100) { const message = await reader.readNext(5000); console.log(`Message ${messageCount}: ${message.getData().toString()}`); messageCount++; } await reader.close(); await client.close(); })(); ``` -------------------------------- ### Producer Encryption Example Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/encryption.md Demonstrates creating an encrypted producer and sending encrypted messages using the Pulsar Node.js client. Ensure keys are available at the specified path. ```javascript const Pulsar = require('pulsar-client'); const fs = require('fs'); const client = new Pulsar.Client({ serviceUrl: 'pulsar://localhost:6650' }); // Create an encrypted producer const producer = await client.createProducer({ topic: 'persistent://public/default/encrypted-topic', encryptionKey: 'encryption-key-1', publicKeyPath: './keys/public-key.pem', cryptoFailureAction: 'FAIL' }); // Send encrypted messages for (let i = 0; i < 10; i++) { const message = `Secret message ${i}`; await producer.send({ data: Buffer.from(message) }); console.log(`Sent encrypted message: ${message}`); } await producer.flush(); await producer.close(); await client.close(); ``` -------------------------------- ### Get Partitions for a Pulsar Topic Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/client.md Retrieve the partition information for a given topic. This is useful for understanding the topic's structure. ```javascript const partitions = await client.getPartitionsForTopic( 'persistent://public/default/my-topic' ); console.log(`Topic has ${partitions.length} partitions`); ``` -------------------------------- ### Consumer Subscription Position Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/configuration.md Configure the initial position for a consumer's subscription. Can start from the 'Earliest' or 'Latest' available message. ```javascript const consumer = await client.subscribe({ topic: 'persistent://public/default/my-topic', subscription: 'my-subscription', subscriptionInitialPosition: 'Earliest' }); ``` -------------------------------- ### TLS Authentication Example Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/authentication.md Use TLS certificate-based authentication by providing paths to your client certificate and private key. Ensure your Pulsar service URL uses SSL (e.g., pulsar+ssl://). ```javascript const Pulsar = require('pulsar-client'); const auth = new Pulsar.AuthenticationTls({ certificatePath: '/path/to/client.crt', privateKeyPath: '/path/to/client.key' }); const client = new Pulsar.Client({ serviceUrl: 'pulsar+ssl://localhost:6651', authentication: auth, tlsTrustCertsFilePath: '/path/to/ca.crt' }); ``` -------------------------------- ### Seek to Earliest Message and Read Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/messageid.md Demonstrates how to create a reader, start from the earliest message, read a few messages, save a specific message ID, continue reading, seek back to the saved message ID, and read from that point. ```javascript const Pulsar = require('pulsar-client'); const client = new Pulsar.Client({ serviceUrl: 'pulsar://localhost:6650' }); const reader = await client.createReader({ topic: 'persistent://public/default/my-topic', startMessageId: Pulsar.MessageId.earliest() }); let message; let targetMessageId = null; // Read messages and save the 5th message ID for (let i = 0; i < 5; i++) { message = await reader.readNext(); console.log(`Message ${i}: ${message.getMessageId().toString()}`); if (i === 4) { targetMessageId = message.getMessageId(); } } // Continue reading console.log('\nContinuing to read...'); for (let i = 0; i < 5; i++) { message = await reader.readNext(); console.log(`Message ${i + 5}: ${message.getMessageId().toString()}`); } // Seek back to the saved message console.log('\nSeeking back to message ID:', targetMessageId.toString()); await reader.seek(targetMessageId); message = await reader.readNext(); console.log(`Back at message: ${message.getMessageId().toString()}`); await reader.close(); await client.close(); ``` -------------------------------- ### Pulsar Client with Cluster Failover Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/authentication.md Example of initializing a Pulsar client with automatic cluster failover enabled. This configuration specifies primary and secondary clusters, along with failover thresholds and check intervals. ```javascript const client = new Pulsar.Client({ serviceUrlProvider: { primary: { serviceUrl: 'pulsar://primary-cluster:6650', authentication: new Pulsar.AuthenticationToken({ token: 'primary-token' }) }, secondary: [ { serviceUrl: 'pulsar://secondary-cluster-1:6650', authentication: new Pulsar.AuthenticationToken({ token: 'secondary-token-1' }) }, { serviceUrl: 'pulsar://secondary-cluster-2:6650', authentication: new Pulsar.AuthenticationToken({ token: 'secondary-token-2' }) } ], checkIntervalMs: 30000, failoverThreshold: 3, switchBackThreshold: 5 } }); ``` -------------------------------- ### Configure Basic Pulsar Reader Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/configuration.md Use this snippet to create a basic reader instance. Specify the topic, starting message ID, and an optional reader name for identification. ```javascript const reader = await client.createReader({ topic: 'persistent://public/default/my-topic', startMessageId: Pulsar.MessageId.earliest(), readerName: 'my-reader' }); ``` -------------------------------- ### Athenz Authentication from Config File Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/authentication.md Initialize Athenz authentication by providing the path to a configuration file. This is a convenient way to manage authentication settings. ```javascript const auth = new Pulsar.AuthenticationAthenz('/path/to/athenz.conf'); const client = new Pulsar.Client({ serviceUrl: 'pulsar://localhost:6650', authentication: auth }); ``` -------------------------------- ### getTopic() Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/producer.md Gets the topic name that this producer sends messages to. ```APIDOC ## getTopic() ### Description Gets the topic name that this producer sends messages to. ### Method ```javascript getTopic(): string ``` ### Returns The topic name as a string. ### Examples ```javascript const topic = producer.getTopic(); console.log(`Sending to topic: ${topic}`); ``` ``` -------------------------------- ### Prepare Staging Directory in SVN Source: https://github.com/apache/pulsar-client-node/blob/master/docs/release-process.md Check out the dev repository, create a directory for the release candidate artifacts, and navigate into it. Increment '-rc.1' if multiple iterations are needed. ```sh $ svn co https://dist.apache.org/repos/dist/dev/pulsar/pulsar-client-node pulsar-dist-dev $ cd pulsar-dist-dev # '-rc.1' needs to be incremented in case of multiple iterations in getting # to the final release) $ svn mkdir pulsar-client-node-1.X.0-rc.1 $ cd pulsar-client-node-1.X.0-rc.1 ``` -------------------------------- ### Get Producer Topic Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/producer.md Retrieves the name of the topic to which the producer is sending messages. ```javascript const topic = producer.getTopic(); console.log(`Sending to topic: ${topic}`); ``` -------------------------------- ### getProducerName() Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/producer.md Gets the logical name of the producer. Useful for identifying the producer in logs and metrics. ```APIDOC ## getProducerName() ### Description Gets the logical name of the producer. Useful for identifying the producer in logs and metrics. ### Method ```javascript getProducerName(): string ``` ### Returns The producer name as a string. ### Examples ```javascript const name = producer.getProducerName(); console.log(`Producer name: ${name}`); ``` ``` -------------------------------- ### Basic Consumer Configuration Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/configuration.md Set up a basic consumer with topic, subscription, consumer name, and subscription type. Supports Exclusive, Shared, Failover, and KeyShared subscription modes. ```javascript const consumer = await client.subscribe({ topic: 'persistent://public/default/my-topic', subscription: 'my-subscription', consumerName: 'my-consumer', subscriptionType: 'Shared' }); ``` -------------------------------- ### InitialPosition Type Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/types.md Defines the starting point for a consumer or reader. Used in ConsumerConfig and ReaderConfig. ```typescript type InitialPosition = 'Latest' | 'Earliest' ``` -------------------------------- ### Get Message Topic Name Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/message.md Retrieves the topic name from which a message was received. This is useful for routing or logging. ```javascript const consumer = await client.subscribe({ topic: 'persistent://public/default/my-topic', subscription: 'my-sub' }); const message = await consumer.receive(); console.log(`Message from topic: ${message.getTopicName()}`); ``` -------------------------------- ### Get Message Data Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/message.md Retrieves the message payload as a Node.js Buffer. The data can be converted to a string or parsed as JSON. ```javascript const message = await consumer.receive(); const data = message.getData(); console.log(data.toString()); // Parse JSON payload const jsonData = JSON.parse(data.toString()); console.log(jsonData); ``` -------------------------------- ### Get Producer Name Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/producer.md Retrieves the logical name assigned to the producer. This is helpful for identification in logs and monitoring systems. ```javascript const name = producer.getProducerName(); console.log(`Producer name: ${name}`); ``` -------------------------------- ### getReplicatedFrom() Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/message.md Retrieves the name of the cluster from which the message was replicated. This is useful for understanding message lineage in a multi-cluster Pulsar setup. ```APIDOC ## getReplicatedFrom() ### Description Gets the name of the cluster that this message was replicated from. ### Returns The cluster name as a string, or an empty string if the message was not replicated. ### Examples ```javascript const message = await consumer.receive(); if (message.isReplicated()) { const sourceCluster = message.getReplicatedFrom(); console.log(`Replicated from cluster: ${sourceCluster}`); } ``` ``` -------------------------------- ### Create SchemaInfo from Protobuf Root Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/schema.md Use `createSchemaInfoFromRoot` to create a `SchemaInfo` object from a protobufjs Root object. This `SchemaInfo` can then be used with producers and consumers. ```javascript const Pulsar = require('pulsar-client'); const protoJson = { nested: { com: { example: { User: { fields: { name: { type: 'string', id: 1 }, age: { type: 'int32', id: 2 } } } } } } }; const root = Pulsar.ProtobufNativeSchema.createRootFromJson(protoJson); const schemaInfo = Pulsar.ProtobufNativeSchema.createSchemaInfoFromRoot({ root, rootMessageTypeName: 'com.example.User', rootFileDescriptorName: 'User.proto', schemaType: 'ProtobufNative', syntax: 'proto3', name: 'User' }); const producer = await client.createProducer({ topic: 'persistent://public/default/user-topic', schema: schemaInfo }); ``` -------------------------------- ### subscribe() Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/client.md Creates a new consumer for subscribing to a topic. It takes a ConsumerConfig object and returns a Promise that resolves to a Consumer instance. ```APIDOC ## subscribe() ### Description Creates a new consumer for subscribing to a topic. ### Method ```javascript subscribe(config: ConsumerConfig): Promise ``` ### Parameters #### Path Parameters None #### Query Parameters None #### Request Body - **config** (ConsumerConfig) - Required - Consumer configuration ### Request Example ```javascript const consumer = await client.subscribe({ topic: 'persistent://public/default/my-topic', subscription: 'my-subscription', subscriptionType: 'Shared', ackTimeoutMs: 10000 }); const message = await consumer.receive(); console.log(message.getData().toString()); consumer.acknowledge(message); await consumer.close(); ``` ### Response #### Success Response (200) - **Consumer** - A Consumer instance. #### Response Example (Consumer instance details) ### Throws - When topic doesn't exist, authentication fails, or configuration is invalid. ``` -------------------------------- ### Get Message Producer Name Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/message.md Retrieve the name of the producer that sent the received message. This is useful for tracking message origins. ```javascript const message = await consumer.receive(); const producerName = message.getProducerName(); console.log(`Produced by: ${producerName}`); ``` -------------------------------- ### Initialize Pulsar Client Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/README.md Instantiate a new Pulsar client with specified service URL, timeouts, authentication, and log level. Ensure the 'authentication' object is properly configured for your chosen method. ```javascript const client = new Pulsar.Client({ serviceUrl: 'pulsar://localhost:6650', operationTimeoutSeconds: 30, authentication: authObject, logLevel: Pulsar.LogLevel.DEBUG }); ``` -------------------------------- ### Message Sending with Partition and Ordering Keys Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/producer.md Sends a message specifying both a partition key for routing and an ordering key for message sequence guarantees within a partition. ```javascript await producer.send({ data: Buffer.from('Order details'), partitionKey: 'user-123', orderingKey: 'user-123' }); ``` -------------------------------- ### Configure Consumer with Private Key Path Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/encryption.md Use this method to configure a consumer by providing the path to the private key file for decryption. ```javascript const consumer = await client.subscribe({ topic: 'persistent://public/default/encrypted-topic', subscription: 'decrypted-consumer', privateKeyPath: '/path/to/private-key.pem' }); ``` -------------------------------- ### MessageId.latest() Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/messageid.md Creates a MessageId representing the latest message in a topic. This is useful for initializing readers or consumers to start from the most recent message. ```APIDOC ## MessageId.latest() ### Description Creates a MessageId representing the latest message in a topic. ### Method `static latest(): MessageId` ### Returns A `MessageId` object pointing to the latest message. ### Examples ```javascript const reader = await client.createReader({ topic: 'persistent://public/default/my-topic', startMessageId: Pulsar.MessageId.latest() }); const message = await reader.readNext(); ``` ``` -------------------------------- ### Configure Producer with Multiple Encryption Keys Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/encryption.md Use `encryptionKeys` and `publicKeyPath` to configure a producer with multiple encryption keys. ```javascript const producer = await client.createProducer({ topic: 'persistent://public/default/encrypted-topic', encryptionKeys: ['key1', 'key2', 'key3'], publicKeyPath: '/path/to/public-key.pem' }); ``` -------------------------------- ### Consumer Configuration Options Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/consumer.md This section details the configuration options available for the `client.subscribe()` method. These options allow for fine-grained control over how a consumer interacts with Pulsar topics, including subscription details, message handling, and security. ```APIDOC ## Consumer Configuration Reference Complete configuration options for `client.subscribe()`: | Option | Type | Required | Default | Description | |--------|------|----------|---------|-------------| | topic | `string` | Conditional* | — | Single topic to subscribe to | | topics | `string[]` | Conditional* | — | Multiple topics to subscribe to | | topicsPattern | `string` | Conditional* | — | Topic pattern (regex) for dynamic subscriptions | | subscription | `string` | Yes | — | Subscription name | | subscriptionType | `SubscriptionType` | No | `'Exclusive'` | Subscription type: `'Exclusive'`, `'Shared'`, `'KeyShared'`, or `'Failover'` | | subscriptionInitialPosition | `InitialPosition` | No | `'Latest'` | Initial position: `'Latest'` or `'Earliest'` | | ackTimeoutMs | `number` | No | — | Timeout for acknowledgment in milliseconds | | nAckRedeliverTimeoutMs | `number` | No | — | Timeout for redelivering negatively acknowledged messages | | receiverQueueSize | `number` | No | 1000 | Local queue size for this consumer | | receiverQueueSizeAcrossPartitions | `number` | No | — | Maximum queue size across all partitions | | consumerName | `string` | No | — | Custom consumer name | | properties | `{ [key: string]: string }` | No | — | Custom consumer properties | | listener | `(message: Message, consumer: Consumer) => void` | No | — | Message listener callback function | | readCompacted | `boolean` | No | `false` | Read compacted topics | | privateKeyPath | `string` | No | — | Path to private key for message decryption | | cryptoKeyReader | `CryptoKeyReader` | No | — | Custom crypto key reader for decryption | | cryptoFailureAction | `ConsumerCryptoFailureAction` | No | `'CONSUME'` | Action on crypto failure: `'FAIL'`, `'DISCARD'`, or `'CONSUME'` | | maxPendingChunkedMessage | `number` | No | — | Maximum pending chunked messages | | autoAckOldestChunkedMessageOnQueueFull | `number` | No | — | Auto-acknowledge oldest chunked message on queue full | | schema | `SchemaInfo` | No | — | Schema for message validation | | batchIndexAckEnabled | `boolean` | No | `false` | Enable batch index acknowledgment | | regexSubscriptionMode | `RegexSubscriptionMode` | No | `'PersistentOnly'` | Regex mode: `'PersistentOnly'`, `'NonPersistentOnly'`, or `'AllTopics'` | | deadLetterPolicy | `DeadLetterPolicy` | No | — | Dead letter policy for failed messages | | batchReceivePolicy | `ConsumerBatchReceivePolicy` | No | — | Policy for batch receive operations | | keySharedPolicy | `KeySharedPolicy` | No | — | Policy for KeyShared subscription mode | | replicateSubscriptionState | `boolean` | No | — | Replicate subscription state across clusters | *At least one of `topic`, `topics`, or `topicsPattern` is required. ``` -------------------------------- ### MessageId.earliest() Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/messageid.md Creates a MessageId representing the earliest message in a topic. This is useful for initializing readers or consumers to start from the beginning of a topic. ```APIDOC ## MessageId.earliest() ### Description Creates a MessageId representing the earliest message in a topic. ### Method `static earliest(): MessageId` ### Returns A `MessageId` object pointing to the earliest message. ### Examples ```javascript const reader = await client.createReader({ topic: 'persistent://public/default/my-topic', startMessageId: Pulsar.MessageId.earliest() }); const message = await reader.readNext(); ``` ``` -------------------------------- ### Get Encryption Context Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/message.md Retrieve the encryption context for a message if it was encrypted. This provides details about the encryption algorithm, keys, and decryption status. ```javascript const message = await consumer.receive(); const encryptionContext = message.getEncryptionContext(); if (encryptionContext) { console.log(`Message was encrypted with algorithm: ${encryptionContext.algorithm}`); console.log(`Encryption keys: ${encryptionContext.keys.map(k => k.key).join(', ')}`); if (encryptionContext.isDecryptionFailed) { console.error('Failed to decrypt message'); } } ``` -------------------------------- ### Configure Cluster Failover Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/configuration.md Set up primary and secondary cluster configurations for failover. This allows the client to automatically switch to a backup cluster if the primary becomes unavailable. Configure health check intervals and thresholds for failover and switch-back. ```javascript const client = new Pulsar.Client({ serviceUrlProvider: { primary: { serviceUrl: 'pulsar://primary:6650', authentication: new Pulsar.AuthenticationToken({ token: 'token1' }) }, secondary: [ { serviceUrl: 'pulsar://secondary1:6650', authentication: new Pulsar.AuthenticationToken({ token: 'token2' }) } ], checkIntervalMs: 30000, failoverThreshold: 3, switchBackThreshold: 5 } }); ``` -------------------------------- ### Get Replicated From Cluster Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/message.md Use this method to retrieve the name of the cluster from which a message was replicated. Check if the message is replicated before calling this method. ```javascript const message = await consumer.receive(); if (message.isReplicated()) { const sourceCluster = message.getReplicatedFrom(); console.log(`Replicated from cluster: ${sourceCluster}`); } ``` -------------------------------- ### Download Pulsar C++ Client (Windows) Source: https://github.com/apache/pulsar-client-node/blob/master/README.md Download the Pulsar C++ client, a dependency for building the Node.js addon. This batch script is for Windows systems. ```shell pkg\windows\download-cpp-client.bat ``` -------------------------------- ### Basic Producer Configuration Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/configuration.md Configure essential producer settings like topic and send timeout. The producer name is auto-generated if not provided. ```javascript const producer = await client.createProducer({ topic: 'persistent://public/default/my-topic', producerName: 'my-producer', sendTimeoutMs: 30000 }); ``` -------------------------------- ### Get Message Publish Timestamp Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/message.md Retrieve the timestamp when a message was published to the Pulsar broker. This is useful for tracking message latency and order. ```javascript const message = await consumer.receive(); const publishTime = message.getPublishTimestamp(); console.log(`Published at: ${new Date(publishTime).toISOString()}`); ``` -------------------------------- ### Download Pulsar C++ Client (Linux/macOS) Source: https://github.com/apache/pulsar-client-node/blob/master/README.md Download the Pulsar C++ client, a dependency for building the Node.js addon. This script is for Linux and macOS systems. ```shell pkg/linux/download-cpp-client.sh ``` ```shell pkg/mac/download-cpp-client.sh ``` -------------------------------- ### Get and Acknowledge Message ID Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/message.md Retrieves the unique identifier for a message, which can then be used for acknowledging the message. This ensures the message is processed correctly. ```javascript const message = await consumer.receive(); const messageId = message.getMessageId(); console.log(`Message ID: ${messageId.toString()}`); await consumer.acknowledgeId(messageId); ``` -------------------------------- ### Advanced Client Configuration Options Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/configuration.md Configure advanced client settings such as statistics interval, listener name, and concurrent lookup requests. ```javascript const client = new Pulsar.Client({ serviceUrl: 'pulsar://localhost:6650', statsIntervalInSeconds: 60, listenerName: 'public', concurrentLookupRequest: 100 }); ``` -------------------------------- ### ReaderConfig Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/types.md Configuration object for creating a reader. This is used when calling `Client.createReader()` to specify topic, starting message ID, and other reader-specific settings. ```APIDOC ## ReaderConfig ### Description Configuration object for creating a reader. ### Fields - **topic** (string) - Required - The topic to read from. - **startMessageId** (MessageId) - Required - The message ID to start reading from. - **receiverQueueSize** (number) - Optional - The size of the receiver queue. - **readerName** (string) - Optional - The name of the reader. - **subscriptionRolePrefix** (string) - Optional - The subscription role prefix. - **readCompacted** (boolean) - Optional - Whether to read compacted topics. - **listener** (function(message: Message, reader: Reader) => void) - Optional - A callback function to process incoming messages. - **privateKeyPath** (string) - Optional - Path to the private key for encryption. - **cryptoFailureAction** (ConsumerCryptoFailureAction) - Optional - Action to take on crypto failure. ``` -------------------------------- ### Producer Configuration Options Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/producer.md Configuration options for `client.createProducer()`. ```APIDOC ## Producer Configuration This section details the configuration options available when creating a producer using the `client.createProducer()` method. ### Parameters #### Producer Configuration Object - **topic** (`string`) - Required - The name of the topic to produce messages to (e.g., `persistent://public/default/my-topic`). - **producerName** (`string`) - Optional - A custom name for the producer. - **sendTimeoutMs** (`number`) - Optional - The timeout in milliseconds for send operations. Defaults to 30000. - **initialSequenceId** (`number`) - Optional - The initial sequence ID for messages sent by this producer. - **maxPendingMessages** (`number`) - Optional - The maximum number of messages that can be pending for a single partition. Defaults to 1000. - **maxPendingMessagesAcrossPartitions** (`number`) - Optional - The maximum number of pending messages across all partitions. Defaults to 50000. - **blockIfQueueFull** (`boolean`) - Optional - If true, send calls will block when the message queue is full. Defaults to `false`. - **messageRoutingMode** (`MessageRoutingMode`) - Optional - The message routing mode. Possible values are `'UseSinglePartition'`, `'RoundRobinDistribution'`, or `'CustomPartition'`. Defaults to `'RoundRobinDistribution'`. - **hashingScheme** (`HashingScheme`) - Optional - The hashing scheme to use for partitioning. Possible values are `'Murmur3_32Hash'`, `'BoostHash'`, or `'JavaStringHash'`. Defaults to `'Murmur3_32Hash'`. - **compressionType** (`CompressionType`) - Optional - The compression type for messages. Possible values are `'None'`, `'Zlib'`, `'LZ4'`, `'ZSTD'`, or `'SNAPPY'`. Defaults to `'None'`. - **batchingEnabled** (`boolean`) - Optional - Enables message batching. Defaults to `true`. - **batchingMaxPublishDelayMs** (`number`) - Optional - The maximum delay in milliseconds for publishing batches. Defaults to 10. - **batchingMaxMessages** (`number`) - Optional - The maximum number of messages allowed in a batch. Defaults to 1000. - **batchingType** (`ProducerBatchType`) - Optional - The type of batching to use. Possible values are `'DefaultBatching'` or `'KeyBasedBatching'`. Defaults to `'DefaultBatching'`. - **batchingMaxAllowedSizeInBytes** (`number`) - Optional - The maximum allowed size in bytes for a batch. - **properties** (`{ [key: string]: string }`) - Optional - Custom key-value properties for the producer. - **publicKeyPath** (`string`) - Optional - The path to the public key file for message encryption. - **encryptionKey** (`string`) - Optional - A single encryption key name to use for message encryption. - **encryptionKeys** (`string[]`) - Optional - An array of encryption key names to use for message encryption. - **cryptoKeyReader** (`CryptoKeyReader`) - Optional - A custom crypto key reader implementation for encryption. - **cryptoFailureAction** (`ProducerCryptoFailureAction`) - Optional - The action to take on crypto failure. Possible values are `'FAIL'` or `'SEND'`. Defaults to `'FAIL'`. - **chunkingEnabled** (`boolean`) - Optional - Enables message chunking for large messages. Defaults to `false`. - **schema** (`SchemaInfo`) - Optional - The schema information for message validation. - **accessMode** (`ProducerAccessMode`) - Optional - The producer access mode. Possible values are `'Shared'`, `'Exclusive'`, `'WaitForExclusive'`, or `'ExclusiveWithFencing'`. Defaults to `'Shared'`. - **messageRouter** (`MessageRouter`) - Optional - A custom message router function for routing messages to partitions. ``` -------------------------------- ### Get Message Properties Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/message.md Retrieves all custom properties associated with a message. These properties can be used for metadata like content type or user ID. ```javascript const message = await consumer.receive(); const props = message.getProperties(); console.log(`Content-Type: ${props['content-type']}`); console.log(`User-ID: ${props['user-id']}`); ``` -------------------------------- ### Receive a single message with timeout Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/consumer.md This example demonstrates how to use the `receive` method with a timeout. If no message is received within the specified duration, an error will be caught. ```javascript try { const message = await consumer.receive(5000); console.log(message.getData().toString()); } catch (error) { console.log('No message received within timeout'); } ``` -------------------------------- ### Client Constructor Logging Configuration Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/logging.md Configure logging directly when initializing a new Pulsar client instance by providing logLevel and a custom log handler. ```APIDOC ## Logging Configuration ### Via Client Constructor Set logging when creating a client: ```javascript const client = new Pulsar.Client({ serviceUrl: 'pulsar://localhost:6650', logLevel: Pulsar.LogLevel.DEBUG, log: (level, file, line, message) => { const levelName = Pulsar.LogLevel.toString(level); console.log(`[${levelName}] ${file}:${line} - ${message}`); } }); ``` ``` -------------------------------- ### Get Message Event Timestamp Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/message.md Retrieve the event timestamp associated with a message, typically set by the producer. Use this to determine when the original event occurred. ```javascript const message = await consumer.receive(); const eventTime = message.getEventTimestamp(); if (eventTime > 0) { console.log(`Event occurred at: ${new Date(eventTime).toISOString()}`); } ``` -------------------------------- ### Update Package Version and Create Candidate Tag Source: https://github.com/apache/pulsar-client-node/blob/master/docs/release-process.md Bump the package version and create a candidate tag for the release. Ensure GPG is configured for signing. ```sh # Bump to the release version (the suffix "-rc.0" is removed) $ npm version patch --no-git-tag-version $ git add . $ git commit -m 'Release v1.x.0' # Create a "candidate" tag $ export GPG_TTY=$(tty) $ git tag -u $USER@apache.org v1.X.0-rc.1 -m 'Release v1.X.0-rc.1' # Push both the branch and the tag to GitHub repo $ git push origin branch-1.X $ git push origin v1.X.0-rc.1 ``` -------------------------------- ### Configure TLS/SSL Client Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/README.md Initialize the Pulsar client with TLS/SSL security enabled for secure connections. ```javascript const client = new Pulsar.Client({ serviceUrl: 'pulsar+ssl://localhost:6651', tlsTrustCertsFilePath: '/path/to/ca.crt', tlsCertificateFilePath: '/path/to/client.crt', tlsPrivateKeyFilePath: '/path/to/client.key' }); ``` -------------------------------- ### Get Message Redelivery Count Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/message.md Check the number of times a message has been redelivered. This is crucial for implementing retry logic or routing messages to a dead-letter queue after excessive redeliveries. ```javascript const message = await consumer.receive(); const redeliveryCount = message.getRedeliveryCount(); if (redeliveryCount > 3) { console.log('Message has been redelivered multiple times'); // Consider sending to dead letter queue consumer.negativeAcknowledge(message); } else { await consumer.acknowledge(message); } ``` -------------------------------- ### Consumer Creation with Fallback Topic Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/errors.md Attempt to create a consumer for a primary topic, falling back to a secondary topic if the primary fails. Logs warnings and errors for each step. ```javascript async function createConsumerWithFallback(client, primaryTopic, fallbackTopic) { try { console.log(`Creating consumer for primary topic: ${primaryTopic}`); return await client.subscribe({ topic: primaryTopic, subscription: 'my-subscription' }); } catch (error) { console.warn(`Failed to create consumer for primary topic: ${error.message}`); console.log(`Falling back to topic: ${fallbackTopic}`); try { return await client.subscribe({ topic: fallbackTopic, subscription: 'my-subscription' }); } catch (fallbackError) { console.error(`Fallback also failed: ${fallbackError.message}`); throw fallbackError; } } } ``` -------------------------------- ### Reader Configuration Interface Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/types.md Defines the structure for configuring a Pulsar reader, including topic, start message ID, and optional settings like queue size and listener. ```typescript interface ReaderConfig { topic: string startMessageId: MessageId receiverQueueSize?: number readerName?: string subscriptionRolePrefix?: string readCompacted?: boolean listener?: (message: Message, reader: Reader) => void privateKeyPath?: string cryptoFailureAction?: ConsumerCryptoFailureAction } ``` -------------------------------- ### Client Class Methods Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/README.md The main Client class provides methods for interacting with Pulsar, including creating producers and consumers, managing topics, and closing the connection. ```APIDOC ## Client Class ### Description Represents the main client for connecting to an Apache Pulsar cluster. ### Methods #### createProducer(topic, options) Creates a new producer for the specified topic. #### subscribe(topic, subscription, options) Subscribes to a topic with a given subscription name. #### createReader(topic, startMessageId, options) Creates a new reader for a topic, starting from a specific message ID. #### getPartitionsForTopic(topic) Retrieves the partition information for a given topic. #### close() Closes the client connection and releases all resources. #### setLogHandler(handler) Sets a custom log handler for the client. This is a static method. ``` -------------------------------- ### Create a Pulsar Reader Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/client.md Use this snippet to create a new reader for a topic. Ensure you provide a valid topic name and optionally specify a starting message ID. ```javascript const reader = await client.createReader({ topic: 'persistent://public/default/my-topic', startMessageId: Pulsar.MessageId.earliest() }); const message = await reader.readNext(); console.log(message.getData().toString()); await reader.close(); ``` -------------------------------- ### Basic Client Connection Configuration Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/configuration.md Configure essential connection parameters like service URL, operation timeout, and thread counts for the Pulsar client. ```javascript const client = new Pulsar.Client({ serviceUrl: 'pulsar://localhost:6650', operationTimeoutSeconds: 30, connectionTimeoutMs: 10000, ioThreads: 1, messageListenerThreads: 1, concurrentLookupRequest: 50 }); ``` -------------------------------- ### Configure Producer with Custom CryptoKeyReader Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/encryption.md Implement a custom `MyKeyReader` class to handle public and private key loading for encryption. ```javascript class MyKeyReader { getPublicKey(keyName, metadata) { // Load public key from file or service const keyData = fs.readFileSync(`/keys/${keyName}.pub`); return { key: keyData, metadata: metadata }; } getPrivateKey(keyName, metadata) { // Load private key from file or service const keyData = fs.readFileSync(`/keys/${keyName}.priv`); return { key: keyData, metadata: metadata }; } } const producer = await client.createProducer({ topic: 'persistent://public/default/encrypted-topic', encryptionKey: 'my-key', cryptoKeyReader: new MyKeyReader() }); ``` -------------------------------- ### Key Share Sticky Range Definition Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/types.md Defines a range for sticky key sharing. Specify the start and end of the inclusive range for message distribution based on key hashes. ```typescript interface ConsumerKeyShareStickyRange { start: number end: number } ``` -------------------------------- ### Create Basic Pulsar Client Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/client.md Instantiate a new Pulsar client with a specified service URL. This is the most basic way to connect to a Pulsar cluster. ```javascript const Pulsar = require('pulsar-client'); const client = new Pulsar.Client({ serviceUrl: 'pulsar://localhost:6650' }); ``` -------------------------------- ### Seek Consumer Position Source: https://github.com/apache/pulsar-client-node/blob/master/_autodocs/api-reference/consumer.md Seeks the consumer to a specific message ID. The next receive() call will retrieve messages at or after this position. Examples show seeking to the earliest message and a specific message ID. ```javascript const message = await consumer.receive(); const messageId = message.getMessageId(); // Seek to earliest message await consumer.seek(Pulsar.MessageId.earliest()); // Seek to a specific message await consumer.seek(messageId); ```