### Install Dependencies with npm Source: https://github.com/googleapis/nodejs-pubsub/blob/main/CONTRIBUTING.md Run this command to install all necessary project dependencies. ```bash npm install ``` -------------------------------- ### Running the Benchwrapper Source: https://github.com/googleapis/nodejs-pubsub/blob/main/bin/README.md Follow these steps to install dependencies, set the emulator host, and run the benchwrapper with a specified port. ```bash cd nodejs-pubsub npm install export PUBSUB_EMULATOR_HOST=localhost:8080 npm run benchwrapper -- --port 50051 ``` -------------------------------- ### Install Pub/Sub Client Library Source: https://github.com/googleapis/nodejs-pubsub/blob/main/README.md Install the Google Cloud Pub/Sub client library for Node.js using npm. ```bash npm install @google-cloud/pubsub ``` -------------------------------- ### Pub/Sub Quickstart: Create Topic, Subscription, Publish, and Receive Messages Source: https://github.com/googleapis/nodejs-pubsub/blob/main/README.md Instantiates the Pub/Sub client, creates a topic and subscription, sends a message, and sets up a listener for incoming messages. Ensure you have a Google Cloud project, billing enabled, and the Pub/Sub API enabled. Authentication should also be configured. ```javascript // Imports the Google Cloud client library const {PubSub} = require('@google-cloud/pubsub'); async function quickstart( projectId = 'your-project-id', // Your Google Cloud Platform project ID topicNameOrId = 'my-topic', // Name for the new topic to create subscriptionName = 'my-sub', // Name for the new subscription to create ) { // Instantiates a client const pubsub = new PubSub({projectId}); // Creates a new topic const [topic] = await pubsub.createTopic(topicNameOrId); console.log(`Topic ${topic.name} created.`); // Creates a subscription on that new topic const [subscription] = await topic.createSubscription(subscriptionName); // Receive callbacks for new messages on the subscription subscription.on('message', message => { console.log('Received message:', message.data.toString()); process.exit(0); }); // Receive callbacks for errors on the subscription subscription.on('error', error => { console.error('Received error:', error); process.exit(1); }); // Send a message to the topic await topic.publishMessage({data: Buffer.from('Test message!')}); } ``` -------------------------------- ### Configure Pub/Sub Client with gRPC Transport Source: https://github.com/googleapis/nodejs-pubsub/blob/main/README.md Instantiate the Pub/Sub client with an alternative 'grpc' transport by installing the 'grpc' package and passing it during client initialization. ```javascript const {PubSub} = require('@google-cloud/pubsub'); const grpc = require('grpc'); const pubsub = new PubSub({grpc}); ``` -------------------------------- ### PubSub#listSchemas / Schema#get Source: https://context7.com/googleapis/nodejs-pubsub/llms.txt Allows iteration over all project schemas or fetching full details of a specific schema. ```APIDOC ## PubSub#listSchemas Iterate over all project schemas. ### Method `pubsub.listSchemas([options])` ### Parameters #### Query Parameters - **view** (string) - Optional - Specifies the view of the schema to return (e.g., `SchemaViews.Basic`, `SchemaViews.Full`). Defaults to `SchemaViews.Basic`. ### Request Example ```javascript // List schema names (Basic view) for await (const s of pubsub.listSchemas(SchemaViews.Basic)) { console.log('Schema name:', s.name); } ``` ## Schema#get Fetch full details of a specific schema. ### Method `schema.get([options])` ### Parameters #### Query Parameters - **view** (string) - Optional - Specifies the view of the schema to return (e.g., `SchemaViews.Full`). Defaults to `SchemaViews.Basic`. ### Request Example ```javascript const schema = pubsub.schema('my-avro-schema'); const details = await schema.get(SchemaViews.Full); ``` ### Response #### Success Response (200) - **type** (string) - The type of the schema (e.g., 'AVRO', 'PROTOCOL_BUFFER'). - **definition** (string) - The schema definition. ``` -------------------------------- ### Close Subscription with Timeout Source: https://github.com/googleapis/nodejs-pubsub/blob/main/samples/README.md Demonstrates closing a subscription with a specified timeout for graceful shutdown. Ensure the client library is installed and dependencies are met before running. ```javascript node closeSubscriptionWithTimeout.js ``` -------------------------------- ### Manage IAM Policies for Pub/Sub Topics and Subscriptions Source: https://context7.com/googleapis/nodejs-pubsub/llms.txt Get, set, and test IAM policies on topics and subscriptions. Use `getPolicy` to retrieve the current policy, `setPolicy` to update it with new bindings, and `testPermissions` to check caller privileges. ```javascript const {PubSub} = require('@google-cloud/pubsub'); const pubsub = new PubSub(); const topic = pubsub.topic('my-topic'); const subscription = pubsub.subscription('my-subscription'); // Get current IAM policy for a topic const [policy] = await topic.iam.getPolicy(); console.log('Topic policy:', JSON.stringify(policy, null, 2)); // Grant publisher role to a service account const updatedPolicy = { bindings: [ { role: 'roles/pubsub.publisher', members: ['serviceAccount:publisher-sa@my-project.iam.gserviceaccount.com'], }, ], }; const [setPolicy] = await topic.iam.setPolicy(updatedPolicy); console.log('Updated topic policy:', JSON.stringify(setPolicy, null, 2)); // Test which IAM permissions the caller has on a subscription const [permissions] = await subscription.iam.testPermissions([ 'pubsub.subscriptions.consume', 'pubsub.subscriptions.delete', ]); console.log('Has consume permission:', permissions['pubsub.subscriptions.consume']); console.log('Has delete permission:', permissions['pubsub.subscriptions.delete']); ``` -------------------------------- ### Receive Pub/Sub Messages (Streaming Pull) Source: https://context7.com/googleapis/nodejs-pubsub/llms.txt Listen for messages on a subscription using the `'message'` event, which automatically starts streaming pull. Configure flow control and acknowledge messages using `message.ack()`. ```javascript const {PubSub} = require('@google-cloud/pubsub'); const pubsub = new PubSub(); async function listenForMessages(subscriptionNameOrId, timeoutSeconds = 60) { const subscription = pubsub.subscription(subscriptionNameOrId, { flowControl: { maxMessages: 10, allowExcessMessages: false, }, ackDeadline: 60, }); let messageCount = 0; const messageHandler = message => { console.log(`Received message ${message.id}:`); console.log(` Data: ${message.data.toString()}`); console.log(` Attributes:`, message.attributes); console.log(` Published at: ${message.publishTime.toISOString()}`); messageCount++; message.ack(); // acknowledge so it is not re-delivered // message.nack() to nack and trigger re-delivery }; const errorHandler = err => { console.error('Subscription error:', err); }; subscription.on('message', messageHandler); subscription.on('error', errorHandler); subscription.on('debug', msg => console.debug('Debug:', msg.message)); subscription.on('close', () => console.log('Subscription closed.')); await new Promise(resolve => setTimeout(resolve, timeoutSeconds * 1000)); await subscription.close(); console.log(`Received ${messageCount} messages.`); await pubsub.close(); } listenForMessages('my-subscription', 30).catch(console.error); ``` -------------------------------- ### IAM Policy Management Source: https://context7.com/googleapis/nodejs-pubsub/llms.txt Get, set, and test IAM policies on topics and subscriptions at a fine-grained level using the `iam` property. ```APIDOC ## Topic#iam / Subscription#iam — IAM Policy Management Get, set, and test IAM policies on topics and subscriptions at a fine-grained level. ```javascript const {PubSub} = require('@google-cloud/pubsub'); const pubsub = new PubSub(); const topic = pubsub.topic('my-topic'); const subscription = pubsub.subscription('my-subscription'); // Get current IAM policy for a topic const [policy] = await topic.iam.getPolicy(); console.log('Topic policy:', JSON.stringify(policy, null, 2)); // Grant publisher role to a service account const updatedPolicy = { bindings: [ { role: 'roles/pubsub.publisher', members: ['serviceAccount:publisher-sa@my-project.iam.gserviceaccount.com'], }, ], }; const [setPolicy] = await topic.iam.setPolicy(updatedPolicy); console.log('Updated topic policy:', JSON.stringify(setPolicy, null, 2)); // Test which IAM permissions the caller has on a subscription const [permissions] = await subscription.iam.testPermissions([ 'pubsub.subscriptions.consume', 'pubsub.subscriptions.delete', ]); console.log('Has consume permission:', permissions['pubsub.subscriptions.consume']); console.log('Has delete permission:', permissions['pubsub.subscriptions.delete']); ``` ``` -------------------------------- ### Subscription — Receive Messages (Streaming Pull) Source: https://context7.com/googleapis/nodejs-pubsub/llms.txt Receives messages from a subscription using the streaming pull method. Listening for the 'message' event starts the stream automatically. ```APIDOC ## Subscription — Receive Messages (Streaming Pull) `Subscription` extends `EventEmitter`. Listening for the `'message'` event starts streaming pull automatically. ### Usage ```javascript const {PubSub} = require('@google-cloud/pubsub'); const pubsub = new PubSub(); async function listenForMessages(subscriptionNameOrId, timeoutSeconds = 60) { const subscription = pubsub.subscription(subscriptionNameOrId, { flowControl: { maxMessages: 10, // process at most 10 messages concurrently allowExcessMessages: false, }, ackDeadline: 60, // seconds before Pub/Sub re-delivers }); let messageCount = 0; const messageHandler = message => { console.log(`Received message ${message.id}:`); console.log(` Data: ${message.data.toString()}`); console.log(` Attributes:`, message.attributes); console.log(` Published at: ${message.publishTime.toISOString()}`); messageCount++; message.ack(); // acknowledge so it is not re-delivered // message.nack() to nack and trigger re-delivery }; const errorHandler = err => { console.error('Subscription error:', err); }; subscription.on('message', messageHandler); subscription.on('error', errorHandler); subscription.on('debug', msg => console.debug('Debug:', msg.message)); subscription.on('close', () => console.log('Subscription closed.')); await new Promise(resolve => setTimeout(resolve, timeoutSeconds * 1000)); await subscription.close(); console.log(`Received ${messageCount} messages.`); await pubsub.close(); } listenForMessages('my-subscription', 30).catch(console.error); ``` ``` -------------------------------- ### Manage Pub/Sub Topic Reference Source: https://context7.com/googleapis/nodejs-pubsub/llms.txt Get a reference to a topic without an API call, check its existence, fetch metadata, update labels, or delete the topic. Note that deleting a topic does not delete its subscriptions. ```javascript const {PubSub} = require('@google-cloud/pubsub'); const pubsub = new PubSub(); // Get a topic reference (no API call yet) const topic = pubsub.topic('my-topic'); // Topic with message ordering enabled at publish time const orderedTopic = pubsub.topic('ordered-topic', {messageOrdering: true}); // Check if the topic exists const [exists] = await topic.exists(); console.log(`Topic exists: ${exists}`); // Fetch full topic metadata from the API const [metadata] = await topic.getMetadata(); console.log('Topic labels:', metadata.labels); // Update topic metadata (e.g., add labels) await topic.setMetadata({labels: {env: 'production'}}); // Delete the topic (subscriptions are NOT deleted) await topic.delete(); ``` -------------------------------- ### PubSub Client Initialization Source: https://context7.com/googleapis/nodejs-pubsub/llms.txt The `PubSub` class is the top-level entry point for the client library. It supports various initialization methods including Application Default Credentials, explicit service account keys, and connecting to the Pub/Sub emulator. ```APIDOC ## PubSub Client Initialization The `PubSub` class is the top-level entry point. It uses Application Default Credentials by default, or accepts explicit credentials via `ClientConfig`. ```javascript const {PubSub} = require('@google-cloud/pubsub'); // Using Application Default Credentials (recommended for GCP-hosted apps) const pubsub = new PubSub(); // Explicit project + service account key const pubsubExplicit = new PubSub({ projectId: 'my-gcp-project', keyFilename: '/path/to/service-account.json', }); // Connecting to the Pub/Sub emulator (e.g., for local development) // Set PUBSUB_EMULATOR_HOST=localhost:8085 in environment, or: const pubsubEmulator = new PubSub({ apiEndpoint: 'localhost:8085', projectId: 'test-project', }); // With OpenTelemetry tracing enabled const pubsubOtel = new PubSub({enableOpenTelemetryTracing: true}); // Always close the client when done to release gRPC connections await pubsub.close(); ``` ``` -------------------------------- ### Run Sample Integration Tests with npm Source: https://github.com/googleapis/nodejs-pubsub/blob/main/CONTRIBUTING.md Execute sample integration tests with the npm run samples-test command. ```bash # Run sample integration tests. npm run samples-test ``` -------------------------------- ### Initialize Pub/Sub Client Source: https://context7.com/googleapis/nodejs-pubsub/llms.txt Initialize the Pub/Sub client using Application Default Credentials, explicit credentials, or connecting to the emulator. Always close the client when done. ```javascript const {PubSub} = require('@google-cloud/pubsub'); // Using Application Default Credentials (recommended for GCP-hosted apps) const pubsub = new PubSub(); // Explicit project + service account key const pubsubExplicit = new PubSub({ projectId: 'my-gcp-project', keyFilename: '/path/to/service-account.json', }); // Connecting to the Pub/Sub emulator (e.g., for local development) // Set PUBSUB_EMULATOR_HOST=localhost:8085 in environment, or: const pubsubEmulator = new PubSub({ apiEndpoint: 'localhost:8085', projectId: 'test-project', }); // With OpenTelemetry tracing enabled const pubsubOtel = new PubSub({enableOpenTelemetryTracing: true}); // Always close the client when done to release gRPC connections await pubsub.close(); ``` -------------------------------- ### List Pub/Sub Topics, Subscriptions, and Snapshots Source: https://context7.com/googleapis/nodejs-pubsub/llms.txt Enumerate all resources in a project. Supports pagination and streaming for large result sets. Use `getTopicsStream` for efficient handling of many topics. ```javascript const {PubSub} = require('@google-cloud/pubsub'); const pubsub = new PubSub(); // List all topics (Promise) const [topics] = await pubsub.getTopics(); topics.forEach(t => console.log(t.name)); // List subscriptions scoped to a specific topic const [subs] = await pubsub.getSubscriptions({topic: 'my-topic'}); subs.forEach(s => console.log(s.name)); // List all subscriptions project-wide const [allSubs] = await pubsub.getSubscriptions(); allSubs.forEach(s => console.log(s.name)); // Stream topics (useful for large result sets) pubsub.getTopicsStream() .on('error', console.error) .on('data', topic => console.log('Streamed topic:', topic.name)) .on('end', () => console.log('Done streaming topics.')); // List snapshots const [snapshots] = await pubsub.getSnapshots(); snapshots.forEach(s => console.log(s.name)); // Paginated listing with page size const [page, , response] = await pubsub.getTopics({pageSize: 5}); console.log(`Got ${page.length} topics, token: ${response.nextPageToken}`); ``` -------------------------------- ### Run Unit Tests with npm Source: https://github.com/googleapis/nodejs-pubsub/blob/main/CONTRIBUTING.md Execute unit tests using the npm test command. ```bash # Run unit tests. npm test ``` -------------------------------- ### Run System Tests with npm Source: https://github.com/googleapis/nodejs-pubsub/blob/main/CONTRIBUTING.md Execute all system tests using the npm run system-test command. ```bash # Run all system tests. npm run system-test ``` -------------------------------- ### Enable OpenTelemetry Tracing for Pub/Sub Source: https://context7.com/googleapis/nodejs-pubsub/llms.txt Integrate OpenTelemetry for distributed tracing in publishers and subscribers by setting `enableOpenTelemetryTracing: true` in the Pub/Sub client configuration. Requires configuring an OpenTelemetry provider and exporter. ```javascript const {PubSub} = require('@google-cloud/pubsub'); const {NodeTracerProvider} = require('@opentelemetry/sdk-trace-node'); const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base'); const {TraceExporter} = require('@google-cloud/opentelemetry-cloud-trace-exporter'); const {Resource} = require('@opentelemetry/resources'); const {SEMRESATTRS_SERVICE_NAME} = require('@opentelemetry/semantic-conventions'); // Configure OpenTelemetry provider and exporter const provider = new NodeTracerProvider({ resource: new Resource({[SEMRESATTRS_SERVICE_NAME]: 'my-pubsub-service'}), }); const exporter = new TraceExporter(); // exports to Google Cloud Trace provider.addSpanProcessor(new SimpleSpanProcessor(exporter)); provider.register(); // Enable tracing in the Pub/Sub client const pubsub = new PubSub({enableOpenTelemetryTracing: true}); const topic = pubsub.topic('my-topic'); const messageId = await topic.publishMessage({data: Buffer.from('traced message')}); console.log(`Published traced message: ${messageId}`); // Ensure all spans are exported before shutdown await topic.flush(); await pubsub.close(); ``` -------------------------------- ### Create Avro and Protobuf Schemas Source: https://context7.com/googleapis/nodejs-pubsub/llms.txt Define and create schemas for Avro and Protocol Buffer formats to enforce message structure for topics. Schemas can be created from files or definition strings. Includes listing, validating, and deleting schemas. ```javascript const {PubSub, SchemaTypes, Encodings} = require('@google-cloud/pubsub'); const fs = require('fs'); const pubsub = new PubSub(); async function createSchemas() { // Create an Avro schema from a .avsc file const avroDefinition = fs.readFileSync('my-schema.avsc').toString(); const avroSchema = await pubsub.createSchema( 'my-avro-schema', SchemaTypes.Avro, avroDefinition, ); console.log(`Avro schema created: ${await avroSchema.getName()}`); // Create a Protobuf schema from a .proto definition string const protoDefinition = ` syntax = "proto3"; message MyMessage { string user_id = 1; string event = 2; } `; const protoSchema = await pubsub.createSchema( 'my-proto-schema', SchemaTypes.ProtocolBuffer, protoDefinition, ); console.log(`Proto schema created: ${await protoSchema.getName()}`); // List all schemas in the project for await (const s of pubsub.listSchemas()) { console.log('Schema:', s.name); } // Validate schema definition before creating await pubsub.validateSchema({ type: 'AVRO', definition: avroDefinition, }); console.log('Schema definition is valid.'); // Delete a schema const schema = pubsub.schema('my-avro-schema'); await schema.delete(); console.log('Schema deleted.'); } createSchemas().catch(console.error); ``` -------------------------------- ### Create Snapshot and Modify Push Configuration Source: https://context7.com/googleapis/nodejs-pubsub/llms.txt Manage subscription states by creating point-in-time snapshots for later seeking or reconfiguring push delivery endpoints. Pass an empty `pushConfig` to switch back to pull delivery. ```javascript const {PubSub} = require('@google-cloud/pubsub'); const pubsub = new PubSub(); const subscription = pubsub.subscription('my-subscription'); // Create a snapshot at the current cursor position const [snapshot] = await subscription.createSnapshot('my-snapshot'); console.log(`Snapshot ${snapshot.name} created.`); // Update push endpoint (pass empty pushConfig to switch back to pull) await subscription.modifyPushConfig({ pushEndpoint: 'https://new-endpoint.example.com/push', oidcToken: { serviceAccountEmail: 'sa@my-project.iam.gserviceaccount.com', }, }); console.log('Push config updated.'); ``` -------------------------------- ### Pub/Sub Listing APIs Source: https://context7.com/googleapis/nodejs-pubsub/llms.txt Enumerate all resources in a project, with optional pagination and streaming support for topics and subscriptions. Also includes listing snapshots. ```APIDOC ## Pub/Sub Listing APIs — Topics, Subscriptions, Snapshots Enumerate all resources in a project, with optional pagination and streaming support. ```javascript const {PubSub} = require('@google-cloud/pubsub'); const pubsub = new PubSub(); // List all topics (Promise) const [topics] = await pubsub.getTopics(); topics.forEach(t => console.log(t.name)); // List subscriptions scoped to a specific topic const [subs] = await pubsub.getSubscriptions({topic: 'my-topic'}); subs.forEach(s => console.log(s.name)); // List all subscriptions project-wide const [allSubs] = await pubsub.getSubscriptions(); allSubs.forEach(s => console.log(s.name)); // Stream topics (useful for large result sets) pubsub.getTopicsStream() .on('error', console.error) .on('data', topic => console.log('Streamed topic:', topic.name)) .on('end', () => console.log('Done streaming topics.')); // List snapshots const [snapshots] = await pubsub.getSnapshots(); snapshots.forEach(s => console.log(s.name)); // Paginated listing with page size const [page, , response] = await pubsub.getTopics({pageSize: 5}); console.log(`Got ${page.length} topics, token: ${response.nextPageToken}`); ``` ``` -------------------------------- ### List and Inspect Schemas Source: https://context7.com/googleapis/nodejs-pubsub/llms.txt Iterate over all schemas in a project using a basic view or fetch full details of a specific schema, including its type and definition. Requires importing `SchemaViews`. ```javascript const {PubSub, SchemaViews} = require('@google-cloud/pubsub'); const pubsub = new PubSub(); // List schema names (Basic view) for await (const s of pubsub.listSchemas(SchemaViews.Basic)) { console.log('Schema name:', s.name); } // Get full schema definition const schema = pubsub.schema('my-avro-schema'); const details = await schema.get(SchemaViews.Full); console.log('Schema type:', details.type); console.log('Schema definition:', details.definition); ``` -------------------------------- ### Create Proto Schema Source: https://github.com/googleapis/nodejs-pubsub/blob/main/samples/README.md Creates a new schema definition using Protocol Buffers (Proto) format. Requires the schema name and the Proto schema filename. ```javascript node createProtoSchema.js ``` -------------------------------- ### Configure Subscriber Options at Runtime Source: https://context7.com/googleapis/nodejs-pubsub/llms.txt Update ackDeadline, streamingOptions, and flowControl on a live subscription without recreating it. Extend ackDeadline to allow for slower message processing. ```javascript const {PubSub} = require('@google-cloud/pubsub'); const pubsub = new PubSub(); const subscription = pubsub.subscription('my-subscription'); subscription.setOptions({ ackDeadline: 120, // seconds; extend to allow slower processing streamingOptions: { maxStreams: 2, // max concurrent gRPC streaming pull connections }, flowControl: { maxMessages: 100, maxBytes: 50 * 1024 * 1024, // 50 MB allowExcessMessages: true, }, }); subscription.on('message', async message => { // Longer processing is now covered by the extended ack deadline await processMessage(message.data); message.ack(); }); ``` -------------------------------- ### Publish Raw Bytes, JSON, and Ordered Messages Source: https://context7.com/googleapis/nodejs-pubsub/llms.txt Demonstrates publishing messages as raw bytes, JSON objects (which are automatically serialized), and with ordering keys. Ensure topics with ordering keys have `messageOrdering: true` enabled. ```javascript const {PubSub} = require('@google-cloud/pubsub'); const pubsub = new PubSub(); const topic = pubsub.topic('my-topic'); async function publishExamples() { // Publish raw bytes const msgId1 = await topic.publishMessage({ data: Buffer.from('Hello, world!'), }); console.log(`Published message ID: ${msgId1}`); // Publish JSON (automatically serialized to Buffer) const msgId2 = await topic.publishMessage({ json: {userId: 42, event: 'signup'}, attributes: {origin: 'signup-service', version: '1'}, }); console.log(`Published JSON message ID: ${msgId2}`); // Publish with ordering key (topic must have messageOrdering: true) const orderedTopic = pubsub.topic('ordered-topic', {messageOrdering: true}); const msgId3 = await orderedTopic.publishMessage({ data: Buffer.from('ordered event'), orderingKey: 'user-123', }); console.log(`Published ordered message ID: ${msgId3}`); // Flush any buffered (batched) messages before shutting down await topic.flush(); await pubsub.close(); } publishExamples().catch(console.error); ``` -------------------------------- ### Create Pub/Sub Topic Source: https://context7.com/googleapis/nodejs-pubsub/llms.txt Create a new topic in your Google Cloud project. Topics can be created with a simple name or with ingestion settings for sources like AWS Kinesis. ```javascript const {PubSub} = require('@google-cloud/pubsub'); const pubsub = new PubSub(); async function createTopics() { // Simple topic by name const [topic] = await pubsub.createTopic('my-topic'); console.log(`Topic ${topic.name} created.`); // Output: Topic projects/my-project/topics/my-topic created. // Topic with AWS Kinesis ingestion source await pubsub.createTopic({ name: 'kinesis-ingest-topic', ingestionDataSourceSettings: { awsKinesis: { awsRoleArn: 'arn:aws:iam::123456789012:role/MyRole', gcpServiceAccount: 'ingestion-sa@my-project.iam.gserviceaccount.com', streamArn: 'arn:aws:kinesis:us-east-1:123456789012:stream/MyStream', consumerArn: 'arn:aws:kinesis:us-east-1:123456789012:stream/MyStream/consumer/MyConsumer', }, }, }); console.log('Kinesis-backed topic created.'); } createTopics().catch(console.error); ``` -------------------------------- ### Create Subscription Source: https://github.com/googleapis/nodejs-pubsub/blob/main/samples/README.md Creates a new standard subscription. Requires the topic and subscription names or IDs. ```javascript node createSubscription.js ``` -------------------------------- ### PubSub#createTopic Source: https://context7.com/googleapis/nodejs-pubsub/llms.txt Creates a new topic in the project. This method can accept a simple topic name string or a `TopicMetadata` object for advanced configurations like ingestion settings. ```APIDOC ## PubSub#createTopic — Create a Topic Creates a new topic in the project. Accepts a plain name string or a full `TopicMetadata` object (for ingestion settings, labels, etc.). ```javascript const {PubSub} = require('@google-cloud/pubsub'); const pubsub = new PubSub(); async function createTopics() { // Simple topic by name const [topic] = await pubsub.createTopic('my-topic'); console.log(`Topic ${topic.name} created.`); // Output: Topic projects/my-project/topics/my-topic created. // Topic with AWS Kinesis ingestion source await pubsub.createTopic({ name: 'kinesis-ingest-topic', ingestionDataSourceSettings: { awsKinesis: { awsRoleArn: 'arn:aws:iam::123456789012:role/MyRole', gcpServiceAccount: 'ingestion-sa@my-project.iam.gserviceaccount.com', streamArn: 'arn:aws:kinesis:us-east-1:123456789012:stream/MyStream', consumerArn: 'arn:aws:kinesis:us-east-1:123456789012:stream/MyStream/consumer/MyConsumer', }, }, }); console.log('Kinesis-backed topic created.'); } createTopics().catch(console.error); ``` ``` -------------------------------- ### Create Pub/Sub Subscriptions Source: https://context7.com/googleapis/nodejs-pubsub/llms.txt Use `createSubscription` to attach new pull or push subscriptions to a topic. Configure options for dead-lettering, OIDC tokens for push subscriptions, exactly-once delivery, message ordering, and message retention. ```javascript const {PubSub} = require('@google-cloud/pubsub'); const pubsub = new PubSub(); async function createSubscriptions() { // Basic pull subscription const [sub] = await pubsub.createSubscription('my-topic', 'my-subscription'); console.log(`Subscription ${sub.name} created.`); // Push subscription with OIDC token await pubsub.createSubscription('my-topic', 'my-push-sub', { pushEndpoint: 'https://example.com/push', oidcToken: { serviceAccountEmail: 'sa@my-project.iam.gserviceaccount.com', audience: 'https://example.com', }, }); // Dead-letter policy: re-route undeliverable messages after 10 attempts await pubsub.topic('my-topic').createSubscription('my-dlq-sub', { deadLetterPolicy: { deadLetterTopic: pubsub.topic('my-dead-letter-topic').name, maxDeliveryAttempts: 10, }, }); // Exactly-once delivery await pubsub.topic('my-topic').createSubscription('my-eod-sub', { enableExactlyOnceDelivery: true, }); // Ordering enabled (publisher must also use orderingKey) await pubsub.topic('my-topic').createSubscription('my-ordered-sub', { enableMessageOrdering: true, }); // Message retention extended to 3 days (in seconds) await pubsub.topic('my-topic').createSubscription('my-retained-sub', { messageRetentionDuration: 3 * 24 * 60 * 60, retainAckedMessages: true, }); } createSubscriptions().catch(console.error); ``` -------------------------------- ### Subscription#createSnapshot / Subscription#modifyPushConfig Source: https://context7.com/googleapis/nodejs-pubsub/llms.txt Provides methods to create point-in-time snapshots of a subscription's message backlog and to reconfigure push delivery settings for a subscription. ```APIDOC ## Subscription#createSnapshot Create point-in-time snapshots for later seeking. ### Method `subscription.createSnapshot(snapshotId, [options])` ### Parameters #### Path Parameters - **snapshotId** (string) - Required - The ID for the new snapshot. ### Request Example ```javascript const [snapshot] = await subscription.createSnapshot('my-snapshot'); ``` ### Response #### Success Response (200) - **snapshot** (object) - The created snapshot object. ## Subscription#modifyPushConfig Reconfigure push delivery on an existing subscription. ### Method `subscription.modifyPushConfig(options)` ### Parameters #### Request Body - **pushEndpoint** (string) - Required - The new push endpoint URL. Pass an empty string or omit to switch back to pull delivery. - **oidcToken** (object) - Optional - Configuration for OIDC token authentication. - **serviceAccountEmail** (string) - Required - The service account email to use for authentication. ``` -------------------------------- ### Create BigQuery Subscription Source: https://github.com/googleapis/nodejs-pubsub/blob/main/samples/README.md Creates a new subscription that delivers messages to a BigQuery table. Requires topic, subscription, and BigQuery table IDs. ```javascript node createBigQuerySubscription.js ``` -------------------------------- ### Low-Level v1 GAPIC Clients for Advanced Usage Source: https://context7.com/googleapis/nodejs-pubsub/llms.txt Use the v1 clients for direct access to the gRPC API, such as streaming pull management and admin operations. Obtain client options from PubSub#getClientConfig to resolve project ID and credentials. ```javascript const {PubSub, v1} = require('@google-cloud/pubsub'); const pubsub = new PubSub(); // Get client options from the high-level instance (resolves projectId, credentials, etc.) const options = await pubsub.getClientConfig(); // Instantiate low-level publisher and subscriber clients const publisherClient = new v1.PublisherClient(options); const subscriberClient = new v1.SubscriberClient(options); // Use low-level publisher to publish directly const topicPath = publisherClient.topicPath('my-project', 'my-topic'); const [publishResponse] = await publisherClient.publish({ topic: topicPath, messages: [{data: Buffer.from('low-level message')}], }); console.log('Published message IDs:', publishResponse.messageIds); // Use low-level subscriber to pull synchronously const subscriptionPath = subscriberClient.subscriptionPath('my-project', 'my-subscription'); const [pullResponse] = await subscriberClient.pull({ subscription: subscriptionPath, maxMessages: 5, }); for (const {message, ackId} of pullResponse.receivedMessages ?? []) { console.log('Received:', Buffer.from(message.data).toString()); await subscriberClient.acknowledge({subscription: subscriptionPath, ackIds: [ackId]}); } ``` -------------------------------- ### Topic#resumePublishing Source: https://context7.com/googleapis/nodejs-pubsub/llms.txt When an ordered message publish fails, all subsequent messages with that ordering key are blocked. Call `resumePublishing` to unblock the key after handling the error. ```APIDOC ## Topic#resumePublishing — Resume After Ordering Failure When an ordered message publish fails, all subsequent messages with that ordering key are blocked. Call `resumePublishing` to unblock the key after handling the error. ### Method `topic.resumePublishing(orderingKey)` ### Parameters - `orderingKey` (string) - The ordering key to resume publishing for. ### Request Example ```javascript const {PubSub} = require('@google-cloud/pubsub'); const pubsub = new PubSub(); const topic = pubsub.topic('ordered-topic', {messageOrdering: true}); const orderingKey = 'customer-456'; const data = Buffer.from('order event'); topic.publishMessage({data, orderingKey}, err => { if (err) { console.error(`Publish failed for key ${orderingKey}:`, err.message); // Allow the key to be used again in future publishes topic.resumePublishing(orderingKey); } }); ``` ``` -------------------------------- ### PubSub#createSchema Source: https://context7.com/googleapis/nodejs-pubsub/llms.txt Enforces message structure for topics by creating schemas. Supports both Avro and Protocol Buffer formats. ```APIDOC ## PubSub#createSchema — Create a Schema Schemas enforce message structure for topics. Both Avro and Protocol Buffer formats are supported. ### Method `pubsub.createSchema(name, type, definition, [options])` ### Parameters #### Path Parameters - **name** (string) - Required - The name of the schema. - **type** (string) - Required - The type of the schema (e.g., `SchemaTypes.Avro`, `SchemaTypes.ProtocolBuffer`). - **definition** (string) - Required - The schema definition in the specified format. ### Request Example ```javascript // Create an Avro schema from a .avsc file const avroDefinition = fs.readFileSync('my-schema.avsc').toString(); const avroSchema = await pubsub.createSchema( 'my-avro-schema', SchemaTypes.Avro, avroDefinition ); // Create a Protobuf schema from a .proto definition string const protoDefinition = ` syntax = "proto3"; message MyMessage { string user_id = 1; string event = 2; } `; const protoSchema = await pubsub.createSchema( 'my-proto-schema', SchemaTypes.ProtocolBuffer, protoDefinition ); ``` ### Related Operations - `pubsub.listSchemas()`: Lists all schemas in the project. - `pubsub.validateSchema(request)`: Validates a schema definition before creation. - `schema.delete()`: Deletes a schema. ``` -------------------------------- ### Subscription#setOptions Source: https://context7.com/googleapis/nodejs-pubsub/llms.txt Update ackDeadline, streamingOptions, and flowControl on a live subscription without recreating it. This is useful for adjusting processing capacity or message handling timeouts dynamically. ```APIDOC ## Subscription#setOptions ### Description Update `ackDeadline`, `streamingOptions`, and `flowControl` on a live subscription without recreating it. ### Method `subscription.setOptions(options)` ### Parameters #### Options Object - **ackDeadline** (number) - Optional - The maximum time in seconds to wait for message acknowledgment. - **streamingOptions** (object) - Optional - Configuration for streaming pull connections. - **maxStreams** (number) - Optional - The maximum number of concurrent gRPC streaming pull connections. - **flowControl** (object) - Optional - Configuration for flow control. - **maxMessages** (number) - Optional - The maximum number of messages to hold in the subscriber's buffer. - **maxBytes** (number) - Optional - The maximum size in bytes to hold in the subscriber's buffer. - **allowExcessMessages** (boolean) - Optional - Whether to allow exceeding `maxMessages` temporarily. ### Request Example ```javascript subscription.setOptions({ ackDeadline: 120, streamingOptions: { maxStreams: 2, }, flowControl: { maxMessages: 100, maxBytes: 50 * 1024 * 1024, // 50 MB allowExcessMessages: true, }, }); ``` ``` -------------------------------- ### Create Push Subscription Source: https://github.com/googleapis/nodejs-pubsub/blob/main/samples/README.md Creates a new push subscription to deliver messages to a specified endpoint URL. Requires the endpoint URL, topic, and subscription names. ```javascript node createPushSubscription.js ``` -------------------------------- ### Synchronous Pull (Low-Level v1 API) Source: https://context7.com/googleapis/nodejs-pubsub/llms.txt Demonstrates how to perform a synchronous pull operation to retrieve messages from a subscription using the `v1.SubscriberClient`. This is suitable for one-shot or batch processing patterns. ```APIDOC ## Synchronous Pull (Low-Level v1 API) For one-shot or batch processing patterns, use the lower-level `v1.SubscriberClient` directly. ### Method `subClient.pull(request)` ### Parameters #### Request Body - **subscription** (string) - Required - The name of the subscription to pull messages from. - **maxMessages** (number) - Optional - The maximum number of messages to return. ### Request Example ```javascript const [response] = await subClient.pull({ subscription: formattedSub, maxMessages: 10, }); ``` ### Response #### Success Response (200) - **receivedMessages** (array) - An array of received messages, each containing a `message` object and an `ackId`. ### Response Example ```json { "receivedMessages": [ { "message": { "data": "SGVsbG8gV29ybGQh", "messageId": "1234567890", "publishTime": "2023-10-27T10:00:00Z" }, "ackId": "some-ack-id" } ] } ``` ### Related Operations - `subClient.acknowledge(request)`: Acknowledges received messages. ``` -------------------------------- ### Create Push Subscription With No Wrapper Source: https://github.com/googleapis/nodejs-pubsub/blob/main/samples/README.md Creates a new push subscription with payload wrapping disabled. Requires the endpoint URL, topic, and subscription names. ```javascript node createPushSubscriptionNoWrapper.js ``` -------------------------------- ### OpenTelemetry Tracing Source: https://context7.com/googleapis/nodejs-pubsub/llms.txt Enable distributed tracing for publishers and subscribers by setting `enableOpenTelemetryTracing: true` in the Pub/Sub client configuration. ```APIDOC ## OpenTelemetry Tracing Enable distributed tracing for both publishers and subscribers by setting `enableOpenTelemetryTracing: true`. ```javascript const {PubSub} = require('@google-cloud/pubsub'); const {NodeTracerProvider} = require('@opentelemetry/sdk-trace-node'); const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base'); const {TraceExporter} = require('@google-cloud/opentelemetry-cloud-trace-exporter'); const {Resource} = require('@opentelemetry/resources'); const {SEMRESATTRS_SERVICE_NAME} = require('@opentelemetry/semantic-conventions'); // Configure OpenTelemetry provider and exporter const provider = new NodeTracerProvider({ resource: new Resource({[SEMRESATTRS_SERVICE_NAME]: 'my-pubsub-service'}) }); const exporter = new TraceExporter(); // exports to Google Cloud Trace provider.addSpanProcessor(new SimpleSpanProcessor(exporter)); provider.register(); // Enable tracing in the PubSub client const pubsub = new PubSub({enableOpenTelemetryTracing: true}); const topic = pubsub.topic('my-topic'); const messageId = await topic.publishMessage({data: Buffer.from('traced message')}); console.log(`Published traced message: ${messageId}`); // Ensure all spans are exported before shutdown await topic.flush(); await pubsub.close(); ``` ``` -------------------------------- ### Lint and Fix Code with npm Source: https://github.com/googleapis/nodejs-pubsub/blob/main/CONTRIBUTING.md Use this command to lint your code and automatically fix style issues. ```bash # Lint (and maybe fix) any changes: npm run fix ``` -------------------------------- ### Create Avro Schema Source: https://github.com/googleapis/nodejs-pubsub/blob/main/samples/README.md Creates a new schema definition using Avro format. Provide the schema name and the Avro schema filename. ```javascript node createAvroSchema.js ``` -------------------------------- ### Synchronous Pull Subscription (v1 API) Source: https://context7.com/googleapis/nodejs-pubsub/llms.txt Use the low-level `v1.SubscriberClient` for one-shot or batch processing patterns. This client allows for direct control over subscription and message acknowledgment. ```javascript const {v1} = require('@google-cloud/pubsub'); const subClient = new v1.SubscriberClient(); async function synchronousPull(projectId, subscriptionNameOrId) { const formattedSub = subscriptionNameOrId.includes('/') ? subscriptionNameOrId : subClient.subscriptionPath(projectId, subscriptionNameOrId); const [response] = await subClient.pull({ subscription: formattedSub, maxMessages: 10, }); const ackIds = []; for (const {message, ackId} of response.receivedMessages ?? []) { console.log(`Received: ${Buffer.from(message.data).toString()}`); if (ackId) ackIds.push(ackId); } if (ackIds.length > 0) { await subClient.acknowledge({ subscription: formattedSub, ackIds, }); console.log(`Acknowledged ${ackIds.length} messages.`); } } synchronousPull('my-project', 'my-subscription').catch(console.error); ``` -------------------------------- ### Resume Publishing After Ordering Failure Source: https://context7.com/googleapis/nodejs-pubsub/llms.txt If an ordered message publish fails, subsequent messages for that ordering key are blocked. Call `resumePublishing(orderingKey)` to unblock the key after resolving the error. ```javascript const {PubSub} = require('@google-cloud/pubsub'); const pubsub = new PubSub(); const topic = pubsub.topic('ordered-topic', {messageOrdering: true}); const orderingKey = 'customer-456'; const data = Buffer.from('order event'); topic.publishMessage({data, orderingKey}, err => { if (err) { console.error(`Publish failed for key ${orderingKey}:`, err.message); // Allow the key to be used again in future publishes topic.resumePublishing(orderingKey); } }); ``` -------------------------------- ### Low-Level v1 GAPIC Clients Source: https://context7.com/googleapis/nodejs-pubsub/llms.txt Provides advanced usage for direct access to the gRPC API, including streaming pull management and administrative operations. This is useful for scenarios requiring fine-grained control over the Pub/Sub communication. ```APIDOC ## Low-Level v1 GAPIC Clients ### Description For direct access to the gRPC API (e.g., streaming pull management, admin operations), use the `v1` clients obtained from `PubSub#getClientConfig`. ### Usage 1. Obtain client configuration from a `PubSub` instance. 2. Instantiate `v1.PublisherClient` and/or `v1.SubscriberClient` with the configuration. 3. Use the low-level clients for direct gRPC interactions. ### Methods #### `PubSub#getClientConfig()` - Returns a Promise that resolves with the client configuration object. #### `v1.PublisherClient(options)` - Constructor for the low-level publisher client. #### `v1.SubscriberClient(options)` - Constructor for the low-level subscriber client. ### Request Example (Publishing) ```javascript const {PubSub, v1} = require('@google-cloud/pubsub'); const pubsub = new PubSub(); const options = await pubsub.getClientConfig(); const publisherClient = new v1.PublisherClient(options); const topicPath = publisherClient.topicPath('my-project', 'my-topic'); const [publishResponse] = await publisherClient.publish({ topic: topicPath, messages: [{data: Buffer.from('low-level message')}], }); console.log('Published message IDs:', publishResponse.messageIds); ``` ### Request Example (Pulling) ```javascript const {PubSub, v1} = require('@google-cloud/pubsub'); const pubsub = new PubSub(); const options = await pubsub.getClientConfig(); const subscriberClient = new v1.SubscriberClient(options); const subscriptionPath = subscriberClient.subscriptionPath('my-project', 'my-subscription'); const [pullResponse] = await subscriberClient.pull({ subscription: subscriptionPath, maxMessages: 5, }); for (const {message, ackId} of pullResponse.receivedMessages ?? []) { console.log('Received:', Buffer.from(message.data).toString()); await subscriberClient.acknowledge({subscription: subscriptionPath, ackIds: [ackId]}); } ``` ``` -------------------------------- ### Seek Pub/Sub Subscription Source: https://context7.com/googleapis/nodejs-pubsub/llms.txt Use `subscription.seek()` to rewind or fast-forward a subscription. This allows replaying messages from a specific snapshot or timestamp. ```javascript const {PubSub} = require('@google-cloud/pubsub'); const pubsub = new PubSub(); const subscription = pubsub.subscription('my-subscription'); // Seek to a named snapshot await subscription.seek('my-snapshot'); console.log('Seeked to snapshot.'); // Seek to a specific point in time (replay messages since then) const replayFrom = new Date('2024-01-01T00:00:00Z'); await subscription.seek(replayFrom); console.log(`Seeked to ${replayFrom.toISOString()}.`); ```