### Create TanStack Start Project Source: https://upstash.com/docs/workflow/quickstarts/tanstack-start Use this command to initialize a new TanStack Start project. Ensure Node.js and pnpm are installed. ```bash pnpm create @tanstack/start@latest ``` -------------------------------- ### QStash CLI Output Example Source: https://upstash.com/docs/workflow/howto/local-development/development-server This is an example of the output you will see when the QStash CLI development server starts. It provides essential tokens and URLs for local requests. ```plaintext Upstash QStash development server is runnning at A default user has been created for you to authorize your requests. QSTASH_TOKEN=eyJVc2VySUQiOiJkZWZhdWx0VXNlciIsIlBhc3N3b3JkIjoiZGVmYXVsdFBhc3N3b3JkIn0= QSTASH_CURRENT_SIGNING_KEY=sig_7RvLjqfZBvP5KEUimQCE1pvpLuou QSTASH_NEXT_SIGNING_KEY=sig_7W3ZNbfKWk5NWwEs3U4ixuQ7fxwE Sample cURL request: curl -X POST http://127.0.0.1:8080/v2/publish/https://example.com -H "Authorization: Bearer eyJVc2VySUQiOiJkZWZhdWx0VXNlciIsIlBhc3N3b3JkIjoiZGVmYXVsdFBhc3N3b3JkIn0=" Check out documentation for more details: https://upstash.com/docs/qstash/howto/local-development ``` -------------------------------- ### Start Local QStash Server with pnpm Source: https://upstash.com/docs/workflow/quickstarts/vercel-nextjs Run this command to start a local QStash server for development with pnpm. ```bash pnpx @upstash/qstash-cli dev ``` -------------------------------- ### FastAPI Workflow Setup Source: https://upstash.com/docs/workflow/examples/authWebhook Set up an asynchronous workflow in FastAPI using Python. This example demonstrates the basic structure for serving workflows, defining payload types, and initiating asynchronous tasks. Ensure FastAPI and Upstash Workflow libraries are installed. ```python from fastapi import FastAPI from typing import Dict, TypedDict from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) class UserCreatedPayload(TypedDict): name: str email: str class UserStats(TypedDict): total_problems_solved: int most_interested_topic: str async def create_user_in_database(name: str, email: str) -> Dict[str, str]: print("Creating a user in the database:", name, email) return {"userid": "12345"} async def create_new_user_in_stripe(email: str) -> None: # Implement logic to create a new user in Stripe print("Creating a user in Stripe for", email) async def start_trial_in_stripe(email: str) -> None: # Implement logic to start a trial in Stripe print("Starting a trial of 14 days in Stripe for", email) ``` -------------------------------- ### Start TanStack Start Development Server Source: https://upstash.com/docs/workflow/quickstarts/tanstack-start Use pnpm or npm to start the development server for your TanStack Start application. ```bash pnpm dev ``` ```bash npm run dev ``` -------------------------------- ### Client Usage Examples Source: https://upstash.com/docs/workflow/basics/client/logs Examples demonstrating how to use the client to fetch workflow logs with different options. ```typescript import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }) // Fetch logs with default options const { runs, cursor } = await client.logs() // Paginate through all logs const allRuns = []; let cursor: string | undefined; do { const result = await client.logs({ cursor }); allRuns.push(...result.runs); cursor = result.cursor; } while (cursor); // Filter by state const { runs: failedRuns } = await client.logs({ filter: { state: "RUN_FAILED", } }) // Filter by label and date range const { runs: filteredRuns } = await client.logs({ filter: { label: "my-workflow", fromDate: new Date("2024-01-01"), toDate: new Date("2024-06-01"), } }) ``` -------------------------------- ### Start Local QStash Server (npm) Source: https://upstash.com/docs/workflow/agents/getting-started Start the local QStash server using npm. This command is for development purposes. ```bash npx @upstash/qstash-cli dev ``` -------------------------------- ### Install Upstash Workflow SDK (bun) Source: https://upstash.com/docs/workflow/quickstarts/astro Install the Upstash Workflow SDK using bun. Ensure you have Node.js and bun installed. ```bash bun add @upstash/workflow ``` -------------------------------- ### Install Project Dependencies Source: https://upstash.com/docs/workflow/sdk/workflow-py Install project dependencies using Poetry. This command also sets up the development environment. ```bash poetry install ``` -------------------------------- ### Clone Next.js & FastAPI Example Source: https://upstash.com/docs/workflow/quickstarts/nextjs-fastapi Clone the example project from GitHub to begin. This sets up the basic structure for your application. ```bash git clone https://github.com/upstash/workflow-py.git cd workflow-py/examples/nextjs-fastapi ``` -------------------------------- ### Clone Next.js & Flask Example Source: https://upstash.com/docs/workflow/quickstarts/nextjs-flask Clone the example project to start building your Upstash Workflow application with Next.js and Flask. ```bash git clone https://github.com/upstash/workflow-py.git cd workflow-py/examples/nextjs-flask ``` -------------------------------- ### Create Virtual Environment and Install Dependencies Source: https://upstash.com/docs/workflow/quickstarts/flask Sets up a Python virtual environment and installs necessary packages like FastAPI, Uvicorn, and the Upstash Workflow SDK. ```bash python -m venv venv source venv/bin/activate ``` ```bash pip install fastapi uvicorn upstash-workflow ``` -------------------------------- ### Start Local QStash Server (pnpm) Source: https://upstash.com/docs/workflow/agents/getting-started Start the local QStash server using pnpm. This command is for development purposes. ```bash pnpm dlx @upstash/qstash-cli dev ``` -------------------------------- ### Install Project Dependencies with Bun Source: https://upstash.com/docs/workflow/sdk/workflow-js Install project dependencies using Bun. Ensure Bun is installed on your system before running this command. ```bash bun install ``` -------------------------------- ### Install Upstash Workflow SDK Source: https://upstash.com/docs/workflow/quickstarts/tanstack-start Install the Upstash Workflow SDK using your preferred package manager. This makes the SDK available in your TanStack Start application. ```bash pnpm install @upstash/workflow ``` ```bash npm install @upstash/workflow ``` ```bash bun add @upstash/workflow ``` -------------------------------- ### Install Dependencies with Bun Source: https://upstash.com/docs/workflow/integrations/aisdk Install the necessary packages for Upstash Workflow and Vercel AI SDK using Bun. ```bash bun install @ai-sdk/openai ai zod ``` -------------------------------- ### Example ngrok HTTP Tunnel Command Source: https://upstash.com/docs/workflow/howto/local-development/local-tunnel An example of starting an ngrok tunnel for a server running on port 3000. This command exposes your local server to the internet. ```bash ngrok http 3000 ``` -------------------------------- ### Install Upstash Workflow SDK (npm) Source: https://upstash.com/docs/workflow/quickstarts/astro Install the Upstash Workflow SDK using npm. Ensure you have Node.js and npm installed. ```bash npm install @upstash/workflow ``` -------------------------------- ### Example ngrok Output Source: https://upstash.com/docs/workflow/howto/local-development/local-tunnel This is an example of the output you will see after starting an ngrok tunnel. The 'Forwarding' line provides the public URL that can be used with Upstash Workflow. ```plaintext Session Status online Account (Plan: Free) Version 3.1.0 Region Europe (eu) Latency - Web Interface http://127.0.0.1:4040 Forwarding https://e02f-2a02-810d-af40-5284-b139-58cc-89df-b740.eu.ngrok.io -> http://localhost:3000 Connections ttl opn rt1 rt5 p50 p90 0 0 0.00 0.00 0.00 0.00 ``` -------------------------------- ### Install Upstash Workflow Packages Source: https://upstash.com/docs/workflow/agents/getting-started Install the necessary Upstash Workflow and AI packages using npm. ```bash npm i @upstash/workflow @upstash/workflow-agents ai zod ``` -------------------------------- ### Install Upstash Workflow SDK (pnpm) Source: https://upstash.com/docs/workflow/quickstarts/astro Install the Upstash Workflow SDK using pnpm. Ensure you have Node.js and pnpm installed. ```bash pnpm install @upstash/workflow ``` -------------------------------- ### Initialize Vercel Project Deployment Source: https://upstash.com/docs/workflow/quickstarts/nextjs-fastapi Run this command in your terminal to start the Vercel deployment process for your project. ```bash vercel ``` -------------------------------- ### Multiple Middlewares Example Source: https://upstash.com/docs/workflow/howto/middlewares Demonstrates how to use multiple middlewares together in a workflow. ```APIDOC ## Multiple Middlewares You can use multiple middlewares together. ### Request Example ```typescript import { serve } from "@upstash/workflow/nextjs"; import { loggingMiddleware } from "@upstash/workflow"; export const { POST } = serve( async (context) => { // Your workflow logic }, { middlewares: [ loggingMiddleware, errorTrackingMiddleware, performanceMiddleware ] } ); Middlewares are executed in the order they're provided in the array. ``` -------------------------------- ### Start Local QStash Server Source: https://upstash.com/docs/workflow/quickstarts/tanstack-start Run a local QStash server for development without relying on managed servers. This command provides credentials needed for environment configuration. ```bash pnpx @upstash/qstash-cli dev ``` ```bash npx @upstash/qstash-cli dev ``` -------------------------------- ### Run Next.js App Source: https://upstash.com/docs/workflow/agents/getting-started Start your Next.js application locally to make it available for endpoint calls. Ensure your environment variables for QSTASH_URL and QSTASH_TOKEN are set. ```bash npm run dev ``` -------------------------------- ### Install Upstash Workflow and Realtime Packages Source: https://upstash.com/docs/workflow/howto/realtime/basic Install the necessary Upstash packages and zod for schema definition using npm. ```bash npm install @upstash/workflow @upstash/realtime @upstash/redis zod ``` -------------------------------- ### Install Dependencies with npm Source: https://upstash.com/docs/workflow/integrations/aisdk Install the necessary packages for Upstash Workflow and Vercel AI SDK using npm. ```bash npm install @ai-sdk/openai ai zod ``` -------------------------------- ### Complete Workflow Example with Parallel Inventory Checks Source: https://upstash.com/docs/workflow/howto/parallel-runs A full workflow example demonstrating parallel inventory checks before proceeding to brew coffee. This snippet requires importing `serve` from `@upstash/workflow/nextjs` and utility functions like `checkInventory`, `brewCoffee`, and `printReceipt`. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { checkInventory, brewCoffee, printReceipt } from "@/utils"; export const { POST } = serve(async (ctx) => { const [coffeeBeansAvailable, cupsAvailable, milkAvailable] = await Promise.all([ ctx.run("check-coffee-beans", () => checkInventory("coffee-beans")), ctx.run("check-cups", () => checkInventory("cups")), ctx.run("check-milk", () => checkInventory("milk")), ]); // If all ingedients available, brew coffee if (coffeeBeansAvailable && cupsAvailable && milkAvailable) { const price = await ctx.run("brew-coffee", async () => { return await brewCoffee({ style: "cappuccino" }); }); await printReceipt(price); } }); ``` -------------------------------- ### Start ngrok HTTP Tunnel Source: https://upstash.com/docs/workflow/howto/local-development/local-tunnel Make your local server publicly available by starting an ngrok HTTP tunnel. Replace `` with the port your local server is running on (e.g., 3000 for a Next.js server). ```bash ngrok http ``` -------------------------------- ### Install Node.js Dependencies Source: https://upstash.com/docs/workflow/quickstarts/nextjs-fastapi Install the necessary Node.js packages for the Next.js frontend. This command should be run in the project's root directory. ```bash npm install ``` -------------------------------- ### Example Q&A Output Source: https://upstash.com/docs/workflow/agents/patterns/orchestrator-workers This is an example of the synthesized output generated by the worker agents in response to the prompt. It covers topics in Quantum Mechanics and Relativity. ```markdown ### Quantum Mechanics **Q: What is quantum mechanics?** A: Quantum mechanics is a fundamental theory in physics that describes the behavior of nature at the atomic and subatomic levels. It serves as the foundation for all quantum physics, including quantum chemistry, quantum field theory, quantum technology, and quantum information science. **Q: What are some key principles of quantum mechanics?** A: 1. **Wave-Particle Duality**: Particles exhibit both wave-like and particle-like properties. 2. **Uncertainty Principle**: Certain pairs of physical properties, like position and momentum, cannot be simultaneously measured with arbitrary precision. 3. **Quantum Superposition**: A quantum system can exist in multiple states at once until it is measured. 4. **Quantum Entanglement**: Particles become interconnected such that the state of one influences the state of another, regardless of distance. 5. **Quantization**: Energy levels in quantum systems are discrete. 6. **Probability and Wave Functions**: Quantum systems are described by wave functions, which provide probabilities of finding a system in a particular state. 7. **Observer Effect**: Measurement affects the system being observed. ### Relativity **Q: What is the theory of relativity?** A: Developed by Albert Einstein, the theory of relativity encompasses two interrelated theories: special relativity and general relativity. **Q: What is special relativity?** A: Proposed by Einstein in 1905, special relativity addresses the relationship between space and time in the absence of gravity. It is based on two key postulates: the invariance of physical laws in all inertial frames and the constancy of the speed of light in a vacuum. **Q: What is general relativity?** A: Published by Einstein in 1915, general relativity is a geometric theory of gravitation. It describes gravity as a geometric property of space and time, or four-dimensional spacetime, and explains how massive objects cause a distortion in spacetime. These topics challenge classical intuitions and have led to significant advancements in our understanding of the universe and the development of new technologies. ``` -------------------------------- ### Install Dependencies with pnpm Source: https://upstash.com/docs/workflow/integrations/aisdk Install the necessary packages for Upstash Workflow and Vercel AI SDK using pnpm. ```bash pnpm install @ai-sdk/openai ai zod ``` -------------------------------- ### Run FastAPI Server Source: https://upstash.com/docs/workflow/sdk/workflow-py Start the FastAPI development server with hot-reloading enabled. ```bash uvicorn main:app --reload ``` -------------------------------- ### Basic Webhook Endpoint Setup - Python Source: https://upstash.com/docs/workflow/howto/use-webhooks Utilize the `Serve` class from `upstash_workflow.fastapi` to set up a webhook endpoint with FastAPI. The `initial_payload_parser` can be customized. ```python from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) def initial_payload_parser(payload): return payload @serve.post("/api/example", initial_payload_parser=initial_payload_parser) async def example(context: AsyncWorkflowContext[str]) -> None: # Your webhook handling logic here ``` -------------------------------- ### Install @upstash/workflow SDK Source: https://upstash.com/docs/workflow/howto/migrations Install the new @upstash/workflow package using npm, pnpm, or bun. If you were only using @upstash/qstash for workflow, you can uninstall it. ```bash npm install @upstash/workflow ``` ```bash pnpm install @upstash/workflow ``` ```bash bun add @upstash/workflow ``` -------------------------------- ### Python Workflow Missing Steps Example Source: https://upstash.com/docs/workflow/basics/caveats This Python example shows a workflow that lacks the necessary `context.run` call, leading to an authentication error. ```python @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload # 👇 Problem: No context.run call print("Processing input:", input) # This workflow will fail with "Failed to authenticate Workflow request." ``` -------------------------------- ### Run Project Tests with Bun Source: https://upstash.com/docs/workflow/sdk/workflow-js Execute project tests using Bun. Ensure your environment variables are set up in a `.env` file, using `.env.template` as a guide. ```bash bun run test ``` -------------------------------- ### Run Flask Development Server Source: https://upstash.com/docs/workflow/quickstarts/flask Start the Flask development server, specifying the application file (`main.py`) and the port to run on (e.g., 8000). ```bash flask --app main run -p 8000 ``` -------------------------------- ### Install Upstash Workflow SDK Source: https://upstash.com/docs/workflow/quickstarts/express Install the Workflow SDK using npm, pnpm, or bun. This is the first step to integrating Upstash Workflow into your project. ```bash npm install @upstash/workflow ``` ```bash pnpm install @upstash/workflow ``` ```bash bun add @upstash/workflow ``` -------------------------------- ### Create TanStack Start Workflow Endpoint Source: https://upstash.com/docs/workflow/quickstarts/tanstack-start Define a workflow endpoint as an API route in TanStack Start using the `serve` function from `@upstash/workflow/tanstack`. This example demonstrates a simple workflow with two steps. ```typescript import { createFileRoute } from '@tanstack/react-router' import { serve } from '@upstash/workflow/tanstack' const someWork = (input: string) => { return `processed '${JSON.stringify(input)}'` } export const Route = createFileRoute('/api/workflow')({ server: { handlers: serve(async (context) => { const input = context.requestPayload const result1 = await context.run('step1', () => { const output = someWork(input) console.log('step 1 input', input, 'output', output) return output }) await context.run('step2', () => { const output = someWork(result1) console.log('step 2 input', result1, 'output', output) }) }), }, }) ``` -------------------------------- ### Install FastAPI and Upstash Workflow SDK Source: https://upstash.com/docs/workflow/quickstarts/fastapi Install the required Python packages for FastAPI and Upstash Workflow using pip. ```bash pip install fastapi uvicorn upstash-workflow ``` -------------------------------- ### Serve Workflow with FastAPI Source: https://upstash.com/docs/workflow/basics/context Example of how to serve an Upstash Workflow using the `Serve` class in a FastAPI application. ```python from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: ... ``` -------------------------------- ### Agent Response Example Source: https://upstash.com/docs/workflow/agents/patterns/prompt-chaining This is an example of the output generated by the chained agents, summarizing the work of famous physicists. ```text Albert Einstein was a German physicist known for his theory of relativity and the famous equation E=mc^2. He made significant contributions to quantum mechanics and was awarded the Nobel Prize in Physics in 1921. Isaac Newton, an English polymath, was a key figure in the Scientific Revolution and the Enlightenment. He is famous for his laws of motion and universal gravitation, as outlined in his book "Philosophiæ Naturalis Principia Mathematica." Marie Curie, a Polish-French physicist and chemist, conducted pioneering research on radioactivity and was the first woman to win a Nobel Prize. She is the only person to win Nobel Prizes in two scientific fields and her work has had a lasting impact on physics and chemistry. ``` -------------------------------- ### Configuring Workflow Options Source: https://upstash.com/docs/workflow/basics/serve The serve() function accepts an optional second argument for configuration options. This example shows how to provide a failureFunction for handling workflow failures. ```typescript import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve( async (context) => { ... }, // 👇 Workflow options { failureFunction: async ({ ... }) => {} } ); ``` ```python from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: async def _step1() -> str: # define a piece of business logic as step 1 return "step 1 result" result = await context.run("step-1", _step1) async def _step2() -> None: # define another piece of business logic as step 2 pass await context.run("step-2", _step2) ``` -------------------------------- ### Install Workflow Agent Dependencies Source: https://upstash.com/docs/workflow/agents/features Install necessary packages for defining tools in your workflow agents. Ensure you have Node.js and npm installed. ```bash npm i ai mathjs zod @agentic/ai-sdk @agentic/weather @langchain/core @langchain/community ``` -------------------------------- ### Agent Response Example Source: https://upstash.com/docs/workflow/agents/patterns/parallelization This is an example of the aggregated response from the parallel agents, summarizing explanations on quantum physics, relativity, and string theory. ```text Quantum physics explores the behavior of very small particles, such as atoms and subatomic particles, in the strange world of quantum mechanics, where particles can exist in multiple states simultaneously. Key principles include superposition and entanglement, leading to technological advancements like quantum computers. Relativity, developed by Albert Einstein, consists of special relativity and general relativity, explaining the behavior of objects moving at high speeds and the warping of spacetime by massive objects. String theory proposes that the universe's fundamental building blocks are tiny vibrating strings, aiming to unify the four fundamental forces of nature and suggest extra dimensions beyond the familiar ones. ``` -------------------------------- ### Configure Environment Variables for QStash Source: https://upstash.com/docs/workflow/quickstarts/tanstack-start Set environment variables in a .env file to connect your TanStack Start app to the local QStash server. Use the credentials printed when starting the local server. ```dotenv QSTASH_URL="http://127.0.0.1:8080" QSTASH_TOKEN="eyJVc2VySUQiOiJkZWZhdWx0VXNlciIsIlBhc3N3b3JkIjoiZGVmYXVsdFBhc3N3b3JkIn0=" QSTASH_CURRENT_SIGNING_KEY="sig_7kYjw48mhY7kAjqNGcy6cr29RJ6r" QSTASH_NEXT_SIGNING_KEY="sig_5ZB6DVzB1wjE8S6rZ7eenA8Pdnhs" ``` -------------------------------- ### Install @upstash/workflow-agents package Source: https://upstash.com/docs/workflow/howto/migrations Install the new package for the Agents API. This is required for migrating to the separate package structure. ```bash npm install @upstash/workflow-agents ``` -------------------------------- ### TypeScript Workflow Missing Steps Example Source: https://upstash.com/docs/workflow/basics/caveats This TypeScript example demonstrates a workflow missing the required `context.run` call, which will result in an authentication failure. ```typescript export const { POST } = serve(async (context) => { const input = context.requestPayload // 👇 Problem: No context.run call console.log("Processing input:", input) // This workflow will fail with "Failed to authenticate Workflow request." }) ``` -------------------------------- ### Error Tracking Middleware Example Source: https://upstash.com/docs/workflow/howto/middlewares An example of a middleware that sends errors to an external monitoring service. ```APIDOC ## Error Tracking Middleware Send errors to an external monitoring service. ### Request Example ```typescript import { WorkflowMiddleware } from "@upstash/workflow"; const errorTrackingMiddleware = new WorkflowMiddleware({ name: "error-tracking", callbacks: { onError: async ({ workflowRunId, error }) => { await fetch("https://your-monitoring-service.com/errors", { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ workflowRunId, error: error.message, stack: error.stack, timestamp: new Date().toISOString() }) }); } } }); ``` ``` -------------------------------- ### Serve Workflow with Next.js Source: https://upstash.com/docs/workflow/basics/context Example of how to serve an Upstash Workflow using the `serve` function in a Next.js application. ```typescript import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve( // \భ the workflow context async (context) => { // ... } ); ``` -------------------------------- ### Install Upstash Workflow Dependencies Source: https://upstash.com/docs/workflow/sdk/workflow-py Install the necessary Python packages for FastAPI and Upstash Workflow. Ensure you have a virtual environment activated. ```bash python -m venv venv source venv/bin/activate pip install fastapi uvicorn upstash-workflow ``` -------------------------------- ### Install Upstash Workflow SDK with npm Source: https://upstash.com/docs/workflow/quickstarts/cloudflare-workers Install the Workflow SDK using npm. This is the first step in integrating Upstash Workflow into your worker project. ```bash npm install @upstash/workflow ``` -------------------------------- ### Send HTTP Request to Start Workflow Source: https://upstash.com/docs/workflow/howto/start Start a workflow by sending an HTTP POST request to the workflow endpoint. This method is suitable for quick testing but not recommended for production, especially if endpoint security is enabled. ```bash curl -X POST https:/// \ -H "my-header: foo" \ -d '{"foo": "bar"}' ``` -------------------------------- ### Basic Webhook Endpoint Setup - TypeScript Source: https://upstash.com/docs/workflow/howto/use-webhooks Use the `serve` function from `@upstash/workflow/nextjs` to create a basic webhook endpoint. This function handles incoming requests and allows for custom logic. ```typescript import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve( async (context) => { // Your webhook handling logic here }, { initialPayloadParser: (payload) => { return payload; }, } ); ``` -------------------------------- ### Next.js Workflow for User Onboarding Source: https://upstash.com/docs/workflow/examples/authWebhook Implement a user onboarding workflow in Next.js using TypeScript. This example handles user creation, Stripe integration, email notifications, and trial period management. Ensure necessary imports are present. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { WorkflowContext } from '@upstash/qstash/workflow' /** * This can be the payload of the user created webhook event coming from your * auth provider (e.g. Firebase, Auth0, Clerk etc.) */ type UserCreatedPayload = { name: string; email: string; }; export const { POST } = serve(async (context) => { const { name, email } = context.requestPayload; const { userid } = await context.run("sync user", async () => { return await createUserInDatabase({ name, email }); }); await context.run("create new user in stripe", async () => { await createNewUserInStripe(email); }); await context.run("start trial in Stripe", async () => { await startTrialInStripe(email); }); await context.run("send welcome email", async () => { await sendEmail( email, "Welcome to our platform!, You have 14 days of free trial." ); }); await context.sleep("wait", 7 * 24 * 60 * 60); // get user stats and send email with them const stats = await context.run("get user stats", async () => { return await getUserStats(userid); }); await sendProblemSolvedEmail({context, email, stats}); // wait until there are two days to the end of trial period // and check upgrade status await context.sleep("wait for trial warning", 5 * 24 * 60 * 60); const isUpgraded = await context.run("check upgraded plan", async () => { return await checkUpgradedPlan(email); }); // end the workflow if upgraded if (isUpgraded) return; await context.run("send trial warning email", async () => { await sendEmail( email, "Your trial is about to end in 2 days. Please upgrade your plan to keep using our platform." ); }); await context.sleep("wait for trial end", 2 * 24 * 60 * 60); await context.run("send trial end email", async () => { await sendEmail( email, "Your trial has ended. Please upgrade your plan to keep using our platform." ); }); }); async function sendProblemSolvedEmail({ context: WorkflowContext email: string, stats: { totalProblemsSolved: number } }) { if (stats.totalProblemsSolved === 0) { await context.run("send no answers email", async () => { await sendEmail( email, "Hey, you haven't solved any questions in the last 7 days..." ); }); } else { await context.run("send stats email", async () => { await sendEmail( email, `You have solved ${stats.totalProblemsSolved} problems in the last 7 days. Keep it up!` ); }); } } async function createUserInDatabase({ name, email, }: { name: string; email: string; }) { console.log("Creating a user in the database:", name, email); return { userid: "12345" }; } async function createNewUserInStripe(email: string) { // Implement logic to create a new user in Stripe console.log("Creating a user in Stripe for", email); } async function startTrialInStripe(email: string) { // Implement logic to start a trial in Stripe console.log("Starting a trial of 14 days in Stripe for", email); } async function getUserStats(userid: string) { // Implement logic to get user stats console.log("Getting user stats for", userid); return { totalProblemsSolved: 10_000, mostInterestedTopic: "JavaScript", }; } async function checkUpgradedPlan(email: string) { // Implement logic to check if the user has upgraded the plan console.log("Checking if the user has upgraded the plan", email); return false; } async function sendEmail(email: string, content: string) { // Implement logic to send an email console.log("Sending email to", email, content); } ``` -------------------------------- ### Start Trial in Stripe (Python) Source: https://upstash.com/docs/workflow/examples/authWebhook Initiates a trial in Stripe for a given email address using Python. This asynchronous function requires a compatible execution context. ```python async def _start_trial_in_stripe() -> None: await start_trial_in_stripe(email) await context.run("start trial in Stripe", _start_trial_in_stripe) ``` -------------------------------- ### Start Local Workflow Worker Source: https://upstash.com/docs/workflow/quickstarts/cloudflare-workers Run this command to start your worker locally. It prints a local URL for your workflow endpoint, typically http://localhost:8787. ```bash npm run wrangler dev ``` -------------------------------- ### Navigate to Project Directory Source: https://upstash.com/docs/workflow/quickstarts/tanstack-start Change into the newly created project directory. Replace 'your-project-name' with your actual project name. ```bash cd your-project-name ``` -------------------------------- ### Return Results from context.run for Later Use (Correct Python) Source: https://upstash.com/docs/workflow/basics/caveats This Python example correctly handles step results by returning the value from `context.run`. This ensures the result is available for subsequent steps. ```python @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload async def _step_1() -> Dict: return await some_work(input) result = await context.run("step-1", _step_1) async def _step_2() -> None: await some_other_work(result) await context.run("step-2", _step_2) ``` -------------------------------- ### Initialize and Use Upstash Workflow Client Source: https://upstash.com/docs/workflow/sdk/workflow-js Initialize the client with your QSTASH_TOKEN to interact with Upstash Workflows. Use this client to trigger, cancel, notify, and get waiters for your workflows. ```typescript import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }); // trigger a workflow const { workflowRunId } = await client.trigger({ url: "https://workflow-endpoint.com", body: "hello there!", // Optional body headers: { ... }, // Optional headers workflowRunId: "my-workflow", // Optional workflow run ID retries: 3 // Optional retries for the initial request }); // cancel workflow: await client.cancel({ workflowRunId: "" }); // notify workflows: await client.notify({ eventId: "my-event-id", eventData: "my-data", // data passed to the workflow run workflowRunId: "wfr_123", // optional workflow run ID for lookback }); // get waiters: const result = await client.getWaiters({ eventId: "my-event-id", }); ``` -------------------------------- ### Basic Usage of context.waitForWebhook Source: https://upstash.com/docs/workflow/basics/context/waitForWebhook This example demonstrates how to create a webhook, wait for it to be called with a timeout, and then process the response. It logs a message indicating success or timeout and prints the request body and headers if the webhook was called. ```typescript import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve(async (context) => { // Create webhook const webhook = await context.createWebhook("create webhook"); // Wait for webhook to be called with 30 second timeout const webhookResponse = await context.waitForWebhook( "wait for webhook", webhook, "30s" ); if (webhookResponse.timeout) { console.log("Webhook was not called within the timeout period"); } else { console.log("Webhook was called successfully"); console.log("Request body:", webhookResponse.request.body); console.log("Request headers:", webhookResponse.request.headers); } }); ``` -------------------------------- ### Start Trial in Stripe (TypeScript) Source: https://upstash.com/docs/workflow/examples/authWebhook Initiates a trial in Stripe for a given email address. This function should be used within a context that supports asynchronous operations. ```typescript await context.run("start trial in Stripe", async () => { await startTrialInStripe(email); }); ``` -------------------------------- ### Update Workflow and QStash SDKs Source: https://upstash.com/docs/workflow/howto/multi-region Ensure you have the minimum required versions of the Upstash Workflow and QStash SDKs installed for migration support. Update using npm. ```bash npm install @upstash/workflow@latest @upstash/qstash@latest ``` -------------------------------- ### Return Results from context.run for Later Use (Correct TypeScript) Source: https://upstash.com/docs/workflow/basics/caveats This example demonstrates the correct way to handle step results by returning the value from `context.run`. This ensures the result is available for subsequent steps. ```typescript export const { POST } = serve(async (context) => { const input = context.requestPayload const result = await context.run("step-1", async () => { return await someWork(input) }) await context.run("step-2", async () => { someOtherWork(result) }) }) ``` -------------------------------- ### Create .env file Source: https://upstash.com/docs/workflow/quickstarts/astro Create a .env file in your project root to store environment variables, such as your QStash token. ```bash touch .env ``` -------------------------------- ### Run FastAPI Application Source: https://upstash.com/docs/workflow/quickstarts/fastapi This command starts the FastAPI application with hot-reloading enabled. It's used to serve the workflow endpoints defined in your application. ```bash uvicorn main:app --reload ``` -------------------------------- ### Retrieve Waiters by Event ID Source: https://upstash.com/docs/workflow/basics/client/waiters Use this method to get all workflows waiting for a specific event. Ensure you have your QSTASH_TOKEN configured. ```javascript import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }); const result = await client.getWaiters({ eventId: "my-event-id", }); ``` -------------------------------- ### Implement Authentication for Workflow Endpoint in FastAPI Source: https://upstash.com/docs/workflow/quickstarts/fastapi This example shows how to secure a workflow endpoint by checking for a specific authentication header. If the header is missing or incorrect, the workflow execution is halted. ```python from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) def some_work(input: str) -> str: return f"processed '{input}'" @serve.post("/auth") async def auth(context: AsyncWorkflowContext[str]) -> None: if context.headers.get("authentication") != "Bearer secret_password": print("Authentication failed.") return async def _step1() -> str: return "output 1" await context.run("step1", _step1) async def _step2() -> str: return "output 2" await context.run("step2", _step2) ``` -------------------------------- ### Basic Webhook Usage in Upstash Workflow Source: https://upstash.com/docs/workflow/features/webhooks Demonstrates the basic workflow for using webhooks: creating a webhook, calling an external service with the webhook URL, and waiting for the webhook to be called or timeout. Requires setup with @upstash/workflow/nextjs. ```typescript import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve(async (context) => { // Step 1: Create webhook const webhook = await context.createWebhook("create webhook"); // Step 2: Call an external endpoint, which calls the webhookUrl upon completion const callResult = await context.call("call webhook caller", { url: "https://webhook/caller", method: "POST", body: JSON.stringify({ webhookUrl: webhook.webhookUrl, }), }); // Step 3: Wait for the webhook to be called const webhookResponse = await context.waitForWebhook( "wait for webhook", webhook, "30s" // timeout ); if (webhookResponse.timeout) { console.log("Webhook was not called in time"); // Handle timeout scenario } else { console.log("Webhook received:", webhookResponse.request); // Process the webhook data } }); ``` -------------------------------- ### Configure Environment Variables Source: https://upstash.com/docs/workflow/agents/getting-started Add QStash URL, Token, and OpenAI API Key to your .env.local file. Ensure you replace placeholders with your actual credentials. ```dotenv QSTASH_URL="http://127.0.0.1:8080" QSTASH_TOKEN="" OPENAI_API_KEY= ``` -------------------------------- ### Run Hono Development Server Source: https://upstash.com/docs/workflow/quickstarts/hono Start your Hono application locally using the npm run dev command. This command will typically output a local URL for your workflow endpoint. ```bash npm run dev ``` -------------------------------- ### Get Signing Keys Source: https://upstash.com/docs/workflow/api-reference/signing-keys/get-signing-keys Retrieve your current and next signing keys for Upstash Workflow. ```APIDOC ## GET /v2/keys ### Description Retrieve your current and next signing keys. ### Method GET ### Endpoint /v2/keys ### Parameters #### Query Parameters - **qstash_token** (string) - Required - QStash authentication token passed as a query parameter. ### Request Example (No request body for GET requests) ### Response #### Success Response (200) - **current** (string) - The current signing key. - **next** (string) - The next signing key. #### Response Example ```json { "current": "your_current_signing_key", "next": "your_next_signing_key" } ``` ``` -------------------------------- ### Implementing Tools with context.run() in Upstash Source: https://upstash.com/docs/workflow/integrations/aisdk Wrap each tool's `execute` function within `context.run()` and provide a descriptive name for tracking. This example shows a basic tool implementation for fetching weather data. ```typescript execute: ({ location }) => context.run("weather tool", () => { // Mock data, replace with actual weather API call return { location, temperature: 72 + Math.floor(Math.random() * 21) - 10, }; }) ``` -------------------------------- ### Build Project with Bun Source: https://upstash.com/docs/workflow/sdk/workflow-js Build the project using Bun. This command compiles the project's code. ```bash bun run build ``` -------------------------------- ### Initialize Upstash Workflow Client Source: https://upstash.com/docs/workflow/basics/client Initialize a new client instance with your Upstash credentials. Ensure that QSTASH_URL and QSTASH_TOKEN environment variables are set. ```javascript import { Client } from "@upstash/workflow" const client = new Client({ baseUrl: process.env.QSTASH_URL!, token: process.env.QSTASH_TOKEN! }) ``` -------------------------------- ### Get User Stats Source: https://upstash.com/docs/workflow/examples/authWebhook Retrieves user statistics. This function is a placeholder and should be implemented with actual logic. ```python async def get_user_stats(userid: str) -> UserStats: # Implement logic to get user stats print("Getting user stats for", userid) return {"total_problems_solved": 10000, "most_interested_topic": "Python"} ``` -------------------------------- ### TypeScript Workflow with Next.js Source: https://upstash.com/docs/workflow/examples/allInOne Implement a serverless workflow using Upstash Workflow with Next.js. This example demonstrates downloading data, processing it in chunks with OpenAI, and generating/sending a report. Ensure OPENAI_API_KEY is set in your environment. ```typescript import { serve } from "@upstash/workflow/nextjs" import { downloadData, aggregateResults, generateReport, sendReport, getDatasetUrl, splitIntoChunks, } from "./utils" type OpenAiResponse = { choices: { message: { role: string, content: string } }[] } export const { POST } = serve<{ datasetId: string; userId: string }>( async (context) => { const request = context.requestPayload // Step 1: Download the dataset const datasetUrl = await context.run("get-dataset-url", async () => { return await getDatasetUrl(request.datasetId) }) // HTTP request with much longer timeout (2hrs) const { body: dataset } = await context.call("download-dataset", { url: datasetUrl, method: "GET" }) // Step 2: Process data in chunks using OpenAI const chunkSize = 1000 const chunks = splitIntoChunks(dataset, chunkSize) const processedChunks: string[] = [] for (let i = 0; i < chunks.length; i++) { const { body: processedChunk } = await context.api.openai.call( `process-chunk-${i}`, { token: process.env.OPENAI_API_KEY, operation: "chat.completions.create", body: { model: "gpt-4", messages: [ { role: "system", content: "You are an AI assistant tasked with analyzing data chunks. Provide a brief summary and key insights for the given data.", }, { role: "user", content: `Analyze this data chunk: ${JSON.stringify(chunks[i])}`, }, ], max_completion_tokens: 150, }, } ) processedChunks.push(processedChunk.choices[0].message.content!) // Every 10 chunks, we'll aggregate intermediate results if (i % 10 === 9 || i === chunks.length - 1) { await context.run(`aggregate-results${i}`, async () => { await aggregateResults(processedChunks) processedChunks.length = 0 }) } } // Step 3: Generate and send data report const report = await context.run("generate-report", async () => { return await generateReport(request.datasetId) }) await context.run("send-report", async () => { await sendReport(report, request.userId) }) } ) ``` -------------------------------- ### Implement Workflow with Sleep and Sequential Steps in FastAPI Source: https://upstash.com/docs/workflow/quickstarts/fastapi This example demonstrates a workflow with multiple steps, including timed sleeps. It processes input, sleeps for a duration, processes the result, sleeps again, and then performs a final processing step. ```python from fastapi import FastAPI import time from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) def some_work(input: str) -> str: return f"processed '{input}'" @serve.post("/sleep") async def sleep(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload async def _step1() -> str: output = some_work(input) print("step 1 input", input, "output", output) return output result1: str = await context.run("step1", _step1) await context.sleep_until("sleep1", time.time() + 3) async def _step2() -> str: output = some_work(result1) print("step 2 input", result1, "output", output) return output result2: str = await context.run("step2", _step2) await context.sleep("sleep2", 2) async def _step3() -> None: output = some_work(result2) print("step 3 input", result2, "output", output) await context.run("step3", _step3) ``` -------------------------------- ### client.trigger Source: https://upstash.com/docs/workflow/basics/client/trigger Starts a new workflow run or multiple workflow runs. Returns the `workflowRunId` for each triggered run. ```APIDOC ## POST /client/trigger ### Description Starts a new workflow run and returns its `workflowRunId`. Can also trigger multiple workflow runs in a single call by passing an array of arguments. ### Method POST ### Endpoint /client/trigger ### Parameters #### Request Body - **url** (string) - Required - The public URL of the workflow endpoint. - **workflowRunId** (string) - Optional - A custom identifier for the workflow run. If omitted, a run ID will be generated automatically. The final ID will be prefixed with `wfr_`. - **body** (string | object) - Optional - The request payload to pass into the workflow run. Accessible as `context.requestPayload` inside the workflow. - **headers** (object) - Optional - HTTP headers to pass into the workflow run. Accessible as `context.headers` inside the workflow. - **retries** (string) - Optional - Number of retry attempts for workflow steps. Default is 3. - **retryDelay** (string) - Optional - Delay between retries. Can use expressions like "1000 * (1 + retried)". - **flowControl** (object) - Optional - An optional flow control configuration to limit concurrency and execution rate of the workflow runs. - **key** (string) - Required - A logical grouping key that identifies which requests share the same flow control limits. - **rate** (number) - Required - The maximum number of allowed requests per second. - **parallelism** (number) - Required - The maximum number of concurrent requests allowed. - **period** (string|number) - Optional - The time window used to enforce the defined rate limit. Default is `1s`. - **delay** (string) - Optional - Delay for the workflow run. This is used to delay the execution of the workflow run. The delay is in seconds or can be passed as a string with a time unit (e.g. "1h", "30m", "15s"). - **notBefore** (number) - Optional - Optionally set the absolute delay of this message. This will override the delay option. The message will not delivered until the specified time. Unix timestamp in seconds. - **label** (string) - Optional - An optional label to assign to the workflow run. This can be useful for identifying and filtering runs in the dashboard or logs. - **disableTelemetry** (boolean) - Optional - If set to true, telemetry data collection for this workflow run will be disabled. ### Request Example ```json { "url": "https:///", "body": "hello there!", "headers": {}, "workflowRunId": "my-workflow", "retries": 3, "retryDelay": "1000 * (1 + retried)", "delay": "10s", "flowControl": { "key": "USER_GIVEN_KEY", "rate": 10, "parallelism": 5, "period": "10m" } } ``` ### Response #### Success Response (200) - **workflowRunId** (string) - The ID of the triggered workflow run. #### Response Example ```json { "workflowRunId": "wfr_my-workflow" } ``` ``` ```APIDOC ## POST /client/trigger (Multiple) ### Description Triggers multiple workflow runs in a single call by passing an array of arguments. ### Method POST ### Endpoint /client/trigger ### Parameters #### Request Body - **Array of Trigger Arguments** (object[]) - Required - Each object in the array represents a single workflow run trigger and should contain the same fields as the single trigger request (e.g., `url`, `body`, `headers`, etc.). ### Request Example ```json [ { "url": "https:///route1", "body": "payload1" }, { "url": "https:///route2", "body": "payload2" } ] ``` ### Response #### Success Response (200) - **Array of Results** (object[]) - An array of objects, where each object corresponds to a triggered workflow run and contains its `workflowRunId`. #### Response Example ```json [ { "workflowRunId": "wfr_run1" }, { "workflowRunId": "wfr_run2" } ] ``` ``` -------------------------------- ### Cancel Workflow by ID (Usage Example) Source: https://upstash.com/docs/workflow/basics/client/cancel Demonstrates how to cancel a single workflow run or a group of workflow runs using their respective IDs. ```typescript // cancel a single workflow await client.cancel(""); // cancel a set of workflow runs await client.cancel(["", ""]); ``` -------------------------------- ### Trigger Workflow with Configuration Source: https://upstash.com/docs/workflow/howto/configure Pass configuration options such as retries, retry delay, and flow control when starting a workflow run. Note that these configurations do not apply to context.call() and context.invoke() steps. ```typescript import { Client } from "@upstash/workflow"; const client = Client() const { workflowRunId } = await client.trigger({ url: `http://localhost:3000/api/workflow`, retries: 3, retryDelay: "(1 + retries) * 1000", flowControl: { key: "limit-ads", rate: 1, parallelism: 10 } }); ``` -------------------------------- ### Return Results from context.run for Later Use (Incorrect Python) Source: https://upstash.com/docs/workflow/basics/caveats This Python example shows an incorrect way to handle step results. The `result` variable, declared outside the `context.run` scope, might be uninitialized when `step-2` is executed. ```python @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload result = None async def _step_1() -> Dict: nonlocal result result = await some_work(input) await context.run("step-1", _step_1) async def _step_2() -> None: await some_other_work(result) await context.run("step-2", _step_2) ``` -------------------------------- ### GET /websites/upstash_workflow Source: https://upstash.com/docs/workflow/api-reference/logs/list-workflow-run-logs Retrieves a list of workflow runs, optionally filtered by caller IP. Supports pagination. ```APIDOC ## GET /websites/upstash_workflow ### Description Filter workflow run by the callerIp that started to workflow run. This endpoint retrieves workflow runs and supports pagination. ### Method GET ### Endpoint /websites/upstash_workflow ### Query Parameters - **callerIp** (string) - Optional - Filters workflow runs by the IP address of the client that triggered them. - **cursor** (string) - Optional - Pagination cursor for the next page of results. ### Response #### Success Response (200) - **cursor** (string) - Pagination cursor for the next page. Empty if no more results. - **runs** (array) - Array of complete workflow runs with all steps and metadata. - **workflowRunId** (string) - The unique identifier for this workflow run. - **workflowUrl** (string) - The URL of the workflow. - **workflowState** (string) - The current state of the workflow run. Possible values: `RUN_STARTED`, `RUN_SUCCESS`, `RUN_FAILED`, `RUN_CANCELED`. - **workflowRunCreatedAt** (integer) - When the workflow run was created (Unix timestamp in milliseconds). - **workflowRunCompletedAt** (integer) - When the workflow run completed (Unix timestamp in milliseconds). - **workflowRunCallerIp** (string) - IP address of the client who triggered the workflow. - **steps** (array) - The workflow steps grouped by execution pattern. - **workflowRunResponse** (string) - The response returned by the workflow run. - **invoker** (object) - Information about the workflow that invoked this one. - **workflowRunId** (string) - The workflow run ID of the invoker. - **workflowUrl** (string) - The workflow URL of the invoker. - **workflowRunCreatedAt** (integer) - The creation timestamp of the invoker workflow run. - **failureFunction** (object) - Information about the failure callback if configured. - **dlqId** (string) - The DLQ ID if the workflow run failed. - **label** (string) - Label assigned to the workflow run. - **flowControlKey** (string) - Flow Control Key assigned to the workflow run. #### Error Response (400) - **error** (string) - Bad Request - Invalid parameters (e.g., invalid cursor, state, or groupBy value) #### Error Response (401) - **error** (string) - Unauthorized #### Error Response (429) - **error** (string) - Too Many Requests - Rate limit exceeded #### Error Response (500) - **error** (string) - Internal Server Error ### Response Example (200) ```json { "cursor": "some_cursor_string", "runs": [ { "workflowRunId": "wr_12345", "workflowUrl": "https://example.com/workflow", "workflowState": "RUN_SUCCESS", "workflowRunCreatedAt": 1678886400000, "workflowRunCompletedAt": 1678886460000, "workflowRunCallerIp": "192.168.1.1", "steps": [], "workflowRunResponse": "Success", "invoker": { "workflowRunId": "wr_invoker_67890", "workflowUrl": "https://example.com/invoker", "workflowRunCreatedAt": 1678886300000 }, "failureFunction": null, "dlqId": null, "label": "My Workflow Run", "flowControlKey": "fc_abcde" } ] } ``` ### Response Example (400) ```json { "error": "Invalid cursor provided." } ``` ``` -------------------------------- ### Minimal Workflow Endpoint with Hono Source: https://upstash.com/docs/workflow/quickstarts/hono Define a basic workflow endpoint using Hono and Upstash Workflow. This example shows how to set up two sequential steps within the workflow. ```typescript import { Hono } from "hono" import { serve } from "@upstash/workflow/hono" const app = new Hono() app.post("/workflow", serve(async (context) => { await context.run("initial-step", () => { console.log("initial step ran") }) await context.run("second-step", () => { console.log("second step ran") }) }) ) export default app ``` -------------------------------- ### Define a Workflow Endpoint with FastAPI Source: https://upstash.com/docs/workflow/sdk/workflow-py Define a workflow endpoint using the `@serve.post` decorator from `upstash_workflow.fastapi`. This example demonstrates defining two sequential steps within a workflow. ```python from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) # mock function def some_work(input: str) -> str: return f"processed '{input}'" # serve endpoint which expects a string payload: @serve.post("/example") async def example(context: AsyncWorkflowContext[str]) -> None: # get request body: input = context.request_payload async def _step1() -> str: output = some_work(input) print("step 1 input", input, "output", output) return output # run the first step: result: str = await context.run("step1", _step1) async def _step2() -> None: output = some_work(result) print("step 2 input", result, "output", output) # run the second step: await context.run("step2", _step2) ``` -------------------------------- ### Python Workflow with FastAPI Source: https://upstash.com/docs/workflow/examples/allInOne Implement a serverless workflow using Upstash Workflow with FastAPI. This example mirrors the TypeScript version, demonstrating data downloading, chunked processing with OpenAI, and report generation. Ensure OPENAI_API_KEY is set in your environment. ```python from fastapi import FastAPI import json import os from typing import Dict, List, Any, TypedDict from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext, CallResponse from utils import ( aggregate_results, generate_report, send_report, get_dataset_url, split_into_chunks, ) app = FastAPI() serve = Serve(app) class RequestPayload(TypedDict): dataset_id: str user_id: str @serve.post("/ai-generation") async def ai_generation(context: AsyncWorkflowContext[RequestPayload]) -> None: request = context.request_payload dataset_id = request["dataset_id"] user_id = request["user_id"] # Step 1: Download the dataset async def _get_dataset_url() -> str: return await get_dataset_url(dataset_id) dataset_url = await context.run("get-dataset-url", _get_dataset_url) # HTTP request with much longer timeout (2hrs) response: CallResponse[Any] = await context.call( "download-dataset", url=dataset_url, method="GET" ) dataset = response.body # Step 2: Process data in chunks using OpenAI chunk_size = 1000 chunks = split_into_chunks(dataset, chunk_size) processed_chunks: List[str] = [] for i, chunk in enumerate(chunks): openai_response: CallResponse[Dict[str, str]] = await context.call( f"process-chunk-{i}", url="https://api.openai.com/v1/chat/completions", method="POST", headers={ "authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}", }, body={ "model": "gpt-4", "messages": [ { "role": "system", "content": "You are an AI assistant tasked with analyzing data chunks. Provide a brief summary and key insights for the given data.", }, { "role": "user", "content": f"Analyze this data chunk: {json.dumps(chunk)}", }, ], "max_tokens": 150, }, ) processed_chunks.append( ``` -------------------------------- ### Define a Workflow Endpoint with serve() Source: https://upstash.com/docs/workflow/basics/serve Use the serve() function to define an API endpoint for your workflow. It takes a route function and optional configuration. This example shows basic usage in Next.js and FastAPI. ```typescript import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve(async (context) => { // Route function }, { // Options }); ``` ```python from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: async def _step1() -> str: # define a piece of business logic as step 1 return "step 1 result" result = await context.run("step-1", _step1) async def _step2() -> None: # define another piece of business logic as step 2 pass await context.run("step-2", _step2) ``` -------------------------------- ### Lifecycle Event: beforeExecution Callback Source: https://upstash.com/docs/workflow/howto/middlewares Callback function executed before each step in a workflow starts. It receives the workflow context and the name of the step. ```typescript beforeExecution: async ({ context, stepName }) => { // Handle step start } ``` -------------------------------- ### Customer Onboarding Workflow Source: https://upstash.com/docs/workflow/examples/customerOnboarding This workflow registers a new user, sends a welcome email, waits for 3 days, and then periodically checks the user's activity state to send appropriate emails. It requires initial data containing the user's email. ```typescript import { serve } from "@upstash/workflow/nextjs" type InitialData = { email: string } export const { POST } = serve(async (context) => { const { email } = context.requestPayload await context.run("new-signup", async () => { await sendEmail("Welcome to the platform", email) }) await context.sleep("wait-for-3-days", 60 * 60 * 24 * 3) while (true) { const state = await context.run("check-user-state", async () => { return await getUserState() }) if (state === "non-active") { await context.run("send-email-non-active", async () => { await sendEmail("Email to non-active users", email) }) } else if (state === "active") { await context.run("send-email-active", async () => { await sendEmail("Send newsletter to active users", email) }) } await context.sleep("wait-for-1-month", 60 * 60 * 24 * 30) } }) async function sendEmail(message: string, email: string) { // Implement email sending logic here console.log(`Sending ${message} email to ${email}`) } type UserState = "non-active" | "active" const getUserState = async (): Promise => { // Implement user state logic here return "non-active" } ``` ```python from fastapi import FastAPI from typing import Literal, TypedDict from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) UserState = Literal["non-active", "active"] class InitialData(TypedDict): email: str async def send_email(message: str, email: str) -> None: # Implement email sending logic here print(f"Sending {message} email to {email}") async def get_user_state() -> UserState: # Implement user state logic here return "non-active" @serve.post("/customer-onboarding") async def customer_onboarding(context: AsyncWorkflowContext[InitialData]) -> None: email = context.request_payload["email"] async def _new_signup() -> None: await send_email("Welcome to the platform", email) await context.run("new-signup", _new_signup) await context.sleep("wait-for-3-days", 60 * 60 * 24 * 3) while True: async def _check_user_state() -> UserState: return await get_user_state() state: UserState = await context.run("check-user-state", _check_user_state) if state == "non-active": async def _send_email_non_active() -> None: await send_email("Email to non-active users", email) await context.run("send-email-non-active", _send_email_non_active) else: async def _send_email_active() -> None: await send_email("Send newsletter to active users", email) await context.run("send-email-active", _send_email_active) await context.sleep("wait-for-1-month", 60 * 60 * 24 * 30) ``` -------------------------------- ### Create Astro Workflow Endpoint Source: https://upstash.com/docs/workflow/quickstarts/astro Define a workflow endpoint in Astro by importing and using the 'serve' function from '@upstash/workflow/astro'. This example demonstrates a minimal workflow with two steps. ```typescript import { serve } from "@upstash/workflow/astro"; export const { POST } = serve(async (context) => { const result1 = await context.run("initial-step", () => { console.log("initial step ran") return "hello world!" }) await context.run("second-step", () => { console.log(`second step ran with value ${result1}`) }) }, { // env must be passed in astro. // for local dev, we need import.meta.env. // For deployment, we need process.env: env: { ...process.env, ...import.meta.env } }) ``` -------------------------------- ### List DLQ Messages with Pagination and Filtering Source: https://upstash.com/docs/workflow/basics/client/dlq/list Fetches DLQ messages with specific pagination and filtering options. This example demonstrates how to use the `cursor`, `count`, and `filter` arguments to narrow down results, including filtering by date range, workflow URL, and label. ```typescript // List with pagination and filtering const result = await client.dlq.list({ cursor, count: 10, filter: { fromDate: Date.now() - 86400000, // last 24 hours toDate: Date.now(), workflowUrl: "https://your-endpoint.com", label: "my-workflow", } }); ``` -------------------------------- ### Get Global Parallelism Source: https://upstash.com/docs/workflow/api-reference/flow-control/get-global-parallelism Retrieves the current global parallelism usage across all flow control keys. ```APIDOC ## GET /v2/globalParallelism ### Description Returns the current global parallelism usage across all flow control keys. ### Method GET ### Endpoint /v2/globalParallelism ### Parameters ### Request Body ### Request Example (No request body for GET request) ### Response #### Success Response (200) - **parallelismMax** (integer) - The configured maximum global parallelism - **parallelismCount** (integer) - The current number of messages running globally in parallel #### Response Example ```json { "parallelismMax": 100, "parallelismCount": 50 } ``` ``` -------------------------------- ### Image Processing Workflow in FastAPI Source: https://upstash.com/docs/workflow/examples/imageProcessing Implement an image processing workflow in a FastAPI application using Upstash Workflow. This example mirrors the Next.js version, demonstrating image retrieval, resizing, filtering, and storage. ```python from fastapi import FastAPI from typing import List, TypedDict from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext, CallResponse from utils import store_image, get_image_url app = FastAPI() serve = Serve(app) class ImageResult(TypedDict): image_url: str class ImageProcessingPayload(TypedDict): image_id: str user_id: str @serve.post("/process-image") async def process_image(context: AsyncWorkflowContext[ImageProcessingPayload]) -> None: payload = context.request_payload image_id = payload["image_id"] user_id = payload["user_id"] # Step 1: Retrieve the uploaded image async def _get_image_url() -> str: return await get_image_url(image_id) image_url: str = await context.run("get-image-url", _get_image_url) # Step 2: Resize the image to multiple resolutions resolutions = [640, 1280, 1920] resize_responses = [] for resolution in resolutions: response: CallResponse[ImageResult] = await context.call( f"resize-image-{resolution}", # endpoint which returns ImageResult type in response url="https://image-processing-service.com/resize", method="POST", body={"imageUrl": image_url, "width": resolution}, ) resize_responses.append(response) resized_images = [response.body for response in resize_responses] # Step 3: Apply filters to each resized image filters = ["grayscale", "sepia", "contrast"] filter_responses = [] for resized_image in resized_images: for filter in filters: response: CallResponse[ImageResult] = await context.call( f"apply-filter-{filter}", # endpoint which returns ImageResult type in response url="https://image-processing-service.com/filter", method="POST", body={"imageUrl": resized_image["imageUrl"], "filter": filter}, ) filter_responses.append(response) processed_images = [response.body for response in filter_responses] # Step 4: Store processed images in cloud storage async def _store_image() -> str: return await store_image(processed_image["imageUrl"]) stored_image_urls: List[str] = [] for processed_image in processed_images: stored_image_url = await context.run("store-image", _store_image) ``` -------------------------------- ### POST /v2/trigger/{workflowUrl} Source: https://upstash.com/docs/workflow/api-reference/runs/trigger-workflow-run This endpoint allows you to start a new workflow run. You can configure various aspects of the run, such as custom run IDs, retries, delays, and callback URLs. ```APIDOC ## POST /v2/trigger/{workflowUrl} ### Description Start a new workflow run. ### Method POST ### Endpoint /v2/trigger/{workflowUrl} ### Parameters #### Path Parameters - **workflowUrl** (string) - Required - The URL of the workflow to trigger. #### Header Parameters - **Upstash-Workflow-RunId** (string) - Optional - Optional custom run ID for the workflow run. A random ID will be generated if not provided. - **Upstash-Forward-*** (string) - Optional - You can send custom headers to your workflow. To send a custom header, prefix the header name with `Upstash-Forward-`. We will strip prefix and send them to the destination. Example: `Upstash-Forward-My-Header: my-value` will be forwarded as `My-Header: my-value`. - **Upstash-Retries** (integer) - Optional - Number of retries for the workflow steps in case of failure. Defaults to 3. - **Upstash-Delay** (string) - Optional - Delay the message delivery. The format is `` where unit is one of: `s` (seconds), `m` (minutes), `h` (hours), `d` (days). Example: `50s`, `1d10h30m`. - **Upstash-Not-Before** (integer) - Optional - Delay the message delivery until a certain timestamp in the future. The format is a unix timestamp in seconds (UTC). Takes precedence over `Upstash-Delay` if both are provided. - **Upstash-Label** (string) - Optional - Optional label to attach to the workflow run for easier identification in logs and DLQ. - **Upstash-Flow-Control-Key** (string) - Optional - Flow control key to manage concurrency for the workflow run. Steps with the same key will respect the same concurrency limit. Requires `Upstash-Flow-Control-Value` header. - **Upstash-Flow-Control-Value** (string) - Optional - Parallelism and rate limit configuration for the flow control key in the format: `parallelism=, rate=, period=`. See flow control documentation for details. - **Upstash-Failure-Callback** (string) - Optional - Failure callback URL to be called if the workflow run fails after all retries. Must be prefixed with `http://` or `https://`. Failure callbacks are charged as a regular message and use the retry settings from the original request. - **Upstash-Failure-Callback-Forward-*** (string) - Optional - Custom headers to be forwarded to the failure callback URL. Similar to `Upstash-Forward-*` headers. ### Request Example ```json { "example": "request body" } ``` ### Response #### Success Response (200) - **runId** (string) - The ID of the triggered workflow run. #### Response Example ```json { "runId": "clt123abc456def789" } ``` ``` -------------------------------- ### Webhook Request Validation and Parsing - Python Source: https://upstash.com/docs/workflow/howto/use-webhooks This Python example demonstrates how to integrate request validation into your webhook handler. The `validate_request` function should contain your specific validation logic. ```python async def validate_request(payload_string: str, header_payload: dict): # Validate the request pass @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: payload_string = context.request_payload header_payload = context.headers try { event = await validate_request(payload_string, header_payload) } catch { return } # Next steps based on the event ``` -------------------------------- ### Trigger Workflow Run with Flow Control Source: https://upstash.com/docs/workflow/features/flow-control Trigger a workflow run and configure flow control limits for rate and parallelism. This example sets parallelism to 7 and a rate of 3 steps per minute. ```typescript const { workflowRunId } = await client.trigger({ url: "https:///", flowControl: { key: "fw_example", parallelism: 7, rate: 3, period: "1m", } }) ``` -------------------------------- ### Trigger Workflow with Upstash Client Source: https://upstash.com/docs/workflow/features/invoke/serveMany This example demonstrates how to trigger a workflow using the Upstash Client. Both `workflowOne` and `workflowTwo` are exposed through `serveMany`, sharing the same parent path. The URL for triggering is constructed using the route names defined in `serveMany`. ```typescript import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }) const { workflowRunId } = await client.trigger({ // 👇 URL of workflow one url: "https://your-app/serve-many/workflow-one-route" }) ``` -------------------------------- ### Define a Workflow Endpoint with Next.js Source: https://upstash.com/docs/workflow/sdk/workflow-js Declare workflow endpoints using the `serve` method from `@upstash/workflow/nextjs`. This example demonstrates defining two sequential steps within a workflow, including logging inputs and outputs. ```typescript import { serve } from "@upstash/workflow/nextjs"; // mock function const someWork = (input: string) => { return `processed '${JSON.stringify(input)}'`; }; // serve endpoint which expects a string payload: export const { POST } = serve(async (context) => { // get request body: const input = context.requestPayload; // run the first step: const result1 = await context.run("step1", async () => { const output = someWork(input); console.log("step 1 input", input, "output", output); return output; }); // run the second step: await context.run("step2", async () => { const output = someWork(result1); console.log("step 2 input", result1, "output", output); }); }); ``` -------------------------------- ### Return Results from context.run for Later Use (Incorrect TypeScript) Source: https://upstash.com/docs/workflow/basics/caveats This example shows an incorrect way to handle step results. The `result` variable may be uninitialized when `step-2` is executed because it's declared outside the `context.run` scope. ```typescript export const { POST } = serve(async (context) => { const input = context.requestPayload let result await context.run("step-1", async () => { result = await someWork(input) }) await context.run("step-2", async () => { await someOtherWork(result) }) }) ``` -------------------------------- ### Expose Multiple Workflows with serveMany() Source: https://upstash.com/docs/workflow/features/invoke/serveMany Use `serveMany()` instead of `serve()` to expose multiple workflows on a single catch-all route. If one workflow invokes another, both must be included in the same `serveMany` definition. This example shows how to define a catch-all route in Next.js. ```typescript export const { POST } = serveMany( { "workflow-one-route": workflowOne, "workflow-two-route": workflowTwo, } ) ``` -------------------------------- ### POST /api/workflow - QStash Client Configuration Source: https://upstash.com/docs/workflow/basics/serve/advanced Explains how to provide a custom QStash client instance, useful when managing multiple QStash projects within the same application. ```APIDOC ## POST /api/workflow ### Description Allows providing a custom QStash client instance. Useful for managing multiple QStash projects within the same application. ### Method POST ### Endpoint /api/workflow ### Parameters #### Request Body - **qstashClient** (object) - Optional - A custom QStash client instance. ### Request Example ```json { "qstashClient": "new Client({ token: \"\" })" } ``` ### Response #### Success Response (200) - **(No specific fields mentioned)** #### Response Example ```json { "message": "Workflow scheduled with custom QStash client" } ``` ``` -------------------------------- ### Set Up Environment Variables for QStash and OpenAI Source: https://upstash.com/docs/workflow/agents/features Before defining your model, set the QSTASH_TOKEN and OPENAI_API_KEY environment variables. These are required for authentication and API access. ```bash QSTASH_TOKEN="" OPENAI_API_KEY="" ``` -------------------------------- ### Serve API Requests in FastAPI Source: https://upstash.com/docs/workflow/basics/context/call Integrate Upstash Workflow with FastAPI to create an API endpoint that calls external services. This example demonstrates calling the OpenAI API with a specific model and system message. Ensure `OPENAI_API_KEY` is set in your environment. ```python from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) @dataclass class Request: topic: str @serve.post("/api/example") async def example(context: AsyncWorkflowContext[Request]) -> None: request: Request = context.request_payload result = await context.call( "generate-long-essay", url="https://api.openai.com/v1/chat/completions", method="POST", body={ "model": "gpt-4o", "messages": [ { "role": "system", "content": "You are a helpful assistant writing really long essays that would cause a normal serverless function to timeout.", }, {"role": "user", "content": request["topic"]}, ], }, headers={ "authorization": f"Bearer {os.environ['OPENAI_API_KEY']}", }, ) status, headers, body = result.status, result.headers, result.body ``` -------------------------------- ### Workflow with External API Call Source: https://upstash.com/docs/workflow/quickstarts/flask Define a workflow that makes an external HTTP request using `context.call`. This example demonstrates calling a separate Flask endpoint (`/get-data`) within the workflow. ```python from flask import Flask from typing import Dict from upstash_workflow.flask import Serve from upstash_workflow import WorkflowContext, CallResponse app = Flask(__name__) serve = Serve(app) def some_work(input: str) -> str: return f"processed '{input}'" @app.route("/get-data", methods=["POST"]) def get_data() -> Dict[str, str]: return {"message": "get data response"} @serve.route("/call") def call(context: WorkflowContext[str]) -> None: input = context.request_payload def _step1() -> str: output = some_work(input) print("step 1 input", input, "output", output) return output result1: str = context.run("step1", _step1) response: CallResponse[Dict[str, str]] = context.call( "get-data", url=f"{context.env.get('UPSTASH_WORKFLOW_URL', 'http://localhost:8000')}/get-data", method="POST", body={"message": result1}, ) def _step2() -> str: output = some_work(response.body["message"]) print("step 2 input", response, "output", output) return output context.run("step2", _step2) ``` -------------------------------- ### POST /v2/workflows/dlq/restart Source: https://upstash.com/docs/workflow/api-reference/dlq/bulk-restart-workflows-from-dlq Bulk restart multiple failed workflow runs from the DLQ. Each workflow will start from the beginning with a new Run ID. A maximum of 50 workflow runs can be restarted per request. Use the cursor for pagination. ```APIDOC ## POST /v2/workflows/dlq/restart ### Description Restart multiple failed workflow runs from the DLQ. Each workflow will start from the beginning. Unlike resume, which continues from where workflows failed, restart executes the entire workflows from the first step. New workflow run IDs are generated and all steps will be executed again. A maximum of 50 workflow runs can be restarted per request. If more runs are available, a cursor is returned, which can be used in subsequent requests to continue the operation. When no cursor is returned, all entries have been processed. Each restarted workflow run is assigned a new random Run ID. ### Method POST ### Endpoint /v2/workflows/dlq/restart ### Parameters #### Query Parameters - **dlqIds** (array[string]) - Optional - List of specific DLQ IDs to restart. If provided, other filters are ignored. - **cursor** (string) - Optional - Pagination cursor for restarting workflows in batches. - **count** (integer) - Optional - Maximum number of workflows to restart. If not provided, all matching workflows will be restarted. - **fromDate** (integer[int64]) - Optional - Filter workflows by starting date, in milliseconds (Unix timestamp). This is inclusive. - **toDate** (integer[int64]) - Optional - Filter workflows by ending date, in milliseconds (Unix timestamp). This is inclusive. - **workflowUrl** (string) - Optional - Filter workflows by workflow URL. - **workflowRunId** (string) - Optional - Filter workflows by workflow run ID. - **workflowCreatedAt** (integer[int64]) - Optional - Filter workflows by creation timestamp in milliseconds (Unix timestamp). - **label** (string) - Optional - Filter workflows by label assigned by the user. - **failureFunctionState** (string) - Optional - Filter workflows by failure function state. Possible values: CALLBACK_INPROGRESS, CALLBACK_SUCCESS, CALLBACK_FAIL, CALLBACK_CANCELED. - **callerIp** (string) - Optional - Filter workflows by IP address of the publisher. - **flowControlKey** (string) - Optional - Filter workflows by Flow Control Key. #### Header Parameters - **Upstash-Retries** (integer) - Optional - Override the number of retries for the workflow steps. - **Upstash-Delay** (string) - Optional - Override the delay before executing the workflows. Format is `` (e.g., "10s", "5m"). - **Upstash-Retry-Delay** (string) - Optional - Override the retry delay before executing the workflows. Format is `` (e.g., "10s", "5m"). ### Request Example ```json { "dlqIds": ["dlq-id-1", "dlq-id-2"], "count": 10 } ``` ### Response #### Success Response (200) - **cursor** (string) - Optional - Pagination cursor for the next batch of workflows. - **restartedCount** (integer) - The number of workflows successfully restarted. #### Response Example ```json { "cursor": "next-cursor-value", "restartedCount": 50 } ``` ``` -------------------------------- ### Image Processing Workflow in Next.js Source: https://upstash.com/docs/workflow/examples/imageProcessing Implement an image processing workflow in a Next.js application using Upstash Workflow. This example shows how to retrieve an image, resize it to multiple resolutions, apply filters, and store the processed images. ```typescript import { serve } from "@upstash/workflow/nextjs" import { resizeImage, applyFilters, storeImage, getImageUrl, } from "./utils" type ImageResult = { imageUrl: string } export const { POST } = serve<{ imageId: string; userId: string }>( async (context) => { const { imageId, userId } = context.requestPayload // Step 1: Retrieve the uploaded image const imageUrl = await context.run("get-image-url", async () => { return await getImageUrl(imageId) }) // Step 2: Resize the image to multiple resolutions const resolutions = [640, 1280, 1920] const resizedImages: { body: ImageResult }[] = await Promise.all(resolutions.map( resolution => context.call( `resize-image-${resolution}`, { // endpoint which returns ImageResult type in response url: "https://image-processing-service.com/resize", method: "POST", body: JSON.stringify({ imageUrl, width: resolution, }) } ) )) // Step 3: Apply filters to each resized image const filters = ["grayscale", "sepia", "contrast"] const processedImagePromises: Promise[] = [] for (const resizedImage of resizedImages) { for (const filter of filters) { const processedImagePromise = context.call( `apply-filter-${filter}`, { // endpoint which returns ImageResult type in response url: "https://image-processing-service.com/filter", method: "POST", body: JSON.stringify({ imageUrl: resizedImage.body.imageUrl, filter, }) } ) processedImagePromises.push(processedImagePromise) } } const processedImages: { body: ImageResult }[] = await Promise.all(processedImagePromises) // Step 4: Store processed images in cloud storage const storedImageUrls: string[] = await Promise.all( processedImages.map( processedImage => context.run(`store-image`, async () => { return await storeImage(processedImage.body.imageUrl) }) ) ) } ) ``` -------------------------------- ### Setup Workflow POST Endpoint with Retry Loop Source: https://upstash.com/docs/workflow/examples/customRetry Defines a POST endpoint for a workflow with a loop to attempt API calls up to 10 times. Use this to structure your workflow's main entry point and initial retry mechanism. ```typescript export const { POST } = serve<{ userData: string }>(async (context) => { for (let attempt = 0; attempt < 10; attempt++) { // TODO: call API in here } }) ``` ```python @serve.post("/custom-retry-logic") async def custom_retry_logic(context: AsyncWorkflowContext[InitialData]) -> None: for attempt in range(10): # TODO: call API in here ``` -------------------------------- ### Define and Run a Workflow in FastAPI Source: https://upstash.com/docs/workflow/examples/paymentRetry This Python snippet demonstrates how to define a workflow using `upstash_workflow/fastapi`. It mirrors the functionality of the TypeScript example, handling payment retries, user suspension, and email notifications within a FastAPI application. Use this for server-side workflow definitions in a FastAPI application. ```python from fastapi import FastAPI from typing import TypedDict, Optional from dataclasses import dataclass from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) @dataclass class ChargeResult: invoice_id: str total_cost: float class ChargeUserPayload(TypedDict): email: str async def send_email(email: str, content: str) -> None: # Implement the logic to send an email print("Sending email to", email, "with content:", content) async def check_suspension(email: str) -> bool: # Implement the logic to check if the user is suspended print("Checking suspension status for", email) return True async def suspend_user(email: str) -> None: # Implement the logic to suspend the user print("Suspending the user", email) async def unsuspend_user(email: str) -> None: # Implement the logic to unsuspend the user print("Unsuspending the user", email) async def charge_customer(attempt: int) -> Optional[ChargeResult]: # Implement the logic to charge the customer print("Charging the customer") if attempt <= 2: raise Exception("Payment failed") return ChargeResult(invoice_id="INV123", total_cost=100) @serve.post("/payment-retries") async def payment_retries(context: AsyncWorkflowContext[ChargeUserPayload]) -> None: email = context.request_payload["email"] async def _check_suspension() -> bool: return await check_suspension(email) for i in range(3): # attempt to charge the user async def _charge_customer() -> Optional[ChargeResult]: try: return await charge_customer(i + 1) except Exception as e: print(f"Error: {e}") return None result = await context.run("charge customer", _charge_customer) if not result: # Wait for a day ``` -------------------------------- ### Handle Webhook Event in TypeScript Source: https://upstash.com/docs/workflow/howto/use-webhooks Use context.run to process webhook events. This example specifically handles 'user.created' events, extracting user payload. Ensure event data is correctly parsed and validated before processing. ```typescript export const { POST } = serve(async (context) => { // ... Parse and validate the incoming request const user = await context.run( "handle-webhook-event", async () => { if (event.type === "user.created") { const { id: clerkUserId, email_addresses, first_name } = event.data; const primaryEmail = email_addresses.find( (email) => (email.id = event.data.primary_email_address_id) ); if (!primaryEmail) { return false; } return { event: event.type, userId: clerkUserId, email: primaryEmail.email_address, firstName: first_name, } as UserPayload; } return false; } ); }); ``` -------------------------------- ### Configure Flow Control for Workflow Trigger Source: https://upstash.com/docs/workflow/features/flow-control Configure flow control settings when triggering a workflow run. This example uses a specific key, sets parallelism to 1, and a rate of 10 steps per 100 time units. ```typescript import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }) const { workflowRunId } = await client.trigger({ url: "https:///", flowControl: { key: "user-signup", parallelism: 1, rate: 10, period: 100, } }) ``` -------------------------------- ### Configure Retry Delay Strategy Source: https://upstash.com/docs/workflow/features/retries This TypeScript example shows how to set a custom retry delay strategy for workflow steps. The `retryDelay` is defined as a mathematical expression that calculates the delay in milliseconds based on the number of retries. ```typescript import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }) const { workflowRunId } = await client.trigger({ url: "https:///", retries: 3, retryDelay: "(1 + retried) * 1000" }) ``` -------------------------------- ### Pause Workflow Execution for One Day in Python Source: https://upstash.com/docs/workflow/basics/context/sleep Use `await context.sleep()` to pause workflow execution. This example pauses for one day using a human-readable string format. ```python from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext from onboarding_utils import sign_in, send_email app = FastAPI() serve = Serve(app) @serve.post("/api/onboarding") async def onboarding(context: AsyncWorkflowContext[User]) -> None: user_data = context.request_payload async def _sign_in(): return await sign_in(user_data) user = await context.run("sign-in", _sign_in) # 👇 Wait for one day (in seconds) await context.sleep("wait-until-welcome-email", "1d") async def _send_email(): return await send_email(user.name, user.email) await context.run("send-welcome-email", _send_email) ``` -------------------------------- ### Define an Agent with Tools in Upstash Workflow Source: https://upstash.com/docs/workflow/agents/features Define an agent with a specific LLM model, name, maximum steps, available tools, and background context. This setup is for use within a Next.js application using Upstash Workflow. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { agentWorkflow } from "@upstash/workflow-agents"; import { WikipediaQueryRun } from "@langchain/community/tools/wikipedia_query_run"; export const { POST } = serve(async (context) => { const agents = agentWorkflow(context); const model = agents.openai('gpt-3.5-turbo') const researcherAgent = agents.agent({ model, name: 'academic', maxSteps: 2, tools: { wikiTool: new WikipediaQueryRun({ topKResults: 1, maxDocContentLength: 500, }) }, background: 'You are researcher agent with access to Wikipedia. ' 'Utilize Wikipedia as much as possible for correct information', }) }) ``` -------------------------------- ### Trigger Workflow with Redacted Body and Headers (Python) Source: https://upstash.com/docs/workflow/howto/redact-fields This Python snippet demonstrates triggering a workflow run while redacting the request body and specific headers such as 'Authorization'. Make sure the 'upstash_workflow' library is installed. ```python from upstash_workflow import Client client = Client("") client.trigger( url="https://my-app.com/api/workflow", body={ "hello": "world", }, redact={ "body": True, "header": ["Authorization"] // or `header: True` to redact all headers }, ) ``` -------------------------------- ### Create AI SDK Tools from Client Source: https://upstash.com/docs/workflow/agents/features Use createAISDKTools to generate tools from an existing client, like WeatherClient. ```typescript import { createAISDKTools } from '@agentic/ai-sdk' import { WeatherClient } from '@agentic/weather' const weather = new WeatherClient() const tools = createAISDKTools(weather) ``` -------------------------------- ### Chaining Agents in Upstash Workflow Source: https://upstash.com/docs/workflow/agents/patterns/prompt-chaining This TypeScript code sets up and chains three agents. The first agent lists physicists, the second describes their work using a Wikipedia tool, and the third summarizes the descriptions. Ensure necessary Upstash and Langchain dependencies are installed. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { agentWorkflow } from "@upstash/workflow-agents"; import { WikipediaQueryRun } from "@langchain/community/tools/wikipedia_query_run"; export const { POST } = serve(async (context) => { const agents = agentWorkflow(context); const model = agents.openai('gpt-3.5-turbo'); const agent1 = agents.agent({ model, name: 'firstAgent', maxSteps: 1, background: 'You are an agent that lists famous physicists.', tools: {} }); const agent2 = agents.agent({ model, name: 'secondAgent', // set to 2 as this agent will first request tools // and then summarize them: maxSteps: 2, background: 'You are an agent that describes the work of' ' the physicists listed in the previous prompt.', tools: { wikiTool: new WikipediaQueryRun({ topKResults: 1, maxDocContentLength: 500, }) } }); const agent3 = agents.agent({ model, name: 'thirdAgent', maxSteps: 1, background: 'You are an agent that summarizes the ' 'works of the physicists mentioned previously.', tools: {} }); // Chaining agents const firstOutput = await agents.task({ agent: agent1, prompt: "List 3 famous physicists." }).run(); const secondOutput = await agents.task({ agent: agent2, prompt: `Describe the work of: ${firstOutput.text}` }).run(); const { text } = await agents.task({ agent: agent3, prompt: `Summarize: ${secondOutput.text}` }).run(); console.log(text); }); ``` -------------------------------- ### Trigger Single Workflow with Client Source: https://upstash.com/docs/workflow/howto/start Use `client.trigger` to start a single workflow. This method allows for optional parameters like body, headers, workflow run ID, retries, delay, failure URL, and flow control. ```typescript import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }) const { workflowRunId } = await client.trigger({ url: "https:///", body: "hello there!", // optional body headers: { ... }, // optional headers workflowRunId: "my-workflow", // optional workflow run id retries: 3 // optional retries in the initial request delay: "10s" // optional delay value failureUrl: "https://", // optional failure url flowControl: { ... } // optional flow control }) console.log(workflowRunId) // prints wfr_my-workflow ``` -------------------------------- ### Invoke Another Workflow and Await Completion Source: https://upstash.com/docs/workflow/features/invoke Use `context.invoke` to start another workflow and wait for its execution to finish before proceeding. The response from the invoked workflow, including its status, is destructured into `body`, `isFailed`, and `isCanceled`. ```typescript const { body, isFailed, isCanceled } = await context.invoke( "analyze-content", { workflow: analyzeContent, body: "test", header: {...}, // headers to pass to anotherWorkflow (optional) retries, flowControl, workflowRunId } ) ``` -------------------------------- ### Create Cloudflare Worker with Upstash Workflow Source: https://upstash.com/docs/workflow/quickstarts/cloudflare-workers Define a workflow endpoint in your Cloudflare Worker's entrypoint file (e.g., src/index.ts) using the Upstash Workflow SDK. This example sets up two sequential steps. ```typescript import { serve } from "@upstash/workflow/cloudflare" interface Env { ENVIRONMENT: "development" | "production" } export default serve<{ text: string }> async (context) => { const initialPayload = context.requestPayload.text const result = await context.run("initial-step", async () => { console.log(`Step 1 running with payload: ${initialPayload}`) return { text: "initial step ran" } } ) await context.run("second-step", async () => { console.log(`Step 2 running with result from step 1: ${result.text}`) }) } ) ``` -------------------------------- ### GET /v2/workflows/dlq Source: https://upstash.com/docs/workflow/api-reference/dlq/list-failed-workflow-runs Lists and paginates through all failed workflow runs currently in the DLQ. Failed workflows end up in the DLQ after exhausting all retry attempts. You can filter, paginate, and inspect these failures to understand what went wrong and decide whether to resume, restart, or delete them. ```APIDOC ## GET /v2/workflows/dlq ### Description List and paginate through all failed workflow runs currently in the DLQ. Failed workflows end up in the DLQ after exhausting all retry attempts. You can filter, paginate, and inspect these failures to understand what went wrong and decide whether to resume, restart, or delete them. ### Method GET ### Endpoint /v2/workflows/dlq ### Parameters #### Query Parameters - **cursor** (string) - Optional - Pagination cursor. If provided, returns the next page of results. - **count** (integer) - Optional - The maximum number of failed workflow runs to return per page. (default: 100, maximum: 100) - **fromDate** (integer) - Optional - Filter by starting date in milliseconds (Unix timestamp). This is inclusive. - **toDate** (integer) - Optional - Filter by ending date in milliseconds (Unix timestamp). This is inclusive. - **workflowUrl** (string) - Optional - Filter by workflow URL. - **workflowRunId** (string) - Optional - Filter by workflow run ID. - **workflowCreatedAt** (integer) - Optional - Filter by workflow creation timestamp in milliseconds (Unix timestamp). - **label** (string) - Optional - Filter by label assigned to the workflow run. - **failureFunctionState** (string) - Optional - Filter by failure function state. Possible values: CALLBACK_INPROGRESS, CALLBACK_SUCCESS, CALLBACK_FAIL, CALLBACK_CANCELED. - **callerIp** (string) - Optional - Filter by IP address of the publisher. - **flowControlKey** (string) - Optional - Filter by Flow Control Key. ### Responses #### Success Response (200) - **cursor** (string) - Pagination cursor for the next page. Empty if no more results. - **messages** (array) - Array of failed workflow messages. - **dlqId** (string) - The DLQ ID of the failed workflow message. - **workflowUrl** (string) - The URL of the workflow. #### Error Response (400) - **code** (string) - Error code. - **message** (string) - Error message. #### Error Response (401) - **code** (string) - Error code. - **message** (string) - Error message. #### Error Response (500) - **code** (string) - Error code. - **message** (string) - Error message. ``` -------------------------------- ### Send Welcome Email (Python) Source: https://upstash.com/docs/workflow/examples/authWebhook Sends a welcome email to a user in Python, notifying them of their free trial. This function is designed for asynchronous execution within a workflow. ```python async def _send_welcome_email() -> None: await send_email( email, "Welcome to our platform!, You have 14 days of free trial." ) await context.run("send welcome email", _send_welcome_email) ``` -------------------------------- ### Import serve and async_serve functions in Python Source: https://upstash.com/docs/workflow/quickstarts/platforms Import the `serve` and `async_serve` functions for use with Python projects. ```python from upstash_workflow import serve, async_serve ``` -------------------------------- ### Provide Custom QStash Client Source: https://upstash.com/docs/workflow/basics/serve/advanced Supply your own QStash client instance using the `qstashClient` option if you need to manage multiple QStash projects within the same application. This bypasses the default client initialization from environment variables. ```typescript import { Client } from "@upstash/qstash"; import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve( async (context) => { ... }, { qstashClient: new Client({ token: "" }) } ); ``` ```python from qstash import AsyncQStash @serve.post("/api/example", qstash_client=AsyncQStash(os.environ["QSTASH_TOKEN"])) async def example(context: AsyncWorkflowContext[str]) -> None: ... ``` -------------------------------- ### Get Failed Workflow Run Source: https://upstash.com/docs/workflow/api-reference/dlq/get-failed-workflow-run Retrieve details of a specific failed workflow run from the DLQ using its DLQ ID. ```APIDOC ## GET /v2/workflows/dlq/{dlqId} ### Description Get details of a specific failed workflow run from the DLQ. ### Method GET ### Endpoint /v2/workflows/dlq/{dlqId} ### Parameters #### Path Parameters - **dlqId** (string) - Required - The DLQ ID of the failed workflow run. ### Responses #### Success Response (200) - **dlqId** (string) - The DLQ ID of the failed workflow message. - **workflowUrl** (string) - The URL of the workflow. - **workflowRunId** (string) - The ID of the workflow run. - **workflowCreatedAt** (integer) - The timestamp when the workflow run was created (Unix timestamp in milliseconds). - **url** (string) - The URL of the failed workflow step. - **method** (string) - The HTTP method used for the workflow step. - **header** (object) - The HTTP headers sent to the workflow step. - **body** (string) - The body of the message if it is composed of utf8 chars only, empty otherwise. - **bodyBase64** (string) - The base64 encoded body if the body contains a non-utf8 char only, empty otherwise. - **maxRetries** (integer) - The number of retries that should be attempted in case of delivery failure. - **createdAt** (integer) - The unix timestamp in milliseconds when the message was created. - **failureCallback** (string) - The url where we send a callback to after the workflow fails. - **callerIP** (string) - IP address of the publisher of this workflow. - **label** (string) - The label assigned to the workflow run. - **flowControlKey** (string) - The flow control key used for rate limiting. - **failureFunctionState** (string) - The state of the failure function if applicable. - **responseStatus** (integer) - The HTTP status code received from the destination API. - **responseHeader** (object) - The HTTP response headers received from the destination API. - **responseBody** (string) - The body of the response if it is composed of utf8 chars only, empty otherwise. - **responseBodyBase64** (string) - The base64 encoded body of the response if the body contains a non-utf8 char only, empty otherwise. #### Error Response (400) - **error** (string) - Error message #### Error Response (401) - **error** (string) - Error message #### Error Response (404) - **error** (string) - Error message #### Error Response (500) - **error** (string) - Error message ``` -------------------------------- ### GET /v2/workflows/logs Source: https://upstash.com/docs/workflow/api-reference/logs/list-workflow-run-logs Retrieves a list of workflow run logs. Supports filtering by various parameters such as cursor, workflow URL, run ID, creation timestamp, state, date range, and more. It also allows for pagination and trimming of request/response bodies. ```APIDOC ## GET /v2/workflows/logs ### Description Retrieves a list of workflow run logs. Supports filtering by various parameters such as cursor, workflow URL, run ID, creation timestamp, state, date range, and more. It also allows for pagination and trimming of request/response bodies. ### Method GET ### Endpoint /v2/workflows/logs ### Parameters #### Query Parameters - **cursor** (string) - Optional - Pagination cursor for fetching the next page of results. - **workflowUrl** (string) - Optional - Filter by workflow URL (exact match). Must start with http:// or https://. - **workflowRunId** (string) - Optional - Filter by specific workflow run ID. - **workflowCreatedAt** (integer) - Optional - Filter by workflow creation timestamp in milliseconds (Unix timestamp). - **workflowRuns** (string) - Optional - Filter by multiple workflow runs. Provide a comma-separated list of `workflowRunId@workflowCreatedAt` pairs to query specific runs in a single request. When this parameter is provided, all other filters are ignored. - **state** (string) - Optional - Filter by workflow or step state. Common states include: `RUN_STARTED`, `RUN_SUCCESS`, `RUN_FAILED`, `RUN_CANCELED`, `STEP_SUCCESS`, `STEP_RETRY`, `STEP_FAILED`, `STEP_PROGRESS`, `STEP_CANCELED`. - **fromDate** (integer) - Optional - Filter events from this date onwards in milliseconds (Unix timestamp). Inclusive. - **toDate** (integer) - Optional - Filter events up to this date in milliseconds (Unix timestamp). Inclusive. - **count** (integer) - Optional - Maximum number of results to return per page. Event mode: Max 1000 (default 1000). Run mode (groupBy=workflowRunId): Max 10 (default 10). - **trimBody** (integer) - Optional - Trim request/response bodies to this many bytes. Use -1 to exclude bodies entirely. Useful for reducing response size when bodies are large. - **label** (string) - Optional - Filter workflow run by the label assigned by the user. - **flowControlKey** (string) - Optional - Filter workflow run by the flow control key assigned by the user on trigger. - **callerIp** (string) - Optional - Filter workflow run by the caller IP address. ### Response #### Success Response (200) - **logs** (array) - An array of workflow log objects. - **nextCursor** (string) - A cursor for fetching the next page of results. #### Response Example ```json { "logs": [ { "workflowRunId": "run-123", "workflowId": "workflow-abc", "workflowCreatedAt": 1678886400000, "stepId": "step-xyz", "stepCreatedAt": 1678886460000, "state": "STEP_SUCCESS", "logTimestamp": 1678886465000, "message": "Step completed successfully.", "metadata": { "key": "value" } } ], "nextCursor": "cursor-456" } ``` ``` -------------------------------- ### Send Welcome Email (TypeScript) Source: https://upstash.com/docs/workflow/examples/authWebhook Sends a welcome email to a user, informing them about their free trial. Ensure the email service is configured and accessible. ```typescript await context.run("send welcome email", async () => { await sendEmail( email, "Welcome to our platform!, You have 14 days of free trial." ); }); ``` -------------------------------- ### API Call with Dynamic Retries in Python Source: https://upstash.com/docs/workflow/examples/customRetry Implement custom retry logic for API calls using Python with Upstash Workflow. This example handles rate limiting and other errors by dynamically adjusting sleep times based on response headers. ```python from fastapi import FastAPI from typing import Dict, Any, TypedDict import os from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext, CallResponse from utils import store_response app = FastAPI() serve = Serve(app) class InitialData(TypedDict): user_data: str def create_system_message() -> Dict[str, str]: return { "role": "system", "content": "You are an AI assistant providing a brief summary and key insights for any given data.", } def create_user_message(data: str) -> Dict[str, str]: return {"role": "user", "content": f"Analyze this data chunk: {data}"} @serve.post("/custom-retry-logic") async def custom_retry_logic(context: AsyncWorkflowContext[InitialData]) -> None: # 👇 initial data sent along when triggering the workflow user_data = context.request_payload["user_data"] for attempt in range(10): response: CallResponse[Dict[str, Any]] = await context.call( "call-openai", url="https://api.openai.com/v1/chat/completions", method="POST", headers={ "authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}", }, body={ "model": "gpt-4", "messages": [create_system_message(), create_user_message(user_data)], "max_tokens": 150, }, ) # Success case if response.status_code < 300: async def _store_response_in_db() -> None: await store_response(response.body) await context.run("store-response-in-db", _store_response_in_db) return # Rate limit case - wait and retry if response.status_code == 429: ratelimit_tokens_header = response.header.get("x-ratelimit-reset-tokens") ratelimit_requests_header = response.header.get( "x-ratelimit-reset-requests" ) reset_time = ( (ratelimit_tokens_header[0] if ratelimit_tokens_header else None) or (ratelimit_requests_header[0] if ratelimit_requests_header else None) or 10 ) # assuming `reset_time` is in seconds await context.sleep("sleep-until-retry", float(reset_time)) continue # Any other scenario - pause for 5 seconds to avoid overloading OpenAI API await context.sleep("pause-to-avoid-spam", 5) ``` -------------------------------- ### Create Next.js Project Source: https://upstash.com/docs/workflow/agents/getting-started Use this command to create a new Next.js project. Replace [project-name] with your desired project name and specify [options] as needed. ```bash npx create-next-app@latest [project-name] [options] ``` -------------------------------- ### Single Agent Workflow with Wikipedia Tool Source: https://upstash.com/docs/workflow/agents/features This TypeScript code defines a single agent that uses OpenAI and a Wikipedia tool to research topics. It's suitable for tasks requiring external knowledge retrieval and summarization. Ensure you have the necessary Upstash and Langchain dependencies installed. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { agentWorkflow } from "@upstash/workflow-agents"; import { WikipediaQueryRun } from "@langchain/community/tools/wikipedia_query_run"; export const { POST } = serve(async (context) => { const agents = agentWorkflow(context); const model = agents.openai('gpt-3.5-turbo'); const researcherAgent = agents.agent({ model, name: 'academic', maxSteps: 2, tools: { wikiTool: new WikipediaQueryRun({ topKResults: 1, maxDocContentLength: 500, }) }, background: 'You are researcher agent with access to Wikipedia. ' + 'Utilize Wikipedia as much as possible for correct information', }); const task = agents.task({ agent: researcherAgent, prompt: "Tell me about 5 topics in advanced physics.", }); const { text } = await task.run(); console.log("result:", text) }) ``` -------------------------------- ### Execute Business Logic in context.run (Python) Source: https://upstash.com/docs/workflow/basics/caveats Place your business logic inside the `context.run` function for each step. Code outside `context.run` is only for connecting steps and will execute multiple times. ```python @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload async def _step_1() -> Dict: return {"success": True} result = await context.run("step-1", _step_1) print("This log will appear multiple times") async def _step_2() -> None: print("This log will appear just once") print("Step 1 status is:", result["success"]) await context.run("step-2", _step_2) ``` -------------------------------- ### Configure Local QStash Environment Variables Source: https://upstash.com/docs/workflow/quickstarts/astro Add the QSTASH_URL and QSTASH_TOKEN obtained from the local QStash server to your .env file for local development. ```dotenv QSTASH_URL="http://127.0.0.1:8080" QSTASH_TOKEN="" ``` -------------------------------- ### Set Up Local Tunnel and Workflow URL Source: https://upstash.com/docs/workflow/sdk/workflow-py Create a public URL for your local development server using ngrok and set the UPSTASH_WORKFLOW_URL environment variable. ```bash ngrok http localhost:8000 export UPSTASH_WORKFLOW_URL= ``` -------------------------------- ### Get Flow Control Key Source: https://upstash.com/docs/workflow/api-reference/flow-control/get-flow-control-key Retrieve details of a specific Flow Control key. This endpoint allows you to check the status and configuration of a flow control key. ```APIDOC ## GET /v2/flowControl/{flowControlKey} ### Description Get details of a specific Flow Control key. ### Method GET ### Endpoint /v2/flowControl/{flowControlKey} ### Parameters #### Path Parameters - **flowControlKey** (string) - Required - The Flow Control key to retrieve ### Responses #### Success Response (200) - **flowControlKey** (string) - The flow control key name - **waitlistSize** (integer) - The number of messages waiting due to flow control configuration. #### Error Response (404) - **error** (string) - Error message ``` -------------------------------- ### Create .env.local file Source: https://upstash.com/docs/workflow/quickstarts/nuxt Create a .env.local file in your project root to store environment variables for local development. ```bash touch .env.local ``` -------------------------------- ### Configure Migration Mode (US Primary) Source: https://upstash.com/docs/workflow/howto/multi-region Enable migration mode by setting QSTASH_REGION to 'US_EAST_1'. This configuration allows simultaneous handling of requests from both US and EU regions. Define region-specific credentials for both primary and secondary regions. ```bash QSTASH_REGION="US_EAST_1" US_EAST_1_QSTASH_URL="https://qstash-us-east-1.upstash.io" US_EAST_1_QSTASH_TOKEN="your_us_token" US_EAST_1_QSTASH_CURRENT_SIGNING_KEY="your_us_current_key" US_EAST_1_QSTASH_NEXT_SIGNING_KEY="your_us_next_key" EU_CENTRAL_1_QSTASH_URL="https://qstash-eu-central-1.upstash.io" EU_CENTRAL_1_QSTASH_TOKEN="your_eu_token" EU_CENTRAL_1_QSTASH_CURRENT_SIGNING_KEY="your_eu_current_key" EU_CENTRAL_1_QSTASH_NEXT_SIGNING_KEY="your_eu_next_key" ``` -------------------------------- ### Notify within Workflow (Python) Source: https://upstash.com/docs/workflow/features/notify Use this to notify other workflows waiting for a specific event from within a workflow. Ensure the necessary imports and FastAPI setup are included. ```python from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext from datetime import datetime app = FastAPI() serve = Serve(app) @serve.post("/api/order-processor") async def order_processor(context: AsyncWorkflowContext[str]) -> None: order_id = context.request_payload["order_id"] processing_result = context.request_payload["processing_result"] # Process the order async def _process_order(): return await process_order(order_id) result = await context.run("process-order", _process_order) # Notify waiting workflows that processing is complete notify_response = await context.notify( "notify-processing-complete", f"order-{order_id}", { "order_id": order_id, "status": "completed", "result": processing_result, "completed_at": datetime.utcnow().isoformat() } ) # Log notification results async def _log_notification(): print(f"Notified {len(notify_response)} waiting workflows") return notify_response await context.run("log-notification", _log_notification) ``` -------------------------------- ### Configure Single-Region Mode (EU) Source: https://upstash.com/docs/workflow/howto/multi-region Set these environment variables to operate in single-region mode, defaulting to the EU region. Ensure QSTASH_TOKEN and signing keys are configured for your EU endpoint. ```bash QSTASH_URL="https://qstash.upstash.io" QSTASH_TOKEN="your_eu_token" QSTASH_CURRENT_SIGNING_KEY="your_eu_current_key" QSTASH_NEXT_SIGNING_KEY="your_eu_next_key" ``` -------------------------------- ### Send Welcome Email on New Signup Source: https://upstash.com/docs/workflow/examples/customerOnboarding This snippet sends a welcome email to a new user as part of the onboarding process. It's executed within the 'new-signup' step of the workflow. ```typescript await context.run("new-signup", async () => { await sendEmail("Welcome to the platform", email) }) ``` ```python async def _new_signup() -> None: await send_email("Welcome to the platform", email) await context.run("new-signup", _new_signup) ``` -------------------------------- ### Minimal Express.js Workflow Endpoint Source: https://upstash.com/docs/workflow/quickstarts/express Define a minimal workflow endpoint in your Express.js application using the @upstash/workflow/express serve function. This example demonstrates a simple two-step workflow. ```typescript import { serve } from "@upstash/workflow/express"; import express from 'express'; import { config } from 'dotenv'; config(); const app = express(); app.use( express.json() ); app.post( '/workflow', serve<{ message: string }יי( async (context) => { const res1 = await context.run("step1", async () => { const message = context.requestPayload.message; return message; }) await context.run("step2", async () => { console.log(res1); }) } ) ); app.listen(3000, () => { console.log('Server running on port 3000'); }); ``` -------------------------------- ### Generate and Send Report in Python Source: https://upstash.com/docs/workflow/examples/allInOne Python implementation using `context.run` for task management. Define helper async functions for clarity when running tasks within the workflow. ```python async def _generate_report() -> Any: return await generate_report(dataset_id) report = await context.run("generate-report", _generate_report) async def _send_report() -> None: await send_report(report, user_id) await context.run("send-report", _send_report) ``` -------------------------------- ### Programmatically Schedule Weekly User Summaries (Python) Source: https://upstash.com/docs/workflow/howto/schedule This Python code demonstrates how to schedule weekly summary reports for users using FastAPI and QStash. It calculates the first summary date and constructs a cron expression. A unique schedule ID is crucial for managing per-user schedules. ```python from fastapi import FastAPI, Request from fastapi.responses import JSONResponse from qstash import AsyncQStash from datetime import datetime, timedelta app = FastAPI() client = AsyncQStash("") @app.post("/api/sign-up") async def sign_up(request: Request): user_data = await request.json() # Simulate user registration user = await sign_up(user_data) # Calculate the date for the first summary (7 days from now) first_summary_date = datetime.now() + timedelta(days=7) # Create cron expression for weekly summaries starting 7 days from signup cron = f"{first_summary_date.minute} {first_summary_date.hour} * * {first_summary_date.day}" # Schedule weekly account summary await client.schedule.create_json( schedule_id=f"user-summary-{user.email}", destination="https:///api/send-weekly-summary", body={"userId": user.id}, cron=cron, ) return JSONResponse( content={"success": True, "message": "User registered and summary scheduled"}, status_code=201, ) ``` -------------------------------- ### Instantiate LangChain Wikipedia Tool Source: https://upstash.com/docs/workflow/agents/features Initialize a Wikipedia query tool for LangChain. Configure topKResults and maxDocContentLength as needed. ```typescript import { WikipediaQueryRun } from '@langchain/community/tools/wikipedia_query_run' const wikiTool = new WikipediaQueryRun({ topKResults: 1, maxDocContentLength: 500, }) ``` -------------------------------- ### Trigger Workflow with Failure URL (TypeScript) Source: https://upstash.com/docs/workflow/features/failureFunction/advanced Use this when you need to send failure callbacks to a different endpoint than your workflow's main endpoint. Ensure you have the Upstash SDK installed. ```typescript import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }) const { workflowRunId } = await client.trigger({ url: "https:///workflow" failureUrl: "https:///workflow-failure" }) ``` -------------------------------- ### Format Code with Ruff Source: https://upstash.com/docs/workflow/sdk/workflow-py Apply code formatting rules to the entire project using Ruff. ```bash poetry run ruff format . ``` -------------------------------- ### Define a Workflow with Steps Source: https://upstash.com/docs/workflow/features/flow-control Define a workflow with sequential steps using `serve`. Each step is executed within the workflow context. ```typescript export const { POST } = serve<{ topic: string }>(async (context) => { const payload = context.requestPayload await context.run("step-1", () => { ... }); await context.run("step-2", () => { ... }); await context.run("step-3", () => { ... }); }) ``` -------------------------------- ### Pause Workflow Execution for One Day in TypeScript Source: https://upstash.com/docs/workflow/basics/context/sleep Use `await context.sleep()` to pause workflow execution. This example pauses for one day using a human-readable string format. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { signIn, sendEmail} from "@/utils/onboarding-utils"; export const { POST } = serve(async (context) => { const userData = context.requestPayload; const user = await context.run("sign-in", async () => { const signedInUser = await signIn(userData); return signedInUser; }); // 👇 Wait for one day (in seconds) await context.sleep("wait-until-welcome-email", "1d"); await context.run("send-welcome-email", async () => { return sendEmail(user.name, user.email); }); }); ``` -------------------------------- ### Create and Activate Virtual Environment Source: https://upstash.com/docs/workflow/quickstarts/nextjs-fastapi Create a Python virtual environment to manage project dependencies and activate it. This ensures a clean and isolated environment for your Python backend. ```bash python -m venv venv source venv/bin/activate ``` -------------------------------- ### Add ngrok Authtoken Source: https://upstash.com/docs/workflow/howto/local-development/local-tunnel Connect your ngrok CLI to your account by adding your authentication token. Replace `` with your actual token from the ngrok dashboard. ```bash ngrok config add-authtoken ``` -------------------------------- ### Create Stripe Customer in Python Source: https://upstash.com/docs/workflow/howto/use-webhooks Perform additional operations like creating a Stripe customer using context.run in Python, following initial webhook data extraction. Ensure user data is available before proceeding. ```python @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: # ... Previous validation and user data extraction if not user: return async def _create_stripe_customer(): return await stripe.customers.create( email=user["email"], name=f"{user['first_name']} {user['last_name']}", metadata={"user_id": user["user_id"]}, ) customer = await context.run("create-stripe-customer", _create_stripe_customer) # ... Additional steps ``` -------------------------------- ### Sequential Execution with context.run() in Python Source: https://upstash.com/docs/workflow/basics/context/run Run steps sequentially in Python using context.run() with async functions. Requires FastAPI and Upstash Workflow imports. ```python from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload async def _step1(): return some_work(input) result1 = await context.run("step-1", _step1) async def _step2(): return some_other_work(result1) await context.run("step-2", _step2) ``` -------------------------------- ### Define Workflow with Failure Function Source: https://upstash.com/docs/workflow/features/failure-callback Example of defining a workflow endpoint with a failure function in TypeScript. The failure function is called when the workflow fails, allowing for custom error handling logic. ```typescript import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve( async (context) => { // Your workflow logic... }, { failureFunction: async ({ context, failStatus, failResponse, failHeaders, }) => { // 👇 Log error to monitoring system await logToSentry(...); // 👇 Send alert to team await sendSlackAlert(...); // 👇 Perform cleanup operations await cleanupWorkflowResources(...); }, } ); ``` -------------------------------- ### Programmatically Schedule Weekly User Summaries (TypeScript) Source: https://upstash.com/docs/workflow/howto/schedule Use this TypeScript code to schedule a weekly summary report for each user. Ensure you have a QStash client initialized with your token. The schedule ID should be unique per user for better management. ```typescript import { signUp } from "@/utils/auth-utils"; import { Client } from "@upstash/qstash"; const client = new Client({ token: process.env.QSTASH_TOKEN! }); export async function POST(request: Request) { const userData: UserData = await request.json(); // Schedule weekly account summary await client.schedules.create({ scheduleId: `user-summary-${user.email}`, destination: "https:///api/send-weekly-summary", body: { userId: user.id }, cron: cron, }); return NextResponse.json( { success: true, message: "User registered and summary scheduled" }, { status: 201 } ); } ``` -------------------------------- ### Use Workflow Hook in a React Component Source: https://upstash.com/docs/workflow/howto/realtime/basic A React component that utilizes the `useWorkflow` hook to trigger a workflow and display its steps in real-time. It includes a button to start the workflow and renders the steps as they complete. ```tsx "use client"; import { useWorkflow } from "@/hooks/useWorkflow"; export default function WorkflowPage() { const { trigger, steps, isRunFinished } = useWorkflow(); return (
{isRunFinished &&

✅ Workflow Finished!

}

