### Install @boringnode/queue

Source: https://github.com/boringnode/queue/blob/main/README.md

Install the package using npm. This is the first step to using the queue system.

```bash
npm install @boringnode/queue
```

--------------------------------

### AdapterFactory Example

Source: https://github.com/boringnode/queue/blob/main/_autodocs/types.md

An example demonstrating how to define a map of adapter factories for different storage systems like Redis and PostgreSQL.

```typescript
const factories = {
  redis: () => new RedisAdapter(...),
  postgres: () => new KnexAdapter(...),
}
```

--------------------------------

### Initialize and Start Worker

Source: https://github.com/boringnode/queue/blob/main/_autodocs/api-reference/Worker.md

Demonstrates how to initialize a Worker with the Redis adapter, a retry strategy, and worker-specific configuration. Includes graceful shutdown handlers and starts the worker to process jobs.

```typescript
import { QueueManager, Worker, redis, exponentialBackoff } from '@boringnode/queue'

const worker = new Worker({
  default: 'redis',
  adapters: {
    redis: redis({ host: 'localhost', port: 6379 }),
  },
  locations: ['./app/jobs/**/*.ts'],
  retry: {
    maxRetries: 3,
    backoff: exponentialBackoff(),
  },
  worker: {
    concurrency: 5, // Process 5 jobs concurrently
    idleDelay: '2s', // Wait 2s when no jobs available
    stalledThreshold: '30s', // Recover jobs stalled for 30s
    stalledInterval: '30s', // Check every 30s
    maxStalledCount: 1, // Max 1 recovery per job
    gracefulShutdown: true, // Wait for jobs on SIGTERM
    onShutdownSignal: async () => {
      console.log('Shutdown signal received, cleaning up...')
    },
  },
})

// Graceful shutdown handlers
process.on('SIGINT', () => {
  console.log('SIGINT received, stopping worker...')
  worker.stop()
})

process.on('SIGTERM', () => {
  console.log('SIGTERM received, stopping worker...')
  worker.stop()
})

// Start processing
try {
  await worker.start(['emails', 'notifications', 'default'])
} finally {
  await QueueManager.destroy()
}
```
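
The `retry` block above pairs `maxRetries: 3` with `exponentialBackoff()`. As a rough illustration of what capped exponential growth looks like, the sketch below computes the delay for each of three attempts. `backoffDelayMs` is a hypothetical helper, not part of `@boringnode/queue`, and the doubling factor, 1-second base, and 5-minute cap are assumptions; the library's real defaults and jitter handling may differ.

```typescript
// Hypothetical helper for illustration only; NOT the library's implementation.
// Assumes the delay doubles per attempt and is capped at a maximum.
function backoffDelayMs(attempt: number, baseMs = 1000, maxMs = 300_000): number {
  // attempt is 1-based: 1 -> base, 2 -> 2 x base, 3 -> 4 x base, ...
  const delay = baseMs * 2 ** (attempt - 1)
  return Math.min(delay, maxMs)
}

// Delays for the three retries allowed by `maxRetries: 3`
const delays = [1, 2, 3].map((attempt) => backoffDelayMs(attempt))
console.log(delays) // [ 1000, 2000, 4000 ]
```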

--------------------------------

### JobFactory Example

Source: https://github.com/boringnode/queue/blob/main/_autodocs/types.md

Demonstrates how to use a custom job factory to integrate with IoC containers for job instantiation.

```typescript
await QueueManager.init({
  jobFactory: async (JobClass) => {
    return app.container.make(JobClass)
  }
})
```

--------------------------------

### Worker.init()

Source: https://github.com/boringnode/queue/blob/main/_autodocs/api-reference/Worker.md

Initializes the worker, setting up the QueueManager and adapter connection. This is called automatically by `start()`.

```APIDOC
## Method init

### Description
Initialize the worker. Sets up the QueueManager and adapter connection. Called automatically by `start()`.

### Method
`async init(): Promise<void>`

### Request Example
```typescript
const worker = new Worker(config)
await worker.init()
// QueueManager is now initialized
```
```

--------------------------------

### Start a Worker

Source: https://github.com/boringnode/queue/blob/main/README.md

Instantiate a Worker with configuration and start it to process jobs from specified queues.

```typescript
import { Worker } from '@boringnode/queue'

const worker = new Worker(config)
await worker.start(['default', 'email'])
```

--------------------------------

### Example: Multiple Job Dispatch

Source: https://github.com/boringnode/queue/blob/main/_autodocs/types.md

Shows how to dispatch multiple jobs and log the number of jobs dispatched.

```typescript
const { jobIds } = await SendEmailJob.dispatchMany(payloads)
  .group('newsletter-jan-2025')
  .run()

console.log(`Dispatched ${jobIds.length} jobs`)
```

--------------------------------

### from

Source: https://github.com/boringnode/queue/blob/main/_autodocs/api-reference/ScheduleBuilder.md

Sets the start date boundary for the schedule. Jobs will not be dispatched before this specified date.

```APIDOC
## from(date: Date): this

### Description
Set the start boundary for the schedule.
No jobs will be dispatched before this date.

### Parameters
#### Path Parameters
- **date** (Date) - Required - Start boundary (inclusive)

### Returns
This builder for method chaining

### Example
```typescript
// Campaign emails start on Jan 1, 2025
const startDate = new Date('2025-01-01')
await CampaignEmailJob.schedule({ campaignId: 'new-year' })
  .from(startDate)
  .every('1h')
  .run()

// Reporting starts on the coming Monday (today, if today is a Monday)
const nextMonday = new Date()
nextMonday.setDate(nextMonday.getDate() + ((1 + 7 - nextMonday.getDay()) % 7))
await ReportJob.schedule({ type: 'weekly' })
  .from(nextMonday)
  .cron('0 9 * * 1')
  .run()
```
```

--------------------------------

### Database Schema Setup with QueueSchemaService

Source: https://github.com/boringnode/queue/blob/main/README.md

Use `QueueSchemaService` to create or manage the necessary database tables for the Knex adapter. Supports custom columns and table names.

```typescript
import { QueueSchemaService } from '@boringnode/queue'
import Knex from 'knex'

const connection = Knex({ client: 'pg', connection: '...'
})
const schemaService = new QueueSchemaService(connection)

// Create tables with default names
await schemaService.createJobsTable()
await schemaService.createSchedulesTable()

// Or extend with custom columns
await schemaService.createJobsTable('queue_jobs', (table) => {
  table.string('tenant_id', 255).nullable()
})
```

```typescript
import { QueueSchemaService } from '@boringnode/queue'
import { BaseSchema } from '@adonisjs/lucid/schema'

export default class extends BaseSchema {
  async up() {
    const schemaService = new QueueSchemaService(this.db.connection().getWriteClient())
    await schemaService.createJobsTable()
    await schemaService.createSchedulesTable()
  }

  async down() {
    const schemaService = new QueueSchemaService(this.db.connection().getWriteClient())
    await schemaService.dropSchedulesTable()
    await schemaService.dropJobsTable()
  }
}
```

--------------------------------

### Example: Configuring Job Retention Options

Source: https://github.com/boringnode/queue/blob/main/_autodocs/types.md

Illustrates setting job retention policies for completed and failed jobs within a Job class using static options.

```typescript
class MyJob extends Job {
  static options: JobOptions = {
    // Keep last 1000 completed jobs
    removeOnComplete: { count: 1000 },
    // Keep failed jobs for 7 days
    removeOnFail: { age: '7d' },
  }
}
```

--------------------------------

### Initialize Production QueueManager

Source: https://github.com/boringnode/queue/blob/main/_autodocs/configuration.md

Use this example to initialize the QueueManager in a production environment. It configures the Redis adapter, job locations, retry policies with exponential backoff, and worker settings.
```typescript
import { QueueManager, redis, knex, exponentialBackoff } from '@boringnode/queue'
import { pino } from 'pino'

const logger = pino({ level: process.env.LOG_LEVEL || 'info' })

await QueueManager.init({
  default: 'redis',
  adapters: {
    redis: redis({
      host: process.env.REDIS_HOST,
      port: parseInt(process.env.REDIS_PORT || '6379'),
      password: process.env.REDIS_PASSWORD,
      db: parseInt(process.env.REDIS_DB || '0'),
      retryStrategy: (times) => Math.min(times * 50, 2000),
    }),
  },
  locations: ['./app/jobs/**/*.js'],
  retry: {
    maxRetries: 3,
    backoff: exponentialBackoff({
      baseDelay: '1s',
      maxDelay: '5m',
      jitter: true,
    }),
  },
  worker: {
    concurrency: parseInt(process.env.WORKER_CONCURRENCY || '5'),
    idleDelay: '2s',
    stalledThreshold: '30s',
    stalledInterval: '30s',
    maxStalledCount: 2,
    gracefulShutdown: true,
  },
  logger,
})
```

--------------------------------

### Initialize QueueManager with Pino Logger

Source: https://github.com/boringnode/queue/blob/main/README.md

Integrate the Pino logger with QueueManager for structured logging. Ensure Pino is installed and configured before initializing the QueueManager.

```typescript
import { pino } from 'pino'

await QueueManager.init({
  // ...
  logger: pino(),
})
```

--------------------------------

### Full ScheduleBuilder Example

Source: https://github.com/boringnode/queue/blob/main/_autodocs/api-reference/ScheduleBuilder.md

Demonstrates various scheduling configurations including daily cleanup, periodic syncs, weekday reports, limited-run campaigns, and date-bounded events. Imports multiple job types for comprehensive usage.
```typescript
import { CleanupJob, SyncJob, ReportJob, PromoJob, AnnualEventJob } from './jobs'

// Daily cleanup at midnight UTC
const { scheduleId: cleanupId } = await CleanupJob.schedule({ days: 30 })
  .id('daily-cleanup')
  .cron('0 0 * * *')
  .timezone('UTC')
  .run()

console.log(`Cleanup schedule created: ${cleanupId}`)

// Sync every 5 minutes
const { scheduleId: syncId } = await SyncJob.schedule({ source: 'api' })
  .every('5m')
  .run()

// Weekday report at 9 AM Paris time
const { scheduleId: reportId } = await ReportJob.schedule({ format: 'pdf' })
  .id('weekday-report')
  .cron('0 9 * * 1-5')
  .timezone('Europe/Paris')
  .run()

// Limited promotion campaign: Jan 15-31, every 2 hours, max 4 runs
const start = new Date('2025-01-15')
const end = new Date('2025-01-31')

const { scheduleId: promoId } = await PromoJob.schedule({ code: 'JAN-SALE' })
  .between(start, end)
  .every('2h')
  .limit(4)
  .run()

// Event at noon on the 15th of each month, bounded to calendar year 2025
const { scheduleId: eventId } = await AnnualEventJob.schedule({ year: 2025 })
  .id('annual-event')
  .cron('0 12 15 * *') // 15th of each month
  .timezone('America/New_York')
  .from(new Date('2025-01-01'))
  .to(new Date('2025-12-31'))
  .run()
```

--------------------------------

### Find and Log Schedule Start Boundary

Source: https://github.com/boringnode/queue/blob/main/_autodocs/api-reference/Schedule.md

Retrieves a schedule by its ID and logs its start boundary date. Jobs will not be dispatched before this date. Checks that the schedule exists and that its `from` property is set.

```typescript
const schedule = await Schedule.find('campaign-emails')
if (schedule?.from) {
  console.log(`Campaign starts: ${schedule.from.toISOString()}`)
}
```

--------------------------------

### Example: Single Job Dispatch with Dedup

Source: https://github.com/boringnode/queue/blob/main/_autodocs/types.md

Demonstrates dispatching a single job with deduplication and checking the deduped outcome.
```typescript
const { jobId, deduped } = await SendEmailJob.dispatch(payload)
  .dedup({ id: 'order-123', ttl: '1m', replace: true })
  .run()

if (deduped === 'replaced') {
  console.log('Job payload updated')
} else if (deduped === 'added') {
  console.log('New job created')
} else if (deduped === 'skipped') {
  console.log('Duplicate ignored')
}
```

--------------------------------

### Duration Type Examples

Source: https://github.com/boringnode/queue/blob/main/_autodocs/types.md

Illustrates how to use the Duration type with both numeric and string representations.

```typescript
const timeout: Duration = '30s' // 30 seconds
const delay: Duration = 5000 // 5000 milliseconds
const interval: Duration = '5m' // 5 minutes
```

--------------------------------

### between

Source: https://github.com/boringnode/queue/blob/main/_autodocs/api-reference/ScheduleBuilder.md

Sets both the start and end date boundaries for the schedule, acting as a shorthand for calling .from() and .to().

```APIDOC
## between(from: Date, to: Date): this

### Description
Set both start and end boundaries for the schedule. Shorthand for `.from(from).to(to)`.

### Parameters
#### Path Parameters
- **from** (Date) - Required - Start boundary
- **to** (Date) - Required - End boundary

### Returns
This builder for method chaining

### Example
```typescript
// Limited-time promotion: Jan 15-31
const start = new Date('2025-01-15')
const end = new Date('2025-01-31')

await LimitedPromoJob.schedule({ code: 'JAN-SALE' })
  .between(start, end)
  .every('2h')
  .run()
```
```

--------------------------------

### Initialize Worker

Source: https://github.com/boringnode/queue/blob/main/_autodocs/api-reference/Worker.md

Manually initialize the worker instance, setting up the QueueManager and adapter connections. This is automatically called by the start() method.
```typescript
const worker = new Worker(config)
await worker.init()
// QueueManager is now initialized
```

--------------------------------

### Set Schedule Start Boundary

Source: https://github.com/boringnode/queue/blob/main/_autodocs/api-reference/ScheduleBuilder.md

Utilize the `from` method to define the earliest date a schedule can dispatch jobs. This boundary is inclusive.

```typescript
// Campaign emails start on Jan 1, 2025
const startDate = new Date('2025-01-01')
await CampaignEmailJob.schedule({ campaignId: 'new-year' })
  .from(startDate)
  .every('1h')
  .run()

// Reporting starts on the coming Monday (today, if today is a Monday)
const nextMonday = new Date()
nextMonday.setDate(nextMonday.getDate() + ((1 + 7 - nextMonday.getDay()) % 7))
await ReportJob.schedule({ type: 'weekly' })
  .from(nextMonday)
  .cron('0 9 * * 1')
  .run()
```

--------------------------------

### Manage and Monitor Schedules with Schedule API

Source: https://github.com/boringnode/queue/blob/main/_autodocs/api-reference/Schedule.md

This example demonstrates how to find a specific schedule by ID, inspect its properties, and perform actions like pausing, resuming, triggering, and deleting it. It also shows how to list all schedules, filter them by status, and monitor their last run times and run limits.
```typescript
import { Schedule } from '@boringnode/queue'

// Find and manage a specific schedule
const schedule = await Schedule.find('daily-cleanup')
if (schedule) {
  console.log(`Schedule: ${schedule.id}`)
  console.log(`Job: ${schedule.name}`)
  console.log(`Status: ${schedule.status}`)
  console.log(`Runs: ${schedule.runCount}`)
  console.log(`Next: ${schedule.nextRunAt?.toISOString()}`)
  console.log(`Last: ${schedule.lastRunAt?.toISOString()}`)

  // Pause it
  await schedule.pause()

  // Later, resume
  await schedule.resume()

  // Trigger immediately
  await schedule.trigger()

  // Finally delete
  await schedule.delete()
}

// List all schedules
const allSchedules = await Schedule.list()
console.log(`Total schedules: ${allSchedules.length}`)

// List active schedules
const active = await Schedule.list({ status: 'active' })
active.forEach(s => {
  console.log(`- ${s.id} (${s.name}): next at ${s.nextRunAt}`)
})

// Find paused schedules and resume them
const paused = await Schedule.list({ status: 'paused' })
for (const schedule of paused) {
  await schedule.resume()
  console.log(`Resumed: ${schedule.id}`)
}

// Monitoring: find schedules that haven't run recently
const allActive = await Schedule.list({ status: 'active' })
const oneHourAgo = new Date(Date.now() - 3600000)

for (const schedule of allActive) {
  if (!schedule.lastRunAt || schedule.lastRunAt < oneHourAgo) {
    console.warn(`Schedule ${schedule.id} hasn't run in over an hour`)
  }
}

// Find schedules nearing their run limit
const limited = allActive.filter(s => s.limit !== null)
for (const schedule of limited) {
  const remaining = schedule.limit! - schedule.runCount
  if (remaining === 1) {
    console.warn(`Schedule ${schedule.id} will complete after next run`)
  }
}
```

--------------------------------

### Scheduling Jobs

Source: https://github.com/boringnode/queue/blob/main/README.md

Provides examples for scheduling jobs to run at specific intervals or according to cron expressions.
It also details how to manage these schedules, including finding, pausing, resuming, triggering, and deleting them.

```APIDOC
## Scheduling Jobs

### Description
Schedule jobs to run at recurring intervals or using cron expressions. Provides methods for managing schedules, including finding, pausing, resuming, triggering, and deleting them.

### Scheduling a Job
- **Fixed Interval:**
  ```typescript
  await MyJob.schedule({ /* payload */ }).every('10s');
  ```
- **Cron Schedule:**
  ```typescript
  await MyJob.schedule({ /* payload */ })
    .id('unique-job-id')
    .cron('0 0 * * *')
    .timezone('Europe/Paris');
  ```

### Schedule Options
- `.id(string)`: Assigns a unique identifier to the schedule.
- `.every(duration)`: Sets a fixed interval for the job (e.g., '5s', '1m', '1h').
- `.cron(expression)`: Defines the schedule using a cron expression.
- `.timezone(tz)`: Specifies the timezone for the cron schedule (default: 'UTC').
- `.from(date)`: Sets the start boundary for the schedule.
- `.to(date)`: Sets the end boundary for the schedule.
- `.limit(n)`: Sets the maximum number of times the job can run.

### Schedule Management
- **Finding a Schedule:**
  ```typescript
  const schedule = await Schedule.find('unique-job-id');
  ```
- **Pausing a Schedule:**
  ```typescript
  await schedule.pause();
  ```
- **Resuming a Schedule:**
  ```typescript
  await schedule.resume();
  ```
- **Triggering a Schedule (Run Now):**
  ```typescript
  await schedule.trigger();
  ```
- **Deleting a Schedule:**
  ```typescript
  await schedule.delete();
  ```
- **Listing Schedules:**
  ```typescript
  const allSchedules = await Schedule.list();
  const activeSchedules = await Schedule.list({ status: 'active' });
  ```
```

--------------------------------

### JobContext Execution Example

Source: https://github.com/boringnode/queue/blob/main/_autodocs/types.md

Demonstrates how to access and log information from the JobContext within a job's execute method, such as job ID, attempt number, queue, and priority.
```typescript
async execute() {
  console.log(`Job ${this.context.jobId} attempt ${this.context.attempt}`)
  console.log(`Queue: ${this.context.queue}, Priority: ${this.context.priority}`)
}
```

--------------------------------

### Worker.start()

Source: https://github.com/boringnode/queue/blob/main/_autodocs/api-reference/Worker.md

Starts processing jobs from specified queues. This method blocks until the worker is stopped. Jobs are processed concurrently up to the configured limit.

```APIDOC
## Method start

### Description
Start processing jobs from the specified queues. This method blocks until the worker is stopped via `stop()` or a shutdown signal (SIGINT/SIGTERM). Jobs are processed concurrently up to the configured concurrency limit. The worker polls queues in the order specified, giving priority to earlier queues.

### Method
`async start(queues?: string[]): Promise<void>`

### Parameters
#### Query Parameters
- **queues** (string[]) - Optional - Default: `['default']` - Queue names to process in priority order

### Returns
Promise that resolves when the worker stops

### Request Example
```typescript
// Process single queue
await worker.start()

// Process multiple queues (priority order)
await worker.start(['high-priority', 'default', 'low-priority'])

// With signal handling
process.on('SIGINT', () => worker.stop())
process.on('SIGTERM', () => worker.stop())
await worker.start(['emails', 'notifications'])
```
```

--------------------------------

### Schedule Job

Source: https://github.com/boringnode/queue/blob/main/_autodocs/api-reference/Job.md

Create a schedule to run a job on a recurring basis using cron or interval configurations. Returns a builder for fluent schedule setup.

```APIDOC
## schedule(payload: T extends Job<infer P> ? P : never): ScheduleBuilder

### Description
Create a schedule to run this job on a recurring basis (cron or interval).

### Method
Static method of the Job class.

### Parameters
#### Path Parameters
None

#### Query Parameters
None

#### Request Body
- **payload** (Payload) - Required - The data to pass on each run.

### Returns
`ScheduleBuilder` for fluent configuration.

### Request Example
```typescript
// Cron schedule (daily at midnight)
await CleanupJob.schedule({ days: 30 })
  .id('cleanup-daily')
  .cron('0 0 * * *')
  .timezone('Europe/Paris')
  .run()

// Interval schedule (every 5 minutes)
await SyncJob.schedule({ source: 'api' })
  .every('5m')
  .run()
```
```

--------------------------------

### Dispatching a Batch of Jobs and Retrieving IDs

Source: https://github.com/boringnode/queue/blob/main/_autodocs/api-reference/JobBatchDispatcher.md

Execute the dispatch of a configured batch of jobs using the `run` method. It returns a `DispatchManyResult` object containing an array of job IDs for all dispatched jobs. This example shows a complete configuration chain.

```typescript
const recipients = [
  { to: 'user1@example.com', subject: 'Welcome' },
  { to: 'user2@example.com', subject: 'Welcome' },
  { to: 'user3@example.com', subject: 'Welcome' },
]

const { jobIds } = await SendEmailJob.dispatchMany(recipients)
  .group('onboarding-batch')
  .toQueue('emails')
  .priority(2)
  .run()

console.log(`Dispatched ${jobIds.length} jobs`)
jobIds.forEach(id => console.log(` - ${id}`))
```

--------------------------------

### Set Schedule Start and End Boundaries

Source: https://github.com/boringnode/queue/blob/main/_autodocs/api-reference/ScheduleBuilder.md

The `between` method provides a shorthand for setting both the start and end boundaries of a schedule simultaneously.
```typescript
// Limited-time promotion: Jan 15-31
const start = new Date('2025-01-15')
const end = new Date('2025-01-31')

await LimitedPromoJob.schedule({ code: 'JAN-SALE' })
  .between(start, end)
  .every('2h')
  .run()
```

--------------------------------

### Initialize Knex Adapter

Source: https://github.com/boringnode/queue/blob/main/README.md

Set up the Knex adapter for databases like PostgreSQL, MySQL, or SQLite. Provide database connection configuration.

```typescript
import { knex } from '@boringnode/queue/drivers/knex_adapter'

const adapter = knex({
  client: 'pg',
  connection: { host: 'localhost', database: 'myapp' },
})
```

--------------------------------

### with

Source: https://github.com/boringnode/queue/blob/main/_autodocs/api-reference/JobBatchDispatcher.md

Specifies a particular adapter to be used for all jobs within this batch. This method is chainable.

```APIDOC
## with

### Description
Use a specific adapter for all jobs in this batch.

### Method
`with(adapter: string | (() => Adapter)): this`

### Parameters
#### Path Parameters
- **adapter** (string | (() => Adapter)) - Required - Adapter name or factory function

### Returns
This dispatcher for method chaining

### Example
```typescript
// Use named adapter
await Job.dispatchMany(payloads).with('redis')

// Use adapter factory
const customAdapter = () => new MyAdapter()
await Job.dispatchMany(payloads).with(customAdapter)
```
```

--------------------------------

### Initialize Sync Adapter for Testing

Source: https://github.com/boringnode/queue/blob/main/README.md

Configure the Sync adapter for immediate job execution. Suitable for lightweight local development and testing scenarios where background processing is not required.
```typescript
import { sync } from '@boringnode/queue/drivers/sync_adapter'

const adapter = sync() // Jobs execute immediately
```

--------------------------------

### getConfigResolver()

Source: https://github.com/boringnode/queue/blob/main/_autodocs/api-reference/QueueManager.md

Gets the configuration resolver used for merging global, queue-level, and job-level options. This is primarily for internal use.

```APIDOC
## getConfigResolver(): QueueConfigResolver

### Description
Get the resolver responsible for effective queue/job runtime configuration. Used internally to merge global, queue-level, and job-level options.

### Returns
Configuration resolver instance

### Throws
`E_QUEUE_NOT_INITIALIZED` if `init()` hasn't been called
```

--------------------------------

### Initialize Redis Adapter

Source: https://github.com/boringnode/queue/blob/main/README.md

Configure the Redis adapter for production use. You can provide connection options directly or use an existing ioredis instance.

```typescript
import { redis } from '@boringnode/queue/drivers/redis_adapter'
import { Redis } from 'ioredis'

// With options
const adapter = redis({ host: 'localhost', port: 6379 })

// With existing ioredis instance
const connection = new Redis({ host: 'localhost' })
const adapterFromInstance = redis(connection)
```

--------------------------------

### Run Benchmark

Source: https://github.com/boringnode/queue/blob/main/README.md

Execute the benchmark tests using npm. The `--realistic` flag simulates a more accurate workload.

```bash
npm run benchmark -- --realistic
```

--------------------------------

### Dispatch Multiple Jobs in a Batch

Source: https://github.com/boringnode/queue/blob/main/_autodocs/api-reference/JobBatchDispatcher.md

Demonstrates how to prepare and dispatch multiple jobs in a batch using the `dispatchMany` method. This includes setting the target queue, priority, group, and specifying the adapter.
It also shows how to retrieve and log the dispatched job IDs and later monitor a specific job's status.

```typescript
import { QueueManager } from '@boringnode/queue'
import { SendEmailJob } from './jobs/send_email_job'

// Prepare batch
const recipients = [
  { to: 'alice@example.com', subject: 'Welcome Alice' },
  { to: 'bob@example.com', subject: 'Welcome Bob' },
  { to: 'charlie@example.com', subject: 'Welcome Charlie' },
]

// Dispatch batch with options
const { jobIds } = await SendEmailJob.dispatchMany(recipients)
  .toQueue('onboarding') // Target queue
  .priority(2) // All jobs: priority 2
  .group('onboarding-wave-1') // Group for monitoring
  .with('redis') // Use Redis adapter
  .run()

// Log results
console.log(`Dispatched ${jobIds.length} jobs:`)
jobIds.forEach((id, i) => {
  console.log(` ${i + 1}. ${id} → ${recipients[i].to}`)
})

// Later: retrieve and monitor specific jobs
const jobId = jobIds[0]
const job = await QueueManager.use().getJob(jobId, 'onboarding')
if (job) {
  console.log(`Job status: ${job.status}`)
}
```

--------------------------------

### QueueManager.init

Source: https://github.com/boringnode/queue/blob/main/_autodocs/api-reference/QueueManager.md

Initializes the queue system with the provided configuration. This method must be called before any other queue operations. It sets up adapters and can optionally auto-discover and register job classes.

```APIDOC
## async init(config: QueueManagerConfig): this

### Description
Initialize the queue system with the given configuration. This must be called before using the queue system. It validates the configuration, registers adapters, and optionally auto-discovers and registers job classes from configured locations.

### Parameters
#### Path Parameters
None

#### Query Parameters
None

#### Request Body
- **config** (QueueManagerConfig) - Required - Queue system configuration. See Configuration Fields below.

### Configuration Fields
#### Path Parameters
None

#### Query Parameters
None

#### Request Body
- **default** (string) - Required - Name of the default adapter
- **adapters** (Record) - Required - Map of adapter names to factory functions
- **retry** (RetryConfig) - Optional - Global retry policy for all jobs
- **defaultJobOptions** (JobOptions) - Optional - Global default options for all jobs
- **queues** (Record) - Optional - Per-queue configuration (retry, adapter, defaults)
- **worker** (WorkerConfig) - Optional - Worker runtime options (concurrency, timeouts, etc.)
- **locations** (string[]) - Optional - Glob patterns for auto-discovering job classes
- **autoLoadJobs** (boolean) - Optional - Whether to load jobs from `locations` during `init()` (default: true)
- **logger** (Logger) - Optional - Logger instance for runtime output (default: consoleLogger)
- **jobFactory** (JobFactory) - Optional - Custom job instantiation function for DI containers
- **internalOperationWrapper** ((fn) => Promise) - Optional - Wrapper for internal adapter operations (OTel instrumentation)
- **executionWrapper** ((fn, job, queue) => Promise) - Optional - Wrapper for job execution (OTel/tracing)

### Returns
This instance for chaining

### Throws
- `E_CONFIGURATION_ERROR` if configuration is invalid (missing adapters, invalid default adapter, etc.)
- `E_ADAPTER_INIT_ERROR` if an adapter factory throws

### Request Example
```typescript
import { QueueManager, redis, exponentialBackoff } from '@boringnode/queue'

await QueueManager.init({
  default: 'redis',
  adapters: {
    redis: redis({ host: 'localhost', port: 6379 }),
  },
  locations: ['./app/jobs/**/*.ts'],
  retry: {
    maxRetries: 3,
    backoff: exponentialBackoff({ baseDelay: '1s', maxDelay: '1m' }),
  },
  worker: {
    concurrency: 5,
    idleDelay: '2s',
    stalledThreshold: '30s',
  },
})
```
```

--------------------------------

### Get Logger Instance

Source: https://github.com/boringnode/queue/blob/main/_autodocs/api-reference/QueueManager.md

Obtain the logger instance configured for the queue runtime. This logger can be used for logging queue-related events and messages.

```typescript
const logger = QueueManager.getLogger()
logger.info('Queue system initialized')
```

--------------------------------

### Initialize and Use QueueManager

Source: https://github.com/boringnode/queue/blob/main/_autodocs/api-reference/QueueManager.md

Initializes the QueueManager with multiple adapters, job locations, retry strategies, and worker configurations. Demonstrates dispatching a job and retrieving a specific adapter for low-level operations. Includes graceful shutdown handling.
```typescript
import { QueueManager, redis, knex, exponentialBackoff } from '@boringnode/queue'
import Knex from 'knex'
import { pino } from 'pino'

// Initialize with multiple adapters
await QueueManager.init({
  default: 'redis',
  adapters: {
    redis: redis({ host: 'localhost', port: 6379 }),
    postgres: knex({
      client: 'pg',
      connection: { host: 'localhost', database: 'queue_db' },
    }),
  },
  locations: ['./app/jobs/**/*.ts'],
  retry: {
    maxRetries: 3,
    backoff: exponentialBackoff(),
  },
  worker: {
    concurrency: 5,
    idleDelay: '2s',
    stalledThreshold: '30s',
  },
  logger: pino(), // Custom logger
})

// Use the queue system
await SendEmailJob.dispatch({ to: 'user@example.com' })

// Get a specific adapter for low-level operations
const adapter = QueueManager.use('redis')
const job = await adapter.getJob('job-123', 'emails')

// Testing
using fake = QueueManager.fake()
await MyJob.dispatch({ data: 'test' })
fake.assertPushed(MyJob)
// Auto-restored here

// Shutdown
process.on('SIGTERM', async () => {
  console.log('Shutting down gracefully...')
  await QueueManager.destroy()
})
```

--------------------------------

### Handle Invalid Schedule Configuration

Source: https://github.com/boringnode/queue/blob/main/_autodocs/errors.md

Catch 'E_INVALID_SCHEDULE_CONFIG' to inform users about invalid schedule setups, such as missing or duplicate cron/interval specifications.
```typescript
try {
  // Missing both cron and interval
  await MyJob.schedule(payload).run()
} catch (error) {
  if (error.code === 'E_INVALID_SCHEDULE_CONFIG') {
    console.error('Schedule must have cron or interval:', error.message)
  }
}
```

```typescript
try {
  // Both specified (invalid)
  await MyJob.schedule(payload)
    .cron('0 0 * * *')
    .every('5m')
    .run()
} catch (error) {
  if (error.code === 'E_INVALID_SCHEDULE_CONFIG') {
    console.error('Schedule cannot have both cron and interval')
  }
}
```

--------------------------------

### Create Queue Tables with Knex

Source: https://github.com/boringnode/queue/blob/main/_autodocs/api-reference/QueueSchemaService.md

Use this snippet to create the necessary database tables for the queue system when using Knex directly. Ensure Knex is properly configured.

```typescript
import Knex from 'knex'
import { QueueSchemaService } from '@boringnode/queue'

const connection = Knex({
  client: 'pg',
  connection: {
    host: 'localhost',
    user: 'postgres',
    password: 'secret',
    database: 'myapp',
  },
})

const schemaService = new QueueSchemaService(connection)

// Create tables
await schemaService.createJobsTable()
await schemaService.createSchedulesTable()

console.log('Queue tables created')
```

--------------------------------

### Get Job Factory

Source: https://github.com/boringnode/queue/blob/main/_autodocs/api-reference/QueueManager.md

Retrieve the configured job factory for custom job instantiation. This factory can be used to create job instances manually.

```typescript
const factory = QueueManager.getJobFactory()
if (factory) {
  const jobInstance = await factory(MyJobClass)
}
```

--------------------------------

### Configure Sync Adapter

Source: https://github.com/boringnode/queue/blob/main/_autodocs/configuration.md

Configure the Sync adapter for development and testing only. Jobs execute immediately within the dispatch call.
```typescript
import { sync } from '@boringnode/queue/drivers/sync_adapter'

const syncAdapter = sync() // Jobs execute immediately (testing only)

await QueueManager.init({
  default: 'sync',
  adapters: { sync: sync() },
  // ...
})
```

--------------------------------

### Dispatch Job and Get Result

Source: https://github.com/boringnode/queue/blob/main/_autodocs/api-reference/JobDispatcher.md

Dispatch a job to the queue and retrieve the dispatch result, which includes the job ID and deduplication outcome if applicable.

```typescript
const { jobId } = await SendEmailJob.dispatch({ to: 'user@example.com' })
  .toQueue('emails')
  .priority(1)
  .in('5m')
  .run()

console.log(`Dispatched job: ${jobId}`)
```

--------------------------------

### Get Next Retry Date

Source: https://github.com/boringnode/queue/blob/main/_autodocs/api-reference/BackoffStrategy.md

Determine the exact Date for the next retry based on the current attempt number. Useful for scheduling retries.

```typescript
const strategy = new BackoffStrategy({
  strategy: 'exponential',
  baseDelay: '1s',
})

const nextRetry = strategy.getNextRetryAt(3)
console.log(`Retry at: ${nextRetry.toISOString()}`)
```

--------------------------------

### Simple Job Dispatch

Source: https://github.com/boringnode/queue/blob/main/_autodocs/api-reference/JobDispatcher.md

Demonstrates how to dispatch a job with a basic payload.

```APIDOC
## Simple Dispatch

This shows a basic job dispatch with a payload.

### Method Signature
`Job.dispatch(payload: object)`

### Parameters
- **payload** (object) - Required - The data to be passed to the job.

### Example
```typescript
await SendEmailJob.dispatch({ to: 'user@example.com', subject: 'Hello' })
```
```

--------------------------------

### Initialize Knex Adapter with Existing Instance or Custom Table

Source: https://github.com/boringnode/queue/blob/main/README.md

Configure the Knex adapter using an existing Knex instance or specify a custom table name for job storage.
```typescript
// With existing Knex instance
import Knex from 'knex'
// Import path assumed by analogy with the redis_adapter and sync_adapter drivers
import { knex } from '@boringnode/queue/drivers/knex_adapter'

const connection = Knex({ client: 'pg', connection: '...' })
const adapter = knex(connection)

// Custom table name (`config` stands for your Knex configuration object)
const customTableAdapter = knex(config, 'custom_jobs_table')
```

--------------------------------

### Handle E_INVALID_CRON_EXPRESSION in ScheduleBuilder

Source: https://github.com/boringnode/queue/blob/main/_autodocs/errors.md

This example demonstrates how to catch invalid cron expressions or timezones when scheduling jobs. It logs the error message and the original cause.

```typescript
try {
  await CleanupJob.schedule({ days: 30 })
    .cron('invalid cron')
    .timezone('Europe/Paris')
    .run()
} catch (error) {
  if (error.code === 'E_INVALID_CRON_EXPRESSION') {
    console.error(`Invalid cron: ${error.message}`)
    console.error(`Original error:`, error.cause)
  }
}
```

--------------------------------

### Get Configuration Resolver

Source: https://github.com/boringnode/queue/blob/main/_autodocs/api-reference/QueueManager.md

Retrieve the configuration resolver responsible for merging queue and job runtime configurations. This is an internal utility for managing configuration options.

```typescript
const resolver = QueueManager.getConfigResolver()
```

--------------------------------

### Get Queue Adapter Instance

Source: https://github.com/boringnode/queue/blob/main/_autodocs/api-reference/QueueManager.md

Retrieves an adapter instance by its registered name or the default adapter if no name is specified. Adapter instances are cached for reuse.

```typescript
// Get default adapter
const adapter = QueueManager.use()

// Get specific adapter
const redisAdapter = QueueManager.use('redis')
const postgresAdapter = QueueManager.use('postgres')

// Perform low-level operations
const job = await adapter.getJob('job-id', 'queue-name')
```

--------------------------------

### Configure Redis Adapter

Source: https://github.com/boringnode/queue/blob/main/_autodocs/configuration.md

Configure the Redis adapter with connection options or an existing ioredis instance. Options include host, port, password, database, key prefix, and connection timeouts.

```typescript
import { redis } from '@boringnode/queue/drivers/redis_adapter'
import { Redis } from 'ioredis'

// With connection options
const redisAdapter = redis({
  host: 'localhost',
  port: 6379,
  password: 'secret',
  db: 0,
  keyPrefix: 'myapp::queue::',
  connectTimeout: 10000,
  maxRetriesPerRequest: null,
})

// With existing ioredis instance
const connection = new Redis({ host: 'localhost' })
const sharedRedisAdapter = redis(connection) // Caller manages connection lifecycle
```

--------------------------------

### Initialize QueueManager with Custom Pino Logger

Source: https://github.com/boringnode/queue/blob/main/_autodocs/configuration.md

Configure the QueueManager to use a custom logger, such as Pino with 'pino-pretty' for colorful console output. Ensure the logger conforms to the expected interface.

```typescript
import { pino } from 'pino'

const logger = pino({
  level: 'info',
  transport: {
    target: 'pino-pretty',
    options: { colorize: true },
  },
})

await QueueManager.init({
  default: 'redis',
  adapters: { redis: redis() },
  logger,
  // ...
})
```

--------------------------------

### List Schedules with Options

Source: https://github.com/boringnode/queue/blob/main/_autodocs/api-reference/Schedule.md

Retrieve a list of schedules, optionally filtering by status. Use this to get all schedules or specific subsets like active or paused ones.

```typescript
// List all schedules
const all = await Schedule.list()

// List only active schedules
const active = await Schedule.list({ status: 'active' })

// List only paused schedules
const paused = await Schedule.list({ status: 'paused' })
```

--------------------------------

### Job Dispatch with Options

Source: https://github.com/boringnode/queue/blob/main/_autodocs/api-reference/JobDispatcher.md

Illustrates dispatching a job with various options like queue, priority, and deduplication.

```APIDOC
## Dispatch with Options

This example shows how to dispatch a job with advanced options to control its execution and queuing behavior.

### Method Chaining
Jobs can be dispatched and configured using method chaining.

### Available Options
- **toQueue(queueName: string)**: Specifies the queue the job should be dispatched to.
- **priority(level: number)**: Sets the priority level for the job.
- **in(delay: Duration)**: Sets a delay before the job can be executed.
- **group(groupId: string)**: Assigns the job to a group.
- **dedup(options: DedupOptions)**: Configures deduplication for the job.
- **with(driver: string)**: Specifies the driver to use for dispatching.
- **run()**: Executes the dispatch operation.

### Parameters for `dedup`
- **id** (string) - Required - A unique identifier for deduplication.
- **ttl** (Duration) - Required - Time-to-live for the deduplication key.
- **replace** (boolean) - Optional - If true, replaces an existing job in the queue.
- **extend** (boolean) - Optional - If true, extends the TTL of an existing job.

### Example
```typescript
const result = await SendEmailJob.dispatch({ to: 'user@example.com' })
  .toQueue('high-priority')
  .priority(1)
  .in('5m')
  .group('batch-jan')
  .dedup({ id: 'welcome-123', ttl: '1d' })
  .with('redis')
  .run()

console.log(`Job ${result.jobId} dispatched, dedup outcome: ${result.deduped}`)
```
```

--------------------------------

### Initialize QueueManager with Configuration

Source: https://github.com/boringnode/queue/blob/main/_autodocs/api-reference/QueueManager.md

Initializes the queue system with adapter definitions, retry policies, worker options, and job locations. Must be called before using queue functionalities.

```typescript
import { QueueManager, redis, exponentialBackoff } from '@boringnode/queue'

await QueueManager.init({
  default: 'redis',
  adapters: {
    redis: redis({ host: 'localhost', port: 6379 }),
  },
  locations: ['./app/jobs/**/*.ts'],
  retry: {
    maxRetries: 3,
    backoff: exponentialBackoff({ baseDelay: '1s', maxDelay: '1m' }),
  },
  worker: {
    concurrency: 5,
    idleDelay: '2s',
    stalledThreshold: '30s',
  },
})
```

--------------------------------

### Initialize Fake Adapter for Testing

Source: https://github.com/boringnode/queue/blob/main/README.md

Use the Fake adapter with `QueueManager.fake()` for testing. It allows dispatching jobs and asserting their presence, payload, and count.

```typescript
import { QueueManager } from '@boringnode/queue'
import { redis } from '@boringnode/queue/drivers/redis_adapter'

await QueueManager.init({
  default: 'redis',
  adapters: {
    redis: redis({ host: 'localhost' }),
  },
  locations: ['./app/jobs/**/*.ts'],
})

// The `using` keyword automatically restores the real adapters when
// the variable goes out of scope (at the end of the test function).
using fake = QueueManager.fake()

await SendEmailJob.dispatch({ to: 'user@example.com' })

fake.assertPushed(SendEmailJob)
fake.assertPushed(SendEmailJob, {
  queue: 'default',
  payload: (payload) => payload.to === 'user@example.com',
})
fake.assertPushedCount(1)
```

--------------------------------

### with

Source: https://github.com/boringnode/queue/blob/main/_autodocs/api-reference/JobDispatcher.md

Specifies a custom adapter to be used for dispatching the job, overriding any default or queue-level configurations.

```APIDOC
## with(adapter: string | (() => Adapter)): this

### Description
Use a specific adapter for this job. Overrides the default or queue-level adapter.

### Parameters
#### Path Parameters
N/A

#### Query Parameters
N/A

#### Request Body
- **adapter** (string | (() => Adapter)) - Required - Adapter name or factory function

### Request Example
```typescript
// Use named adapter
await Job.dispatch(payload).with('redis')

// Use adapter factory
const customAdapter = () => new MyAdapter()
await Job.dispatch(payload).with(customAdapter)
```

### Response
#### Success Response (200)
- **this** (this) - This dispatcher for method chaining

#### Response Example
N/A
```

--------------------------------

### Start Worker Processing

Source: https://github.com/boringnode/queue/blob/main/_autodocs/api-reference/Worker.md

Begin processing jobs from specified queues. This method blocks until the worker is stopped. Jobs are processed concurrently up to the configured limit, respecting the priority order of the provided queues.

```typescript
// Process single queue
await worker.start()

// Process multiple queues (priority order)
await worker.start(['high-priority', 'default', 'low-priority'])

// With signal handling
process.on('SIGINT', () => worker.stop())
process.on('SIGTERM', () => worker.stop())
await worker.start(['emails', 'notifications'])
```

--------------------------------

### Configure Multiple Queues

Source: https://github.com/boringnode/queue/blob/main/_autodocs/configuration.md

Initialize QueueManager with multiple queues, specifying adapters and queue-specific retry configurations.

```typescript
await QueueManager.init({
  default: 'redis',
  adapters: {
    redis: redis(),
    postgres: knex({...}),
  },
  queues: {
    'emails': {
      adapter: 'redis',
      retry: { maxRetries: 5 },
    },
    'webhooks': {
      adapter: 'postgres',
      retry: { maxRetries: 3 },
    },
    'background': {
      adapter: 'redis',
      defaultJobOptions: { priority: 10 },
    },
  },
  // ...
})
```

--------------------------------

### Initialize Queue Manager with Worker Options

Source: https://github.com/boringnode/queue/blob/main/_autodocs/configuration.md

Configure the default adapter, worker concurrency, idle delay, timeouts, and shutdown behavior. The `onShutdownSignal` callback allows for custom cleanup logic before the worker exits.

```typescript
await QueueManager.init({
  default: 'redis',
  adapters: { redis: redis() },
  worker: {
    concurrency: 5,           // Process 5 jobs in parallel
    idleDelay: '2s',          // Wait 2s when queue empty
    timeout: '30s',           // Default job timeout
    stalledThreshold: '30s',  // Stall threshold
    stalledInterval: '30s',   // Stall check frequency
    maxStalledCount: 1,       // Max stall recoveries
    gracefulShutdown: true,   // Wait for jobs on shutdown
    onShutdownSignal: async () => {
      console.log('Shutting down...')
      await cleanup()
    },
  },
  // ...
})
```

--------------------------------

### Grouping Jobs in a Batch

Source: https://github.com/boringnode/queue/blob/main/_autodocs/api-reference/JobBatchDispatcher.md

Assign a group ID to all jobs in a batch using the `group` method. This facilitates filtering and monitoring of related jobs in UI dashboards. The example demonstrates chaining multiple configuration methods before running the batch.

```typescript
const recipients = [
  { to: 'user1@example.com' },
  { to: 'user2@example.com' },
  { to: 'user3@example.com' },
]

const { jobIds } = await SendEmailJob.dispatchMany(recipients)
  .group('newsletter-jan-2025')
  .toQueue('emails')
  .priority(3)
  .run()

console.log(`Dispatched ${jobIds.length} jobs in group newsletter-jan-2025`)
```

--------------------------------

### Instantiate QueueSchemaService

Source: https://github.com/boringnode/queue/blob/main/_autodocs/api-reference/QueueSchemaService.md

Create a new schema service instance bound to a Knex database connection. This is required when using the Knex adapter with PostgreSQL, MySQL, or SQLite.

```typescript
import Knex from 'knex'
import { QueueSchemaService } from '@boringnode/queue'

const connection = Knex({ client: 'pg', connection: '...' })
const schemaService = new QueueSchemaService(connection)

await schemaService.createJobsTable()
await schemaService.createSchedulesTable()
```

--------------------------------

### Initialize QueueManager

Source: https://github.com/boringnode/queue/blob/main/_autodocs/configuration.md

Initialize the queue system with `QueueManager.init()`. Configure the default adapter, job locations, retry strategy, worker concurrency, and a custom logger or job factory.

```typescript
import { QueueManager, redis, knex, exponentialBackoff } from '@boringnode/queue'

await QueueManager.init({
  // Required
  default: 'redis',
  adapters: {
    redis: redis({ host: 'localhost', port: 6379 }),
  },

  // Optional
  locations: ['./app/jobs/**/*.ts'],
  retry: { maxRetries: 3, backoff: exponentialBackoff() },
  worker: { concurrency: 5, idleDelay: '2s' },
  logger: customLogger,
  jobFactory: async (JobClass) => app.container.make(JobClass),
})
```

--------------------------------

### fake()

Source: https://github.com/boringnode/queue/blob/main/_autodocs/api-reference/QueueManager.md

Replaces all queue adapters with a fake adapter for testing purposes. This fake adapter records pushed jobs and provides assertion helpers. It can be used with the `using` keyword for automatic restoration or manually with `restore()`.

```APIDOC
## fake(): FakeAdapter

### Description
Replace all adapters with a fake adapter for testing. The fake adapter records pushed jobs and exposes assertion helpers. Use the `using` keyword to automatically restore the previous configuration when the variable goes out of scope.

### Returns
`FakeAdapter` instance with assertion methods

### Example
```typescript
// Auto-restore with using keyword (requires TypeScript 5.2+)
using fake = QueueManager.fake()

await SendEmailJob.dispatch({ to: 'user@example.com' })

fake.assertPushed(SendEmailJob)
fake.assertPushed(SendEmailJob, {
  queue: 'default',
  payload: (p) => p.to === 'user@example.com',
})
fake.assertPushedCount(1)
// Automatically restored at end of scope

// Or manual restoration
const manualFake = QueueManager.fake()
try {
  await SendEmailJob.dispatch({ to: 'user@example.com' })
  manualFake.assertPushed(SendEmailJob)
} finally {
  QueueManager.restore()
}
```
```

--------------------------------

### Configure Queue Manager

Source: https://github.com/boringnode/queue/blob/main/README.md

Initialize the QueueManager with a default adapter and adapter configurations. Specify job locations for auto-discovery.

```typescript
import { QueueManager } from '@boringnode/queue'
import { redis } from '@boringnode/queue/drivers/redis_adapter'

await QueueManager.init({
  default: 'redis',
  adapters: {
    redis: redis({ host: 'localhost', port: 6379 }),
  },
  locations: ['./app/jobs/**/*.ts'],
})
```
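
--------------------------------

### Sketch: How an Exponential Backoff Delay Could Be Computed

Several configuration examples above pass `exponentialBackoff({ baseDelay: '1s', maxDelay: '1m' })` as the retry strategy, and `getNextRetryAt(attempt)` returns a `Date`. The sketch below is one way to picture that arithmetic. It is not the library's implementation: the helper names (`parseDuration`, `exponentialDelay`, `nextRetryAt`), the doubling formula, the 1-based attempt count, and the supported unit suffixes are all assumptions made for illustration. The real strategy may use a different multiplier or add jitter.

```typescript
// Illustrative only: these helpers are hypothetical and are NOT part of @boringnode/queue.

// Milliseconds per supported unit suffix (assumed set of units).
const UNIT_MS: Record<string, number> = {
  ms: 1,
  s: 1000,
  m: 60 * 1000,
  h: 60 * 60 * 1000,
  d: 24 * 60 * 60 * 1000,
}

// Convert a duration string such as '1s', '5m' or '1d' to milliseconds.
function parseDuration(value: string): number {
  const match = /^(\d+)(ms|s|m|h|d)$/.exec(value)
  if (!match) {
    throw new Error(`Unsupported duration: ${value}`)
  }
  return Number(match[1]) * UNIT_MS[match[2]]
}

// Assumed formula: baseDelay * 2^(attempt - 1), capped at maxDelay.
// Attempts are counted from 1.
function exponentialDelay(attempt: number, baseDelay: string, maxDelay: string): number {
  const uncapped = parseDuration(baseDelay) * Math.pow(2, attempt - 1)
  return Math.min(uncapped, parseDuration(maxDelay))
}

// Date at which the given attempt would be retried, relative to `now`.
function nextRetryAt(
  attempt: number,
  baseDelay: string,
  maxDelay: string,
  now: Date = new Date()
): Date {
  return new Date(now.getTime() + exponentialDelay(attempt, baseDelay, maxDelay))
}

console.log(exponentialDelay(1, '1s', '1m')) // 1000
console.log(exponentialDelay(3, '1s', '1m')) // 4000
console.log(exponentialDelay(10, '1s', '1m')) // 60000 (capped at maxDelay)
```

Under these assumptions, a `maxDelay` cap keeps late retries from drifting arbitrarily far apart, which is why the configuration examples pair `baseDelay` with `maxDelay`.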