### Setup

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/contributing.md

Clone the repository, navigate to the directory, and install dependencies.

```bash
# Fork the repo on GitHub, then:
git clone https://github.com/YOUR_USERNAME/bunqueue.git
cd bunqueue
bun install
```

--------------------------------

### Install from source

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/installation.md

Install bunqueue from its source code repository.

```bash
git clone https://github.com/egeominotti/bunqueue.git
cd bunqueue
bun install
bun run build
```

--------------------------------

### Server Mode Commands

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/installation.md

Commands to start the bunqueue server and check its version.

```bash
# Start server
bunqueue start

# Check version
bunqueue --version
```

--------------------------------

### Embedded Mode Example

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/installation.md

Example of setting up a Queue and Worker in embedded mode.

```typescript
import { Queue, Worker } from 'bunqueue/client';

// Both Queue and Worker must have embedded: true
const queue = new Queue('test', { embedded: true });

const worker = new Worker('test', async (job) => {
  console.log('Processing:', job.data);
  return { success: true };
}, { embedded: true });

await queue.add('hello', { message: 'bunqueue is working!' });
```

--------------------------------

### Development Setup

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/faq.md

Commands to clone the repository, install dependencies, run tests, and start the development server.

```bash
git clone https://github.com/egeominotti/bunqueue
cd bunqueue
bun install
bun test
bun run dev
```

--------------------------------

### Benchmark Setup and Execution

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/comparison.mdx

Commands to clone the repository, install dependencies, start Redis and bunqueue servers, and run the benchmark.

```bash
# Clone the repository
git clone https://github.com/egeominotti/bunqueue.git
cd bunqueue
bun install

# Start Redis (required for BullMQ)
redis-server --daemonize yes

# Start bunqueue server
bun run start &

# Run the benchmark
bun run bench/comparison/run.ts
```

--------------------------------

### Persistence Setup

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/migration.md

Example of setting up data persistence for bunqueue in embedded mode.

```typescript
const queue = new Queue('my-queue', {
  embedded: true,
  dataPath: './data/bunq.db'
});
```

--------------------------------

### TypeScript Type Definitions

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/installation.md

Example import statement for bunqueue TypeScript type definitions.

```typescript
import type {
  Job,
  JobOptions,
  WorkerOptions,
  StallConfig,
  DlqConfig,
  DlqEntry
} from 'bunqueue/client';
```

--------------------------------

### Full Production Setup Environment Variables

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/deployment.mdx

Example environment variables for a full production setup of bunqueue.

```bash
# /etc/env
NODE_ENV=production
DATA_PATH=/var/lib/bunq.db
TCP_PORT=6789
HTTP_PORT=6790
AUTH_TOKENS=prod-token-abc123,deploy-token-xyz789
```

--------------------------------

### Complete Example - App Setup and Routes

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/hono.md

A comprehensive example demonstrating Hono setup, middleware, queue initialization, and API routes for enqueuing jobs and checking status.

```typescript
import { Hono } from 'hono';
import { cors } from 'hono/cors';
import { logger } from 'hono/logger';
import { Queue, Worker, shutdownManager } from 'bunqueue/client';

const app = new Hono();

// Middleware
app.use('*', logger());
app.use('/api/*', cors());

// Queues (embedded mode)
const queues = {
  emails: new Queue('emails', { embedded: true }),
  reports: new Queue('reports', { embedded: true }),
  webhooks: new Queue('webhooks', { embedded: true }),
};

// Enqueue job
app.post('/api/send-email', async (c) => {
  const { to, subject, template, data } = await c.req.json();

  const job = await queues.emails.add('send', {
    to,
    subject,
    template,
    data,
  }, {
    attempts: 3,
    backoff: 5000,
    removeOnComplete: true,
  });

  return c.json({ queued: true, jobId: job.id });
});

// Generate report (long-running task)
app.post('/api/reports/generate', async (c) => {
  const { type, filters, format } = await c.req.json();

  const job = await queues.reports.add('generate', {
    type,
    filters,
    format,
    requestedBy: c.req.header('X-User-ID'),
  }, {
    timeout: 300000, // 5 minutes
    priority: 10,
  });

  return c.json({
    jobId: job.id,
    statusUrl: `/api/jobs/reports/${job.id}`,
  });
});

// Poll job status
app.get('/api/jobs/:queue/:id/poll', async (c) => {
  const { queue: queueName, id } = c.req.param();
  const queue = new Queue(queueName, { embedded: true });

  const job = await queue.getJob(id);
  if (!job) {
    return c.json({ error: 'Not found' }, 404);
  }

  return c.json({
    id: job.id,
    name: job.name,
    progress: job.progress,
    result: job.returnvalue ?? null,
    error: job.failedReason ?? null,
  });
});

// Graceful shutdown
process.on('SIGINT', () => {
  shutdownManager();
  process.exit(0);
});

export default app;
```

--------------------------------

### Install bunqueue

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/migration.md

Instructions to remove BullMQ and Redis and install bunqueue.

```bash
# Remove BullMQ and Redis
bun remove bullmq ioredis

# Install bunqueue
bun add bunqueue
```

--------------------------------

### Quickstart Example

Source: https://github.com/egeominotti/bunqueue/blob/main/README.md

Basic usage of Bunqueue to add and process jobs.

```typescript
import { Bunqueue } from 'bunqueue/client';

const app = new Bunqueue('emails', {
  embedded: true,
  processor: async (job) => {
    console.log(`Sending to ${job.data.to}`);
    return { sent: true };
  },
});

await app.add('send', { to: 'alice@example.com' });
```

--------------------------------

### CLI Server Start Command

Source: https://github.com/egeominotti/bunqueue/blob/main/CLAUDE.md

Example command to start the BunQueue server using the CLI.

```bash
# Server
bunqueue start --tcp-port 6789 --data-path ./data/queue.db
```

--------------------------------

### Connect AI Agents (MCP)

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/quickstart.md

Shows how to connect AI agents like Claude to Bunqueue using the MCP server, providing examples for both command-line and configuration file setup.

```bash
# Claude Code
claude mcp add bunqueue -- bunx bunqueue-mcp
```

```json
// Claude Desktop / Cursor / Windsurf
{
  "mcpServers": {
    "bunqueue": {
      "command": "bunx",
      "args": ["bunqueue-mcp"]
    }
  }
}
```

--------------------------------

### Configuration File Example

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/deployment.mdx

An example of a bunqueue configuration file using TypeScript to define settings for storage, authentication, and backup.

```typescript
import { defineConfig } from 'bunqueue';

export default defineConfig({
  storage: { dataPath: './data/bunq.db' },
  auth: { tokens: [process.env.AUTH_TOKEN!] },
  backup: { enabled: true, bucket: 'my-backups' },
});
```

--------------------------------

### Create a Typed Queue

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/quickstart.md

Example of creating a typed queue for email jobs.

```typescript
import { Queue } from 'bunqueue/client';

// Create a typed queue
interface EmailJob {
  to: string;
  subject: string;
  body: string;
}

const emailQueue = new Queue<EmailJob>('emails', { embedded: true });
```

--------------------------------

### Monitoring Setup

Source: https://github.com/egeominotti/bunqueue/blob/main/README.md

Command to start bunqueue with Prometheus and Grafana for monitoring.

```bash
# Start with Prometheus + Grafana
docker compose --profile monitoring up -d
```

--------------------------------

### List and restore backups

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/deployment.mdx

Command-line examples for listing and restoring bunqueue backups.

```bash
# List backups first
bunqueue backup list

# Then restore by key
bunqueue backup restore backups/bunq-2026-01-30T12:00:00.db --force
```

--------------------------------

### Bunqueue client example

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/deployment.mdx

Example of using the Queue and Worker classes from bunqueue/client to connect to a running bunqueue server.

```typescript
import { Queue, Worker } from 'bunqueue/client';

// Connects to localhost:6789 by default
const queue = new Queue('emails');

await queue.add('send', { to: 'user@example.com' });

// Worker also connects to the server
const worker = new Worker('emails', async (job) => {
  await sendEmail(job.data);
  return { sent: true };
});
```

--------------------------------

### Start bunqueue server

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/deployment.mdx

Starts the bunqueue server in server mode.

```bash
bunqueue start --tcp-port 6789 --http-port 6790
```

--------------------------------

### Start bunqueue service

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/deployment.mdx

Command to start the bunqueue service.

```bash
systemctl start bunqueue
```

--------------------------------

### Full Example: Producer and Consumer with Graceful Shutdown

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/quickstart.md

A complete example showing both the producer (queue) and consumer (worker) in embedded mode, including graceful shutdown handling.

```typescript
import { Queue, Worker, shutdownManager } from 'bunqueue/client';

interface EmailJob {
  to: string;
  subject: string;
}

// Producer - must have embedded: true
const queue = new Queue<EmailJob>('emails', { embedded: true });

// Add some jobs
await queue.add('welcome', { to: 'new@user.com', subject: 'Welcome!' });
await queue.add('newsletter', { to: 'sub@user.com', subject: 'News' });

// Consumer - must have embedded: true
const worker = new Worker<EmailJob>('emails', async (job) => {
  console.log(`Sending ${job.data.subject} to ${job.data.to}`);
  await job.updateProgress(100);
  return { sent: true };
}, { embedded: true, concurrency: 3 });

worker.on('completed', (job) => {
  console.log(`✓ ${job.id}`);
});

// Graceful shutdown
process.on('SIGINT', async () => {
  await worker.close();
  shutdownManager();
  process.exit(0);
});
```

--------------------------------

### Adding Job Options

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/blog/getting-started-five-minutes.mdx

Examples of adding jobs with options like priority, delay, retries, timeouts, and idempotency.

```typescript
// High priority job - processed before lower priority jobs
await queue.add('urgent-alert', data, { priority: 10 });

// Delayed job - processed after 30 seconds
await queue.add('reminder', data, { delay: 30_000 });

// Custom retry configuration
await queue.add('payment', data, {
  attempts: 5,
  backoff: { type: 'exponential', delay: 1000 },
  timeout: 60_000,
});

// Idempotent job - same jobId won't be added twice
await queue.add('daily-report', data, { jobId: 'report-2024-01-15' });
```

--------------------------------

### Starting bunqueue with S3 Backup Enabled

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/blog/s3-backup-recovery.mdx

Example of starting the bunqueue server with S3 backup configuration passed as environment variables.

```bash
S3_BACKUP_ENABLED=1 \
S3_BUCKET=my-backups \
S3_REGION=us-east-1 \
bunqueue start --data-path ./data/queue.db
```

--------------------------------

### Environment Variables

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/blog/production-deployment.mdx

Example environment variables for configuring bunqueue in production.

```bash
# Server
TCP_PORT=6789
HTTP_PORT=6790
HOST=0.0.0.0
DATA_PATH=/var/lib/bunqueue/queue.db

# Authentication
AUTH_TOKENS=your-secret-token-here

# Timeouts
SHUTDOWN_TIMEOUT_MS=30000
WORKER_TIMEOUT_MS=30000

# S3 Backup
S3_BACKUP_ENABLED=1
S3_BUCKET=my-bunqueue-backups
S3_ACCESS_KEY_ID=your-key
S3_SECRET_ACCESS_KEY=your-secret
S3_REGION=us-east-1
S3_BACKUP_INTERVAL=21600000 # Every 6 hours
S3_BACKUP_RETENTION=7 # Keep 7 days
```

--------------------------------

### Point-in-Time Recovery Restore

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/deployment.mdx

Example of restoring from a WAL file backup.

```bash
# Restore
cp /backup/bunq.db* data/
```

--------------------------------

### Monitoring Jobs

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/blog/getting-started-five-minutes.mdx

Code examples for checking job counts, retrieving a specific job, and controlling the queue (pause/resume).

```typescript
// Get job counts by state
const counts = await queue.getJobCountsAsync();
console.log(counts);
// { waiting: 10, active: 3, completed: 150, failed: 2, delayed: 5 }

// Get a specific job
const job = await queue.getJob('job-id-here');
console.log(job?.data, job?.progress);

// Queue control
await queue.pause();  // Stop processing
await queue.resume(); // Resume processing
```

--------------------------------

### Hono Basic Setup

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/blog/framework-integrations.mdx

Integrate bunqueue with Hono for basic job queuing from HTTP handlers.

```typescript
import { Hono } from 'hono';
import { Queue, Worker } from 'bunqueue/client';

// Create queue and worker
const emailQueue = new Queue<{ to: string; subject: string }>('emails', {
  embedded: true,
});

const worker = new Worker('emails', async (job) => {
  await sendEmail(job.data.to, job.data.subject);
  return { sent: true };
}, { embedded: true, concurrency: 5 });

// Create Hono app
const app = new Hono();

app.post('/api/send-email', async (c) => {
  const { to, subject } = await c.req.json();
  const job = await emailQueue.add('send', { to, subject });
  return c.json({ jobId: job.id, status: 'queued' });
});

app.get('/api/job/:id', async (c) => {
  const job = await emailQueue.getJob(c.req.param('id'));
  if (!job) return c.json({ error: 'Not found' }, 404);

  return c.json({
    id: job.id,
    state: await job.getState(),
    progress: job.progress,
    data: job.data,
  });
});

export default app;
```

--------------------------------

### Kubernetes Probes

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/deployment.mdx

Example Kubernetes liveness and readiness probe configurations for bunqueue.

```yaml
livenessProbe:
  httpGet:
    path: /healthz
    port: 6790
  initialDelaySeconds: 5
  periodSeconds: 10

readinessProbe:
  httpGet:
    path: /ready
    port: 6790
  initialDelaySeconds: 5
  periodSeconds: 5
```

--------------------------------

### Set authentication tokens

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/deployment.mdx

Example of setting authentication tokens for bunqueue.

```bash
AUTH_TOKENS=token1,token2,token3
```

--------------------------------

### Run the Benchmark

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/benchmarks.mdx

Commands to clone the repository, install dependencies, start the server, and run the benchmark.

```bash
# Clone the repository
git clone https://github.com/egeominotti/bunqueue.git
cd bunqueue
bun install

# Start the bunqueue server (for TCP tests)
bun run start &

# Run the comprehensive benchmark
bun run bench/comprehensive.ts
```

--------------------------------

### Build bunqueue from source

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/deployment.mdx

Commands to clone the bunqueue repository, install dependencies, and build a standalone executable.

```bash
# Clone the repository
git clone https://github.com/egeominotti/bunqueue.git
cd bunqueue

# Install dependencies
bun install

# Build standalone binary
bun run build
```

--------------------------------

### PM2 Configuration for Compiled Binary

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/deployment.mdx

Example PM2 ecosystem configuration file for deploying a compiled bunqueue binary.

```javascript
// ecosystem.config.js
module.exports = {
  apps: [{
    name: 'bunqueue',
    script: '/usr/local/bin/bunqueue', // Compiled binary
    args: 'start',
    instances: 1, // Single instance only - no cluster mode
    exec_mode: 'fork',
    autorestart: true,
    watch: false,
    max_memory_restart: '512M',
    env: {
      NODE_ENV: 'production',
      DATA_PATH: '/var/lib/bunq.db',
      TCP_PORT: 6789,
      HTTP_PORT: 6790,
    },
    error_file: '/var/log/error.log',
    out_file: '/var/log/out.log',
    log_date_format: 'YYYY-MM-DD HH:mm:ss Z',
    merge_logs: true,
  }]
};
```

--------------------------------

### Elysia Basic Setup

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/blog/framework-integrations.mdx

Integrate bunqueue with Elysia for basic task queuing from HTTP handlers.

```typescript
import { Elysia } from 'elysia';
import { Queue, Worker } from 'bunqueue/client';

const taskQueue = new Queue<{ type: string; payload: unknown }>('tasks', {
  embedded: true,
});

const worker = new Worker('tasks', async (job) => {
  switch (job.data.type) {
    case 'resize-image':
      return await resizeImage(job.data.payload);
    case 'generate-pdf':
      return await generatePdf(job.data.payload);
    default:
      throw new Error(`Unknown task type: ${job.data.type}`);
  }
}, { embedded: true, concurrency: 3 });

const app = new Elysia()
  .post('/tasks', async ({ body }) => {
    const job = await taskQueue.add(body.type, body);
    return { jobId: job.id, status: 'queued' };
  })
  .get('/tasks/:id', async ({ params }) => {
    const job = await taskQueue.getJob(params.id);
    if (!job) return { error: 'Not found' };
    return {
      id: job.id,
      state: await job.getState(),
      progress: job.progress,
    };
  })
  .get('/tasks/counts', async () => {
    return await taskQueue.getJobCountsAsync();
  })
  .listen(3000);
```

--------------------------------

### Install bunqueue globally from build

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/deployment.mdx

Commands to copy the built bunqueue executable to the system path for global access.

```bash
# Copy to system path
sudo cp dist/bunqueue /usr/local/bin/

# Verify installation
bunqueue --version
```

--------------------------------

### Persistence Setup

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/examples.md

Demonstrates how to configure bunqueue for data persistence using the `dataPath` option in embedded mode.

```typescript
const queue = new Queue('tasks', {
  embedded: true,
  dataPath: './data/bunq.db'
});
```

--------------------------------

### Full Example

Source: https://github.com/egeominotti/bunqueue/blob/main/skills/bunqueue/SKILL.md

A comprehensive example demonstrating the setup and usage of Bunqueue with various configurations including routes, concurrency, retries, circuit breakers, TTL, priority aging, deduplication, rate limiting, and DLQ. It also shows middleware usage, job triggering, cron jobs, and graceful shutdown.

```typescript
import { Bunqueue, shutdownManager } from 'bunqueue/client';

const app = new Bunqueue('my-app', {
  embedded: true,
  routes: {
    'process': async (job) => ({ id: job.data.payload, status: 'done' }),
    'notify': async (job) => ({ sent: true }),
    'alert': async (job) => ({ alerted: true }),
  },
  concurrency: 10,
  retry: { maxAttempts: 3, delay: 1000, strategy: 'jitter' },
  circuitBreaker: { threshold: 5, resetTimeout: 30000 },
  ttl: { defaultTtl: 600000, perName: { 'verify-otp': 60000 } },
  priorityAging: { interval: 60000, minAge: 300000, boost: 1 },
  deduplication: { ttl: 5000 },
  rateLimit: { max: 100, duration: 1000 },
  dlq: { autoRetry: true, maxAge: 604800000 },
});

app.use(async (job, next) => {
  const start = Date.now();
  const result = await next();
  console.log(`${job.name}: ${Date.now() - start}ms`);
  return result;
});

app.trigger({ on: 'process', create: 'notify', data: (r) => ({ payload: r.id }) })
  .trigger({ on: 'process', event: 'failed', create: 'alert', data: (_, j) => j.data });

await app.cron('cleanup', '0 2 * * *', { payload: 'nightly' });
await app.add('process', { payload: 'ORD-001' });

process.on('SIGINT', async () => {
  await app.close();
  shutdownManager();
});
```

--------------------------------

### Basic Setup

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/elysia.md

This example demonstrates how to set up a basic integration between Bunqueue and Elysia, including defining a typed job and adding it to the queue.

```typescript
import { Elysia } from 'elysia';
import { Queue, Worker } from 'bunqueue/client';

// Create typed queues in embedded mode
interface EmailJob {
  to: string;
  subject: string;
  body: string;
}

const emailQueue = new Queue<EmailJob>('emails', { embedded: true });

const app = new Elysia()
  .post('/emails', async ({ body }) => {
    const job = await emailQueue.add('send', body as EmailJob);
    return { jobId: job.id, status: 'queued' };
  })
  .listen(3000);
```

--------------------------------

### SandboxedWorker API Examples

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/worker.md

Examples of using the SandboxedWorker API for starting, getting stats, and stopping.

```typescript
// Start the worker pool
worker.start();

// Get stats
const stats = worker.getStats();
// { total: 4, busy: 2, idle: 2, restarts: 0 }

// Graceful shutdown
await worker.stop();
```

--------------------------------

### Run your application

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/blog/getting-started-five-minutes.mdx

Command to run the application file (e.g., app.ts) using bun.

```bash
bun run app.ts
```

--------------------------------

### Separate Worker Process - API Example

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/deployment.mdx

Example of the API server process in a separate worker setup, using bunqueue client to add tasks to a queue.

```typescript
// api.ts - Your web server
import { Queue } from 'bunqueue/client';

const queue = new Queue('tasks', { embedded: true });

app.post('/task', async (c) => {
  await queue.add('process', { data: '...' });
  return c.json({ ok: true });
});
```

--------------------------------

### Systemd service management commands

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/deployment.mdx

Commands to install, enable, start, and check the status of the bunqueue systemd service.

```bash
# Install
sudo systemctl daemon-reload
sudo systemctl enable bunqueue
sudo systemctl start bunqueue

# Check status
sudo systemctl status bunqueue
sudo journalctl -u bunqueue -f
```

--------------------------------

### Add Jobs to the Queue

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/quickstart.md

Demonstrates adding single jobs, jobs with options, and multiple jobs in a batch.

```typescript
// Add a single job
const job = await emailQueue.add('send-email', {
  to: 'user@example.com',
  subject: 'Welcome!',
  body: 'Thanks for signing up.'
});

console.log(`Job created: ${job.id}`);

// Add with options
await emailQueue.add('send-email', data, {
  priority: 10,  // Higher = processed first
  delay: 5000,   // Wait 5 seconds before processing
  attempts: 3,   // Retry up to 3 times
  backoff: 1000, // Wait 1 second between retries
});

// Add multiple jobs (batch optimized)
await emailQueue.addBulk([
  { name: 'send-email', data: { to: 'a@test.com', subject: 'Hi', body: '...' } },
  { name: 'send-email', data: { to: 'b@test.com', subject: 'Hi', body: '...' } },
]);
```

--------------------------------

### Start the Server

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/cli.md

Start the bunqueue server with various options for ports, host binding, persistent storage, authentication, and configuration files.

```bash
# Start with defaults (TCP: 6789, HTTP: 6790)
bunqueue start

# Custom ports
bunqueue start --tcp-port 7000 --http-port 7001

# Bind to specific host
bunqueue start --host 127.0.0.1 -p 6789

# With persistent storage
bunqueue start --data-path ./data/production.db

# With authentication
AUTH_TOKENS=secret-token bunqueue start

# With a config file
bunqueue start --config ./bunqueue.config.ts
```

--------------------------------

### Start Command

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/configuration.md

Then start normally.

```bash
bunqueue start
```

--------------------------------

### HTTP Health Check Examples

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/deployment.mdx

Examples of using curl to check bunqueue health endpoints.

```bash
# HTTP health check (detailed)
curl http://localhost:6790/health
# {"ok":true,"status":"healthy","uptime":3600,"version":"2.5.7",
#  "queues":{"waiting":5,"active":2},"connections":{"ws":0,"sse":0},
#  "memory":{"heapUsed":45,"heapTotal":64,"rss":128}}

# Simple liveness probe
curl http://localhost:6790/healthz
# OK

# Readiness probe
curl http://localhost:6790/ready
# {"ok":true,"ready":true}

# Queue stats
curl http://localhost:6790/stats
# {"ok":true,"stats":{"waiting":5,"active":2,"completed":1000,"dlq":0}}

# Prometheus metrics (text format)
curl http://localhost:6790/prometheus
```

--------------------------------

### Start the engine and run the workflow

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/blog/workflow-engine.mdx

This example shows how to initialize the workflow engine, register a defined workflow, and then execute it with specific input.

```typescript
const engine = new Engine({ embedded: true });
engine.register(order);
await engine.start();

const run = await engine.run('order-pipeline', { orderId: 'ORD-1' });
```

--------------------------------

### Client SDK Example

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/public/llms.txt

Example demonstrating how to create a queue, add a job, and set up a worker using the bunqueue client SDK.

```typescript
import { Queue, Worker } from 'bunqueue/client';

// Create a queue
const queue = new Queue('emails');

// Add a job
await queue.add('send-welcome', {
  to: 'user@example.com',
  subject: 'Welcome!'
});

// Process jobs
const worker = new Worker('emails', async (job) => {
  await sendEmail(job.data);
  return { sent: true };
});
```

--------------------------------

### Separate Worker Process - Worker Example

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/deployment.mdx

Example of a separate worker process that consumes tasks from a bunqueue queue.

```typescript
// worker.ts - Separate process
import { Worker } from 'bunqueue/client';

new Worker('tasks', async (job) => {
  // Heavy processing here
  return { done: true };
}, { embedded: true, concurrency: 10 });

console.log('Worker started');
```

--------------------------------

### Simple Mode (All-in-One)

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/quickstart.md

Illustrates how to use `Bunqueue` in Simple Mode, which combines Queue and Worker functionalities with built-in support for routes, middleware, cron jobs, and job addition.

```typescript
import { Bunqueue } from 'bunqueue/client';

const app = new Bunqueue('notifications', {
  embedded: true,
  routes: {
    'send-email': async (job) => {
      await sendEmail(job.data.to);
      return { sent: true };
    },
    'send-sms': async (job) => {
      await sendSMS(job.data.to);
      return { sent: true };
    },
  },
  concurrency: 10,
});

// Middleware (wraps every job)
app.use(async (job, next) => {
  const start = Date.now();
  const result = await next();
  console.log(`${job.name}: ${Date.now() - start}ms`);
  return result;
});

// Cron jobs
await app.cron('daily-report', '0 9 * * *', { type: 'summary' });

// Add jobs
await app.add('send-email', { to: 'alice@example.com' });

// Graceful shutdown
await app.close();
```

--------------------------------

### Full Example

Source: https://github.com/egeominotti/bunqueue/blob/main/README.md

A comprehensive example demonstrating the initialization of a Bunqueue instance with various advanced configurations and job handling.

```typescript
import { Bunqueue, shutdownManager } from 'bunqueue/client';

const app = new Bunqueue<{ payload: string }>('my-app', {
  embedded: true,
  routes: {
    'process': async (job) => ({ id: job.data.payload, status: 'done' }),
    'notify': async (job) => ({ sent: true }),
    'alert': async (job) => ({ alerted: true }),
  },
  concurrency: 10,
  retry: { maxAttempts: 3, delay: 1000, strategy: 'jitter' },
  circuitBreaker: { threshold: 5, resetTimeout: 30000 },
  ttl: { defaultTtl: 600000, perName: { 'verify-otp': 60000 } },
  priorityAging: { interval: 60000, minAge: 300000, boost: 1 },
  deduplication: { ttl: 5000 },
  rateLimit: { max: 100, duration: 1000 },
  dlq: { autoRetry: true, maxAge: 604800000 },
});

app.use(async (job, next) => {
  const start = Date.now();
  const result = await next();
  console.log(`${job.name}: ${Date.now() - start}ms`);
  return result;
});

app
  .trigger({ on: 'process', create: 'notify', data: (r) => ({ payload: r.id }) })
  .trigger({ on: 'process', event: 'failed', create: 'alert', data: (_, j) => j.data });

await app.cron('cleanup', '0 2 * * *', { payload: 'nightly' });
await app.add('process', { payload: 'ORD-001' });

process.on('SIGINT', async () => {
  await app.close();
  shutdownManager();
});
```

--------------------------------

### Get job counts (async)

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/queue.md

Example of getting job counts asynchronously, which works with TCP connections.

```typescript
// Get job counts (async - works with TCP)
const counts = await queue.getJobCountsAsync();
```

--------------------------------

### Get job counts (sync)

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/queue.md

Example of getting job counts synchronously, which is only available in embedded mode.

```typescript
// Get job counts (sync - embedded mode only)
const counts = queue.getJobCounts();
// { waiting: 10, active: 2, completed: 100, failed: 3 }
```

--------------------------------

### Persistence with SQLite

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/quickstart.md

Demonstrates how to configure Bunqueue for persistent job storage using SQLite by passing the `dataPath` option or setting the `DATA_PATH` environment variable.

```typescript
import { Queue, Worker } from 'bunqueue/client';

// Option 1: Pass dataPath directly (recommended)
const queue = new Queue('tasks', {
  embedded: true,
  dataPath: './data/bunqueue.db'
});

const worker = new Worker('tasks', processor, {
  embedded: true,
  dataPath: './data/bunqueue.db'
});

// Option 2: Environment variable
// DATA_PATH=./data/bunqueue.db bun run app.ts
```

--------------------------------

### Code Comments

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/contributing.md

Examples of code comments for functions.

```typescript
/** Brief description */
function simpleFunction() {}

/**
 * Longer description for complex functions
 * @param input - Description
 * @returns Description
 */
function complexFunction(input: string): Result {}
```

--------------------------------

### Workflow Engine

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/quickstart.md

Demonstrates the creation and execution of a multi-step workflow using Bunqueue's Workflow Engine, including defining steps, compensation logic, waiting for signals, and starting/signaling workflows.

```typescript
import { Workflow, Engine } from 'bunqueue/workflow';

const flow = new Workflow('order-pipeline')
  .step('validate', async (ctx) => {
    const { orderId } = ctx.input as { orderId: string };
    return { orderId };
  })
  .step('charge', async (ctx) => {
    return { txId: 'tx_123' };
  }, {
    compensate: async () => {
      // Auto-rollback if a later step fails
      await refundPayment('tx_123');
    },
  })
  .waitFor('manager-approval') // Pauses until signal received
  .step('ship', async (ctx) => {
    const approval = ctx.signals['manager-approval'];
    return { shipped: true };
  });

const engine = new Engine({ embedded: true });
engine.register(flow);

const run = await engine.start('order-pipeline', { orderId: 'ORD-1' });

// Later, when the manager approves:
await engine.signal(run.id, 'manager-approval', { approved: true });
```

--------------------------------

### Embedded vs TCP Server Mode Mismatch Example

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/quickstart.md

Illustrates the correct and incorrect configurations for embedded and TCP server modes to avoid common errors.

```typescript
// ✅ Correct - both embedded
const queue = new Queue('tasks', { embedded: true });
const worker = new Worker('tasks', handler, { embedded: true });

// ✅ Correct - both TCP (server must be running)
const queue = new Queue('tasks');
const worker = new Worker('tasks', handler);

// ❌ Wrong - mixed modes = timeout error
const queue = new Queue('tasks', { embedded: true });
const worker = new Worker('tasks', handler); // Missing embedded: true!
```

--------------------------------

### Get counts grouped by priority (async)

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/queue.md

Example of getting job counts grouped by priority asynchronously (works with TCP).
```typescript
// Async version
const byPriority = await queue.getCountsPerPriorityAsync();
```

--------------------------------

### Get counts grouped by priority (sync)

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/queue.md

Example of getting job counts grouped by priority synchronously (embedded mode only).

```typescript
// Get counts grouped by priority
const byPriority = queue.getCountsPerPriority();
// { 0: 50, 10: 20, 100: 5 }
```

--------------------------------

### Update Imports

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/migration.md

Example of updating imports from BullMQ to bunqueue.

```typescript
// Before (BullMQ)
import { Queue, Worker, QueueEvents } from 'bullmq';
import Redis from 'ioredis';

const connection = new Redis();
const queue = new Queue('my-queue', { connection });

// After (bunqueue)
import { Queue, Worker, QueueEvents } from 'bunqueue/client';

const queue = new Queue('my-queue', { embedded: true });
// No connection needed - uses in-process SQLite!
```

--------------------------------

### Handle Worker Events

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/quickstart.md

Demonstrates how to listen for and handle various worker events like completion, failure, progress, and activation.

```typescript
worker.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed:`, result);
});

worker.on('failed', (job, error) => {
  console.error(`Job ${job.id} failed:`, error.message);
});

worker.on('progress', (job, progress) => {
  console.log(`Job ${job.id} progress: ${progress}%`);
});

worker.on('active', (job) => {
  console.log(`Job ${job.id} started`);
});
```

--------------------------------

### Embedded Mode Example

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/deployment.mdx

An example of embedding bunqueue in the same process as your application, alongside a web framework such as Hono.
It shows how to create a Queue and a Worker.

```typescript
// app.ts
import { Hono } from 'hono';
import { Queue, Worker } from 'bunqueue/client';

// Your web framework (Hono, Elysia, Express...)
const app = new Hono();

// Queue is embedded in the same process (uses DATA_PATH for persistence)
const emailQueue = new Queue('emails', { embedded: true });

app.post('/send-email', async (c) => {
  const { to, subject } = await c.req.json();
  await emailQueue.add('send', { to, subject });
  return c.json({ queued: true });
});

// Worker runs in the same process
// (sendEmail is assumed to be your own mail helper, defined elsewhere)
new Worker('emails', async (job) => {
  await sendEmail(job.data);
  return { sent: true };
}, { embedded: true, concurrency: 5 });

export default app;
```

--------------------------------

### Start the TCP server

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/blog/getting-started-five-minutes.mdx

Command to start the bunqueue server in TCP mode, specifying the port and data path.

```bash
# Start the server
bunqueue start --tcp-port 6789 --data-path ./data/queue.db
```

--------------------------------

### Get a Job - Response (Not Found)

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/api/http.md

Example JSON response when a job is not found.

```json
{
  "ok": false,
  "error": "Job not found"
}
```

--------------------------------

### JSON Output Example

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/cli.md

Demonstrates using the `--json` flag to get output in JSON format and piping it to `jq`.

```bash
bunqueue stats --json | jq '.jobs.waiting'
```

--------------------------------

### Get a Job - Response (Success)

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/api/http.md

Example JSON response when a job is found.
```json
{
  "ok": true,
  "job": {
    "id": "019ce9d7-6983-7000-946f-48737be2b0f9",
    "queue": "emails",
    "data": {"to": "user@test.com"},
    "priority": 0,
    "createdAt": 1700000000000,
    "runAt": 1700000000000,
    "startedAt": 1700000001000,
    "completedAt": null,
    "attempts": 1,
    "maxAttempts": 3,
    "backoff": 1000,
    "progress": 50,
    "tags": ["onboarding"],
    "lifo": false,
    "removeOnComplete": false,
    "removeOnFail": false
  }
}
```

--------------------------------

### Get a Job - Request

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/api/http.md

Example cURL command to retrieve a job by its ID.

```bash
curl http://localhost:6790/jobs/019ce9d7-6983-7000-946f-48737be2b0f9
```

--------------------------------

### Full Migration Example: BullMQ vs. BunQueue

Source: https://github.com/egeominotti/bunqueue/blob/main/MIGRATION_FROM_BULLMQ.md

A comprehensive side-by-side comparison of setting up queues, workers, event listeners, and scheduling jobs in BullMQ and BunQueue.

```typescript
// ============ BEFORE (BullMQ) ============
import { Queue, QueueEvents } from 'your-bullmq-package';

const queueEvents = new QueueEvents();

queueEvents.on('start', async (queueName, jobHandlerId, jobId, processId) => {
  // mark sync as started in DB
});

queueEvents.on('end', async (queueName, jobHandlerId, jobId, stdout, exitCode) => {
  // mark sync as completed in DB
});

queueEvents.on('error', async (queueName, jobHandlerId, jobId, error, signalCode) => {
  // log error
});

const productQueue = await Queue.upsert({
  queueName: 'product',
  events: queueEvents,
  jobHandlerOptions: {
    bunPath: 'PATH_DOES_NOT_MATTER',
    concurrency: 2,
    handlersAmount: 2,
    processorPath: `${process.cwd()}/src/modules/product/index.sync.ts`,
  }
});

await productQueue.add({
  data: { ...customerJobData },
  repeat: { pattern: '0 0 0 * * *' }
});

// ============ AFTER (bunqueue) ============
import { Queue, SandboxedWorker, QueueEvents } from 'bunqueue/client';

const connection = { host: 'localhost', port: 6789 };

// 1. Create queue
const productQueue = new Queue('product', { connection });

// 2. Create worker (replaces processorPath + handlersAmount)
const productWorker = new SandboxedWorker('product', {
  processor: './src/modules/product/index.sync.ts',
  concurrency: 4,
  autoRestart: true,
});
await productWorker.start();

// 3. Event listeners
const productEvents = new QueueEvents('product');

productEvents.on('active', async ({ jobId }) => {
  // mark sync as started in DB
});

productEvents.on('completed', async ({ jobId, returnvalue }) => {
  // mark sync as completed in DB
});

productEvents.on('failed', async ({ jobId, failedReason }) => {
  // log error
});

// 4. Schedule recurring job
await productQueue.upsertJobScheduler('daily-product-sync',
  { pattern: '0 0 0 * * *' },
  { name: 'sync', data: { ...customerJobData } }
);
```

--------------------------------

### Update Events

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/migration.md

Example showing that worker events are the same for both BullMQ and bunqueue.

```typescript
// Same in both BullMQ and bunqueue
worker.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed`);
});

worker.on('failed', (job, err) => {
  console.error(`Job ${job.id} failed:`, err.message);
});

worker.on('progress', (job, progress) => {
  console.log(`Job ${job.id}: ${progress}%`);
});
```

--------------------------------

### Full Example

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/simple-mode.md

A comprehensive example demonstrating various Bunqueue configurations and usage patterns.
```typescript
import { Bunqueue, shutdownManager } from 'bunqueue/client';

const app = new Bunqueue<{ payload: string }>('my-app', {
  embedded: true,
  routes: {
    'process': async (job) => ({ id: job.data.payload, status: 'done' }),
    'notify': async (job) => ({ sent: true }),
    'alert': async (job) => ({ alerted: true }),
  },
  concurrency: 10,
  retry: { maxAttempts: 3, delay: 1000, strategy: 'jitter' },
  circuitBreaker: { threshold: 5, resetTimeout: 30000 },
  ttl: { defaultTtl: 600000, perName: { 'verify-otp': 60000 } },
  priorityAging: { interval: 60000, minAge: 300000, boost: 1 },
  deduplication: { ttl: 5000 },
  rateLimit: { max: 100, duration: 1000 },
  dlq: { autoRetry: true, maxAge: 604800000 },
});

app.use(async (job, next) => {
  const start = Date.now();
  const result = await next();
  console.log(`${job.name}: ${Date.now() - start}ms`);
  return result;
});

app
  .trigger({ on: 'process', create: 'notify', data: (r) => ({ payload: r.id }) })
  .trigger({ on: 'process', event: 'failed', create: 'alert', data: (_, j) => j.data });

await app.cron('cleanup', '0 2 * * *', { payload: 'nightly' });
await app.add('process', { payload: 'ORD-001' });

process.on('SIGINT', async () => {
  await app.close();
  shutdownManager();
});
```

--------------------------------

### Commit Messages

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/contributing.md

Example commit messages following Conventional Commits.

```bash
feat: add stall detection for workers
fix: resolve memory leak in event listeners
docs: update API reference
refactor: simplify batch operations
test: add DLQ filtering tests
```

--------------------------------

### Real-World Example: Image Processing Pipeline

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/blog/job-pipelines-flows.mdx

An example of an image processing pipeline using Bunqueue, demonstrating the definition of a flow and the setup of workers for different queues.
```typescript
import { FlowProducer, Worker } from 'bunqueue/client';

const flow = new FlowProducer({ embedded: true });

// Define the pipeline
async function processImage(imageUrl: string) {
  return await flow.add({
    name: 'update-cdn',
    queueName: 'cdn',
    data: { imageUrl },
    children: [
      {
        name: 'generate-thumbnails',
        queueName: 'images',
        data: { imageUrl, sizes: [100, 300, 800] },
        children: [
          {
            name: 'download-original',
            queueName: 'images',
            data: { imageUrl },
          },
        ],
      },
      {
        name: 'extract-metadata',
        queueName: 'images',
        data: { imageUrl },
        opts: { ignoreDependencyOnFailure: true },
      },
    ],
  });
}

// Workers for each queue
new Worker('images', async (job) => {
  switch (job.name) {
    case 'download-original':
      return await downloadImage(job.data.imageUrl);
    case 'generate-thumbnails': {
      const original = await job.getChildrenValues();
      return await createThumbnails(original, job.data.sizes);
    }
    case 'extract-metadata':
      return await extractEXIF(job.data.imageUrl);
  }
}, { embedded: true });

new Worker('cdn', async (job) => {
  const results = await job.getChildrenValues();
  await uploadToCDN(results);
  return { published: true };
}, { embedded: true });
```

--------------------------------

### Test Structure

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/contributing.md

Example structure for writing tests using a testing framework.

```typescript
describe('Feature', () => {
  beforeEach(() => {
    // Setup
  });

  afterEach(() => {
    // Cleanup
  });

  it('should do something', () => {
    // Test
  });
});
```

--------------------------------

### Create a Worker to Process Jobs

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/guide/quickstart.md

Example of creating a worker that processes email jobs, updates progress, logs messages, and returns a result.
```typescript
import { Worker } from 'bunqueue/client';

const worker = new Worker('emails', async (job) => {
  console.log(`Processing: ${job.name}`);

  // Update progress
  await job.updateProgress(50, 'Sending email...');

  // Do the work
  await sendEmail(job.data);

  // Log messages
  await job.log('Email sent successfully');

  // Return a result
  return { sent: true, timestamp: Date.now() };
}, {
  embedded: true,  // Required for embedded mode
  concurrency: 5,  // Process 5 jobs in parallel
});
```

--------------------------------

### Install Bun

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/faq.md

Commands to install Bun on macOS, Linux, and Windows.

```bash
# macOS/Linux
curl -fsSL https://bun.sh/install | bash

# Windows (PowerShell)
powershell -c "irm bun.sh/install.ps1 | iex"

# Homebrew
brew install oven-sh/bun/bun
```

--------------------------------

### Health Checks

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/blog/production-deployment.mdx

Examples of how to perform health checks on bunqueue, over HTTP from the shell and over TCP from application code.

```bash
# Basic health check
curl http://localhost:6790/health
# Returns: { "status": "ok", "uptime": 3600, "memory": {...} }
```

```typescript
// TCP ping (from your application)
import { Queue } from 'bunqueue/client';

const queue = new Queue('test', {
  connection: { host: 'localhost', port: 6789 },
});
await queue.waitUntilReady(); // Pings the server
```

--------------------------------

### Get Job State - Response

Source: https://github.com/egeominotti/bunqueue/blob/main/docs/src/content/docs/api/http.md

Example JSON response for retrieving a job's state.

```json
{
  "ok": true,
  "id": "019ce9d7-...",
  "state": "active"
}
```

--------------------------------

### Install BunQueue

Source: https://github.com/egeominotti/bunqueue/blob/main/MIGRATION_FROM_BULLMQ.md

Command to install the bunqueue package using Bun.

```bash
bun add bunqueue
```
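
--------------------------------

### Narrowing Get a Job responses (sketch)

The "Get a Job" responses shown earlier both carry an `ok` flag, so client code can tell them apart with a discriminated union. The following is a minimal sketch under stated assumptions: `JobFound`, `JobMissing`, `JobResponse`, and `describeJob` are hypothetical names introduced here, not part of bunqueue, and only a handful of fields from the sample payload are modeled.

```typescript
// Hypothetical types modeled on the sample "Get a Job" payloads; not exported by bunqueue.
interface JobFound {
  ok: true;
  job: { id: string; queue: string; progress: number; attempts: number; maxAttempts: number };
}

interface JobMissing {
  ok: false;
  error: string;
}

type JobResponse = JobFound | JobMissing;

// Summarize a parsed response; checking `ok` narrows the union to one branch.
function describeJob(res: JobResponse): string {
  if (res.ok === false) {
    return `error: ${res.error}`;
  }
  const { id, queue, progress, attempts, maxAttempts } = res.job;
  return `${id} on ${queue}: ${progress}% (attempt ${attempts}/${maxAttempts})`;
}

// Sample bodies trimmed from the responses documented above.
const found = JSON.parse(
  '{"ok":true,"job":{"id":"019ce9d7-6983-7000-946f-48737be2b0f9","queue":"emails","progress":50,"attempts":1,"maxAttempts":3}}'
) as JobResponse;
const missing = JSON.parse('{"ok":false,"error":"Job not found"}') as JobResponse;

console.log(describeJob(found));
console.log(describeJob(missing));
```

The cast after `JSON.parse` trusts the server's shape; a stricter client would validate the payload at runtime before narrowing.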