Workflow Steps:

{steps.map((step, index) => (
{step.stepName} {Boolean(step.result) && : {JSON.stringify(step.result)}}
))}
); } ``` -------------------------------- ### POST /api/workflow - Environment Configuration Source: https://upstash.com/docs/workflow/basics/serve/advanced Explains the `env` option for providing environment variables manually, useful in environments where `process.env` is not available. ```APIDOC ## POST /api/workflow ### Description Allows manual injection of environment variables using the `env` option. Useful in environments where `process.env` is not available or for testing purposes. ### Method POST ### Endpoint /api/workflow ### Parameters #### Request Body - **env** (object) - Optional - An object containing environment variables to be used by Workflow. ### Request Example ```json { "env": { "QSTASH_TOKEN": "", "QSTASH_CURRENT_SIGNING_KEY": "" } } ``` ### Response #### Success Response (200) - **(No specific fields mentioned)** #### Response Example ```json { "message": "Workflow scheduled with custom environment variables" } ``` ``` -------------------------------- ### Trigger Workflow with Redacted Body and Headers (TypeScript) Source: https://upstash.com/docs/workflow/howto/redact-fields Use this snippet to trigger a workflow run and redact the request body and specific headers like 'Authorization'. Ensure you have the '@upstash/workflow' package installed. ```typescript import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }); const { workflowRunId } = await client.trigger({ url: "https://my-app.com/api/workflow", body: { hello: "world" }, redact: { body: true, header: ["Authorization"] // or `header: true` to redact all headers }, }); ``` -------------------------------- ### Run Project Tests Source: https://upstash.com/docs/workflow/sdk/workflow-py Execute all tests for the project using Poetry to manage the execution environment. ```bash poetry run pytest ``` -------------------------------- ### Send Trial Warning Email (Python) Source: https://upstash.com/docs/workflow/examples/authWebhook This Python code waits for a specified duration, checks if the user has upgraded, and sends a trial warning email if they haven't. The workflow terminates early if the user is already upgraded. ```python await context.sleep("wait for trial warning", 5 * 24 * 60 * 60) async def _check_upgraded_plan() -> bool: return await check_upgraded_plan(email) is_upgraded = await context.run("check upgraded plan", _check_upgraded_plan) # end the workflow if upgraded if is_upgraded: return async def _send_trial_warning_email() -> None: await send_email( email, "Your trial is about to end in 2 days. Please upgrade your plan to keep using our platform.", ) await context.run("send trial warning email", _send_trial_warning_email) ``` -------------------------------- ### Deploy Vercel Project to Production Source: https://upstash.com/docs/workflow/quickstarts/nextjs-fastapi Execute this command to deploy your Upstash Workflow project to production on Vercel. Preview deployments are not supported. ```bash vercel --prod ``` -------------------------------- ### Serve Workflow with Context (FastAPI) Source: https://upstash.com/docs/workflow/basics/context Integrate Upstash Workflows with FastAPI using the `Serve` class. The `AsyncWorkflowContext` is typed with the expected payload, and the `@serve.post` decorator defines the workflow endpoint. ```python from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: ... ``` -------------------------------- ### Create .dev.vars file Source: https://upstash.com/docs/workflow/quickstarts/cloudflare-workers Create a .dev.vars file in your project root to store environment variables for local development. ```bash touch .dev.vars ``` -------------------------------- ### Execute Steps in Parallel with Promise.all Source: https://upstash.com/docs/workflow/features/parallel-steps Use Promise.all to run multiple context.run() steps concurrently. Ensure all promises are awaited to avoid unexpected behavior. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { checkInventory, brewCoffee, printReceipt } from "@/utils"; export const { POST } = serve(async (context) => { // 👇 Execute steps in parallel const [coffeeBeansAvailable, cupsAvailable, milkAvailable] = await Promise.all([ context.run("check-coffee-beans", () => checkInventory("coffee-beans")), context.run("check-cups", () => checkInventory("cups")), context.run("check-milk", () => checkInventory("milk")), ]); }); ``` -------------------------------- ### Execute Business Logic in context.run (TypeScript) Source: https://upstash.com/docs/workflow/basics/caveats Place your business logic inside the `context.run` function for each step. Code outside `context.run` is only for connecting steps and will execute multiple times. ```typescript export const { POST } = serve(async (context) => { const input = context.requestPayload const result = await context.run("step-1", () => { return { success: true } }) console.log("This log will appear multiple times") await context.run("step-2", () => { console.log("This log will appear just once") console.log("Step 1 status is:", result.success) }) }) ``` -------------------------------- ### Get Flow Control Key Source: https://upstash.com/docs/workflow/api-reference/flow-control/get-flow-control-key-1 Retrieves details of a specific Flow Control key, including the current waitlist size. This information is crucial for understanding and managing concurrency and rate limiting for workflow steps. ```APIDOC ## GET /v2/keys/rotate ### Description Get details of a specific Flow Control key. Flow Control keys are used to manage concurrency and rate limiting for workflow steps. This endpoint returns the current waitlist size for the specified flow control key, which indicates how many workflow steps are currently waiting due to flow control constraints. ### Method GET ### Endpoint /v2/keys/rotate ### Parameters #### Path Parameters - **flowControlKey** (string) - Required - The Flow Control key to retrieve. ### Responses #### Success Response (200) - **flowControlKey** (string) - The flow control key name - **waitlistSize** (integer) - The number of messages waiting due to flow control configuration. #### Error Response (400, 401, 404, 500) - **error** (string) - Error message ``` -------------------------------- ### Set Local QStash Environment Variables Source: https://upstash.com/docs/workflow/quickstarts/nextjs-fastapi Add the QStash URL and Token obtained from the local QStash CLI to your .env file. This configures your application to use the local QStash instance. ```dotenv export QSTASH_URL="http://127.0.0.1:8080" export QSTASH_TOKEN="" ``` -------------------------------- ### Configure Upstash Realtime Instance Source: https://upstash.com/docs/workflow/howto/realtime/basic Set up a Realtime instance with a defined schema for event types and a Redis client. This configuration is essential for managing real-time events. ```typescript import { InferRealtimeEvents, Realtime } from "@upstash/realtime"; import { Redis } from "@upstash/redis"; import z from "zod/v4"; const redis = Redis.fromEnv(); const schema = { workflow: { runFinish: z.object({}), stepFinish: z.object({ stepName: z.string(), result: z.unknown().optional(), }), }, }; export const realtime = new Realtime({ schema, redis }); export type RealtimeEvents = InferRealtimeEvents; ``` -------------------------------- ### Configure QStash Client with Vercel Bypass Header Source: https://upstash.com/docs/workflow/troubleshooting/vercel Configure the QStash client with the bypass header to automatically include it in all workflow steps and context.invoke calls. Ensure the VERCEL_AUTOMATION_BYPASS_SECRET environment variable is set. ```typescript import { Client } from '@upstash/qstash' import { serve } from '@upstash/workflow/nextjs' export const { POST } = serve(async (context) => { // your workflow logic }, { qstashClient: new Client({ token: process.env.QSTASH_TOKEN!, headers: { "x-vercel-protection-bypass": process.env.VERCEL_AUTOMATION_BYPASS_SECRET! } }) }) ``` -------------------------------- ### POST /api/workflow - Receiver Configuration Source: https://upstash.com/docs/workflow/basics/serve/advanced Details how to pass an explicit QStash Receiver to verify incoming requests, ensuring they originate from QStash. ```APIDOC ## POST /api/workflow ### Description Allows passing an explicit QStash Receiver to verify incoming requests. Ensures that only QStash can trigger your workflow. Useful when managing multiple QStash projects. ### Method POST ### Endpoint /api/workflow ### Parameters #### Request Body - **receiver** (object) - Optional - A QStash Receiver instance. - **currentSigningKey** (string) - Required - The current signing key for QStash verification. - **nextSigningKey** (string) - Required - The next signing key for QStash verification. ### Request Example ```json { "receiver": { "currentSigningKey": "", "nextSigningKey": "" } } ``` ### Response #### Success Response (200) - **(No specific fields mentioned)** #### Response Example ```json { "message": "Workflow scheduled with custom receiver" } ``` ``` -------------------------------- ### Import serve function in Javascript/Typescript Source: https://upstash.com/docs/workflow/quickstarts/platforms Import the `serve` function for use with Javascript or Typescript projects. ```javascript import { serve } from "@upstash/workflow"; ``` -------------------------------- ### Accessing Context in Route Function Source: https://upstash.com/docs/workflow/basics/serve The route function receives a context object. Use it to access request payload, headers, and define workflow steps using context.run(). This example demonstrates accessing userId and defining a step. ```typescript import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve( async (context) => { // 👇 Access context properties const { userId } = context.requestPayload; // 👇 Define a workflow step await context.run("step-1", async () => {}) } ); ``` ```python from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: async def _step1() -> str: # define a piece of business logic as step 1 return "step 1 result" result = await context.run("step-1", _step1) async def _step2() -> None: # define another piece of business logic as step 2 pass await context.run("step-2", _step2) ``` -------------------------------- ### Create Stripe Customer in TypeScript Source: https://upstash.com/docs/workflow/howto/use-webhooks After initial webhook processing, use context.run to perform subsequent operations like creating a Stripe customer. This requires the user data extracted in the previous step. ```typescript export const { POST } = serve(async (context) => { // ... Previous validation and user data extraction if (!user) { return; } const customer = await context.run("create-stripe-customer", async () => { return await stripe.customers.create({ email: user.email, name: `${user.firstName} ${user.lastName}`, metadata: { userId: user.userId, }, }); }); /// ... Additional steps }); ``` -------------------------------- ### Define a Basic Workflow Endpoint in FastAPI Source: https://upstash.com/docs/workflow/quickstarts/fastapi This snippet shows how to set up a basic workflow endpoint in FastAPI. It defines two sequential steps, 'initial-step' and 'second-step', each printing a message. ```python from fastapi import FastAPI from upstash_workflow.fastapi import Serve app = FastAPI() serve = Serve(app) @serve.post("/api/workflow") async def workflow(context) -> None: async def _step1() -> None: print("initial step ran") await context.run("initial-step", _step1) async def _step2() -> None: print("second step ran") await context.run("second-step", _step2) ``` -------------------------------- ### Define a Basic Workflow Endpoint Source: https://upstash.com/docs/workflow/quickstarts/flask Create a Flask route that defines a workflow with sequential steps. Each step is executed using `context.run`. ```python from flask import Flask from upstash_workflow.flask import Serve app = Flask(__name__) serve = Serve(app) @serve.route("/api/workflow") def workflow(context) -> None: def _step1() -> None: print("initial step ran") context.run("initial-step", _step1) def _step2() -> None: print("second step ran") context.run("second-step", _step2) ``` -------------------------------- ### Create Workflow Endpoint Source: https://upstash.com/docs/workflow/howto/realtime/basic Defines the main workflow logic, including steps and middleware. Import `serve` from `@upstash/workflow/nextjs` and pass any necessary middlewares. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { realtimeMiddleware } from "@/lib/middleware"; type WorkflowPayload = { userId: string; action: string; }; export const { POST } = serve( async (context) => { const { userId, action } = context.requestPayload; await context.run("validate-data", async () => { return { valid: true, userId, action }; }); await context.run("process-action", async () => { // Your business logic here return { processed: true, userId, action }; }); return { success: true, workflowRunId: context.workflowRunId }; }, { middlewares: [realtimeMiddleware], } ); ``` -------------------------------- ### List DLQ Messages with Upstash Workflow Client Source: https://upstash.com/docs/workflow/basics/client/dlq/list Initializes the Upstash Workflow client and retrieves all DLQ messages. Ensure you replace `` with your actual token. ```typescript import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }); // List all DLQ messages const { messages, cursor } = await client.dlq.list(); ``` -------------------------------- ### Verify Production Workflow Endpoint Source: https://upstash.com/docs/workflow/quickstarts/cloudflare-workers After deployment, test your production workflow endpoint by making a POST request to your production URL. Replace with your actual deployment URL. ```bash curl -X POST https:/// -D '{"text": "hello world!"}' ``` -------------------------------- ### Move configuration from serve to trigger Source: https://upstash.com/docs/workflow/howto/migrations Migrate configuration options like `retries`, `flowControl`, `retryDelay`, and `failureUrl` from `serve()` to `client.trigger()`. This allows for per-run configuration. ```typescript // Old export const { POST } = serve( async (context) => { ... }, { retries: 3, retryDelay: "1000 * (1 + retried)", flowControl: { key: "my-key", rate: 10 } } ); // Trigger call await client.trigger({ url: "..." }); // New export const { POST } = serve( async (context) => { ... } // No configuration here anymore ); // Configuration in trigger call await client.trigger({ url: "...", retries: 3, retryDelay: "1000 * (1 + retried)", flowControl: { key: "my-key", rate: 10 } }); ``` -------------------------------- ### Serve Workflow with Context (Next.js) Source: https://upstash.com/docs/workflow/basics/context Use the `serve` function from `@upstash/workflow/nextjs` to create a workflow endpoint. The context object is passed to the route function, providing access to workflow functionalities. ```typescript import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve( // 👇 the workflow context async (context) => { // ... } ); ``` -------------------------------- ### Configure Local Environment Variables Source: https://upstash.com/docs/workflow/howto/local-development Sets the required environment variables to redirect Upstash SDK requests from the production service to the local development server. ```bash QSTASH_URL="http://127.0.0.1:8080" QSTASH_TOKEN="eyJVc2VySUQiOiJkZWZhdWx0VXNlciIsIlBhc3N3b3JkIjoiZGVmYXVsdFBhc3N3b3JkIn0=" QSTASH_CURRENT_SIGNING_KEY="sig_7RvLjqfZBvP5KEUimQCE1pvpLuou" QSTASH_NEXT_SIGNING_KEY="sig_7W3ZNbfKWk5NWwEs3U4ixuQ7fxwE" ``` -------------------------------- ### Define and Run Multi-Agent Workflow Source: https://upstash.com/docs/workflow/agents/features Sets up a multi-agent workflow with a researcher and a mathematician agent. The researcher uses Wikipedia, and the mathematician uses a calculator tool. This is useful for tasks requiring information retrieval and computation. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { agentWorkflow } from "@upstash/workflow-agents"; import { WikipediaQueryRun } from "@langchain/community/tools/wikipedia_query_run"; import * as mathjs from 'mathjs' import { tool } from "ai"; import { z } from "zod"; export const { POST } = serve(async (context) => { const agents = agentWorkflow(context); const model = agents.openai('gpt-4o'); const researcherAgent = agents.agent({ model, name: 'academic', maxSteps: 2, tools: { wikiTool: new WikipediaQueryRun({ topKResults: 1, maxDocContentLength: 500, }) }, background: 'You are researcher agent with access to Wikipedia. ' + 'Utilize Wikipedia as much as possible for correct information', }); const mathAgent = agents.agent({ model, name: "mathematician", maxSteps: 2, tools: { calculate: tool({ description: 'A tool for evaluating mathematical expressions. ' + 'Example expressions: ' + "'1.2 * (2 + 4.5)', '12.7 cm to inch', 'sin(45 deg) ^ 2'." + "only call this tool if you need to calculate a mathematical expression." ``` ```typescript parameters: z.object({ expression: z.string() }), execute: async ({ expression }) => mathjs.evaluate(expression), }), }, background: "You are a mathematician agent which can utilize" ``` ```typescript agents: [researcherAgent, mathAgent], prompt: "Tell me about 3 cities in Japan and calculate the sum of their populations", }); const { text } = await task.run(); console.log("result:", text) }) ``` -------------------------------- ### Custom Middleware with Init Function Source: https://upstash.com/docs/workflow/howto/middlewares Implement a custom middleware using an init function to initialize resources like database connections before defining callbacks. This is useful for middlewares requiring external dependencies. ```typescript import { WorkflowMiddleware } from "@upstash/workflow"; const databaseMiddleware = new WorkflowMiddleware({ name: "database-logger", init: async () => { // Initialize your resources const db = await connectToDatabase(); // Return the callbacks that use the initialized resources return { runStarted: async ({ context }) => { await db.insert({ workflowRunId: context.workflowRunId, status: 'started' }); }, runCompleted: async ({ context, result }) => { await db.update({ workflowRunId: context.workflowRunId, status: 'completed', result }); }, onError: async ({ workflowRunId, error }) => { await db.insert({ workflowRunId, level: 'error', message: error.message }); } }; } }); ``` -------------------------------- ### Trigger Workflow Endpoint in Production Source: https://upstash.com/docs/workflow/quickstarts/nuxt After deploying your application, verify the workflow endpoint by making a POST request to your production URL. Replace `` with your actual domain. ```bash curl -X POST https:///api/workflow ``` -------------------------------- ### Include at Least One Step in Python Workflow Source: https://upstash.com/docs/workflow/basics/caveats Every Python workflow must include at least one `context.run` call. Workflows without any steps will fail authentication. ```python @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload # 👇 At least one step is required async def _dummy_step(): return await context.run("dummy-step", _dummy_step) ``` -------------------------------- ### Serve API Requests in Next.js Source: https://upstash.com/docs/workflow/basics/context/call Use `serve` to create an API endpoint in Next.js that can call external services. Ensure environment variables like `OPENAI_API_KEY` are set. ```javascript import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve<{ topic: string }>(async (context) => { const { userId, name } = context.requestPayload; const { status, headers, body } = await context.call("sync-user-data", { url: "https://my-third-party-app", // Endpoint URL method: "POST", body: JSON.stringify({ userId, name }), headers: { authorization: `Bearer ${process.env.OPENAI_API_KEY}`, }, } ); }); ``` -------------------------------- ### Trigger Single Workflow Source: https://upstash.com/docs/workflow/basics/client/trigger Starts a single workflow run. Provide the workflow URL, an optional request payload, headers, a custom workflow run ID, retry configurations, delay, failure URL, and flow control settings. A unique `workflowRunId` is returned upon successful triggering. ```typescript import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }) const { workflowRunId } = await client.trigger({ url: "https:///", body: "hello there!", // optional body headers: { ... }, // optional headers workflowRunId: "my-workflow", // optional workflow run id retries: 3, // optional retries retryDelay: "1000 * (1 + retried)", // optional delay between retries delay: "10s" // optional delay value failureUrl: "https://", // optional failure url flowControl: { // optional flow control key: "USER_GIVEN_KEY", rate: 10, parallelism: 5, period: "10m" }, }) ``` -------------------------------- ### Configure OpenAI Compatible Provider Source: https://upstash.com/docs/workflow/agents/features Use an OpenAI compatible provider by specifying the baseURL and apiKey. This allows integration with services like Deepseek. ```typescript const model = agents.openai('deepseek-chat', { baseURL: "https://api.deepseek.com", apiKey: process.env.DEEPSEEK_API_KEY }) ``` -------------------------------- ### Charge Customer in Python Source: https://upstash.com/docs/workflow/examples/paymentRetry Attempts to charge a customer using a helper async function. Includes error handling to prevent retries. ```python async def _charge_customer() -> Optional[ChargeResult]: try: return await charge_customer(i + 1) except Exception as e: print(f"Error: {e}") return None result = await context.run("charge customer", _charge_customer) ``` -------------------------------- ### Configure Workflow with Environment Variables Source: https://upstash.com/docs/workflow/basics/serve/advanced Expose environment variables to your workflow. These values are accessible via `context.env` in TypeScript. Ensure all required Qstash environment variables are set. ```typescript import { Receiver } from "@upstash/qstash"; import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve( async (context) => { // the env option will be available in the env field of the context: const env = context.env; }, { env: { QSTASH_URL: "", QSTASH_TOKEN: "", QSTASH_CURRENT_SIGNING_KEY: "", QSTASH_NEXT_SIGNING_KEY: "", } } ); ``` ```python import os from upstash_workflow import AsyncWorkflowContext, serve @serve.post( "/api/example", env={ "QSTASH_CURRENT_SIGNING_KEY": os.environ["QSTASH_CURRENT_SIGNING_KEY"], "QSTASH_NEXT_SIGNING_KEY": os.environ["QSTASH_NEXT_SIGNING_KEY"], }, ) async def example(context: AsyncWorkflowContext[str]) -> None: ... ``` -------------------------------- ### Custom Initial Payload Parsing (Python) Source: https://upstash.com/docs/workflow/basics/serve/advanced Implement `initial_payload_parser` in Python to parse and type the initial request payload. The parsed payload is then available in the workflow context. ```python @dataclass class InitialPayload: foo: str bar: int def initial_payload_parser(initial_payload: str) -> InitialPayload: return parse_payload(initial_payload) @serve.post("/api/example", initial_payload_parser=initial_payload_parser) async def example(context: AsyncWorkflowContext[InitialPayload]) -> None: payload: InitialPayload = context.request_payload ``` -------------------------------- ### Set QStash Token Source: https://upstash.com/docs/workflow/sdk/workflow-py Export your QStash token as an environment variable. This is required for Upstash services. ```bash export QSTASH_TOKEN= ``` -------------------------------- ### Source Environment File Source: https://upstash.com/docs/workflow/quickstarts/fastapi This command sources the environment file, making environment variables available in the current shell session. This is typically required before running the FastAPI application. ```bash source .env ``` -------------------------------- ### Create Realtime API Endpoint Source: https://upstash.com/docs/workflow/howto/realtime/basic Set up an API route to handle Realtime connections using Server-Sent Events (SSE). This endpoint is crucial for enabling real-time communication between the server and the frontend. ```typescript import { handle } from "@upstash/realtime"; import { realtime } from "@/lib/realtime"; export const GET = handle({ realtime }); ``` -------------------------------- ### Call OpenAI Compatible Provider (Deepseek) Source: https://upstash.com/docs/workflow/integrations/openai Use `context.api.openai.call` with a custom `baseURL` to interact with OpenAI-compatible providers like Deepseek. ```typescript const { status, body } = await context.api.openai.call( "Call Deepseek", { baseURL: "https://api.deepseek.com", token: process.env.DEEPSEEK_API_KEY, operation: "chat.completions.create", body: { model: "deepseek-chat", messages: [ { role: "system", content: "Assistant says 'hello!'", }, { role: "user", content: "User shouts back 'hi!'" } ], }, } ); ``` -------------------------------- ### Workflow with Sleep and Sequential Steps Source: https://upstash.com/docs/workflow/quickstarts/flask Implement a workflow that includes timed sleeps and processes data through multiple steps. Use `context.sleep_until` for specific wake-up times and `context.sleep` for relative delays. ```python from flask import Flask import time from upstash_workflow.flask import Serve from upstash_workflow import WorkflowContext app = Flask(__name__) serve = Serve(app) def some_work(input: str) -> str: return f"processed '{input}'" @serve.route("/sleep") def sleep(context: WorkflowContext[str]) -> None: input = context.request_payload def _step1() -> str: output = some_work(input) print("step 1 input", input, "output", output) return output result1: str = context.run("step1", _step1) context.sleep_until("sleep1", time.time() + 3) def _step2() -> str: output = some_work(result1) print("step 2 input", result1, "output", output) return output result2: str = context.run("step2", _step2) context.sleep("sleep2", 2) def _step3() -> None: output = some_work(result2) print("step 3 input", result2, "output", output) context.run("step3", _step3) ``` -------------------------------- ### Sync User in Database (Python) Source: https://upstash.com/docs/workflow/examples/authWebhook Defines an asynchronous function to create a user in the database, used within the workflow context. ```python async def _sync_user() -> str: return await create_user_in_database(name, email) result = await context.run("sync user", _sync_user) userid = result["userid"] ``` -------------------------------- ### Download Dataset with context.call Source: https://upstash.com/docs/workflow/examples/allInOne Retrieves a dataset URL and then downloads the dataset using `context.call` for HTTP requests. This method is suitable for requests that might exceed standard serverless execution limits. ```typescript const datasetUrl = await context.run("get-dataset-url", async () => { return await getDatasetUrl(request.datasetId) }) const { body: dataset } = await context.call("download-dataset", { url: datasetUrl, method: "GET" }) ``` ```python async def _get_dataset_url() -> str: return await get_dataset_url(dataset_id) dataset_url = await context.run("get-dataset-url", _get_dataset_url) response: CallResponse[Any] = await context.call( "download-dataset", url=dataset_url, method="GET" ) dataset = response.body ``` -------------------------------- ### Update Environment Variables for Local QStash Source: https://upstash.com/docs/workflow/howto/local-development/development-server Configure these environment variables to direct QStash requests to your local development server. Ensure these match the output from the QStash CLI. ```env QSTASH_URL="http://127.0.0.1:8080" QSTASH_TOKEN="eyJVc2VySUQiOiJkZWZhdWx0VXNlciIsIlBhc3N3b3JkIjoiZGVmYXVsdFBhc3N3b3JkIn0=" QSTASH_CURRENT_SIGNING_KEY="sig_7RvLjqfZBvP5KEUimQCE1pvpLuou" QSTASH_NEXT_SIGNING_KEY="sig_7W3ZNbfKWk5NWwEs3U4ixuQ7fxwE" ``` -------------------------------- ### Update serve method imports Source: https://upstash.com/docs/workflow/howto/migrations Change imports from '@upstash/qstash/nextjs' to '@upstash/workflow/nextjs' for Next.js applications. For other platforms using h3, use '@upstash/workflow/h3'. ```typescript // old import { serve } from "@upstash/qstash/nextjs" // new import { serve } from "@upstash/workflow/nextjs" ``` -------------------------------- ### Basic Notification with Upstash Workflow Client Source: https://upstash.com/docs/workflow/basics/client/notify Use this snippet to notify workflows waiting for a specific event. Ensure you have initialized the Client with your QSTASH_TOKEN. ```javascript import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }); await client.notify({ eventId: "my-event-id", eventData: "my-data", // data passed to the workflow run }); ``` -------------------------------- ### Set Production QStash and Workflow URLs Source: https://upstash.com/docs/workflow/quickstarts/nextjs-fastapi Configure your .env file with your QStash token and the public URL of your Upstash Workflow endpoint, typically provided by a local tunnel service. ```dotenv export QSTASH_TOKEN="***" export UPSTASH_WORKFLOW_URL="" ``` -------------------------------- ### Create New User in Stripe (TypeScript) Source: https://upstash.com/docs/workflow/examples/authWebhook Creates a new user entry in Stripe. This is typically done after user creation in the main database. ```typescript await context.run("create new user in stripe", async () => { await createNewUserInStripe(email); }); ``` -------------------------------- ### context.createWebhook API Source: https://upstash.com/docs/workflow/basics/context/createWebhook The context.createWebhook() function creates a unique webhook URL that can be called by external services to trigger workflow continuation. This webhook can be invoked multiple times to resume multiple context.waitForWebhook steps. ```APIDOC ## POST /context.createWebhook ### Description Creates a unique webhook that can be called by external services to trigger workflow continuation. The generated webhook URL can be called multiple times to resume multiple `context.waitForWebhook` steps. ### Method POST ### Endpoint /context.createWebhook ### Parameters #### Request Body - **stepName** (string) - Required - Name of the step. ### Response #### Success Response (200) - **webhookUrl** (string) - The unique webhook URL that external services should call to resume the workflow. Can be called multiple times to resume multiple `context.waitForWebhook` steps. - **eventId** (string) - The internal event identifier associated with this webhook. This is primarily used internally by `context.waitForWebhook`. ### Request Example ```json { "stepName": "create webhook" } ``` ### Response Example ```json { "webhookUrl": "https://example.com/webhook/unique-id", "eventId": "internal-event-id" } ``` ### Usage Example (TypeScript) ```typescript import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve(async (context) => { const webhook = await context.createWebhook("create webhook"); console.log(webhook.webhookUrl); // Use this URL with external services }); ``` ``` -------------------------------- ### Handle Upstash QStash Callback in Next.js Source: https://upstash.com/docs/workflow/howto/failures This Next.js API route demonstrates how to receive and process callbacks from Upstash QStash, including signature verification. Ensure `bodyParser` is disabled for the API route. ```javascript // pages/api/callback.js import { verifySignature } from "@upstash/qstash/nextjs"; function handler(req, res) { // responses from qstash are base64-encoded const decoded = atob(req.body.body); console.log(decoded); return res.status(200).end(); } export default verifySignature(handler); export const config = { api: { bodyParser: false, }, }; ``` -------------------------------- ### Workflow with Authentication Check Source: https://upstash.com/docs/workflow/quickstarts/flask Implement a workflow that checks for a specific authentication header before executing its steps. If authentication fails, the workflow exits early. ```python from flask import Flask from upstash_workflow.flask import Serve from upstash_workflow import WorkflowContext app = Flask(__name__) serve = Serve(app) def some_work(input: str) -> str: return f"processed '{input}'" @serve.route("/auth") def auth(context: WorkflowContext[str]) -> None: if context.headers.get("Authentication") != "Bearer secret_password": print("Authentication failed.") return def _step1() -> str: return "output 1" context.run("step1", _step1) def _step2() -> str: return "output 2" context.run("step2", _step2) ``` -------------------------------- ### Deploy Cloudflare Worker to Production Source: https://upstash.com/docs/workflow/quickstarts/cloudflare-workers Use this command to deploy your Cloudflare Worker to production. Ensure all necessary environment variables are set in your Cloudflare Worker project settings. ```bash wrangler deploy ``` -------------------------------- ### Trigger Workflow Endpoint in Production Source: https://upstash.com/docs/workflow/quickstarts/astro Makes a POST request to the deployed workflow endpoint to initiate a workflow run. Replace with your actual production URL. Ensure the Content-Type header is set to application/json. ```bash curl -X POST /api/workflow \ -H "Content-Type: application/json" \ -d '{"message": "Hello from the workflow!"}' ``` -------------------------------- ### Sequential Execution with context.run() in TypeScript Source: https://upstash.com/docs/workflow/basics/context/run Use context.run() to execute steps sequentially by awaiting each call. Ensure imports from '@upstash/workflow/nextjs'. ```typescript import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve(async (context) => { const input = context.requestPayload; const result1 = await context.run("step-1", async () => { return someWork(input); }); await context.run("step-2", async () => { someOtherWork(result1); }); }); ``` -------------------------------- ### Serve Upstash Workflow with Next.js Source: https://upstash.com/docs/workflow/examples/dynamicWorkflow Sets up the Upstash Workflow endpoint for a Next.js application. It uses the `serve` function to handle incoming workflow requests and defines the workflow logic within the provided async callback. ```typescript import { WorkflowNonRetryableError } from "@upstash/workflow" import { serve } from "@upstash/workflow/nextjs" const addOne = (data: number): number => { return data + 1 } const multiplyWithTwo = (data: number): number => { return data * 2 } type FunctionName = "AddOne" | "MultiplyWithTwo" const functions: Record number> = { AddOne: addOne, MultiplyWithTwo: multiplyWithTwo, } interface WorkflowPayload { version: string functions: FunctionName[] } export const { POST } = serve(async (context) => { const { functions: steps } = context.requestPayload let lastResult = 0 for (let i = 0; i < steps.length; i++) { const stepName = steps[i] lastResult = await context.run( `step-${i}:${stepName}`, async () => { const fn = functions[stepName] if (!fn) throw new WorkflowNonRetryableError("Unknown step") return fn(lastResult) } ) } }) ``` -------------------------------- ### Wait for Order Processing Completion (Python) Source: https://upstash.com/docs/workflow/features/wait Implement order processing waits in Python using `wait_for_event`. Includes logic for handling timeouts and processing completed orders. ```python from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) @serve.post("/api/order-processing") async def order_processing(context: AsyncWorkflowContext[str]) -> None: order_id = context.request_payload["order_id"] user_email = context.request_payload["user_email"] # Send order processing request async def _request_order_processing(): return await request_order_processing(order_id) await context.run("request-order-processing", _request_order_processing) # Wait for order processing completion result = await context.wait_for_event( "wait-for-order-processing", f"order-{order_id}", timeout="10m" # 10 minutes timeout ) if result["timeout"]: # Handle timeout scenario async def _handle_timeout(): return await handle_order_timeout(order_id, user_email) await context.run("handle-timeout", _handle_timeout) return # Process the completed order async def _process_completed_order(): return await process_completed_order(order_id, result["event_data"]) await context.run("process-completed-order", _process_completed_order) ``` -------------------------------- ### Sync User in Database (TypeScript) Source: https://upstash.com/docs/workflow/examples/authWebhook Creates a new user in the database. This is part of the user synchronization process. ```typescript const { userid } = await context.run("sync user", async () => { return await createUserInDatabase({ name, email }); }); ``` -------------------------------- ### Configure Local Tunnel Environment Variables Source: https://upstash.com/docs/workflow/quickstarts/astro Configure environment variables for local development when using a local tunnel. Update your .env file with your QStash token and the public URL from your local tunnel. ```dotenv QSTASH_TOKEN="***" UPSTASH_WORKFLOW_URL="" ``` -------------------------------- ### Create Custom Fetch for OpenAI Client in Workflow Source: https://upstash.com/docs/workflow/integrations/aisdk Create a custom fetch implementation for the OpenAI client within Upstash Workflow. This ensures that HTTP requests are made using `context.call`, allowing Upstash Workflow to manage long-running requests and provide durability. ```typescript import { createOpenAI } from '@ai-sdk/openai'; import { HTTPMethods } from '@upstash/qstash'; import { WorkflowAbort, WorkflowContext } from '@upstash/workflow'; export const createWorkflowOpenAI = (context: WorkflowContext) => { return createOpenAI({ compatibility: "strict", fetch: async (input, init) => { try { // Prepare headers from init.headers const headers = init?.headers ? Object.fromEntries(new Headers(init.headers).entries()) : {}; // Prepare body from init.body const body = init?.body ? JSON.parse(init.body as string) : undefined; // Make network call const responseInfo = await context.call("openai-call-step", { url: input.toString(), method: init?.method as HTTPMethods, headers, body, }); // Construct headers for the response const responseHeaders = new Headers( Object.entries(responseInfo.header).reduce((acc, [key, values]) => { acc[key] = values.join(", "); return acc; }, {} as Record) ); // Return the constructed response return new Response(JSON.stringify(responseInfo.body), { status: responseInfo.status, headers: responseHeaders, }); } catch (error) { if (error instanceof WorkflowAbort) { throw error } else { console.error("Error in fetch implementation:", error); throw error; // Rethrow error for further handling } } }, }); }; ``` -------------------------------- ### POST /api/workflow - URL Configuration Source: https://upstash.com/docs/workflow/basics/serve/advanced Demonstrates how to configure the `url` option to specify the full endpoint URL for the workflow, overriding the default inference. ```APIDOC ## POST /api/workflow ### Description Specifies the full endpoint URL of the workflow, including the route path. Useful when `request.url` is not reliable, such as behind a proxy. ### Method POST ### Endpoint /api/workflow ### Parameters #### Query Parameters - **url** (string) - Optional - The full endpoint URL of the workflow. ### Request Example ```json { "url": "https://.com/api/workflow" } ``` ### Response #### Success Response (200) - **(No specific fields mentioned)** #### Response Example ```json { "message": "Workflow scheduled" } ``` ``` -------------------------------- ### Paginate Workflow Logs with Cursor Source: https://upstash.com/docs/workflow/basics/client/logs Fetch all workflow runs by repeatedly calling the logs method and using the returned cursor for subsequent requests. This ensures all available logs are retrieved. ```typescript const allRuns = []; let cursor: string | undefined; do { const result = await client.logs({ cursor }); allRuns.push(...result.runs); cursor = result.cursor; } while (cursor); ``` -------------------------------- ### Add RealtimeProvider to Root Layout Source: https://upstash.com/docs/workflow/howto/realtime/basic Wrap your application with the RealtimeProvider in the root layout to enable real-time capabilities across your application. This component manages the Realtime connection. ```tsx "use client"; import { RealtimeProvider } from "@upstash/realtime/client"; export default function RootLayout({ children, }: { children: React.ReactNode }) { return ( {children} ); } ``` -------------------------------- ### QStash Authentication Source: https://upstash.com/docs/workflow/api-reference/notify/notify-event QStash supports authentication via Bearer Tokens in the Authorization header or as a query parameter. ```APIDOC ## QStash Authentication ### Description QStash authentication can be performed using a JWT Bearer token provided in the `Authorization` header or as a `qstash_token` query parameter. ### Authentication Methods #### 1. Bearer Token Authentication - **Scheme**: `bearer` - **Format**: `JWT` - **Description**: Provide your QStash authentication token in the `Authorization` header. **Example Header:** `Authorization: Bearer YOUR_QSTASH_TOKEN` #### 2. Query Parameter Authentication - **Type**: `apiKey` - **In**: `query` - **Name**: `qstash_token` - **Description**: Provide your QStash authentication token as a query parameter. **Example URL:** `https://your-api.com/endpoint?qstash_token=YOUR_QSTASH_TOKEN` ``` -------------------------------- ### Call OpenAI Chat Completions Source: https://upstash.com/docs/workflow/integrations/openai Use `context.api.openai.call` for type-safe interaction with the OpenAI chat completions endpoint. Ensure you have your OPENAI_API_KEY configured. ```typescript const { status, body } = await context.api.openai.call( "Call OpenAI", { token: "", operation: "chat.completions.create", body: { model: "gpt-4o", messages: [ { role: "system", content: "Assistant says 'hello!'", }, { role: "user", content: "User shouts back 'hi!'" } ], }, } ); // get text: console.log(body.content[0].text) ``` -------------------------------- ### Initialize Upstash Workflow Client Source: https://upstash.com/docs/workflow/basics/client/logs Instantiate the Upstash Workflow client with your authentication token. This client is used to interact with the workflow API. ```typescript import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }) const { runs, cursor } = await client.logs() ``` -------------------------------- ### Include at Least One Step in TypeScript Workflow Source: https://upstash.com/docs/workflow/basics/caveats Every TypeScript workflow must include at least one `context.run` call. Workflows without any steps will fail authentication. ```typescript export const { POST } = serve(async (context) => { const input = context.requestPayload // 👇 At least one step is required await context.run("dummy-step", async () => { return }) }) ``` -------------------------------- ### Run Multiple Workflow Steps in Parallel Source: https://upstash.com/docs/workflow/howto/parallel-runs Execute several workflow steps simultaneously using `Promise.all`. This is useful for independent tasks that can be processed concurrently. Ensure the `ctx.run` function is available in your context. ```typescript const [result1, result2, result3] = await Promise.all([ ctx.run("parallel-step-1", async () => { ... }), ctx.run("parallel-step-2", async () => { ... }), ctx.run("parallel-step-3", async () => { ... }), ]) ``` -------------------------------- ### Define Workflows with createWorkflow() Source: https://upstash.com/docs/workflow/features/invoke/serveMany Use `createWorkflow()` to define workflows as objects. This function accepts the same arguments as `serve()` but does not expose the workflow directly as an HTTP endpoint. It initializes a workflow object that can be invoked later. ```typescript const workflowOne = createWorkflow( // 👇 Request Payload Type async (context: WorkflowContext) => { await context.sleep("wait 1 second", 1) // 👇 Workflow Response Type return { message: "This is the data returned by the workflow" }; } ); const workflowTwo = createWorkflow(async (context) => { // 👇 Invoke the workflow with type-safe call const { body } = await context.invoke( "invoke workflowOne", { workflow: workflowOne, body: "user-1" } ), }); ``` -------------------------------- ### Check Code with Ruff Source: https://upstash.com/docs/workflow/sdk/workflow-py Perform code linting and style checks across the project using Ruff. ```bash poetry run ruff check . ``` -------------------------------- ### Parallel Execution with context.run() in TypeScript Source: https://upstash.com/docs/workflow/basics/context/run Execute steps in parallel by calling context.run() for each and awaiting Promise.all(). Requires imports from '@upstash/workflow/nextjs'. ```typescript import { serve } from "@upstash/workflow/nextjs" export const { POST } = serve< string >( async (context) => { const input = context.requestPayload; const promise1 = context.run("step-1", async () => { return someWork(input); }); const promise2 = context.run("step-2", async () => { return someOtherWork(input); }); await Promise.all([promise1, promise2]); }, ); ``` -------------------------------- ### Create New User in Stripe (Python) Source: https://upstash.com/docs/workflow/examples/authWebhook Defines an asynchronous function to create a new user in Stripe, to be executed within the workflow context. ```python async def _create_new_user_in_stripe() -> None: await create_new_user_in_stripe(email) await context.run("create new user in stripe", _create_new_user_in_stripe) ``` -------------------------------- ### Create Realtime Middleware for Workflow Events Source: https://upstash.com/docs/workflow/howto/realtime/human-in-the-loop Implement a custom middleware to automatically emit Realtime events during workflow execution. This middleware handles `workflow.waitingForInput`, `workflow.inputResolved`, and `workflow.stepFinish` events, centralizing Realtime logic. ```typescript import { WorkflowMiddleware } from "@upstash/workflow"; import { realtime } from "./realtime"; export const realtimeMiddleware = new WorkflowMiddleware({ name: "realtime-events", callbacks: { beforeExecution: async ({ context, stepName }) => { const channel = realtime.channel(context.workflowRunId); // Detect wait-for-event steps and emit waitingForInput if (stepName === "wait-for-approval") { await channel.emit("workflow.waitingForInput", { eventId: `approval-${context.workflowRunId}`, message: `Waiting for approval`, }); } }, afterExecution: async ({ context, stepName, result }) => { const channel = realtime.channel(context.workflowRunId); // Emit inputResolved after wait-for-event steps complete if (stepName === "wait-for-approval") { await channel.emit("workflow.inputResolved", { eventId: `approval-${context.workflowRunId}`, }); } // Emit stepFinish for all steps await channel.emit("workflow.stepFinish", { stepName, result, }); }, runCompleted: async ({ context }) => { const channel = realtime.channel(context.workflowRunId); await channel.emit("workflow.runFinish", {}); }, }, }); ``` -------------------------------- ### Create SvelteKit Workflow Endpoint Source: https://upstash.com/docs/workflow/quickstarts/svelte Define a workflow endpoint in SvelteKit by creating a +server.ts file within the routes/api/workflow directory. This endpoint uses the serve function from @upstash/workflow/svelte. ```typescript import { serve } from "@upstash/workflow/svelte" import { env } from "$env/dynamic/private" export const { POST } = serve( async (context) => { await context.run("initial-step", () => { console.log("initial step ran") }) await context.run("second-step", () => { console.log("second step ran") }) }, { env } ) ``` -------------------------------- ### Run Non-Idempotent Code within context.run (Python) Source: https://upstash.com/docs/workflow/basics/caveats Wrap non-idempotent functions like database calls within `context.run` in Python to maintain workflow stability and prevent authentication errors. ```python async def _get_result_from_db(): return await get_result_from_db(entry_id) result = await context.run("get-result-from-db", _get_result_from_db) if result.should_return: return ``` -------------------------------- ### Create App Router Workflow Endpoint Source: https://upstash.com/docs/workflow/quickstarts/vercel-nextjs Implement a workflow endpoint using the `serve` function for Next.js App Router. This endpoint defines and runs workflow steps. ```typescript import { serve } from "@upstash/workflow/nextjs" export const { POST } = serve( async (context) => { await context.run("initial-step", () => { console.log("initial step ran") }) await context.run("second-step", () => { console.log("second step ran") }) } ) ``` -------------------------------- ### Initialize Receiver for Request Verification Source: https://upstash.com/docs/workflow/howto/security Instantiate a `Receiver` object with your signing keys to manually verify request signatures when environment variables are not suitable. This is an alternative to setting environment variables for verification. ```typescript import { Receiver } from "@upstash/qstash"; import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve( async (context) => { ... }, { receiver: new Receiver({ currentSigningKey: "", nextSigningKey: "", }), } ); ``` ```python from qstash import Receiver @serve.post( "/api/example", receiver=Receiver( current_signing_key=os.environ["QSTASH_CURRENT_SIGNING_KEY"], next_signing_key=os.environ["QSTASH_NEXT_SIGNING_KEY"], ), ) async def example(context: AsyncWorkflowContext[str]) -> None: ... ``` -------------------------------- ### Replace logger with middleware system Source: https://upstash.com/docs/workflow/howto/migrations Replace the old logging system with the new middleware system. Logging is now handled via middleware, and the `verbose` option is removed from `serve`. ```typescript // Old // Logging was automatic or controlled via verbose option export const { POST } = serve( async (context) => { ... }, { verbose: true } ); // New import { loggingMiddleware } from "@upstash/workflow"; export const { POST } = serve( async (context) => { ... }, { ``` -------------------------------- ### Restart All DLQ Entries Source: https://upstash.com/docs/workflow/basics/client/dlq/restart Restart all entries in the Dead Letter Queue by setting the `all` option to `true`. This is useful for reprocessing all failed workflow runs. ```typescript let cursor: string | undefined; do { const result = await client.dlq.restart({ all: true, cursor }); cursor = result.cursor; } while (cursor); ``` -------------------------------- ### Configure Workflow Retries (Python) Source: https://upstash.com/docs/workflow/basics/serve/advanced Set the `retries` parameter to define the number of times a workflow step should be retried upon failure. The default is 3. ```python @serve.post("/api/example", retries=3) async def example(context: AsyncWorkflowContext[str]) -> None: ... ``` -------------------------------- ### Create a Webhook in Upstash Workflow Source: https://upstash.com/docs/workflow/basics/context/createWebhook Use `context.createWebhook` to generate a unique URL. This URL can be called by external services to resume workflow execution at a specific step. The generated URL can be invoked multiple times. ```typescript import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve(async (context) => { const webhook = await context.createWebhook("create webhook"); console.log(webhook.webhookUrl); // Use this URL with external services }); ``` -------------------------------- ### Migrate onStepFinish to Custom Middleware Source: https://upstash.com/docs/workflow/howto/migrations Replace the removed onStepFinish callback with a custom WorkflowMiddleware. The afterExecution callback in the new middleware mirrors the functionality of onStepFinish. ```typescript // Old export const { POST } = serve( async (context) => { ... }, { onStepFinish: (stepName, result) => { console.log(`Step ${stepName} finished with:`, result); } } ); // New import { WorkflowMiddleware } from "@upstash/workflow"; const stepFinishMiddleware = new WorkflowMiddleware({ name: "step-finish", callbacks: { afterExecution: async ({ stepName, result }) => { console.log(`Step ${stepName} finished with:`, result); } } }); export const { POST } = serve( async (context) => { ... }, { middlewares: [stepFinishMiddleware] } ); ``` -------------------------------- ### Integrate Anthropic Model using AI SDK Source: https://upstash.com/docs/workflow/agents/features Integrate other AI providers, such as Anthropic, by importing their respective create methods from their SDK packages and passing them to AISDKModel. ```typescript import { createAnthropic } from "@ai-sdk/anthropic" const model = agents.AISDKModel({ context, provider: createAnthropic, providerParams: { apiKey: "", }, }); ``` -------------------------------- ### Send Invoice Email in Python Source: https://upstash.com/docs/workflow/examples/paymentRetry Sends an invoice email to the user upon successful payment, including invoice ID and total cost. Uses an async helper function. ```python async def _send_invoice_email() -> None: await send_email( email, f"Payment successful. Invoice: {result.invoice_id}, Total cost: ${result.total_cost}", ) await context.run("send invoice email", _send_invoice_email) ``` -------------------------------- ### Generate and Send Report in TypeScript Source: https://upstash.com/docs/workflow/examples/allInOne Use `context.run` to execute asynchronous tasks for report generation and sending. This pattern is suitable for serverless functions where non-blocking operations are crucial. ```typescript const report = await context.run("generate-report", async () => { return await generateReport(request.datasetId) }) await context.run("send-report", async () => { await sendReport(report, request.userId) }) ``` -------------------------------- ### Integrate Multiple Middlewares in Next.js Source: https://upstash.com/docs/workflow/howto/middlewares Configure multiple middlewares, including logging, error tracking, and performance monitoring, for a Next.js workflow. Middlewares are executed in the order they appear in the array. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { loggingMiddleware } from "@upstash/workflow"; export const { POST } = serve( async (context) => { // Your workflow logic }, { middlewares: [ loggingMiddleware, errorTrackingMiddleware, performanceMiddleware ] } ); ``` -------------------------------- ### Execute Dynamic Workflow Steps Source: https://upstash.com/docs/workflow/examples/dynamicWorkflow Iterates through a list of function names provided in the payload, executing each step sequentially using `context.run`. Each `context.run` call represents an isolated, resumable operation, ensuring exactly-once execution semantics and safe retries. ```typescript let lastResult = 0 for (let i = 0; i < steps.length; i++) { const stepName = steps[i] lastResult = await context.run( `step-${i}:${stepName}`, async () => { const fn = functions[stepName] if (!fn) throw new WorkflowNonRetryableError("Unknown step") return fn(lastResult) } ) } ``` -------------------------------- ### Safely retrieve context.requestPayload using context.run (Python) Source: https://upstash.com/docs/workflow/troubleshooting/general In Python, retrieve context.request_payload within a context.run step to guarantee it is never None. This pattern is recommended before executing context.call. ```python @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: async def _get_payload() -> str: return context.request_payload # Payload will never be None payload = await context.run("get payload", _get_payload) # ... steps or any other code context.call( ... ) ``` -------------------------------- ### E-commerce Order Fulfillment Workflow (Python) Source: https://upstash.com/docs/workflow/examples/eCommerceOrderFulfillment Implement an e-commerce order fulfillment workflow using Upstash Workflow with FastAPI. Ensure all necessary utility functions are imported and available. ```python from fastapi import FastAPI from typing import List, TypedDict from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext from utils import ( create_order_id, check_stock_availability, process_payment, dispatch_order, send_order_confirmation, send_dispatch_notification, ) app = FastAPI() serve = Serve(app) class OrderItem(TypedDict): product_id: str quantity: int class OrderPayload(TypedDict): user_id: str items: List[OrderItem] @serve.post("/order-fulfillment") async def order_fulfillment(context: AsyncWorkflowContext[OrderPayload]) -> None: # Get the order payload from the request payload = context.request_payload user_id = payload["user_id"] items = payload["items"] # Step 1: Create Order Id async def _create_order_id(): return await create_order_id(user_id) order_id: str = await context.run("create-order-id", _create_order_id) # Step 2: Verify stock availability async def _check_stock(): return await check_stock_availability(items) stock_available: bool = await context.run("check-stock", _check_stock) if not stock_available: print("Some items are out of stock") return # Step 3: Process payment async def _process_payment(): return await process_payment(order_id) await context.run("process-payment", _process_payment) # Step 4: Dispatch the order async def _dispatch_order(): return await dispatch_order(order_id, items) await context.run("dispatch-order", _dispatch_order) # Step 5: Send order confirmation email async def _send_confirmation(): return await send_order_confirmation(user_id, order_id) await context.run("send-confirmation", _send_confirmation) # Step 6: Send dispatch notification async def _send_dispatch_notification(): return await send_dispatch_notification(user_id, order_id) await context.run("send-dispatch-notification", _send_dispatch_notification) ``` -------------------------------- ### Configure Retry Attempt Count Source: https://upstash.com/docs/workflow/features/retries Use this TypeScript snippet to configure the number of times a step should be retried upon failure when triggering a workflow. Ensure you replace placeholders with your actual QStash token and workflow endpoint. ```typescript import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }) const { workflowRunId } = await client.trigger({ url: "https:///", retries: 3 }) ``` -------------------------------- ### Resume DLQ entries by ID Source: https://upstash.com/docs/workflow/basics/client/dlq/resume Resume one or more DLQ entries by providing their DLQ IDs. This is the most direct way to resume specific failed workflow runs. ```typescript await client.dlq.resume("dlq-12345"); ``` ```typescript await client.dlq.resume(["dlq-12345", "dlq-67890"]); ``` -------------------------------- ### Advanced Text Generation with Tools in Upstash Workflow Source: https://upstash.com/docs/workflow/integrations/aisdk Implement advanced text generation by defining and using tools within your Upstash Workflow. Ensure `maxSteps` is greater than 1 when using tools. Each tool execution must be wrapped in a workflow step. ```typescript import { z } from 'zod'; import { serve } from "@upstash/workflow/nextjs"; import { WorkflowAbort } from '@upstash/workflow'; import { generateText, ToolExecutionError, tool } from 'ai'; import { createWorkflowOpenAI } from './model'; export const { POST } = serve<{ prompt: string }>(async (context) => { const openai = createWorkflowOpenAI(context); const prompt = await context.run("get prompt", async () => { return context.requestPayload.prompt; }); try { const result = await generateText({ model: openai('gpt-3.5-turbo'), tools: { weather: tool({ description: 'Get the weather in a location', parameters: z.object({ location: z.string().describe('The location to get the weather for'), }), execute: ({ location }) => context.run("weather tool", () => { // Mock data, replace with actual weather API call return { location, temperature: 72 + Math.floor(Math.random() * 21) - 10, }; }) }), }, maxSteps: 2, prompt, }); await context.run("text", () => { console.log(`TEXT: ${result.text}`); return result.text; }); } catch (error) { if (error instanceof ToolExecutionError && error.cause instanceof WorkflowAbort) { throw error.cause; } else { throw error; } } }); ``` -------------------------------- ### Authentication Source: https://upstash.com/docs/workflow/api-reference/dlq/list-failed-workflow-runs Details on how to authenticate API requests using bearer tokens or query parameters. ```APIDOC ## Authentication ### Description API requests must be authenticated. Two methods are supported: Bearer Token in the Authorization header or a `stash_token` query parameter. ### Security Schemes #### Bearer Token Authentication - **Type**: HTTP - **Scheme**: Bearer - **Format**: JWT - **Description**: Provide your QStash authentication token in the `Authorization` header as a Bearer token. **Example Header:** `Authorization: Bearer YOUR_QSTASH_TOKEN` #### Query Parameter Authentication - **Type**: API Key - **In**: Query - **Name**: `stash_token` - **Description**: Provide your QStash authentication token as a query parameter named `stash_token`. **Example URL:** `/your/endpoint?stash_token=YOUR_QSTASH_TOKEN` ``` -------------------------------- ### Call OpenAI API Source: https://upstash.com/docs/workflow/basics/context/api Use context.api.openai.call to interact with OpenAI services. Ensure you provide your OPENAI_API_KEY and specify the operation and body for the request. ```typescript const { status, body } = await context.api.openai.call("Call OpenAI", { token: "", operation: "chat.completions.create", body: { model: "gpt-4o", messages: [ { role: "system", content: "Assistant says 'hello!'", }, { role: "user", content: "User shouts back 'hi!'" }, ], }, }); ``` -------------------------------- ### POST /v2/workflows/dlq/resume Source: https://upstash.com/docs/workflow/api-reference/dlq/bulk-resume-workflows-from-dlq Bulk resumes workflows from the Dead Letter Queue (DLQ). This allows you to continue a failed workflow run from the point of failure without re-executing successful steps. ```APIDOC ## POST /v2/workflows/dlq/resume ### Description Bulk resumes workflows from the Dead Letter Queue (DLQ). This allows you to continue a failed workflow run from the point of failure without re-executing successful steps. ### Method POST ### Endpoint /v2/workflows/dlq/resume ### Parameters #### Query Parameters - **dlqIds** (array[string]) - Optional - List of specific DLQ IDs to resume. If provided, other filters are ignored. - **cursor** (string) - Optional - Pagination cursor for resuming workflows in batches. - **count** (integer) - Optional - Maximum number of workflows to resume. If not provided, all matching workflows will be resumed. - **fromDate** (integer[int64]) - Optional - Filter workflows by starting date, in milliseconds (Unix timestamp). This is inclusive. - **toDate** (integer[int64]) - Optional - Filter workflows by ending date, in milliseconds (Unix timestamp). This is inclusive. - **workflowUrl** (string) - Optional - Filter workflows by workflow URL. - **workflowRunId** (string) - Optional - Filter workflows by workflow run ID. - **workflowCreatedAt** (integer[int64]) - Optional - Filter workflows by creation timestamp in milliseconds (Unix timestamp). - **label** (string) - Optional - Filter workflows by label assigned by the user. - **failureFunctionState** (string) - Optional - Filter workflows by failure function state. Possible values: CALLBACK_INPROGRESS, CALLBACK_SUCCESS, CALLBACK_FAIL, CALLBACK_CANCELED. - **callerIp** (string) - Optional - Filter workflows by IP address of the publisher. - **flowControlKey** (string) - Optional - Filter workflows by Flow Control Key. #### Header Parameters - **Upstash-Retries** (integer) - Optional - Override the number of retries for the remaining workflow steps. - **Upstash-Delay** (string) - Optional - ### Request Example ```json { "dlqIds": ["dlq_id_1", "dlq_id_2"], "count": 10 } ``` ### Response #### Success Response (200) - **resumedCount** (integer) - The number of workflows successfully resumed. - **nextCursor** (string) - A cursor for fetching the next batch of workflows, if available. #### Response Example ```json { "resumedCount": 5, "nextCursor": "some_pagination_cursor" } ``` #### Error Handling - **400 Bad Request**: Invalid input parameters. - **401 Unauthorized**: Authentication failed. - **404 Not Found**: Workflow or DLQ entry not found. - **500 Internal Server Error**: Server error. ``` -------------------------------- ### Custom Middleware with Direct Callbacks Source: https://upstash.com/docs/workflow/howto/middlewares Create a custom middleware by providing direct callbacks for various lifecycle and debug events. This is suitable for simple logging or event handling. ```typescript import { WorkflowMiddleware } from "@upstash/workflow"; const customMiddleware = new WorkflowMiddleware({ name: "custom-logger", callbacks: { // Lifecycle events runStarted: async ({ context }) => { console.log(`Workflow ${context.workflowRunId} started`); }, beforeExecution: async ({ context, stepName }) => { console.log(`Executing step: ${stepName}`); }, afterExecution: async ({ context, stepName, result }) => { console.log(`Step ${stepName} completed with result:`, result); }, runCompleted: async ({ context, result }) => { console.log(`Workflow ${context.workflowRunId} completed:`, result); }, // Debug events onError: async ({ workflowRunId, error }) => { console.error(`Error in ${workflowRunId}:`, error); }, onWarning: async ({ workflowRunId, warning }) => { console.warn(`Warning in ${workflowRunId}:`, warning); }, onInfo: async ({ workflowRunId, info }) => { console.info(`Info from ${workflowRunId}:`, info); } } }); ``` -------------------------------- ### Check Upgraded Plan Source: https://upstash.com/docs/workflow/examples/authWebhook Checks if a user has upgraded their plan. This function is a placeholder and should be implemented with actual logic. ```python async def check_upgraded_plan(email: str) -> bool: # Implement logic to check if the user has upgraded the plan print("Checking if the user has upgraded the plan", email) return False ``` -------------------------------- ### Create Realtime Middleware for Workflow Events Source: https://upstash.com/docs/workflow/howto/realtime/basic Implement a custom WorkflowMiddleware to automatically emit events to Upstash Realtime after each step execution and upon workflow completion. This middleware uses the workflow run ID to create a unique channel for each run. ```typescript import { WorkflowMiddleware } from "@upstash/workflow"; import { realtime } from "./realtime"; export const realtimeMiddleware = new WorkflowMiddleware({ name: "realtime-events", callbacks: { afterExecution: async ({ context, stepName, result }) => { const channel = realtime.channel(context.workflowRunId); await channel.emit("workflow.stepFinish", { stepName, result, }); }, runCompleted: async ({ context }) => { const channel = realtime.channel(context.workflowRunId); await channel.emit("workflow.runFinish", {}); }, }, }); ``` -------------------------------- ### Provide Custom QStash Receiver Source: https://upstash.com/docs/workflow/basics/serve/advanced Pass a QStash Receiver explicitly via the `receiver` option to verify incoming requests originate from QStash. This is useful for managing multiple QStash projects or when default environment variable initialization is not suitable. ```typescript import { Receiver } from "@upstash/qstash"; import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve( async (context) => { ... }, { receiver: new Receiver({ currentSigningKey: "", nextSigningKey: "", }) } ); ``` ```python from qstash import Receiver @serve.post( "/api/example", receiver=Receiver( current_signing_key=os.environ["QSTASH_CURRENT_SIGNING_KEY"], next_signing_key=os.environ["QSTASH_NEXT_SIGNING_KEY"], ), ) async def example(context: AsyncWorkflowContext[str]) -> None: ... ``` -------------------------------- ### Trigger Workflow Endpoint with Client Source: https://upstash.com/docs/workflow/agents/getting-started Use the Upstash Workflow Client to trigger a workflow endpoint. Replace `process.env.QSTASH_URL` and `process.env.QSTASH_TOKEN` with your actual Upstash credentials. The `url` should point to your locally running Next.js application. ```typescript import { Client } from "@upstash/workflow"; const client = new Client({ baseUrl: process.env.QSTASH_URL, token: process.env.QSTASH_TOKEN!, }) const workflowRunId = await client.trigger({ url: "http://127.0.0.1:3000/workflow", body: { prompt: "Explain the future of space exploration" } }) console.log(workflowRunId); ``` -------------------------------- ### Create Custom React Hook for Realtime Workflow Updates Source: https://upstash.com/docs/workflow/howto/realtime/basic A custom React hook to manage Realtime subscriptions for workflow updates. It subscribes to `workflow.stepFinish` and `workflow.runFinish` events using the `useRealtime` hook. ```typescript "use client"; import { useRealtime } from "@/lib/realtime-client"; import { useState, useCallback } from "react"; interface WorkflowStep { stepName: string; result?: unknown; } export function useWorkflow() { const [workflowRunId, setWorkflowRunId] = useState(null); const [steps, setSteps] = useState([]); const [isRunFinished, setIsRunFinished] = useState(false); useRealtime({ enabled: Boolean(workflowRunId), channels: workflowRunId ? [workflowRunId] : [], events: ["workflow.stepFinish", "workflow.runFinish"], onData({ event, data }) { if (event === "workflow.stepFinish") { setSteps((prev) => [...prev, data]); } if (event === "workflow.runFinish") { setIsRunFinished(true); } }, }); const trigger = () => { setSteps([]); setIsRunFinished(false); const response = await fetch("/api/trigger", { method: "POST", }); const data = await response.json(); setWorkflowRunId(data.workflowRunId); }; return { trigger, workflowRunId, steps, isRunFinished, }; } ``` -------------------------------- ### Execute Tool with context.call Source: https://upstash.com/docs/workflow/agents/features Configure a WorkflowTool to use context.call for making HTTP requests within its invoke function. Set executeAsStep to false. ```typescript import { WorkflowTool } from '@upstash/workflow' import { serve } from '@upstash/workflow/nextjs' export const { POST } = serve(async (context) => { const tool = new WorkflowTool({ description: '...', schema: '...', invoke: ( ... ) => { // make HTTP call inside the tool with context.call: await context.call( ... ) }, executeAsStep: false }) // pass the tool to agent }) ``` -------------------------------- ### POST /api/workflow - Base URL Configuration Source: https://upstash.com/docs/workflow/basics/serve/advanced Shows how to use the `baseUrl` option to override only the base portion of the inferred URL, preserving the route structure. ```APIDOC ## POST /api/workflow ### Description Overrides the base portion of the inferred URL, useful for changing the host or scheme while keeping the route structure. Can also be configured globally via `UPSTASH_WORKFLOW_URL` environment variable. ### Method POST ### Endpoint /api/workflow ### Parameters #### Query Parameters - **baseUrl** (string) - Optional - Overrides the base URL for the workflow endpoint. ### Request Example ```json { "baseUrl": "" } ``` ### Response #### Success Response (200) - **(No specific fields mentioned)** #### Response Example ```json { "message": "Workflow scheduled with custom base URL" } ``` ``` -------------------------------- ### QStash Authentication Source: https://upstash.com/docs/workflow/api-reference/dlq/get-failed-workflow-run QStash supports authentication using JWT bearer tokens. These can be provided either via the standard 'Authorization: Bearer ' header or as a 'qstash_token' query parameter. ```APIDOC ## QStash Authentication ### Description QStash authentication token. ### Method All ### Endpoint / ### Parameters #### Header Parameters - **Authorization** (string) - Required - Bearer token for authentication. Example: `Bearer ` #### Query Parameters - **qstash_token** (string) - Required - QStash authentication token passed as a query parameter. ### Request Example ```json { "example": "Request with Authorization header or qstash_token query parameter" } ``` ### Response #### Success Response (200) - **status** (string) - Indicates success. #### Response Example ```json { "example": "{\"status\": \"ok\"}" } ``` ``` -------------------------------- ### Dynamically Set Base URL for Workflow Trigger Source: https://upstash.com/docs/workflow/howto/local-development/development-server This pattern dynamically sets the base URL for triggering workflows, using a production URL when available (e.g., VERCEL_URL) and falling back to a local development URL. ```javascript const BASE_URL = process.env.VERCEL_URL ? `https://${process.env.VERCEL_URL}` : `http://localhost:3000` const { workflowRunId } = await client.trigger({ url: `${BASE_URL}/api/workflow`, retries: 3 }); ``` -------------------------------- ### POST /v2/workflows/dlq/restart/{dlqId} Source: https://upstash.com/docs/workflow/api-reference/dlq/restart-workflow-from-dlq Restarts a failed workflow run from the DLQ. This operation initiates a new execution of the entire workflow from the beginning, generating a new workflow run ID. ```APIDOC ## POST /v2/workflows/dlq/restart/{dlqId} ### Description Restart a failed workflow run from the DLQ. The workflow will start from the beginning. Unlike resume, which continues from where the workflow failed, restart executes the entire workflow from the first step. A new workflow run ID is generated and all steps will be executed again. ### Method POST ### Endpoint /v2/workflows/dlq/restart/{dlqId} ### Parameters #### Path Parameters - **dlqId** (string) - Required - The DLQ ID of the workflow run to restart. #### Header Parameters - **Upstash-Retries** (integer) - Optional - Override the number of retries for the workflow steps. - **Upstash-Delay** (string) - Optional - Override the delay before executing the workflow. Format is `` (e.g., "10s", "5m"). - **Upstash-Retry-Delay** (string) - Optional - Override the retry delay expression for the workflow steps. - **Upstash-Flow-Control-Key** (string) - Optional - Override the flow control key for the workflow. - **Upstash-Flow-Control-Value** (string) - Optional - Override the flow control configuration in the format `parallelism=, rate=, period=`. - **Upstash-Label** (string) - Optional - Override the label for the workflow. - **Upstash-Failure-Callback** (string) - Optional - Override the failure callback URL for the workflow. ### Responses #### Success Response (200) - **workflowRunId** (string) - The ID of the restarted workflow run (a new ID is generated for the restarted run). - **workflowCreatedAt** (integer) - The timestamp when the restarted workflow run was created (Unix timestamp in milliseconds). #### Error Responses - **400** - Bad Request - Invalid DLQ ID, not a workflow message, or workflow doesn't support restart. - **401** - Unauthorized - **404** - DLQ message not found - **500** - Internal Server Error ``` -------------------------------- ### Define and Run a Workflow in Next.js Source: https://upstash.com/docs/workflow/examples/paymentRetry This TypeScript snippet shows how to define a workflow using `@upstash/workflow/nextjs`. It includes logic for charging a customer, handling payment failures with retries, and managing user suspension. Use this for server-side workflow definitions in a Next.js application. ```typescript import { serve } from "@upstash/workflow/nextjs"; type ChargeUserPayload = { email: string; }; export const { POST } = serve(async (context) => { const { email } = context.requestPayload; for (let i = 0; i < 3; i++) { // attempt to charge the user const result = await context.run("charge customer", async () => { try { return await chargeCustomer(i + 1), } catch (e) { console.error(e); return } }); if (!result) { // Wait for a day await context.sleep("wait for retry", 24 * 60 * 60); } else { // Unsuspend User const isSuspended = await context.run("check suspension", async () => { return await checkSuspension(email); }); if (isSuspended) { await context.run("unsuspend user", async () => { await unsuspendUser(email); }); } // send invoice email await context.run("send invoice email", async () => { await sendEmail( email, `Payment successful. Invoice: ${result.invoiceId}, Total cost: $${result.totalCost}` ); }); // by returning, we end the workflow run return; } } // suspend user if the user isn't suspended const isSuspended = await context.run("check suspension", async () => { return await checkSuspension(email); }); if (!isSuspended) { await context.run("suspend user", async () => { await suspendUser(email); }); await context.run("send suspended email", async () => { await sendEmail( email, "Your account has been suspended due to payment failure. Please update your payment method." ); }); } }); async function sendEmail(email: string, content: string) { // Implement the logic to send an email console.log("Sending email to", email, "with content:", content); } async function checkSuspension(email: string) { // Implement the logic to check if the user is suspended console.log("Checking suspension status for", email); return true; } async function suspendUser(email: string) { // Implement the logic to suspend the user console.log("Suspending the user", email); } async function unsuspendUser(email: string) { // Implement the logic to unsuspend the user console.log("Unsuspending the user", email); } async function chargeCustomer(attempt: number) { // Implement the logic to charge the customer console.log("Charging the customer"); if (attempt <= 2) { throw new Error("Payment failed"); } return { invoiceId: "INV123", totalCost: 100, } as const; } ``` -------------------------------- ### client.dlq.resume Source: https://upstash.com/docs/workflow/basics/client/dlq/resume Resumes one or more workflow runs from the Dead Letter Queue (DLQ) at the point where they previously failed. ```APIDOC ## POST /v1/dlq/resume ### Description Resumes one or more workflow runs from the Dead Letter Queue (DLQ) at the point where they previously failed. ### Method POST ### Endpoint /v1/dlq/resume ### Parameters #### Request Body - **dlqIds** (string[]) - Optional - An array of DLQ IDs to resume. - **filter** (object) - Optional - Filters to select DLQ entries for resumption. - **workflowUrl** (string) - Optional - Filter by exact workflow URL. - **workflowRunId** (string) - Optional - Filter by workflow run ID. - **label** (string) - Optional - Resume workflows with this label. - **fromDate** (Date | number) - Optional - Resume workflows created after this date. - **toDate** (Date | number) - Optional - Resume workflows created before this date. - **callerIp** (string) - Optional - Filter by the IP address that triggered the workflow. - **flowControlKey** (string) - Optional - Filter by flow control key. - **workflowCreatedAt** (number) - Optional - Filter by workflow creation time (Unix timestamp in ms). - **failureFunctionState** (string) - Optional - Filter by failure callback state. - **all** (boolean) - Optional - Set to `true` to resume all DLQ entries. - **count** (number) - Optional - Maximum number of messages to process per call. Defaults to `100`. - **cursor** (string) - Optional - A pagination cursor from a previous request. - **flowControl** (object) - Optional - Flow control configuration. - **key** (string) - Required - A logical grouping key. - **rate** (number) - Required - Maximum allowed resumption requests per second. - **parallelism** (number) - Required - Maximum concurrent resumed runs. - **period** (string|number) - Optional - Time window for rate limit. Defaults to `1s`. - **retries** (number) - Optional - Number of retry attempts. Defaults to `3`. ### Request Example ```json { "dlqIds": ["dlq-12345", "dlq-67890"], "filter": { "label": "my-label", "workflowUrl": "https://example.com/workflow" }, "all": false, "count": 100, "cursor": "some-cursor", "flowControl": { "key": "my-flow-control-key", "rate": 10, "parallelism": 5, "period": "1s" }, "retries": 3 } ``` ### Response #### Success Response (200) - **cursor** (string) - A pagination cursor. If not returned, all matching entries have been processed. - **workflowRuns** (object[]) - An array of resumed workflow runs. - **workflowRunId** (string) - The ID of the workflow run resumed from the DLQ message. - **workflowCreatedAt** (number) - The Unix timestamp (in milliseconds) when the resumed run was created. #### Response Example ```json { "cursor": "next-cursor", "workflowRuns": [ { "workflowRunId": "run-abc123", "workflowCreatedAt": 1678886400000 } ] } ``` ``` -------------------------------- ### Initial Waiting Period in Workflow Source: https://upstash.com/docs/workflow/examples/customerOnboarding Pauses the workflow execution for 3 days using `context.sleep` to allow users time to interact with the platform before further actions are taken. ```typescript await context.sleep("wait-for-3-days", 60 * 60 * 24 * 3) ``` ```python await context.sleep("wait-for-3-days", 60 * 60 * 24 * 3) ``` -------------------------------- ### Define Custom AI SDK Tool Source: https://upstash.com/docs/workflow/agents/features Define a custom tool compatible with the AI SDK. The 'execute' function handles the tool's logic. ```typescript import { z } from 'zod' import { tool } from 'ai' import * as mathjs from 'mathjs' const mathTool = tool({ description: 'A tool for evaluating mathematical expressions. ' + 'Example expressions: ' + "'1.2 * (2 + 4.5)', '12.7 cm to inch', 'sin(45 deg) ^ 2'.", parameters: z.object({ expression: z.string() }), execute: async ({ expression }) => mathjs.evaluate(expression), }) ``` -------------------------------- ### POST /websites/upstash_workflow Source: https://upstash.com/docs/workflow/api-reference/dlq/bulk-restart-workflows-from-dlq Restarts workflow runs, allowing for overrides of retry delay, flow control, and labels. ```APIDOC ## POST /websites/upstash_workflow ### Description Restarts workflow runs. This endpoint allows for overriding various configurations such as retry delay, flow control settings, and labels. ### Method POST ### Endpoint /websites/upstash_workflow ### Parameters #### Header Parameters - **Upstash-Retry-Delay** (string) - Optional - Override the retry delay expression for the workflow steps. - **Upstash-Flow-Control-Key** (string) - Optional - Override the flow control key for the workflows. - **Upstash-Flow-Control-Value** (string) - Optional - Override the flow control configuration in the format `parallelism=, rate=, period=`. - **Upstash-Label** (string) - Optional - Override the label for the workflows. - **Upstash-Failure-Callback** (string) - Optional - Override the failure callback URL for the workflows. ### Responses #### Success Response (200) - **workflowRuns** (array) - Array of restarted workflow runs. - **workflowRunId** (string) - The ID of the restarted workflow run (a new ID is generated for each restarted run). - **workflowCreatedAt** (integer) - The timestamp when the restarted workflow run was created (Unix timestamp in milliseconds). - **cursor** (string) - Pagination cursor to use in subsequent requests. If empty, all matching workflows have been processed. #### Error Responses - **400** - Bad Request - **401** - Unauthorized - **404** - Some DLQ messages were not found - **500** - Internal Server Error *Error Response Body Example (for 400, 401, 404, 500): ```json { "error": "Error message" } ``` ``` -------------------------------- ### Source Environment Variables Source: https://upstash.com/docs/workflow/quickstarts/nextjs-fastapi Load the environment variables defined in your .env file into your current shell session. This makes the variables accessible to your running application. ```bash source .env ``` -------------------------------- ### Implement Logging Middleware (TypeScript) Source: https://upstash.com/docs/workflow/basics/serve/advanced Integrate `loggingMiddleware` to intercept workflow lifecycle and debug events. This allows for custom logging and debugging. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { loggingMiddleware } from "@upstash/workflow"; export const { POST } = serve< string >( async (context) => { ... }, { middlewares: [loggingMiddleware] } ); ``` -------------------------------- ### Resume DLQ entries by filters Source: https://upstash.com/docs/workflow/basics/client/dlq/resume Resume DLQ entries that match specific criteria such as workflow URL, run ID, label, or creation date. This is useful for resuming a batch of related failed runs. ```typescript let cursor: string | undefined; do { const result = await client.dlq.resume({ filter: { label: "my-label", workflowUrl: "https://example.com/workflow", }, cursor, }); cursor = result.cursor; } while (cursor); ``` -------------------------------- ### Resume a Failed Workflow Run Source: https://upstash.com/docs/workflow/features/dlq/resume Use the `dlq.resume` method to resume a workflow run from its Dead Letter Queue (DLQ). Specify the DLQ ID and the number of retries. ```typescript import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }); await client.dlq.resume({ dlqId: "dlq-12345", retries: 3, }); ``` -------------------------------- ### Accessing context.requestPayload before context.call (Python) Source: https://upstash.com/docs/workflow/troubleshooting/general In Python, context.request_payload can be None when accessed before context.call. Use context.run to ensure the payload is available. ```python @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: # Will print None while executing context.call payload = context.request_payload print(payload) # ... steps or any other code context.call( ... ) ``` -------------------------------- ### Verify Stock Availability Source: https://upstash.com/docs/workflow/examples/eCommerceOrderFulfillment Checks if items are in stock before proceeding. Halts the process if stock is unavailable. ```typescript const orderId = await context.run("create-order-id", async () => { return await createOrderId(userId); }); const stockAvailable = await context.run("check-stock", async () => { return await checkStockAvailability(items) }) if (!stockAvailable) { console.warn("Some items are out of stock") return; } ``` ```python async def _create_order_id(): return await create_order_id(user_id) order_id: str = await context.run("create-order-id", _create_order_id) async def _check_stock(): return await check_stock_availability(items) stock_available: bool = await context.run("check-stock", _check_stock) if not stock_available: print("Some items are out of stock") return ``` -------------------------------- ### Resize Image to Multiple Resolutions Source: https://upstash.com/docs/workflow/examples/imageProcessing Resizes an image to specified resolutions (640, 1280, 1920) by calling an external image processing service. Uses Promise.all for parallel execution. ```typescript const resolutions = [640, 1280, 1920] const resizedImages: { body: ImageResult }[] = await Promise.all(resolutions.map( resolution => context.call( `resize-image-${resolution}`, { // endpoint which returns ImageResult type in response url: "https://image-processing-service.com/resize", method: "POST", body: JSON.stringify({ imageUrl, width: resolution, }) } ) )) ``` ```python resolutions = [640, 1280, 1920] resize_responses = [] for resolution in resolutions: response: CallResponse[ImageResult] = await context.call( f"resize-image-{resolution}", # endpoint which returns ImageResult type in response url="https://image-processing-service.com/resize", method="POST", body={"imageUrl": image_url, "width": resolution}, ) resize_responses.append(response) resized_images = [response.body for response in resize_responses] ``` -------------------------------- ### Generate Text with OpenAI Client in Upstash Workflow Source: https://upstash.com/docs/workflow/integrations/aisdk Use this snippet to create a workflow endpoint that generates text using the OpenAI client. Ensure a step precedes `generateText` and handle potential `WorkflowAbort` errors. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { WorkflowAbort } from '@upstash/workflow'; import { generateText, ToolExecutionError } from 'ai'; import { createWorkflowOpenAI } from './model'; export const { POST } = serve<{ prompt: string }>(async (context) => { const openai = createWorkflowOpenAI(context); // Important: Must have a step before generateText const prompt = await context.run("get prompt", async () => { return context.requestPayload.prompt; }); try { const result = await generateText({ model: openai('gpt-3.5-turbo'), maxTokens: 2048, prompt, }); await context.run("text", () => { console.log(`TEXT: ${result.text}`); return result.text; }); } catch (error) { if (error instanceof ToolExecutionError && error.cause instanceof WorkflowAbort) { throw error.cause; } else { throw error; } } }); ``` -------------------------------- ### Apply Filters to Resized Images Source: https://upstash.com/docs/workflow/examples/imageProcessing Applies various filters (grayscale, sepia, contrast) to resized images by calling an external service. Processes multiple filter and image combinations in parallel. ```typescript const filters = ["grayscale", "sepia", "contrast"] const processedImagePromises: Promise[] = [] for (const resizedImage of resizedImages) { for (const filter of filters) { const processedImagePromise = context.call( `apply-filter-${filter}`, { // endpoint which returns ImageResult type in response url: "https://image-processing-service.com/filter", method: "POST", body: { imageUrl: resizedImage.body.imageUrl, filter, } } ) processedImagePromises.push(processedImagePromise) } } const processedImages: { body: ImageResult }[] = await Promise.all(processedImagePromises) ``` ```python filters = ["grayscale", "sepia", "contrast"] filter_responses = [] for resized_image in resized_images: for filter in filters: response: CallResponse[ImageResult] = await context.call( f"apply-filter-{filter}", # endpoint which returns ImageResult type in response url="https://image-processing-service.com/filter", method="POST", body={"imageUrl": resized_image["imageUrl"], "filter": filter}, ) filter_responses.append(response) processed_images = [response.body for response in filter_responses] ``` -------------------------------- ### Implement Custom Authorization in Python Source: https://upstash.com/docs/workflow/howto/security Secure your workflow by extracting and validating a Bearer token from the `authorization` header using FastAPI and Upstash Workflow. This approach ensures custom authentication checks are performed. ```python from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: auth_header = context.headers.get("authorization") bearer_token = auth_header.split(" ")[1] if auth_header else None if not is_valid(bearer_token): print("Authentication failed.") return # Your workflow steps... ``` -------------------------------- ### Skip Steps with Conditional Execution Source: https://upstash.com/docs/workflow/features/retries/prevent-retries Use guard conditions to skip steps and exit early without errors or cancellations. The workflow completes successfully if no error is raised. ```typescript export const { POST } = serve<{ data: any }>(async (context) => { const { data } = context.requestPayload; // Check if order is still valid const orderStatus = await context.run("check-order-status", async () => { return await getOrderStatus(orderId); }); if (orderStatus === "not-found") { // Stop execution without error return; } // Continue processing if order is valid await context.run("process-order", async () => { return await processOrder(orderId); }); }); ``` ```python @serve.post("/conditional-execution") async def conditional_execution(context: AsyncWorkflowContext[dict]) -> None: data = context.request_payload["data"] async def _validate_data(): return validate_input_data(data) # Validate data first validation_result = await context.run("validate-data", _validate_data) if not validation_result["is_valid"]: # Log the validation failure async def _log_validation_failure(): await log_validation_error(validation_result["errors"]) await context.run("log-validation-failure", _log_validation_failure) # Stop execution without error return # Only execute if validation passes async def _process_valid_data(): return await process_data(data) await context.run("process-valid-data", _process_valid_data) ``` -------------------------------- ### Auth Provider Webhook Handler Source: https://upstash.com/docs/workflow/examples/authWebhook Handles incoming webhooks from an authentication provider to synchronize user data, create Stripe accounts, send welcome emails, and manage trial periods. ```python @serve.post("/auth-provider-webhook") async def auth_provider_webhook( context: AsyncWorkflowContext[UserCreatedPayload], ) -> None: payload = context.request_payload name = payload["name"] email = payload["email"] async def _sync_user() -> str: return await create_user_in_database(name, email) result = await context.run("sync user", _sync_user) userid = result["userid"] async def _create_new_user_in_stripe() -> None: await create_new_user_in_stripe(email) await context.run("create new user in stripe", _create_new_user_in_stripe) async def _start_trial_in_stripe() -> None: await start_trial_in_stripe(email) await context.run("start trial in Stripe", _start_trial_in_stripe) async def _send_welcome_email() -> None: await send_email( email, "Welcome to our platform!, You have 14 days of free trial." ) await context.run("send welcome email", _send_welcome_email) await context.sleep("wait", 7 * 24 * 60 * 60) # get user stats and send email with them async def _get_user_stats() -> UserStats: return await get_user_stats(userid) stats: UserStats = await context.run("get user stats", _get_user_stats) await send_problem_solved_email(context, email, stats) # wait until there are two days to the end of trial period and check upgrade status await context.sleep("wait for trial warning", 5 * 24 * 60 * 60) async def _check_upgraded_plan() -> bool: return await check_upgraded_plan(email) is_upgraded = await context.run("check upgraded plan", _check_upgraded_plan) # end the workflow if upgraded if is_upgraded: return async def _send_trial_warning_email() -> None: await send_email( email, "Your trial is about to end in 2 days. Please upgrade your plan to keep using our platform.", ) await context.run("send trial warning email", _send_trial_warning_email) await context.sleep("wait for trial end", 2 * 24 * 60 * 60) async def _send_trial_end_email() -> None: await send_email( email, "Your trial has ended. Please upgrade your plan to keep using our platform.", ) await context.run("send trial end email", _send_trial_end_email) ``` -------------------------------- ### Handle Conditional Logic in Python Workflow Source: https://upstash.com/docs/workflow/troubleshooting/general Implement conditional logic in Python workflows using `context.run` to manage non-deterministic conditions. This approach treats the condition as a distinct workflow step, avoiding authentication errors. ```python @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: async def _check_condition() -> bool: return some_condition() should_return = await context.run("check condition", _check_condition) if should_return: return # rest of the workflow ``` -------------------------------- ### Retry Payment Logic in Python Source: https://upstash.com/docs/workflow/examples/paymentRetry Implements a loop for payment retries with a 24-hour sleep between attempts. Exits the workflow upon successful payment. ```python for i in range(3): # attempt to charge the customer if not result: # Wait for a day await context.sleep("wait for retry", 24 * 60 * 60) else: # Payment succeeded # Unsuspend user, send invoice email # end the workflow: return ``` -------------------------------- ### Lifecycle Event: runStarted Callback Source: https://upstash.com/docs/workflow/howto/middlewares Callback function executed when a workflow run begins. It receives the workflow context. ```typescript runStarted: async ({ context }) => { // Handle run start } ``` -------------------------------- ### Implement Evaluator-Optimizer Workflow Source: https://upstash.com/docs/workflow/agents/patterns/evaluator-optimizer Use this TypeScript code to set up a workflow where a generator agent creates content and an evaluator agent refines it through iterative feedback. Ensure the 'context' object is properly initialized for agent workflows. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { agentWorkflow } from "@upstash/workflow-agents"; export const { POST } = serve(async (context) => { const agents = agentWorkflow(context); const model = agents.openai('gpt-3.5-turbo'); // Generator agent that generates content const generator = agents.agent({ model, name: 'generator', maxSteps: 1, background: 'You are an agent that generates text based on a prompt.', tools: {} }); // Evaluator agent that evaluates the text and gives corrections const evaluator = agents.agent({ model, name: 'evaluator', maxSteps: 1, background: 'You are an agent that evaluates the generated text and provides corrections if needed.', tools: {} }); let generatedText = ''; let evaluationResult = ''; const prompt = "Generate a short explanation of quantum mechanics."; let nextPrompt = prompt; for (let i = 0; i < 3; i++) { // Construct prompt for generator: // - If there's no evaluation, use the original prompt // - If there's an evaluation, provide the prompt, the last generated text, and the evaluator's feedback if (evaluationResult && evaluationResult !== "PASS") { nextPrompt = `Please revise the answer to the question "${prompt}". Previous answer was: "${generatedText}", which received this feedback: "${evaluationResult}".`; } // Generate content const generatedResponse = await agents.task({ agent: generator, prompt: nextPrompt }).run(); generatedText = generatedResponse.text // Evaluate the generated content const evaluationResponse = await agents.task({ agent: evaluator, prompt: `Evaluate and provide feedback for the following text: ${generatedText}` }).run(); evaluationResult = evaluationResponse.text // If the evaluator accepts the content (i.e., "PASS"), stop if (evaluationResult.includes("PASS")) { break; } } console.log(generatedText); }); ``` -------------------------------- ### Send Trial Warning Email (TypeScript) Source: https://upstash.com/docs/workflow/examples/authWebhook Checks if a user has upgraded their plan and sends a warning email if their trial is ending soon. This function includes a delay and a conditional check. ```typescript await context.sleep("wait for trial warning", 5 * 24 * 60 * 60); const isUpgraded = await context.run("check upgraded plan", async () => { return await checkUpgradedPlan(email); }); if (isUpgraded) return; await context.run("send trial warning email", async () => { await sendEmail( email, "Your trial is about to end in 2 days. Please upgrade your plan to keep using our platform." ); }); ``` -------------------------------- ### Using Built-in Logging Middleware Source: https://upstash.com/docs/workflow/howto/middlewares Integrate the built-in logging middleware into your Next.js workflow to automatically log execution details to the console. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { loggingMiddleware } from "@upstash/workflow"; export const { POST } = serve( async (context) => { await context.run("step-1", () => { return "Hello World"; }); }, { middlewares: [loggingMiddleware] } ); ``` -------------------------------- ### Notify Workflow with External Event Data Source: https://upstash.com/docs/workflow/examples/waitForEvent Use the `client.notify` method to send event data to a waiting workflow. Ensure the `Client` is initialized with your QSTASH_TOKEN. ```typescript import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }); const orderId = "1324"; await client.notify({ eventId: `order-${orderId}`, eventData: { deliveryTime: "2 days" } }); ``` -------------------------------- ### Update context.call method signature Source: https://upstash.com/docs/workflow/howto/migrations The context.call method now accepts an options object for URL, method, and other parameters, instead of separate arguments. It also returns status, headers, and body, and does not fail the workflow if the request fails. ```javascript // old const result = await context.call("call step", "", "POST", ...) // new const { status, // response status headers, // response headers body // response body } = await context.call("call step", { url: "", method: "POST", ... }) ``` -------------------------------- ### Create Error Tracking Middleware Source: https://upstash.com/docs/workflow/howto/middlewares Create a custom WorkflowMiddleware to track errors and send them to an external monitoring service. Ensure the monitoring service endpoint and request body are correctly configured. ```typescript import { WorkflowMiddleware } from "@upstash/workflow"; const errorTrackingMiddleware = new WorkflowMiddleware({ name: "error-tracking", callbacks: { onError: async ({ workflowRunId, error }) => { await fetch("https://your-monitoring-service.com/errors", { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ workflowRunId, error: error.message, stack: error.stack, timestamp: new Date().toISOString() }) }); } } }); ``` -------------------------------- ### Custom Initial Payload Parsing (TypeScript) Source: https://upstash.com/docs/workflow/basics/serve/advanced Use `initialPayloadParser` in TypeScript to transform incoming request payloads into a strongly-typed object before workflow execution. This is useful for non-standard payload formats. ```typescript type InitialPayload = { foo: string; bar: number; }; // 👇 1: provide initial payload type export const { POST } = serve( async (context) => { // 👇 3: parsing result is available as requestPayload const payload: InitialPayload = context.requestPayload; }, { // 👇 2: custom parsing for initial payload initialPayloadParser: (initialPayload) => { const payload: InitialPayload = parsePayload(initialPayload); return payload; }, } ); ``` -------------------------------- ### Configure Failure Handling with TypeScript Source: https://upstash.com/docs/workflow/howto/failures Use the `failureFunction` parameter in the `serve` method to define custom logic for handling workflow failures, such as logging errors to Sentry. The function can optionally return a string for UI display and logs. ```typescript export const { POST } = serve( async (context) => { // Your workflow logic... }, { failureFunction: async ({ context, failStatus, failResponse, failHeaders, }) => { // Handle error, i.e. log to Sentry console.error("Workflow failed:", failResponse); // You can optionally return a string that will be visible // in the UI (coming soon) and in workflow logs return `Workflow failed with status ${failStatus}: ${failResponse}`; }, } ); ``` -------------------------------- ### Type Check Code with Mypy Source: https://upstash.com/docs/workflow/sdk/workflow-py Run static type checking on the project using Mypy to catch type errors. ```bash poetry run mypy --show-error-codes . ``` -------------------------------- ### Basic Notification with context.notify Source: https://upstash.com/docs/workflow/basics/context/notify Use this snippet to send a basic notification to waiting workflows. Ensure the eventId matches the one used in context.waitForEvent. ```javascript import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve<{ topic: string }>(async (context) => { const payload = context.requestPayload; const { notifyResponse, // result of notify, which is a list of notified waiters } = await context.notify("notify step", "my-event-Id", payload); }); ``` -------------------------------- ### Create Human-in-the-Loop Workflow Endpoint Source: https://upstash.com/docs/workflow/howto/realtime/human-in-the-loop This TypeScript code defines a Next.js API route for a human-in-the-loop workflow. It uses `serve` from `@upstash/workflow/nextjs` to handle workflow execution, including running steps, waiting for external events, and processing results based on approval. It also integrates a `realtimeMiddleware` for event emissions. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { realtimeMiddleware } from "@/lib/middleware"; type WorkflowPayload = { userId: string; action: string; }; export const { POST } = serve( async (context) => { const { userId, action } = context.requestPayload; // Step 1: Initial Processing await context.run("initial-processing", async () => { // Your processing logic return { preprocessed: true, userId, action, requiresApproval: true, }; }); // Step 2: Wait for Human Approval const eventId = `approval-${context.workflowRunId}`; const { eventData, timeout } = await context.waitForEvent<{ approved: boolean; }>("wait-for-approval", eventId, { timeout: "5m" }); // Handle timeout if (timeout) { return { success: false, reason: "timeout" }; } const status = eventData.approved ? "approved" : "rejected"; // Step 3: Process based on approval await context.run(`process-${status}`, async () => { return { status, processedAt: Date.now(), action, userId, }; }); // Step 4: Finalize (only if approved) if (eventData.approved) { // Additional steps... } return { success: true, approved: eventData.approved, workflowRunId: context.workflowRunId, }; }, { middlewares: [realtimeMiddleware], } ); ``` -------------------------------- ### Send Batch Emails with Resend API Source: https://upstash.com/docs/workflow/integrations/resend Leverage `context.api.resend.call` with `batch: true` to send multiple emails efficiently. The `body` should be an array of email objects. ```typescript const { status, body } = await context.api.resend.call( "Call Resend", { batch: true, token: "", body: [ { from: "Acme ", to: ["delivered@resend.dev"], subject: "Hello World", html: "

