### Bunqueue Server Output Example

Source: https://bunqueue.dev/guide/cli

Example output when starting the bunqueue server, showing version and listening ports.

```text
bunqueue v2.1.8
TCP server listening on port 6789
HTTP server listening on port 6790
Database: ./data/bunq.db
```

--------------------------------

### Clone Repository and Install Dependencies

Source: https://bunqueue.dev/guide/comparison

Clones the bunqueue repository and installs its dependencies using bun. This is the initial setup step for running the benchmark.

```bash
# Clone the repository
git clone https://github.com/egeominotti/bunqueue.git
cd bunqueue
bun install
```

--------------------------------

### Full Bunqueue Application Setup

Source: https://bunqueue.dev/guide/simple-mode

A comprehensive example demonstrating the setup of a Bunqueue application with various configurations including routes, retry strategies, circuit breakers, TTL, priority aging, deduplication, rate limiting, and DLQ.

```typescript
import { Bunqueue, shutdownManager } from 'bunqueue/client';

const app = new Bunqueue<{ payload: string }>('my-app', {
  embedded: true,
  routes: {
    'process': async (job) => ({ id: job.data.payload, status: 'done' }),
    'notify': async (job) => ({ sent: true }),
    'alert': async (job) => ({ alerted: true }),
  },
  concurrency: 10,
  retry: { maxAttempts: 3, delay: 1000, strategy: 'jitter' },
  circuitBreaker: { threshold: 5, resetTimeout: 30000 },
  ttl: { defaultTtl: 600000, perName: { 'verify-otp': 60000 } },
  priorityAging: { interval: 60000, minAge: 300000, boost: 1 },
  deduplication: { ttl: 5000 },
  rateLimit: { max: 100, duration: 1000 },
  dlq: { autoRetry: true, maxAge: 604800000 },
});

app.use(async (job, next) => {
  const start = Date.now();
  const result = await next();
  console.log(`${job.name}: ${Date.now() - start}ms`);
  return result;
});

app
  .trigger({ on: 'process', create: 'notify', data: (r) => ({ payload: r.id }) })
  .trigger({ on: 'process', event: 'failed', create: 'alert', data: (_, j) => j.data });

await app.cron('cleanup', '0 2 * * *', { payload: 'nightly' });
await app.add('process', { payload: 'ORD-001' });

process.on('SIGINT', async () => {
  await app.close();
  shutdownManager();
});
```

--------------------------------

### Start bunqueue Server and Check Version

Source: https://bunqueue.dev/guide/installation

Commands to start the bunqueue server and check its installed version.

```bash
# Start server
bunqueue start

# Check version
bunqueue --version
```

--------------------------------

### Start bunqueue with default config

Source: https://bunqueue.dev/guide/configuration

After creating the configuration file, start the bunqueue server using the `bunqueue start` command.

```bash
bunqueue start
```

--------------------------------

### Start Application

Source: https://bunqueue.dev/guide/benchmarks

Starts the application using Bun. This should be run after Redis is active.

```bash
bun run start &
```

--------------------------------

### Start Bunqueue Server with Configuration File

Source: https://bunqueue.dev/guide/cli

Starts the bunqueue server using a configuration file for settings.

```bash
bunqueue start --config ./bunqueue.config.ts
```

--------------------------------

### Install bunqueue from source

Source: https://bunqueue.dev/guide/installation

Clone the repository, install dependencies, and build the project from its source code.

```bash
git clone https://github.com/egeominotti/bunqueue.git
cd bunqueue
bun install
bun run build
```

--------------------------------

### Complete QueueGroup Example

Source: https://bunqueue.dev/guide/queue-group

This snippet demonstrates the full setup of QueueGroups, including creating groups, queues, workers, adding jobs, and handling graceful shutdowns. It shows how different queues within different groups can interact.
```typescript
import { QueueGroup, shutdownManager } from 'bunqueue/client';

interface OrderData {
  orderId: string;
  amount: number;
}

interface NotificationData {
  userId: string;
  message: string;
}

// Create groups
const orders = new QueueGroup('orders');
const notifications = new QueueGroup('notifications');

// Create queues
const orderQueue = orders.getQueue('process', { embedded: true });
const emailQueue = notifications.getQueue('email', { embedded: true });

// Create workers
const orderWorker = orders.getWorker('process', async (job) => {
  console.log(`Processing order: ${job.data.orderId}`);

  // Create notification after order
  await emailQueue.add('order-confirmation', {
    userId: 'user-123',
    message: `Order ${job.data.orderId} confirmed!`,
  });

  return { processed: true };
}, { embedded: true, concurrency: 5 });

const emailWorker = notifications.getWorker('email', async (job) => {
  console.log(`Sending email to: ${job.data.userId}`);
  return { sent: true };
}, { embedded: true, concurrency: 3 });

// Add an order
await orderQueue.add('new-order', { orderId: 'ORD-001', amount: 99.99 });

// Check queues in each group
console.log('Order queues:', orders.listQueues());
console.log('Notification queues:', notifications.listQueues());

// Graceful shutdown
process.on('SIGINT', async () => {
  await orderWorker.close();
  await emailWorker.close();
  shutdownManager();
  process.exit(0);
});
```

--------------------------------

### Install Bunqueue MCP Server

Source: https://bunqueue.dev/guide/mcp

Registers the `bunqueue-mcp` binary with Claude Code. The binary is bundled with the bunqueue package, so installing bunqueue with `bun add bunqueue` is a one-time prerequisite.

```bash
# one-time: bun add bunqueue (installs the bundled bunqueue-mcp binary)
claude mcp add bunqueue -- bunx bunqueue-mcp
```

--------------------------------

### Install Bunqueue

Source: https://bunqueue.dev/guide/workflow

Install the bunqueue package using bun.
```bash
bun add bunqueue
```

--------------------------------

### Development Environment Variables

Source: https://bunqueue.dev/guide/env-vars

Example environment variables for a development setup, including ports, data path, and log level.

```dotenv
TCP_PORT=6789
HTTP_PORT=6790
DATA_PATH=./data/dev.db
LOG_LEVEL=debug
LOG_FORMAT=text
```

--------------------------------

### Start Bunqueue Server with Defaults

Source: https://bunqueue.dev/guide/cli

Starts the bunqueue server using default TCP and HTTP ports.

```bash
bunqueue start
```

--------------------------------

### Install Bunqueue

Source: https://bunqueue.dev/guide/migration

Remove existing BullMQ and ioredis packages and install bunqueue.

```bash
# Remove BullMQ and Redis
bun remove bullmq ioredis

# Install bunqueue
bun add bunqueue
```

--------------------------------

### Install Bunqueue MCP Server

Source: https://bunqueue.dev/guide/quickstart

Instructions for installing the bunqueue-mcp binary and its optional peer dependency for running the MCP server. This allows AI agents to interact with Bunqueue.

```bash
# Claude Code: bunqueue-mcp is a binary bundled with bunqueue, so install it first
bun add bunqueue
bun add @modelcontextprotocol/sdk  # optional peer dependency, required only for the MCP server
claude mcp add bunqueue -- bunx bunqueue-mcp
```

--------------------------------

### Start Bunqueue Server with Authentication

Source: https://bunqueue.dev/guide/cli

Starts the bunqueue server with authentication enabled using an environment variable for the token.

```bash
AUTH_TOKENS=secret-token bunqueue start
```

--------------------------------

### Start Redis Server

Source: https://bunqueue.dev/guide/benchmarks

Starts the Redis server in the background. This is a prerequisite for running the benchmarks.
```bash
redis-server --daemonize yes
```

--------------------------------

### Development Configuration Example

Source: https://bunqueue.dev/guide/configuration

An example configuration file tailored for development environments, enabling debug logging and specifying a development database path.

```typescript
import { defineConfig } from 'bunqueue';

export default defineConfig({
  server: {
    tcpPort: 6789,
    httpPort: 6790,
  },
  storage: {
    dataPath: './data/dev.db',
  },
  logging: {
    level: 'debug',
  },
});
```

--------------------------------

### Install Bunqueue Globally from Build

Source: https://bunqueue.dev/guide/deployment

Copy the compiled standalone Bunqueue binary to a system path for global access and verify the installation.

```bash
# Copy to system path
sudo cp dist/bunqueue /usr/local/bin/

# Verify installation
bunqueue --version
```

--------------------------------

### Check Bun Installation

Source: https://bunqueue.dev/guide/mcp

Verify that the Bun runtime is installed and accessible in your system's PATH.

```shell
which bun
```

--------------------------------

### Production Environment Variables

Source: https://bunqueue.dev/guide/env-vars

Example environment variables for a production setup, including ports, data path, log settings, authentication tokens, and S3 backup configuration.

```dotenv
TCP_PORT=6789
HTTP_PORT=6790
DATA_PATH=/var/lib/production.db
LOG_LEVEL=info
LOG_FORMAT=json
AUTH_TOKENS=prod-token-abc123,prod-token-xyz789

# S3 Backup
S3_BACKUP_ENABLED=1
S3_ACCESS_KEY_ID=AKIAIOSFODNN7EXAMPLE
S3_SECRET_ACCESS_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
S3_BUCKET=company-bunqueue-backups
S3_REGION=us-east-1
S3_BACKUP_INTERVAL=3600000
S3_BACKUP_RETENTION=30
S3_BACKUP_PREFIX=production/
```

--------------------------------

### Install Bunqueue MCP and SDK

Source: https://bunqueue.dev/guide/cron

Install Bunqueue and the optional Model Context Protocol (MCP) SDK for AI agent cron management.
The MCP server requires the SDK, while queue-only installs can skip it to reduce size.

```bash
bun add bunqueue                   # bunqueue-mcp is a binary bundled with bunqueue
bun add @modelcontextprotocol/sdk  # optional peer dependency, required only for the MCP server
claude mcp add bunqueue -- bunx bunqueue-mcp
```

--------------------------------

### Install bunqueue and MCP SDK

Source: https://bunqueue.dev/guide/use-cases

Install the bunqueue package and the optional MCP SDK for running the MCP server. Use bunx to add bunqueue as an MCP tool.

```bash
bun add bunqueue
bun add @modelcontextprotocol/sdk  # optional peer dependency, required only for the MCP server
claude mcp add bunqueue -- bunx bunqueue-mcp
```

--------------------------------

### Start Bunqueue Server for MCP

Source: https://bunqueue.dev/guide/server

Starts the Bunqueue server with data path persistence, preparing it for AI agent connections.

```bash
# Start bunqueue server
bunqueue start --data-path ./data/queue.db
```

--------------------------------

### Start Server

Source: https://bunqueue.dev/guide/elysia

Starts the web server on port 3000. Logs a message to the console indicating the server is running.

```typescript
// ============================================
// Start Server
// ============================================
app.listen(3000, () => {
  console.log('Server running at http://localhost:3000');
});
```

--------------------------------

### Bunqueue Production Setup with Client and Worker

Source: https://bunqueue.dev/guide/deployment

Illustrates a full production setup using Bunqueue client and worker configurations. Includes job queuing, DLQ configuration, and worker settings for concurrency and error handling.
```typescript
import { Queue, Worker } from 'bunqueue/client';

const queue = new Queue('production-tasks', {
  embedded: true,
  defaultJobOptions: {
    attempts: 5,
    backoff: 5000,
    removeOnComplete: true,
  }
});

// Configure DLQ alerts
queue.setDlqConfig({
  maxEntries: 1000,
  maxAge: 7 * 24 * 60 * 60 * 1000, // 7 days
});

// Regular jobs (buffered writes - high throughput)
await queue.add('send-email', { to: 'user@example.com' });

// Critical jobs (immediate disk write - no data loss)
await queue.add('process-payment', { orderId: '123' }, { durable: true });

// Worker with production settings
// processJob is an application-defined handler (not shown here)
new Worker('production-tasks', async (job) => {
  await job.updateProgress(0, 'Starting...');

  try {
    const result = await processJob(job.data);
    await job.log(`Completed: ${JSON.stringify(result)}`);
    return result;
  } catch (error) {
    // The caught value is `unknown` under strict TypeScript, so narrow it first
    const message = error instanceof Error ? error.message : String(error);
    await job.log(`Error: ${message}`);
    throw error;
  }
}, {
  embedded: true,
  concurrency: 10,
  heartbeatInterval: 5000,
});
```

--------------------------------

### Start Bunqueue Service

Source: https://bunqueue.dev/guide/deployment

Command to start the Bunqueue service using systemctl.

```bash
systemctl start bunqueue
```

--------------------------------

### Start Bunqueue Server with Persistent Storage

Source: https://bunqueue.dev/guide/cli

Starts the bunqueue server with a specified data path for persistent storage.

```bash
bunqueue start --data-path ./data/production.db
```

--------------------------------

### Run the bunqueue MCP Binary Without Installing

Source: https://bunqueue.dev/guide/mcp

A one-liner command to fetch the latest bunqueue and run its MCP binary without a prior installation. Note that this command does not auto-install optional peer dependencies.

```bash
bunx --package=bunqueue bunqueue-mcp
```

--------------------------------

### Verify Bunqueue Build and Start Server

Source: https://bunqueue.dev/guide/deployment

Commands to verify the standalone Bunqueue binary after building from source.
Includes checking the version, displaying help, and starting the server.

```bash
# Check version
./dist/bunqueue --version

# Show help
./dist/bunqueue --help

# Start server
./dist/bunqueue start
```

--------------------------------

### Install MCP SDK

Source: https://bunqueue.dev/guide/mcp

Install the optional peer dependency @modelcontextprotocol/sdk, which is required to run the MCP server.

```bash
bun add @modelcontextprotocol/sdk
```

--------------------------------

### Quick Start Configuration

Source: https://bunqueue.dev/guide/configuration

Create a `bunqueue.config.ts` file in your project root to define server and storage settings. This file is auto-discovered by bunqueue.

```typescript
import { defineConfig } from 'bunqueue';

export default defineConfig({
  server: {
    tcpPort: 6789,
    httpPort: 6790,
  },
  storage: {
    dataPath: './data/queue.db',
  },
});
```

--------------------------------

### Start Bunqueue Server

Source: https://bunqueue.dev/guide/comparison

Starts the bunqueue server in the background. This is necessary for testing bunqueue's performance.

```bash
# Start bunqueue server
bun run start &
```

--------------------------------

### Start Bunqueue Server

Source: https://bunqueue.dev/guide/server

Starts the Bunqueue server with default ports or custom configurations. Authentication can be enabled using environment variables.

```bash
# Default ports (TCP: 6789, HTTP: 6790)
bunqueue
```

```bash
# With custom configuration
bunqueue start \
  --tcp-port 6789 \
  --http-port 6790 \
  --data-path ./data/queue.db
```

```bash
# With authentication
AUTH_TOKENS=secret1,secret2 bunqueue
```

--------------------------------

### Verify Bunqueue Installation

Source: https://bunqueue.dev/guide/mcp

Check if Bunqueue is installed using `bun pm ls` and install it globally if necessary using `bun add -g bunqueue`. The `bunqueue-mcp` binary is bundled with the main package.
```bash
# Verify bunqueue is installed (bunqueue-mcp is a binary bundled with it)
bun pm ls | grep bunqueue || bun add -g bunqueue
```

--------------------------------

### Start Bunqueue Server with Authentication for MCP

Source: https://bunqueue.dev/guide/server

Starts the Bunqueue server with authentication enabled via environment variables, allowing secure connections for AI agents.

```bash
AUTH_TOKENS=my-secret bunqueue start
```

--------------------------------

### Start Bunqueue Server on Specific Host and Port

Source: https://bunqueue.dev/guide/cli

Starts the bunqueue server, binding to a specific host and port.

```bash
bunqueue start --host 127.0.0.1 -p 6789
```

--------------------------------

### Start Bunqueue Server with Custom Ports

Source: https://bunqueue.dev/guide/cli

Starts the bunqueue server specifying custom TCP and HTTP ports.

```bash
bunqueue start --tcp-port 7000 --http-port 7001
```

--------------------------------

### Install Bunqueue Globally

Source: https://bunqueue.dev/guide/mcp

Install the bunqueue package globally using Bun's package manager. This makes the `bunqueue-mcp` binary available.

```shell
bun add -g bunqueue
```

--------------------------------

### Example Webhook Server with Hono

Source: https://bunqueue.dev/guide/webhooks

A complete example of a webhook server using Hono, compatible with Bun. It includes signature verification middleware and handlers for different event types.
```typescript
import { Hono } from 'hono';
import { createHmac, timingSafeEqual } from 'crypto';

const app = new Hono();

// The secret you used when registering the webhook with --secret
const MY_WEBHOOK_SECRET = process.env.MY_WEBHOOK_SECRET || 'your-secret';

// Signature verification middleware
async function verifySignature(c: any, next: any) {
  const signature = c.req.header('x-webhook-signature');
  const body = await c.req.text();

  if (!signature) {
    return c.json({ error: 'Missing signature' }, 401);
  }

  const expected = createHmac('sha256', MY_WEBHOOK_SECRET)
    .update(body)
    .digest('hex');

  const expectedBuf = Buffer.from(expected);
  const receivedBuf = Buffer.from(signature);

  if (expectedBuf.length !== receivedBuf.length || !timingSafeEqual(expectedBuf, receivedBuf)) {
    return c.json({ error: 'Invalid signature' }, 401);
  }

  // Store parsed body for handler
  c.set('webhookPayload', JSON.parse(body));
  await next();
}

// Webhook handler
// notifyCompletion, sendAlert, and broadcastProgress are application-defined helpers (not shown here)
app.post('/webhooks/bunqueue', verifySignature, async (c) => {
  const payload = c.get('webhookPayload');

  console.log(`Received ${payload.event} event for job ${payload.jobId}`);

  switch (payload.event) {
    case 'job.completed':
      console.log('Job completed:', payload.jobId);
      // Notify downstream service
      await notifyCompletion(payload);
      break;

    case 'job.failed':
      console.error('Job failed:', payload.error);
      // Alert on failure
      await sendAlert({
        type: 'job_failed',
        jobId: payload.jobId,
        queue: payload.queue,
        error: payload.error,
      });
      break;

    case 'job.progress':
      console.log(`Job ${payload.jobId}: ${payload.progress}%`);
      // Update UI or notify
      await broadcastProgress(payload);
      break;
  }

  return c.json({ received: true });
});

// Health check endpoint
app.get('/health', (c) => c.json({ status: 'ok' }));

export default {
  port: 3000,
  fetch: app.fetch,
};
```

--------------------------------

### Build Bunqueue from Source

Source: https://bunqueue.dev/guide/deployment

Commands to clone the Bunqueue repository, install dependencies, and build a standalone
executable binary for production deployment.

```bash
# Clone the repository
git clone https://github.com/egeominotti/bunqueue.git
cd bunqueue

# Install dependencies
bun install

# Build standalone binary
bun run build
```

--------------------------------

### Start Redis Server

Source: https://bunqueue.dev/guide/comparison

Starts the Redis server in detached mode. This is a prerequisite for running benchmarks or using BullMQ.

```bash
# Start Redis (required for BullMQ)
redis-server --daemonize yes
```

--------------------------------

### Full Bunqueue Example (Embedded Mode)

Source: https://bunqueue.dev/guide/quickstart

A complete example demonstrating both producer (Queue) and consumer (Worker) in embedded mode, including graceful shutdown handling.

```typescript
import { Queue, Worker, shutdownManager } from 'bunqueue/client';

interface EmailJob {
  to: string;
  subject: string;
}

// Producer - must have embedded: true
const queue = new Queue('emails', { embedded: true });

// Add some jobs
await queue.add('welcome', { to: 'new@user.com', subject: 'Welcome!' });
await queue.add('newsletter', { to: 'sub@user.com', subject: 'News' });

// Consumer - must have embedded: true
const worker = new Worker('emails', async (job) => {
  console.log(`Sending ${job.data.subject} to ${job.data.to}`);
  await job.updateProgress(100);
  return { sent: true };
}, { embedded: true, concurrency: 3 });

worker.on('completed', (job) => {
  console.log(`✓ ${job.id}`);
});

// Graceful shutdown
process.on('SIGINT', async () => {
  await worker.close();
  shutdownManager();
  process.exit(0);
});
```

--------------------------------

### Start Bunqueue Server for TCP Tests

Source: https://bunqueue.dev/guide/benchmarks

This command starts the Bunqueue server in the background, which is necessary for running TCP mode benchmarks.
```bash
# Start the bunqueue server (for TCP tests)
bun run start &
```

--------------------------------

### Install bunqueue for AI Integration

Source: https://bunqueue.dev/guide/mcp

Install bunqueue locally for AI clients like Claude Code, or globally to make the bunqueue-mcp binary available on your PATH for other AI clients.

```bash
bun add bunqueue     # in your project, for Claude Code
# or, for Claude Desktop / Cursor / Windsurf (they start the server outside your project):
bun add -g bunqueue  # global install, puts bunqueue-mcp on your PATH
```

--------------------------------

### Install bunqueue using Bun

Source: https://bunqueue.dev/guide/installation

Use this command to add bunqueue to your project with Bun's package manager.

```bash
bun add bunqueue
```

--------------------------------

### Complete Bunqueue Worker Setup

Source: https://bunqueue.dev/guide/worker

This snippet shows the full setup for a Bunqueue worker, including job definition, queue and worker instantiation, job processing logic, event listeners for job status, and graceful shutdown handling.
```typescript
import { Queue, Worker, shutdownManager } from 'bunqueue/client';

interface EmailJob {
  to: string;
  subject: string;
  body: string;
}

const queue = new Queue('emails', { embedded: true });

const worker = new Worker('emails', async (job) => {
  console.log(`Sending email to: ${job.data.to}`);

  await job.updateProgress(50, 'Composing email...');
  await job.log(`Subject: ${job.data.subject}`);

  // Simulate sending
  await Bun.sleep(100);

  await job.updateProgress(100, 'Sent!');
  return { sent: true, timestamp: Date.now() };
}, {
  embedded: true,
  concurrency: 5,
  heartbeatInterval: 5000,
});

worker.on('completed', (job, result) => {
  console.log(`✓ Email sent: ${job.id}`);
});

worker.on('failed', (job, error) => {
  console.error(`✗ Email failed: ${job.id} - ${error.message}`);
});

// Graceful shutdown
process.on('SIGINT', async () => {
  console.log('Shutting down...');
  await worker.close();
  shutdownManager();
  process.exit(0);
});
```

--------------------------------

### Start bunqueue Server

Source: https://bunqueue.dev/guide/introduction

Run bunqueue as a standalone server using the CLI. Specify the data path for SQLite persistence.

```bash
# Start the server
bunqueue start --data-path ./data/queue.db
```

--------------------------------

### Complete Hono and Bunqueue Example

Source: https://bunqueue.dev/guide/hono

A comprehensive example integrating Hono with bunqueue, including middleware, multiple queues, job enqueuing, status polling, and graceful shutdown. All queues operate in embedded mode.
```typescript
import { Hono } from 'hono';
import { cors } from 'hono/cors';
import { logger } from 'hono/logger';
import { Queue, Worker, shutdownManager } from 'bunqueue/client';

const app = new Hono();

// Middleware
app.use('*', logger());
app.use('/api/*', cors());

// Queues (embedded mode)
const queues = {
  emails: new Queue('emails', { embedded: true }),
  reports: new Queue('reports', { embedded: true }),
  webhooks: new Queue('webhooks', { embedded: true }),
};

// Enqueue job
app.post('/api/send-email', async (c) => {
  const { to, subject, template, data } = await c.req.json();

  const job = await queues.emails.add('send', {
    to,
    subject,
    template,
    data,
  }, {
    attempts: 3,
    backoff: 5000,
    removeOnComplete: true,
  });

  return c.json({ queued: true, jobId: job.id });
});

// Generate report (long-running task)
app.post('/api/reports/generate', async (c) => {
  const { type, filters, format } = await c.req.json();

  const job = await queues.reports.add('generate', {
    type,
    filters,
    format,
    requestedBy: c.req.header('X-User-ID'),
  }, {
    timeout: 300000, // 5 minutes
    priority: 10,
  });

  return c.json({
    jobId: job.id,
    // Matches the polling route registered below
    statusUrl: `/api/jobs/reports/${job.id}/poll`,
  });
});

// Poll job status
app.get('/api/jobs/:queue/:id/poll', async (c) => {
  const { queue: queueName, id } = c.req.param();
  const queue = new Queue(queueName, { embedded: true });
  const job = await queue.getJob(id);

  if (!job) {
    return c.json({ error: 'Not found' }, 404);
  }

  return c.json({
    id: job.id,
    name: job.name,
    progress: job.progress,
    result: job.returnvalue ?? null,
    error: job.failedReason ?? null,
  });
});

// Graceful shutdown
process.on('SIGINT', () => {
  shutdownManager();
  process.exit(0);
});

export default app;
```

--------------------------------

### Get JSON Output for Scripting

Source: https://bunqueue.dev/guide/cli

Shows how to use the `--json` flag to get machine-readable output, which can then be processed by tools like `jq` for scripting purposes.
```bash
bunqueue stats --json | jq '.jobs.waiting'
```

--------------------------------

### Bunqueue Real-World REST API Example

Source: https://bunqueue.dev/guide/elysia

This comprehensive example sets up multiple queues (emails, reports, webhooks) with persistence, defines workers for each, and integrates them into an Elysia API for job management and health checks. It includes DLQ configuration and event logging.

```typescript
import { Elysia } from 'elysia';
import { Queue, Worker, shutdownManager } from 'bunqueue/client';

// ============================================
// Job Types
// ============================================

interface EmailJob {
  to: string;
  subject: string;
  body: string;
}

interface ReportJob {
  type: 'daily' | 'weekly' | 'monthly';
  userId: string;
}

interface WebhookJob {
  url: string;
  payload: Record<string, unknown>;
}

// ============================================
// Queues (Embedded Mode with Persistence)
// ============================================

const emailQueue = new Queue('emails', {
  embedded: true,
  dataPath: './data/app.db',
  defaultJobOptions: {
    attempts: 3,
    backoff: 1000,
  }
});

const reportQueue = new Queue('reports', {
  embedded: true,
  dataPath: './data/app.db',
  defaultJobOptions: {
    attempts: 2,
    timeout: 60000,
  }
});

const webhookQueue = new Queue('webhooks', {
  embedded: true,
  dataPath: './data/app.db',
  defaultJobOptions: {
    attempts: 5,
    backoff: 2000,
  }
});

// Configure DLQ with auto-retry for webhooks
webhookQueue.setDlqConfig({
  autoRetry: true,
  autoRetryInterval: 300000, // 5 minutes
  maxAutoRetries: 3,
});

// ============================================
// Workers
// ============================================

const emailWorker = new Worker('emails', async (job) => {
  await job.updateProgress(10, 'Validating email...');

  // Validate email format
  if (!job.data.to.includes('@')) {
    throw new Error('Invalid email address');
  }

  await job.updateProgress(50, 'Sending email...');
  await job.log(`Sending to: ${job.data.to}`);

  // Simulate sending
  await Bun.sleep(Math.random() * 500 + 100);

  await job.updateProgress(100, 'Sent!');

  return {
    messageId: `msg-${Date.now()}`,
    sentAt: new Date().toISOString(),
  };
}, { embedded: true, concurrency: 3 });

const reportWorker = new Worker('reports', async (job) => {
  await job.log(`Generating ${job.data.type} report for ${job.data.userId}`);

  // Progress updates
  for (let i = 0; i <= 100; i += 20) {
    await job.updateProgress(i, `Processing... ${i}%`);
    await Bun.sleep(100);
  }

  return {
    reportUrl: `/reports/${job.data.type}-${job.data.userId}.pdf`,
    generatedAt: new Date().toISOString(),
  };
}, { embedded: true, concurrency: 2 });

const webhookWorker = new Worker('webhooks', async (job) => {
  await job.log(`Calling webhook: ${job.data.url}`);

  // Actual HTTP call
  const response = await fetch(job.data.url, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(job.data.payload),
  });

  if (!response.ok) {
    throw new Error(`HTTP ${response.status}`);
  }

  return {
    status: response.status,
    deliveredAt: new Date().toISOString(),
  };
}, { embedded: true, concurrency: 5 });

// ============================================
// Event Logging
// ============================================

emailWorker.on('completed', (job, result) => {
  console.log(`Email sent: ${job.data.to}`);
});

emailWorker.on('failed', (job, err) => {
  console.log(`Email failed: ${job.data.to} - ${err.message}`);
});

reportWorker.on('completed', (job, result) => {
  console.log(`Report ready: ${result.reportUrl}`);
});

webhookWorker.on('failed', (job, err) => {
  console.log(`Webhook failed: ${job.data.url} - ${err.message}`);
});

// ============================================
// Elysia API
// ============================================

const app = new Elysia()
  // Health check with queue stats
  .get('/health', () => ({
    status: 'ok',
    queues: {
      emails: emailQueue.getJobCounts(),
      reports: reportQueue.getJobCounts(),
      webhooks: webhookQueue.getJobCounts(),
    },
  }))

  // ---- Email Jobs ----
  .post('/emails', async ({ body }) => {
    const { to, subject, body: content } = body as EmailJob;
    const job = await emailQueue.add('send', { to, subject, body: content });
    return { jobId: job.id, status: 'queued' };
  })

  .post('/emails/priority', async ({ body }) => {
    const { to, subject, body: content } = body as EmailJob;
    const job = await emailQueue.add('send', { to, subject, body: content }, {
      priority: 10,
    });
    return { jobId: job.id, status: 'queued', priority: 'high' };
  })

  .post('/emails/scheduled', async ({ body }) => {
    const { to, subject, body: content, delayMs } = body as EmailJob & { delayMs: number };
    const job = await emailQueue.add('send', { to, subject, body: content }, {
      delay: delayMs || 5000,
    });
    return {
      jobId: job.id,
      status: 'scheduled',
```

--------------------------------

### Get Failed Children Values

Source: https://bunqueue.dev/guide/flow

Example of using `job.getFailedChildrenValues()` within a worker to retrieve error messages from children that failed with `continueParentOnFailure: true`.

```javascript
const failed = await job.getFailedChildrenValues();
// { 'workers:job-abc': 'Error: timeout', 'api:job-xyz': 'Error: 503' }
```

--------------------------------

### Configure Bunqueue Logging (CLI)

Source: https://bunqueue.dev/guide/monitoring

Set the log level and format using environment variables when starting Bunqueue with `bun run`.

```bash
LOG_LEVEL=debug bun run src/main.ts   # debug, info, warn, error
LOG_FORMAT=json bun run src/main.ts   # structured JSON output
```

--------------------------------

### Get Jobs with Filtering (TCP Mode)

Source: https://bunqueue.dev/guide/queue

Asynchronously retrieve a list of jobs based on specified filters, such as state, start, and end indices, when connected via TCP.
```javascript
// Get jobs with filtering (async - works with TCP)
const jobs = await queue.getJobsAsync({
  state: 'failed',
  start: 0,
  end: 50
});
```

--------------------------------

### Complete FlowProducer Example

Source: https://bunqueue.dev/guide/flow

Demonstrates setting up queues, a FlowProducer, a worker, and adding a job chain. Includes signal handling for graceful shutdown.

```typescript
import { FlowProducer, Worker, Queue, shutdownManager } from 'bunqueue/client';

// Create queues
const pipelineQueue = new Queue('pipeline', { embedded: true });

// Create flow producer
const flow = new FlowProducer({ embedded: true });

// Create worker
const worker = new Worker('pipeline', async (job) => {
  console.log(`Processing ${job.data.name || job.name}`);

  if (job.name === 'fetch') {
    // Simulate API call
    return { data: [1, 2, 3] };
  }

  if (job.name === 'process') {
    // Access parent result
    const fetchResult = flow.getParentResult(job.data.__flowParentId);
    return { processed: fetchResult.data.map(x => x * 2) };
  }

  if (job.name === 'store') {
    const processResult = flow.getParentResult(job.data.__flowParentId);
    console.log('Storing:', processResult.processed);
    return { stored: true };
  }

  return {};
}, { embedded: true, concurrency: 3 });

// Add a pipeline
const { jobIds } = await flow.addChain([
  { name: 'fetch', queueName: 'pipeline', data: {} },
  { name: 'process', queueName: 'pipeline', data: {} },
  { name: 'store', queueName: 'pipeline', data: {} },
]);

console.log('Pipeline started with jobs:', jobIds);

// Cleanup
process.on('SIGINT', async () => {
  await worker.close();
  shutdownManager();
  process.exit(0);
});
```

--------------------------------

### Get Jobs with Filtering (Embedded Mode)

Source: https://bunqueue.dev/guide/queue

Synchronously retrieve a list of jobs based on specified filters, such as state, start, and end indices. This is available only in embedded mode.
```javascript
// Get jobs with filtering (sync - embedded mode only)
const jobs = queue.getJobs({ state: 'waiting', start: 0, end: 10 });
```

--------------------------------

### engine.start

Source: https://bunqueue.dev/guide/workflow

Starts a new execution of a named workflow with optional input. Returns a promise that resolves with run handle information.

````APIDOC
## engine.start(name, input?)

### Description
Starts a new execution of a workflow. Returns a promise containing the execution ID and workflow name.

### Method
`engine.start`

### Parameters
#### Path Parameters
None

#### Query Parameters
None

#### Request Body
* **name** (string) - Required - The name of the workflow to start.
* **input** (object) - Optional - The input data for the workflow execution.

### Request Example
```json
{
  "name": "order-pipeline",
  "input": { "orderId": "ORD-1", "amount": 99.99 }
}
```

### Response
#### Success Response (200)
* **Promise** - A promise that resolves with an object containing:
  * **id** (string) - The unique ID of the execution.
  * **workflowName** (string) - The name of the workflow that was started.

### Response Example
```json
{
  "id": "exec_abc123",
  "workflowName": "order-pipeline"
}
```
````

--------------------------------

### Get Jobs by State (Sync)

Source: https://bunqueue.dev/guide/queue

Synchronously retrieve jobs for specific states like 'waiting', 'active', 'completed', 'failed', and 'delayed' in embedded mode. Specify start and end indices for pagination.

```javascript
// Sync (embedded mode only)
const waiting = queue.getWaiting(0, 10);
const active = queue.getActive(0, 10);
const completed = queue.getCompleted(0, 10);
const failed = queue.getFailed(0, 10);
const delayed = queue.getDelayed(0, 10);
```

--------------------------------

### PM2 Basic Commands

Source: https://bunqueue.dev/guide/deployment

Essential PM2 commands for managing Bunqueue processes. Use these to start, stop, restart, and monitor your application.
```bash
# Start
pm2 start ecosystem.config.js

# Restart
pm2 restart bunqueue

# Stop
pm2 stop bunqueue

# View logs
pm2 logs bunqueue

# Monitor
pm2 monit

# Save process list for startup
pm2 save
pm2 startup
```

--------------------------------

### Get Jobs by State (Async)

Source: https://bunqueue.dev/guide/queue

Asynchronously retrieve jobs for specific states like 'waiting', 'active', 'completed', 'failed', and 'delayed' when connected via TCP. Specify start and end indices for pagination.

```javascript
// Async (works with TCP)
const waiting = await queue.getWaitingAsync(0, 10);
const active = await queue.getActiveAsync(0, 10);
const completed = await queue.getCompletedAsync(0, 10);
const failed = await queue.getFailedAsync(0, 10);
const delayed = await queue.getDelayedAsync(0, 10);
```

--------------------------------

### Register an HTTP Handler

Source: https://bunqueue.dev/guide/mcp

Registers an HTTP handler for a specified queue. The embedded Worker will immediately start processing jobs by making GET requests to the provided URL. Ensure the API key is correctly set.

```bash
$ bunqueue_register_handler \
  queue: "meteo" \
  method: GET \
  url: "https://api.openweathermap.org/data/2.5/weather?q=Milan&appid=xxx"
```

--------------------------------

### Project Structure Example

Source: https://bunqueue.dev/guide/integrations

Illustrates a recommended project structure for organizing bunqueue-related files within a Bun application, separating API routes, queue definitions, and workers.

```tree
src/
├── api/
│   ├── routes/
│   │   ├── emails.ts
│   │   └── reports.ts
│   └── index.ts
├── queues/
│   ├── definitions.ts    # Queue instances
│   └── index.ts
├── workers/
│   ├── email.worker.ts
│   ├── report.worker.ts
│   └── index.ts
└── index.ts              # Entry point
```

--------------------------------

### Get Per-Queue Statistics

Source: https://bunqueue.dev/guide/mcp

Use `bunqueue_get_queue_stats` to get statistics for all queues.
```bash
bunqueue_get_queue_stats
```

--------------------------------

### Basic Workflow Example

Source: https://bunqueue.dev/guide/workflow

Illustrates a simple linear workflow with compensation handlers for each step. This demonstrates the core chaining and compensation pattern.

```typescript
const workflow = new Workflow({
  steps: [
    step('validate')
      .compensate(async () => console.log('release stock'))
      .run(async () => console.log('validating...'))
      .next(step('reserve stock')
        .compensate(async () => console.log('release stock'))
        .run(async () => console.log('reserving stock...'))
        .next(step('charge payment')
          .compensate(async () => console.log('refund payment'))
          .run(async () => console.log('charging payment...'))
          .next(step('send confirmation')
            .run(async () => console.log('sending confirmation...'))
          )
        )
      )
  ]
});

// To run the workflow:
// await engine.run(workflow);
```

--------------------------------

### Create a Backup

Source: https://bunqueue.dev/guide/cli

Initiates an immediate backup of the current data. The output shows the progress and completion status of the backup, including the destination and size.

```bash
bunqueue backup now
```

--------------------------------

### Bunqueue Workflow Engine Example

Source: https://bunqueue.dev/guide/quickstart

Demonstrates creating and running a multi-step workflow with the Bunqueue Workflow Engine. Includes defining steps, compensation logic, and handling signals for human-in-the-loop processes.
```typescript
import { Workflow, Engine } from 'bunqueue/workflow';

const flow = new Workflow('order-pipeline')
  .step('validate', async (ctx) => {
    const { orderId } = ctx.input as { orderId: string };
    return { orderId };
  })
  .step('charge', async (ctx) => {
    return { txId: 'tx_123' };
  }, {
    compensate: async () => {
      // Auto-rollback if a later step fails
      await refundPayment('tx_123');
    },
  })
  .waitFor('manager-approval') // Pauses until signal received
  .step('ship', async (ctx) => {
    const approval = ctx.signals['manager-approval'];
    return { shipped: true };
  });

const engine = new Engine({ embedded: true });
engine.register(flow);

const run = await engine.start('order-pipeline', { orderId: 'ORD-1' });

// Later, when the manager approves:
await engine.signal(run.id, 'manager-approval', { approved: true });
```

--------------------------------

### Start Bunqueue Monitoring Stack with Docker Compose

Source: https://bunqueue.dev/guide/monitoring

Use this command to quickly set up bunqueue along with Prometheus and Grafana for monitoring. Access Grafana at http://localhost:3000 and Prometheus at http://localhost:9090.

```bash
# Start bunqueue + Prometheus + Grafana
docker compose --profile monitoring up -d
```

--------------------------------

### Bunqueue Simple Mode Example

Source: https://bunqueue.dev/guide/quickstart

Demonstrates setting up a Bunqueue instance in Simple Mode with routes, middleware, cron jobs, and adding tasks. This mode wraps Queue and Worker in a single object.
```typescript
import { Bunqueue } from 'bunqueue/client';

const app = new Bunqueue('notifications', {
  embedded: true,
  routes: {
    'send-email': async (job) => {
      await sendEmail(job.data.to);
      return { sent: true };
    },
    'send-sms': async (job) => {
      await sendSMS(job.data.to);
      return { sent: true };
    },
  },
  concurrency: 10,
});

// Middleware (wraps every job)
app.use(async (job, next) => {
  const start = Date.now();
  const result = await next();
  console.log(`${job.name}: ${Date.now() - start}ms`);
  return result;
});

// Cron jobs
await app.cron('daily-report', '0 9 * * *', { type: 'summary' });

// Add jobs
await app.add('send-email', { to: 'alice@example.com' });

// Graceful shutdown
await app.close();
```

--------------------------------

### Get Job Counts Per Priority

Source: https://bunqueue.dev/guide/mcp

Use `bunqueue_get_counts_per_priority` to get job counts broken down by priority level.

```bash
bunqueue_get_counts_per_priority
```

--------------------------------

### Docker Run Command

Source: https://bunqueue.dev/guide/server

Builds a Docker image for Bunqueue and runs it, mapping ports and mounting a volume for data persistence.

```bash
docker build -t bunqueue .
docker run -p 6789:6789 -p 6790:6790 \
  -v ./data:/app/data \
  -e DATA_PATH=/app/data/queue.db \
  bunqueue
```

--------------------------------

### Get Ignored Children Failures

Source: https://bunqueue.dev/guide/flow

Demonstrates using `job.getIgnoredChildrenFailures()` to get a record of failure reasons for children that failed with `ignoreDependencyOnFailure: true`.

```javascript
const ignored = await job.getIgnoredChildrenFailures();
// { 'enrichment:job-123': 'Error: API unavailable' }
```

--------------------------------

### Run Bunqueue MCP Manually for Debugging

Source: https://bunqueue.dev/guide/mcp

Execute the bunqueue-mcp binary directly to check for startup errors. A '404' error indicates that 'bunqueue' is not installed.
```shell
bunx bunqueue-mcp
```

--------------------------------

### Bunqueue Server Configuration File

Source: https://bunqueue.dev/guide/mcp

Example of a bunqueue server configuration file using `defineConfig`. This allows for centralized configuration of server settings, authentication, storage, and cloud integration.

```typescript
import { defineConfig } from 'bunqueue';

export default defineConfig({
  server: { tcpPort: 6789 },
  auth: { tokens: ['your-auth-token'] },
  storage: { dataPath: './data/queue.db' },
  cloud: {
    url: 'https://cloud.bunqueue.io',
    apiKey: process.env.BUNQUEUE_CLOUD_API_KEY,
  },
});
```

--------------------------------

### Connect to Remote Bunqueue Server (Claude Code Example)

Source: https://bunqueue.dev/guide/mcp

Example of connecting to a remote bunqueue server via TCP using Claude Code.

```typescript
import { Queue } from "bullmq";

const queue = new Queue("jobs", "redis://localhost:6379");

async function example() {
  await queue.add("data", { foo: "bar" });
  console.log("Job added");
  await queue.close();
}

example();
```

--------------------------------

### Example Bunqueue Prometheus Metrics Output

Source: https://bunqueue.dev/guide/monitoring

This is an example of the output you can expect from the bunqueue Prometheus metrics endpoint, including job counts, queue breakdowns, and latency histograms.
```text
# HELP bunqueue_jobs_waiting Number of jobs waiting in queue
# TYPE bunqueue_jobs_waiting gauge
bunqueue_jobs_waiting 42

# HELP bunqueue_jobs_active Number of jobs being processed
# TYPE bunqueue_jobs_active gauge
bunqueue_jobs_active 8

# HELP bunqueue_jobs_pushed_total Total jobs pushed
# TYPE bunqueue_jobs_pushed_total counter
bunqueue_jobs_pushed_total 150432

# Per-queue breakdown
bunqueue_queue_jobs_waiting{queue="emails"} 30
bunqueue_queue_jobs_waiting{queue="payments"} 12
bunqueue_queue_jobs_active{queue="emails"} 5

# Latency histograms
# HELP bunqueue_push_duration_ms Push operation latency in ms
# TYPE bunqueue_push_duration_ms histogram
bunqueue_push_duration_ms_bucket{le="0.1"} 120
bunqueue_push_duration_ms_bucket{le="0.5"} 145000
bunqueue_push_duration_ms_bucket{le="1"} 150000
bunqueue_push_duration_ms_bucket{le="+Inf"} 150432
bunqueue_push_duration_ms_sum 12045.3
bunqueue_push_duration_ms_count 150432
```

--------------------------------

### Run Comprehensive Benchmark

Source: https://bunqueue.dev/guide/benchmarks

This command executes the comprehensive benchmark suite using Bun.

```bash
# Run the comprehensive benchmark
bun run bench/comprehensive.ts
```

--------------------------------

### doWhile Loop Example

Source: https://bunqueue.dev/guide/workflow

Implement a doWhile loop to execute steps as long as a condition remains true. This example processes items from a queue while there are remaining items. The condition is checked before each execution.

```typescript
// doWhile: process items while queue has items
const batchFlow = new Workflow('batch')
  .doWhile(
    (ctx) => {
      const remaining = (ctx.steps['process'] as { remaining: number })?.remaining ??
        10;
      return remaining > 0;
    },
    (w) => w.step('process', async (ctx) => {
      const batch = await fetchNextBatch();
      await processBatch(batch);
      return { remaining: await getQueueSize() };
    }),
  );
```

--------------------------------

### Set Authentication Token via Environment Variable

Source: https://bunqueue.dev/guide/cli

Demonstrates how to set the authentication token using environment variables for persistent use across commands. This avoids needing to specify the token with each command.

```bash
# Set once, use everywhere
export BQ_TOKEN=my-secret-token
bunqueue stats
bunqueue push emails '{"to":"user@example.com"}'
bunqueue queue pause emails
```
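
A script that wraps these CLI calls may want to fail early when the token is missing, rather than let each command fail on its own. The following is a minimal sketch under stated assumptions: `requireToken` and `withToken` are hypothetical helpers, not part of bunqueue; only the `BQ_TOKEN` variable name comes from the guide above.

```typescript
// Hypothetical helpers, not part of bunqueue. Only the BQ_TOKEN variable
// name is taken from the CLI guide; everything else is illustrative.
type Env = Record<string, string | undefined>;

// Returns the trimmed token, or throws if BQ_TOKEN is missing or blank.
function requireToken(env: Env): string {
  const token = (env["BQ_TOKEN"] ?? "").trim();
  if (token === "") {
    throw new Error("BQ_TOKEN is not set; export it before running bunqueue commands");
  }
  return token;
}

// Builds the environment to hand to a child process that runs the CLI,
// without mutating the caller's environment object.
function withToken(env: Env, token: string): Env {
  return { ...env, BQ_TOKEN: token };
}
```

At startup, a wrapper could call `requireToken(process.env)` once and pass `withToken(process.env, token)` as the environment when spawning `bunqueue stats` or the other commands shown above.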