### Upstash Workflow Development Setup Source: https://upstash.com/docs/workflow/sdk/workflow-py Instructions for setting up the development environment for the Upstash Workflow project. This includes cloning the repository, installing Poetry, installing dependencies, configuring environment variables, and running tests and code quality checks. ```bash # 1. Clone the repository # 2. Install Poetry # 3. Install dependencies with `poetry install` # 4. Create a .env file with `cp .env.example .env` and fill in the environment variables # 5. Run tests with `poetry run pytest` # 6. Format with `poetry run ruff format .` # 7. Check with `poetry run ruff check .` # 8. Type check with `poetry run mypy --show-error-codes .` ``` -------------------------------- ### Start TanStack Start Development Server Source: https://upstash.com/docs/workflow/quickstarts/tanstack-start Starts the TanStack Start development server using either pnpm or npm. This command is essential for running your application locally and making it accessible for workflow endpoint testing. Ensure you have the necessary package manager installed. ```bash pnpm dev ``` ```bash npm run dev ``` -------------------------------- ### Install Upstash Workflow SDK with FastAPI Source: https://upstash.com/docs/workflow/sdk/workflow-py This snippet shows the necessary commands to set up a Python virtual environment and install the required packages for using Upstash Workflow with FastAPI. It includes installing `fastapi`, `uvicorn`, and `upstash-workflow`. ```bash python -m venv venv source venv/bin/activate pip install fastapi uvicorn upstash-workflow ``` -------------------------------- ### Start Local QStash Server Source: https://upstash.com/docs/workflow/agents/getting-started Starts the local QStash server using the Upstash CLI. This server is required for local development and testing of Upstash Workflow applications. Two package managers (npm and pnpm) are shown. ```bash npx @upstash/qstash-cli dev ``` ```bash pnpm dlx @upstash/qstash-cli dev ``` -------------------------------- ### Create TanStack Start Project Source: https://upstash.com/docs/workflow/quickstarts/tanstack-start Initializes a new TanStack Start project using pnpm. This command sets up the basic project structure and dependencies required for a TanStack Start application. ```bash pnpm create @tanstack/start@latest ``` -------------------------------- ### Install Upstash Workflow SDK Source: https://upstash.com/docs/workflow/quickstarts/tanstack-start Installs the Upstash Workflow SDK for JavaScript. This allows your TanStack Start application to interact with Upstash Workflow services. The installation can be done using pnpm, npm, or bun. ```bash pnpm install @upstash/workflow ``` ```bash npm install @upstash/workflow ``` ```bash bun add @upstash/workflow ``` -------------------------------- ### Python Virtual Environment Setup Source: https://upstash.com/docs/workflow/quickstarts/nextjs-flask Creates and activates a Python virtual environment for managing project dependencies. This ensures that project-specific packages do not conflict with system-wide installations. ```bash python -m venv venv source venv/bin/activate ``` -------------------------------- ### Install AI SDK Dependencies Source: https://upstash.com/docs/workflow/integrations/aisdk Commands to install the necessary packages for using OpenAI with the Vercel AI SDK. ```bash npm install @ai-sdk/openai ai zod ``` ```bash pnpm install @ai-sdk/openai ai zod ``` ```bash bun install @ai-sdk/openai ai zod ``` -------------------------------- ### Install Node.js Dependencies Source: https://upstash.com/docs/workflow/quickstarts/nextjs-flask Installs the necessary Node.js dependencies for the Next.js frontend. This command should be run after setting up the project and potentially the Python virtual environment. ```bash npm install ``` -------------------------------- ### Clone Project Repository Source: https://upstash.com/docs/workflow/quickstarts/nextjs-flask Clones the Upstash Workflow Next.js & Flask example project from GitHub. This is the initial step to get the project files for local development. ```bash git clone https://github.com/upstash/workflow-py.git cd workflow-py/examples/nextjs-flask ``` -------------------------------- ### Install Upstash Workflow and Realtime dependencies Source: https://upstash.com/docs/workflow/howto/realtime/basic Installs the required npm packages to enable workflow orchestration and real-time event communication. ```bash npm install @upstash/workflow @upstash/realtime @upstash/redis zod ``` -------------------------------- ### TypeScript - Sync User Data Example Source: https://upstash.com/docs/workflow/basics/context/call This example demonstrates how to use `context.call` in TypeScript to sync user data with a third-party application. It shows how to specify the request method, URL, body, and headers. ```APIDOC ## POST /api/workflow ### Description This endpoint handles incoming workflow requests and demonstrates calling an external API to sync user data. ### Method POST ### Endpoint /api/workflow ### Parameters #### Request Body - **topic** (string) - Required - The topic for the workflow. ### Request Example ```json { "topic": "user-profile-update" } ``` ### Response #### Success Response (200) - **status** (number) - The HTTP status code of the external API response. - **headers** (object) - The HTTP headers of the external API response. - **body** (any) - The response body from the external API. #### Response Example ```json { "status": 200, "headers": { "Content-Type": "application/json" }, "body": { "message": "User data synced successfully" } } ``` ``` -------------------------------- ### Start QStash CLI Development Server Source: https://upstash.com/docs/workflow/howto/local-development/development-server Initiates the local development server for QStash functionality using the QStash CLI. This server allows for local testing and debugging of QStash operations without deploying to production. The output provides essential connection details and tokens. ```javascript npx @upstash/qstash-cli dev ``` -------------------------------- ### Install Workflow Agent Dependencies Source: https://upstash.com/docs/workflow/agents/features Install the necessary packages to define tools and integrate with the AI SDK and LangChain ecosystem. ```bash npm i ai mathjs zod @agentic/ai-sdk @agentic/weather @langchain/core @langchain/community ``` -------------------------------- ### Install Upstash Workflow SDK Source: https://upstash.com/docs/workflow/quickstarts/cloudflare-workers Installs the Upstash Workflow SDK using different package managers (npm, pnpm, bun). This is the first step to integrate Upstash Workflow into your project. ```bash npm install @upstash/workflow ``` ```bash pnpm install @upstash/workflow ``` ```bash bun add @upstash/workflow ``` -------------------------------- ### Connect ngrok Account Source: https://upstash.com/docs/workflow/howto/local-development/local-tunnel This command connects your ngrok CLI to your account using your authentication token. Replace `` with your actual token obtained from the ngrok dashboard. This is a one-time setup step. ```bash ngrok config add-authtoken ``` -------------------------------- ### Run and Trigger Workflow Source: https://upstash.com/docs/workflow/quickstarts/solidjs Commands to start the development server and trigger the workflow endpoint via a POST request. ```bash npm run dev ``` ```bash curl -X POST https://localhost:3000/api/workflow ``` -------------------------------- ### Run Local QStash Server Source: https://upstash.com/docs/workflow/quickstarts/tanstack-start Starts a local QStash server for development purposes. This server mimics the behavior of the managed QStash servers, allowing for local testing without external network calls. It provides credentials needed for environment variable configuration. ```bash pnpx @upstash/qstash-cli dev ``` ```bash npx @upstash/qstash-cli dev ``` -------------------------------- ### Configure Environment Variables for Local QStash Source: https://upstash.com/docs/workflow/quickstarts/tanstack-start Sets up environment variables in a .env file to connect your TanStack Start app to the local QStash server. These variables include the QStash URL, token, and signing keys, which are essential for communication with the local development server. ```txt QSTASH_URL="http://127.0.0.1:8080" QSTASH_TOKEN="eyJVc2VySUQiOiJkZWZhdWx0VXNlciIsIlBhc3N3b3JkIjoiZGVmYXVsdFBhc3N3b3JkIn0=" QSTASH_CURRENT_SIGNING_KEY="sig_7kYjw48mhY7kAjqNGcy6cr29RJ6r" QSTASH_NEXT_SIGNING_KEY="sig_5ZB6DVzB1wjE8S6rZ7eenA8Pdnhs" ``` -------------------------------- ### Start Local Tunnel with ngrok Source: https://upstash.com/docs/workflow/howto/local-development/local-tunnel This command starts a tunnel to make your local server publicly accessible. Replace `` with the port your local application is running on (e.g., 3000 for Next.js). The output will provide a public URL that Upstash Workflow can use. ```bash ngrok http ``` ```bash ngrok http 3000 ``` -------------------------------- ### Install Agents Package Source: https://upstash.com/docs/workflow/howto/migrations Install the new @upstash/workflow-agents package to access agent functionality separately from the core workflow package. ```bash npm install @upstash/workflow-agents ``` -------------------------------- ### Run Next.js App Source: https://upstash.com/docs/workflow/agents/getting-started Starts the local Next.js development server. This is a prerequisite for calling the workflow endpoint locally. ```bash npm run dev ``` -------------------------------- ### Initialize Webhook Endpoint Source: https://upstash.com/docs/workflow/howto/use-webhooks Demonstrates how to create a basic webhook endpoint using the serve function. This setup includes an initial payload parser to handle incoming request data. ```typescript import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve( async (context) => { // Your webhook handling logic here }, { initialPayloadParser: (payload) => { return payload; }, } ); ``` ```python from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) def initial_payload_parser(payload): return payload @serve.post("/api/example", initial_payload_parser=initial_payload_parser) async def example(context: AsyncWorkflowContext[str]) -> None: # Your webhook handling logic here ``` -------------------------------- ### Create Upstash Workflow Endpoint in TanStack Start Source: https://upstash.com/docs/workflow/quickstarts/tanstack-start Defines a workflow endpoint using TanStack Start's API routes and the Upstash Workflow SDK. The `serve` function from `@upstash/workflow/tanstack` is used to expose a workflow that can be triggered independently. This example demonstrates a simple workflow with two steps. ```typescript import { createFileRoute } from '@tanstack/react-router' import { serve } from '@upstash/workflow/tanstack' const someWork = (input: string) => { return `processed '${JSON.stringify(input)}'` } export const Route = createFileRoute('/api/workflow')({ server: { handlers: serve(async (context) => { const input = context.requestPayload const result1 = await context.run('step1', () => { const output = someWork(input) console.log('step 1 input', input, 'output', output) return output }) await context.run('step2', () => { const output = someWork(result1) console.log('step 2 input', result1, 'output', output) }) }), }, }) ``` -------------------------------- ### Start Workflow via HTTP Request (curl) Source: https://upstash.com/docs/workflow/howto/start Starts an Upstash Workflow run by sending an HTTP POST request to the workflow endpoint. This method is suitable for quick testing during development but is not recommended for production environments, especially if the endpoint is secured with signing keys, as it may lack the necessary signature headers. ```bash curl -X POST https:/// \ -H "my-header: foo" \ -d '{"foo": "bar"}' ``` -------------------------------- ### Install Upstash Workflow Packages Source: https://upstash.com/docs/workflow/agents/getting-started Installs the necessary Upstash Workflow and AI-related packages using npm. These packages are essential for using Upstash Workflow and its agent features. ```bash npm i @upstash/workflow @upstash/workflow-agents ai zod ``` -------------------------------- ### Run Workflow Locally with Wrangler Source: https://upstash.com/docs/workflow/quickstarts/cloudflare-workers Starts the local development server for the Cloudflare Worker. This command initializes the environment and provides a local URL for testing. ```bash npm run wrangler dev ``` -------------------------------- ### POST Request with Request Body and Headers Source: https://upstash.com/docs/workflow/basics/context/call This example illustrates making a POST request using context.call(), including a request body and custom headers. It demonstrates how to send data to an endpoint and include necessary authentication or metadata in the headers. The response handling remains the same, focusing on status, body, and headers. ```javascript const result = await context.call({ url: "https://api.example.com/submit", method: "POST", body: JSON.stringify({ key: "value" }), headers: { "Content-Type": "application/json", "Authorization": "Bearer YOUR_TOKEN" } }); console.log("Status:", result.status); console.log("Body:", result.body); ``` -------------------------------- ### Minimal Workflow Endpoint with Hono Source: https://upstash.com/docs/workflow/quickstarts/hono This snippet demonstrates the most basic setup for a workflow endpoint using Hono. It defines two steps, 'initial-step' and 'second-step', within the workflow. This requires the 'hono' and '@upstash/workflow/hono' packages. ```typescript import { Hono } from "hono" import { serve } from "@upstash/workflow/hono" const app = new Hono() app.post("/workflow", serve(async (context) => { await context.run("initial-step", () => { console.log("initial step ran") }) await context.run("second-step", () => { console.log("second step ran") }) }) ) export default app ``` -------------------------------- ### Notify Workflows with context.notify Source: https://upstash.com/docs/workflow/basics/context/notify Demonstrates how to trigger an event notification to other workflows. The basic example shows a standard notification, while the lookback example includes a workflowRunId to ensure delivery even if the target workflow has not yet reached the wait step. ```javascript import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve<{ topic: string }>(async (context) => { const payload = context.requestPayload; const { notifyResponse, } = await context.notify("notify step", "my-event-Id", payload); }); ``` ```javascript import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve<{ orderId: string }>(async (context) => { const { orderId } = context.requestPayload; await context.run("process-payment", async () => { return processPayment(orderId); }); const { notifyResponse, } = await context.notify( "notify payment complete", "payment-processed", { orderId, status: "success" }, "wfr_order_processor_123" ); }); ``` -------------------------------- ### Python - Generate Long Essay Example Source: https://upstash.com/docs/workflow/basics/context/call This Python example showcases how to use `context.call` with FastAPI to generate a long essay using the OpenAI API. It illustrates setting up the request body with specific messages for the AI model. ```APIDOC ## POST /api/example ### Description This endpoint, built with FastAPI, accepts a topic and uses `context.call` to invoke the OpenAI API for generating a lengthy essay. ### Method POST ### Endpoint /api/example ### Parameters #### Request Body - **topic** (string) - Required - The subject for the essay generation. ### Request Example ```json { "topic": "The impact of artificial intelligence on modern society" } ``` ### Response #### Success Response (200) - **status** (number) - The HTTP status code received from the OpenAI API. - **headers** (object) - The HTTP headers returned by the OpenAI API. - **body** (object) - The response body from the OpenAI API, containing the generated essay. #### Response Example ```json { "status": 200, "headers": { "Content-Type": "application/json" }, "body": { "id": "chatcmpl-12345", "choices": [ { "message": { "role": "assistant", "content": "Artificial intelligence (AI) is rapidly transforming modern society... [long essay content]..." } } ] } } ``` ``` -------------------------------- ### Implement Failure Handling with Python Source: https://upstash.com/docs/workflow/howto/failures This Python example shows how to define and use a `failure_function` with the `@serve.post` decorator for handling workflow execution errors. It outlines the parameters available for error details and demonstrates a basic structure for custom failure logic. ```python async def failure_function( context, # context during failure fail_status, # failure status fail_response, # failure message fail_headers # failure headers ): # handle the failure pass @serve.post("/api/example", failure_function=failure_function) async def example(context: AsyncWorkflowContext[str]) -> None: ... ``` -------------------------------- ### Define Workflow Endpoint in SolidJS Source: https://upstash.com/docs/workflow/quickstarts/solidjs Example of creating a workflow endpoint in a SolidJS project using the serve function from the SDK. ```typescript import { serve } from "@upstash/workflow/solidjs" export const { POST } = serve(async (context) => { await context.run("initial-step", () => { console.log("initial step ran") }) await context.run("second-step", () => { console.log("second step ran") }) }) ``` -------------------------------- ### Notify with Retry Logic Source: https://upstash.com/docs/workflow/features/wait-for-event This example shows how to send a notification and includes logic to check if any workflows were actually notified. If not, it waits and retries the notification once. ```APIDOC ## POST /workflow/notify (with retry) ### Description Sends a notification for a given event. This endpoint includes a check to see if any workflow runs were waiting for the event. If no workflows were notified, it implements a retry mechanism after a short delay. ### Method POST ### Endpoint `/workflow/notify` ### Parameters #### Path Parameters None #### Query Parameters None #### Request Body - **eventId** (string) - Required - The ID of the event to notify. - **eventData** (object) - Optional - The data associated with the event. ### Request Example ```json { "eventId": "", "eventData": { "status": "completed" } } ``` ### Response #### Success Response (200) - **waiters** (array) - A list of workflow runs that were notified. If this array is empty, no workflows were waiting for the event. #### Response Example ```json { "waiters": [ { "workflowRunId": "" } ] } ``` ### Notes - The provided TypeScript code snippet demonstrates a client-side implementation of this retry logic. - A 5-second delay is used before the retry attempt in the example. ``` -------------------------------- ### Start Stripe Trial in Workflow Source: https://upstash.com/docs/workflow/examples/authWebhook Initiates a trial period in Stripe within the workflow context. This ensures the action is tracked and retriable. ```typescript await context.run("start trial in Stripe", async () => { await startTrialInStripe(email); }); ``` ```python async def _start_trial_in_stripe() -> None: await start_trial_in_stripe(email) await context.run("start trial in Stripe", _start_trial_in_stripe) ``` -------------------------------- ### Run and Trigger Workflow via Terminal Source: https://upstash.com/docs/workflow/quickstarts/fastapi Provides commands to set environment variables, start the FastAPI development server, and trigger the workflow endpoint using curl. ```bash source .env uvicorn main:app --reload curl -X POST https://localhost:8000/api/workflow ``` -------------------------------- ### Resuming Workflow Runs from DLQ Source: https://upstash.com/docs/workflow/basics/client/dlq/resume Demonstrates how to resume workflow runs using the client.dlq.resume method. Includes examples for both single-run resumption with flow control and batch resumption of multiple DLQ entries. ```typescript const { messages } = await client.dlq.list(); const response = await client.dlq.resume({ dlqId: messages[0].dlqId, flowControl: { key: "my-flow-control-key", value: "my-flow-control-value", }, retries: 3, }); ``` ```typescript const responses = await client.dlq.resume({ dlqId: ["dlq-12345", "dlq-67890"], retries: 5, }); ``` -------------------------------- ### Implement User Onboarding Workflow Source: https://upstash.com/docs/workflow/examples/authWebhook This snippet demonstrates how to define a workflow that handles user creation, Stripe integration, and trial lifecycle management. It includes steps for database persistence, email communication, and scheduled delays. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { WorkflowContext } from '@upstash/qstash/workflow' type UserCreatedPayload = { name: string; email: string; }; export const { POST } = serve(async (context) => { const { name, email } = context.requestPayload; const { userid } = await context.run("sync user", async () => { return await createUserInDatabase({ name, email }); }); await context.run("create new user in stripe", async () => { await createNewUserInStripe(email); }); await context.sleep("wait", 7 * 24 * 60 * 60); // ... remaining workflow logic }); ``` ```python from fastapi import FastAPI from typing import Dict, TypedDict from upstash_workflow.fastapi import Serve app = FastAPI() serve = Serve(app) class UserCreatedPayload(TypedDict): name: str email: str async def create_user_in_database(name: str, email: str) -> Dict[str, str]: print("Creating a user in the database:", name, email) return {"userid": "12345"} # Workflow implementation follows using the serve decorator ``` -------------------------------- ### Complete Parallel Workflow Implementation Source: https://upstash.com/docs/workflow/howto/parallel-runs A full example of a workflow that checks multiple inventory items in parallel before proceeding to a sequential task. It uses the serve function from @upstash/workflow/nextjs. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { checkInventory, brewCoffee, printReceipt } from "@/utils"; export const { POST } = serve(async (ctx) => { const [coffeeBeansAvailable, cupsAvailable, milkAvailable] = await Promise.all([ ctx.run("check-coffee-beans", () => checkInventory("coffee-beans")), ctx.run("check-cups", () => checkInventory("cups")), ctx.run("check-milk", () => checkInventory("milk")), ]); if (coffeeBeansAvailable && cupsAvailable && milkAvailable) { const price = await ctx.run("brew-coffee", async () => { return await brewCoffee({ style: "cappuccino" }); }); await printReceipt(price); } }); ``` -------------------------------- ### Programmatically Schedule a Workflow Source: https://upstash.com/docs/workflow/howto/schedule This section provides code examples for programmatically scheduling workflows. It demonstrates how to create user-specific schedules, such as sending weekly summary reports, using TypeScript and Python. ```APIDOC ## Programmatically Schedule a Workflow ### Description This endpoint allows for the programmatic scheduling of workflows, often used for tasks like sending weekly summary reports to users. It demonstrates creating a schedule that triggers seven days after a user signs up. ### Method POST ### Endpoint `/api/sign-up` ### Parameters #### Request Body - **userData** (object) - Required - User data containing necessary information for registration and scheduling. ### Request Example (TypeScript) ```typescript api/sign-up/route.ts import { signUp } from "@/utils/auth-utils"; import { Client } from "@upstash/qstash"; const client = new Client({ token: process.env.QSTASH_TOKEN! }); export async function POST(request: Request) { const userData: UserData = await request.json(); // Schedule weekly account summary await client.schedules.create({ scheduleId: `user-summary-${user.email}`, destination: "https:///api/send-weekly-summary", body: { userId: user.id }, cron: cron, }); return NextResponse.json( { success: true, message: "User registered and summary scheduled" }, { status: 201 } ); } ``` ### Request Example (Python) ```python main.py from fastapi import FastAPI, Request from fastapi.responses import JSONResponse from qstash import AsyncQStash from datetime import datetime, timedelta app = FastAPI() client = AsyncQStash("") @app.post("/api/sign-up") async def sign_up(request: Request): user_data = await request.json() # Simulate user registration user = await sign_up(user_data) # Calculate the date for the first summary (7 days from now) first_summary_date = datetime.now() + timedelta(days=7) # Create cron expression for weekly summaries starting 7 days from signup cron = f"{first_summary_date.minute} {first_summary_date.hour} * * {first_summary_date.day}" # Schedule weekly account summary await client.schedule.create_json( schedule_id=f"user-summary-{user.email}", destination="https:///api/send-weekly-summary", body={"userId": user.id}, cron=cron, ) return JSONResponse( content={"success": True, "message": "User registered and summary scheduled"}, status_code=201, ) ``` ### Notes - When creating a per-user schedule, pass a unique `scheduleId` to identify the schedule for better management and observability. ``` -------------------------------- ### Configure Local QStash Server Source: https://upstash.com/docs/workflow/quickstarts/cloudflare-workers Sets up a local QStash server for development and testing workflows. It involves running the QStash CLI and configuring environment variables with the provided QStash URL and token. ```bash npx @upstash/qstash-cli dev ``` ```text QSTASH_URL="http://127.0.0.1:8080" QSTASH_TOKEN="" ``` -------------------------------- ### Wait for Webhook Execution in TypeScript Source: https://upstash.com/docs/workflow/basics/context/waitForWebhook This example demonstrates how to initialize a webhook using context.createWebhook and subsequently pause the workflow execution using context.waitForWebhook. It includes logic to handle both successful webhook triggers and timeout scenarios. ```typescript import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve(async (context) => { const webhook = await context.createWebhook("create webhook"); const webhookResponse = await context.waitForWebhook( "wait for webhook", webhook, "30s" ); if (webhookResponse.timeout) { console.log("Webhook was not called within the timeout period"); } else { console.log("Webhook was called successfully"); console.log("Request body:", webhookResponse.request.body); console.log("Request headers:", webhookResponse.request.headers); } }); ``` -------------------------------- ### Configure Local Development Environment Source: https://upstash.com/docs/workflow/quickstarts/solidjs Commands and configuration snippets for setting up local development environment variables for QStash. ```bash touch .env ``` ```bash npx @upstash/qstash-cli dev ``` ```text QSTASH_URL="http://127.0.0.1:8080" QSTASH_TOKEN="" ``` ```text QSTASH_TOKEN="***" UPSTASH_WORKFLOW_URL="" ``` -------------------------------- ### Get Signing Keys (OpenAPI) Source: https://upstash.com/docs/workflow/api-refence/signing-keys/get-signing-keys This OpenAPI specification defines the endpoint for retrieving signing keys from the Upstash Workflow REST API. It outlines the GET request to /v2/keys, expected responses, and authentication mechanisms (Bearer Token and Query Parameter). ```yaml openapi: 3.1.0 info: title: Upstash Workflow REST API description: > Upstash Workflow is a serverless workflow orchestration service built on top of Upstash QStash and Upstash Redis. version: 2.0.0 contact: name: Upstash url: https://upstash.com servers: - url: https://qstash.upstash.io security: - bearerAuth: [] - bearerAuthQuery: [] paths: /v2/keys: get: tags: - Signing Keys summary: Get Signing Keys description: Retrieve your current and next signing keys responses: '200': description: Signing keys retrieved successfully content: application/json: schema: $ref: '#/components/schemas/SigningKeys' components: schemas: SigningKeys: type: object properties: current: type: string description: The current signing key. next: type: string description: The next signing key. securitySchemes: bearerAuth: type: http scheme: bearer bearerFormat: JWT description: QStash authentication token bearerAuthQuery: type: apiKey in: query name: qstash_token description: QStash authentication token passed as a query parameter ``` -------------------------------- ### Verify Upstash QStash Signature in Next.js Source: https://upstash.com/docs/workflow/howto/failures This Next.js example shows how to use the `verifySignature` utility from `@upstash/qstash/nextjs` to verify incoming requests, ensuring they are legitimately from Upstash. It includes handling base64 encoded bodies and setting up the API route. ```javascript // pages/api/callback.js import { verifySignature } from "@upstash/qstash/nextjs"; function handler(req, res) { // responses from qstash are base64-encoded const decoded = atob(req.body.body); console.log(decoded); return res.status(200).end(); } export default verifySignature(handler); export const config = { api: { bodyParser: false, }, }; ``` -------------------------------- ### Combine Multiple Middlewares Source: https://upstash.com/docs/workflow/howto/middlewares This example demonstrates how to integrate multiple middlewares, including logging, error tracking, and performance monitoring, into an Upstash Workflow using the 'serve' function from '@upstash/workflow/nextjs'. Middlewares are applied in the order they appear in the 'middlewares' array. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { loggingMiddleware } from "@upstash/workflow"; export const { POST } = serve( async (context) => { // Your workflow logic }, { middlewares: [ loggingMiddleware, errorTrackingMiddleware, performanceMiddleware ] } ); ``` -------------------------------- ### Configure Local Development Environment Variables Source: https://upstash.com/docs/workflow/quickstarts/express Sets up environment variables for local development. It shows how to configure either a local QStash server URL and token or a Qstash token and the Upstash Workflow URL when using a local tunnel. ```txt QSTASH_URL="http://127.0.0.1:8080" QSTASH_TOKEN="" ``` ```txt QSTASH_TOKEN="***" UPSTASH_WORKFLOW_URL="" ``` -------------------------------- ### Correct Step Execution Order for generateText Source: https://upstash.com/docs/workflow/integrations/aisdk Demonstrates the correct way to call `generateText` by ensuring a preceding step retrieves the prompt. The incorrect example shows a common mistake where `generateText` is called before the prompt is available, leading to an error. ```typescript export const { POST } = serve<{ prompt: string }>(async (context) => { const openai = createWorkflowOpenAI(context); // Get prompt in a step first const prompt = await context.run("get prompt", async () => { return context.requestPayload.prompt; }); const result = await generateText({ model: openai('gpt-3.5-turbo'), prompt }); }); ``` -------------------------------- ### Implementing Order Processing Workflow with Event Waiting Source: https://upstash.com/docs/workflow/examples/waitForEvent A complete workflow example that requests order processing, waits for an external event with a 10-minute timeout, and sends a confirmation email upon completion. ```typescript import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve(async (context) => { const { orderId, userEmail } = context.requestPayload; await context.run("request order processing", async () => { await requestProcessing(orderId) }) const { eventData, timeout } = await context.waitForEvent( "wait for order processing", `order-${orderId}`, { timeout: "10m" } ); if (timeout) return; const processedData = eventData; await context.run("process-order", async () => { console.log(`Order ${orderId} processed:`, processedData); }); await context.run("send-confirmation-email", async () => { await sendEmail(userEmail, "Your order has been processed!", processedData); }); }); ``` -------------------------------- ### Get Failed Workflow Run Details (OpenAPI) Source: https://upstash.com/docs/workflow/api-refence/dlq/get-failed-workflow-run This snippet defines the OpenAPI 3.1.0 specification for retrieving details of a specific failed workflow run from the Dead Letter Queue (DLQ). It outlines the GET request for the /v2/workflows/dlq/{dlqId} endpoint, including parameters, response schemas for success and error cases, and component schemas for WorkflowDLQMessage and Error. ```yaml openapi: 3.1.0 info: title: Upstash Workflow REST API description: > Upstash Workflow is a serverless workflow orchestration service built on top of Upstash QStash and Upstash Redis. version: 2.0.0 contact: name: Upstash url: https://upstash.com servers: - url: https://qstash.upstash.io security: - bearerAuth: [] - bearerAuthQuery: [] paths: /v2/workflows/dlq/{dlqId}: get: tags: - DLQ summary: Get Failed Workflow Run description: | Get details of a specific failed workflow run from the DLQ. parameters: - in: path name: dlqId required: true schema: type: string description: The DLQ ID of the failed workflow run. responses: '200': description: Failed workflow run details content: application/json: schema: $ref: '#/components/schemas/WorkflowDLQMessage' '400': description: Bad Request - Invalid DLQ ID content: application/json: schema: $ref: '#/components/schemas/Error' '401': description: Unauthorized content: application/json: schema: $ref: '#/components/schemas/Error' '404': description: DLQ message not found content: application/json: schema: $ref: '#/components/schemas/Error' '500': description: Internal Server Error content: application/json: schema: $ref: '#/components/schemas/Error' components: schemas: WorkflowDLQMessage: type: object properties: dlqId: type: string description: The DLQ ID of the failed workflow message. workflowUrl: type: string description: The URL of the workflow. workflowRunId: type: string description: The ID of the workflow run. workflowCreatedAt: type: integer format: int64 description: >- The timestamp when the workflow run was created (Unix timestamp in milliseconds). url: type: string description: The URL of the failed workflow step. method: type: string description: The HTTP method used for the workflow step. header: type: object additionalProperties: type: array items: type: string description: The HTTP headers sent to the workflow step. body: type: string description: >- The body of the message if it is composed of utf8 chars only, empty otherwise. bodyBase64: type: string description: >- The base64 encoded body if the body contains a non-utf8 char only, empty otherwise. maxRetries: type: integer description: >- The number of retries that should be attempted in case of delivery failure. createdAt: type: integer format: int64 description: The unix timestamp in milliseconds when the message was created. failureCallback: type: string description: The url where we send a callback to after the workflow fails. callerIP: type: string description: IP address of the publisher of this workflow. label: type: string description: The label assigned to the workflow run. flowControlKey: type: string description: The flow control key used for rate limiting. failureFunctionState: type: string description: The state of the failure function if applicable. responseStatus: type: integer description: The HTTP status code received from the destination API. responseHeader: type: object additionalProperties: type: array items: type: string description: The HTTP response headers received from the destination API. responseBody: type: string description: >- The body of the response if it is composed of utf8 chars only, empty otherwise. responseBodyBase64: type: string description: >- The base64 encoded body of the response if the body contains a non-utf8 char only, empty otherwise. Error: type: object required: - error properties: error: type: string description: Error message securitySchemes: bearerAuth: type: http scheme: bearer ``` -------------------------------- ### Trigger Workflow and Notify with Lookback Source: https://upstash.com/docs/workflow/features/wait-for-event This example demonstrates how to trigger a workflow and then immediately notify it with a specific event ID and data, using `workflowRunId` to enable lookback and prevent race conditions. ```APIDOC ## POST /workflow/trigger and POST /workflow/notify (with lookback) ### Description Triggers a workflow run and then sends a notification to it. The `workflowRunId` parameter in the `notify` call enables lookback, ensuring the notification is received even if it's sent before the workflow starts waiting. ### Method POST ### Endpoint `/workflow/trigger` and `/workflow/notify` ### Parameters #### Path Parameters None #### Query Parameters None #### Request Body (for trigger) - **url** (string) - Required - The URL of the workflow to trigger. - **body** (object) - Optional - The payload to send to the workflow. #### Request Body (for notify) - **eventId** (string) - Required - The ID of the event to notify. - **eventData** (object) - Optional - The data associated with the event. - **workflowRunId** (string) - Required - The ID of the specific workflow run to notify, enabling lookback. ### Request Example (Trigger) ```json { "url": "https://your-app.com/api/process-order", "body": { "orderId": "123" } } ``` ### Request Example (Notify) ```json { "eventId": "payment-verified", "eventData": { "verified": true }, "workflowRunId": "" } ``` ### Response #### Success Response (200) (for trigger) - **workflowRunId** (string) - The ID of the triggered workflow run. #### Success Response (200) (for notify) - **waiters** (array) - A list of workflow runs that were notified. #### Response Example (Trigger) ```json { "workflowRunId": "" } ``` #### Response Example (Notify) ```json { "waiters": [ { "workflowRunId": "" } ] } ``` ``` -------------------------------- ### GET /workflow-runs Source: https://upstash.com/docs/workflow/api-refence/logs/list-workflow-run-logs Retrieve a list of workflow runs with their associated metadata and step execution details. ```APIDOC ## GET /workflow-runs ### Description Retrieves an array of completed workflow runs, including detailed step information and metadata. ### Method GET ### Endpoint /workflow-runs ### Parameters #### Query Parameters - **cursor** (string) - Optional - Pagination cursor for retrieving the next set of results. - **state** (string) - Optional - Filter runs by state (e.g., RUN_SUCCESS, RUN_FAILED). - **groupBy** (string) - Optional - Grouping criteria for the returned workflow runs. ### Request Example GET /workflow-runs?state=RUN_SUCCESS ### Response #### Success Response (200) - **runs** (array) - List of WorkflowRun objects containing execution details. #### Response Example { "runs": [ { "workflowRunId": "run_123", "workflowState": "RUN_SUCCESS", "workflowRunCreatedAt": 1715678900000, "steps": [] } ] } ``` -------------------------------- ### Serve API Endpoint with External Call (Python) Source: https://upstash.com/docs/workflow/basics/context/call This example demonstrates creating an API endpoint using Upstash Workflow's `Serve` integration with FastAPI in Python. It defines a POST endpoint that accepts a `topic` and uses `context.call` to interact with the OpenAI API to generate a long essay. The snippet includes setting up the FastAPI app, defining the request model, and handling the response from the external API. ```python from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext from dataclasses import dataclass import os app = FastAPI() serve = Serve(app) @dataclass class Request: topic: str @serve.post("/api/example") async def example(context: AsyncWorkflowContext[Request]) -> None: request: Request = context.request_payload result = await context.call( "generate-long-essay", url="https://api.openai.com/v1/chat/completions", method="POST", body={ "model": "gpt-4o", "messages": [ { "role": "system", "content": "You are a helpful assistant writing really long essays that would cause a normal serverless function to timeout.", }, {"role": "user", "content": request["topic"]}, ], }, headers={ "authorization": f"Bearer {os.environ['OPENAI_API_KEY']}", }, ) status, headers, body = result.status, result.headers, result.body ``` -------------------------------- ### Workflow Invocation Example Source: https://upstash.com/docs/workflow/features/invoke Demonstrates how to invoke another workflow, passing data and receiving its response. The `context.invoke` function allows for asynchronous execution and provides details about the invoked workflow's outcome. ```APIDOC ## context.invoke ### Description Invokes another workflow and awaits its completion before proceeding. This enables orchestration of multiple workflows. ### Method `context.invoke(workflowName, options)` ### Parameters #### Path Parameters None #### Query Parameters None #### Request Body - **workflow** (object) - Required - The workflow object to invoke. - **body** (any) - Optional - The body to pass to the invoked workflow. - **header** (object) - Optional - Headers to pass to the invoked workflow. - **retries** (number) - Optional - The number of retries for the invoked workflow (default: 3). - **flowControl** (object) - Optional - Flow control settings for the invoked workflow. - **workflowRunId** (string) - Optional - A specific workflow run ID to set. ### Response #### Success Response (200) - **body** (any) - The response body from the invoked workflow. - **isFailed** (boolean) - True if the invoked workflow failed. - **isCanceled** (boolean) - True if the invoked workflow was canceled. ### Request Example ```typescript const { body, isFailed, isCanceled } = await context.invoke( "analyze-content", { workflow: analyzeContent, body: "test", header: {...}, retries, flowControl, workflowRunId } ) ``` ### Response Example ```json { "body": { "result": "analysis complete" }, "isFailed": false, "isCanceled": false } ``` ### Limitations An infinite chain of workflow invocations is not permitted. The process will fail if the invocation depth exceeds 100. ``` -------------------------------- ### Run Flask App and Trigger Workflow (Bash) Source: https://upstash.com/docs/workflow/quickstarts/flask This snippet provides the bash commands to run a Flask application and trigger a workflow endpoint. It first shows how to source environment variables from a .env file. Then, it starts the Flask development server and demonstrates how to make a POST request to the workflow endpoint using curl. ```bash source .env flask --app main run -p 8000 curl -X POST https://localhost:8000/api/workflow ``` -------------------------------- ### GET /workflow/runs/logs Source: https://upstash.com/docs/workflow/basics/client/logs Retrieves a list of workflow runs based on optional filter criteria and pagination parameters. ```APIDOC ## GET /workflow/runs/logs ### Description Retrieves workflow run logs. This endpoint allows filtering by run ID, state, URL, and creation time. ### Method GET ### Endpoint /workflow/runs/logs ### Parameters #### Query Parameters - **workflowRunId** (string) - Optional - Filter by a specific workflow run ID. - **count** (number) - Optional - Maximum number of runs to return. - **state** (string) - Optional - Filter by execution state (RUN_STARTED, RUN_SUCCESS, RUN_FAILED, RUN_CANCELED). - **workflowUrl** (string) - Optional - Filter by the exact workflow URL. - **workflowCreatedAt** (number) - Optional - Filter by the workflow creation time (Unix timestamp). - **cursor** (string) - Optional - A pagination cursor from a previous request. ### Request Example GET /workflow/runs/logs?state=RUN_SUCCESS&count=10 ### Response #### Success Response (200) - **runs** (array) - List of workflow run objects. - **cursor** (string) - A cursor to use for pagination. If no cursor is returned, there are no more workflow runs. #### Response Example { "runs": [], "cursor": "next_page_token" } ``` -------------------------------- ### Implement Customer Onboarding Workflow Source: https://upstash.com/docs/workflow/examples/customerOnboarding A complete workflow implementation that registers a user, sends a welcome email, waits for a period, and enters a loop to periodically check user activity and send appropriate follow-up emails. ```typescript import { serve } from "@upstash/workflow/nextjs" type InitialData = { email: string } export const { POST } = serve(async (context) => { const { email } = context.requestPayload await context.run("new-signup", async () => { await sendEmail("Welcome to the platform", email) }) await context.sleep("wait-for-3-days", 60 * 60 * 24 * 3) while (true) { const state = await context.run("check-user-state", async () => { return await getUserState() }) if (state === "non-active") { await context.run("send-email-non-active", async () => { await sendEmail("Email to non-active users", email) }) } else if (state === "active") { await context.run("send-email-active", async () => { await sendEmail("Send newsletter to active users", email) }) } await context.sleep("wait-for-1-month", 60 * 60 * 24 * 30) } }) async function sendEmail(message: string, email: string) { console.log(`Sending ${message} email to ${email}`) } type UserState = "non-active" | "active" const getUserState = async (): Promise => { return "non-active" } ``` ```python from fastapi import FastAPI from typing import Literal, TypedDict from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) UserState = Literal["non-active", "active"] class InitialData(TypedDict): email: str async def send_email(message: str, email: str) -> None: print(f"Sending {message} email to {email}") async def get_user_state() -> UserState: return "non-active" @serve.post("/customer-onboarding") async def customer_onboarding(context: AsyncWorkflowContext[InitialData]) -> None: email = context.request_payload["email"] async def _new_signup() -> None: await send_email("Welcome to the platform", email) await context.run("new-signup", _new_signup) await context.sleep("wait-for-3-days", 60 * 60 * 24 * 3) while True: async def _check_user_state() -> UserState: return await get_user_state() state: UserState = await context.run("check-user-state", _check_user_state) if state == "non-active": async def _send_email_non_active() -> None: await send_email("Email to non-active users", email) await context.run("send-email-non-active", _send_email_non_active) else: async def _send_email_active() -> None: await send_email("Send newsletter to active users", email) await context.run("send-email-active", _send_email_active) await context.sleep("wait-for-1-month", 60 * 60 * 24 * 30) ``` -------------------------------- ### GET /v2/workflows/logs Source: https://upstash.com/docs/workflow/api-refence/logs/list-workflow-run-logs Retrieves a list of logs for workflow runs based on optional query parameters for filtering and pagination. ```APIDOC ## GET /v2/workflows/logs ### Description Fetch logs for workflow runs. Supports filtering by status, time ranges, and specific workflow identifiers. ### Method GET ### Endpoint /v2/workflows/logs ### Parameters #### Query Parameters - **cursor** (string) - Optional - Pagination cursor for fetching the next page. - **workflowUrl** (string) - Optional - Filter by exact workflow URL. - **workflowRunId** (string) - Optional - Filter by specific workflow run ID. - **workflowCreatedAt** (integer) - Optional - Filter by creation timestamp (ms). - **state** (string) - Optional - Filter by state (e.g., RUN_SUCCESS, RUN_FAILED, STEP_SUCCESS). - **fromDate** (integer) - Optional - Start date filter (ms). - **toDate** (integer) - Optional - End date filter (ms). - **count** (integer) - Optional - Max results per page (default 1000). - **trimBody** (integer) - Optional - Trim request/response bodies to specified bytes. - **label** (string) - Optional - Filter by user-assigned label. - **flowControlKey** (string) - Optional - Filter by flow control key. - **callerIp** (string) - Optional - Filter by caller IP address. ### Request Example GET /v2/workflows/logs?state=RUN_SUCCESS&count=10 ### Response #### Success Response (200) - **cursor** (string) - Pagination cursor for the next page. #### Response Example { "cursor": "next_page_token_abc123" } ``` -------------------------------- ### Sync User Creation (Python & TypeScript) Source: https://upstash.com/docs/workflow/examples/authWebhook This snippet demonstrates how to synchronize a new user by creating them in a database. It shows the asynchronous execution flow and the retrieval of a user ID upon successful creation. This is typically the first step in onboarding a new user. ```python async def _sync_user() -> str: return await create_user_in_database(name, email) result = await context.run("sync user", _sync_user) userid = result["userid"] ``` -------------------------------- ### Source Environment Variables Source: https://upstash.com/docs/workflow/quickstarts/nextjs-flask Loads the environment variables defined in the .env file into the current shell session. This is necessary before starting the development server to ensure the application can access required configurations. ```bash source .env ``` -------------------------------- ### Example React Component Using Workflow Hook Source: https://upstash.com/docs/workflow/howto/realtime/basic This React component demonstrates how to use the `useWorkflow` hook to trigger a workflow and display its real-time progress. It includes a button to initiate the workflow and lists the steps as they complete. ```tsx "use client"; import { useWorkflow } from "@/hooks/useWorkflow"; export default function WorkflowPage() { const { trigger, steps, isRunFinished } = useWorkflow(); return (
{isRunFinished &&

✅ Workflow Finished!

}