It works!

", }, { from: "Acme ", to: ["delivered@resend.dev"], subject: "Hello World", html: "

It works!

", }, ], headers: { "content-type": "application/json", }, } ); ``` -------------------------------- ### E-commerce Order Fulfillment Workflow (TypeScript) Source: https://upstash.com/docs/workflow/examples/eCommerceOrderFulfillment Implement an e-commerce order fulfillment workflow using Upstash Workflow in a Next.js application. Ensure all necessary utility functions are imported and available. ```typescript import { serve } from "@upstash/workflow/nextjs" import { createOrderId, checkStockAvailability, processPayment, dispatchOrder, sendOrderConfirmation, sendDispatchNotification, } from "./utils" type OrderPayload = { userId: string items: { productId: string, quantity: number }[] } export const { POST } = serve(async (context) => { const { userId, items } = context.requestPayload; // Step 1: Create Order Id const orderId = await context.run("create-order-id", async () => { return await createOrderId(userId); }); // Step 2: Verify stock availability const stockAvailable = await context.run("check-stock", async () => { return await checkStockAvailability(items); }); if (!stockAvailable) { console.warn("Some items are out of stock"); return; }; // Step 3: Process payment await context.run("process-payment", async () => { return await processPayment(orderId) }) // Step 4: Dispatch the order await context.run("dispatch-order", async () => { return await dispatchOrder(orderId, items) }) // Step 5: Send order confirmation email await context.run("send-confirmation", async () => { return await sendOrderConfirmation(userId, orderId) }) // Step 6: Send dispatch notification await context.run("send-dispatch-notification", async () => { return await sendDispatchNotification(userId, orderId) }) }) ``` -------------------------------- ### Trigger Upstash Workflow Source: https://upstash.com/docs/workflow/integrations/aisdk This code snippet demonstrates how to trigger a deployed Upstash Workflow endpoint using the Upstash Workflow client. Replace placeholders with your actual QStash token and workflow endpoint URL. ```ts import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }); const { workflowRunId } = await client.trigger({ url: "https:///", body: { "prompt": "How is the weather in San Francisco around this time?" } }); ``` -------------------------------- ### Resume all DLQ entries Source: https://upstash.com/docs/workflow/basics/client/dlq/resume Resume all entries currently in the Dead Letter Queue. This is typically used when you want to clear the DLQ and retry all failed workflows. ```typescript let cursor: string | undefined; do { const result = await client.dlq.resume({ all: true, cursor }); cursor = result.cursor; } while (cursor); ``` -------------------------------- ### Trigger Workflow Endpoint Locally Source: https://upstash.com/docs/workflow/quickstarts/astro Makes a POST request to the local workflow endpoint to initiate a workflow run. A unique workflow run ID is returned upon success. Requires the Content-Type header to be application/json. ```bash curl -X POST http://localhost:3000/api/workflow \ -H "Content-Type: application/json" \ -d '{"message": "Hello from the workflow!"}' # result: {"workflowRunId":"wfr_xxxxxx"} ``` -------------------------------- ### Resume Workflow Steps Source: https://upstash.com/docs/workflow/api-reference/dlq/bulk-resume-workflows-from-dlq This endpoint allows you to resume workflow steps, overriding various execution parameters like delay, retry logic, and flow control. ```APIDOC ## POST /websites/upstash_workflow ### Description Resume workflow steps, overriding delay, retry, and flow control settings. ### Method POST ### Endpoint /websites/upstash_workflow ### Parameters #### Header Parameters - **Upstash-Next-Delay** (string) - Optional - Override the delay before executing the next workflow step. Format is `` (e.g., "10s", "5m"). - **Upstash-Retry-Delay** (string) - Optional - Override the retry delay expression for the remaining workflow steps. - **Upstash-Flow-Control-Key** (string) - Optional - Override the flow control key for the remaining workflow steps. - **Upstash-Flow-Control-Value** (string) - Optional - Override the flow control configuration in the format `parallelism=, rate=, period=`. - **Upstash-Label** (string) - Optional - Override the label for the remaining workflow steps. - **Upstash-Failure-Callback** (string) - Optional - Override the failure callback URL for the remaining workflow steps. ### Response #### Success Response (200) - **workflowRuns** (array) - Array of resumed workflow runs. - **workflowRunId** (string) - The ID of the resumed workflow run. - **workflowCreatedAt** (integer) - The timestamp when the resumed workflow run was created (Unix timestamp in milliseconds). - **cursor** (string) - Pagination cursor to use in subsequent requests. If empty, all matching workflows have been processed. #### Response Example (200) ```json { "workflowRuns": [ { "workflowRunId": "run-123", "workflowCreatedAt": 1678886400000 } ], "cursor": "next-cursor-value" } ``` #### Error Responses - **400** - Bad Request - **401** - Unauthorized - **404** - Some DLQ messages were not found - **500** - Internal Server Error ``` -------------------------------- ### Dynamic Base URL for Workflow Trigger Source: https://upstash.com/docs/workflow/quickstarts/vercel-nextjs Dynamically set the base URL for triggering workflows based on the environment (local or production). This helps avoid hardcoding URLs. ```javascript const BASE_URL = process.env.VERCEL_URL ? `https://${process.env.VERCEL_URL}` : `http://localhost:3000` const { workflowRunId } = await client.trigger({ url: `${BASE_URL}/api/workflow`, retries: 3 }); ``` -------------------------------- ### Retry Payment Logic in TypeScript Source: https://upstash.com/docs/workflow/examples/paymentRetry Implements a loop to retry payment charging up to 3 times with a 24-hour delay between attempts. Returns early if payment succeeds. ```typescript for (let i = 0; i < 3; i++) { // attempt to charge the customer if (!result) { // Wait for a day await context.sleep("wait for retry", 24 * 60 * 60); } else { // Payment succeeded // Unsuspend user, send invoice email // end the workflow: return; } } ``` -------------------------------- ### Handle Webhook Event in Python Source: https://upstash.com/docs/workflow/howto/use-webhooks Process webhook events using context.run in Python. This snippet handles 'user.created' events, extracting user data. It's crucial to validate and parse the incoming event data correctly. ```python @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: # ... Parse and validate the incoming request async def _handle_webhook_event(): if event.type == "user.created": clerk_user_id = event.data["id"] email_addresses = event.data["email_addresses"] first_name = event.data["first_name"] primary_email = next( ( email for email in email_addresses if email.id == event.data["primary_email_address_id"] ), None, ) if not primary_email: return False return { "event": event.type, "user_id": clerk_user_id, "email": primary_email["email_address"], "first_name": first_name, } return False user = await context.run("handle-webhook-event", _handle_webhook_event) ``` -------------------------------- ### Workflow Endpoint with External Call in FastAPI Source: https://upstash.com/docs/workflow/quickstarts/fastapi This snippet illustrates how to make an external HTTP call from within a workflow step using `context.call`. It also shows how to process the response from the external service. ```python from fastapi import FastAPI from typing import Dict from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext, CallResponse app = FastAPI() serve = Serve(app) def some_work(input: str) -> str: return f"processed '{input}'" @app.post("/get-data") async def get_data() -> Dict[str, str]: return {"message": "get data response"} @serve.post("/call") async def call(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload async def _step1() -> str: output = some_work(input) print("step 1 input", input, "output", output) return output result1: str = await context.run("step1", _step1) response: CallResponse[Dict[str, str]] = await context.call( "get-data", url=f"{context.env.get('UPSTASH_WORKFLOW_URL', 'http://localhost:8000')}/get-data", method="POST", body={"message": result1}, ) async def _step2() -> str: output = some_work(response.body["message"]) print("step 2 input", response, "output", output) return output await context.run("step2", _step2) ``` -------------------------------- ### Safely retrieve context.requestPayload using context.run (TypeScript) Source: https://upstash.com/docs/workflow/troubleshooting/general To ensure context.requestPayload is never undefined, use context.run to retrieve it before other operations. This is particularly useful before calling context.call. ```typescript export const { POST } = serve(async (context) => { // Payload will never be undefined const payload = await context.run("get payload", () => context.requestPayload) // ... steps or any other code context.call( ... ) }) ``` -------------------------------- ### Verify Production Workflow Endpoint Source: https://upstash.com/docs/workflow/quickstarts/express Use this cURL command to verify that your deployed workflow endpoint is accessible and functioning correctly. Ensure the Content-Type header is set to application/json. ```bash curl -X POST /workflow \ -H "Content-Type: application/json" \ -d '{"message": "Hello from the workflow!"}' ``` -------------------------------- ### Send Email Source: https://upstash.com/docs/workflow/examples/authWebhook Sends an email to a specified address with given content. This function is a placeholder and should be implemented with actual logic. ```python async def send_email(email: str, content: str) -> None: # Implement logic to send an email print("Sending email to", email, content) ``` -------------------------------- ### Workflow Context Object Functions Source: https://upstash.com/docs/workflow/basics/context Functions available on the context object to define and manage workflow steps, including running tasks, managing time, and handling events. ```APIDOC ## Context Object Functions You can use the functions exposed by context object to define workflow steps. * [context.run](/workflow/basics/context/run) * [context.sleep](/workflow/basics/context/sleep) * [context.sleepUntil](/workflow/basics/context/sleepUntil) * [context.waitForEvent](/workflow/basics/context/waitForEvent) * [context.createWebhook](/workflow/basics/context/createWebhook) * [context.waitForWebhook](/workflow/basics/context/waitForWebhook) * [context.notify](/workflow/basics/context/notify) * [context.invoke](/workflow/basics/context/invoke) * [context.call](/workflow/basics/context/call) * [context.cancel](/workflow/basics/context/cancel) * [context.api](/workflow/basics/context/api) ``` -------------------------------- ### Restart DLQ Entries with Options Source: https://upstash.com/docs/workflow/basics/client/dlq/restart Restart DLQ entries with advanced options, including flow control to manage concurrency and execution rate, and specifying the number of retries for the restarted workflow invocations. ```typescript const { messages } = await client.dlq.list(); const response = await client.dlq.restart(messages[0].dlqId, { flowControl: { key: "my-flow-control-key", parallelism: 10, }, retries: 3, }); ``` -------------------------------- ### Test Local Workflow Endpoint Source: https://upstash.com/docs/workflow/quickstarts/cloudflare-workers Make a POST request to your local workflow endpoint to test it. A unique workflow run ID is returned upon successful execution. ```bash curl -X POST https://localhost:8787/ -D '{"text": "hello world!"}' # result: {"workflowRunId":"wfr_xxxxxx"} ``` -------------------------------- ### Send Confirmation and Notification Emails Source: https://upstash.com/docs/workflow/examples/eCommerceOrderFulfillment Sends an order confirmation email to the customer and a dispatch notification. ```typescript await context.run("send-confirmation", async () => { return await sendOrderConfirmation(userId, orderId) }) await context.run("send-dispatch-notification", async () => { return await sendDispatchNotification(userId, orderId) }) ``` ```python async def _send_confirmation(): return await send_order_confirmation(user_id, order_id) await context.run("send-confirmation", _send_confirmation) async def _send_dispatch_notification(): return await send_dispatch_notification(user_id, order_id) await context.run("send-dispatch-notification", _send_dispatch_notification) ``` -------------------------------- ### Send Reminder Email (Python) Source: https://upstash.com/docs/workflow/examples/authWebhook This Python snippet waits for 7 days, retrieves user statistics, and then conditionally sends an email based on whether problems have been solved. It's designed for asynchronous workflows. ```python await context.sleep("wait", 7 * 24 * 60 * 60) async def _get_user_stats() -> UserStats: return await get_user_stats(userid) stats: UserStats = await context.run("get user stats", _get_user_stats) await send_problem_solved_email(context, email, stats) ``` -------------------------------- ### Resume Workflow from DLQ Source: https://upstash.com/docs/workflow/api-reference/dlq/resume-workflow-from-dlq This endpoint allows you to resume a workflow that has been moved to the Dead Letter Queue (DLQ). You can optionally override several parameters for the remaining steps of the workflow. ```APIDOC ## POST /v2/workflows/dlq/resume/{dlqId} ### Description Resume a workflow from the Dead Letter Queue (DLQ). ### Method POST ### Endpoint /v2/workflows/dlq/resume/{dlqId} ### Parameters #### Path Parameters - **dlqId** (string) - Required - The DLQ ID of the workflow run to resume. #### Query Parameters None #### Header Parameters - **Upstash-Retries** (integer) - Optional - Override the number of retries for the remaining workflow steps. - **Upstash-Delay** (string) - Optional - Override the delay before executing the next workflow step. Format is `` (e.g., "10s", "5m"). - **Upstash-Retry-Delay** (string) - Optional - Override the retry delay expression for the remaining workflow steps. - **Upstash-Flow-Control-Key** (string) - Optional - Override the flow control key for the remaining workflow steps. - **Upstash-Flow-Control-Value** (string) - Optional - Override the flow control configuration in the format `parallelism=, rate=, period=`. - **Upstash-Label** (string) - Optional - Override the label for the remaining workflow steps. - **Upstash-Failure-Callback** (string) - Optional - Override the failure callback URL for the remaining workflow steps. ### Request Body None ### Response #### Success Response (200) - **workflowRunId** (string) - The ID of the resumed workflow run (a new ID is generated for the resumed run). - **workflowCreatedAt** (integer) - The timestamp when the resumed workflow run was created (Unix timestamp in milliseconds). #### Response Example ```json { "workflowRunId": "wr_abc123", "workflowCreatedAt": 1678886400000 } ``` #### Error Responses - **400 Bad Request**: Invalid DLQ ID, not a workflow message, or workflow doesn't support resume. - **401 Unauthorized**: Authentication failed. - **404 Not Found**: DLQ message not found. - **500 Internal Server Error**: An unexpected error occurred on the server. ``` -------------------------------- ### Parallel Agent Execution and Aggregation Source: https://upstash.com/docs/workflow/agents/patterns/parallelization Use this pattern to run multiple agents concurrently for different tasks and then aggregate their responses. Ensure all necessary imports are included. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { agentWorkflow } from "@upstash/workflow-agents"; export const { POST } = serve(async (context) => { const agents = agentWorkflow(context); const model = agents.openai('gpt-3.5-turbo'); // Define worker agents const worker1 = agents.agent({ model, name: 'worker1', maxSteps: 1, background: 'You are an agent that explains quantum physics.', tools: {} }); const worker2 = agents.agent({ model, name: 'worker2', maxSteps: 1, background: 'You are an agent that explains relativity.', tools: {} }); const worker3 = agents.agent({ model, name: 'worker3', maxSteps: 1, background: 'You are an agent that explains string theory.', tools: {} }); // Await results const [result1, result2, result3] = await Promise.all([ agents.task({ agent: worker1, prompt: "Explain quantum physics." }).run(), agents.task({ agent: worker2, prompt: "Explain relativity." }).run(), agents.task({ agent: worker3, prompt: "Explain string theory." }).run(), ]); // Aggregating results const aggregator = agents.agent({ model, name: 'aggregator', maxSteps: 1, background: 'You are an agent that summarizes multiple answers.', tools: {} }); const task = await agents.task({ agent: aggregator, prompt: `Summarize these three explanations: ${result1.text}, ${result2.text}, ${result3.text}` }) const finalSummary = await task.run(); console.log(finalSummary.text); }); ``` -------------------------------- ### Configure Failure Handling with Python Source: https://upstash.com/docs/workflow/howto/failures Define a separate `failure_function` and associate it with your workflow endpoint using the `failure_function` parameter in the `@serve.post` decorator for custom error handling. This function receives context and failure details. ```python async def failure_function( context, # context during failure fail_status, # failure status fail_response, # failure message fail_headers # failure headers ): # handle the failure pass @serve.post("/api/example", failure_function=failure_function) async def example(context: AsyncWorkflowContext[str]) -> None: ... ``` -------------------------------- ### Set Signing Keys for Built-in Verification Source: https://upstash.com/docs/workflow/howto/security Configure environment variables with your Upstash signing keys to enable built-in request signature verification. These keys are essential for ensuring that only Upstash Workflow can send requests to your endpoint. ```bash QSTASH_CURRENT_SIGNING_KEY=xxxxxxxxx QSTASH_NEXT_SIGNING_KEY=xxxxxxxxx ``` -------------------------------- ### Send Trial Ended Email in Python Source: https://upstash.com/docs/workflow/examples/authWebhook This Python snippet sends an email to users whose trial has expired, prompting them to upgrade. It incorporates a delay to wait for the trial to end. ```python await context.sleep("wait for trial end", 2 * 24 * 60 * 60) async def _send_trial_end_email() -> None: await send_email( email, "Your trial has ended. Please upgrade your plan to keep using our platform.", ) await context.run("send trial end email", _send_trial_end_email) ``` -------------------------------- ### Make Third-Party API Call to OpenAI Source: https://upstash.com/docs/workflow/examples/customRetry Sends a request to OpenAI using `context.api.openai.call` or `context.call`. This method bypasses platform-specific timeout limits for long-running requests. Ensure your API key is set in the environment variables. ```typescript const response = await context.api.openai.call(`call-openai`, { token: process.env.OPENAI_API_KEY, operation: "chat.completions.create", body: { model: "gpt-3.5-turbo", messages: [createSystemMessage(), createUserMessage(userData)], max_completion_tokens: 150, }, }) ``` ```python response: CallResponse[Dict[str, Any]] = await context.call( "call-openai", url="https://api.openai.com/v1/chat/completions", method="POST", headers={ "authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}", }, body={ "model": "gpt-4", "messages": [create_system_message(), create_user_message(user_data)], "max_tokens": 150, }, ) ``` -------------------------------- ### Restart DLQ Entries by Filters Source: https://upstash.com/docs/workflow/basics/client/dlq/restart Filter DLQ entries based on various criteria such as workflow URL, label, creation date, or caller IP before restarting. This allows for targeted reprocessing of specific groups of failed runs. ```typescript let cursor: string | undefined; do { const result = await client.dlq.restart({ filter: { label: "my-label" }, cursor, }); cursor = result.cursor; } while (cursor); ``` -------------------------------- ### context.call() - HTTP Request Source: https://upstash.com/docs/workflow/basics/context/call The context.call() function allows you to perform HTTP requests as a workflow step. It supports long-running requests and handles retries and flow control. ```APIDOC ## POST context.call() ### Description Performs an HTTP request as a workflow step, supporting longer response times up to 12 hours. The request is executed by Upstash on your behalf. If the endpoint responds with a non-success status code, `context.call()` still returns the response and the workflow continues, allowing for inspection and handling of failure cases. ### Method POST (implicitly, as it's a function call within a workflow) ### Endpoint N/A (Function call within the workflow runtime) ### Parameters #### Path Parameters None #### Query Parameters None #### Request Body This function takes arguments directly, not a request body in the traditional HTTP sense. - **url** (string) - Required - The URL of the HTTP endpoint to call. - **method** (string) - Optional - The HTTP method to use (`GET`, `POST`, `PUT`, etc.). Defaults to `GET`. - **body** (string) - Optional - The request body as a string. - **headers** (object) - Optional - A map of headers to include in the request. - **retries** (number) - Optional - Number of retry attempts if the request fails. Defaults to `0`. - **retryDelay** (string|number) - Optional - Delay between retries (in milliseconds). Defaults to exponential backoff. Can use mathematical expressions and the `retried` variable. - **flowControl** (object) - Optional - Throttle outbound requests. - **key** (string) - A logical grouping key for shared flow control limits. - **rate** (number) - The maximum number of allowed requests per second. - **parallelism** (number) - The maximum number of concurrent requests allowed. - **period** (string|number) - The time window for rate limiting. Defaults to `1s`. - **timeout** (number) - Optional - Maximum time (in seconds) to wait for a response. Applies individually to each attempt if retries are enabled. - **workflow** (any) - Optional - When using `serveMany`, allows calling another workflow defined in the same `serveMany`. ### Request Example ```javascript await context.call({ url: "https://api.example.com/data", method: "POST", body: JSON.stringify({ key: "value" }), headers: { "Content-Type": "application/json" }, retries: 3, retryDelay: "max(1000, pow(2, retried))", timeout: 60 }); ``` ### Response #### Success Response (200-299) - **status** (number) - The HTTP response status code. - **body** (string) - The response body. Attempts to parse as JSON; returns raw string if parsing fails. - **headers** (dictionary) - The response headers. #### Response Example ```json { "status": 200, "body": "{\"message\": \"Success\"}", "headers": { "Content-Type": "application/json" } } ``` #### Error Handling If the endpoint responds with a non-success status code (outside 200-299), `context.call()` returns the response, allowing you to inspect the `status` field and handle failures. ### Tip In TypeScript, you can declare the expected result type for strong typing: ```typescript type ResultType = { field1: string, field2: number }; const result = await context.call( ... ); ``` ``` -------------------------------- ### Suspend User on Payment Failure (Python) Source: https://upstash.com/docs/workflow/examples/paymentRetry This Python code checks for user suspension and proceeds to suspend the user and send an email notification if the account is not already suspended. The helper functions '_check_suspension', '_suspend_user', and '_send_suspended_email' encapsulate the respective operations. ```python async def _check_suspension() -> bool: return await check_suspension(email) is_suspended = await context.run("check suspension", _check_suspension) if not is_suspended: async def _suspend_user() -> None: await suspend_user(email) await context.run("suspend user", _suspend_user) async def _send_suspended_email() -> None: await send_email( email, "Your account has been suspended due to payment failure. Please update your payment method.", ) await context.run("send suspended email", _send_suspended_email) ``` -------------------------------- ### Trigger Workflow with Vercel Bypass Header Source: https://upstash.com/docs/workflow/troubleshooting/vercel When triggering a workflow, pass the Vercel bypass secret as a header to ensure the request is not blocked by deployment protection. This is required in addition to configuring the QStash client. ```typescript import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }) await client.trigger({ url: "https://vercel-preview.com/workflow", headers: { "x-vercel-protection-bypass": process.env.VERCEL_AUTOMATION_BYPASS_SECRET! }, body: "Hello world!" }) ``` ```bash curl -X POST \ 'https://vercel-preview.com/workflow' \ -H 'x-vercel-protection-bypass: ' \ -d 'Hello world!' ``` -------------------------------- ### Customize OpenAI Request/Response Types Source: https://upstash.com/docs/workflow/integrations/openai Override default request and response types for `context.api.openai.call` to match your specific data structures. ```typescript type ResponseBodyType = { ... }; // Define your response body type type RequestBodyType = { ... }; // Define your request body type const { status, body } = await context.api.openai.call< ResponseBodyType, RequestBodyType >( "Call OpenAI", { ... } ); ``` -------------------------------- ### Post Request to Workflow Endpoint Source: https://upstash.com/docs/workflow/quickstarts/hono Make a POST request to your workflow endpoint using curl to initiate a workflow run. The response will include a unique workflow run ID. ```bash curl -X POST https://localhost:8787/workflow # result: {"workflowRunId":"wfr_xxxxxx"} ``` -------------------------------- ### Process Payment Source: https://upstash.com/docs/workflow/examples/eCommerceOrderFulfillment Processes the payment for the order after stock verification. ```typescript await context.run("process-payment", async () => { return await processPayment(orderId) }) ``` ```python async def _process_payment(): return await process_payment(order_id) await context.run("process-payment", _process_payment) ``` -------------------------------- ### Trigger a Workflow Run with Upstash Workflow SDK Source: https://upstash.com/docs/workflow/quickstarts/tanstack-start Initiate a workflow run using the `trigger()` function. Ensure this is called from a server-side action to protect credentials. ```javascript import { Client } from "@upstash/workflow"; const client = Client() const { workflowRunId } = await client.trigger({ url: `http://localhost:3000/api/workflow`, body: "Hello World!", retries: 3 }); ``` -------------------------------- ### StepInfo Object Structure Source: https://upstash.com/docs/workflow/api-reference/logs/list-workflow-run-logs Defines the structure of the StepInfo object, which contains details about each step in a workflow. ```APIDOC ## StepInfo Object ### Description Represents a single step within an Upstash workflow, detailing its execution status, type, and associated data. ### Properties - **stepId** (integer, int64) - The unique identifier for this step. - **stepName** (string) - The name of the step. - **stepType** (string) - The type of step (e.g., call, wait, sleep, invoke). - **callType** (string) - The call type (e.g., step, parallelPlan, parallelResult). - **messageId** (string) - The message ID for this step. - **state** (string) - The state of the step (STEP_SUCCESS, STEP_RETRY, STEP_FAILED, STEP_PROGRESS, STEP_CANCELED). - **createdAt** (integer, int64) - When the step was created (Unix timestamp in milliseconds). - **out** (string) - The output/result of the step. - **callUrl** (string) - The URL called in this step. - **callMethod** (string) - The HTTP method used. - **callBody** (string) - The request body sent. - **callHeaders** (object) - The request headers sent. Additional properties are arrays of strings. - **callResponseStatus** (integer, int64) - The HTTP status code received. - **callResponseBody** (string) - The response body received. - **callResponseHeaders** (object) - The response headers received. Additional properties are arrays of strings. - **waitEventId** (string) - The event ID being waited for (wait steps only). - **waitTimeout** (boolean) - Whether the wait timed out. - **invokedWorkflowRunId** (string) - The run ID of the invoked workflow (invoke steps only). - **invokedWorkflowUrl** (string) - The URL of the invoked workflow. - **retries** (integer) - Maximum number of retries configured. - **nextDeliveryTime** (integer, int64) - Next scheduled retry time (Unix timestamp in milliseconds). ``` -------------------------------- ### Retrieve Image URL Source: https://upstash.com/docs/workflow/examples/imageProcessing Fetches the URL of an uploaded image using its ID. This is the first step in the image processing pipeline. ```typescript const imageUrl = await context.run("get-image-url", async () => { return await getImageUrl(imageId) }) ``` ```python async def _get_image_url() -> str: return await get_image_url(image_id) image_url: str = await context.run("get-image-url", _get_image_url) ``` -------------------------------- ### Default Backoff Algorithm Source: https://upstash.com/docs/workflow/features/retries This JavaScript code illustrates the default exponential backoff algorithm used for retries. The delay increases with each retry attempt, capped at 86400 seconds (24 hours). ```javascript // n = how many times this request has been retried delay = min(86400, e ** (2.5*n)) // in seconds ``` -------------------------------- ### Handle Workflow Info Messages Source: https://upstash.com/docs/workflow/howto/middlewares Implement the onInfo callback to handle informational messages logged during workflow execution. This function receives the workflow run ID and the info message. ```typescript onInfo: async ({ workflowRunId, info }) => { // Handle info } ``` -------------------------------- ### Gracefully Cancel Workflow with context.cancel() Source: https://upstash.com/docs/workflow/features/retries/prevent-retries Cancel a workflow run explicitly using `context.cancel()`. This labels the run as canceled, bypassing the failure handler and DLQ. ```typescript export const { POST } = serve<{ orderId: string }>(async (context) => { const { orderId } = context.requestPayload; // Check if order is still valid const orderStatus = await context.run("check-order-status", async () => { return await getOrderStatus(orderId); }); if (orderStatus === "cancelled") { // Stop execution gracefully without error await context.cancel(); return; } // Continue processing if order is valid await context.run("process-order", async () => { return await processOrder(orderId); }); }); ``` ```python @serve.post("/graceful-cancellation") async def graceful_cancellation(context: AsyncWorkflowContext[dict]) -> None: order_id = context.request_payload["order_id"] async def _check_order_status(): return await get_order_status(order_id) # Check if order is still valid order_status = await context.run("check-order-status", _check_order_status) if order_status == "cancelled": # Stop execution gracefully without error await context.cancel() return # Continue processing if order is valid async def _process_order(): return await process_order(order_id) await context.run("process-order", _process_order) ``` -------------------------------- ### React UI for Workflow Approval Source: https://upstash.com/docs/workflow/howto/realtime/human-in-the-loop This component utilizes the `useWorkflowWithRealtime` hook to display workflow steps and present an approval UI when the workflow is waiting for user input. It allows users to approve or reject a workflow step. ```typescript "use client"; import { useWorkflowWithRealtime } from "@/hooks/useWorkflowWithRealtime"; export default function WorkflowPage() { const { trigger, isTriggering, steps, isRunFinished, waitingState, continueWorkflow, } = useWorkflowWithRealtime(); return (
{isRunFinished && (

✅ Workflow Finished!

)} {/* Show workflow steps */}

