Convex Workpool
https://github.com/get-convex/workpool
# @convex-dev/workpool

Convex Workpool is a component for managing asynchronous work with controlled parallelism, retry logic, and completion callbacks. It lets you pool actions and mutations to limit parallel requests, preventing resource contention between high-priority and low-priority workloads while providing durable, reliable workflow execution.

The component allows you to configure multiple pools with different parallelism levels, automatically retry failed actions with exponential backoff and jitter for idempotent operations, and use `onComplete` callbacks to build durable workflows. It stores work status in the database, enabling reactive queries to power real-time UIs. Workpool effectively runs async functions much like `ctx.scheduler.runAfter(0, ...)`, but with built-in throttling and retry management.

## Installation and Configuration

Install the Workpool component and register it in your Convex application's config file.

```bash
npm install @convex-dev/workpool
```

```typescript
// convex/convex.config.ts
import { defineApp } from "convex/server";
import workpool from "@convex-dev/workpool/convex.config";

const app = defineApp();

// Create separate pools for different workloads
app.use(workpool, { name: "emailWorkpool" });
app.use(workpool, { name: "scrapeWorkpool" });
app.use(workpool, { name: "serializedPool" });

export default app;
```

## Workpool Class Constructor

The Workpool class initializes a work pool with the specified parallelism and retry options. Each pool instance in your code must correspond to a separate component instance defined in `convex.config.ts`.
```typescript
import { Workpool } from "@convex-dev/workpool";
import { components } from "./_generated/api";

// High-priority email pool with moderate parallelism
const emailPool = new Workpool(components.emailWorkpool, {
  maxParallelism: 10,
  logLevel: "INFO",
});

// Low-priority scraping pool with limited parallelism
const scrapePool = new Workpool(components.scrapeWorkpool, {
  maxParallelism: 5,
  retryActionsByDefault: true,
  defaultRetryBehavior: {
    maxAttempts: 3,
    initialBackoffMs: 1000,
    base: 2,
  },
  logLevel: "INFO",
});

// Serialized pool for operations that conflict with themselves
const counterPool = new Workpool(components.serializedPool, {
  maxParallelism: 1,
});
```

## enqueueAction

Enqueues an action to be run within the pool's parallelism constraints. Returns a `WorkId` that can be used to check status or cancel the work. Supports retry configuration, scheduled execution, and completion callbacks.

```typescript
import { mutation, internalAction } from "./_generated/server";
import { components, internal } from "./_generated/api";
import { Workpool, WorkId } from "@convex-dev/workpool";
import { v } from "convex/values";

const pool = new Workpool(components.workpool, {
  maxParallelism: 10,
  retryActionsByDefault: true,
  defaultRetryBehavior: { maxAttempts: 3, initialBackoffMs: 250, base: 2 },
});

export const sendVerificationEmail = internalAction({
  args: { userId: v.id("users"), email: v.string() },
  handler: async (ctx, { userId, email }) => {
    // Call email provider API
    const response = await fetch("https://api.emailprovider.com/send", {
      method: "POST",
      body: JSON.stringify({ to: email, template: "verification" }),
    });
    if (!response.ok) throw new Error("Email failed");
    return { sent: true, timestamp: Date.now() };
  },
});

export const userSignUp = mutation({
  args: { email: v.string(), name: v.string() },
  handler: async (ctx, { email, name }): Promise<WorkId> => {
    // Create user in database
    const userId = await ctx.db.insert("users", { email, name });
    // Enqueue email with retry and completion callback
    const workId = await pool.enqueueAction(
      ctx,
      internal.auth.sendVerificationEmail,
      { userId, email },
      {
        retry: true, // Uses defaultRetryBehavior
        onComplete: internal.auth.emailSent,
        context: { userId, emailType: "verification" },
        runAfter: 1000, // Optional: delay by 1 second
      }
    );
    return workId;
  },
});
```

## enqueueActionBatch

Enqueues multiple actions at once, reducing overhead and database conflicts when scheduling many jobs. Each action runs independently, with its own completion callback invocation.

```typescript
import { internalAction } from "./_generated/server";
import { components, internal } from "./_generated/api";
import { Workpool } from "@convex-dev/workpool";
import { v } from "convex/values";

const scrapePool = new Workpool(components.scrapeWorkpool, {
  maxParallelism: 5,
  retryActionsByDefault: true,
});

export const scrapeCity = internalAction({
  args: { city: v.string() },
  handler: async (ctx, { city }) => {
    const response = await fetch(`https://api.weather.com/data?city=${city}`);
    const data = await response.json();
    await ctx.runMutation(internal.weather.saveData, { city, data });
    return data;
  },
});

export const downloadAllWeather = internalAction({
  args: {},
  handler: async (ctx) => {
    const cities = ["New York", "Los Angeles", "Chicago", "Houston", "Phoenix"];
    // Batch enqueue is more efficient than individual enqueue calls
    const workIds = await scrapePool.enqueueActionBatch(
      ctx,
      internal.weather.scrapeCity,
      cities.map((city) => ({ city })),
      {
        onComplete: internal.weather.scrapingComplete,
        context: { batchStarted: Date.now() },
      }
    );
    console.log(`Enqueued ${workIds.length} scraping jobs`);
    return workIds;
  },
});
```

## enqueueMutation

Enqueues a mutation to be run within the pool. Mutations are not retried by the workpool, since Convex automatically handles database conflicts and transient failures. Useful for serializing writes to avoid OCC errors.
```typescript
import { action, internalMutation } from "./_generated/server";
import { components, internal } from "./_generated/api";
import { Workpool } from "@convex-dev/workpool";

// Use maxParallelism: 1 to serialize mutations that conflict
const counterPool = new Workpool(components.counterPool, {
  maxParallelism: 1,
});

export const increment = internalMutation({
  args: {},
  handler: async (ctx) => {
    const countDoc = await ctx.db.query("counter").unique();
    if (countDoc) {
      await ctx.db.patch(countDoc._id, { count: countDoc.count + 1 });
    } else {
      await ctx.db.insert("counter", { count: 1 });
    }
  },
});

export const doWorkAndCount = action({
  args: {},
  handler: async (ctx) => {
    // Do some external work
    const result = await fetch("https://api.example.com/process");
    // Enqueue mutation to increment counter without OCC conflicts
    await counterPool.enqueueMutation(ctx, internal.counter.increment, {});
    return result.json();
  },
});
```

## enqueueQuery

Enqueues a query to be run within the pool. The query executes in a mutation context, so its result can be passed to a completion handler. Useful in workflow scenarios where you need to read data as part of a sequence.
```typescript
import { mutation, query, internalMutation } from "./_generated/server";
import { api, components, internal } from "./_generated/api";
import { Workpool, vOnCompleteArgs } from "@convex-dev/workpool";
import { v } from "convex/values";

const pool = new Workpool(components.workpool, { maxParallelism: 10 });

export const getOrderTotal = query({
  args: { orderId: v.id("orders") },
  handler: async (ctx, { orderId }) => {
    const items = await ctx.db
      .query("orderItems")
      .withIndex("by_order", (q) => q.eq("orderId", orderId))
      .collect();
    return items.reduce((sum, item) => sum + item.price * item.quantity, 0);
  },
});

export const processOrderWorkflow = mutation({
  args: { orderId: v.id("orders") },
  handler: async (ctx, { orderId }) => {
    // Enqueue query as part of workflow
    const workId = await pool.enqueueQuery(
      ctx,
      api.orders.getOrderTotal,
      { orderId },
      {
        onComplete: internal.orders.handleTotalCalculated,
        context: { orderId },
      }
    );
    return workId;
  },
});

export const handleTotalCalculated = internalMutation({
  args: vOnCompleteArgs(v.object({ orderId: v.id("orders") })),
  handler: async (ctx, { workId, context, result }) => {
    if (result.kind === "success") {
      const total = result.returnValue as number;
      await ctx.db.patch(context.orderId, { calculatedTotal: total });
    }
  },
});
```

## status

Gets the current status of a work item: pending, running, or finished. Useful for building reactive UIs that show job progress.
```typescript
import { query } from "./_generated/server";
import { components } from "./_generated/api";
import { Workpool, vWorkId, WorkId } from "@convex-dev/workpool";
import { v } from "convex/values";

const pool = new Workpool(components.workpool, { maxParallelism: 10 });

export const getJobStatus = query({
  args: { workId: vWorkId },
  handler: async (ctx, { workId }) => {
    const status = await pool.status(ctx, workId);
    // Status is one of:
    //   { state: "pending", previousAttempts: number }
    //   { state: "running", previousAttempts: number }
    //   { state: "finished" }
    return status;
  },
});

// Get status for multiple jobs at once
export const getBatchStatus = query({
  args: { workIds: v.array(vWorkId) },
  handler: async (ctx, { workIds }) => {
    const statuses = await pool.statusBatch(ctx, workIds as WorkId[]);
    return workIds.map((id, i) => ({ id, status: statuses[i] }));
  },
});
```

## cancel and cancelAll

Cancels pending or running work items. If work is already running, it will finish but won't be retried. The `onComplete` callback is still called, with `result.kind === "canceled"`.

```typescript
import { mutation, action } from "./_generated/server";
import { components } from "./_generated/api";
import { Workpool, vWorkId, WorkId } from "@convex-dev/workpool";
import { v } from "convex/values";

const pool = new Workpool(components.workpool, { maxParallelism: 10 });

export const cancelJob = mutation({
  args: { workId: vWorkId },
  handler: async (ctx, { workId }) => {
    // Cancel a specific work item
    await pool.cancel(ctx, workId as WorkId);
  },
});

export const cancelAllPendingJobs = action({
  args: { limit: v.optional(v.number()) },
  handler: async (ctx, { limit }) => {
    // Cancel all pending work in the pool
    await pool.cancelAll(ctx, { limit: limit ?? 1000 });
  },
});
```

## defineOnComplete

Helper method for defining a type-safe completion handler mutation. The handler receives the work ID, your custom context, and the result (success, failed, or canceled).
```typescript
import { internalMutation } from "./_generated/server";
import { components, internal } from "./_generated/api";
import { Workpool, vOnCompleteArgs } from "@convex-dev/workpool";
import { v } from "convex/values";
import { DataModel } from "./_generated/dataModel";

const pool = new Workpool(components.workpool, {
  maxParallelism: 10,
  retryActionsByDefault: true,
});

// Method 1: Using the defineOnComplete helper (recommended)
export const emailSentHandler = pool.defineOnComplete<DataModel>({
  context: v.object({
    userId: v.id("users"),
    emailType: v.string(),
  }),
  handler: async (ctx, { workId, context, result }) => {
    const logEntry = {
      userId: context.userId,
      emailType: context.emailType,
      workId,
      completedAt: Date.now(),
      success: result.kind === "success",
      error: result.kind === "failed" ? result.error : null,
    };
    await ctx.db.insert("emailLog", logEntry);

    // Chain another job on failure
    if (result.kind === "failed") {
      await pool.enqueueAction(
        ctx,
        internal.notifications.alertAdmin,
        { message: `Email failed: ${result.error}` },
        { retry: { maxAttempts: 5, initialBackoffMs: 5000, base: 2 } }
      );
    }
  },
});

// Method 2: Using the vOnCompleteArgs validator directly
export const alternativeHandler = internalMutation({
  args: vOnCompleteArgs(v.object({ orderId: v.id("orders") })),
  handler: async (ctx, { workId, context, result }) => {
    if (result.kind === "success") {
      console.log("Order processed:", context.orderId, result.returnValue);
    } else if (result.kind === "failed") {
      console.error("Order failed:", result.error);
    } else {
      console.log("Order canceled");
    }
  },
});
```

## Retry Behavior Configuration

Configure exponential backoff with jitter for retrying failed actions. Only enable retries for idempotent actions that can safely run more than once.
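To make the backoff concrete, the delays implied by a retry behavior can be computed with a small plain-TypeScript helper. This is a sketch, not part of the component's API, and the component additionally randomizes each delay with jitter, which is not modeled here:

```typescript
// Sketch only: compute the nominal retry delay schedule for a retry behavior.
// The real component adds jitter to each delay; this shows the base values.
interface RetryBehavior {
  maxAttempts: number;
  initialBackoffMs: number;
  base: number;
}

function retryDelaysMs({ maxAttempts, initialBackoffMs, base }: RetryBehavior): number[] {
  // The first attempt runs immediately; retry n waits initialBackoffMs * base^(n-1).
  const delays = [0];
  for (let retry = 0; retry < maxAttempts - 1; retry++) {
    delays.push(initialBackoffMs * base ** retry);
  }
  return delays;
}

// With the documented defaults { maxAttempts: 5, initialBackoffMs: 250, base: 2 }:
console.log(retryDelaysMs({ maxAttempts: 5, initialBackoffMs: 250, base: 2 }));
// → [ 0, 250, 500, 1000, 2000 ]
```

This reproduces the `[0, 250, 500, 1000, 2000]` ms schedule quoted for the default behavior.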
```typescript
import { mutation } from "./_generated/server";
import { components, internal } from "./_generated/api";
import { Workpool, DEFAULT_RETRY_BEHAVIOR } from "@convex-dev/workpool";
import { v } from "convex/values";

// Default retry behavior: attempts at [0, 250, 500, 1000, 2000] ms delays
console.log(DEFAULT_RETRY_BEHAVIOR);
// { maxAttempts: 5, initialBackoffMs: 250, base: 2 }

// Pool with custom default retry behavior
const paymentPool = new Workpool(components.paymentPool, {
  maxParallelism: 5,
  retryActionsByDefault: true,
  defaultRetryBehavior: {
    maxAttempts: 10,       // Up to 10 attempts total
    initialBackoffMs: 500, // Start with 500ms delay
    base: 2,               // Double the delay on each retry
  },
});

// Override retry behavior per call
export const processPayment = mutation({
  args: { orderId: v.id("orders"), transactionId: v.string() },
  handler: async (ctx, args) => {
    await paymentPool.enqueueAction(ctx, internal.payments.chargeCard, args, {
      // Custom retry for this specific action
      retry: {
        maxAttempts: 3,
        initialBackoffMs: 1000,
        base: 3, // Triple the delay on each retry: 1s, then 3s
      },
    });
  },
});

// Disable retry for a non-idempotent action
export const sendOneTimeCode = mutation({
  args: { phone: v.string() },
  handler: async (ctx, { phone }) => {
    await paymentPool.enqueueAction(
      ctx,
      internal.sms.sendCode,
      { phone },
      { retry: false } // Don't retry - could send multiple codes
    );
  },
});
```

## Dynamic Parallelism Configuration

Update a pool's parallelism at runtime, which is useful for dynamic scaling or for pausing work during maintenance.

```typescript
import { mutation } from "./_generated/server";
import { components } from "./_generated/api";
import { Workpool } from "@convex-dev/workpool";
import { v } from "convex/values";

// Update maxParallelism via a direct component call
export const updatePoolParallelism = mutation({
  args: { maxParallelism: v.number() },
  handler: async (ctx, { maxParallelism }) => {
    // Set to 0 to pause all work in the pool
    await ctx.runMutation(components.workpool.config.update, {
      maxParallelism,
    });
  },
});

// Or create the Workpool with parallelism read from the environment
export const getPool = () => {
  const parallelism = parseInt(process.env.WORKPOOL_PARALLELISM ?? "10", 10);
  return new Workpool(components.workpool, {
    maxParallelism: parallelism,
  });
};
```

---

Workpool is ideal for scenarios where you need to separate and throttle async workloads, such as prioritizing verification emails over background scraping jobs. It excels at building durable workflows with guaranteed completion callbacks, managing third-party API calls with rate limiting, and reducing database write conflicts by serializing mutations that operate on shared data.

Integration typically involves creating multiple Workpool instances in `convex.config.ts` for different priority levels, then using `enqueueAction` or `enqueueMutation` instead of `ctx.scheduler.runAfter`. The `onComplete` callback pattern enables reliable multi-step workflows in which each step's result determines the next action, with full visibility into job status through reactive queries.