Workflow Steps:

{steps.map((step, index) => (
{step.stepName} {Boolean(step.result) && : {JSON.stringify(step.result)}}
))}
); } ``` -------------------------------- ### Run Express.js Application and Trigger Workflow Source: https://upstash.com/docs/workflow/quickstarts/express Starts the Express.js development server and demonstrates how to trigger the workflow endpoint using a cURL command. This shows the basic process of sending a POST request with JSON payload to the defined workflow endpoint. ```bash npm run dev ``` ```bash curl -X POST http://localhost:3000/workflow \ -H "Content-Type: application/json" \ -d '{"message": "Hello from the workflow!"}' ``` -------------------------------- ### Create Next.js Project Source: https://upstash.com/docs/workflow/agents/getting-started Initializes a new Next.js project using npx. This command sets up the basic structure for a Next.js application. ```bash npx create-next-app@latest [project-name] [options] ``` -------------------------------- ### GET /steps/{stepId} Source: https://upstash.com/docs/workflow/api-refence/logs/list-workflow-run-logs Retrieves detailed information about a specific workflow step, including its state, execution results, and call metadata. ```APIDOC ## GET /steps/{stepId} ### Description Retrieves the execution details, state, and response data for a specific workflow step. ### Method GET ### Endpoint /steps/{stepId} ### Parameters #### Path Parameters - **stepId** (integer) - Required - The unique identifier for the workflow step. ### Request Example GET /steps/12345 ### Response #### Success Response (200) - **stepId** (integer) - Unique identifier. - **stepName** (string) - Name of the step. - **state** (string) - Current state (STEP_SUCCESS, STEP_RETRY, STEP_FAILED, STEP_PROGRESS, STEP_CANCELED). - **out** (string) - The output/result of the step. - **callResponseStatus** (integer) - HTTP status code received. #### Response Example { "stepId": 12345, "stepName": "process-data", "state": "STEP_SUCCESS", "callResponseStatus": 200 } ``` -------------------------------- ### Pause workflow execution with context.sleep Source: https://upstash.com/docs/workflow/basics/context/sleep Demonstrates how to use context.sleep to pause a workflow for a specified duration before proceeding to the next step. This example shows implementation in both TypeScript and Python environments. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { signIn, sendEmail } from "@/utils/onboarding-utils"; export const { POST } = serve(async (context) => { const userData = context.requestPayload; const user = await context.run("sign-in", async () => { const signedInUser = await signIn(userData); return signedInUser; }); // 👇 Wait for one day (in seconds) await context.sleep("wait-until-welcome-email", "1d"); await context.run("send-welcome-email", async () => { return sendEmail(user.name, user.email); }); }); ``` ```python from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext from onboarding_utils import sign_in, send_email app = FastAPI() serve = Serve(app) @serve.post("/api/onboarding") async def onboarding(context: AsyncWorkflowContext[User]) -> None: user_data = context.request_payload async def _sign_in(): return await sign_in(user_data) user = await context.run("sign-in", _sign_in) # 👇 Wait for one day (in seconds) await context.sleep("wait-until-welcome-email", "1d") async def _send_email(): return await send_email(user.name, user.email) await context.run("send-welcome-email", _send_email) ``` -------------------------------- ### GET /v2/flowControl/{flowControlKey} Source: https://upstash.com/docs/workflow/api-refence/flow-control/get-flow-control-key Retrieves the details of a specific Flow Control key, including the number of messages currently waiting. ```APIDOC ## GET /v2/flowControl/{flowControlKey} ### Description Get details of a specific Flow Control key, including its name and the current number of messages waiting due to flow control configuration. ### Method GET ### Endpoint /v2/flowControl/{flowControlKey} ### Parameters #### Path Parameters - **flowControlKey** (string) - Required - The unique identifier of the Flow Control key to retrieve. ### Request Example GET /v2/flowControl/my-flow-key ### Response #### Success Response (200) - **flowControlKey** (string) - The flow control key name. - **waitlistSize** (integer) - The number of messages waiting due to flow control configuration. #### Response Example { "flowControlKey": "my-flow-key", "waitlistSize": 5 } ``` -------------------------------- ### Implementing Flow Control Source: https://upstash.com/docs/workflow/basics/context/call This example demonstrates how to use the flowControl parameter within context.call() to manage request rates and parallelism. By defining a rate and parallelism limit, you can prevent overwhelming external services or adhere to their API usage policies. This is essential for managing external dependencies effectively. ```javascript const result = await context.call({ url: "https://api.example.com/limited-resource", method: "GET", flowControl: { key: "resource-group-1", rate: 10, // 10 requests per second parallelism: 5 // Max 5 concurrent requests } }); console.log("Status:", result.status); ``` -------------------------------- ### GET /v2/keys Source: https://upstash.com/docs/workflow/api-refence/signing-keys/get-signing-keys Retrieve your current and next signing keys. This endpoint is used to fetch the signing keys necessary for verifying requests made by the Upstash service. ```APIDOC ## GET /v2/keys ### Description Retrieve your current and next signing keys. This endpoint is used to fetch the signing keys necessary for verifying requests made by the Upstash service. ### Method GET ### Endpoint /v2/keys ### Parameters #### Query Parameters - **qstash_token** (string) - Required - QStash authentication token passed as a query parameter #### Request Body None ### Request Example None ### Response #### Success Response (200) - **current** (string) - The current signing key. - **next** (string) - The next signing key. #### Response Example ```json { "current": "your_current_signing_key", "next": "your_next_signing_key" } ``` ``` -------------------------------- ### Perform HTTP Request with context.call() Source: https://upstash.com/docs/workflow/basics/context/call This snippet demonstrates how to use context.call() to make an HTTP GET request. It shows basic usage with a URL and highlights that the function returns the status, body, and headers of the response. This is fundamental for integrating external APIs into your workflows. ```javascript const result = await context.call({ url: "https://api.example.com/data", method: "GET" }); console.log("Status:", result.status); console.log("Body:", result.body); console.log("Headers:", result.headers); ``` -------------------------------- ### Chaining LLM Agents in Upstash Workflow Source: https://upstash.com/docs/workflow/agents/patterns/prompt-chaining This TypeScript example demonstrates how to define three distinct agents and chain their outputs. The workflow uses the Upstash Workflow SDK to execute tasks sequentially, passing the text output from one agent as the prompt input for the next. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { agentWorkflow } from "@upstash/workflow-agents"; import { WikipediaQueryRun } from "@langchain/community/tools/wikipedia_query_run"; export const { POST } = serve(async (context) => { const agents = agentWorkflow(context); const model = agents.openai('gpt-3.5-turbo'); const agent1 = agents.agent({ model, name: 'firstAgent', maxSteps: 1, background: 'You are an agent that lists famous physicists.', tools: {} }); const agent2 = agents.agent({ model, name: 'secondAgent', maxSteps: 2, background: 'You are an agent that describes the work of the physicists listed in the previous prompt.', tools: { wikiTool: new WikipediaQueryRun({ topKResults: 1, maxDocContentLength: 500, }) } }); const agent3 = agents.agent({ model, name: 'thirdAgent', maxSteps: 1, background: 'You are an agent that summarizes the works of the physicists mentioned previously.', tools: {} }); const firstOutput = await agents.task({ agent: agent1, prompt: "List 3 famous physicists." }).run(); const secondOutput = await agents.task({ agent: agent2, prompt: `Describe the work of: ${firstOutput.text}` }).run(); const { text } = await agents.task({ agent: agent3, prompt: `Summarize: ${secondOutput.text}` }).run(); console.log(text); }); ``` -------------------------------- ### Avoid Non-Deterministic Code Outside context.run Source: https://upstash.com/docs/workflow/basics/caveats Examples of anti-patterns where non-idempotent, time-dependent, or random code is executed directly in the workflow handler, which can cause authentication or execution errors. ```typescript export const { POST } = serve<{ entryId: string }>(async (context) => { const { entryId } = context.requestPayload; const result = await getResultFromDb(entryId); if (result.return) { return; } }) ``` ```python @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: entry_id = context.request_payload["entry_id"] result = await get_result_from_db(entry_id) if result.should_return: return ``` -------------------------------- ### Define Workflow Endpoint in Next.js Source: https://upstash.com/docs/workflow/quickstarts/vercel-nextjs Demonstrates how to serve a workflow endpoint using the Upstash Workflow SDK in both App and Pages router configurations. Includes examples for standard implementation and handling native request objects. ```typescript import { serve } from "@upstash/workflow/nextjs" export const { POST } = serve( async (context) => { await context.run("initial-step", () => { console.log("initial step ran") }) await context.run("second-step", () => { console.log("second step ran") }) } ) ``` ```typescript import { serve } from "@upstash/workflow/nextjs"; import { NextRequest } from "next/server"; export const POST = async (request: NextRequest) => { const { POST: handler } = serve(async (context) => { // Your workflow steps }); return await handler(request); } ``` ```typescript import { servePagesRouter } from "@upstash/workflow/nextjs"; const { handler } = servePagesRouter( async (context) => { await context.run("initial-step", () => { console.log("initial step ran") }) await context.run("second-step", () => { console.log("second step ran") }) } ) export default handler; ``` ```typescript import type { NextApiRequest, NextApiResponse } from "next"; import { servePagesRouter } from "@upstash/workflow/nextjs"; export default async function handler( req: NextApiRequest, res: NextApiResponse ) { const { handler } = servePagesRouter( async (context) => { // Your workflow steps } ) await handler(req, res) } ``` -------------------------------- ### Configure Environment Variables Source: https://upstash.com/docs/workflow/agents/features Set the required QStash and OpenAI API keys in your environment configuration. ```bash QSTASH_TOKEN="" OPENAI_API_KEY="" ``` -------------------------------- ### Call Resend API Source: https://upstash.com/docs/workflow/basics/context/api Provides an example of using `context.api.resend.call` to send an email via the Resend service. It requires a Resend API key and specifies the sender, recipient, subject, and HTML content of the email. ```typescript const { status, body } = await context.api.resend.call("Call Resend", { token: "", body: { from: "Acme ", to: ["delivered@resend.dev"], subject: "Hello World", html: "

It works!