Workflow Steps:

{steps.map((step, index) => (
{step.stepName} {Boolean(step.result) && ( : {JSON.stringify(step.result)} )}
))}
{/* Show approval UI when waiting for input */} {waitingState && (

{waitingState.message}

)}
); } ``` -------------------------------- ### Workflow Run Schema Source: https://upstash.com/docs/workflow/api-reference/dlq/list-failed-workflow-runs Defines the structure of a workflow run, including its ID, creation timestamp, and details of each step. ```APIDOC ## Workflow Run Details ### Description Provides detailed information about a specific workflow run, including its status, execution steps, and associated metadata. ### Endpoint `/websites/upstash_workflow` (Conceptual - This describes the data structure, not a specific endpoint) ### Parameters #### Request Body Fields (Conceptual) - **workflowRunId** (string) - The ID of the workflow run. - **workflowCreatedAt** (integer) - The timestamp when the workflow run was created (Unix timestamp in milliseconds). - **url** (string) - The URL of the failed workflow step. - **method** (string) - The HTTP method used for the workflow step. - **header** (object) - The HTTP headers sent to the workflow step. Additional properties are arrays of strings. - **body** (string) - The body of the message if it is composed of utf8 chars only, empty otherwise. - **bodyBase64** (string) - The base64 encoded body if the body contains a non-utf8 char only, empty otherwise. - **maxRetries** (integer) - The number of retries that should be attempted in case of delivery failure. - **createdAt** (integer) - The unix timestamp in milliseconds when the message was created. - **failureCallback** (string) - The url where we send a callback to after the workflow fails. - **callerIP** (string) - IP address of the publisher of this workflow. - **label** (string) - The label assigned to the workflow run. - **flowControlKey** (string) - The flow control key used for rate limiting. - **failureFunctionState** (string) - The state of the failure function if applicable. - **responseStatus** (integer) - The HTTP status code received from the destination API. - **responseHeader** (object) - The HTTP response headers received from the destination API. Additional properties are arrays of strings. - **responseBody** (string) - The body of the response if it is composed of utf8 chars only, empty otherwise. - **responseBodyBase64** (string) - The base64 encoded body of the response if the body contains a non-utf8 char only, empty otherwise. ### Response #### Success Response (200) - **Workflow Run Object** (object) - Contains the details of the workflow run as described in the parameters section. #### Response Example ```json { "workflowRunId": "wr_12345", "workflowCreatedAt": 1678886400000, "url": "https://api.example.com/step1", "method": "POST", "header": { "Content-Type": ["application/json"] }, "body": "{\"key\": \"value\"}", "bodyBase64": "", "maxRetries": 3, "createdAt": 1678886405000, "failureCallback": "https://callback.example.com/fail", "callerIP": "192.168.1.1", "label": "my-workflow", "flowControlKey": "fc_abcde", "failureFunctionState": "completed", "responseStatus": 200, "responseHeader": { "X-Request-ID": ["req_zyxw"] }, "responseBody": "{\"status\": \"success\"}", "responseBodyBase64": "" } ``` ``` -------------------------------- ### Update Next.js serve method export Source: https://upstash.com/docs/workflow/howto/migrations The Next.js serve method now returns an object with exported handlers, requiring a change from `export const POST = serve(...)` to `export const { POST } = serve(...)`. ```javascript // old export const POST = serve(...); // new export const { POST } = serve(...); ``` -------------------------------- ### POST /v2/batch/trigger Source: https://upstash.com/docs/workflow/api-reference/runs/batch-trigger-workflow-runs Initiates multiple workflow runs concurrently. This endpoint is useful for triggering several workflows with a single API call, improving efficiency. ```APIDOC ## POST /v2/batch/trigger ### Description Start multiple workflow runs in a single request. ### Method POST ### Endpoint /v2/batch/trigger ### Parameters #### Request Body - **destination** (string) - Required - The URL of the workflow to trigger. - **body** (string) - Optional - The raw request payload passed to the workflow endpoint as is. You can access it via request payload parameter on the context object. - **headers** (object) - Optional - HTTP headers to forward to the workflow. You can pass any headers supported in the single trigger API. - Additional Properties: (string) ### Request Example ```json [ { "destination": "https://example.com/workflow1", "body": "{\"key\": \"value\"}", "headers": { "X-Custom-Header": "MyValue" } }, { "destination": "https://example.com/workflow2" } ] ``` ### Response #### Success Response (201) - **workflowRunId** (string) - The ID of the triggered workflow run. - **workflowCreatedAt** (integer) - The unix timestamp in milliseconds when the workflow run was created. - **deduplicated** (boolean) - Whether the workflow run was deduplicated (i.e. a run with the same ID already existed and a new one was not created). #### Response Example ```json [ { "workflowRunId": "wr_abc123", "workflowCreatedAt": 1678886400000, "deduplicated": false }, { "workflowRunId": "wr_def456", "workflowCreatedAt": 1678886401000, "deduplicated": true } ] ``` #### Error Response - **400** Bad Request - **401** Unauthorized - **500** Internal Server Error ``` -------------------------------- ### Define Realtime Event Schema Source: https://upstash.com/docs/workflow/howto/realtime/human-in-the-loop Extend your Realtime schema with `waitingForInput` and `inputResolved` event types for human-in-the-loop workflows. These events signal when the workflow needs user input and when that input has been processed. ```typescript const schema = { workflow: { runFinish: z.object({}), stepFinish: z.object({ stepName: z.string(), result: z.unknown().optional(), }), waitingForInput: z.object({ eventId: z.string(), message: z.string(), }), inputResolved: z.object({ eventId: z.string(), }), }, }; ``` -------------------------------- ### Restart DLQ Entries by ID Source: https://upstash.com/docs/workflow/basics/client/dlq/restart Restart a single DLQ entry or multiple entries by providing their IDs directly. This is useful for reprocessing specific failed workflow runs. ```typescript await client.dlq.restart("dlq-12345"); ``` ```typescript await client.dlq.restart(["dlq-12345", "dlq-67890"]); ```