### Quick Start Example

Source: https://github.com/openpanel-dev/groupmq/blob/main/README.md

A basic example demonstrating how to set up a Queue and a Worker in GroupMQ.

```typescript
import Redis from "ioredis";
import { Queue, Worker } from "groupmq";

const redis = new Redis("redis://127.0.0.1:6379");

const queue = new Queue({
  redis,
  namespace: "orders", // Will be prefixed with 'groupmq:'
  jobTimeoutMs: 30_000, // How long before job times out
  logger: true, // Enable logging (optional)
});

await queue.add({
  groupId: "user:42",
  data: { type: "charge", amount: 999 },
  orderMs: Date.now(), // or event.createdAtMs
  maxAttempts: 5,
});

const worker = new Worker({
  queue,
  concurrency: 1, // Process 1 job at a time (can increase for parallel processing)
  handler: async (job) => {
    console.log(`Processing:`, job.data);
  },
});

worker.run();
```

--------------------------------

### Quick start

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/blog/introducing-groupmq.mdx

A basic example demonstrating how to set up a GroupMQ queue and worker to process jobs.

```typescript
import Redis from 'ioredis';
import { Queue, Worker } from 'groupmq';

const redis = new Redis('redis://127.0.0.1:6379');

type Payload = { type: 'charge' | 'refund'; amount: number };

// Pass the payload type so job.data is typed in the handler
const queue = new Queue<Payload>({
  redis,
  namespace: 'orders',
  jobTimeoutMs: 30_000,
});

await queue.add({
  groupId: 'user:42',
  data: { type: 'charge', amount: 999 },
  orderMs: Date.now(),
  maxAttempts: 5,
});

const worker = new Worker({
  queue,
  concurrency: 4, // Process up to 4 jobs simultaneously
  async handler(job) {
    // job.data is fully typed
    if (job.data.type === 'charge') {
      // charge...
    }
  },
});

worker.run();
```

--------------------------------

### Install GroupMQ and ioredis

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/installation.mdx

Install GroupMQ and ioredis using pnpm, npm, or yarn.
```bash
pnpm add groupmq ioredis
# or
npm i groupmq ioredis
# or
yarn add groupmq ioredis
```

--------------------------------

### Install Dependencies and Run Development Server

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/README.md

Commands to install dependencies and start the development server for the GroupMQ website.

```bash
# Install dependencies
pnpm install

# Run the development server
pnpm dev
```

--------------------------------

### Worker Setup

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/api-worker.mdx

Example of setting up a new Worker instance with various configuration options.

```typescript
import Redis from 'ioredis'
import { Queue, Worker } from 'groupmq'

const redis = new Redis()
const queue = new Queue<{ foo: string }>({ redis, namespace: 'demo' })

const worker = new Worker<{ foo: string }>({
  queue,
  heartbeatMs: 10_000,
  maxAttempts: 5,
  backoff: (attempt) => Math.min(30_000, 2 ** (attempt - 1) * 500),
  async handler(job) {
    // do work with job.data
    return { ok: true }
  },
})
```

--------------------------------

### Method 2: 'scheduler' - Example

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/ordering-methods.mdx

Example of initializing a queue with the 'scheduler' ordering method and adding jobs. Jobs are buffered for orderingWindowMs before processing.

```typescript
const queue = new Queue({
  redis,
  namespace: 'batch-queue',
  orderingMethod: 'scheduler',
  orderingWindowMs: 2000, // 2-second buffer
});

// Multiple services adding events with timestamps
await queue.add({
  groupId: 'user-123',
  data: { event: 'page_view' },
  orderMs: 1704067200000 // Unix timestamp from source system
});

// Job won't be processed until 2 seconds after the first job in the group
```

--------------------------------

### Setup

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/api-job.mdx

Initializes Redis and Queue, then adds a job.
```typescript
import Redis from 'ioredis'
import { Queue } from 'groupmq'

const redis = new Redis()
const queue = new Queue<{ n: number }>({ redis, namespace: 'docs' })

const job = await queue.add({ groupId: 'g1', data: { n: 1 } })
```

--------------------------------

### Queue Configuration Example with Custom schedulerLockTtlMs

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/configuration-options.mdx

Example of creating a queue with a reduced schedulerLockTtlMs for testing fast repeating jobs.

```typescript
const queue = new Queue({
  redis,
  namespace: 'fast-queue',
  schedulerLockTtlMs: 50, // Allow sub-second repeats (testing only)
});
```

--------------------------------

### producer.ts

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/quick-example.mdx

Minimal producer using GroupMQ.

```typescript
import Redis from 'ioredis';
import { Queue } from 'groupmq';

const redis = new Redis('redis://localhost:6379', { maxRetriesPerRequest: null });

const queue = new Queue<{ id: string; ms: number }>({
  redis,
  namespace: 'quick',
  keepCompleted: 100,
  keepFailed: 100,
});

await queue.add({
  groupId: 'user:1',
  data: { id: 'a1', ms: 250 },
});
```

--------------------------------

### Worker Configuration Example with Custom schedulerIntervalMs

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/configuration-options.mdx

Example of creating a worker with a reduced schedulerIntervalMs for more frequent scheduler checks.

```typescript
new Worker({
  queue,
  schedulerIntervalMs: 10, // Check every 10ms
  async handler(job) { /* ... */ },
});
```

--------------------------------

### Method 1: 'none' - Example

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/ordering-methods.mdx

Example of initializing a queue with the 'none' ordering method and adding a job. Jobs are processed immediately in orderMs order.
```typescript
const queue = new Queue({
  redis,
  namespace: 'simple-queue',
  orderingMethod: 'none', // or omit entirely
});

// Jobs processed immediately in orderMs order as they arrive
await queue.add({
  groupId: 'user-123',
  data: { action: 'update' },
  orderMs: Date.now()
});
```

--------------------------------

### worker.ts

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/quick-example.mdx

Minimal worker using GroupMQ.

```typescript
import Redis from 'ioredis';
import { Queue, Worker } from 'groupmq';

const redis = new Redis('redis://localhost:6379', { maxRetriesPerRequest: null });

const queue = new Queue<{ id: string; ms: number }>({
  redis,
  namespace: 'quick',
});

const worker = new Worker({
  queue,
  async handler(job) {
    await new Promise((r) => setTimeout(r, job.data.ms));
    return `done ${job.id}`;
  },
});

worker.on('completed', (job) => {
  console.log('completed', job.id);
});

worker.run();
```

--------------------------------

### GroupMQ Queue and Worker Setup

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/groupmq-vs-bullmq.mdx

Demonstrates the equivalent setup in GroupMQ, emphasizing groupId for FIFO.
```typescript
import { Queue, Worker } from 'groupmq';
import Redis from 'ioredis';

const redis = new Redis('redis://localhost:6379', { maxRetriesPerRequest: null });

// Provide a type parameter for full type safety in your worker
type Payload = { foo?: string; qux?: string };

const queue = new Queue<Payload>({
  redis,
  namespace: 'foo',
});

// Add jobs - use groupId to control per-group FIFO
await queue.add({ groupId: 'group-1', data: { foo: 'bar' } });
await queue.add({ groupId: 'group-2', data: { qux: 'baz' } });

const worker = new Worker({
  queue, // pass the queue instance (not a name)
  async handler(job) {
    console.log(job.data); // typed as Payload
  },
});

worker.run();
```

--------------------------------

### Install GroupMQ

Source: https://github.com/openpanel-dev/groupmq/blob/main/README.md

Install the groupmq and ioredis packages using npm.

```bash
npm i groupmq ioredis
```

--------------------------------

### Developing

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/contributing.mdx

Commands to install dependencies, run tests, and execute benchmarks.

```bash
pnpm i
pnpm test
pnpm bench -- --mq groupmq --jobs 500 --workers 4 --job-type cpu --multi-process
```

--------------------------------

### Basic Worker Example

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/processing-jobs.mdx

This example shows how to create a basic worker that processes jobs and listens for completion and failure events.
```typescript
import { Worker } from 'groupmq';

const worker = new Worker({
  queue,
  async handler(job) {
    // Your work here
    return `processed ${job.id}`;
  },
});

worker.on('completed', (job) => {
  console.log('completed', job.id);
});

worker.on('failed', (job) => {
  console.log('failed', job.id, job.failedReason);
});

worker.run();
```

--------------------------------

### Example Job with Specific Time

Source: https://github.com/openpanel-dev/groupmq/blob/main/README.md

An example of adding a job to run at a specific time.

```typescript
await queue.add({
  groupId: 'user:123',
  data: { action: 'scheduled-report' },
  runAt: new Date('2025-12-31T23:59:59Z'),
});
```

--------------------------------

### BullMQ Queue and Worker Setup

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/groupmq-vs-bullmq.mdx

Demonstrates how to create a queue and a worker in BullMQ.

```typescript
import { Queue, Worker } from 'bullmq';
import Redis from 'ioredis';

const myQueue = new Queue('foo');

async function addJobs() {
  await myQueue.add('myJobName', { foo: 'bar' });
  await myQueue.add('myJobName', { qux: 'baz' });
}

await addJobs();

const connection = new Redis({ maxRetriesPerRequest: null });

const worker = new Worker(
  'foo',
  async (job) => {
    console.log(job.data); // { foo: 'bar' } then { qux: 'baz' }
  },
  { connection },
);
```

--------------------------------

### Setup

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/api-queue.mdx

Imports Redis and Queue, then initializes a new Queue instance with various configuration options including namespace, job timeouts, retry attempts, and ordering strategies.
```typescript
import Redis from 'ioredis'
import { Queue } from 'groupmq'

const redis = new Redis()
const queue = new Queue<{ foo: string }>({
  redis,
  namespace: 'demo',
  jobTimeoutMs: 30_000,
  maxAttempts: 3,
  orderingMethod: 'none', // or 'scheduler' or 'in-memory'
  orderingWindowMs: 0, // required for 'scheduler' and 'in-memory' methods
  orderingMaxWaitMultiplier: 3, // (optional) default: 3, max wait = windowMs × multiplier
  orderingGracePeriodDecay: 1.0, // (optional) default: 1.0, decay factor for 'in-memory'
  orderingMaxBatchSize: 10, // (optional) default: 10, max jobs to collect in a batch for 'in-memory'
  schedulerLockTtlMs: 1500, // (optional) default: 1500ms, controls min repeat interval
})
```

--------------------------------

### Example Job with Delay

Source: https://github.com/openpanel-dev/groupmq/blob/main/README.md

An example of adding a job with a delay.

```typescript
await queue.add({
  groupId: 'user:123',
  data: { action: 'send-reminder' },
  delay: 3600000, // Run in 1 hour
});
```

--------------------------------

### Testing Commands

Source: https://github.com/openpanel-dev/groupmq/blob/main/README.md

Commands to install dependencies, build the project, and run tests.

```bash
npm i
npm run build
npm test
```

--------------------------------

### BullMQ (illustrative)

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/groupmq-vs-bullmq.mdx

Example of setting concurrency in BullMQ.

```typescript
// BullMQ (illustrative)
new Worker('queue', async (job) => { /* ... */ }, { concurrency: 10 });
```

--------------------------------

### Worker with TypeScript Typing

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/processing-jobs.mdx

This example demonstrates how a worker automatically gets full type inference for `job.data` when the `Queue` is created with a generic type.
```typescript
type Task = { id: string; ms: number };

// The generic parameter on Queue is what the worker infers job.data from
const queue = new Queue<Task>({ redis, namespace: 'tasks' });

new Worker({
  queue,
  async handler(job) {
    job.data.id; // string
    job.data.ms; // number
  },
}).run();
```

--------------------------------

### Adding Jobs

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/adding-jobs.mdx

Examples of adding jobs with different options like groupId, delay, runAt, and repeat.

```typescript
// Basic add
await queue.add({
  groupId: 'user:1',
  data: { id: '1', ms: 200 },
});

// Delay by 2 seconds
await queue.add({
  groupId: 'user:1',
  data: { id: '2', ms: 200 },
  delay: 2000,
});

// Schedule at a specific time
await queue.add({
  groupId: 'user:2',
  data: { id: '3', ms: 200 },
  runAt: Date.now() + 5000,
});

// Repeat every 5 seconds
await queue.add({
  groupId: 'user:cron',
  data: { id: 'tick', ms: 100 },
  repeat: { every: 5000 },
});

// Repeat using cron
await queue.add({
  groupId: 'user:cron',
  data: { id: 'nightly', ms: 100 },
  repeat: { pattern: '0 0 * * *' },
});
```

--------------------------------

### Fast repeating jobs configuration

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/jobs.mdx

Example configuration for sub-second repeats.

```typescript
// ⚠️ NOT RECOMMENDED for production - use sparingly
const queue = new Queue({
  redis,
  namespace: 'fast-queue',
  schedulerLockTtlMs: 50, // Allow fast lock acquisition
});

new Worker({
  queue,
  schedulerIntervalMs: 10, // Check every 10ms
  cleanupIntervalMs: 100, // Run cleanup every 100ms
  async handler(job) { /* ... */ },
}).run();

await queue.add({
  groupId: 'fast-cron',
  data: { task: 'tick' },
  repeat: { every: 100 }, // Every 100ms - now possible!
});
```

--------------------------------

### In-Memory Ordering Example

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/ordering-methods.mdx

Demonstrates how to configure and use the 'in-memory' ordering method, where workers hold jobs for a grace period to maintain order.

```typescript
const queue = new Queue({
  redis,
  namespace: 'realtime-queue',
  orderingMethod: 'in-memory',
  orderingWindowMs: 200, // 200ms grace period
});

// API gateway forwarding requests (might arrive out of order)
await queue.add({
  groupId: 'session-abc',
  data: { request: 'update_profile' },
  orderMs: Date.now()
});

// Worker will wait up to 200ms for more jobs from this session
// If another job arrives within 200ms, timer resets
```

--------------------------------

### Troubleshooting: Jobs not processing (scheduler method)

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/ordering-methods.mdx

Provides a code example to check if the scheduler is running and how to inspect the buffering groups in Redis.

```typescript
// Check scheduler is running
const worker = new Worker({
  queue,
  handler: async (job) => { /* ... */ },
  runScheduler: true, // Ensure this is true
});

// Check buffer state
const bufferingGroups = await redis.zrange('groupmq:my-queue:buffering', 0, -1, 'WITHSCORES');
console.log('Groups in buffer:', bufferingGroups);
```

--------------------------------

### In-Memory Resetting Grace Window Example

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/ordering-methods.mdx

Illustrates the concept of a resetting grace window in the 'in-memory' ordering method. Each new job resets the timer, ensuring jobs arriving in succession are grouped.
```text
Time: 0ms   → First job arrives, start 200ms timer
Time: 150ms → Second job arrives, RESET to 200ms (wait until 350ms)
Time: 300ms → Third job arrives, RESET to 200ms (wait until 500ms)
Time: 500ms → No new jobs for 200ms, process all 3 jobs in orderMs order
```

--------------------------------

### PM2 (cluster)

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/scaling-workers.mdx

Commands to start workers using PM2 in cluster mode.

```bash
pm2 start worker.js -i max
# or a fixed number
pm2 start worker.js -i 4
```

--------------------------------

### cluster.ts

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/groupmq-vs-bullmq.mdx

Example of process-level scaling for GroupMQ across CPUs using the Node.js cluster module.

```typescript
import cluster from 'node:cluster';
import os from 'node:os';
import Redis from 'ioredis';
import { Queue, Worker } from 'groupmq';

if (cluster.isPrimary) {
  const n = os.cpus().length;
  for (let i = 0; i < n; i++) cluster.fork();
} else {
  const redis = new Redis('redis://localhost:6379', { maxRetriesPerRequest: null });
  const queue = new Queue<{ id: string; ms: number }>({ redis, namespace: 'app' });

  new Worker({
    queue,
    async handler(job) {
      await new Promise((r) => setTimeout(r, job.data.ms));
    }
  }).run();
}
```

--------------------------------

### BullBoard Integration

Source: https://github.com/openpanel-dev/groupmq/blob/main/README.md

Example of integrating GroupMQ with BullBoard for visual monitoring and management.
```typescript
import { createBullBoard } from '@bull-board/api';
import { ExpressAdapter } from '@bull-board/express';
import { BullBoardGroupMQAdapter } from 'groupmq';
import express from 'express';

const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');

createBullBoard({
  queues: [
    new BullBoardGroupMQAdapter(queue, {
      displayName: 'Order Processing',
      description: 'Processes customer orders',
      readOnlyMode: false, // Allow job manipulation through UI
    }),
  ],
  serverAdapter,
});

const app = express();
app.use('/admin/queues', serverAdapter.getRouter());
app.listen(3000, () => {
  console.log('BullBoard running at http://localhost:3000/admin/queues');
});
```

--------------------------------

### Running the Worker

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/api-worker.mdx

Starts the worker's job processing loop.

```typescript
await worker.run()
```

--------------------------------

### Node Cluster - cluster.ts

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/scaling-workers.mdx

Example using the Node.js cluster module to manage worker processes.

```typescript
import cluster from 'node:cluster';
import os from 'node:os';
import Redis from 'ioredis';
import { Queue, Worker } from 'groupmq';

const instances = process.env.WORKERS ? Number(process.env.WORKERS) : os.cpus().length;

if (cluster.isPrimary) {
  for (let i = 0; i < instances; i++) cluster.fork();
} else {
  const redis = new Redis('redis://localhost:6379', { maxRetriesPerRequest: null });
  const queue = new Queue<{ id: string; ms: number }>({ redis, namespace: 'app' });

  new Worker({
    queue,
    async handler(job) {
      await new Promise((r) => setTimeout(r, job.data.ms));
    }
  }).run();
}
```

--------------------------------

### E-commerce Orders - 'none' Ordering

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/ordering-methods.mdx

Example for e-commerce scenarios where orders are expected to arrive in sequence, utilizing the 'none' ordering method for maximum efficiency.

```typescript
// Orders arrive in sequence, no special ordering needed
const queue = new Queue({
  redis,
  namespace: 'orders',
  orderingMethod: 'none',
});
```

--------------------------------

### Worker Event Handling

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/api-worker.mdx

Example of subscribing to various worker events like 'ready', 'completed', 'failed', and 'error'.

```typescript
worker.on('ready', () => console.log('worker ready'))
worker.on('completed', (job) => console.log('done', job.id))
worker.on('failed', (job) => console.log('failed', job.id, job.failedReason))
worker.on('graceful-timeout', (job) => console.warn('timed out', job.id))
worker.on('error', (err) => console.error('worker error', err))
worker.on('stalled', (jobId, groupId) => console.warn('job stalled', jobId, groupId))
```

--------------------------------

### getState()

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/api-job.mdx

Gets the current status inferred from Redis.
```typescript
const state = await job.getState()
```

--------------------------------

### Worker Configuration for Stalled Job Detection

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/stalled-jobs.mdx

Example of configuring stalled job detection parameters when creating a new Worker instance.

```typescript
const worker = new Worker({
  queue,
  handler: async (job) => { /* ... */ },

  // Stalled job detection
  stalledInterval: 30000, // Check every 30 seconds
  maxStalledCount: 1, // Fail after 1 stall
  stalledGracePeriod: 0, // No grace period (use for clock skew)
})
```

--------------------------------

### Batch ETL Processing - 'scheduler' Ordering

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/ordering-methods.mdx

Example for batch ETL processes that receive data in large batches, using the 'scheduler' method with a 3-second buffer for coordination.

```typescript
// Data arrives in large batches from multiple sources
const queue = new Queue({
  redis,
  namespace: 'etl',
  orderingMethod: 'scheduler',
  orderingWindowMs: 3000, // 3-second buffer for batch coordination
});
```

--------------------------------

### add(opts)

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/api-queue.mdx

Adds a job to a group. Examples show adding a one-off job, a delayed job, and a repeating cron job.

```typescript
// One-off job
await queue.add({ groupId: 'user:1', data: { foo: 'bar' } })

// Delayed job (run after 5s)
await queue.add({ groupId: 'user:1', data: { foo: 'baz' }, delay: 5000 })

// Cron job (every minute)
await queue.add({
  groupId: 'reports',
  data: { type: 'daily' },
  repeat: { pattern: '* * * * *' },
})
```

--------------------------------

### Single-process concurrency

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/scaling-workers.mdx

Example of setting concurrency on a Worker to process multiple jobs within a single process.
```typescript
const worker = new Worker({
  queue,
  concurrency: 4, // Process up to 4 jobs at once
  async handler(job) {
    await fetch('https://api.example.com/process', {
      method: 'POST',
      body: JSON.stringify(job.data)
    });
  },
});
```

--------------------------------

### GroupMQ (multiple workers in one process)

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/groupmq-vs-bullmq.mdx

Example of running multiple GroupMQ workers in a single Node.js process.

```typescript
import Redis from 'ioredis';
import { Queue, Worker } from 'groupmq';

const redis = new Redis('redis://localhost:6379', { maxRetriesPerRequest: null });
const queue = new Queue<{ id: string; ms: number }>({ redis, namespace: 'app' });

const workers = Array.from({ length: 4 }, () =>
  new Worker({
    queue,
    async handler(job) {
      await new Promise((r) => setTimeout(r, job.data.ms));
    }
  })
);

workers.forEach((w) => w.run());
```

--------------------------------

### Monitoring Stalled Jobs with Events

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/stalled-jobs.mdx

Example of listening to the 'stalled' event on a Worker to log warnings and increment metrics.

```typescript
worker.on('stalled', (jobId, groupId) => {
  console.warn(`Job ${jobId} from group ${groupId} was stalled`)

  // Alert your monitoring system
  metrics.increment('jobs.stalled', { groupId })

  // Note: Job has already been recovered automatically
})
```

--------------------------------

### Stalled Job Recovery Configuration

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/api-worker.mdx

Example of configuring a Worker with stalled job recovery options and handling the 'stalled' event.

```typescript
const worker = new Worker({
  queue,
  handler: async (job) => { /* ... */ },
  stalledInterval: 30000, // Check every 30 seconds
  maxStalledCount: 1, // Fail after 1 stall
  stalledGracePeriod: 0, // No grace period
})

worker.on('stalled', (jobId, groupId) => {
  console.warn(`Job ${jobId} stalled and recovered`)
  metrics.increment('jobs.stalled')
})
```

--------------------------------

### Manual Stalled Job Check API

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/stalled-jobs.mdx

Example of manually checking for stalled jobs using the queue API.

```typescript
// Check for stalled jobs manually
const result = await queue.checkStalledJobs()
console.log(`Found ${result.stalled} stalled jobs`)
```

--------------------------------

### Graceful Shutdown and Recovery

Source: https://github.com/openpanel-dev/groupmq/blob/main/README.md

Examples of gracefully shutting down a worker, waiting for the queue to be empty, and recovering delayed groups.

```typescript
// Stop worker gracefully - waits for current job to finish
await worker.close(gracefulTimeoutMs);

// Wait for queue to be empty
const isEmpty = await queue.waitForEmpty(timeoutMs);

// Recover groups that might be stuck due to ordering delays
const recoveredCount = await queue.recoverDelayedGroups();
```

--------------------------------

### Queue Methods

Source: https://github.com/openpanel-dev/groupmq/blob/main/README.md

Methods for interacting with the queue, including getting job counts, job IDs, job instances, group information, and manipulating jobs and scheduler operations.
```typescript
// Job counts and status
const counts = await queue.getJobCounts();
// { active: 5, waiting: 12, delayed: 3, total: 20, uniqueGroups: 8 }

const activeCount = await queue.getActiveCount();
const waitingCount = await queue.getWaitingCount();
const delayedCount = await queue.getDelayedCount();
const completedCount = await queue.getCompletedCount();
const failedCount = await queue.getFailedCount();

// Get job IDs by status
const activeJobIds = await queue.getActiveJobs();
const waitingJobIds = await queue.getWaitingJobs();
const delayedJobIds = await queue.getDelayedJobs();

// Get Job instances by status
const completedJobs = await queue.getCompletedJobs(limit); // returns Job[]
const failedJobs = await queue.getFailedJobs(limit);

// Group information
const groups = await queue.getUniqueGroups(); // ['user:123', 'order:456']
const groupCount = await queue.getUniqueGroupsCount();
const jobsInGroup = await queue.getGroupJobCount('user:123');

// Get specific job
const job = await queue.getJob(jobId); // returns Job instance

// Job manipulation
await queue.remove(jobId);
await queue.retry(jobId); // Re-enqueue a failed job
await queue.promote(jobId); // Promote delayed job to waiting
await queue.changeDelay(jobId, newDelayMs);
await queue.updateData(jobId, newData);

// Scheduler operations
await queue.runSchedulerOnce(); // Manual scheduler run
await queue.promoteDelayedJobs(); // Promote delayed jobs
await queue.recoverDelayedGroups(); // Recover stuck groups

// Cleanup and shutdown
await queue.waitForEmpty(timeoutMs);
await queue.close();
```

--------------------------------

### Queue Initialization with Ordering Methods

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/ordering-methods.mdx

Demonstrates how to initialize a GroupMQ Queue with different ordering methods and the required orderingWindowMs.
```typescript
const queue = new Queue({
  redis,
  namespace: 'my-queue',
  orderingMethod: 'scheduler', // One of 'none' | 'scheduler' | 'in-memory'
  orderingWindowMs: 1000, // Required for 'scheduler' and 'in-memory'
});
```

--------------------------------

### Basic Queue Creation

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/creating-a-queue.mdx

Creates a new queue instance with Redis and a namespace.

```typescript
import Redis from 'ioredis';
import { Queue } from 'groupmq';

const redis = new Redis('redis://localhost:6379', { maxRetriesPerRequest: null });

const queue = new Queue<{ id: string; ms: number }>({
  redis,
  namespace: 'orders',
});
```

--------------------------------

### Default Configuration Recommendations

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/stalled-jobs.mdx

Recommended configuration for stalled job detection in most use cases.

```json
{
  stalledInterval: 30000, // 30 seconds
  maxStalledCount: 1, // Fail after 1 stall
  stalledGracePeriod: 0, // No grace period
}
```

--------------------------------

### Low Overhead Configuration Recommendations

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/stalled-jobs.mdx

Configuration for low overhead, checking less frequently.

```json
{
  stalledInterval: 60000, // Check every minute
  maxStalledCount: 1, // Fail fast
  stalledGracePeriod: 0, // No grace period
}
```

--------------------------------

### Combining Ordering with Job Delays

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/ordering-methods.mdx

Demonstrates how ordering methods work with delayed jobs. The ordering window only applies after the job's delay expires.
```typescript
await queue.add({
  groupId: 'user-123',
  data: { action: 'reminder' },
  orderMs: Date.now(),
  runAt: Date.now() + 60000, // Delay 1 minute (runAt, per the Add Options type)
});

// The ordering window only applies AFTER the delay expires
```

--------------------------------

### IO-bound (simulate high IOPS)

Source: https://github.com/openpanel-dev/groupmq/blob/main/benchmark/BENCHMARK.md

Commands to run IO-bound benchmarks simulating high IOPS for BullMQ and GroupMQ.

```bash
jiti benchmark/index.ts --mq bullmq --job io --concurrency 32 --duration 60 --rate 0 --name bullmq_io_c32
```

```bash
jiti benchmark/index.ts --mq groupmq --job io --concurrency 32 --duration 60 --rate 0 --name groupmq_io_c32
```

--------------------------------

### Baseline (CPU-bound)

Source: https://github.com/openpanel-dev/groupmq/blob/main/benchmark/BENCHMARK.md

Commands to run CPU-bound benchmarks for BullMQ and GroupMQ.

```bash
jiti benchmark/index.ts --mq bullmq --job cpu --concurrency 8 --jobs 10000 --name bullmq_cpu_c8
```

```bash
jiti benchmark/index.ts --mq groupmq --job cpu --concurrency 8 --jobs 10000 --name groupmq_cpu_c8
```

--------------------------------

### Running Redis with Docker

Source: https://github.com/openpanel-dev/groupmq/blob/main/README.md

Command to run a local Redis instance using Docker for testing.

```bash
docker run --rm -p 6379:6379 redis:7
```

--------------------------------

### Worker Concurrency

Source: https://github.com/openpanel-dev/groupmq/blob/main/README.md

Example of configuring worker concurrency to process multiple jobs in parallel from different groups.
```typescript
const worker = new Worker({
  queue,
  concurrency: 8, // Process up to 8 jobs simultaneously
  handler: async (job) => {
    // Jobs from different groups can run in parallel
    // Jobs from the same group still run sequentially
  },
});
```

--------------------------------

### Troubleshooting: Jobs processed out of order (in-memory method)

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/ordering-methods.mdx

Shows how to increase the `orderingWindowMs` to resolve jobs being processed out of order with the 'in-memory' method.

```typescript
const queue = new Queue({
  redis,
  namespace: 'my-queue',
  orderingMethod: 'in-memory',
  orderingWindowMs: 500, // Increased from 200ms
});
```

--------------------------------

### promote()

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/api-job.mdx

Promotes this job if delayed so it runs immediately.

```typescript
await job.promote()
```

--------------------------------

### Backpressure & pickup latency under bursty load

Source: https://github.com/openpanel-dev/groupmq/blob/main/benchmark/BENCHMARK.md

Commands to run benchmarks for backpressure and pickup latency under bursty load for BullMQ and GroupMQ.

```bash
jiti benchmark/index.ts --mq bullmq --job cpu --concurrency 4 --rate 200 --duration 30 --name bullmq_burst
```

```bash
jiti benchmark/index.ts --mq groupmq --job cpu --concurrency 4 --rate 200 --duration 30 --name groupmq_burst
```

--------------------------------

### Add Options Type

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/configuration-options.mdx

Defines the available options for adding jobs to a queue.
```typescript
type AddOptions<T> = {
  groupId: string;
  data: T;
  orderMs?: number;
  maxAttempts?: number;
  delay?: number;
  runAt?: Date | number;
  repeat?: { every: number } | { pattern: string };
  jobId?: string; // idempotence
};
```

--------------------------------

### High Reliability Configuration Recommendations

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/stalled-jobs.mdx

Configuration for high reliability, checking more frequently and allowing more stalls.

```typescript
{
  stalledInterval: 15000, // Check more frequently
  maxStalledCount: 2, // Allow 2 stalls before failing
  stalledGracePeriod: 5000, // 5s grace for clock skew
}
```

--------------------------------

### Troubleshooting: High latency (scheduler method)

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/ordering-methods.mdx

Suggests switching to the 'in-memory' method with a smaller window to reduce latency when using the scheduler method.

```typescript
const queue = new Queue({
  redis,
  namespace: 'my-queue',
  orderingMethod: 'in-memory', // Lower overhead
  orderingWindowMs: 200,
});
```

--------------------------------

### Worker Methods

Source: https://github.com/openpanel-dev/groupmq/blob/main/README.md

Methods for checking worker status, retrieving current jobs, getting metrics, and performing graceful shutdowns.

```typescript
// Check worker status
const isProcessing = worker.isProcessing();

// Get current job(s) being processed
const currentJob = worker.getCurrentJob();
// { job: ReservedJob, processingTimeMs: 1500 } | null

// For concurrency > 1
const currentJobs = worker.getCurrentJobs();
// [{ job: ReservedJob, processingTimeMs: 1500 }, ...]

// Get worker metrics
const metrics = worker.getWorkerMetrics();
// { jobsInProgress: 2, lastJobPickupTime: 1234567890, ... }

// Graceful shutdown
await worker.close(gracefulTimeoutMs);
```

--------------------------------

### Scheduler Method Validation

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/ordering-methods.mdx

Illustrates GroupMQ's validation for the 'scheduler' method's `orderingWindowMs` parameter, showing valid, recommended, and invalid configurations.

```typescript
orderingWindowMs: 50    // ❌ Below 100ms - Disabled with warning
orderingWindowMs: 100   // ✅ Minimum - Works but scheduler overhead high
orderingWindowMs: 500   // ⚠️ Below 1000ms - Warning to consider 'in-memory'
orderingWindowMs: 2000  // ✅ Recommended - Good balance
```

--------------------------------

### Real-time Analytics - 'in-memory' Ordering

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/ordering-methods.mdx

Configuration for real-time analytics where events might be slightly out of order due to network latency, using the 'in-memory' method with a 200ms window.

```typescript
// Events may arrive 50-200ms out of order due to network
const queue = new Queue({
  redis,
  namespace: 'analytics',
  orderingMethod: 'in-memory',
  orderingWindowMs: 200,
});
```

--------------------------------

### Failure & retry behavior

Source: https://github.com/openpanel-dev/groupmq/blob/main/benchmark/BENCHMARK.md

Commands to run benchmarks for failure and retry behavior for BullMQ and GroupMQ.

```bash
jiti benchmark/index.ts --mq bullmq --job io --concurrency 16 --jobs 5000 --fail-rate 0.05 --retries 3 --name bullmq_fail_retry
```

```bash
jiti benchmark/index.ts --mq groupmq --job io --concurrency 16 --jobs 5000 --fail-rate 0.05 --retries 3 --name groupmq_fail_retry
```

--------------------------------

### In-Memory Ordering with Grace Period Decay

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/ordering-methods.mdx

Configures the 'in-memory' ordering method with grace period decay to make the system more responsive when jobs keep arriving.

```typescript
const queue = new Queue({
  redis,
  namespace: 'analytics',
  orderingMethod: 'in-memory',
  orderingWindowMs: 100,
  orderingGracePeriodDecay: 0.8, // 20% decay
});

// Timeline:
// Job 1 at 0ms: wait 100ms
// Job 2 at 80ms: wait 80ms (100 × 0.8)
// Job 3 at 160ms: wait 64ms (80 × 0.8)
// Job 4 at 224ms: wait 51ms (64 × 0.8)
// Total: ~275ms vs ~320ms without decay
```

--------------------------------

### Worker Methods and Metrics

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/api-worker.mdx

Demonstrates calling worker methods like run, close, and retrieving metrics.

```typescript
await worker.run()

// later
const metrics = worker.getWorkerMetrics()
console.log(metrics.currentJobId, metrics.blockingStats)

const current = worker.getCurrentJob()
if (current) console.log('processing', current.job.id, current.processingTimeMs)

await worker.close(30_000)
```

--------------------------------

### checkStalledJobs

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/api-queue.mdx

Manually check for and recover stalled jobs. Returns array of `[jobId, groupId, action]` tuples. Usually called automatically by workers.

```typescript
const now = Date.now()
const results = await queue.checkStalledJobs(now, 1000, 1)
// Results: ['job-123', 'group-1', 'recovered', 'job-456', 'group-2', 'failed', ...]
```

--------------------------------

### TypeScript Typing for Queue

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/creating-a-queue.mdx

Provides a type parameter to Queue for type-safe worker handlers.

```typescript
type OrderPayload = { id: string; ms: number };

const queue = new Queue<OrderPayload>({
  redis,
  namespace: 'orders',
});

// Inferred types inside handler: job.data is OrderPayload
new Worker({
  queue,
  async handler(job) {
    job.data.id; // string
    job.data.ms; // number
  },
}).run();
```

--------------------------------

### Order by Timestamp

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/key-features.mdx

Jobs are processed in the correct order based on the provided timestamp, with an optional global ordering delay.

```typescript
await queue.add({
  groupId: 'acct:42',
  data: { event: 'withdrawal' },
  orderMs: Date.now() - 1000, // earlier event
});
```

--------------------------------

### Queue Options Type

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/configuration-options.mdx

Defines the available options for configuring a queue.

```typescript
type QueueOptions = {
  logger?: Logger | boolean;
  redis: Redis;
  namespace: string;
  jobTimeoutMs?: number;
  maxAttempts?: number;
  reserveScanLimit?: number;
  orderingDelayMs?: number;
  keepCompleted?: number;
  keepFailed?: number;
  schedulerLockTtlMs?: number; // default: 1500ms
};
```

--------------------------------

### Cron & Repeats

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/key-features.mdx

Schedule repeating jobs using intervals or cron patterns.

```typescript
// Every 5 seconds
await queue.add({
  groupId: 'cron',
  data: { id: 'tick', ms: 50 },
  repeat: { every: 5000 }
});

// Cron pattern (midnight)
await queue.add({
  groupId: 'cron',
  data: { id: 'nightly', ms: 50 },
  repeat: { pattern: '0 0 * * *' }
});
```

--------------------------------

### promote(jobId)

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/api-queue.mdx

Promotes a delayed job to run immediately.

```typescript
await queue.promote(job.id)
```

--------------------------------

### Multi-Region API - 'in-memory' Ordering

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/ordering-methods.mdx

Configuration for multi-region APIs where requests can be out of order due to cross-region latency, employing the 'in-memory' method with a 300ms window.

```typescript
// Requests may arrive out of order due to cross-region latency
const queue = new Queue({
  redis,
  namespace: 'api-requests',
  orderingMethod: 'in-memory',
  orderingWindowMs: 300,
});
```

--------------------------------

### Queue and Worker Configuration

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/error-handling-retries.mdx

Demonstrates setting default max attempts at the queue level, overriding at the job level, and capping attempts with custom backoff at the worker level.

```typescript
// Queue-level defaults
const queue = new Queue({
  redis,
  namespace: 'app',
  maxAttempts: 3,
});

// Job-level override
await queue.add({
  groupId: 'g1',
  data: { id: '1' },
  maxAttempts: 5,
});

// Worker-level cap and custom backoff
const worker = new Worker({
  queue,
  maxAttempts: 5,
  backoff: (attempt) => Math.min(30000, 2 ** (attempt - 1) * 500),
  async handler(job) {
    throw new Error('boom');
  },
});
```

--------------------------------

### Retries & Failures

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/key-features.mdx

Configure retry attempts and handle job failures with backoff strategies.

```typescript
// Queue default
const queue = new Queue({ redis, namespace: 'app', maxAttempts: 3 });

// Job override
await queue.add({ groupId: 'g1', data: { id: 'x' }, maxAttempts: 5 });

// Worker backoff
new Worker({
  queue,
  maxAttempts: 5,
  backoff: (attempt) => Math.min(30_000, 2 ** (attempt - 1) * 500),
  async handler(job) { /* ... */ },
}).run();
```

--------------------------------

### In-Memory Method Validation

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/ordering-methods.mdx

Shows GroupMQ's validation for the 'in-memory' method's `orderingWindowMs` parameter, highlighting recommended and capped values.

```typescript
orderingWindowMs: 20    // ⚠️ Below 50ms - Warning (may not be effective)
orderingWindowMs: 200   // ✅ Recommended - Good for network jitter
orderingWindowMs: 500   // ✅ Good - Handles moderate latency
orderingWindowMs: 1500  // ❌ Above 1000ms - Capped at 1000ms with warning
```

--------------------------------

### Job Instance Methods

Source: https://github.com/openpanel-dev/groupmq/blob/main/README.md

Methods available on individual Job instances for manipulation and state retrieval.

```typescript
const job = await queue.getJob(jobId);

// Manipulate the job
await job.remove();
await job.retry();
await job.promote();
await job.changeDelay(newDelayMs);
await job.updateData(newData);
await job.update(newData); // Alias for updateData

// Get job state
const state = await job.getState(); // 'active' | 'waiting' | 'delayed' | 'completed' | 'failed'

// Serialize job
const json = job.toJSON();
```

--------------------------------

### getJobsByStatus

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/api-queue.mdx

Fetch jobs across multiple statuses. Best-effort ordering, primarily for dashboards (BullBoard).

```typescript
const jobs = await queue.getJobsByStatus(['waiting', 'active', 'failed'], 0, 50)
```

--------------------------------

### promoteDelayedJobs

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/api-queue.mdx

Promote delayed jobs that are now ready. Workers call this periodically.

```typescript
await queue.promoteDelayedJobs()
```

--------------------------------

### complete({ id, groupId })

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/api-queue.mdx

Marks a reserved job as completed.

```typescript
await queue.complete({ id: job.id, groupId: job.groupId })
```

--------------------------------

### Cron / repeating jobs

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/jobs.mdx

Create repeating jobs with a fixed interval or a cron pattern.

```typescript
// Every 5 seconds
await queue.add({
  groupId: 'cron',
  data: { type: 'reindex', payload: { index: 'products' } },
  repeat: { every: 5000 }
});

// Cron pattern (every day at midnight)
await queue.add({
  groupId: 'cron',
  data: { type: 'send-email', payload: { to: 'ops@x.com', subject: 'Daily report' } },
  repeat: { pattern: '0 0 * * *' }
});
```

--------------------------------

### getJob

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/api-queue.mdx

Fetch a single job as a `Job` entity with enriched fields (compatible with BullBoard expectations).

```typescript
const j = await queue.getJob(job.id)
console.log(j.status, j.attemptsMade, j.data)
```

--------------------------------

### Logging with Custom Logger

Source: https://github.com/openpanel-dev/groupmq/blob/main/README.md

Using a custom logger implementation (compatible with pino/winston) for Queue and Worker.

```typescript
import type { LoggerInterface } from 'groupmq';

const customLogger: LoggerInterface = {
  debug: (msg: string, ...args: any[]) => { /* custom logging */ },
  info: (msg: string, ...args: any[]) => { /* custom logging */ },
  warn: (msg: string, ...args: any[]) => { /* custom logging */ },
  error: (msg: string, ...args: any[]) => { /* custom logging */ },
};

const queue = new Queue({
  redis,
  namespace: 'orders',
  logger: customLogger,
});
```

--------------------------------

### Disabling Scheduler on a Worker

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/ordering-methods.mdx

Shows how to disable the scheduler on a specific worker, useful when you want to control which workers promote buffered groups.

```typescript
const worker = new Worker({
  queue,
  handler: async (job) => { /* ... */ },
  runScheduler: false, // This worker won't promote buffered groups
});
```

--------------------------------

### getActiveJobs, getWaitingJobs, getDelayedJobs

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/api-queue.mdx

Lists of job IDs by state.

```typescript
const ids = await queue.getWaitingJobs()
```

--------------------------------

### reserve()

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/api-queue.mdx

Reserves the next available job immediately, without blocking. Returns null if no job is available.

```typescript
const job = await queue.reserve()
if (job) {
  // process
}
```

--------------------------------

### In-Memory Ordering with Max Wait Multiplier

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/ordering-methods.mdx

Configures the 'in-memory' ordering method with a maximum wait multiplier to prevent infinite waiting.

```typescript
const queue = new Queue({
  redis,
  namespace: 'my-queue',
  orderingMethod: 'in-memory',
  orderingWindowMs: 100,
  orderingMaxWaitMultiplier: 5, // Max 500ms total wait
});
```

--------------------------------

### reserveBlocking(timeoutSec?, blockUntil?)

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/api-queue.mdx

Blocks for up to `timeoutSec` seconds waiting for the next available job, adapting timeouts to activity and delayed jobs.

```typescript
// Wait up to 5s for a job
const job = await queue.reserveBlocking(5)
if (job) {
  // process
}
```

--------------------------------

### Queue Options

Source: https://github.com/openpanel-dev/groupmq/blob/main/README.md

Defines the configuration options for a GroupMQ queue.

```typescript
type QueueOptions = {
  redis: Redis;
  namespace: string;
  logger?: boolean | LoggerInterface;
  jobTimeoutMs?: number;
  maxAttempts?: number;
  reserveScanLimit?: number;
  keepCompleted?: number;
  keepFailed?: number;
  schedulerLockTtlMs?: number;
  orderingMethod?: OrderingMethod;
  orderingWindowMs?: number;
  orderingMaxWaitMultiplier?: number;
  orderingGracePeriodDecay?: number;
  orderingMaxBatchSize?: number;
};

type OrderingMethod = 'none' | 'scheduler' | 'in-memory';
```

--------------------------------

### Worker Options Type

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/configuration-options.mdx

Defines the available options for configuring a worker.

```typescript
type WorkerOptions<T> = {
  queue: Queue<T>;
  name?: string;
  handler: (job: ReservedJob<T>) => Promise<unknown>;
  heartbeatMs?: number; // default: queue.jobTimeoutMs/3
  onError?: (err: unknown, job?: ReservedJob<T>) => void;
  maxAttempts?: number; // default: queue.maxAttemptsDefault
  backoff?: (attempt: number) => number; // ms
  enableCleanup?: boolean; // default: true
  cleanupIntervalMs?: number; // default: 60_000
  schedulerIntervalMs?: number; // default: 1_000
  blockingTimeoutSec?: number; // default: 5
  atomicCompletion?: boolean; // default: true
  logger?: Logger | true;
  concurrency?: number; // default: 1
};
```

--------------------------------

### getCompletedJobs, getFailedJobs

Source: https://github.com/openpanel-dev/groupmq/blob/main/website/src/content/docs/api-queue.mdx

Convenience: return completed/failed as `Job` entities.

```typescript
const finished = await queue.getCompletedJobs(10)
```
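--------------------------------

### Backoff delays (illustrative sketch)

The worker options above list `backoff` as a function from an attempt number to a delay in milliseconds, and the retry examples in these docs use `Math.min(30_000, 2 ** (attempt - 1) * 500)`. The sketch below only tabulates what that formula yields. It does not call GroupMQ; the helper name `cappedExponentialBackoff`, its `baseMs`/`capMs` parameters, and the assumption that `attempt` starts at 1 are illustrative and not taken from the library.

```typescript
// Hypothetical helper (not part of GroupMQ): capped exponential backoff.
// Assumes `attempt` is 1-based; the delay doubles each attempt up to `capMs`.
function cappedExponentialBackoff(
  attempt: number,
  baseMs: number = 500,
  capMs: number = 30_000,
): number {
  return Math.min(capMs, 2 ** (attempt - 1) * baseMs);
}

// Delays for attempts 1 through 8 with the defaults above.
const delays: number[] = [1, 2, 3, 4, 5, 6, 7, 8].map((attempt) =>
  cappedExponentialBackoff(attempt),
);

console.log(delays);
// [500, 1000, 2000, 4000, 8000, 16000, 30000, 30000]
```

With these defaults the cap is reached at the seventh attempt, so a worker configured with `maxAttempts: 5` would never hit it. A function of this shape could presumably be passed as the `backoff` option, for example `backoff: (attempt) => cappedExponentialBackoff(attempt)`.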