", }, headers: { "content-type": "application/json", }, }); ``` -------------------------------- ### Implement workflow delays with context.sleep Source: https://upstash.com/docs/workflow/features/sleep This example demonstrates how to pause a workflow for a specific duration (3 days) between two execution steps. It shows the implementation pattern for both TypeScript and Python using the Upstash Workflow SDK. ```typescript import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve(async (context) => { const { userId } = context.requestPayload; // Send welcome email immediately await context.run("send-welcome-email", async () => { return await sendWelcomeEmail(userId); }); // Wait for 3 days before sending follow-up await context.sleep("wait-for-follow-up", "3d"); // Send follow-up email await context.run("send-follow-up-email", async () => { return await sendFollowUpEmail(userId); }); }); ``` ```python from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) @serve.post("/api/onboarding") async def onboarding(context: AsyncWorkflowContext[str]) -> None: user_id = context.request_payload["user_id"] # Send welcome email immediately async def _send_welcome_email(): return await send_welcome_email(user_id) await context.run("send-welcome-email", _send_welcome_email) # Wait for 3 days before sending follow-up await context.sleep("wait-for-follow-up", "3d") # Send follow-up email async def _send_follow_up_email(): return await send_follow_up_email(user_id) await context.run("send-follow-up-email", _send_follow_up_email) ``` -------------------------------- ### Define Basic Workflow Endpoint with FastAPI Source: https://upstash.com/docs/workflow/quickstarts/fastapi Demonstrates how to initialize a workflow service with FastAPI and define sequential steps using context.run. This pattern ensures business logic is retried automatically upon failure. ```python from fastapi import FastAPI from upstash_workflow.fastapi import Serve app = FastAPI() serve = Serve(app) @serve.post("/api/workflow") async def workflow(context) -> None: async def _step1() -> None: print("initial step ran") await context.run("initial-step", _step1) async def _step2() -> None: print("second step ran") await context.run("second-step", _step2) ``` -------------------------------- ### OpenAPI Specification for Get Flow Control Key Source: https://upstash.com/docs/workflow/api-refence/flow-control/get-flow-control-key-1 This snippet provides the OpenAPI 3.1.0 specification for the 'Get Flow Control Key' endpoint. It details the API's purpose, available paths, parameters, request/response schemas, and security schemes. This specification is crucial for understanding and integrating with the Upstash Workflow REST API. ```yaml openapi: 3.1.0 info: title: Upstash Workflow REST API description: > Upstash Workflow is a serverless workflow orchestration service built on top of Upstash QStash and Upstash Redis. version: 2.0.0 contact: name: Upstash url: https://upstash.com servers: - url: https://qstash.upstash.io security: - bearerAuth: [] - bearerAuthQuery: [] paths: /v2/keys/rotate: get: tags: - Flow Control summary: Get Flow Control Key description: > Get details of a specific Flow Control key. Flow Control keys are used to manage concurrency and rate limiting for workflow steps. This endpoint returns the current waitlist size for the specified flow control key, which indicates how many workflow steps are currently waiting due to flow control constraints. parameters: - in: path name: flowControlKey required: true schema: type: string description: The Flow Control key to retrieve. responses: '200': description: Flow control key details content: application/json: schema: $ref: '#/components/schemas/FlowControlKey' '400': description: Bad Request content: application/json: schema: $ref: '#/components/schemas/Error' '401': description: Unauthorized content: application/json: schema: $ref: '#/components/schemas/Error' '404': description: Flow Control key not found content: application/json: schema: $ref: '#/components/schemas/Error' '500': description: Internal Server Error content: application/json: schema: $ref: '#/components/schemas/Error' components: schemas: FlowControlKey: type: object properties: flowControlKey: type: string description: The flow control key name waitlistSize: type: integer description: The number of messages waiting due to flow control configuration. Error: type: object required: - error properties: error: type: string description: Error message securitySchemes: bearerAuth: type: http scheme: bearer bearerFormat: JWT description: QStash authentication token bearerAuthQuery: type: apiKey in: query name: qstash_token description: QStash authentication token passed as a query parameter ``` -------------------------------- ### Implement Basic Webhook Workflow Source: https://upstash.com/docs/workflow/features/webhooks Demonstrates the standard pattern of creating a webhook, passing the URL to an external service, and waiting for the callback response with a timeout. ```typescript import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve(async (context) => { const webhook = await context.createWebhook("create webhook"); const callResult = await context.call("call webhook caller", { url: "https://webhook/caller", method: "POST", body: JSON.stringify({ webhookUrl: webhook.webhookUrl, }), }); const webhookResponse = await context.waitForWebhook( "wait for webhook", webhook, "30s" ); if (webhookResponse.timeout) { console.log("Webhook was not called in time"); } else { console.log("Webhook received:", webhookResponse.request); } }); ``` -------------------------------- ### Workflow Endpoint with Hono Context Access Source: https://upstash.com/docs/workflow/quickstarts/hono This example shows how to create a workflow endpoint with Hono while also accessing Hono's native context, such as environment variables. It defines a custom Bindings interface that extends WorkflowBindings and uses `env()` adapter to retrieve environment variables. The initial payload type is represented by `unknown` and can be specified for better type safety. ```typescript import { Hono } from "hono" import { serve, WorkflowBindings } from "@upstash/workflow/hono" import { env } from "hono/adapter" interface Bindings extends WorkflowBindings { ENVIRONMENT: "development" | "production" } const app = new Hono<{ Bindings: Bindings }>() app.post("/workflow", (c) => { // 👇 access Honos native context, i.e. getting an env variable const { ENVIRONMENT } = env(c) // 👇 `unknown` represents your initial payload data type const handler = serve( async (context) => { ... } ) return await handler(c) }) ``` -------------------------------- ### Get Global Parallelism OpenAPI Specification Source: https://upstash.com/docs/workflow/api-refence/flow-control/get-global-parallelism This snippet provides the OpenAPI 3.1.0 specification for the Upstash Workflow REST API endpoint used to retrieve global parallelism information. It details the GET request to /v2/globalParallelism, including authentication methods and the structure of the JSON response which contains maximum and current parallelism counts. ```yaml openapi: 3.1.0 info: title: Upstash Workflow REST API description: > Upstash Workflow is a serverless workflow orchestration service built on top of Upstash QStash and Upstash Redis. version: 2.0.0 contact: name: Upstash url: https://upstash.com servers: - url: https://qstash.upstash.io security: - bearerAuth: [] - bearerAuthQuery: [] paths: /v2/globalParallelism: get: tags: - Flow Control summary: Get Global Parallelism description: >- Returns the current global parallelism usage across all flow control keys responses: '200': description: Global parallelism info retrieved successfully content: application/json: schema: type: object properties: parallelismMax: type: integer description: The configured maximum global parallelism parallelismCount: type: integer description: >- The current number of messages running globally in parallel components: securitySchemes: bearerAuth: type: http scheme: bearer bearerFormat: JWT description: QStash authentication token bearerAuthQuery: type: apiKey in: query name: qstash_token description: QStash authentication token passed as a query parameter ``` -------------------------------- ### Creating Custom Middleware with Init Function Source: https://upstash.com/docs/workflow/howto/middlewares Illustrates creating a custom middleware that requires initialization of resources using the `init` function. ```APIDOC ## Creating Custom Middleware with Init Function ### Description This section explains how to create a custom middleware that needs to initialize resources, such as database connections or external clients, before executing its callbacks. The `init` function handles resource initialization, and its return value (callbacks) is used by the middleware. ### Method POST ### Endpoint /api/workflow ### Parameters #### Query Parameters None #### Request Body (Workflow definition and execution payload) ### Request Example ```json { "workflow": "your_workflow_definition", "payload": {} } ``` ### Response #### Success Response (200) - **result** (any) - The result of the workflow execution. #### Response Example ```json { "result": "Workflow completed successfully" } ``` ``` -------------------------------- ### context.run() - Serial Execution (Python) Source: https://upstash.com/docs/workflow/basics/context/run Demonstrates sequential execution of workflow steps in Python using await with context.run(). ```APIDOC ## POST /api/example ### Description Executes workflow steps sequentially in Python. Each step completes before the next one starts. ### Method POST ### Endpoint /api/example ### Parameters #### Request Body - **input** (string) - Required - The input payload for the workflow. ### Request Example ```json { "input": "some data" } ``` ### Response #### Success Response (200) - **result1** (string) - The result from the first step. - **result2** (string) - The result from the second step (if applicable). #### Response Example ```json { "result1": "processed data", "result2": "final output" } ``` ``` -------------------------------- ### Implement Authentication in Flask Workflow Source: https://upstash.com/docs/workflow/quickstarts/flask This example demonstrates how to add an authentication check to a Flask workflow endpoint. It inspects the 'Authentication' header from the incoming request. If the header does not match the expected 'Bearer secret_password', the workflow execution is halted. Otherwise, it proceeds to execute the defined steps. ```python from flask import Flask from upstash_workflow.flask import Serve from upstash_workflow import WorkflowContext app = Flask(__name__) serve = Serve(app) def some_work(input: str) -> str: return f"processed '{input}'" @serve.route("/auth") def auth(context: WorkflowContext[str]) -> None: if context.headers.get("Authentication") != "Bearer secret_password": print("Authentication failed.") return def _step1() -> str: return "output 1" context.run("step1", _step1) def _step2() -> str: return "output 2" context.run("step2", _step2) ``` -------------------------------- ### GET /waiters Source: https://upstash.com/docs/workflow/basics/client/waiters Retrieves all workflow runs that are currently paused at a context.waitForEvent step for a specific event ID. ```APIDOC ## GET /waiters ### Description Retrieves a list of all waiters currently listening for a specific event ID. A waiter represents a workflow run paused at a `context.waitForEvent` step. ### Method GET ### Endpoint /waiters ### Parameters #### Query Parameters - **eventId** (string) - Required - The identifier of the event to look up. ### Request Example ```javascript const result = await client.getWaiters({ eventId: "my-event-id", }); ``` ### Response #### Success Response (200) - **waiters** (Array) - A list of Waiter objects describing workflows waiting on the event. #### Response Example { "waiters": [ { "workflowRunId": "run_123", "eventId": "my-event-id" } ] } ``` -------------------------------- ### Create Astro Workflow Endpoint Source: https://upstash.com/docs/workflow/quickstarts/astro This TypeScript code demonstrates how to define a workflow endpoint in an Astro application. It uses the `serve` function from `@upstash/workflow/astro` to handle workflow execution. The example includes two steps: an initial step that logs a message and returns a string, and a second step that logs the result from the first step. Environment variables are handled using a combination of `process.env` and `import.meta.env`. ```typescript import { serve } from "@upstash/workflow/astro"; export const { POST } = serve(async (context) => { const result1 = await context.run("initial-step", () => { console.log("initial step ran") return "hello world!" }) await context.run("second-step", () => { console.log(`second step ran with value ${result1}`) }) }, { // env must be passed in astro. // for local dev, we need import.meta.env. // For deployment, we need process.env: env: { ...process.env, ...import.meta.env } }) ``` -------------------------------- ### GET /v2/workflows/dlq Source: https://upstash.com/docs/workflow/api-refence/dlq/list-failed-workflow-runs Lists and paginates through all failed workflow runs currently in the Dead Letter Queue (DLQ). This endpoint allows filtering and inspection of failures to decide on actions like resuming, restarting, or deleting them. ```APIDOC ## GET /v2/workflows/dlq ### Description List and paginate through all failed workflow runs currently in the DLQ. Failed workflows end up in the DLQ after exhausting all retry attempts. You can filter, paginate, and inspect these failures to understand what went wrong and decide whether to resume, restart, or delete them. ### Method GET ### Endpoint /v2/workflows/dlq ### Parameters #### Query Parameters - **cursor** (string) - Optional - Pagination cursor. If provided, returns the next page of results. - **count** (integer) - Optional - The maximum number of failed workflow runs to return per page. Defaults to 100. Maximum is 100. - **fromDate** (integer) - Optional - Filter by starting date in milliseconds (Unix timestamp). This is inclusive. - **toDate** (integer) - Optional - Filter by ending date in milliseconds (Unix timestamp). This is inclusive. - **workflowUrl** (string) - Optional - Filter by workflow URL. - **workflowRunId** (string) - Optional - Filter by workflow run ID. - **workflowCreatedAt** (integer) - Optional - Filter by workflow creation timestamp in milliseconds (Unix timestamp). - **label** (string) - Optional - Filter by label assigned to the workflow run. - **failureFunctionState** (string) - Optional - Filter by failure function state. Possible values: CALLBACK_INPROGRESS, CALLBACK_SUCCESS, CALLBACK_FAIL, CALLBACK_CANCELED. - **callerIp** (string) - Optional - Filter by IP address of the publisher. - **flowControlKey** (string) - Optional - Filter by Flow Control Key. ### Response #### Success Response (200) - **cursor** (string) - Pagination cursor for the next page. Empty if no more results. - **messages** (array) - Array of failed workflow messages. - **dlqId** (string) - The DLQ ID of the failed workflow message. - **workflowUrl** (string) - The URL of the workflow. #### Error Response (400, 401, 500) - **code** (string) - Error code. - **message** (string) - Error message. ### Request Example (No request body for GET requests) ### Response Example (200 OK) ```json { "cursor": "next_cursor_string", "messages": [ { "dlqId": "dlq_123", "workflowUrl": "https://example.com/workflow/abc" } ] } ``` ``` -------------------------------- ### Image Processing Workflow (Python) Source: https://upstash.com/docs/workflow/examples/imageProcessing This Python code defines an image processing workflow using Upstash Workflow with FastAPI. It mirrors the TypeScript example, orchestrating image retrieval, resizing, filtering, and storage. The workflow uses `context.run` for sequential tasks and `context.call` for external service interactions. Dependencies include `fastapi`, `upstash_workflow`, and utility functions for image operations. ```python from fastapi import FastAPI from typing import List, TypedDict from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext, CallResponse from utils import store_image, get_image_url app = FastAPI() serve = Serve(app) class ImageResult(TypedDict): image_url: str class ImageProcessingPayload(TypedDict): image_id: str user_id: str @serve.post("/process-image") async def process_image(context: AsyncWorkflowContext[ImageProcessingPayload]) -> None: payload = context.request_payload image_id = payload["image_id"] user_id = payload["user_id"] # Step 1: Retrieve the uploaded image async def _get_image_url() -> str: return await get_image_url(image_id) image_url: str = await context.run("get-image-url", _get_image_url) # Step 2: Resize the image to multiple resolutions resolutions = [640, 1280, 1920] resize_responses = [] for resolution in resolutions: response: CallResponse[ImageResult] = await context.call( f"resize-image-{resolution}", # endpoint which returns ImageResult type in response url="https://image-processing-service.com/resize", method="POST", body={"imageUrl": image_url, "width": resolution}, ) resize_responses.append(response) resized_images = [response.body for response in resize_responses] # Step 3: Apply filters to each resized image filters = ["grayscale", "sepia", "contrast"] filter_responses = [] for resized_image in resized_images: for filter in filters: response: CallResponse[ImageResult] = await context.call( f"apply-filter-{filter}", # endpoint which returns ImageResult type in response url="https://image-processing-service.com/filter", method="POST", body={"imageUrl": resized_image["imageUrl"], "filter": filter}, ) filter_responses.append(response) processed_images = [response.body for response in filter_responses] # Step 4: Store processed images in cloud storage async def _store_image() -> str: return await store_image(processed_image["imageUrl"]) stored_image_urls: List[str] = [] for processed_image in processed_images: stored_image_url = await context.run("store-image", _store_image) ``` -------------------------------- ### Create Express.js Workflow Endpoint Source: https://upstash.com/docs/workflow/quickstarts/express Defines a workflow endpoint using Express.js and the Upstash Workflow SDK. This example demonstrates a minimal workflow with two steps: processing an incoming message and logging it. It requires Express.js and dotenv for environment variable configuration. ```typescript import { serve } from "@upstash/workflow/express"; import express from 'express'; import { config } from 'dotenv'; config(); const app = express(); app.use( express.json() ); app.post( '/workflow', serve<{ message: string }> (async (context) => { const res1 = await context.run("step1", async () => { const message = context.requestPayload.message; return message; }) await context.run("step2", async () => { console.log(res1); }) }) ); app.listen(3000, () => { console.log('Server running on port 3000'); }); ``` -------------------------------- ### context.run() - Sequential Execution (TypeScript) Source: https://upstash.com/docs/workflow/basics/context/run Demonstrates how to execute workflow steps sequentially using await with context.run(). Each step completes before the next one begins. ```APIDOC ## POST /api/example ### Description Executes workflow steps sequentially. Each step must complete before the next one starts. ### Method POST ### Endpoint /api/example ### Parameters #### Request Body - **input** (string) - Required - The input payload for the workflow. ### Request Example ```json { "input": "some data" } ``` ### Response #### Success Response (200) - **result1** (string) - The result from the first step. - **result2** (string) - The result from the second step (if applicable). #### Response Example ```json { "result1": "processed data", "result2": "final output" } ``` ``` -------------------------------- ### Handle Authorization Errors with Early Return (Python) Source: https://upstash.com/docs/workflow/troubleshooting/general Illustrates how to manage authorization errors in Upstash Workflows using Python. Similar to the TypeScript example, returning early from a function before executing workflow steps triggers an authentication failure. This snippet shows the recommended pattern for handling non-deterministic conditions by transforming them into explicit workflow steps. ```python @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: if some_condition(): return # rest of the workflow ``` ```python @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: async def _check_condition() -> bool: return some_condition() should_return = await context.run("check condition", _check_condition) if should_return: return # rest of the workflow ``` -------------------------------- ### Serve a Workflow Endpoint Source: https://upstash.com/docs/workflow/basics/serve Demonstrates how to initialize a workflow endpoint using the serve function. It accepts a route function for logic and an options object for configuration. ```typescript import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve(async (context) => { // Route function }, { // Options }); ``` ```python from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: async def _step1() -> str: # define a piece of business logic as step 1 return "step 1 result" result = await context.run("step-1", _step1) async def _step2() -> None: # define another piece of business logic as step 2 pass await context.run("step-2", _step2) ``` -------------------------------- ### Configure Flow Control for Workflow Trigger Source: https://upstash.com/docs/workflow/features/flow-control This TypeScript example shows how to configure flow control settings when triggering a workflow run using the Upstash Workflow client. It specifies a `key` for the flow control, limits `parallelism` to 1, and sets the `rate` to 10 events per `period` of 100 (units depend on context, likely milliseconds or seconds). This configuration ensures that the workflow adheres to the defined execution limits. ```typescript import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }) const { workflowRunId } = await client.trigger({ url: "https:///", flowControl: { key: "user-signup", parallelism: 1, rate: 10, period: 100, } }) ``` -------------------------------- ### Programmatically Schedule User-Specific Workflows Source: https://upstash.com/docs/workflow/howto/schedule Demonstrates how to create a recurring schedule for a specific user upon registration. This uses the QStash client to set a destination URL, a body payload, and a CRON expression. ```typescript import { signUp } from "@/utils/auth-utils"; import { Client } from "@upstash/qstash"; const client = new Client({ token: process.env.QSTASH_TOKEN! }); export async function POST(request: Request) { const userData: UserData = await request.json(); // Schedule weekly account summary await client.schedules.create({ scheduleId: `user-summary-${user.email}`, destination: "https:///api/send-weekly-summary", body: { userId: user.id }, cron: cron, }); return NextResponse.json( { success: true, message: "User registered and summary scheduled" }, { status: 201 } ); } ``` ```python from fastapi import FastAPI, Request from fastapi.responses import JSONResponse from qstash import AsyncQStash from datetime import datetime, timedelta app = FastAPI() client = AsyncQStash("") @app.post("/api/sign-up") async def sign_up(request: Request): user_data = await request.json() # Simulate user registration user = await sign_up(user_data) # Calculate the date for the first summary (7 days from now) first_summary_date = datetime.now() + timedelta(days=7) # Create cron expression for weekly summaries starting 7 days from signup cron = f"{first_summary_date.minute} {first_summary_date.hour} * * {first_summary_date.day}" # Schedule weekly account summary await client.schedule.create_json( schedule_id=f"user-summary-{user.email}", destination="https:///api/send-weekly-summary", body={"userId": user.id}, cron=cron, ) return JSONResponse( content={"success": True, "message": "User registered and summary scheduled"}, status_code=201, ) ``` -------------------------------- ### Trigger Upstash Workflow for Text Generation Source: https://upstash.com/docs/workflow/integrations/aisdk This code snippet shows how to trigger a deployed Upstash Workflow endpoint using the Upstash Workflow client. It requires a QStash token and the URL of the workflow endpoint. The example triggers a workflow with a specific prompt for weather information. ```typescript import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }); const { workflowRunId } = await client.trigger({ url: "https:///", body: { "prompt": "How is the weather in San Francisco around this time?" } }); ``` -------------------------------- ### Retrieve and Download Dataset Source: https://upstash.com/docs/workflow/examples/allInOne Demonstrates how to fetch a dataset URL and download the content using context.run and context.call. This approach ensures long-running HTTP requests bypass serverless execution limits. ```typescript const datasetUrl = await context.run("get-dataset-url", async () => { return await getDatasetUrl(request.datasetId) }) const { body: dataset } = await context.call("download-dataset", { url: datasetUrl, method: "GET" }) ``` ```python async def _get_dataset_url() -> str: return await get_dataset_url(dataset_id) dataset_url = await context.run("get-dataset-url", _get_dataset_url) response: CallResponse[Any] = await context.call( "download-dataset", url=dataset_url, method="GET" ) dataset = response.body ``` -------------------------------- ### Redact Workflow Run Data (Python) Source: https://upstash.com/docs/workflow/howto/redact-fields This example shows how to trigger a workflow run and redact sensitive data from the request body and headers using the Upstash Workflow client in Python. It illustrates setting the `redact` option to control which parts of the request are masked. ```python from upstash_workflow import Client client = Client("") client.trigger( url="https://my-app.com/api/workflow", body={ "hello": "world", }, redact={ "body": True, "header": ["Authorization"] // or `header: True` to redact all headers }, ) ``` -------------------------------- ### GET /v2/keys/rotate Source: https://upstash.com/docs/workflow/api-refence/flow-control/get-flow-control-key-1 Retrieves the current status and waitlist size for a specific flow control key used in workflow orchestration. ```APIDOC ## GET /v2/keys/rotate ### Description Get details of a specific Flow Control key. Flow Control keys are used to manage concurrency and rate limiting for workflow steps. This endpoint returns the current waitlist size for the specified flow control key, which indicates how many workflow steps are currently waiting due to flow control constraints. ### Method GET ### Endpoint /v2/keys/rotate/{flowControlKey} ### Parameters #### Path Parameters - **flowControlKey** (string) - Required - The Flow Control key to retrieve. ### Request Example GET /v2/keys/rotate/my-flow-key ### Response #### Success Response (200) - **flowControlKey** (string) - The flow control key name - **waitlistSize** (integer) - The number of messages waiting due to flow control configuration. #### Response Example { "flowControlKey": "my-flow-key", "waitlistSize": 5 } ``` -------------------------------- ### Trigger Workflow with Vercel Bypass Header (TypeScript & cURL) Source: https://upstash.com/docs/workflow/troubleshooting/vercel This snippet shows how to trigger a workflow while bypassing Vercel's deployment protection. It includes examples for both TypeScript using the Upstash workflow client and for cURL. In both cases, the 'x-vercel-protection-bypass' header must be included when initiating the request. ```typescript import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }) await client.trigger({ url: "https://vercel-preview.com/workflow", headers: { "x-vercel-protection-bypass": process.env.VERCEL_AUTOMATION_BYPASS_SECRET! }, body: "Hello world!" }) ``` ```bash curl -X POST \ 'https://vercel-preview.com/workflow' \ -H 'x-vercel-protection-bypass: ' \ -d 'Hello world!' ``` -------------------------------- ### Create Custom Middleware with Init Pattern Source: https://upstash.com/docs/workflow/howto/middlewares Demonstrates using the init pattern for custom middleware, which is useful for initializing external resources like database connections before handling workflow events. ```typescript import { WorkflowMiddleware } from "@upstash/workflow"; const databaseMiddleware = new WorkflowMiddleware({ name: "database-logger", init: async () => { const db = await connectToDatabase(); return { runStarted: async ({ context }) => { await db.insert({ workflowRunId: context.workflowRunId, status: 'started' }); }, runCompleted: async ({ context, result }) => { await db.update({ workflowRunId: context.workflowRunId, status: 'completed', result }); }, onError: async ({ workflowRunId, error }) => { await db.insert({ workflowRunId, level: 'error', message: error.message }); } }; } }); ``` -------------------------------- ### Trigger Workflow and Notify with Lookback Source: https://upstash.com/docs/workflow/features/wait-for-event This snippet demonstrates how to trigger a workflow and then immediately notify it with a specific event ID and data, using `workflowRunId` to enable lookback. This prevents race conditions where a notification might be sent before the workflow starts waiting. ```typescript import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }); // Trigger a workflow const { workflowRunId } = await client.trigger({ url: "https://your-app.com/api/process-order", body: { orderId: "123" } }); // Immediately notify with lookback - no race condition! await client.notify({ eventId: "payment-verified", eventData: { verified: true }, workflowRunId: workflowRunId, // Enables lookback }); ``` -------------------------------- ### Configure Environment Variables for QStash Source: https://upstash.com/docs/workflow/quickstarts/astro This section outlines how to set up environment variables for Upstash QStash, essential for authenticating your application. It covers creating a .env file and provides two options for local development: using a local QStash server or a local tunnel. The local server option is good for testing without affecting billing, while the local tunnel option syncs with production QStash for logging. ```bash touch .env ``` ```dotenv QSTASH_URL="http://127.0.0.1:8080" QSTASH_TOKEN="" ``` ```dotenv QSTASH_TOKEN="***" UPSTASH_WORKFLOW_URL="" ``` -------------------------------- ### GET /v2/globalParallelism Source: https://upstash.com/docs/workflow/api-refence/flow-control/get-global-parallelism Retrieves the current global parallelism usage, including the configured maximum and the current count of parallel messages. ```APIDOC ## GET /v2/globalParallelism ### Description Returns the current global parallelism usage across all flow control keys. This endpoint helps in monitoring the load and concurrency limits of your workflows. ### Method GET ### Endpoint /v2/globalParallelism ### Parameters None ### Request Example GET /v2/globalParallelism Authorization: Bearer ### Response #### Success Response (200) - **parallelismMax** (integer) - The configured maximum global parallelism. - **parallelismCount** (integer) - The current number of messages running globally in parallel. #### Response Example { "parallelismMax": 100, "parallelismCount": 12 } ``` -------------------------------- ### Define an Agent with Tools in TypeScript Source: https://upstash.com/docs/workflow/agents/features This snippet demonstrates how to define a researcher agent using Upstash Workflow and Langchain. It configures an OpenAI model, sets a maximum of 2 LLM calls, and provides a Wikipedia tool for information retrieval. The agent's background is set to guide its behavior as a researcher. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { agentWorkflow } from "@upstash/workflow-agents"; import { WikipediaQueryRun } from "@langchain/community/tools/wikipedia_query_run"; export const { POST } = serve(async (context) => { const agents = agentWorkflow(context); const model = agents.openai('gpt-3.5-turbo') const researcherAgent = agents.agent({ model, name: 'academic', maxSteps: 2, tools: { wikiTool: new WikipediaQueryRun({ topKResults: 1, maxDocContentLength: 500, }) }, background: 'You are researcher agent with access to Wikipedia. ' 'Utilize Wikipedia as much as possible for correct information', }) }) ``` -------------------------------- ### POST /api/example Source: https://upstash.com/docs/workflow/basics/serve Defines an API endpoint using the serve() function to execute a workflow. The route function handles workflow logic, and options can configure advanced settings. ```APIDOC ## POST /api/example ### Description This endpoint allows you to define and run a workflow using the `serve()` function. It accepts a route function that outlines the workflow steps and an optional configuration object for advanced settings. ### Method POST ### Endpoint /api/example ### Parameters #### Request Body - **context** (AsyncWorkflowContext) - Required - The context object provided by Upstash Workflow, containing workflow APIs and run properties. ### Request Example ```json { "userId": "user123", "someOtherProperty": "value" } ``` ### Response #### Success Response (200) - **status** (string) - Indicates the success of the workflow execution. #### Response Example ```json { "status": "completed" } ``` ``` -------------------------------- ### Single Agent Workflow Source: https://upstash.com/docs/workflow/agents/features Demonstrates setting up and running a single agent with access to Wikipedia for research. ```APIDOC ## POST /api/workflow ### Description This endpoint allows you to define and run a single agent workflow. The agent is configured with a specific model, tools, and background information to perform a given task. ### Method POST ### Endpoint /api/workflow ### Parameters #### Request Body - **context** (object) - Required - The workflow context. - **agents** (object) - Required - Agent configuration including model, name, maxSteps, tools, and background. - **task** (object) - Required - The task definition including the agent and prompt. ### Request Example ```json { "context": { ... }, "agents": { "model": "gpt-3.5-turbo", "name": "academic", "maxSteps": 2, "tools": { "wikiTool": "WikipediaQueryRun" }, "background": "You are researcher agent with access to Wikipedia. Utilize Wikipedia as much as possible for correct information." }, "task": { "agent": "academic", "prompt": "Tell me about 5 topics in advanced physics." } } ``` ### Response #### Success Response (200) - **text** (string) - The final text response generated by the agent after completing the task. #### Response Example ```json { "text": "Here are summaries of 5 topics in advanced physics: 1. Quantum Mechanics..." } ``` ``` -------------------------------- ### Trigger Single Workflow with Client Source: https://upstash.com/docs/workflow/howto/start Initiates a single Upstash Workflow run using the `client.trigger` method. This is the recommended approach for starting workflows programmatically. It allows specifying optional parameters like body, headers, workflow run ID, retries, delay, failure URL, and flow control. ```typescript import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }) const { workflowRunId } = await client.trigger({ url: "https:///", body: "hello there!", // optional body headers: { ... }, // optional headers workflowRunId: "my-workflow", // optional workflow run id retries: 3, // optional retries in the initial request delay: "10s", // optional delay value failureUrl: "https://", // optional failure url flowControl: { ... } // optional flow control }) console.log(workflowRunId) // prints wfr_my-workflow ``` -------------------------------- ### Implement Error Tracking Middleware Source: https://upstash.com/docs/workflow/howto/middlewares This example shows how to create a custom middleware named 'error-tracking'. The middleware utilizes the onError callback to send error details, including workflowRunId, error message, stack trace, and timestamp, to an external monitoring service via a POST request. ```typescript import { WorkflowMiddleware } from "@upstash/workflow"; const errorTrackingMiddleware = new WorkflowMiddleware({ name: "error-tracking", callbacks: { onError: async ({ workflowRunId, error }) => { await fetch("https://your-monitoring-service.com/errors", { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ workflowRunId, error: error.message, stack: error.stack, timestamp: new Date().toISOString() }) }); } } }); ``` -------------------------------- ### Implement Sleep Functionality in Flask Workflow Source: https://upstash.com/docs/workflow/quickstarts/flask This example shows how to incorporate sleep functionality within a Flask workflow. It includes steps that process input, use 'context.sleep_until' to pause execution until a specific time, and 'context.sleep' to pause for a duration. The workflow processes data through multiple steps with timed delays. ```python from flask import Flask import time from upstash_workflow.flask import Serve from upstash_workflow import WorkflowContext app = Flask(__name__) serve = Serve(app) def some_work(input: str) -> str: return f"processed '{input}'" @serve.route("/sleep") def sleep(context: WorkflowContext[str]) -> None: input = context.request_payload def _step1() -> str: output = some_work(input) print("step 1 input", input, "output", output) return output result1: str = context.run("step1", _step1) context.sleep_until("sleep1", time.time() + 3) def _step2() -> str: output = some_work(result1) print("step 2 input", result1, "output", output) return output result2: str = context.run("step2", _step2) context.sleep("sleep2", 2) def _step3() -> None: output = some_work(result2) print("step 3 input", result2, "output", output) context.run("step3", _step3) ``` -------------------------------- ### Orchestrate AI Data Processing Workflow (Python) Source: https://upstash.com/docs/workflow/examples/allInOne This Python code defines a FastAPI endpoint using Upstash Workflow to manage a data processing pipeline. It mirrors the TypeScript example by downloading data, chunking it, processing with OpenAI, aggregating results, and reporting. It demonstrates the use of `upstash_workflow.fastapi.Serve` for creating workflow-enabled API endpoints in Python. ```python from fastapi import FastAPI import json import os from typing import Dict, List, Any, TypedDict from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext, CallResponse from utils import ( aggregate_results, generate_report, send_report, get_dataset_url, split_into_chunks, ) app = FastAPI() serve = Serve(app) class RequestPayload(TypedDict): dataset_id: str user_id: str @serve.post("/ai-generation") async def ai_generation(context: AsyncWorkflowContext[RequestPayload]) -> None: request = context.request_payload dataset_id = request["dataset_id"] user_id = request["user_id"] # Step 1: Download the dataset async def _get_dataset_url() -> str: return await get_dataset_url(dataset_id) dataset_url = await context.run("get-dataset-url", _get_dataset_url) # HTTP request with much longer timeout (2hrs) response: CallResponse[Any] = await context.call( "download-dataset", url=dataset_url, method="GET" ) dataset = response.body # Step 2: Process data in chunks using OpenAI chunk_size = 1000 chunks = split_into_chunks(dataset, chunk_size) processed_chunks: List[str] = [] for i, chunk in enumerate(chunks): openai_response: CallResponse[Dict[str, str]] = await context.call( f"process-chunk-{i}", url="https://api.openai.com/v1/chat/completions", method="POST", headers={ "authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}", }, body={ "model": "gpt-4", "messages": [ { "role": "system", "content": "You are an AI assistant tasked with analyzing data chunks. Provide a brief summary and key insights for the given data.", }, { "role": "user", "content": f"Analyze this data chunk: {json.dumps(chunk)}", }, ], "max_tokens": 150, }, ) processed_chunks.append( ``` -------------------------------- ### Trigger Multiple Workflow Runs Source: https://upstash.com/docs/workflow/basics/client/trigger Starts multiple workflow runs simultaneously by providing an array of configuration objects to the `trigger` method. Each object in the array defines the parameters for an individual workflow run. The method returns an array of results, where each element contains the `workflowRunId` for the corresponding triggered run. ```typescript import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }) const results = await client.trigger([ { url: "", // other options... }, { url: "", // other options... }, ]) console.log(results[0].workflowRunId) // prints wfr_my-workflow ``` -------------------------------- ### POST /serve (Workflow Configuration) Source: https://upstash.com/docs/workflow/basics/serve/advanced Configures and initializes a workflow endpoint with schema validation and custom infrastructure settings. ```APIDOC ## POST /serve ### Description Initializes a workflow handler. It supports automatic payload validation via Zod, custom URL routing for proxies, and manual configuration of QStash clients and receivers. ### Method POST ### Endpoint /serve ### Parameters #### Request Body (Configuration Options) - **schema** (Zod Object) - Optional - Zod schema to validate the incoming request payload. - **url** (string) - Optional - The full public URL of the workflow endpoint, used when behind proxies. - **baseUrl** (string) - Optional - Overrides the base portion of the inferred URL. - **qstashClient** (object) - Optional - Custom QStash client instance. - **receiver** (object) - Optional - Custom QStash Receiver instance for request verification. - **env** (object) - Optional - Manual injection of environment variables. ### Request Example { "schema": "z.object({ expression: z.string() })", "url": "https://api.example.com/workflow" } ### Response #### Success Response (200) - **status** (string) - Workflow initialization successful. #### Response Example { "status": "success" } ``` -------------------------------- ### GET /v2/workflows/dlq/{dlqId} Source: https://upstash.com/docs/workflow/api-refence/dlq/get-failed-workflow-run Retrieves the full details of a specific failed workflow run from the Dead Letter Queue using its unique DLQ ID. ```APIDOC ## GET /v2/workflows/dlq/{dlqId} ### Description Get details of a specific failed workflow run from the DLQ. ### Method GET ### Endpoint /v2/workflows/dlq/{dlqId} ### Parameters #### Path Parameters - **dlqId** (string) - Required - The DLQ ID of the failed workflow run. ### Request Example GET /v2/workflows/dlq/msg_12345 ### Response #### Success Response (200) - **dlqId** (string) - The DLQ ID of the failed workflow message. - **workflowUrl** (string) - The URL of the workflow. - **workflowRunId** (string) - The ID of the workflow run. - **responseStatus** (integer) - The HTTP status code received from the destination API. #### Response Example { "dlqId": "msg_12345", "workflowUrl": "https://api.example.com/workflow", "workflowRunId": "run_abc", "responseStatus": 500 } ``` -------------------------------- ### Initialize Upstash Workflow Client Source: https://upstash.com/docs/workflow/basics/client Initializes a new instance of the Upstash Workflow Client using QStash credentials. This client is stateless and can be reused throughout the application. ```javascript import { Client } from "@upstash/workflow"; const client = new Client({ baseUrl: process.env.QSTASH_URL!, token: process.env.QSTASH_TOKEN! }); ``` -------------------------------- ### GET /v2/waiters/{eventId} Source: https://upstash.com/docs/workflow/api-refence/notify/list-waiters Lists all active waiters for a specific event ID. This endpoint returns information about all workflow runs that are currently waiting for the specified event. ```APIDOC ## GET /v2/waiters/{eventId} ### Description List all active waiters for a specific event ID. Returns information about all workflow runs that are currently waiting for the specified event. ### Method GET ### Endpoint /v2/waiters/{eventId} ### Parameters #### Path Parameters - **eventId** (string) - Required - The event ID to list waiters for. #### Query Parameters None #### Request Body None ### Request Example None ### Response #### Success Response (200) - **url** (string) - The URL that is waiting for the event notification. - **headers** (object) - The HTTP headers to send with the notification. - **deadline** (integer) - The Unix timestamp in seconds when the wait operation times out. - **timeoutBody** (string) - The body to send if the wait times out. - **timeoutUrl** (string) - The URL to call if the wait times out. - **timeoutHeaders** (object) - The HTTP headers to send with the timeout callback. #### Response Example ```json [ { "url": "https://example.com/callback", "headers": { "Content-Type": ["application/json"] }, "deadline": 1678886400, "timeoutBody": "{\"status\": \"timeout\"}", "timeoutUrl": "https://example.com/timeout", "timeoutHeaders": { "X-Custom-Header": ["timeout-value"] } } ] ``` #### Error Response (400) - **error** (string) - Bad Request - Invalid event ID #### Error Response (401) - **error** (string) - Unauthorized #### Error Response (500) - **error** (string) - Internal Server Error ``` -------------------------------- ### Pause Workflow Until Specific Time Source: https://upstash.com/docs/workflow/basics/context/sleepUntil Demonstrates how to use context.sleepUntil() to pause a workflow until a calculated future date. The example shows calculating a date one week from the current time and then scheduling the workflow to resume at that point. It includes necessary imports and function calls for both TypeScript and Python. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { signIn, sendEmail} from "@/utils/onboarding-utils"; export const { POST } = serve(async (context) => { const userData = context.requestPayload; const user = await context.run("sign-in", async () => { return signIn(userData); }); // 👇 Calculate the date for one week from now const oneWeekFromNow = new Date(); oneWeekFromNow.setDate(oneWeekFromNow.getDate() + 7); // 👇 Sleep until the calculated date await context.sleepUntil("wait-for-one-week", oneWeekFromNow); await context.run("send-welcome-email", async () => { return sendEmail(user.name, user.email); }); }); ``` ```python from fastapi import FastAPI from datetime import datetime, timedelta from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext from onboarding_utils import sign_in, send_email app = FastAPI() serve = Serve(app) @serve.post("/api/onboarding") async def onboarding(context: AsyncWorkflowContext[User]) -> None: user_data = context.request_payload async def _sign_in(): return await sign_in(user_data) user = await context.run("sign-in", _sign_in) # 👇 Calculate the date for one week from now one_week_from_now = datetime.now() + timedelta(days=7) # 👇 Wait until the calculated date await context.sleep_until("wait-for-one-week", one_week_from_now) async def _send_email(): return await send_email(user.name, user.email) await context.run("send-welcome-email", _send_email) ``` -------------------------------- ### Tool Implementation with context.run Source: https://upstash.com/docs/workflow/integrations/aisdk Illustrates how to implement a tool's `execute` function within an Upstash Workflow. It emphasizes wrapping the tool's logic in `context.run()` with a descriptive name for better tracking and adhering to the standard error handling pattern. ```typescript execute: ({ location }) => context.run("weather tool", () => { // Mock data, replace with actual weather API call return { location, temperature: 72 + Math.floor(Math.random() * 21) - 10, }; }) ``` -------------------------------- ### List Flow Control Keys (OpenAPI) Source: https://upstash.com/docs/workflow/api-refence/flow-control/list-flow-control-keys This snippet provides the OpenAPI 3.1.0 specification for the Upstash Workflow REST API endpoint used to list Flow Control keys. It details the request method (GET), path (/v2/flowControl), and the structure of the expected JSON response, including the schema for FlowControlKey objects. Authentication methods are also described. ```yaml openapi: 3.1.0 info: title: Upstash Workflow REST API description: > Upstash Workflow is a serverless workflow orchestration service built on top of Upstash QStash and Upstash Redis. version: 2.0.0 contact: name: Upstash url: https://upstash.com servers: - url: https://qstash.upstash.io security: - bearerAuth: [] - bearerAuthQuery: [] paths: /v2/flowControl: get: tags: - Flow Control summary: List Flow Control Keys description: List all Flow Control keys responses: '200': description: Flow control keys retrieved successfully content: application/json: schema: type: array items: $ref: '#/components/schemas/FlowControlKey' components: schemas: FlowControlKey: type: object properties: flowControlKey: type: string description: The flow control key name waitlistSize: type: integer description: The number of messages waiting due to flow control configuration. securitySchemes: bearerAuth: type: http scheme: bearer bearerFormat: JWT description: QStash authentication token bearerAuthQuery: type: apiKey in: query name: qstash_token description: QStash authentication token passed as a query parameter ``` -------------------------------- ### GET /v2/flowControl - List Flow Control Keys Source: https://upstash.com/docs/workflow/api-refence/flow-control/list-flow-control-keys Retrieves a list of all configured Flow Control keys. This is useful for monitoring and managing message flow within your Upstash workflows. ```APIDOC ## GET /v2/flowControl ### Description List all Flow Control keys. This endpoint allows you to retrieve a list of all active flow control keys, which are used to manage the rate of message processing in your workflows. ### Method GET ### Endpoint /v2/flowControl ### Parameters #### Query Parameters - **qstash_token** (string) - Required - QStash authentication token passed as a query parameter. ### Request Example ```json { "example": "No request body needed for GET request" } ``` ### Response #### Success Response (200) - **flowControlKey** (string) - The flow control key name. - **waitlistSize** (integer) - The number of messages waiting due to flow control configuration. #### Response Example ```json [ { "flowControlKey": "my-key-1", "waitlistSize": 10 }, { "flowControlKey": "my-key-2", "waitlistSize": 5 } ] ``` ``` -------------------------------- ### Initialize Upstash Workflow with base serve method Source: https://upstash.com/docs/workflow/quickstarts/platforms Imports the base serve method to initialize Upstash Workflow on platforms not natively supported. This allows developers to manually configure the workflow handler for their specific environment. ```typescript import { serve } from "@upstash/workflow"; ``` ```python from upstash_workflow import serve, async_serve ``` -------------------------------- ### Create .env File Source: https://upstash.com/docs/workflow/quickstarts/nextjs-flask Creates an empty .env file in the project root. This file will be used to store environment-specific variables, such as API keys and URLs. ```bash touch .env ``` -------------------------------- ### Configure Workflow Options Source: https://upstash.com/docs/workflow/basics/serve Illustrates how to pass configuration options, such as failure handlers, to the serve function to manage workflow execution behavior. ```typescript import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve( async (context) => { ... }, // 👇 Workflow options { failureFunction: async ({ ... }) => {} } ); ``` ```python from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: async def _step1() -> str: # define a piece of business logic as step 1 return "step 1 result" result = await context.run("step-1", _step1) async def _step2() -> None: # define another piece of business logic as step 2 pass await context.run("step-2", _step2) ``` -------------------------------- ### QStash CLI Output and Environment Variables Source: https://upstash.com/docs/workflow/howto/local-development/development-server Displays the typical output from the QStash CLI development server, including the local server URL, authentication tokens, and signing keys. These values are crucial for configuring your application's environment variables to direct QStash requests to the local server. ```plaintext Upstash QStash development server is runnning at A default user has been created for you to authorize your requests. QSTASH_TOKEN=eyJVc2VySUQiOiJkZWZhdWx0VXNlciIsIlBhc3N3b3JkIjoiZGVmYXVsdFBhc3N3b3JkIn0= QSTASH_CURRENT_SIGNING_KEY=sig_7RvLjqfZBvP5KEUimQCE1pvpLuou QSTASH_NEXT_SIGNING_KEY=sig_7W3ZNbfKWk5NWwEs3U4ixuQ7fxwE Sample cURL request: curl -X POST http://127.0.0.1:8080/v2/publish/https://example.com -H "Authorization: Bearer eyJVc2VySUQiOiJkZWZhdWx0VXNlciIsIlBhc3N3b3JkIjoiZGVmYXVsdFBhc3N3b3JkIn0=" Check out documentation for more details: https://upstash.com/docs/qstash/howto/local-development ``` ```env QSTASH_URL="http://127.0.0.1:8080" QSTASH_TOKEN="eyJVc2VySUQiOiJkZWZhdWx0VXNlciIsIlBhc3N3b3JkIjoiZGVmYXVsdFBhc3N3b3JkIn0=" QSTASH_CURRENT_SIGNING_KEY="sig_7RvLjqfZBvP5KEUimQCE1pvpLuou" QSTASH_NEXT_SIGNING_KEY="sig_7W3ZNbfKWk5NWwEs3U4ixuQ7fxwE" ``` -------------------------------- ### Handling Stateful Resources Source: https://upstash.com/docs/workflow/basics/context/run Explains how to handle stateful resources by returning plain data and rehydrating class instances if necessary. ```APIDOC ## Workflow Execution with Stateful Resources ### Description When using `context.run()`, results are JSON-serialized. Avoid returning stateful resources like database connections. If class instances are returned, they are restored as plain objects. Instance methods are not available unless the object is rehydrated using `Object.assign()` or a similar method. ### Method N/A (Applies to any `context.run()` usage) ### Endpoint N/A ### Parameters N/A ### Request Example ```json { "stepName": "step-1", "stepFunction": "async () => { return new User('John Doe', 'john.doe@example.com'); }" } ``` ### Response #### Success Response (200) - **user** (object) - A plain object representing the User instance. #### Response Example ```json { "name": "John Doe", "email": "john.doe@example.com" } ``` ### Rehydration Example ```typescript // Assuming 'user' is the plain object returned from context.run() const userInstance = Object.assign(new User(), user); console.log(userInstance.greet()); // Now instance methods are available ``` ``` -------------------------------- ### Configure Migration Mode Source: https://upstash.com/docs/workflow/howto/multi-region Sets up environment variables to support migration between regions. This configuration allows the SDK to handle requests from multiple regions simultaneously by defining primary and secondary regional credentials. ```bash QSTASH_REGION="US_EAST_1" US_EAST_1_QSTASH_URL="https://qstash-us-east-1.upstash.io" US_EAST_1_QSTASH_TOKEN="your_us_token" US_EAST_1_QSTASH_CURRENT_SIGNING_KEY="your_us_current_key" US_EAST_1_QSTASH_NEXT_SIGNING_KEY="your_us_next_key" EU_CENTRAL_1_QSTASH_URL="https://qstash-eu-central-1.upstash.io" EU_CENTRAL_1_QSTASH_TOKEN="your_eu_token" EU_CENTRAL_1_QSTASH_CURRENT_SIGNING_KEY="your_eu_current_key" EU_CENTRAL_1_QSTASH_NEXT_SIGNING_KEY="your_eu_next_key" ``` -------------------------------- ### Implement Idempotent Workflow Steps Source: https://upstash.com/docs/workflow/basics/caveats Demonstrates how to structure workflow steps using 'context.run' to ensure that business logic remains idempotent even if the workflow is retried due to network or system failures. ```typescript export const { POST } = serve(async (context) => { const input = context.requestPayload await context.run("step-1", async () => { return someWork(input) }) }) ``` ```python @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload async def _step_1() -> None: return await some_work(input) await context.run("step-1", _step_1) ``` -------------------------------- ### OpenAI Compatible Provider Integration Source: https://upstash.com/docs/workflow/integrations/openai This demonstrates how to use the `context.api.openai.call` method to interact with OpenAI-compatible API providers by specifying a custom `baseURL`. ```APIDOC ## OpenAI Compatible Provider Integration ### Description This section explains how to connect to OpenAI-compatible API endpoints by providing a custom `baseURL` to the `context.api.openai.call` method. This allows you to use services like Deepseek. ### Method POST ### Endpoint (Depends on the compatible provider, e.g., /v1/chat/completions for Deepseek) ### Parameters #### Request Body - **baseURL** (string) - Required - The base URL of the OpenAI-compatible API provider (e.g., "https://api.deepseek.com"). - **token** (string) - Required - Your API key for the compatible provider. - **operation** (string) - Required - The specific API operation to perform (e.g., "chat.completions.create"). - **body** (object) - Required - The request payload, similar to the standard OpenAI API. ### Request Example ```json { "baseURL": "https://api.deepseek.com", "token": "", "operation": "chat.completions.create", "body": { "model": "deepseek-chat", "messages": [ { "role": "system", "content": "Assistant says 'hello!'" }, { "role": "user", "content": "User shouts back 'hi!'" } ] } } ``` ### Response #### Success Response (200) (Structure depends on the compatible provider, typically similar to OpenAI's response) #### Response Example ```json { "content": [ { "text": "Hello from Deepseek!" } ] } ``` ``` -------------------------------- ### Execute Workflow Steps Sequentially Source: https://upstash.com/docs/workflow/basics/context/run Demonstrates how to run workflow steps one after another by awaiting them individually. This pattern ensures that the result of one step can be passed into the subsequent step. ```typescript import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve(async (context) => { const input = context.requestPayload; const result1 = await context.run("step-1", async () => { return someWork(input); }); await context.run("step-2", async () => { someOtherWork(result1); }); }); ``` ```python from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload async def _step1(): return some_work(input) result1 = await context.run("step-1", _step1) async def _step2(): return some_other_work(result1) await context.run("step-2", _step2) ``` -------------------------------- ### POST /v2/flowControl/{flowControlKey}/resetRate Source: https://upstash.com/docs/workflow/api-refence/flow-control/reset-rate-for-flow-control-key Resets the rate configuration state for a specific flow-control key. This allows immediate resumption of message delivery by clearing the current rate count and starting a new rate period. ```APIDOC ## POST /v2/flowControl/{flowControlKey}/resetRate ### Description Resets the rate configuration state for a specific flow-control key. This allows immediate resumption of message delivery by clearing the current rate count and starting a new rate period. ### Method POST ### Endpoint /v2/flowControl/{flowControlKey}/resetRate ### Parameters #### Path Parameters - **flowControlKey** (string) - Required - The flow-control key for which the rate state will be reset. #### Query Parameters None #### Request Body None ### Request Example None ### Response #### Success Response (200) Description: The rate state for the flow-control key has been reset. #### Response Example None #### Error Response (400) - **error** (string) - Bad request. Returned when the flow-control key is not provided. #### Error Response (500) - **error** (string) - Internal server error. Returned when an unexpected error occurs. ``` -------------------------------- ### Verify Stock Availability (TypeScript, Python) Source: https://upstash.com/docs/workflow/examples/eCommerceOrderFulfillment This snippet demonstrates how to create an order ID and check stock availability for items in an order. If any items are out of stock, a warning is logged, and the process is halted. This ensures that only available items proceed to the next stage. ```typescript const orderId = await context.run("create-order-id", async () => { return await createOrderId(userId); }); const stockAvailable = await context.run("check-stock", async () => { return await checkStockAvailability(items) }) if (!stockAvailable) { console.warn("Some items are out of stock") return; } ``` ```python async def _create_order_id(): return await create_order_id(user_id) order_id: str = await context.run("create-order-id", _create_order_id) async def _check_stock(): return await check_stock_availability(items) stock_available: bool = await context.run("check-stock", _check_stock) if not stock_available: print("Some items are out of stock") return ``` -------------------------------- ### Sequential Step Execution with Context.run Source: https://upstash.com/docs/workflow/examples/dynamicWorkflow Illustrates the core logic for executing steps sequentially within an Upstash Workflow. Each step is wrapped in `context.run` to ensure state persistence, exactly-once execution, and safe retries or resumes. ```typescript let lastResult = 0 for (let i = 0; i < steps.length; i++) { const stepName = steps[i] lastResult = await context.run( `step-${i}:${stepName}`, async () => { const fn = functions[stepName] if (!fn) throw new WorkflowNonRetryableError("Unknown step") return fn(lastResult) } ) } ``` -------------------------------- ### Configure Single-Region Mode Source: https://upstash.com/docs/workflow/howto/multi-region Sets up the environment variables for operating in a single region (EU). This is the default mode when QSTASH_REGION is not defined. ```bash QSTASH_URL="https://qstash.upstash.io" QSTASH_TOKEN="your_eu_token" QSTASH_CURRENT_SIGNING_KEY="your_eu_current_key" QSTASH_NEXT_SIGNING_KEY="your_eu_next_key" ``` -------------------------------- ### Pause workflow execution with context.waitForEvent Source: https://upstash.com/docs/workflow/basics/context/waitForEvent This example demonstrates how to use context.waitForEvent within an Upstash Workflow to pause execution until a specific event is received or a timeout occurs. It highlights the usage of stepId, eventId, and a custom timeout duration. ```javascript import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve<{ topic: string }>(async (context) => { const request = context.requestPayload; const { eventData, timeout, } = await context.waitForEvent("wait for some event", "my-event-id", { timeout: "1000s", // 1000 second timeout }); }); ``` -------------------------------- ### Define Custom Tools for Workflow and AI SDK Source: https://upstash.com/docs/workflow/agents/features Demonstrates how to create custom mathematical tools using the WorkflowTool class and the AI SDK tool definition. Both approaches utilize Zod for schema validation and mathjs for evaluation. ```typescript import { WorkflowTool } from '@upstash/workflow' import { z } from 'zod' import * as mathjs from 'mathjs' const tool = new WorkflowTool({ description: 'A tool for evaluating mathematical expressions. ' + 'Example expressions: ' + "'1.2 * (2 + 4.5)', '12.7 cm to inch', 'sin(45 deg) ^ 2'.", schema: z.object({ expression: z.string() }), invoke: async ({ expression }) => mathjs.evaluate(expression), }) ``` ```typescript import { z } from 'zod' import { tool } from 'ai' import * as mathjs from 'mathjs' const mathTool = tool({ description: 'A tool for evaluating mathematical expressions. ' + 'Example expressions: ' + "'1.2 * (2 + 4.5)', '12.7 cm to inch', 'sin(45 deg) ^ 2'.", parameters: z.object({ expression: z.string() }), execute: async ({ expression }) => mathjs.evaluate(expression), }) ``` -------------------------------- ### Data Preparation API Source: https://upstash.com/docs/workflow/examples/allInOne This section details the process of retrieving a dataset URL and downloading the dataset content. It highlights the use of `context.call` for making HTTP requests that can exceed standard serverless execution limits. ```APIDOC ## GET /api/workflow/dataset ### Description Retrieves the dataset URL and downloads the dataset content. Uses `context.call` for potentially long-running HTTP requests. ### Method GET ### Endpoint /api/workflow/dataset ### Parameters #### Query Parameters - **datasetId** (string) - Required - The ID of the dataset to retrieve. ### Request Example ```json { "datasetId": "your_dataset_id" } ``` ### Response #### Success Response (200) - **dataset** (object) - The downloaded dataset content. #### Response Example ```json { "dataset": { "key1": "value1", "key2": "value2" } } ``` ``` -------------------------------- ### Creating Custom Middleware with Direct Callbacks Source: https://upstash.com/docs/workflow/howto/middlewares Shows how to create a custom middleware using direct callbacks for lifecycle and debug events. ```APIDOC ## Creating Custom Middleware with Direct Callbacks ### Description This section details how to create a custom middleware by providing callbacks directly to the `WorkflowMiddleware` constructor. These callbacks can hook into various lifecycle and debug events of the workflow. ### Method POST ### Endpoint /api/workflow ### Parameters #### Query Parameters None #### Request Body (Workflow definition and execution payload) ### Request Example ```json { "workflow": "your_workflow_definition", "payload": {} } ``` ### Response #### Success Response (200) - **result** (any) - The result of the workflow execution. #### Response Example ```json { "result": "Workflow completed successfully" } ``` ``` -------------------------------- ### List Active Waiters via OpenAPI Source: https://upstash.com/docs/workflow/api-refence/notify/list-waiters This OpenAPI specification defines the GET /v2/waiters/{eventId} endpoint. It requires a valid Bearer token and returns an array of Waiter objects representing workflow runs awaiting a specific event. ```yaml /v2/waiters/{eventId}: get: tags: - Notify summary: List Waiters description: List all active waiters for a specific event ID. parameters: - in: path name: eventId required: true schema: type: string description: The event ID to list waiters for. responses: '200': description: List of active waiters content: application/json: schema: type: array items: $ref: '#/components/schemas/Waiter' ``` -------------------------------- ### Deploy Workflow to Production Source: https://upstash.com/docs/workflow/quickstarts/cloudflare-workers Deploys the Cloudflare Worker project to the production environment using the Wrangler CLI. ```bash wrangler deploy ``` -------------------------------- ### Serve Workflow in Next.js and FastAPI Source: https://upstash.com/docs/workflow/basics/context Demonstrates how to serve an Upstash workflow using the 'serve' function in both Next.js (TypeScript) and FastAPI (Python). This sets up the entry point for your workflow, providing the context object to your route function. ```typescript import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve( // 👇 the workflow context async (context) => { // ... } ); ``` ```python from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: ... ``` -------------------------------- ### Execute Workflow Steps in Parallel Source: https://upstash.com/docs/workflow/basics/context/run Demonstrates how to trigger multiple workflow steps simultaneously using Promise.all. This improves performance by allowing independent tasks to run concurrently. ```typescript import { serve } from "@upstash/workflow/nextjs" export const { POST } = serve( async (context) => { const input = context.requestPayload; const promise1 = context.run("step-1", async () => { return someWork(input); }); const promise2 = context.run("step-2", async () => { return someOtherWork(input); }); await Promise.all([promise1, promise2]); }, ); ``` -------------------------------- ### Execute Workflow Step Source: https://upstash.com/docs/workflow/examples/customerOnboarding Demonstrates how to wrap a function execution within the workflow context to ensure durability and state management. ```typescript await context.run("new-signup", async () => { await sendEmail("Welcome to the platform", email) }) ``` ```python async def _new_signup() -> None: await send_email("Welcome to the platform", email) await context.run("new-signup", _new_signup) ``` -------------------------------- ### Restarting Workflow Runs from DLQ Source: https://upstash.com/docs/workflow/basics/client/dlq/restart Demonstrates how to restart a single workflow run using a specific DLQ ID with flow control, and how to restart multiple workflow runs simultaneously. ```typescript const { messages } = await client.dlq.list(); const response = await client.dlq.restart({ dlqId: messages[0].dlqId, flowControl: { key: "my-flow-control-key", parallelism: 10, }, retries: 3, }); ``` ```typescript const responses = await client.dlq.restart({ dlqId: ["dlq-12345", "dlq-67890"], retries: 5, }); ``` -------------------------------- ### Run Upstash Workflow Server Locally Source: https://upstash.com/docs/workflow/sdk/workflow-py This snippet outlines the steps to run the Upstash Workflow server locally. It involves setting up a local tunnel using `ngrok`, exporting the tunnel URL as an environment variable, sourcing a `.env` file, and finally running the FastAPI application with `uvicorn`. ```bash # Create the tunnel and set the UPSTASH_WORKFLOW_URL environment variable in the .env file: ngrok http localhost:8000 export UPSTASH_WORKFLOW_URL= # Set the environment variables: source .env # Finally, run the server: uvicorn main:app --reload ``` -------------------------------- ### Workflow Middleware Implementation Source: https://upstash.com/docs/workflow/howto/middlewares Explains how to create custom middleware using WorkflowMiddleware and how to register multiple middlewares within the workflow serve function. ```APIDOC ## Workflow Middleware ### Description Middlewares allow for cross-cutting concerns like error tracking, logging, or performance monitoring. They are executed in the order provided in the array. ### Request Body - **name** (string) - Required - The identifier for the middleware. - **callbacks** (object) - Required - An object containing event handlers like onError. ### Implementation Example ```typescript import { WorkflowMiddleware } from "@upstash/workflow"; const errorTrackingMiddleware = new WorkflowMiddleware({ name: "error-tracking", callbacks: { onError: async ({ workflowRunId, error }) => { // Logic to send errors to monitoring service } } }); // Usage with serve export const { POST } = serve( async (context) => { /* logic */ }, { middlewares: [loggingMiddleware, errorTrackingMiddleware] } ); ``` ``` -------------------------------- ### Create Custom Middleware with Callbacks Source: https://upstash.com/docs/workflow/howto/middlewares Shows how to implement a custom middleware using the WorkflowMiddleware class and direct callbacks to handle lifecycle and debug events. ```typescript import { WorkflowMiddleware } from "@upstash/workflow"; const customMiddleware = new WorkflowMiddleware({ name: "custom-logger", callbacks: { runStarted: async ({ context }) => { console.log(`Workflow ${context.workflowRunId} started`); }, beforeExecution: async ({ context, stepName }) => { console.log(`Executing step: ${stepName}`); }, afterExecution: async ({ context, stepName, result }) => { console.log(`Step ${stepName} completed with result:`, result); }, runCompleted: async ({ context, result }) => { console.log(`Workflow ${context.workflowRunId} completed:`, result); }, onError: async ({ workflowRunId, error }) => { console.error(`Error in ${workflowRunId}:`, error); }, onWarning: async ({ workflowRunId, warning }) => { console.warn(`Warning in ${workflowRunId}:`, warning); }, onInfo: async ({ workflowRunId, info }) => { console.info(`Info from ${workflowRunId}:`, info); } } }); ``` -------------------------------- ### Create New User in Stripe (Python & TypeScript) Source: https://upstash.com/docs/workflow/examples/authWebhook This snippet illustrates the process of creating a new user record within the Stripe payment platform. It highlights the asynchronous nature of the operation and its role in setting up payment-related functionalities for a new user. This is often performed after initial user registration. ```python async def _create_new_user_in_stripe() -> None: await create_new_user_in_stripe(email) await context.run("create new user in stripe", _create_new_user_in_stripe) ``` -------------------------------- ### Send Welcome Email in Workflow Source: https://upstash.com/docs/workflow/examples/authWebhook Sends a welcome email to a user upon registration. The email sending process is wrapped in a workflow run step. ```typescript await context.run("send welcome email", async () => { await sendEmail( email, "Welcome to our platform!, You have 14 days of free trial." ); }); ``` ```python async def _send_welcome_email() -> None: await send_email( email, "Welcome to our platform!, You have 14 days of free trial." ) await context.run("send welcome email", _send_welcome_email) ``` -------------------------------- ### Set Environment Variables Source: https://upstash.com/docs/workflow/agents/getting-started Configures essential environment variables in the .env.local file for Upstash Workflow and OpenAI integration. This includes QStash URL and token, as well as the OpenAI API key. ```txt QSTASH_URL="http://127.0.0.1:8080" QSTASH_TOKEN="" OPENAI_API_KEY= ``` -------------------------------- ### Call OpenAI Compatible Provider Source: https://upstash.com/docs/workflow/integrations/openai Illustrates how to connect to an OpenAI-compatible API provider, such as Deepseek, by specifying the `baseURL` parameter within the `context.api.openai.call` method. This allows you to use the same SDK methods for different compatible services. ```typescript const { status, body } = await context.api.openai.call( "Call Deepseek", { baseURL: "https://api.deepseek.com", token: process.env.DEEPSEEK_API_KEY, operation: "chat.completions.create", body: { model: "deepseek-chat", messages: [ { role: "system", content: "Assistant says 'hello!'", }, { role: "user", content: "User shouts back 'hi!'" } ], }, } ); ``` -------------------------------- ### context.run() - Parallel Execution (TypeScript) Source: https://upstash.com/docs/workflow/basics/context/run Illustrates parallel execution of workflow steps using Promise.all() with context.run(). Multiple steps can run concurrently. ```APIDOC ## POST /api/example ### Description Executes workflow steps in parallel using Promise.all(). Multiple steps run concurrently, and the workflow waits for all to complete. ### Method POST ### Endpoint /api/example ### Parameters #### Request Body - **input** (string) - Required - The input payload for the workflow. ### Request Example ```json { "input": "some data" } ``` ### Response #### Success Response (200) - **results** (array) - An array containing the results from all parallel steps. #### Response Example ```json { "results": [ "result from step 1", "result from step 2" ] } ``` ``` -------------------------------- ### Authentication Schemes Source: https://upstash.com/docs/workflow/api-refence/notify/notify-workflow-run-event Overview of the available security schemes for authenticating requests to QStash. ```APIDOC ## Authentication Overview ### Description QStash supports two primary methods for authenticating API requests. You can provide a JWT token via the Authorization header or pass the token as a query parameter. ### Security Schemes #### Bearer Authentication (Header) - **Type**: http - **Scheme**: bearer - **Bearer Format**: JWT - **Description**: QStash authentication token passed in the `Authorization: Bearer ` header. #### API Key (Query Parameter) - **Type**: apiKey - **In**: query - **Name**: qstash_token - **Description**: QStash authentication token passed as a query parameter `?qstash_token=`. ``` -------------------------------- ### Create a Webhook in Upstash Workflow Source: https://upstash.com/docs/workflow/basics/context/createWebhook This snippet demonstrates how to initialize a workflow using the Upstash SDK and invoke the createWebhook method. The resulting webhook URL is logged to the console for use by external services. ```typescript import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve(async (context) => { const webhook = await context.createWebhook("create webhook"); console.log(webhook.webhookUrl); // Use this URL with external services }); ``` -------------------------------- ### Inject Custom QStash Client Source: https://upstash.com/docs/workflow/basics/serve/advanced Provide a custom QStash client instance to the workflow. This is useful for managing multiple QStash projects within a single application. ```typescript import { Client } from "@upstash/qstash"; import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve( async (context) => { ... }, { qstashClient: new Client({ token: "" }) } ); ``` ```python from qstash import AsyncQStash @serve.post("/api/example", qstash_client=AsyncQStash(os.environ["QSTASH_TOKEN"])) async def example(context: AsyncWorkflowContext[str]) -> None: ... ``` -------------------------------- ### TypeScript Orchestrator-Workers Workflow Source: https://upstash.com/docs/workflow/agents/patterns/orchestrator-workers This TypeScript code defines an Upstash Workflow that uses an orchestrator to manage three worker agents. Each worker is specialized in a different area of physics (general physics, quantum mechanics, relativity) and uses the WikipediaQueryRun tool. The orchestrator then synthesizes the results from these workers to create a Q&A document. This example is designed for use with Next.js. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { agentWorkflow } from "@upstash/workflow-agents"; import { WikipediaQueryRun } from "@langchain/community/tools/wikipedia_query_run"; const wikiTool = new WikipediaQueryRun({ topKResults: 1, maxDocContentLength: 500, }) export const { POST } = serve(async (context) => { const agents = agentWorkflow(context); const model = agents.openai('gpt-4o'); // Worker agents const worker1 = agents.agent({ model, name: 'worker1', tools: { wikiTool }, maxSteps: 3, background: 'You are a worker agent that answers general questions about advanced physics.' }); const worker2 = agents.agent({ model, name: 'worker2', tools: { wikiTool }, maxSteps: 3, background: 'You are a worker agent that answers questions about quantum mechanics.' }); const worker3 = agents.agent({ model, name: 'worker3', tools: { wikiTool }, maxSteps: 3, background: 'You are a worker agent that answers questions about relativity.' }); // Synthesizing results const task = agents.task({ model, prompt: `Create a Q&A for advanced topics in physics`, agents: [worker1, worker2, worker3], maxSteps: 3, }) const { text } = await task.run(); console.log(text); }); ``` -------------------------------- ### Generate Text with OpenAI and Tools in Upstash Workflow Source: https://upstash.com/docs/workflow/integrations/aisdk This advanced snippet demonstrates using the OpenAI client with tools within an Upstash Workflow. Each tool execution is wrapped in a workflow step, and the `maxSteps` parameter is set to allow for tool processing. It includes a 'weather' tool that returns mock weather data based on a location. ```typescript import { z } from 'zod'; import { serve } from "@upstash/workflow/nextjs"; import { WorkflowAbort } from '@upstash/workflow'; import { generateText, ToolExecutionError, tool } from 'ai'; import { createWorkflowOpenAI } from './model'; export const { POST } = serve<{ prompt: string }>(async (context) => { const openai = createWorkflowOpenAI(context); const prompt = await context.run("get prompt", async () => { return context.requestPayload.prompt; }); try { const result = await generateText({ model: openai('gpt-3.5-turbo'), tools: { weather: tool({ description: 'Get the weather in a location', parameters: z.object({ location: z.string().describe('The location to get the weather for'), }), execute: ({ location }) => context.run("weather tool", () => { // Mock data, replace with actual weather API call return { location, temperature: 72 + Math.floor(Math.random() * 21) - 10, }; }) }), }, maxSteps: 2, prompt, }); await context.run("text", () => { console.log(`TEXT: ${result.text}`); return result.text; }); } catch (error) { if (error instanceof ToolExecutionError && error.cause instanceof WorkflowAbort) { throw error.cause; } else { throw error; } } }); ``` -------------------------------- ### POST /workflow/resume Source: https://upstash.com/docs/workflow/api-refence/dlq/bulk-resume-workflows-from-dlq Resumes workflow runs with optional configuration overrides provided via headers. ```APIDOC ## POST /workflow/resume ### Description Resumes workflow runs. Allows overriding workflow parameters such as retry delays, flow control, labels, and failure callbacks using request headers. ### Method POST ### Endpoint /workflow/resume ### Parameters #### Header Parameters - **Upstash-Retry-Delay** (string) - Optional - Override the retry delay expression. - **Upstash-Flow-Control-Key** (string) - Optional - Override the flow control key. - **Upstash-Flow-Control-Value** (string) - Optional - Override flow control config (parallelism, rate, period). - **Upstash-Label** (string) - Optional - Override the label for remaining steps. - **Upstash-Failure-Callback** (string) - Optional - Override the failure callback URL. ### Request Example { "headers": { "Upstash-Retry-Delay": "5m", "Upstash-Label": "resumed-run" } } ### Response #### Success Response (200) - **workflowRuns** (array) - List of resumed workflow run objects. - **cursor** (string) - Pagination cursor for subsequent requests. #### Response Example { "workflowRuns": [ { "workflowRunId": "wr_12345", "workflowCreatedAt": 1715678900000 } ], "cursor": "next_page_token" } ``` -------------------------------- ### Define a Workflow with Steps Source: https://upstash.com/docs/workflow/features/flow-control This TypeScript code defines a basic Upstash Workflow with three sequential steps: 'step-1', 'step-2', and 'step-3'. The `serve` function is used to define the workflow's entry point and its payload structure. The `context.run` method executes each step sequentially. ```typescript export const { POST } = serve<{ topic: string }>(async (context) => { const payload = context.requestPayload await context.run("step-1", () => { ... }); await context.run("step-2", () => { ... }); await context.run("step-3", () => { ... }); }) ``` -------------------------------- ### Workflow Step Execution with Flow Control Source: https://upstash.com/docs/workflow/features/flow-control Illustrates how workflow steps are executed under flow control, including queuing and adherence to parallelism and rate limits. ```APIDOC ## Workflow Step Execution with Flow Control ### Description When a workflow is triggered with flow control, its steps are managed according to the defined `parallelism`, `rate`, and `period` limits. Steps exceeding these limits are automatically queued and executed once resources become available. This ensures that external services are not overloaded and API rate limits are respected. Note that flow control is applied at the workflow run level and inherited by all its steps, with exceptions for `context.call` and `context.invoke`. ### Method N/A (Internal execution) ### Endpoint N/A (Internal execution) ### Parameters N/A ### Request Example ```typescript export const { POST } = serve<{ topic: string }>(async (context) => { const payload = context.requestPayload // This step will adhere to the workflow's flow control limits await context.run("step-1", () => { ... }); // This step will also adhere to the workflow's flow control limits await context.run("step-2", () => { ... }); // This step will also adhere to the workflow's flow control limits await context.run("step-3", () => { ... }); }) ``` ### Response N/A (Internal execution) ### Notes - Flow control configuration is applied when triggering a workflow run. - `context.call` and `context.invoke` steps can have their own independent flow control configurations. - To throttle specific `context.run` steps, consider extracting them into separate workflows and invoking them via `context.invoke` with custom flow control. - Changes to flow control settings in a new deployment do not affect already running workflow instances; they retain their original configuration. ``` -------------------------------- ### Configure Local Environment Variables Source: https://upstash.com/docs/workflow/howto/local-development Sets the required environment variables to redirect Upstash SDK requests from the production service to the local development server. ```bash QSTASH_URL="http://127.0.0.1:8080" QSTASH_TOKEN="eyJVc2VySUQiOiJkZWZhdWx0VXNlciIsIlBhc3N3b3JkIjoiZGVmYXVsdFBhc3N3b3JkIn0=" QSTASH_CURRENT_SIGNING_KEY="sig_7RvLjqfZBvP5KEUimQCE1pvpLuou" QSTASH_NEXT_SIGNING_KEY="sig_7W3ZNbfKWk5NWwEs3U4ixuQ7fxwE" ``` -------------------------------- ### Configure OpenAI Compatible Providers Source: https://upstash.com/docs/workflow/agents/features Connect to OpenAI compatible providers by specifying a custom baseURL and API key. ```typescript const model = agents.openai('deepseek-chat', { baseURL: "https://api.deepseek.com", apiKey: process.env.DEEPSEEK_API_KEY }) ``` -------------------------------- ### Verify Production Workflow Endpoint Source: https://upstash.com/docs/workflow/quickstarts/hono After deploying your application, use this cURL command to send a POST request to your production workflow endpoint. This helps verify that the endpoint is accessible and functioning as expected. Replace '' with your actual production URL. ```bash curl -X POST https:///workflow ``` -------------------------------- ### Configure Local QStash Environment Variables Source: https://upstash.com/docs/workflow/quickstarts/nextjs-flask Sets environment variables for QStash URL and Token when using the local QStash CLI. These variables are typically added to the .env file. ```bash export QSTASH_URL="http://127.0.0.1:8080" export QSTASH_TOKEN="" ``` -------------------------------- ### Verify Production Workflow Endpoint Source: https://upstash.com/docs/workflow/quickstarts/astro Sends a POST request to the production deployment URL to verify that the workflow endpoint is correctly configured and accessible. ```bash curl -X POST /api/workflow \ -H "Content-Type: application/json" \ -d '{"message": "Hello from the workflow!"}' ``` -------------------------------- ### Migrate onStepFinish to WorkflowMiddleware Source: https://upstash.com/docs/workflow/howto/migrations Replaces the deprecated onStepFinish callback with the new WorkflowMiddleware pattern. This allows for more modular and extensible step lifecycle management. ```typescript // Old export const { POST } = serve( async (context) => { ... }, { onStepFinish: (stepName, result) => { console.log(`Step ${stepName} finished with:`, result); } } ); // New import { WorkflowMiddleware } from "@upstash/workflow"; const stepFinishMiddleware = new WorkflowMiddleware({ name: "step-finish", callbacks: { afterExecution: async ({ stepName, result }) => { console.log(`Step ${stepName} finished with:`, result); } } }); export const { POST } = serve( async (context) => { ... }, { middlewares: [stepFinishMiddleware] } ); ``` -------------------------------- ### Update Serve Method Imports and Exports Source: https://upstash.com/docs/workflow/howto/migrations Updates the import path from @upstash/qstash to @upstash/workflow and adjusts the export pattern for Next.js serve methods. ```typescript // old import { serve } from "@upstash/qstash/nextjs" // new import { serve } from "@upstash/workflow/nextjs" // old export const POST = serve(...); // new export const { POST } = serve(...); ``` -------------------------------- ### Using context.cancel() Source: https://upstash.com/docs/workflow/features/retries/prevent-retries Shows how to use `context.cancel()` to gracefully stop a workflow run. This marks the run as canceled, bypassing the failure handler and DLQ. ```APIDOC ## POST /graceful-cancellation ### Description This endpoint checks an order status. If the order is 'cancelled', it uses `context.cancel()` to stop the workflow gracefully without marking it as failed. ### Method POST ### Endpoint /graceful-cancellation ### Parameters #### Request Body - **orderId** (string) - Required - The ID of the order to check. ### Request Example ```json { "orderId": "12345" } ``` ### Response #### Success Response (200) - **message** (string) - Indicates the order was processed or gracefully cancelled. #### Response Example ```json { "message": "Order processed successfully" } ``` ``` -------------------------------- ### Update SDK Dependencies Source: https://upstash.com/docs/workflow/howto/multi-region Command to update the Upstash Workflow and QStash SDKs to the latest versions required for migration support. ```bash npm install @upstash/workflow@latest @upstash/qstash@latest ``` -------------------------------- ### Define a Workflow Endpoint with FastAPI Source: https://upstash.com/docs/workflow/sdk/workflow-py This Python code demonstrates how to define a workflow endpoint using the Upstash Workflow SDK and FastAPI. It includes setting up the FastAPI app, initializing the `Serve` class, defining a mock function, and creating an asynchronous endpoint that utilizes `context.run` for executing workflow steps. ```python from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) # mock function def some_work(input: str) -> str: return f"processed '{input}'" # serve endpoint which expects a string payload: @serve.post("/example") async def example(context: AsyncWorkflowContext[str]) -> None: # get request body: input = context.request_payload async def _step1() -> str: output = some_work(input) print("step 1 input", input, "output", output) return output # run the first step: result: str = await context.run("step1", _step1) async def _step2() -> None: output = some_work(result) print("step 2 input", result, "output", output) # run the second step: await context.run("step2", _step2) ``` -------------------------------- ### Integrate LangChain and Agentic Tools Source: https://upstash.com/docs/workflow/agents/features Shows how to leverage existing LangChain tools and Agentic toolkits to extend agent capabilities. This includes using DynamicStructuredTool for custom logic and pre-built community tools like Wikipedia. ```typescript import { createAISDKTools } from '@agentic/ai-sdk' import { WeatherClient } from '@agentic/weather' const weather = new WeatherClient() const tools = createAISDKTools(weather) ``` ```typescript import { DynamicStructuredTool } from "@langchain/core/tools"; const numberGenerator = new DynamicStructuredTool({ name: "random-number-generator", description: "generates a random number between two input numbers", schema: z.object({ low: z.number().describe("The lower bound of the generated number"), high: z.number().describe("The upper bound of the generated number"), }), func: async ({ low, high }) => (Math.random() * (high - low) + low).toString(), }) ``` ```typescript import { WikipediaQueryRun } from '@langchain/community/tools/wikipedia_query_run' const wikiTool = new WikipediaQueryRun({ topKResults: 1, maxDocContentLength: 500, }) ``` -------------------------------- ### Trigger Workflow Run Source: https://upstash.com/docs/workflow/quickstarts/vercel-nextjs Shows how to initiate a workflow run using the Upstash Workflow client. Includes a pattern for dynamically setting the base URL for local and production environments. ```javascript import { Client } from "@upstash/workflow"; const client = new Client({ token: process.env.QSTASH_TOKEN! }) const { workflowRunId } = await client.trigger({ url: `http://localhost:3000/api/workflow`, retries: 3 }); ``` ```javascript const BASE_URL = process.env.VERCEL_URL ? `https://${process.env.VERCEL_URL}` : `http://localhost:3000` const { workflowRunId } = await client.trigger({ url: `${BASE_URL}/api/workflow`, retries: 3 }); ``` -------------------------------- ### POST /api/workflow (Production Deployment) Source: https://upstash.com/docs/workflow/quickstarts/astro Verifies the workflow endpoint accessibility in a production environment by making a POST request. ```APIDOC ## POST /api/workflow (Production) ### Description Verifies that the workflow endpoint is accessible after deployment to a production environment. Replace `` with your actual production URL. ### Method POST ### Endpoint `/api/workflow` ### Parameters #### Request Body - **message** (string) - Required - A sample message payload to send with the workflow trigger. ### Request Example ```bash curl -X POST /api/workflow \ -H "Content-Type: application/json" \ -d '{"message": "Hello from the workflow!"}' ``` ### Response #### Success Response (200) - **workflowRunId** (string) - The unique identifier for the triggered workflow run. #### Response Example ```json { "workflowRunId": "wfr_xxxxxx" } ``` ``` -------------------------------- ### Configure Workflow Run Options Source: https://upstash.com/docs/workflow/howto/configure Demonstrates how to trigger a workflow run with specific configurations for retries, retry delay, and flow control using the Upstash Workflow client. The configuration options are passed as arguments to the client.trigger method. ```typescript import { Client } from "@upstash/workflow"; const client = Client() const { workflowRunId } = await client.trigger({ url: `http://localhost:3000/api/workflow`, retries: 3, retryDelay: "(1 + retries) * 1000", flowControl: { key: "limit-ads", rate: 1, parallelism: 10 } }); ``` -------------------------------- ### Create Stripe Customer from Webhook Data Source: https://upstash.com/docs/workflow/howto/use-webhooks This snippet illustrates how to create a Stripe customer record after successfully processing webhook event data. It uses the `context.run` method to perform the customer creation asynchronously, passing the user's email, name, and ID for integration. This step is typically performed after initial data validation and extraction. ```TypeScript export const { POST } = serve(async (context) => { // ... Previous validation and user data extraction if (!user) { return; } const customer = await context.run("create-stripe-customer", async () => { return await stripe.customers.create({ email: user.email, name: `${user.firstName} ${user.lastName}`, metadata: { userId: user.userId, }, }); }); /// ... Additional steps }); ``` ```Python @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: # ... Previous validation and user data extraction if not user: return async def _create_stripe_customer(): return await stripe.customers.create( email=user["email"], name=f"{user['first_name']} {user['last_name']}", metadata={\"user_id\": user["user_id"]}, ) customer = await context.run("create-stripe-customer", _create_stripe_customer) # ... Additional steps ``` -------------------------------- ### Integrate AI SDK Providers Source: https://upstash.com/docs/workflow/agents/features Use external AI SDK providers like Anthropic by importing their creation methods and passing them to the AISDKModel. ```typescript import { createAnthropic } from "@ai-sdk/anthropic"; const model = agents.AISDKModel({ context, provider: createAnthropic, providerParams: { apiKey: "", }, }); ``` -------------------------------- ### Configure Local Tunnel Environment Variables Source: https://upstash.com/docs/workflow/quickstarts/nextjs-flask Sets environment variables for QStash Token and Upstash Workflow URL when using a local tunnel. The UPSTASH_WORKFLOW_URL should be the public URL provided by the local tunnel service. ```bash export QSTASH_TOKEN="***" export UPSTASH_WORKFLOW_URL="" ``` -------------------------------- ### Configure Upstash Realtime instance Source: https://upstash.com/docs/workflow/howto/realtime/basic Defines the event schema and initializes the Realtime client using Upstash Redis. This serves as the central configuration for event types. ```typescript import { InferRealtimeEvents, Realtime } from "@upstash/realtime"; import { Redis } from "@upstash/redis"; import z from "zod/v4"; const redis = Redis.fromEnv(); const schema = { workflow: { runFinish: z.object({}), stepFinish: z.object({ stepName: z.string(), result: z.unknown().optional(), }), }, }; export const realtime = new Realtime({ schema, redis }); export type RealtimeEvents = InferRealtimeEvents; ``` -------------------------------- ### Configure QStash Client with Vercel Bypass Header Source: https://upstash.com/docs/workflow/troubleshooting/vercel This TypeScript code snippet demonstrates how to configure the QStash client within a Next.js workflow serve function to include the Vercel automation bypass secret in the headers. This ensures that all subsequent calls within the workflow, including context.invoke, automatically include the necessary header to bypass Vercel's deployment protection. ```typescript import { Client } from '@upstash/qstash' import { serve } from '@upstash/workflow/nextjs' export const { POST } = serve(async (context) => { // your workflow logic }, { qstashClient: new Client({ token: process.env.QSTASH_TOKEN!, headers: { "x-vercel-protection-bypass": process.env.VERCEL_AUTOMATION_BYPASS_SECRET! } }) }) ``` -------------------------------- ### Execute Business Logic in context.run Source: https://upstash.com/docs/workflow/basics/caveats Demonstrates how to wrap business logic inside context.run to ensure correct execution across multiple workflow endpoint calls. Code outside of context.run is executed on every invocation, while code inside runs only when the specific step is triggered. ```typescript export const { POST } = serve(async (context) => { const input = context.requestPayload const result = await context.run("step-1", () => { return { success: true } }) console.log("This log will appear multiple times") await context.run("step-2", () => { console.log("This log will appear just once") console.log("Step 1 status is:", result.success) }) }) ``` ```python @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload async def _step_1() -> Dict: return {"success": True} result = await context.run("step-1", _step_1) print("This log will appear multiple times") async def _step_2() -> None: print("This log will appear just once") print("Step 1 status is:", result["success"]) await context.run("step-2", _step_2) ``` -------------------------------- ### Rehydrating Class Instances in Workflow Steps Source: https://upstash.com/docs/workflow/basics/context/run Shows how to handle class instances returned from steps, which are restored as plain objects. Use Object.assign to restore instance methods after retrieval. ```typescript export const { POST } = serve( async (context) => { let user = await context.run("step-1", async () => { return new User("John Doe", "john.doe@example.com"); }); console.log(user.name) user = Object.assign(new User(), user); await context.run("greet", async () => { console.log(user.greet()); }); } ); ``` -------------------------------- ### Call OpenAI Chat Completions Source: https://upstash.com/docs/workflow/integrations/openai Demonstrates how to call the OpenAI chat completions endpoint using the `context.api.openai.call` method. This method provides type safety for request and response bodies. It requires an OpenAI API key and specifies the model and messages for the chat. ```typescript const { status, body } = await context.api.openai.call( "Call OpenAI", { token: "", operation: "chat.completions.create", body: { model: "gpt-4o", messages: [ { role: "system", content: "Assistant says 'hello!'", }, { role: "user", content: "User shouts back 'hi!'" } ], }, } ); // get text: console.log(body.content[0].text) ``` -------------------------------- ### Using Conditional Execution Source: https://upstash.com/docs/workflow/features/retries/prevent-retries Explains how to use guard conditions to skip steps and exit early from a workflow without errors. The workflow completes successfully in this scenario. ```APIDOC ## POST /conditional-execution ### Description This endpoint processes data based on validation. If the data is invalid, it returns early without executing the processing step, completing the workflow successfully. ### Method POST ### Endpoint /conditional-execution ### Parameters #### Request Body - **data** (any) - Required - The data payload to process. ### Request Example ```json { "data": {"key": "value"} } ``` ### Response #### Success Response (200) - **message** (string) - Indicates successful processing or early exit due to validation. #### Response Example ```json { "message": "Data processed successfully" } ``` ``` -------------------------------- ### QStash Query Parameter Authentication Source: https://upstash.com/docs/workflow/api-refence/notify/notify-event Authenticate with QStash by providing a JWT token as a query parameter named `qstash_token`. ```APIDOC ## Query Parameter Authentication ### Description Authenticate requests to QStash by providing a JWT token in the `qstash_token` query parameter. ### Method Any (e.g., GET, POST, PUT, DELETE) ### Endpoint Any QStash endpoint ### Parameters #### Query Parameters - **qstash_token** (string) - Required - The JWT token. ### Request Example ```http GET /some/endpoint?qstash_token=YOUR_JWT_TOKEN HTTP/1.1 Host: qstash.com ``` ### Response #### Success Response (200) - **Data** (object) - The response payload. #### Response Example ```json { "message": "Success" } ``` ``` -------------------------------- ### Execute Parallel Steps with TypeScript Source: https://upstash.com/docs/workflow/features/parallel-steps This snippet demonstrates how to use Promise.all() to run multiple context.run() steps concurrently within an Upstash Workflow. It ensures that inventory checks for coffee beans, cups, and milk happen simultaneously. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { checkInventory, brewCoffee, printReceipt } from "@/utils"; export const { POST } = serve(async (context) => { const [coffeeBeansAvailable, cupsAvailable, milkAvailable] = await Promise.all([ context.run("check-coffee-beans", () => checkInventory("coffee-beans")), context.run("check-cups", () => checkInventory("cups")), context.run("check-milk", () => checkInventory("milk")), ]); }); ``` -------------------------------- ### Call OpenAI API Source: https://upstash.com/docs/workflow/basics/context/api Demonstrates how to make a chat completion request to the OpenAI API using the `context.api.openai.call` method. It requires an OpenAI API key and specifies the operation and request body, including the model and messages. ```typescript const { status, body } = await context.api.openai.call("Call OpenAI", { token: "", operation: "chat.completions.create", body: { model: "gpt-4o", messages: [ { role: "system", content: "Assistant says 'hello!'", }, { role: "user", content: "User shouts back 'hi!'" }, ], }, }); ``` -------------------------------- ### Handling WorkflowAbort in try/catch blocks Source: https://upstash.com/docs/workflow/troubleshooting/general Demonstrates how to re-throw WorkflowAbort errors when using try/catch blocks to ensure the workflow engine can properly manage step execution state. ```typescript import { WorkflowAbort } from '@upstash/workflow'; try { await context.run( ... ); } catch (error) { if (error instanceof WorkflowAbort) { throw error; } else { // handle other errors } } ``` ```python from upstash_workflow import WorkflowAbort try: await context.run( ... ) except Exception as e: if isinstance(e, WorkflowAbort): raise e else: # handle other errors ``` -------------------------------- ### Persisting requestPayload using context.run Source: https://upstash.com/docs/workflow/troubleshooting/general Provides a workaround to ensure requestPayload is available by wrapping its access in a context.run step. ```typescript export const { POST } = serve(async (context) => { // Payload will never be undefined const payload = await context.run("get payload", () => context.requestPayload) // ... steps or any other code context.call( ... ) }) ``` ```python @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: async def _get_payload() -> str: return context.request_payload # Payload will never be None payload = await context.run("get payload", _get_payload) # ... steps or any other code context.call( ... ) ``` -------------------------------- ### Initialize Workflow Agents Model Source: https://upstash.com/docs/workflow/agents/features Define the agent workflow and initialize an OpenAI model within a Next.js route handler. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { agentWorkflow } from "@upstash/workflow-agents"; export const { POST } = serve(async (context) => { const agents = agentWorkflow(context); const model = agents.openai('gpt-3.5-turbo') }) ``` -------------------------------- ### Dynamic Workflow Execution in TypeScript Source: https://upstash.com/docs/workflow/examples/dynamicWorkflow This TypeScript code demonstrates a dynamic workflow that accepts a list of function names to execute in order. It uses `context.run` to execute each step, persisting results and allowing for retries and idempotency. The workflow dynamically resolves and executes functions based on the input payload. ```typescript import { WorkflowNonRetryableError } from "@upstash/workflow" import { serve } from "@upstash/workflow/nextjs" const addOne = (data: number): number => { return data + 1 } const multiplyWithTwo = (data: number): number => { return data * 2 } type FunctionName = "AddOne" | "MultiplyWithTwo" const functions: Record number> = { AddOne: addOne, MultiplyWithTwo: multiplyWithTwo, } interface WorkflowPayload { version: string functions: FunctionName[] } export const { POST } = serve(async (context) => { const { functions: steps } = context.requestPayload let lastResult = 0 for (let i = 0; i < steps.length; i++) { const stepName = steps[i] lastResult = await context.run( `step-${i}:${stepName}`, async () => { const fn = functions[stepName] if (!fn) throw new WorkflowNonRetryableError("Unknown step") return fn(lastResult) } ) } }) ``` -------------------------------- ### Send Confirmation and Notification Emails (TypeScript, Python) Source: https://upstash.com/docs/workflow/examples/eCommerceOrderFulfillment This section covers sending out essential communication to the customer. It includes sending an order confirmation email upon successful order placement and a dispatch notification email once the order has been shipped, enhancing customer experience. ```typescript await context.run("send-confirmation", async () => { return await sendOrderConfirmation(userId, orderId) }) await context.run("send-dispatch-notification", async () => { return await sendDispatchNotification(userId, orderId) }) ``` ```python async def _send_confirmation(): return await send_order_confirmation(user_id, order_id) await context.run("send-confirmation", _send_confirmation) async def _send_dispatch_notification(): return await send_dispatch_notification(user_id, order_id) await context.run("send-dispatch-notification", _send_dispatch_notification) ``` -------------------------------- ### Use Built-in Logging Middleware Source: https://upstash.com/docs/workflow/howto/middlewares Demonstrates how to integrate the built-in logging middleware into an Upstash Workflow serve function to automatically log execution details to the console. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { loggingMiddleware } from "@upstash/workflow"; export const { POST } = serve( async (context) => { await context.run("step-1", () => { return "Hello World"; }); }, { middlewares: [loggingMiddleware] } ); ``` -------------------------------- ### POST /v2/workflows/dlq/restart Source: https://upstash.com/docs/workflow/api-refence/dlq/bulk-restart-workflows-from-dlq Restarts multiple failed workflow runs from the DLQ. Supports filtering by various parameters and pagination via cursors. ```APIDOC ## POST /v2/workflows/dlq/restart ### Description Restart multiple failed workflow runs from the DLQ. Each workflow will start from the beginning. Unlike resume, which continues from where workflows failed, restart executes the entire workflows from the first step. New workflow run IDs are generated and all steps will be executed again. ### Method POST ### Endpoint /v2/workflows/dlq/restart ### Parameters #### Query Parameters - **dlqIds** (array) - Optional - List of specific DLQ IDs to restart. - **cursor** (string) - Optional - Pagination cursor for restarting workflows in batches. - **count** (integer) - Optional - Maximum number of workflows to restart. - **fromDate** (integer) - Optional - Filter workflows by starting date (Unix timestamp in ms). - **toDate** (integer) - Optional - Filter workflows by ending date (Unix timestamp in ms). - **workflowUrl** (string) - Optional - Filter workflows by workflow URL. - **workflowRunId** (string) - Optional - Filter workflows by workflow run ID. - **workflowCreatedAt** (integer) - Optional - Filter workflows by creation timestamp (Unix timestamp in ms). - **label** (string) - Optional - Filter workflows by label. - **failureFunctionState** (string) - Optional - Filter by failure function state (CALLBACK_INPROGRESS, CALLBACK_SUCCESS, CALLBACK_FAIL, CALLBACK_CANCELED). - **callerIp** (string) - Optional - Filter by IP address of the publisher. - **flowControlKey** (string) - Optional - Filter by Flow Control Key. #### Headers - **Upstash-Retries** (integer) - Optional - Override the number of retries for the workflow steps. - **Upstash-Delay** (string) - Optional - Override the delay before executing the workflows (e.g., "10s", "5m"). ### Request Example POST /v2/workflows/dlq/restart?count=10 ### Response #### Success Response (200) - **cursor** (string) - The cursor to use for the next batch of restarts, if applicable. ``` -------------------------------- ### Configure Local Tunnel for QStash Source: https://upstash.com/docs/workflow/quickstarts/cloudflare-workers Configures Upstash Workflow to use a local tunnel for connecting to the production QStash service. This requires your QStash token and the public URL of your local tunnel. ```text QSTASH_TOKEN="***" UPSTASH_WORKFLOW_URL="" ``` -------------------------------- ### POST /auth-provider-webhook Source: https://upstash.com/docs/workflow/examples/authWebhook This endpoint serves as a webhook for authentication providers. It handles the creation of new users, their integration with Stripe, sending welcome emails, and managing trial periods with subsequent notifications. ```APIDOC ## POST /auth-provider-webhook ### Description This endpoint is triggered by an authentication provider to create a new user. It synchronizes user data with the database, sets up the user in Stripe, initiates a free trial, and sends a welcome email. The workflow also includes scheduled tasks to send problem-solving statistics, trial warnings, and a final trial-end notification. ### Method POST ### Endpoint /auth-provider-webhook ### Parameters #### Request Body - **context** (AsyncWorkflowContext[UserCreatedPayload]) - Required - The workflow context containing user creation payload. ### Request Example ```json { "name": "John Doe", "email": "john.doe@example.com" } ``` ### Response #### Success Response (200) - **None** - Indicates the workflow has been successfully initiated. #### Response Example ```json null ``` ### Error Handling - **500 Internal Server Error**: If any of the internal operations (database, Stripe, email) fail. ``` -------------------------------- ### List and filter DLQ messages using Upstash Workflow Client Source: https://upstash.com/docs/workflow/basics/client/dlq/list This snippet demonstrates how to initialize the Upstash Workflow client and retrieve messages from the Dead Letter Queue. It shows both a basic list operation and an advanced query using pagination and specific filter criteria. ```typescript import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }); // List all DLQ messages const { messages, cursor } = await client.dlq.list({ filter: dlqFilters }); // List with pagination and filtering const result = await client.dlq.list({ cursor, count: 10, filter: { fromDate: Date.now() - 86400000, // last 24 hours toDate: Date.now(), url: "https://your-endpoint.com", responseStatus: 500 } }); ``` -------------------------------- ### Retrieve Workflow Run Logs (TypeScript) Source: https://upstash.com/docs/workflow/basics/client/logs This snippet demonstrates how to use the `logs` method from the Upstash Workflow client to retrieve workflow run logs. It shows how to initialize the client and call the `logs` method with an empty options object to fetch the first set of logs. The response includes the list of runs and a cursor for pagination. ```typescript import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }) const { runs, cursor } = await client.logs({}) ``` -------------------------------- ### Define Basic Workflow Endpoint in Flask Source: https://upstash.com/docs/workflow/quickstarts/flask This snippet demonstrates how to set up a basic workflow endpoint in a Flask application using Upstash Workflow. It defines two simple steps, '_step1' and '_step2', which are executed sequentially. The 'context.run' method is used to execute each step. ```python from flask import Flask from upstash_workflow.flask import Serve app = Flask(__name__) serve = Serve(app) @serve.route("/api/workflow") def workflow(context) -> None: def _step1() -> None: print("initial step ran") context.run("initial-step", _step1) def _step2() -> None: print("second step ran") context.run("second-step", _step2) ``` -------------------------------- ### Configuring Environment Variables in Workflows Source: https://upstash.com/docs/workflow/basics/serve/advanced Demonstrates how to pass environment variables to the workflow context. These values are accessible within the workflow execution via context.env. ```typescript import { Receiver } from "@upstash/qstash"; import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve( async (context) => { const env = context.env; }, { env: { QSTASH_URL: "", QSTASH_TOKEN: "", QSTASH_CURRENT_SIGNING_KEY: "", QSTASH_NEXT_SIGNING_KEY: "", } } ); ``` ```python @serve.post( "/api/example", env={ "QSTASH_CURRENT_SIGNING_KEY": os.environ["QSTASH_CURRENT_SIGNING_KEY"], "QSTASH_NEXT_SIGNING_KEY": os.environ["QSTASH_NEXT_SIGNING_KEY"], }, ) async def example(context: AsyncWorkflowContext[str]) -> None: ... ``` -------------------------------- ### QStash Authentication Source: https://upstash.com/docs/workflow/api-refence/dlq/get-failed-workflow-run This section describes the authentication methods supported by the QStash API. It covers JWT bearer tokens for authorization headers and a specific query parameter for token passing. ```APIDOC ## QStash API Authentication ### Description This API supports authentication using JWT bearer tokens and a query parameter for passing the token. ### Method All methods (GET, POST, PUT, DELETE, etc.) ### Endpoint All QStash endpoints ### Parameters #### Header Parameters - **Authorization** (string) - Required - The JWT token in the format: `Bearer ` #### Query Parameters - **qstash_token** (string) - Required - The QStash authentication token passed as a query parameter. ### Request Example ```json { "example": "Authorization: Bearer " } ``` ### Response #### Success Response (200) - **message** (string) - A success message indicating authentication is valid. #### Response Example ```json { "example": "Authentication successful" } ``` ``` -------------------------------- ### Workflow Context Definition Source: https://upstash.com/docs/workflow/basics/context Defines how to initialize a workflow context using the Upstash Workflow SDK in Next.js or FastAPI. ```APIDOC ## Workflow Context Initialization ### Description This endpoint demonstrates how to initialize the workflow context within a serverless function or API route. ### Method POST ### Endpoint /api/workflow/route.ts (Next.js) or /api/example (FastAPI) ### Parameters #### Request Body - **context** (object) - Required - The workflow context object provided by the serve function. ### Request Example { "payload": "data" } ### Response #### Success Response (200) - **status** (string) - Workflow execution status. ### Response Example { "status": "success" } ``` -------------------------------- ### Configure QStash Request Verification Source: https://upstash.com/docs/workflow/basics/serve/advanced Explicitly define a Receiver to verify that incoming requests originate from QStash. This is essential for security and supporting multiple QStash projects. ```typescript import { Receiver } from "@upstash/qstash"; import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve( async (context) => { ... }, { receiver: new Receiver({ currentSigningKey: "", nextSigningKey: "", }) } ); ``` ```python from qstash import Receiver @serve.post( "/api/example", receiver=Receiver( current_signing_key=os.environ["QSTASH_CURRENT_SIGNING_KEY"], next_signing_key=os.environ["QSTASH_NEXT_SIGNING_KEY"], ), ) async def example(context: AsyncWorkflowContext[str]) -> None: ... ``` -------------------------------- ### TypeScript: Explicit Receiver for Request Verification Source: https://upstash.com/docs/workflow/howto/security This TypeScript code demonstrates how to explicitly create and use a `Receiver` object to verify request signatures when environment variables are not feasible. It shows the import of `Receiver` from '@upstash/qstash' and its integration into the `serve` function from '@upstash/workflow/nextjs'. This method allows for programmatic control over the signing keys used for verification. ```typescript import { Receiver } from "@upstash/qstash"; import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve( async (context) => { ... }, { receiver: new Receiver({ currentSigningKey: "", nextSigningKey: "", }), } ); ``` -------------------------------- ### POST client.trigger Source: https://upstash.com/docs/workflow/basics/client/trigger Initiates a new workflow run or multiple runs by providing the endpoint URL and optional configuration parameters. ```APIDOC ## POST client.trigger ### Description Starts a new workflow run and returns its unique workflowRunId. Supports single or batch triggers. ### Method POST ### Endpoint client.trigger ### Parameters #### Request Body - **url** (string) - Required - The public URL of the workflow endpoint. - **workflowRunId** (string) - Optional - A custom identifier for the workflow run (prefixed with wfr_). - **body** (string | object) - Optional - The request payload to pass into the workflow run. - **headers** (object) - Optional - HTTP headers to pass into the workflow run. - **retries** (string) - Optional - Number of retry attempts for workflow steps. - **retryDelay** (string) - Optional - Delay between retries using expressions. - **flowControl** (object) - Optional - Configuration to limit concurrency and execution rate. - **delay** (string) - Optional - Delay for the workflow run (e.g., "10s", "1h"). - **notBefore** (number) - Optional - Unix timestamp in seconds to delay delivery. - **label** (string) - Optional - Label for identifying and filtering runs. - **disableTelemetry** (boolean) - Optional - Disable telemetry data collection. ### Request Example { "url": "https://example.com/workflow", "body": "hello there!", "workflowRunId": "my-workflow", "retries": 3 } ### Response #### Success Response (200) - **workflowRunId** (string) - The unique identifier for the triggered workflow run. #### Response Example { "workflowRunId": "wfr_my-workflow" } ``` -------------------------------- ### Execute Tools as Workflow Steps Source: https://upstash.com/docs/workflow/agents/features Configures a WorkflowTool to execute as a step, allowing the use of context-aware methods like context.call within the tool's invoke function. ```typescript import { WorkflowTool } from '@upstash/workflow' import { serve } from '@upstash/workflow/nextjs' export const { POST } = serve(async (context) => { const tool = new WorkflowTool({ description: "...", schema: "...", invoke: async (args) => { await context.call( ... ) }, executeAsStep: false }) }) ``` -------------------------------- ### Enabling Verbose Logging Source: https://upstash.com/docs/workflow/basics/serve/advanced Shows how to enable verbose mode to output detailed workflow execution logs to stdout. This is useful for debugging and is disabled by default. ```typescript export const { POST } = serve( async (context) => { ... }, { verbose: true } ); ``` -------------------------------- ### Create SvelteKit Workflow Endpoint Source: https://upstash.com/docs/workflow/quickstarts/svelte Define a workflow endpoint in your SvelteKit application by creating a +server.ts file within the routes/api/workflow directory. This file uses the @upstash/workflow/svelte serve function to define and export the workflow, including initial and second steps. ```typescript import { serve } from "@upstash/workflow/svelte" import { env } from "$env/dynamic/private" export const { POST } = serve( async (context) => { await context.run("initial-step", () => { console.log("initial step ran") }) await context.run("second-step", () => { console.log("second step ran") }) }, { env } ) ``` -------------------------------- ### Pin Flow Control Key Configuration (OpenAPI) Source: https://upstash.com/docs/workflow/api-refence/flow-control/pin-configuration-for-flow-control-key This OpenAPI specification defines the endpoint for pinning a processing configuration for a specific flow-control key. It allows setting parallelism, rate, and period parameters. Pinning overrides message-based configurations and resets the key's state. ```yaml openapi: 3.1.0 info: title: Upstash Workflow REST API description: > Upstash Workflow is a serverless workflow orchestration service built on top of Upstash QStash and Upstash Redis. version: 2.0.0 contact: name: Upstash url: https://upstash.com servers: - url: https://qstash.upstash.io security: - bearerAuth: [] - bearerAuthQuery: [] paths: /v2/flowControl/{flowControlKey}/pin: post: tags: - Flow Control summary: Pin Configuration for Flow Control Key description: Pins a processing configuration for a specific flow-control key. parameters: - name: flowControlKey in: path required: true schema: type: string description: The flow-control key for which the configuration will be pinned. - name: parallelism in: query schema: type: integer description: The parallelism value to apply to the flow-control key. - name: rate in: query schema: type: integer description: The rate value to apply to the flow-control key. - name: period in: query schema: type: integer description: The period value to apply to the flow-control key, in seconds. responses: '200': description: The flow-control key configuration has been pinned. '400': description: >- Bad request. Returned when the flow-control key is not provided, or when the parallelism, rate or period values are invalid. content: application/json: schema: $ref: '#/components/schemas/Error' components: schemas: Error: type: object required: - error properties: error: type: string description: Error message securitySchemes: bearerAuth: type: http scheme: bearer bearerFormat: JWT description: QStash authentication token bearerAuthQuery: type: apiKey in: query name: qstash_token description: QStash authentication token passed as a query parameter ``` -------------------------------- ### Schedule a Workflow via QStash Dashboard Source: https://upstash.com/docs/workflow/howto/schedule This section describes how to schedule a workflow using the Upstash QStash dashboard. Users can input their live endpoint URL and a CRON expression to define the execution interval. ```APIDOC ## Schedule a Workflow via QStash Dashboard ### Description This process involves using the Upstash QStash dashboard to set up recurring executions for your API endpoints (workflows). You will provide the endpoint URL and a CRON expression to define the schedule. ### Steps 1. Navigate to `Schedules` in your QStash dashboard. 2. Click `Create Schedule`. 3. Enter your live endpoint URL. 4. Add a CRON expression to define the desired interval (e.g., daily, every 15 minutes). 5. Click `Schedule`. Your workflow will now run repeatedly at the defined interval. ``` -------------------------------- ### Create Workflow-Compatible OpenAI Client Source: https://upstash.com/docs/workflow/integrations/aisdk Implements a custom fetch function for the Vercel AI SDK that uses Upstash Workflow's context.call. This ensures that HTTP requests to LLM providers are managed durably by the workflow engine. ```typescript import { createOpenAI } from '@ai-sdk/openai'; import { HTTPMethods } from '@upstash/qstash'; import { WorkflowAbort, WorkflowContext } from '@upstash/workflow'; export const createWorkflowOpenAI = (context: WorkflowContext) => { return createOpenAI({ compatibility: "strict", fetch: async (input, init) => { try { const headers = init?.headers ? Object.fromEntries(new Headers(init.headers).entries()) : {}; const body = init?.body ? JSON.parse(init.body as string) : undefined; const responseInfo = await context.call("openai-call-step", { url: input.toString(), method: init?.method as HTTPMethods, headers, body, }); const responseHeaders = new Headers( Object.entries(responseInfo.header).reduce((acc, [key, values]) => { acc[key] = values.join(", "); return acc; }, {} as Record) ); return new Response(JSON.stringify(responseInfo.body), { status: responseInfo.status, headers: responseHeaders, }); } catch (error) { if (error instanceof WorkflowAbort) { throw error } else { console.error("Error in fetch implementation:", error); throw error; } } }, }); }; ``` -------------------------------- ### Configure Built-in Verification with Environment Variables Source: https://upstash.com/docs/workflow/howto/security This snippet shows how to set environment variables for Upstash Workflow's built-in request verification. These variables, QSTASH_CURRENT_SIGNING_KEY and QSTASH_NEXT_SIGNING_KEY, are essential for the SDK to validate incoming request signatures, ensuring only authorized requests are processed. Ensure these keys are kept secure and are retrieved from your Upstash Workflow dashboard. ```bash QSTASH_CURRENT_SIGNING_KEY=xxxxxxxxx QSTASH_NEXT_SIGNING_KEY=xxxxxxxxx ``` -------------------------------- ### Generate Text with OpenAI Client in Upstash Workflow Source: https://upstash.com/docs/workflow/integrations/aisdk This snippet demonstrates how to create an OpenAI client within an Upstash Workflow and use it to generate text based on a provided prompt. It includes basic error handling for workflow aborts. The function expects a 'prompt' string in the request payload and returns the generated text. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { WorkflowAbort } from '@upstash/workflow'; import { generateText, ToolExecutionError } from 'ai'; import { createWorkflowOpenAI } from './model'; export const { POST } = serve<{ prompt: string }>(async (context) => { const openai = createWorkflowOpenAI(context); // Important: Must have a step before generateText const prompt = await context.run("get prompt", async () => { return context.requestPayload.prompt; }); try { const result = await generateText({ model: openai('gpt-3.5-turbo'), maxTokens: 2048, prompt, }); await context.run("text", () => { console.log(`TEXT: ${result.text}`); return result.text; }); } catch (error) { if (error instanceof ToolExecutionError && error.cause instanceof WorkflowAbort) { throw error.cause; } else { throw error; } } }); ``` -------------------------------- ### Move Configuration from serve to trigger Source: https://upstash.com/docs/workflow/howto/migrations Move workflow configuration options like retries, retryDelay, and flowControl from the serve function to the client.trigger method. ```typescript export const { POST } = serve(async (context) => { ... }); await client.trigger({ url: "...", retries: 3, retryDelay: "1000 * (1 + retried)", flowControl: { key: "my-key", rate: 10 } }); ``` -------------------------------- ### Handle Multiple Webhook Progress Updates Source: https://upstash.com/docs/workflow/features/webhooks Shows how to loop through multiple webhook calls to process ongoing progress updates from an external service until a final signal is received. ```typescript while (true) { const webhookResponse = await context.waitForWebhook( `wait for progress update ${stepCount}`, webhook, "5m" ); if (webhookResponse.timeout) { console.log("No progress update received in time, exiting"); break; } else { const request = webhookResponse.request; console.log("Progress update received:", await request.json()); if (request.headers.get("x-task-finished") === "true") { console.log("Task finished, exiting loop"); break; } } } ``` -------------------------------- ### Serve API Endpoint with External Call (TypeScript) Source: https://upstash.com/docs/workflow/basics/context/call This snippet shows how to define a serverless API endpoint using Upstash Workflow's `serve` function in a Next.js environment. It demonstrates making a POST request to an external service ('sync-user-data') with a JSON payload and custom headers, including an API key from environment variables. The function expects a `topic` in its request payload. ```typescript import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve<{ topic: string }>(async (context) => { const { userId, name } = context.requestPayload; const { status, headers, body } = await context.call("sync-user-data", { url: "https://my-third-party-app", // Endpoint URL method: "POST", body: JSON.stringify({ userId, name }), headers: { authorization: `Bearer ${process.env.OPENAI_API_KEY}`, }, } ); }); ``` -------------------------------- ### Send Trial Warning Email Source: https://upstash.com/docs/workflow/examples/authWebhook Monitors trial status and sends a warning email if the user has not upgraded their plan 2 days before the trial ends. ```typescript await context.sleep("wait for trial warning", 5 * 24 * 60 * 60); const isUpgraded = await context.run("check upgraded plan", async () => { return await checkUpgradedPlan(email); }); if (isUpgraded) return; await context.run("send trial warning email", async () => { await sendEmail( email, "Your trial is about to end in 2 days. Please upgrade your plan to keep using our platform." ); }); ``` ```python await context.sleep("wait for trial warning", 5 * 24 * 60 * 60) async def _check_upgraded_plan() -> bool: return await check_upgraded_plan(email) is_upgraded = await context.run("check upgraded plan", _check_upgraded_plan) # end the workflow if upgraded if is_upgraded: return async def _send_trial_warning_email() -> None: await send_email( email, "Your trial is about to end in 2 days. Please upgrade your plan to keep using our platform.", ) await context.run("send trial warning email", _send_trial_warning_email) ``` -------------------------------- ### POST /v2/workflows/dlq/resume/{dlqId} Source: https://upstash.com/docs/workflow/api-refence/dlq/resume-workflow-from-dlq Resumes a workflow that has failed and moved to the Dead Letter Queue (DLQ). ```APIDOC ## POST /v2/workflows/dlq/resume/{dlqId} ### Description Resumes a workflow run that is currently in the Dead Letter Queue. This action generates a new workflow run ID. ### Method POST ### Endpoint /v2/workflows/dlq/resume/{dlqId} ### Parameters #### Path Parameters - **dlqId** (string) - Required - The DLQ ID of the workflow run to resume. #### Header Parameters - **Upstash-Retries** (integer) - Optional - Override the number of retries. - **Upstash-Delay** (string) - Optional - Override the delay before executing the next step. - **Upstash-Retry-Delay** (string) - Optional - Override the retry delay expression. - **Upstash-Flow-Control-Key** (string) - Optional - Override the flow control key. - **Upstash-Flow-Control-Value** (string) - Optional - Override the flow control configuration. - **Upstash-Label** (string) - Optional - Override the label for remaining steps. - **Upstash-Failure-Callback** (string) - Optional - Override the failure callback URL. ### Request Example POST /v2/workflows/dlq/resume/dlq_12345 Authorization: Bearer ### Response #### Success Response (200) - **workflowRunId** (string) - The ID of the resumed workflow run. - **workflowCreatedAt** (integer) - The timestamp when the resumed workflow run was created. #### Response Example { "workflowRunId": "wr_abc123", "workflowCreatedAt": 1715678900000 } ``` -------------------------------- ### Resume Workflow from DLQ (OpenAPI) Source: https://upstash.com/docs/workflow/api-refence/dlq/resume-workflow-from-dlq This OpenAPI specification defines the endpoint for resuming a workflow run from the Dead Letter Queue (DLQ). It details the POST request to `/v2/workflows/dlq/resume/{dlqId}`, including path parameters, optional headers for retries and delays, and possible responses (success, bad request, unauthorized, not found, internal server error). ```yaml openapi: 3.1.0 info: title: Upstash Workflow REST API description: > Upstash Workflow is a serverless workflow orchestration service built on top of Upstash QStash and Upstash Redis. version: 2.0.0 contact: name: Upstash url: https://upstash.com servers: - url: https://qstash.upstash.io security: - bearerAuth: [] - bearerAuthQuery: [] paths: /v2/workflows/dlq/resume/{dlqId}: post: tags: - DLQ summary: Resume Workflow from DLQ parameters: - in: path name: dlqId required: true schema: type: string description: The DLQ ID of the workflow run to resume. - in: header name: Upstash-Retries required: false schema: type: integer description: Override the number of retries for the remaining workflow steps. - in: header name: Upstash-Delay required: false schema: type: string description: >- Override the delay before executing the next workflow step. Format is `` (e.g., "10s", "5m"). - in: header name: Upstash-Retry-Delay required: false schema: type: string description: >- Override the retry delay expression for the remaining workflow steps. - in: header name: Upstash-Flow-Control-Key required: false schema: type: string description: Override the flow control key for the remaining workflow steps. - in: header name: Upstash-Flow-Control-Value required: false schema: type: string description: >- Override the flow control configuration in the format `parallelism=, rate=, period=`. - in: header name: Upstash-Label required: false schema: type: string description: Override the label for the remaining workflow steps. - in: header name: Upstash-Failure-Callback required: false schema: type: string description: Override the failure callback URL for the remaining workflow steps. responses: '200': description: Workflow resumed successfully content: application/json: schema: type: object properties: workflowRunId: type: string description: >- The ID of the resumed workflow run (a new ID is generated for the resumed run). workflowCreatedAt: type: integer format: int64 description: >- The timestamp when the resumed workflow run was created (Unix timestamp in milliseconds). '400': description: >- Bad Request - Invalid DLQ ID, not a workflow message, or workflow doesn't support resume content: application/json: schema: $ref: '#/components/schemas/Error' '401': description: Unauthorized content: application/json: schema: $ref: '#/components/schemas/Error' '404': description: DLQ message not found content: application/json: schema: $ref: '#/components/schemas/Error' '500': description: Internal Server Error content: application/json: schema: $ref: '#/components/schemas/Error' components: schemas: Error: type: object required: - error properties: error: type: string description: Error message securitySchemes: bearerAuth: type: http scheme: bearer bearerFormat: JWT description: QStash authentication token bearerAuthQuery: type: apiKey in: query name: qstash_token description: QStash authentication token passed as a query parameter ``` -------------------------------- ### Generate and Send Reports in Upstash Workflow Source: https://upstash.com/docs/workflow/examples/allInOne This snippet demonstrates how to use the context.run method to execute report generation and delivery tasks. It ensures that both the generation and the notification steps are tracked and managed by the workflow engine. ```typescript const report = await context.run("generate-report", async () => { return await generateReport(request.datasetId) }) await context.run("send-report", async () => { await sendReport(report, request.userId) }) ``` ```python async def _generate_report() -> Any: return await generate_report(dataset_id) report = await context.run("generate-report", _generate_report) async def _send_report() -> None: await send_report(report, user_id) await context.run("send-report", _send_report) ``` -------------------------------- ### Create Realtime API Endpoint Source: https://upstash.com/docs/workflow/howto/realtime/basic Exposes an API route to handle Server-Sent Events (SSE) connections for the Realtime client. ```typescript import { handle } from "@upstash/realtime"; import { realtime } from "@/lib/realtime"; export const GET = handle({ realtime }); ``` -------------------------------- ### Trigger Workflow Run Source: https://upstash.com/docs/workflow/quickstarts/tanstack-start This snippet demonstrates how to trigger a workflow run using the Upstash Workflow SDK. It shows how to initialize the client, specify the workflow endpoint URL, provide a request body, and set the number of retries. It also includes a tip on using environment variables for the base URL to handle both local development and production environments. ```APIDOC ## POST /api/workflow ### Description Triggers a new workflow run. This endpoint is typically called from a server-side action to securely invoke a defined workflow. ### Method POST ### Endpoint /api/workflow ### Parameters #### Query Parameters - **retries** (number) - Optional - The number of times to retry the workflow run if it fails. #### Request Body - **body** (any) - Required - The payload to send to the workflow. ### Request Example ```javascript import { Client } from "@upstash/workflow"; const client = Client() const BASE_URL = process.env.NODE_ENV === 'production' ? 'https://yourapp.com' : 'http://localhost:3000' const { workflowRunId } = await client.trigger({ url: `${BASE_URL}/api/workflow`, body: "Hello World!", retries: 3 }); ``` ### Response #### Success Response (200) - **workflowRunId** (string) - The unique identifier for the triggered workflow run. #### Response Example ```json { "workflowRunId": "some-unique-run-id" } ``` ``` -------------------------------- ### Access Workflow Context and Define Steps Source: https://upstash.com/docs/workflow/basics/serve Shows how to access request payloads and define sequential workflow steps within the route function using the context object. ```typescript import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve( async (context) => { // 👇 Access context properties const { userId } = context.requestPayload; // 👇 Define a workflow step await context.run("step-1", async () => {}) } ); ``` ```python from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: async def _step1() -> str: # define a piece of business logic as step 1 return "step 1 result" result = await context.run("step-1", _step1) async def _step2() -> None: # define another piece of business logic as step 2 pass await context.run("step-2", _step2) ``` -------------------------------- ### Retrieve Waiters by Event ID Source: https://upstash.com/docs/workflow/basics/client/waiters This snippet demonstrates how to initialize the Upstash Workflow client and invoke the getWaiters method. It requires a valid QStash token and a specific eventId to return a list of waiting workflow objects. ```javascript import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }); const result = await client.getWaiters({ eventId: "my-event-id", }); ``` -------------------------------- ### Implement Sleep Intervals in Workflows Source: https://upstash.com/docs/workflow/quickstarts/fastapi Shows how to pause workflow execution using sleep_until or sleep methods. This is useful for scheduling tasks or waiting for external events within a workflow. ```python from fastapi import FastAPI import time from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) def some_work(input: str) -> str: return f"processed '{input}'" @serve.post("/sleep") async def sleep(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload async def _step1() -> str: output = some_work(input) print("step 1 input", input, "output", output) return output result1: str = await context.run("step1", _step1) await context.sleep_until("sleep1", time.time() + 3) async def _step2() -> str: output = some_work(result1) print("step 2 input", result1, "output", output) return output result2: str = await context.run("step2", _step2) await context.sleep("sleep2", 2) async def _step3() -> None: output = some_work(result2) print("step 3 input", result2, "output", output) await context.run("step3", _step3) ``` -------------------------------- ### Define Multi-Agent Workflow in TypeScript Source: https://upstash.com/docs/workflow/agents/features This snippet demonstrates how to initialize a workflow with two specialized agents: a researcher agent equipped with a Wikipedia tool and a mathematician agent equipped with a calculation tool. The workflow uses a manager task to coordinate these agents to answer a complex prompt. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { agentWorkflow } from "@upstash/workflow-agents"; import { WikipediaQueryRun } from "@langchain/community/tools/wikipedia_query_run"; import * as mathjs from 'mathjs' import { tool } from "ai"; import { z } from "zod"; export const { POST } = serve(async (context) => { const agents = agentWorkflow(context); const model = agents.openai('gpt-4o'); const researcherAgent = agents.agent({ model, name: 'academic', maxSteps: 2, tools: { wikiTool: new WikipediaQueryRun({ topKResults: 1, maxDocContentLength: 500, }) }, background: 'You are researcher agent with access to Wikipedia. ' + 'Utilize Wikipedia as much as possible for correct information', }); const mathAgent = agents.agent({ model, name: "mathematician", maxSteps: 2, tools: { calculate: tool({ description: 'A tool for evaluating mathematical expressions. ' + "Example expressions: '1.2 * (2 + 4.5)', '12.7 cm to inch', 'sin(45 deg) ^ 2'." + "only call this tool if you need to calculate a mathematical expression." + "when writing an expression, don't use words like 'thousand' or 'million'", parameters: z.object({ expression: z.string() }), execute: async ({ expression }) => mathjs.evaluate(expression), }), }, background: "You are a mathematician agent which can utilize" + "a calculator to compute expressions" }) const task = agents.task({ model, maxSteps: 3, agents: [researcherAgent, mathAgent], prompt: "Tell me about 3 cities in Japan and calculate the sum of their populations", }); const { text } = await task.run(); console.log("result:", text) }) ``` -------------------------------- ### Perform External API Calls in Workflows Source: https://upstash.com/docs/workflow/quickstarts/fastapi Illustrates how to integrate external HTTP requests into a workflow using the context.call method. It handles request payloads and processes responses within workflow steps. ```python from fastapi import FastAPI from typing import Dict from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext, CallResponse app = FastAPI() serve = Serve(app) def some_work(input: str) -> str: return f"processed '{input}'" @app.post("/get-data") async def get_data() -> Dict[str, str]: return {"message": "get data response"} @serve.post("/call") async def call(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload async def _step1() -> str: output = some_work(input) print("step 1 input", input, "output", output) return output result1: str = await context.run("step1", _step1) response: CallResponse[Dict[str, str]] = await context.call( "get-data", url=f"{context.env.get('UPSTASH_WORKFLOW_URL', 'http://localhost:8000')}/get-data", method="POST", body={"message": result1}, ) async def _step2() -> str: output = some_work(response.body["message"]) print("step 2 input", response, "output", output) return output await context.run("step2", _step2) ``` -------------------------------- ### Configure Workflow Endpoint URL Dynamically Source: https://upstash.com/docs/workflow/quickstarts/tanstack-start Demonstrates how to dynamically set the workflow endpoint URL based on the environment (production or local development). This approach helps avoid hardcoding URLs and ensures your application correctly targets the workflow endpoint in different deployment stages. It utilizes environment variables to determine the base URL. ```javascript const BASE_URL = process.env.NODE_ENV === 'production' ? 'https://yourapp.com' : 'http://localhost:3000' const { workflowRunId } = await client.trigger({ url: `${BASE_URL}/api/workflow`, body: "Hello World!", retries: 3 }); ``` -------------------------------- ### Trigger Workflow Run with Local Address Source: https://upstash.com/docs/workflow/howto/local-development/development-server Demonstrates how to trigger a workflow run using the Upstash Workflow client. It shows how to specify a local URL for the workflow endpoint during development and includes a pattern for dynamically setting the base URL based on the environment (local vs. production). ```javascript import { Client } from "@upstash/workflow"; const client = Client() const { workflowRunId } = await client.trigger({ url: `http://localhost:3000/api/workflow`, retries: 3 }); ``` ```javascript const BASE_URL = process.env.VERCEL_URL ? `https://${process.env.VERCEL_URL}` : `http://localhost:3000` const { workflowRunId } = await client.trigger({ url: `${BASE_URL}/api/workflow`, retries: 3 }); ``` -------------------------------- ### Automated Order Fulfillment Workflow Source: https://upstash.com/docs/workflow/examples/eCommerceOrderFulfillment This snippet demonstrates a complete order fulfillment pipeline. It orchestrates steps for order creation, stock checking, payment processing, dispatching, and customer notifications. ```typescript import { serve } from "@upstash/workflow/nextjs" import { createOrderId, checkStockAvailability, processPayment, dispatchOrder, sendOrderConfirmation, sendDispatchNotification, } from "./utils" type OrderPayload = { userId: string items: { productId: string, quantity: number }[] } export const { POST } = serve(async (context) => { const { userId, items } = context.requestPayload; const orderId = await context.run("create-order-id", async () => { return await createOrderId(userId); }); const stockAvailable = await context.run("check-stock", async () => { return await checkStockAvailability(items); }); if (!stockAvailable) { console.warn("Some items are out of stock"); return; }; await context.run("process-payment", async () => { return await processPayment(orderId) }) await context.run("dispatch-order", async () => { return await dispatchOrder(orderId, items) }) await context.run("send-confirmation", async () => { return await sendOrderConfirmation(userId, orderId) }) await context.run("send-dispatch-notification", async () => { return await sendDispatchNotification(userId, orderId) }) }) ``` ```python from fastapi import FastAPI from typing import List, TypedDict from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext from utils import ( create_order_id, check_stock_availability, process_payment, dispatch_order, send_order_confirmation, send_dispatch_notification, ) app = FastAPI() serve = Serve(app) class OrderItem(TypedDict): product_id: str quantity: int class OrderPayload(TypedDict): user_id: str items: List[OrderItem] @serve.post("/order-fulfillment") async def order_fulfillment(context: AsyncWorkflowContext[OrderPayload]) -> None: payload = context.request_payload user_id = payload["user_id"] items = payload["items"] async def _create_order_id(): return await create_order_id(user_id) order_id: str = await context.run("create-order-id", _create_order_id) async def _check_stock(): return await check_stock_availability(items) stock_available: bool = await context.run("check-stock", _check_stock) if not stock_available: print("Some items are out of stock") return async def _process_payment(): return await process_payment(order_id) await context.run("process-payment", _process_payment) async def _dispatch_order(): return await dispatch_order(order_id, items) await context.run("dispatch-order", _dispatch_order) async def _send_confirmation(): return await send_order_confirmation(user_id, order_id) await context.run("send-confirmation", _send_confirmation) async def _send_dispatch_notification(): return await send_dispatch_notification(user_id, order_id) await context.run("send-dispatch-notification", _send_dispatch_notification) ``` -------------------------------- ### Implement Periodic User Engagement Workflow Source: https://upstash.com/docs/workflow/examples/customerOnboarding This snippet demonstrates an infinite loop workflow that checks user engagement status and sends targeted emails. It uses non-blocking sleep to pause execution for one month between cycles, ensuring minimal resource consumption in serverless environments. ```typescript while (true) { const state = await context.run("check-user-state", async () => { return await getUserState() }) if (state === "non-active") { await context.run("send-email-non-active", async () => { await sendEmail("Email to non-active users", email) }) } else if (state === "active") { await context.run("send-email-active", async () => { await sendEmail("Send newsletter to active users", email) }) } await context.sleep("wait-for-1-month", 60 * 60 * 24 * 30) } ``` ```python while True: async def _check_user_state() -> UserState: return await get_user_state() state: UserState = await context.run("check-user-state", _check_user_state) if state == "non-active": async def _send_email_non_active() -> None: await send_email("Email to non-active users", email) await context.run("send-email-non-active", _send_email_non_active) else: async def _send_email_active() -> None: await send_email("Send newsletter to active users", email) await context.run("send-email-active", _send_email_active) await context.sleep("wait-for-1-month", 60 * 60 * 24 * 30) ``` -------------------------------- ### Implement Workflow Middleware for Realtime Events Source: https://upstash.com/docs/workflow/howto/realtime/basic Creates a custom middleware that automatically emits events to a unique channel based on the workflow run ID whenever a step finishes or the run completes. ```typescript import { WorkflowMiddleware } from "@upstash/workflow"; import { realtime } from "./realtime"; export const realtimeMiddleware = new WorkflowMiddleware({ name: "realtime-events", callbacks: { afterExecution: async ({ context, stepName, result }) => { const channel = realtime.channel(context.workflowRunId); await channel.emit("workflow.stepFinish", { stepName, result, }); }, runCompleted: async ({ context }) => { const channel = realtime.channel(context.workflowRunId); await channel.emit("workflow.runFinish", {}); }, }, }); ``` -------------------------------- ### Wrap Non-Deterministic Logic in context.run Source: https://upstash.com/docs/workflow/basics/caveats The correct approach to handling non-idempotent operations by wrapping them in 'context.run' to ensure they are tracked and handled correctly during workflow retries. ```typescript const result = await context.run(async () => { await getResultFromDb(entryId) }); if (result.return) { return; } ``` ```python async def _get_result_from_db(): return await get_result_from_db(entry_id) result = await context.run("get-result-from-db", _get_result_from_db) if result.should_return: return ``` -------------------------------- ### Apply Image Filters Source: https://upstash.com/docs/workflow/examples/imageProcessing Applies various filters (grayscale, sepia, contrast) to resized images. This function iterates through resized images and filters, calling context.call for each combination. Promises are collected and executed in parallel using Promise.all (TypeScript) or a loop (Python). The input is the resized image URL and filter type, outputting processed image results. ```TypeScript const filters = ["grayscale", "sepia", "contrast"] const processedImagePromises: Promise[] = [] for (const resizedImage of resizedImages) { for (const filter of filters) { const processedImagePromise = context.call( `apply-filter-${filter}`, { // endpoint which returns ImageResult type in response url: "https://image-processing-service.com/filter", method: "POST", body: { imageUrl: resizedImage.body.imageUrl, filter, } } ) processedImagePromises.push(processedImagePromise) } } const processedImages: { body: ImageResult }[] = await Promise.all(processedImagePromises) ``` ```Python filters = ["grayscale", "sepia", "contrast"] filter_responses = [] for resized_image in resized_images: for filter in filters: response: CallResponse[ImageResult] = await context.call( f"apply-filter-{filter}", # endpoint which returns ImageResult type in response url="https://image-processing-service.com/filter", method="POST", body={"imageUrl": resized_image["imageUrl"], "filter": filter}, ) filter_responses.append(response) processed_images = [response.body for response in filter_responses] ``` -------------------------------- ### Add Authentication to Workflow Endpoints Source: https://upstash.com/docs/workflow/quickstarts/fastapi Demonstrates how to validate incoming request headers within a workflow endpoint to secure execution. If authentication fails, the workflow terminates early. ```python from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) def some_work(input: str) -> str: return f"processed '{input}'" @serve.post("/auth") async def auth(context: AsyncWorkflowContext[str]) -> None: if context.headers.get("authentication") != "Bearer secret_password": print("Authentication failed.") return async def _step1() -> str: return "output 1" await context.run("step1", _step1) async def _step2() -> str: return "output 2" await context.run("step2", _step2) ``` -------------------------------- ### Invoke and Await Workflow Execution (TypeScript) Source: https://upstash.com/docs/workflow/features/invoke Demonstrates how to invoke another workflow and wait for its completion using `context.invoke` in TypeScript. It shows how to access the response body, failure status, and cancellation status from the invoked workflow. This method is useful for orchestrating sequential tasks across different workflows without external synchronization. ```typescript const { body, isFailed, isCanceled } = await context.invoke( "analyze-content", { workflow: analyzeContent, body: "test", header: {...}, // headers to pass to anotherWorkflow (optional) retries, // number of retries (optional, default: 3) flowControl, // flow control settings (optional) workflowRunId // workflowRunId to set (optional) } ) ``` -------------------------------- ### Initialize Workflow Endpoint with Retry Loop Source: https://upstash.com/docs/workflow/examples/customRetry Sets up a workflow endpoint with a loop to manage retry attempts for API calls. This structure provides a foundation for executing tasks reliably over multiple attempts. ```typescript export const { POST } = serve<{ userData: string }>(async (context) => { for (let attempt = 0; attempt < 10; attempt++) { // TODO: call API in here } }) ``` ```python @serve.post("/custom-retry-logic") async def custom_retry_logic(context: AsyncWorkflowContext[InitialData]) -> None: for attempt in range(10): # TODO: call API in here ``` -------------------------------- ### POST /api/workflow/human-in-loop Source: https://upstash.com/docs/workflow/howto/realtime/human-in-the-loop Defines the main workflow endpoint that processes initial steps and waits for an external approval event. ```APIDOC ## POST /api/workflow/human-in-loop ### Description Initializes a workflow that performs processing and pauses execution to wait for a human approval event via QStash. ### Method POST ### Endpoint /api/workflow/human-in-loop ### Request Body - **userId** (string) - Required - The ID of the user triggering the workflow. - **action** (string) - Required - The action to be processed. ### Request Example { "userId": "user_123", "action": "approve_document" } ### Response #### Success Response (200) - **success** (boolean) - Indicates if the workflow completed successfully. - **approved** (boolean) - The result of the human approval. - **workflowRunId** (string) - The unique identifier for the workflow run. #### Response Example { "success": true, "approved": true, "workflowRunId": "wfr_abc123" } ``` -------------------------------- ### Return Results from context.run for State Management Source: https://upstash.com/docs/workflow/basics/caveats Shows the correct pattern for passing data between workflow steps. Returning values from context.run ensures that data is persisted and available for subsequent steps, avoiding issues with uninitialized variables during re-invocations. ```typescript export const { POST } = serve(async (context) => { const input = context.requestPayload const result = await context.run("step-1", async () => { return await someWork(input) }) await context.run("step-2", async () => { someOtherWork(result) }) }) ``` ```python @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload async def _step_1() -> Dict: return await some_work(input) result = await context.run("step-1", _step_1) async def _step_2() -> None: await some_other_work(result) await context.run("step-2", _step_2) ``` -------------------------------- ### POST /v2/workflows/dlq/callback/{dlqId} Source: https://upstash.com/docs/workflow/api-refence/dlq/retry-failure-callback Manually trigger a retry for a failed workflow failure callback. ```APIDOC ## POST /v2/workflows/dlq/callback/{dlqId} ### Description If the failure callback for a workflow run has failed, you can use this endpoint to manually trigger the failure callback again. This is useful for ensuring that your system is notified of workflow failures even if the original callback attempt did not succeed. ### Method POST ### Endpoint /v2/workflows/dlq/callback/{dlqId} ### Parameters #### Path Parameters - **dlqId** (string) - Required - The DLQ ID of the failed workflow run whose failure callback should be retried. ### Request Example POST /v2/workflows/dlq/callback/dlq_123456789 ### Response #### Success Response (200) - **workflowRunId** (string) - The ID of the workflow run. - **workflowCreatedAt** (integer) - The timestamp when the workflow run was created (Unix timestamp in milliseconds). #### Response Example { "workflowRunId": "run_abc123", "workflowCreatedAt": 1715856000000 } ``` -------------------------------- ### POST /api/workflow Source: https://upstash.com/docs/workflow/howto/realtime/basic This endpoint creates and executes an Upstash Workflow. It defines the steps for validation and action processing, and supports middleware for real-time event emission. ```APIDOC ## POST /api/workflow ### Description This endpoint serves as the definition for your Upstash Workflow. It outlines the sequence of operations, such as data validation and action processing, and integrates with middleware to emit real-time events during execution. ### Method POST ### Endpoint /api/workflow ### Parameters #### Request Body - **userId** (string) - Required - The identifier for the user initiating the workflow. - **action** (string) - Required - The specific action the workflow should perform. ### Request Example ```json { "userId": "user-123", "action": "process-data" } ``` ### Response #### Success Response (200) - **success** (boolean) - Indicates if the workflow execution was initiated successfully. - **workflowRunId** (string) - The unique identifier for this specific workflow run. #### Response Example ```json { "success": true, "workflowRunId": "some-unique-workflow-run-id" } ``` ``` -------------------------------- ### Integrate Realtime Provider in React Source: https://upstash.com/docs/workflow/howto/realtime/basic Wraps the application root with the RealtimeProvider to enable real-time functionality across the frontend. ```tsx "use client"; import { RealtimeProvider } from "@upstash/realtime/client"; export default function RootLayout({ children }: { children: React.ReactNode }) { return ( {children} ); } ``` -------------------------------- ### POST /trigger Workflow Run with Flow Control Source: https://upstash.com/docs/workflow/features/flow-control This endpoint demonstrates how to trigger a workflow run with specific flow control configurations, including parallelism, rate, and period. ```APIDOC ## POST /trigger Workflow Run with Flow Control ### Description This endpoint allows you to initiate a new workflow run while applying flow control mechanisms. Flow control helps manage the execution of steps within a workflow to prevent overwhelming external services or hitting API rate limits. You can define limits for how many steps can run concurrently (parallelism) and how many can start within a given time frame (rate and period). ### Method POST ### Endpoint `/trigger` ### Parameters #### Query Parameters - **url** (string) - Required - The URL of the workflow endpoint to trigger. - **flowControl** (object) - Optional - Configuration for flow control. - **key** (string) - Required - A unique key to identify this flow control configuration. - **parallelism** (number) - Optional - The maximum number of steps that can run concurrently. - **rate** (number) - Optional - The maximum number of steps that can start within the specified period. - **period** (string) - Optional - The time period (e.g., '1m' for 1 minute) for the rate limit. ### Request Example ```typescript const { workflowRunId } = await client.trigger({ url: "https:///", flowControl: { key: "fw_example", parallelism: 7, rate: 3, period: "1m", } }); ``` ### Response #### Success Response (200) - **workflowRunId** (string) - The ID of the newly created workflow run. #### Response Example ```json { "workflowRunId": "" } ``` ``` -------------------------------- ### Image Processing Workflow (TypeScript) Source: https://upstash.com/docs/workflow/examples/imageProcessing This TypeScript code defines an image processing workflow using Upstash Workflow within a Next.js environment. It orchestrates image retrieval, resizing, filtering, and storage. The workflow leverages `context.run` for sequential tasks and `context.call` for parallelizable external service calls. Dependencies include `@upstash/workflow/nextjs` and utility functions for image operations. ```typescript import { serve } from "@upstash/workflow/nextjs" import { resizeImage, applyFilters, storeImage, getImageUrl, } from "./utils" type ImageResult = { imageUrl: string } export const { POST } = serve<{ imageId: string; userId: string }>( async (context) => { const { imageId, userId } = context.requestPayload // Step 1: Retrieve the uploaded image const imageUrl = await context.run("get-image-url", async () => { return await getImageUrl(imageId) }) // Step 2: Resize the image to multiple resolutions const resolutions = [640, 1280, 1920] const resizedImages: { body: ImageResult }[] = await Promise.all(resolutions.map( resolution => context.call( `resize-image-${resolution}`, { // endpoint which returns ImageResult type in response url: "https://image-processing-service.com/resize", method: "POST", body: JSON.stringify({ imageUrl, width: resolution, }) } ) )) // Step 3: Apply filters to each resized image const filters = ["grayscale", "sepia", "contrast"] const processedImagePromises: Promise[] = [] for (const resizedImage of resizedImages) { for (const filter of filters) { const processedImagePromise = context.call( `apply-filter-${filter}`, { // endpoint which returns ImageResult type in response url: "https://image-processing-service.com/filter", method: "POST", body: JSON.stringify({ imageUrl: resizedImage.body.imageUrl, filter, }) } ) processedImagePromises.push(processedImagePromise) } } const processedImages: { body: ImageResult }[] = await Promise.all(processedImagePromises) // Step 4: Store processed images in cloud storage const storedImageUrls: string[] = await Promise.all( processedImages.map( processedImage => context.run(`store-image`, async () => { return await storeImage(processedImage.body.imageUrl) }) ) ) } ) ``` -------------------------------- ### Built-in Logging Middleware Source: https://upstash.com/docs/workflow/howto/middlewares Demonstrates how to integrate the built-in logging middleware into your Upstash Workflow application. ```APIDOC ## Using Built-in Logging Middleware ### Description Upstash Workflow provides a built-in logging middleware that outputs detailed execution logs to your application's console. This includes workflow run status, step execution, and debug messages. ### Method POST ### Endpoint /api/workflow ### Parameters #### Query Parameters None #### Request Body (Workflow definition and execution payload) ### Request Example ```json { "workflow": "your_workflow_definition", "payload": {} } ``` ### Response #### Success Response (200) - **result** (any) - The result of the workflow execution. #### Response Example ```json { "result": "Workflow completed successfully" } ``` ``` -------------------------------- ### Expose Multiple Workflows with serveMany Source: https://upstash.com/docs/workflow/features/invoke/serveMany Use serveMany to expose multiple workflow objects under a single catch-all route. This is required when workflows need to invoke each other within the same application context. ```typescript export const { POST } = serveMany( { "workflow-one-route": workflowOne, "workflow-two-route": workflowTwo, } ) ``` -------------------------------- ### Restart Workflow from DLQ (OpenAPI) Source: https://upstash.com/docs/workflow/api-refence/dlq/restart-workflow-from-dlq This OpenAPI definition describes the POST endpoint for restarting a workflow from the Dead Letter Queue (DLQ). It specifies the request parameters, including the DLQ ID, and various optional headers to control retry behavior, delays, flow control, and callbacks. The response includes the new workflow run ID and creation timestamp upon success, or error details for failure. ```yaml openapi: 3.1.0 info: title: Upstash Workflow REST API description: > Upstash Workflow is a serverless workflow orchestration service built on top of Upstash QStash and Upstash Redis. version: 2.0.0 contact: name: Upstash url: https://upstash.com servers: - url: https://qstash.upstash.io security: - bearerAuth: [] - bearerAuthQuery: [] paths: /v2/workflows/dlq/restart/{dlqId}: post: tags: - DLQ summary: Restart Workflow from DLQ description: > Restart a failed workflow run from the DLQ. The workflow will start from the beginning. Unlike resume, which continues from where the workflow failed, restart executes the entire workflow from the first step. A new workflow run ID is generated and all steps will be executed again. parameters: - in: path name: dlqId required: true schema: type: string description: The DLQ ID of the workflow run to restart. - in: header name: Upstash-Retries required: false schema: type: integer description: Override the number of retries for the workflow steps. - in: header name: Upstash-Delay required: false schema: type: string description: >- Override the delay before executing the workflow. Format is `` (e.g., "10s", "5m"). - in: header name: Upstash-Retry-Delay required: false schema: type: string description: Override the retry delay expression for the workflow steps. - in: header name: Upstash-Flow-Control-Key required: false schema: type: string description: Override the flow control key for the workflow. - in: header name: Upstash-Flow-Control-Value required: false schema: type: string description: >- Override the flow control configuration in the format `parallelism=, rate=, period=`. - in: header name: Upstash-Label required: false schema: type: string description: Override the label for the workflow. - in: header name: Upstash-Failure-Callback required: false schema: type: string description: Override the failure callback URL for the workflow. responses: '200': description: Workflow restarted successfully content: application/json: schema: type: object properties: workflowRunId: type: string description: >- The ID of the restarted workflow run (a new ID is generated for the restarted run). workflowCreatedAt: type: integer format: int64 description: >- The timestamp when the restarted workflow run was created (Unix timestamp in milliseconds). '400': description: >- Bad Request - Invalid DLQ ID, not a workflow message, or workflow doesn't support restart content: application/json: schema: $ref: '#/components/schemas/Error' '401': description: Unauthorized content: application/json: schema: $ref: '#/components/schemas/Error' '404': description: DLQ message not found content: application/json: schema: $ref: '#/components/schemas/Error' '500': description: Internal Server Error content: application/json: schema: $ref: '#/components/schemas/Error' components: schemas: Error: type: object required: - error properties: error: type: string description: Error message securitySchemes: bearerAuth: type: http scheme: bearer bearerFormat: JWT description: QStash authentication token bearerAuthQuery: type: apiKey in: query name: qstash_token description: QStash authentication token passed as a query parameter ``` -------------------------------- ### Configuring Retries and Timeouts Source: https://upstash.com/docs/workflow/basics/context/call This snippet shows how to configure automatic retries and set a timeout for a request made with context.call(). It includes setting the number of retries and the delay between them, along with a maximum timeout for the entire operation. This is crucial for building resilient workflows that can handle transient network issues or slow responses. ```javascript const result = await context.call({ url: "https://api.example.com/long-process", method: "POST", retries: 3, retryDelay: "max(1000, pow(2, retried) * 100)", // Exponential backoff with minimum 1s delay timeout: 60 // 60 seconds timeout }); console.log("Final Status:", result.status); ``` -------------------------------- ### Unpin Flow Control Configuration (OpenAPI) Source: https://upstash.com/docs/workflow/api-refence/flow-control/unpin-configuration-for-flow-control-key This OpenAPI specification defines the endpoint for unpinning the flow-control configuration for a specific key. It allows unpinning parallelism and/or rate configurations. The endpoint requires the flow-control key as a path parameter and accepts boolean flags for parallelism and rate unpinning. ```yaml openapi: 3.1.0 info: title: Upstash Workflow REST API description: > Upstash Workflow is a serverless workflow orchestration service built on top of Upstash QStash and Upstash Redis. version: 2.0.0 contact: name: Upstash url: https://upstash.com servers: - url: https://qstash.upstash.io security: - bearerAuth: [] - bearerAuthQuery: [] paths: /v2/flowControl/{flowControlKey}/unpin: post: tags: - Flow Control summary: Unpin Configuration for Flow Control Key description: Removes the pinned configuration for a specific flow-control key. parameters: - name: flowControlKey in: path required: true schema: type: string description: The flow-control key for which the configuration will be unpinned. - name: parallelism in: query schema: type: boolean default: false description: The flag to indicate whether to unpin the parallelism configuration. - name: rate in: query schema: type: boolean default: false description: The flag to indicate whether to unpin the rate configuration. responses: '200': description: The flow-control key configuration is pinned '400': description: Bad request. Returned when the flow-control key is not provided. content: application/json: schema: $ref: '#/components/schemas/Error' components: schemas: Error: type: object required: - error properties: error: type: string description: Error message securitySchemes: bearerAuth: type: http scheme: bearer bearerFormat: JWT description: QStash authentication token bearerAuthQuery: type: apiKey in: query name: qstash_token description: QStash authentication token passed as a query parameter ``` -------------------------------- ### Implement Failure Handling with TypeScript Source: https://upstash.com/docs/workflow/howto/failures This TypeScript code snippet demonstrates how to use the `failureFunction` parameter within the `serve` method to gracefully handle workflow execution failures. It logs the error response and can optionally return a custom string for UI and log visibility. ```typescript export const { POST } = serve( async (context) => { // Your workflow logic... }, { failureFunction: async ({ context, failStatus, failResponse, failHeaders, }) => { // Handle error, i.e. log to Sentry console.error("Workflow failed:", failResponse); // You can optionally return a string that will be visible // in the UI (coming soon) and in workflow logs return `Workflow failed with status ${failStatus}: ${failResponse}`; }, } ); ``` -------------------------------- ### Python: Explicit Receiver for Request Verification Source: https://upstash.com/docs/workflow/howto/security This Python code illustrates how to configure a custom `Receiver` for request signature verification within an Upstash Workflow. It utilizes environment variables for the signing keys and integrates the `Receiver` into the `@serve.post` decorator. This approach ensures that requests are validated before the workflow logic is executed, enhancing security. ```python from qstash import Receiver @serve.post( "/api/example", receiver=Receiver( current_signing_key=os.environ["QSTASH_CURRENT_SIGNING_KEY"], next_signing_key=os.environ["QSTASH_NEXT_SIGNING_KEY"], ), ) async def example(context: AsyncWorkflowContext[str]) -> None: ... ``` -------------------------------- ### Middleware Event Types Source: https://upstash.com/docs/workflow/howto/middlewares Details the available event types for middleware callbacks, including lifecycle and debug events. ```APIDOC ## Middleware Event Types ### Description This section outlines the various event types that can be handled by Upstash Workflow middlewares, categorized into Lifecycle Events and Debug Events. ### Lifecycle Events #### `runStarted` - **Description**: Called when a workflow run begins. - **Parameters**: `context` (workflow context). - **Signature**: `async ({ context }) => { ... }` #### `beforeExecution` - **Description**: Called before each step executes. - **Parameters**: `context` (workflow context), `stepName` (string). - **Signature**: `async ({ context, stepName }) => { ... }` #### `afterExecution` - **Description**: Called after each step completes. - **Parameters**: `context` (workflow context), `stepName` (string), `result` (any). - **Signature**: `async ({ context, stepName, result }) => { ... }` #### `runCompleted` - **Description**: Called when the entire workflow run finishes. - **Parameters**: `context` (workflow context), `result` (any). - **Signature**: `async ({ context, result }) => { ... }` ### Debug Events #### `onError` - **Description**: Called when an error occurs during workflow execution. - **Parameters**: `workflowRunId` (string), `error` (Error). - **Signature**: `async ({ workflowRunId, error }) => { ... }` #### `onWarning` - **Description**: Called when a warning message is generated. - **Parameters**: `workflowRunId` (string), `warning` (any). - **Signature**: `async ({ workflowRunId, warning }) => { ... }` #### `onInfo` - **Description**: Called when an informational message is generated. - **Parameters**: `workflowRunId` (string), `info` (any). - **Signature**: `async ({ workflowRunId, info }) => { ... }` ``` -------------------------------- ### Configure Retry Delay Strategy Source: https://upstash.com/docs/workflow/features/retries Shows how to configure a custom retry delay strategy using a mathematical expression. The 'retryDelay' parameter accepts an expression that calculates the delay in milliseconds based on the number of retries (retried). A constant delay can also be applied. ```typescript import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }) const { workflowRunId } = await client.trigger({ url: "https:///", retries: 3, retryDelay: "(1 + retried) * 1000" }) ``` -------------------------------- ### Implement Payment Retry Logic Source: https://upstash.com/docs/workflow/examples/paymentRetry Executes a payment attempt loop with a 24-hour delay between retries. If the payment succeeds, the workflow terminates early. ```typescript for (let i = 0; i < 3; i++) { if (!result) { await context.sleep("wait for retry", 24 * 60 * 60); } else { return; } } ``` ```python for i in range(3): if not result: await context.sleep("wait for retry", 24 * 60 * 60) else: return ``` -------------------------------- ### Configure Failure URL Source: https://upstash.com/docs/workflow/howto/failures Configuring a failure URL allows you to receive error notifications even if your primary application is down. ```APIDOC ## POST /workflow/failure-callback ### Description Configures a callback URL to be triggered when a workflow execution fails. This ensures error handling remains functional even if the main application is unreachable. ### Method POST ### Parameters #### Request Body - **failureUrl** (string) - Required - The external URL to receive failure notifications. ### Request Example ```typescript export const { POST } = serve( async (context) => { /* logic */ }, { failureUrl: "https:///workflow-failure" } ); ``` ### Response #### Success Response (200) - **status** (number) - HTTP status of the callback attempt. - **sourceMessageId** (string) - The ID of the message that triggered the failure. - **body** (string) - Base64 encoded body of the failed request. ``` -------------------------------- ### POST /v1/chat/completions Source: https://upstash.com/docs/workflow/integrations/openai This endpoint is used to call the OpenAI Chat Completions API to generate text. It supports various models and message formats. The SDK provides type safety for request and response bodies. ```APIDOC ## POST /v1/chat/completions ### Description This endpoint allows you to interact with the OpenAI Chat Completions API to generate text-based responses. You can specify the model, system and user messages, and the SDK provides type-safe handling for requests and responses. ### Method POST ### Endpoint /v1/chat/completions ### Parameters #### Request Body - **model** (string) - Required - The model to use for generating completions (e.g., "gpt-4o"). - **messages** (array) - Required - An array of message objects, each with a `role` (system, user, assistant) and `content`. ### Request Example ```json { "model": "gpt-4o", "messages": [ { "role": "system", "content": "Assistant says 'hello!'" }, { "role": "user", "content": "User shouts back 'hi!'" } ] } ``` ### Response #### Success Response (200) - **content** (array) - An array of message content objects, where each object contains the generated text. #### Response Example ```json { "content": [ { "text": "Hello there!" } ] } ``` ``` -------------------------------- ### Handle Authorization Errors with Early Return Source: https://upstash.com/docs/workflow/troubleshooting/general Demonstrates how to handle authorization errors in Upstash Workflows by returning early from a function. This pattern is useful when a condition needs to be checked before proceeding with the workflow. The SDK interprets an early return without executing steps as an authentication failure, resulting in a 400 status code and a specific error message. ```typescript export const { POST } = serve(async (context) => { if (someCondition()) => { return; } // rest of the workflow }) ``` ```typescript export const { POST } = serve(async (context) => { const shouldReturn = await context.run("check condition", () => someCondition()) if (shouldReturn) => { return; } // rest of the workflow }) ``` -------------------------------- ### Dynamic Step Configuration Interface Source: https://upstash.com/docs/workflow/examples/dynamicWorkflow Defines the interface for the workflow payload, specifying the version and an array of function names representing the steps to be executed. This allows for flexible and configurable workflow definitions. ```typescript interface WorkflowPayload { version: string functions: FunctionName[] } ``` -------------------------------- ### Resume Failed Workflow Run (TypeScript) Source: https://upstash.com/docs/workflow/features/dlq/resume This snippet demonstrates how to use the Upstash Workflow client to resume a failed workflow run. It requires a workflow token, the DLQ ID of the failed run, and the number of retries to attempt. The resume action preserves all successfully completed steps and their results. ```typescript import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }); await client.dlq.resume({ dlqId: "dlq-12345", retries: 3, }); ``` -------------------------------- ### Avoid Nesting Context Methods Source: https://upstash.com/docs/workflow/basics/caveats Demonstrates the incorrect practice of calling workflow context methods like sleep, run, or call inside another context.run block. This pattern is unsupported and should be avoided to ensure correct workflow execution. ```typescript import { serve } from "@upstash/workflow/nextjs" export const { POST } = serve(async (context) => { const input = context.requestPayload await context.run("step-1", async () => { await context.sleep(...) // ❌ INCORRECT await context.run(...) // ❌ INCORRECT await context.call(...) // ❌ INCORRECT }) }) ``` ```python @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload async def _step_1() -> None: await context.sleep(...) # ❌ INCORRECT await context.run(...) # ❌ INCORRECT await context.call(...) # ❌ INCORRECT await context.run("step-1", _step_1) ``` -------------------------------- ### User Management and Email Notifications Source: https://upstash.com/docs/workflow/examples/authWebhook This section details auxiliary functions for retrieving user statistics, checking plan upgrades, and sending emails, which are utilized within the main workflow. ```APIDOC ## Auxiliary Functions ### GET /users/{userid}/stats #### Description Retrieves the statistics for a given user, including total problems solved and most interested topic. #### Method GET #### Endpoint /users/{userid}/stats #### Parameters ##### Path Parameters - **userid** (string) - Required - The unique identifier of the user. #### Response ##### Success Response (200) - **total_problems_solved** (integer) - The total number of problems solved by the user. - **most_interested_topic** (string) - The topic the user is most interested in. ### POST /check-upgrade #### Description Checks if a user has upgraded their plan. #### Method POST #### Endpoint /check-upgrade #### Parameters ##### Request Body - **email** (string) - Required - The email address of the user. #### Response ##### Success Response (200) - **upgraded** (boolean) - True if the user has upgraded, false otherwise. ### POST /send-email #### Description Sends an email to a specified recipient with given content. #### Method POST #### Endpoint /send-email #### Parameters ##### Request Body - **email** (string) - Required - The recipient's email address. - **content** (string) - Required - The content of the email. #### Response ##### Success Response (200) - **status** (string) - Indicates the success of the email sending operation (e.g., "sent"). ``` -------------------------------- ### POST /notify with Lookback Source: https://upstash.com/docs/workflow/features/notify This endpoint demonstrates how to use the `notify` function with the `workflowRunId` parameter to enable lookback. This ensures that notifications are delivered even if they are sent before the workflow reaches the `waitForEvent` step, preventing race conditions. ```APIDOC ## POST /notify with Lookback ### Description This endpoint enables the lookback functionality for notifications. By providing a `workflowRunId`, notifications sent before a workflow reaches its `waitForEvent` step will be stored and delivered, preventing race conditions and lost events. ### Method POST ### Endpoint `/notify` ### Parameters #### Query Parameters - **eventId** (string) - Required - The ID of the event to notify. - **eventData** (object) - Optional - The data associated with the event. - **workflowRunId** (string) - Required - The ID of the specific workflow run to target. This enables lookback. ### Request Example ```json { "eventId": "payment-verified", "eventData": { "verified": true, "amount": 100 }, "workflowRunId": "" } ``` ### Response #### Success Response (200) - **message** (string) - Indicates the notification was sent successfully. #### Response Example ```json { "message": "Notification sent successfully" } ``` ### Error Handling - **400 Bad Request**: If required parameters are missing or invalid. - **404 Not Found**: If the specified `workflowRunId` does not exist. ``` -------------------------------- ### Verify Workflow Endpoint with cURL Source: https://upstash.com/docs/workflow/quickstarts/express Use this command to test if your production workflow endpoint is correctly configured and accessible. It sends a POST request with a JSON payload to your deployment URL. ```bash curl -X POST /workflow \ -H "Content-Type: application/json" \ -d '{"message": "Hello from the workflow!"}' ``` -------------------------------- ### Invoking a Workflow using context.invoke Source: https://upstash.com/docs/workflow/basics/context/invoke This snippet demonstrates how to trigger a secondary workflow from within a primary workflow using context.invoke. It shows how to pass a payload, headers, and configuration options such as retries and flow control. ```typescript const { body, isFailed, isCanceled } = await context.invoke( "invoke another workflow", { workflow: anotherWorkflow, body: "test", header: {...}, retries, retryDelay, flowControl, workflowRunId } ); ``` -------------------------------- ### POST /workflow Source: https://upstash.com/docs/workflow/agents/getting-started Triggers a workflow execution by sending a request to the specified workflow endpoint URL. ```APIDOC ## POST /workflow ### Description Triggers a new workflow run by sending a payload to the defined workflow endpoint. This initiates the agentic process where steps, tool invocations, and LLM calls are managed by Upstash. ### Method POST ### Endpoint /workflow ### Parameters #### Request Body - **prompt** (string) - Required - The input prompt for the workflow agent to process. ### Request Example { "prompt": "Explain the future of space exploration" } ### Response #### Success Response (200) - **workflowRunId** (string) - The unique identifier for the triggered workflow run. #### Response Example { "workflowRunId": "wf_1234567890abcdef" } ``` -------------------------------- ### Define Realtime Event Schema for Human-in-the-Loop Source: https://upstash.com/docs/workflow/howto/realtime/human-in-the-loop Extends the Realtime schema to include event types for human-in-the-loop workflows. Specifically, `waitingForInput` signals the workflow is paused for user input, and `inputResolved` indicates that input has been provided, allowing the frontend to update its UI. ```typescript const schema = { workflow: { runFinish: z.object({}), stepFinish: z.object({ stepName: z.string(), result: z.unknown().optional(), }), waitingForInput: z.object({ eventId: z.string(), message: z.string(), }), inputResolved: z.object({ eventId: z.string(), }), }, }; ``` -------------------------------- ### Send Batch Emails via Resend Source: https://upstash.com/docs/workflow/integrations/resend Shows how to send multiple emails in a single request using the batch parameter set to true. This is useful for high-throughput messaging scenarios within your workflow. ```typescript const { status, body } = await context.api.resend.call("Call Resend", { batch: true, token: "", body: [ { from: "Acme ", to: ["delivered@resend.dev"], subject: "Hello World", html: "

