### Quick Start with @platformatic/job-queue Source: https://github.com/platformatic/job-queue/blob/main/README.md Demonstrates creating a queue with in-memory storage, registering a job handler, starting the queue, enqueuing jobs, and handling results. Includes examples for fire-and-forget and wait-for-result scenarios. ```typescript import { Queue, MemoryStorage } from '@platformatic/job-queue' // Create a queue with in-memory storage const storage = new MemoryStorage() const queue = new Queue<{ email: string }, { sent: boolean }>({ storage, concurrency: 5 }) // Register a job handler queue.execute(async (job) => { console.log(`Processing job ${job.id}:`, job.payload) // Send email... return { sent: true } }) // Start the queue await queue.start() // Enqueue jobs await queue.enqueue('email-1', { email: 'user@example.com' }) // Optional per-job TTL override for cached result/error await queue.enqueue('email-ttl', { email: 'ttl@example.com' }, { resultTTL: 5 * 60 * 1000 // 5 minutes }) // Or wait for the result const result = await queue.enqueueAndWait('email-2', { email: 'another@example.com' }, { timeout: 30000, resultTTL: 24 * 60 * 60 * 1000 // keep this result for 24h }) console.log('Result:', result) // { sent: true } // Graceful shutdown await queue.stop() ``` -------------------------------- ### Queue Start Method Source: https://github.com/platformatic/job-queue/blob/main/README.md Connects to the storage backend and begins processing jobs from the queue. ```APIDOC ## `queue.start(): Promise` Connect to storage and start processing jobs. ``` -------------------------------- ### Start Test Infrastructure with Docker Source: https://github.com/platformatic/job-queue/blob/main/README.md Use Docker to start the Redis and Valkey containers for testing purposes. ```bash npm run docker:up ``` -------------------------------- ### MemoryStorage Example Source: https://github.com/platformatic/job-queue/blob/main/DESIGN.md Demonstrates how to initialize and use the MemoryStorage for testing and development. ```APIDOC ## MemoryStorage Initialization ### Description Example of setting up a Queue with in-memory storage. ### Usage ```typescript import { Queue, MemoryStorage } from '@platformatic/job-queue'; // Define your payload and result types interface MyPayload { /* ... */ } interface MyResult { /* ... */ } const queue = new Queue({ storage: new MemoryStorage(), concurrency: 1, }); // Define your job execution logic queue.execute(async (job) => { // Process the job using job.payload return processJob(job.payload); }); await queue.start(); // Example of enqueuing a job and waiting for its result const result = await queue.enqueueAndWait('job-1', payload); ``` ### Characteristics - Suitable for single-process applications, unit tests, and development. - Jobs are processed immediately by the local handler. - State is not persistent and is lost upon process exit. - Offers full API compatibility with RedisStorage. ``` -------------------------------- ### Install @platformatic/job-queue Source: https://github.com/platformatic/job-queue/blob/main/README.md Install the job queue package using npm. ```bash npm install @platformatic/job-queue ``` -------------------------------- ### Producer Example: Enqueueing Jobs Source: https://github.com/platformatic/job-queue/blob/main/DESIGN.md Demonstrates how to initialize a Queue with RedisStorage and enqueue jobs. Jobs with the same ID are deduplicated. ```typescript import { Queue, RedisStorage } from '@platformatic/job-queue'; const queue = new Queue<{ email: string; template: string }, void>({ storage: new RedisStorage({ host: 'localhost', port: 6379, prefix: 'email-queue' }), }); await queue.start(); // Fire and forget await queue.enqueue('email-123', { email: 'user@example.com', template: 'welcome', }); // Deduplicated - same ID won't be queued twice const result = await queue.enqueue('email-123', { email: 'user@example.com', template: 'welcome', }); // result.status === 'duplicate' await queue.stop(); ``` -------------------------------- ### Initialize and Use PgStorage with Job Queue Source: https://context7.com/platformatic/job-queue/llms.txt Demonstrates setting up a job queue with PostgreSQL storage. Ensure the 'pg' peer dependency is installed. Tables are auto-created on connect. ```typescript import { Queue, PgStorage } from '@platformatic/job-queue' const storage = new PgStorage({ connectionString: process.env.DATABASE_URL ?? 'postgresql://localhost:5432/mydb', tablePrefix: 'jq_', // tables: jq_jobs, jq_queue, jq_processing, jq_results, jq_errors, jq_workers, jq_locks }) // Tables are created automatically on first connect() const queue = new Queue<{ payload: string }, { result: string }>({ storage, concurrency: 10, resultTTL: 2 * 60 * 60_000 }) queue.execute(async (job) => { const result = await processPayload(job.payload.payload) return { result } }) await queue.start() const enqueueResult = await queue.enqueue('pg-job-001', { payload: 'hello' }) console.log(enqueueResult.status) // 'queued' const final = await queue.enqueueAndWait('pg-job-002', { payload: 'world' }, { timeout: 10_000 }) console.log(final.result) // 'processed: world' await queue.stop() ``` -------------------------------- ### Serialization Interface and Examples Source: https://github.com/platformatic/job-queue/blob/main/DESIGN.md Defines the `Serde` interface for custom serialization and provides examples for JSON, MessagePack, and Protocol Buffers. ```APIDOC ## Serialization Support ### Description Allows pluggable serialization for message payloads and results. ### Interface ```typescript interface Serde { serialize(value: T): Buffer; deserialize(buffer: Buffer): T; } ``` ### JSON Serde Ships with a default JSON serde, accessible via import. ```typescript import { JsonSerde, createJsonSerde } from '@platformatic/job-queue'; // Using the class const jsonSerde = new JsonSerde(); // Using the factory function const jsonSerde = createJsonSerde(); ``` ### Custom Serdes Examples for implementing custom serialization formats. **MessagePack Serde:** ```typescript import { type Serde } from '@platformatic/job-queue'; import { encode, decode } from '@msgpack/msgpack'; const msgpackSerde = (): Serde => ({ serialize: (value) => Buffer.from(encode(value)), deserialize: (buffer) => decode(buffer) as T, }); ``` **Protocol Buffers Serde:** ```typescript import { type Serde } from '@platformatic/job-queue'; import { MyMessage } from './generated/my_message_pb.ts'; // Assuming generated protobuf types const protobufSerde: Serde = { serialize: (value) => Buffer.from(value.toBinary()), deserialize: (buffer) => MyMessage.fromBinary(buffer), }; ``` ``` -------------------------------- ### Consumer Example: Processing Jobs Source: https://github.com/platformatic/job-queue/blob/main/DESIGN.md Shows how to set up a consumer Queue with RedisStorage, define an idempotent job execution handler, and manage graceful shutdown. ```typescript import { Queue, RedisStorage } from '@platformatic/job-queue'; const queue = new Queue<{ email: string; template: string }, void>({ storage: new RedisStorage({ host: 'localhost', port: 6379, prefix: 'email-queue' }), workerId: 'worker-1', concurrency: 5, }); // Handler MUST be idempotent - it may be called multiple times for the same job queue.execute(async (job) => { // Use job.id as idempotency key with email provider await sendEmail(job.payload.email, job.payload.template, { idempotencyKey: job.id, }); }); queue.on('error', (error) => { console.error('Queue error:', error); }); await queue.start(); // Graceful shutdown process.on('SIGTERM', async () => { await queue.stop(); }); ``` -------------------------------- ### Create and Configure a Job Queue Source: https://context7.com/platformatic/job-queue/llms.txt Demonstrates setting up a Redis-backed job queue with custom configurations for concurrency, retries, timeouts, and result caching. Includes an example of dynamically adjusting result TTL based on job completion status. ```typescript import { Queue, RedisStorage, TimeoutError, JobFailedError } from '@platformatic/job-queue' import pino from 'pino' interface EmailPayload { to: string; subject: string; body: string } interface EmailResult { messageId: string; deliveredAt: number } const storage = new RedisStorage({ url: process.env.REDIS_URL ?? 'redis://localhost:6379', keyPrefix: 'email:', logger: pino() }) const queue = new Queue({ storage, name: 'email-queue', // namespace isolation on shared storage workerId: `worker-${process.pid}`, concurrency: 5, maxRetries: 3, blockTimeout: 5, // seconds to long-poll when queue is empty visibilityTimeout: 30_000, // ms before a processing job is considered stalled resultTTL: 24 * 60 * 60_000, // cache results for 24 hours logger: pino(), afterExecution: async (ctx) => { // Dynamically adjust TTL based on result content if (ctx.status === 'completed' && ctx.result?.deliveredAt) { ctx.ttl = 7 * 24 * 60 * 60_000 // keep delivered emails 7 days } } }) // Register handler — starts consuming immediately after queue.start() queue.execute(async (job) => { // job.signal is an AbortSignal; check it in long-running work if (job.signal.aborted) throw new Error('Aborted') console.log(`[attempt ${job.attempts}] Sending email ${job.id}`) const messageId = await sendEmail(job.payload) return { messageId, deliveredAt: Date.now() } }) // Event listeners queue.on('started', () => console.log('Queue started')) queue.on('stopped', () => console.log('Queue stopped')) queue.on('enqueued', (id) => console.log(`Enqueued: ${id}`)) queue.on('completed', (id, result) => console.log(`Done: ${id}`, result)) queue.on('failed', (id, err) => console.error(`Failed: ${id}`, err.message)) queue.on('failing', (id, err, att) => console.warn(`Retry ${att}: ${id}`, err.message)) queue.on('cancelled', (id) => console.log(`Cancelled: ${id}`)) queue.on('error', (err) => console.error('Queue error:', err)) await queue.start() // Graceful shutdown process.on('SIGTERM', async () => { await queue.stop() // waits for in-flight jobs process.exit(0) }) ``` -------------------------------- ### Custom Serialization with Serde Interface Source: https://context7.com/platformatic/job-queue/llms.txt Illustrates how to implement custom serialization for job payloads and results using the `Serde` interface, with an example using MessagePack. ```APIDOC ## Serialization — `Serde` Interface / `JsonSerde` / Custom Serde ### Custom payload and result serialization The `Serde` interface enables custom binary serialization for payloads and results. ```typescript import { Queue, MemoryStorage, type Serde } from '@platformatic/job-queue' import * as msgpack from 'msgpackr' class MsgPackSerde implements Serde { serialize (value: T): Buffer { return msgpack.pack(value) as Buffer } deserialize (buffer: Buffer): T { return msgpack.unpack(buffer) as T } } interface HeavyPayload { data: Uint8Array; metadata: Record } interface HeavyResult { checksum: string; processedBytes: number } const queue = new Queue({ storage: new MemoryStorage(), payloadSerde: new MsgPackSerde(), resultSerde: new MsgPackSerde(), }) queue.execute(async (job) => { const checksum = computeChecksum(job.payload.data) return { checksum, processedBytes: job.payload.data.length } }) await queue.start() await queue.enqueue('heavy-job-1', { data: new Uint8Array([1, 2, 3, 4, 5]), metadata: { source: 'upload', userId: '99' } }) ``` ### Usage - Implement the `Serde` interface with `serialize` and `deserialize` methods. - Provide instances of your custom `Serde` to the `payloadSerde` and `resultSerde` options in the `Queue` constructor. ``` -------------------------------- ### Queue Execute Method Source: https://github.com/platformatic/job-queue/blob/main/README.md Registers a handler function that will be executed for incoming jobs. This can be called before or after `start()`. ```APIDOC ## `queue.execute(handler): void` Register a job handler. Call before or after `start()`. ```typescript queue.execute(async (job) => { // job.id - unique job identifier // job.payload - the job data // job.attempts - current attempt number (starts at 1) // job.signal - AbortSignal for cancellation return result }) ``` ``` -------------------------------- ### Implement Custom Serialization with MsgPackSerde Source: https://github.com/platformatic/job-queue/blob/main/README.md Implement the Serde interface for custom serialization and deserialization of job payloads and results. This example uses msgpackr. ```typescript import { Serde } from '@platformatic/job-queue' import * as msgpack from 'msgpackr' class MsgPackSerde implements Serde { serialize(value: T): Buffer { return msgpack.pack(value) } deserialize(buffer: Buffer): T { return msgpack.unpack(buffer) as T } } const queue = new Queue({ storage, payloadSerde: new MsgPackSerde(), resultSerde: new MsgPackSerde() }) ``` -------------------------------- ### Producer Process for Enqueuing Tasks Source: https://github.com/platformatic/job-queue/blob/main/README.md Example of a producer process that connects to Redis storage and enqueues a task. Ensure REDIS_URL is set in the environment. ```typescript // producer.ts import { Queue, RedisStorage } from '@platformatic/job-queue' const storage = new RedisStorage({ url: process.env.REDIS_URL }) const producer = new Queue({ storage }) await producer.start() await producer.enqueue('task-1', { ... }) await producer.stop() ``` -------------------------------- ### Queue Class - Lifecycle Methods Source: https://github.com/platformatic/job-queue/blob/main/DESIGN.md Methods for managing the lifecycle of the job queue, including starting and stopping the queue. ```APIDOC ## Queue Class - Lifecycle ### Description Methods to control the queue's operational state. ### Methods - `async start(): Promise`: Initializes and starts the job queue processing. - `async stop(): Promise`: Gracefully shuts down the job queue and stops processing jobs. ``` -------------------------------- ### Worker Process with Queue and Reaper Source: https://github.com/platformatic/job-queue/blob/main/README.md Example of a worker process that sets up a Queue with a specified concurrency and a Reaper for stalled jobs. Handles SIGTERM for graceful shutdown. ```typescript // worker.ts import { Queue, RedisStorage, Reaper } from '@platformatic/job-queue' const storage = new RedisStorage({ url: process.env.REDIS_URL }) const queue = new Queue({ storage, workerId: `worker-${process.pid}`, concurrency: 10 }) const reaper = new Reaper({ storage, visibilityTimeout: 30000 }) queue.execute(async (job) => { // Process job return result }) await queue.start() await reaper.start() process.on('SIGTERM', async () => { await queue.stop() await reaper.stop() }) ``` -------------------------------- ### Queue Class Initialization Source: https://github.com/platformatic/job-queue/blob/main/README.md Demonstrates how to initialize the Queue class with a specific storage backend and configuration options. ```APIDOC ## Queue Class The main class that combines producer and consumer functionality. ```typescript import { Queue } from '@platformatic/job-queue' const queue = new Queue(config) ``` ### Configuration | Option | Type | Default | Description | |--------|------|---------|-------------| | `storage` | `Storage` | *required* | Storage backend instance | | `workerId` | `string` | `uuid()` | Unique identifier for this worker | | `concurrency` | `number` | `1` | Number of jobs to process in parallel | | `maxRetries` | `number` | `3` | Maximum retry attempts for failed jobs | | `blockTimeout` | `number` | `5` | Seconds to wait when polling for jobs | | `visibilityTimeout` | `number` | `30000` | Milliseconds before a processing job is considered stalled | | `resultTTL` | `number` | `3600000` | Milliseconds to cache job results (1 hour) | | `logger` | `pino.Logger` | `abstractLogger` | Logger used by queue/producer/consumer | | `afterExecution` | `AfterExecutionHook` | `undefined` | Hook called after execution and before persisting terminal state | | `payloadSerde` | `Serde` | `JsonSerde` | Custom serializer for job payloads | | `resultSerde` | `Serde` | `JsonSerde` | Custom serializer for job results | ``` -------------------------------- ### Run Benchmarks with Valkey Source: https://github.com/platformatic/job-queue/blob/main/README.md Execute benchmark tests using Valkey as the storage backend. Ensure Valkey is running on the default port. ```bash npm run bench:valkey ``` -------------------------------- ### Queue Constructor and Usage Source: https://context7.com/platformatic/job-queue/llms.txt Demonstrates how to instantiate and use the Queue class for background job processing, including setting up storage, defining job payloads and results, registering handlers, and handling events. ```APIDOC ## Queue ### `new Queue(config)` Creates a combined producer/consumer queue. Call `execute()` to register a handler (making it a consumer); without a handler it operates as a producer only. Extends `EventEmitter` for lifecycle and job events. ### Parameters - **config** (object) - Required - Configuration object for the queue. - **storage** (Storage) - Required - The storage backend for the queue. - **name** (string) - Required - A namespace isolation key for shared storage. - **workerId** (string) - Required - Identifier for the current worker. - **concurrency** (number) - Optional - Maximum number of jobs to process concurrently. Defaults to 1. - **maxRetries** (number) - Optional - Maximum number of times a job should be retried on failure. Defaults to 3. - **blockTimeout** (number) - Optional - Seconds to long-poll when the queue is empty. Defaults to 5. - **visibilityTimeout** (number) - Optional - Milliseconds before a processing job is considered stalled. Defaults to 30000. - **resultTTL** (number) - Optional - Cache time-to-live for job results in milliseconds. Defaults to 24 * 60 * 60 * 1000 (24 hours). - **logger** (Logger) - Optional - Pino logger instance. - **afterExecution** (function) - Optional - Hook to dynamically adjust TTL or mutate results after execution. ### Methods - **`execute(handler: (job: Job) => Promise)`**: Registers a job handler function. If no handler is registered, the queue acts as a producer only. - **`start(): Promise`**: Starts the queue consumer. - **`stop(): Promise`**: Stops the queue consumer, waiting for in-flight jobs to complete. - **`on(event: string, listener: function)`**: Registers an event listener. Supported events: 'started', 'stopped', 'enqueued', 'completed', 'failed', 'failing', 'cancelled', 'error'. ### Request Example ```typescript import { Queue, RedisStorage, TimeoutError, JobFailedError } from '@platformatic/job-queue' import pino from 'pino' interface EmailPayload { to: string; subject: string; body: string } interface EmailResult { messageId: string; deliveredAt: number } const storage = new RedisStorage({ url: process.env.REDIS_URL ?? 'redis://localhost:6379', keyPrefix: 'email:', logger: pino() }) const queue = new Queue({ storage, name: 'email-queue', workerId: `worker-${process.pid}`, concurrency: 5, maxRetries: 3, blockTimeout: 5, visibilityTimeout: 30_000, resultTTL: 24 * 60 * 60_000, logger: pino(), afterExecution: async (ctx) => { if (ctx.status === 'completed' && ctx.result?.deliveredAt) { ctx.ttl = 7 * 24 * 60 * 60_000 } } }) queue.execute(async (job) => { if (job.signal.aborted) throw new Error('Aborted') console.log(`[attempt ${job.attempts}] Sending email ${job.id}`) const messageId = await sendEmail(job.payload) return { messageId, deliveredAt: Date.now() } }) queue.on('started', () => console.log('Queue started')) queue.on('stopped', () => console.log('Queue stopped')) queue.on('enqueued', (id) => console.log(`Enqueued: ${id}`)) queue.on('completed', (id, result) => console.log(`Done: ${id}`, result)) queue.on('failed', (id, err) => console.error(`Failed: ${id}`, err.message)) queue.on('failing', (id, err, att) => console.warn(`Retry ${att}: ${id}`, err.message)) queue.on('cancelled', (id) => console.log(`Cancelled: ${id}`)) queue.on('error', (err) => console.error('Queue error:', err)) await queue.start() process.on('SIGTERM', async () => { await queue.stop() process.exit(0) }) ``` ``` -------------------------------- ### Run Tests with Valkey Storage Source: https://github.com/platformatic/job-queue/blob/main/README.md Execute tests with Valkey as the storage backend. Requires a Valkey instance running on localhost:6380. ```bash npm run test:valkey ``` -------------------------------- ### Queue Lifecycle Event Listeners Source: https://github.com/platformatic/job-queue/blob/main/README.md Register event listeners for queue lifecycle events such as 'started' and 'stopped' to monitor the queue's operational status. ```typescript queue.on('started', () => { console.log('Queue started') }) queue.on('stopped', () => { console.log('Queue stopped') }) ``` -------------------------------- ### PgStorage Configuration Source: https://context7.com/platformatic/job-queue/llms.txt Demonstrates how to configure and use PgStorage for job queue persistence with PostgreSQL. It shows automatic table creation and basic queue operations. ```APIDOC ## PgStorage PostgreSQL-backed storage using `SELECT FOR UPDATE SKIP LOCKED` for concurrent dequeue and `LISTEN/NOTIFY` for pub/sub. Auto-creates required tables on `connect()`. ### Configuration ```typescript import { Queue, PgStorage } from '@platformatic/job-queue' const storage = new PgStorage({ connectionString: process.env.DATABASE_URL ?? 'postgresql://localhost:5432/mydb', tablePrefix: 'jq_' }) const queue = new Queue<{ payload: string }, { result: string }>({ storage }) await queue.start() await queue.enqueue('pg-job-001', { payload: 'hello' }) await queue.stop() ``` ### Usage - Initialize `PgStorage` with connection details. - Pass the storage instance to the `Queue` constructor. - Tables are created automatically on the first `connect()` call. ``` -------------------------------- ### Run All Tests Source: https://github.com/platformatic/job-queue/blob/main/README.md Execute tests against all configured backends. ```bash npm test ``` -------------------------------- ### Cancel a Queued Job Source: https://github.com/platformatic/job-queue/blob/main/README.md Use `cancel` to stop a job that has not yet started processing. The result indicates the cancellation status: 'cancelled', 'not_found', 'processing', or 'completed'. ```typescript const result = await queue.cancel('job-123') ``` -------------------------------- ### package.json Source: https://github.com/platformatic/job-queue/blob/main/DESIGN.md Project metadata and scripts for building, testing, and publishing the job queue package. ```json { "name": "@platformatic/job-queue", "type": "module", "main": "dist/index.js", "types": "dist/index.d.ts", "exports": { ".": { "types": "./dist/index.d.ts", "import": "./dist/index.js" } }, "files": ["dist"], "scripts": { "build": "tsc -p tsconfig.build.json", "clean": "rm -rf dist", "lint": "eslint", "lint:fix": "eslint --fix", "prepublishOnly": "npm run clean && npm run build", "test": "node --test test/*.test.ts", "typecheck": "tsc --noEmit" }, "engines": { "node": ">=22.19.0" }, "dependencies": { "fast-write-atomic": "^0.4.0", "iovalkey": "^0.2.0" }, "devDependencies": { "eslint": "^9.0.0", "neostandard": "^0.12.0", "typescript": "^5.7.0" } } ``` -------------------------------- ### Queue Configuration Interface Source: https://github.com/platformatic/job-queue/blob/main/DESIGN.md Details the configuration options available when creating a new Queue instance. ```APIDOC ## Queue Configuration ### Description Defines the structure for configuring a `Queue` instance. ### Interface ```typescript interface QueueConfig { // Storage backend (required) storage: Storage; // Serialization payloadSerde?: Serde; // Payload serde (default: JSON) resultSerde?: Serde; // Result serde (default: JSON) // Consumer options workerId?: string; // Unique worker ID (default: random UUID) concurrency?: number; // Parallel job processing (default: 1) blockTimeout?: number; // Blocking dequeue timeout in seconds (default: 5) maxRetries?: number; // Default max retry attempts (default: 3) // Stalled job recovery visibilityTimeout?: number; // Max processing time before job is considered stalled (default: 30000ms) processingQueueTTL?: number; // TTL for processing queue keys in ms (default: 604800000 = 7 days) // Result cache options resultTTL?: number; // Default TTL for stored results and errors in ms (default: 3600000 = 1 hour) } ``` ``` -------------------------------- ### Get Cached Job Result with getResult Source: https://context7.com/platformatic/job-queue/llms.txt Fetch the cached result of a completed job. Returns null if the job is not found, pending, failed, or the result has expired. ```typescript import { Queue, MemoryStorage } from '@platformatic/job-queue' interface TranscodeResult { outputUrl: string; durationMs: number } const queue = new Queue<{ videoUrl: string }, TranscodeResult>({ storage: new MemoryStorage(), resultTTL: 48 * 60 * 60_000 }) await queue.start() const result = await queue.getResult('transcode-video-99') if (result) { // result is typed as TranscodeResult console.log('Output URL:', result.outputUrl) console.log('Duration:', result.durationMs, 'ms') } else { console.log('Result not available (job pending, failed, or TTL expired)') } ``` -------------------------------- ### Initialize FileStorage Source: https://github.com/platformatic/job-queue/blob/main/README.md Set up `FileStorage` for single-node deployments, using the local filesystem to persist job data. Ensures atomic writes and FIFO ordering. ```typescript import { FileStorage } from '@platformatic/job-queue' const storage = new FileStorage({ basePath: '/var/lib/myapp/queue' }) ``` -------------------------------- ### Get job status Source: https://context7.com/platformatic/job-queue/llms.txt Use `getStatus` to retrieve detailed information about a job, including its state, attempts, and result or error. Returns `null` if the job is not found or has expired. ```typescript import { Queue, RedisStorage } from '@platformatic/job-queue' const queue = new Queue({ storage: new RedisStorage() }) await queue.start() const status = await queue.getStatus('invoice-7890') if (!status) { console.log('Job not found or expired') } else { console.log('ID:', status.id) console.log('State:', status.state) // 'queued' | 'processing' | 'failing' | 'completed' | 'failed' console.log('Created:', new Date(status.createdAt).toISOString()) console.log('Attempts:', status.attempts) if (status.state === 'completed') { console.log('Result:', status.result) } if (status.state === 'failed') { console.log('Error:', status.error?.message) console.log('Code:', status.error?.code) } } ``` -------------------------------- ### Run Redis Benchmarks Source: https://github.com/platformatic/job-queue/blob/main/README.md Command to run request/response benchmarks over Redis. ```bash # With Redis npm run bench:redis ``` -------------------------------- ### Start and Monitor Reaper for Stalled Jobs Source: https://github.com/platformatic/job-queue/blob/main/README.md Use the Reaper to monitor stalled jobs and requeue them. This is useful when running multiple workers. Ensure visibilityTimeout matches your queue's configuration. ```typescript import { Reaper } from '@platformatic/job-queue' const reaper = new Reaper({ storage, visibilityTimeout: 30000 // Same as your queue's visibilityTimeout }) await reaper.start() reaper.on('stalled', (id) => { console.log(`Job ${id} was stalled and requeued`) }) // On shutdown await reaper.stop() ``` -------------------------------- ### Get Current Job Status Source: https://github.com/platformatic/job-queue/blob/main/README.md Retrieve the current status of a job using its ID. The status object includes state, creation time, attempts, and potentially the result or error details. ```typescript const status = await queue.getStatus('job-123') ``` -------------------------------- ### Initialize Built-in Storage Implementations Source: https://github.com/platformatic/job-queue/blob/main/DESIGN.md Instantiate Redis, Memory, or File storage for the job queue. Redis is suitable for distributed and persistent queues, Memory for testing, and File for single-node persistence. ```typescript import { RedisStorage, MemoryStorage, FileStorage } from '@platformatic/job-queue'; // Redis/Valkey storage (distributed, persistent) const redisStorage = new RedisStorage({ host: 'localhost', port: 6379, prefix: 'myqueue', }); // In-memory storage (single process, testing) const memoryStorage = new MemoryStorage(); // File storage (single node, persistent) const fileStorage = new FileStorage({ directory: '/var/lib/job-queue', }); ``` -------------------------------- ### Reaper Configuration and Usage Source: https://context7.com/platformatic/job-queue/llms.txt Explains how to set up and use the Reaper to monitor and requeue stalled jobs. Includes options for single reaper and high-availability deployments with leader election. ```APIDOC ## Reaper The Reaper monitors for stalled jobs (processing longer than `visibilityTimeout`) and requeues them. ### Initialization ```typescript import { Queue, Reaper, RedisStorage } from '@platformatic/job-queue' const storage = new RedisStorage({ url: 'redis://localhost:6379' }) const queue = new Queue({ storage, visibilityTimeout: 30_000 }) // Single reaper const reaper = new Reaper({ storage, visibilityTimeout: 30_000 }) reaper.on('stalled', (id) => console.warn(`Job ${id} stalled, requeuing`)) // High-availability reaper with leader election (Redis only) const haReaper = new Reaper({ storage, visibilityTimeout: 30_000, leaderElection: { enabled: true, lockTTL: 30_000, renewalInterval: 10_000, acquireRetryInterval: 5_000 } }) haReaper.on('leadershipAcquired', () => console.log('Reaper is now leader')) haReaper.on('leadershipLost', () => console.log('Reaper lost leadership')) await queue.start() await haReaper.start() ``` ### Key Features - Monitors jobs exceeding `visibilityTimeout`. - Subscribes to storage events and performs initial scans. - Supports leader election for high-availability setups (Redis only). ``` -------------------------------- ### Configure RedisStorage for Production Source: https://context7.com/platformatic/job-queue/llms.txt Configure RedisStorage for production use with Redis 7+ or Valkey 8+. It leverages Lua scripts and `BLMOVE` for efficient operations. Requires the `iovalkey` peer dependency. Supports namespacing for logical queues on a single Redis instance. ```typescript import { Queue, Reaper, RedisStorage } from '@platformatic/job-queue' // Single storage instance shared between queue and reaper const storage = new RedisStorage({ url: process.env.REDIS_URL ?? 'redis://localhost:6379', keyPrefix: 'myapp:jobs:', // all Redis keys prefixed with this }) // Multiple logical queues on the same Redis instance via namespacing const emailQueue = new Queue({ storage, name: 'email' }) const reportQueue = new Queue({ storage, name: 'reports' }) // Redis keys: myapp:jobs:email:queue, myapp:jobs:reports:queue, etc. emailQueue.execute(async (job) => { await sendEmail(job.payload) return { sent: true } }) await emailQueue.start() await reportQueue.start() await emailQueue.enqueue('welcome-user-42', { to: 'user@example.com', subject: 'Welcome!' }) // Clean up (testing helper — removes all keys matching keyPrefix*) await (storage as any).clear() await emailQueue.stop() await reportQueue.stop() ``` -------------------------------- ### Run Tests with Redis Storage Source: https://github.com/platformatic/job-queue/blob/main/README.md Execute tests with Redis as the storage backend. Requires a Redis instance running on localhost:6379. ```bash npm run test:redis ``` -------------------------------- ### Storage Backends Source: https://github.com/platformatic/job-queue/blob/main/README.md Configuration options for different storage backends, including in-memory, Redis, and filesystem storage. ```APIDOC ## Storage Backends ### Description Configure the persistence layer for the job queue. Choose between `MemoryStorage`, `RedisStorage`, or `FileStorage` based on your deployment needs. ### MemoryStorage In-memory storage, suitable for development and testing. ```typescript import { MemoryStorage } from '@platformatic/job-queue' const storage = new MemoryStorage() ``` ### RedisStorage Production-ready storage using Redis or Valkey. Supports atomic operations, blocking dequeues, and real-time notifications. ```typescript import { RedisStorage } from '@platformatic/job-queue' const storage = new RedisStorage({ url: 'redis://localhost:6379', keyPrefix: 'myapp:', // Optional prefix for all keys logger // Optional pino-compatible logger }) ``` ### FileStorage Filesystem-based storage for single-node deployments. Ensures atomic writes and FIFO ordering. ```typescript import { FileStorage } from '@platformatic/job-queue' const storage = new FileStorage({ basePath: '/var/lib/myapp/queue' }) ``` ``` -------------------------------- ### FileStorage Initialization Source: https://github.com/platformatic/job-queue/blob/main/DESIGN.md Instantiate FileStorage with a specified directory for storing queue data. This is used for single-node persistence. ```typescript import { FileStorage } from '@platformatic/job-queue'; const fileStorage = new FileStorage({ directory: '/var/lib/job-queue', // Base directory for all data }); ``` -------------------------------- ### Initialize RedisStorage Source: https://github.com/platformatic/job-queue/blob/main/README.md Configure `RedisStorage` for production environments, connecting to a Redis or Valkey instance. Supports atomic operations, blocking dequeues, and real-time notifications. ```typescript import { RedisStorage } from '@platformatic/job-queue' const storage = new RedisStorage({ url: 'redis://localhost:6379', keyPrefix: 'myapp:', // Optional prefix for all keys logger // Optional pino-compatible logger }) ``` -------------------------------- ### Initialize Queue with MemoryStorage Source: https://github.com/platformatic/job-queue/blob/main/DESIGN.md Instantiate a Queue using MemoryStorage for testing or single-process applications. Jobs are processed immediately. ```typescript import { Queue, MemoryStorage } from '@platformatic/job-queue'; const queue = new Queue({ storage: new MemoryStorage(), concurrency: 1, }); queue.execute(async (job) => { return processJob(job.payload); }); await queue.start(); // Works exactly like Redis storage const result = await queue.enqueueAndWait('job-1', payload); ``` -------------------------------- ### tsconfig.build.json (Publishing) Source: https://github.com/platformatic/job-queue/blob/main/DESIGN.md TypeScript configuration for building the project for publishing, generating declarations and source maps. ```json { "extends": "./tsconfig.json", "compilerOptions": { "noEmit": false, "declaration": true, "declarationMap": true, "sourceMap": true, "outDir": "dist", "rootDir": "src", "rewriteRelativeImportExtensions": true }, "include": ["src/**/*.ts"], "exclude": ["node_modules", "test"] } ``` -------------------------------- ### Initialize MemoryStorage Source: https://github.com/platformatic/job-queue/blob/main/README.md Instantiate `MemoryStorage` for development and testing purposes. This storage backend keeps all job data in memory. ```typescript import { MemoryStorage } from '@platformatic/job-queue' const storage = new MemoryStorage() ``` -------------------------------- ### Implement Custom Serialization with MessagePack Source: https://context7.com/platformatic/job-queue/llms.txt Shows how to use a custom `Serde` implementation for binary serialization, like MessagePack, for job payloads and results. Defaults to JSON serialization. ```typescript import { Queue, MemoryStorage, type Serde } from '@platformatic/job-queue' import * as msgpack from 'msgpackr' // Custom MessagePack serde class MsgPackSerde implements Serde { serialize (value: T): Buffer { return msgpack.pack(value) as Buffer } deserialize (buffer: Buffer): T { return msgpack.unpack(buffer) as T } } interface HeavyPayload { data: Uint8Array; metadata: Record } interface HeavyResult { checksum: string; processedBytes: number } const queue = new Queue({ storage: new MemoryStorage(), payloadSerde: new MsgPackSerde(), resultSerde: new MsgPackSerde(), }) queue.execute(async (job) => { const checksum = computeChecksum(job.payload.data) return { checksum, processedBytes: job.payload.data.length } }) await queue.start() await queue.enqueue('heavy-job-1', { data: new Uint8Array([1, 2, 3, 4, 5]), metadata: { source: 'upload', userId: '99' } }) ``` -------------------------------- ### Implement Protocol Buffers Serde Source: https://github.com/platformatic/job-queue/blob/main/DESIGN.md Implement a serde for Protocol Buffers. Requires generated protobuf classes and their methods for binary conversion. Ensure the type matches your protobuf message. ```typescript import { MyMessage } from './generated/my_message_pb.ts'; const protobufSerde: Serde = { serialize: (value) => Buffer.from(value.toBinary()), deserialize: (buffer) => MyMessage.fromBinary(buffer), }; ``` -------------------------------- ### Run Tests with Memory Storage Source: https://github.com/platformatic/job-queue/blob/main/README.md Execute tests using only in-memory storage. This is useful for quick testing without external dependencies. ```bash npm run test:memory ``` -------------------------------- ### Stop Test Infrastructure with Docker Source: https://github.com/platformatic/job-queue/blob/main/README.md Use Docker to stop the Redis and Valkey containers. ```bash npm run docker:down ``` -------------------------------- ### Configure and Run Reaper for Stalled Jobs Source: https://context7.com/platformatic/job-queue/llms.txt Sets up a Reaper to monitor and requeue stalled jobs. Configure 'visibilityTimeout' to match the queue's. Supports leader election for high-availability with Redis. ```typescript import { Queue, Reaper, RedisStorage } from '@platformatic/job-queue' const storage = new RedisStorage({ url: 'redis://localhost:6379' }) const queue = new Queue({ storage, visibilityTimeout: 30_000, // must match reaper's visibilityTimeout concurrency: 10 }) // Single reaper (no leader election) const reaper = new Reaper({ storage, visibilityTimeout: 30_000 }) reaper.on('stalled', (id) => console.warn(`Job ${id} stalled, requeuing`)) reaper.on('error', (err) => console.error('Reaper error:', err)) // High-availability reaper with leader election (Redis only) const haReaper = new Reaper({ storage, visibilityTimeout: 30_000, leaderElection: { enabled: true, lockTTL: 30_000, // lock expires if leader crashes renewalInterval: 10_000, // leader renews every 10s acquireRetryInterval: 5_000 // followers retry acquisition every 5s } }) haReaper.on('leadershipAcquired', () => { console.log(`Reaper ${haReaper.reaperId} is now leader`) }) haReaper.on('leadershipLost', () => { console.log(`Reaper ${haReaper.reaperId} lost leadership`) }) queue.execute(async (job) => { await doWork(job.payload) return {} }) await queue.start() await haReaper.start() process.on('SIGTERM', async () => { await queue.stop() await haReaper.stop() process.exit(0) }) ``` -------------------------------- ### Queue Class - Enqueue Methods Source: https://github.com/platformatic/job-queue/blob/main/DESIGN.md Methods for adding jobs to the queue, supporting both fire-and-forget and request/response patterns. ```APIDOC ## Queue Class - Enqueue Operations ### Description Methods for submitting jobs to the queue. ### Methods - `async enqueue(id: string, payload: TPayload, options?: EnqueueOptions): Promise`: Adds a job to the queue for background processing (fire-and-forget). - `async enqueueAndWait(id: string, payload: TPayload, options?: EnqueueAndWaitOptions): Promise`: Adds a job to the queue and waits for its result (request/response). ``` -------------------------------- ### Configure MemoryStorage for Development Source: https://context7.com/platformatic/job-queue/llms.txt Use MemoryStorage for development and testing. It requires no external dependencies but data is lost on process restart. Multiple queues can be instantiated with separate MemoryStorage instances. ```typescript import { Queue, MemoryStorage } from '@platformatic/job-queue' const storage = new MemoryStorage() const queue = new Queue<{ task: string }, { done: boolean }>({ storage }) queue.execute(async (job) => { await processTask(job.payload.task) return { done: true } }) await queue.start() await queue.enqueue('task-1', { task: 'send-welcome-email' }) await queue.stop() // Multiple isolated queues sharing no state const queue2 = new Queue({ storage: new MemoryStorage() }) // completely separate ``` -------------------------------- ### Graceful Shutdown with Job Queue (TypeScript) Source: https://github.com/platformatic/job-queue/blob/main/DESIGN.md Demonstrates how to gracefully shut down the job queue, ensuring all in-flight jobs are finished before unregistering the worker. ```typescript closeWithGrace({ delay: 10000 }, async () => { await queue.stop(); // Finishes in-flight jobs, unregisters worker }); ``` -------------------------------- ### ESLint Configuration Source: https://github.com/platformatic/job-queue/blob/main/DESIGN.md Configures ESLint for TypeScript projects using the neostandard preset. ```javascript import neostandard from 'neostandard' export default neostandard({ ts: true, }) ``` -------------------------------- ### Initialize Queue with TypeScript Generics Source: https://github.com/platformatic/job-queue/blob/main/README.md Instantiate the Queue class with specific types for job payloads and results using TypeScript generics. ```typescript import { Queue } from '@platformatic/job-queue' const queue = new Queue(config) ``` -------------------------------- ### FileStorage Configuration Interface Source: https://github.com/platformatic/job-queue/blob/main/DESIGN.md Defines the configuration for FileStorage, requiring a base directory for all queue-related data. ```typescript interface FileStorageConfig { directory: string; // Base directory for queue data (required) } ``` -------------------------------- ### Enqueue Flow Logic Source: https://github.com/platformatic/job-queue/blob/main/DESIGN.md Illustrates the steps involved in the enqueue operation, including checking for existing messages and storing new ones. ```plaintext enqueue(id, payload, { resultTTL? }) │ ▼ ┌─────────────────────────────────┐ │ HGET {prefix}:jobs {id} │ │ Check if message ID exists │ └─────────────────────────────────┘ │ ├── exists with state "completed" ──► Return cached result │ ├── exists with state "queued" or "processing" ──► Return duplicate │ └── not exists or "failed" │ ▼ ┌─────────────────────────────────┐ │ MULTI │ │ HSET {prefix}:jobs {id} │ │ "queued:{timestamp}" │ │ LPUSH {prefix}:queue {msg} │ │ PUBLISH {prefix}:events │ │ {id}:queued │ │ EXEC │ └─────────────────────────────────┘ │ ▼ Return { status: 'queued' } ``` -------------------------------- ### RedisStorage Source: https://context7.com/platformatic/job-queue/llms.txt Production-grade storage using Redis 7+ or Valkey 8+. Uses Lua scripts for critical operations and pub/sub for notifications. ```APIDOC ## RedisStorage ### Description Production-grade storage using Redis 7+ or Valkey 8+. Uses atomic Lua scripts for all critical operations, `BLMOVE` for blocking dequeue, and Redis pub/sub for real-time notifications. Requires the `iovalkey` peer dependency. ### Usage ```typescript import { Queue, Reaper, RedisStorage } from '@platformatic/job-queue' // Single storage instance shared between queue and reaper const storage = new RedisStorage({ url: process.env.REDIS_URL ?? 'redis://localhost:6379', keyPrefix: 'myapp:jobs:', // all Redis keys prefixed with this }) // Multiple logical queues on the same Redis instance via namespacing const emailQueue = new Queue({ storage, name: 'email' }) const reportQueue = new Queue({ storage, name: 'reports' }) // Redis keys: myapp:jobs:email:queue, myapp:jobs:reports:queue, etc. emailQueue.execute(async (job) => { await sendEmail(job.payload) return { sent: true } }) await emailQueue.start() await reportQueue.start() await emailQueue.enqueue('welcome-user-42', { to: 'user@example.com', subject: 'Welcome!' }) // Clean up (testing helper — removes all keys matching keyPrefix*) await (storage as any).clear() await emailQueue.stop() await reportQueue.stop() ``` ``` -------------------------------- ### Implement MessagePack Serde Source: https://github.com/platformatic/job-queue/blob/main/DESIGN.md Create a custom serde for MessagePack serialization. Requires the '@msgpack/msgpack' library. Ensure the generic type T matches your data structure. ```typescript import { type Serde } from '@platformatic/job-queue'; import { encode, decode } from '@msgpack/msgpack'; // MessagePack serde example const msgpackSerde = (): Serde => ({ serialize: (value) => Buffer.from(encode(value)), deserialize: (buffer) => decode(buffer) as T, }); ```