It works!

", }, { from: "Acme ", to: ["delivered@resend.dev"], subject: "Hello World", html: "

It works!

", }, ], headers: { "content-type": "application/json", }, }); ``` -------------------------------- ### POST /workflows/restart Source: https://upstash.com/docs/workflow/api-refence/dlq/bulk-restart-workflows-from-dlq Restarts a workflow run, allowing for configuration overrides via request headers. ```APIDOC ## POST /workflows/restart ### Description Restarts a specific workflow run. You can override workflow execution parameters such as retry delays, flow control settings, labels, and failure callbacks using headers. ### Method POST ### Endpoint /workflows/restart ### Parameters #### Header Parameters - **Upstash-Retry-Delay** (string) - Optional - Override the retry delay expression. - **Upstash-Flow-Control-Key** (string) - Optional - Override the flow control key. - **Upstash-Flow-Control-Value** (string) - Optional - Override flow control configuration (parallelism, rate, period). - **Upstash-Label** (string) - Optional - Override the workflow label. - **Upstash-Failure-Callback** (string) - Optional - Override the failure callback URL. ### Request Example { "workflowRunId": "run_12345" } ### Response #### Success Response (200) - **workflowRuns** (array) - List of restarted workflow run details. - **cursor** (string) - Pagination cursor for subsequent requests. #### Response Example { "workflowRuns": [ { "workflowRunId": "new_run_67890", "workflowCreatedAt": 1715678900000 } ], "cursor": "next_page_token" } ``` -------------------------------- ### Create Realtime Middleware for Workflow Events Source: https://upstash.com/docs/workflow/howto/realtime/human-in-the-loop A custom middleware for Upstash Workflow that automatically emits Realtime events. It intercepts workflow execution to send `workflow.waitingForInput` before a wait step, `workflow.inputResolved` after a wait step, and `workflow.stepFinish` for all steps. It also emits `workflow.runFinish` upon completion, centralizing event logic. ```typescript import { WorkflowMiddleware } from "@upstash/workflow"; import { realtime } from "./realtime"; export const realtimeMiddleware = new WorkflowMiddleware({ name: "realtime-events", callbacks: { beforeExecution: async ({ context, stepName }) => { const channel = realtime.channel(context.workflowRunId); // Detect wait-for-event steps and emit waitingForInput if (stepName === "wait-for-approval") { await channel.emit("workflow.waitingForInput", { eventId: `approval-${context.workflowRunId}`, message: `Waiting for approval`, }); } }, afterExecution: async ({ context, stepName, result }) => { const channel = realtime.channel(context.workflowRunId); // Emit inputResolved after wait-for-event steps complete if (stepName === "wait-for-approval") { await channel.emit("workflow.inputResolved", { eventId: `approval-${context.workflowRunId}`, }); } // Emit stepFinish for all steps await channel.emit("workflow.stepFinish", { stepName, result, }); }, runCompleted: async ({ context }) => { const channel = realtime.channel(context.workflowRunId); await channel.emit("workflow.runFinish", {}); }, }, }); ``` -------------------------------- ### Notify with Lookback Support Source: https://upstash.com/docs/workflow/basics/client/notify Shows how to use the workflowRunId parameter to enable lookback functionality. This ensures that the notification is delivered even if it is sent before the workflow reaches the waitForEvent step. ```javascript import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }); await client.notify({ eventId: "payment-processed", eventData: { amount: 100, status: "success" }, workflowRunId: "wfr_abc123", }); ``` -------------------------------- ### Trigger Workflow via HTTP Request Source: https://upstash.com/docs/workflow/quickstarts/astro Sends a POST request to the workflow endpoint to initiate a new workflow run. The server responds with a unique workflow run ID. ```bash curl -X POST http://localhost:3000/api/workflow \ -H "Content-Type: application/json" \ -d '{"message": "Hello from the workflow!"}' ``` -------------------------------- ### Report Generation and Sending API Source: https://upstash.com/docs/workflow/examples/allInOne This section covers the final steps of generating a comprehensive data report and sending it to the user. It involves running a report generation function and then sending the report. ```APIDOC ## POST /api/workflow/generate-and-send-report ### Description Generates a final data report based on the aggregated results and sends it to the specified user. ### Method POST ### Endpoint /api/workflow/generate-and-send-report ### Parameters #### Request Body - **dataset_id** (string) - Required - The ID of the dataset for which the report is generated. - **user_id** (string) - Required - The ID of the user to whom the report will be sent. ### Request Example ```json { "dataset_id": "your_dataset_id", "user_id": "user123" } ``` ### Response #### Success Response (200) - **report_status** (string) - Status of the report generation and sending process. #### Response Example ```json { "report_status": "sent" } ``` ``` -------------------------------- ### Retry Failure Function using Upstash Workflow Client Source: https://upstash.com/docs/workflow/basics/client/dlq/callback This snippet demonstrates how to initialize the Upstash Workflow client and invoke the retryFailureFunction method. It requires a valid QStash token and the specific dlqId of the failed execution to re-trigger the failure callback. ```typescript import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }); // Retry the failure callback for a specific DLQ message const response = await client.dlq.retryFailureFunction({ dlqId: "dlq-12345" // The ID of the DLQ message to retry }); ``` -------------------------------- ### Customizing Request/Response Types Source: https://upstash.com/docs/workflow/integrations/resend Demonstrates how to override the default request and response body types for the Resend API calls when using `context.api.resend.call`. ```APIDOC ## Customizing Request/Response Types ### Description Allows customization of the request and response body types for Resend API calls. ### Method N/A (Code example demonstrates usage) ### Endpoint N/A ### Parameters #### Request Body (Example) - **IsBatch** (boolean) - Set to `true` or `false` to indicate batch mode. - **ResponseBodyType** (object) - Define your custom response body type. - **RequestBodyType** (object) - Define your custom request body type. ### Request Example ```typescript type IsBatch = true; // Set to either true or false type ResponseBodyType = { /* Define your response body type */ }; type RequestBodyType = { /* Define your request body type */ }; const { status, body } = await context.api.resend.call( "Call Resend", { // ... Resend API call configuration } ); ``` ### Response N/A (This snippet focuses on code structure for type customization) ### Response Example N/A ``` -------------------------------- ### POST /trigger (failureUrl Configuration) Source: https://upstash.com/docs/workflow/features/failureFunction/advanced Configuring the failureUrl option when triggering a workflow to handle failures on a separate infrastructure. ```APIDOC ## POST /trigger ### Description Triggers a new workflow run with an optional failureUrl to redirect failure callbacks to an external endpoint. ### Method POST ### Endpoint /trigger ### Parameters #### Request Body - **url** (string) - Required - The workflow endpoint URL. - **failureUrl** (string) - Optional - The external URL to receive failure callbacks. Mutually exclusive with failureFunction. ### Request Example { "url": "https:///workflow", "failureUrl": "https:///workflow-failure" } ### Response #### Success Response (200) - **workflowRunId** (string) - The unique identifier for the triggered workflow run. #### Response Example { "workflowRunId": "wf_123456789" } ``` -------------------------------- ### QStash Bearer Authentication Source: https://upstash.com/docs/workflow/api-refence/notify/notify-event Authenticate with QStash using a JWT token provided in the Authorization header as a Bearer token. ```APIDOC ## Bearer Authentication ### Description Authenticate requests to QStash by providing a JWT token in the `Authorization` header. ### Method Any (e.g., GET, POST, PUT, DELETE) ### Endpoint Any QStash endpoint ### Parameters #### Header Parameters - **Authorization** (string) - Required - The JWT token prefixed with `Bearer `. ### Request Example ```http GET /some/endpoint HTTP/1.1 Host: qstash.com Authorization: Bearer YOUR_JWT_TOKEN ``` ### Response #### Success Response (200) - **Data** (object) - The response payload. #### Response Example ```json { "message": "Success" } ``` ``` -------------------------------- ### Orchestrate AI Data Processing Workflow (TypeScript) Source: https://upstash.com/docs/workflow/examples/allInOne This TypeScript code defines a Next.js API route using Upstash Workflow to orchestrate a data processing pipeline. It downloads a dataset, splits it into chunks, processes each chunk using OpenAI's GPT-4 model, aggregates intermediate results, and finally generates and sends a report. It highlights asynchronous operations, API calls, and workflow step management. ```typescript import { serve } from "@upstash/workflow/nextjs" import { downloadData, aggregateResults, generateReport, sendReport, getDatasetUrl, splitIntoChunks, } from "./utils" type OpenAiResponse = { choices: { message: { role: string, content: string } }[] } export const { POST } = serve<{ datasetId: string; userId: string }> ( async (context) => { const request = context.requestPayload // Step 1: Download the dataset const datasetUrl = await context.run("get-dataset-url", async () => { return await getDatasetUrl(request.datasetId) }) // HTTP request with much longer timeout (2hrs) const { body: dataset } = await context.call("download-dataset", { url: datasetUrl, method: "GET" }) // Step 2: Process data in chunks using OpenAI const chunkSize = 1000 const chunks = splitIntoChunks(dataset, chunkSize) const processedChunks: string[] = [] for (let i = 0; i < chunks.length; i++) { const { body: processedChunk } = await context.api.openai.call( `process-chunk-${i}`, { token: process.env.OPENAI_API_KEY, operation: "chat.completions.create", body: { model: "gpt-4", messages: [ { role: "system", content: "You are an AI assistant tasked with analyzing data chunks. Provide a brief summary and key insights for the given data.", }, { role: "user", content: `Analyze this data chunk: ${JSON.stringify(chunks[i])}`, }, ], max_completion_tokens: 150, }, } ) processedChunks.push(processedChunk.choices[0].message.content!) // Every 10 chunks, we'll aggregate intermediate results if (i % 10 === 9 || i === chunks.length - 1) { await context.run(`aggregate-results${i}`, async () => { await aggregateResults(processedChunks) processedChunks.length = 0 }) } } // Step 3: Generate and send data report const report = await context.run("generate-report", async () => { return await generateReport(request.datasetId) }) await context.run("send-report", async () => { await sendReport(report, request.userId) }) } ) ``` -------------------------------- ### POST /v2/workflows/dlq/restart/{dlqId} Source: https://upstash.com/docs/workflow/api-refence/dlq/restart-workflow-from-dlq Restarts a failed workflow run from the Dead Letter Queue (DLQ). This action initiates a complete re-execution of the workflow from its first step, generating a new workflow run ID. ```APIDOC ## POST /v2/workflows/dlq/restart/{dlqId} ### Description Restarts a failed workflow run from the DLQ. The workflow will start from the beginning. Unlike resume, which continues from where the workflow failed, restart executes the entire workflow from the first step. A new workflow run ID is generated and all steps will be executed again. ### Method POST ### Endpoint /v2/workflows/dlq/restart/{dlqId} ### Parameters #### Path Parameters - **dlqId** (string) - Required - The DLQ ID of the workflow run to restart. #### Header Parameters - **Upstash-Retries** (integer) - Optional - Override the number of retries for the workflow steps. - **Upstash-Delay** (string) - Optional - Override the delay before executing the workflow. Format is `` (e.g., "10s", "5m"). - **Upstash-Retry-Delay** (string) - Optional - Override the retry delay expression for the workflow steps. - **Upstash-Flow-Control-Key** (string) - Optional - Override the flow control key for the workflow. - **Upstash-Flow-Control-Value** (string) - Optional - Override the flow control configuration in the format `parallelism=, rate=, period=`. - **Upstash-Label** (string) - Optional - Override the label for the workflow. - **Upstash-Failure-Callback** (string) - Optional - Override the failure callback URL for the workflow. ### Request Example ```json { "dlqId": "your_dlq_id_here" } ``` ### Response #### Success Response (200) - **workflowRunId** (string) - The ID of the restarted workflow run (a new ID is generated for the restarted run). - **workflowCreatedAt** (integer) - The timestamp when the restarted workflow run was created (Unix timestamp in milliseconds). #### Response Example ```json { "workflowRunId": "new_workflow_run_id_12345", "workflowCreatedAt": 1678886400000 } ``` #### Error Responses - **400 Bad Request**: Invalid DLQ ID, not a workflow message, or workflow doesn't support restart. - **401 Unauthorized**: Authentication failed. - **404 Not Found**: DLQ message not found. - **500 Internal Server Error**: An unexpected error occurred on the server. ```