### Install Upstash Workflow Packages Source: https://upstash.com/docs/workflow/agents/getting-started Installs the necessary Upstash Workflow packages along with AI and Zod for the project using npm. ```bash npm i @upstash/workflow ai zod ``` -------------------------------- ### Start Local QStash Server Source: https://upstash.com/docs/workflow/agents/getting-started Commands to start the local QStash server using either npm or pnpm. This is essential for local development and testing of Upstash Workflow. ```bash npx @upstash/qstash-cli dev ``` ```bash pnpm dlx @upstash/qstash-cli dev ``` -------------------------------- ### Run Next.js App Source: https://upstash.com/docs/workflow/agents/getting-started This command starts the Next.js development server, which is required to run the Upstash Workflow endpoint locally. Ensure you have Node.js and npm installed. ```bash npm run dev ``` -------------------------------- ### Create Next.js Project Source: https://upstash.com/docs/workflow/agents/getting-started Command to create a new Next.js project. Ensure you replace '[project-name]' with your desired project name and '[options]' with any desired Next.js configuration flags. ```bash npx create-next-app@latest [project-name] [options] ``` -------------------------------- ### QStash CLI Development Server Output Example (Plaintext) Source: https://upstash.com/docs/workflow/howto/local-development This is an example of the output you will see when the QStash CLI development server starts. It provides the local server URL and necessary credentials (QSTASH_TOKEN, QSTASH_CURRENT_SIGNING_KEY, QSTASH_NEXT_SIGNING_KEY) for authorization and sample cURL requests. ```plaintext Upstash QStash development server is runnning at http://127.0.0.1:8080 A default user has been created for you to authorize your requests. QSTASH_TOKEN=eyJVc2VySUQiOiJkZWZhdWx0VXNlciIsIlBhc3N3b3JkIjoiZGVmYXVsdFBhc3N3b3JkIn0= QSTASH_CURRENT_SIGNING_KEY=sig_7RvLjqfZBvP5KEUimQCE1pvpLuou QSTASH_NEXT_SIGNING_KEY=sig_7W3ZNbfKWk5NWwEs3U4ixuQ7fxwE Sample cURL request: curl -X POST http://127.0.0.1:8080/v2/publish/https://example.com -H "Authorization: Bearer eyJVc2VySUQiOiJkZWZhdWx0VXNlciIsIlBhc3N3b3JkIjoiZGVmYXVsdFBhc3N3b3JkIn0=" Check out documentation for more details: https://upstash.com/docs/qstash/howto/local-development ``` -------------------------------- ### Start QStash CLI Development Server (JavaScript) Source: https://upstash.com/docs/workflow/howto/local-development This command starts the local Upstash QStash development server. It's essential for local development and testing of Upstash Workflow applications. Ensure you have the QStash CLI installed. ```javascript npx @upstash/qstash-cli dev ``` -------------------------------- ### Set Environment Variables for Upstash Workflow Source: https://upstash.com/docs/workflow/agents/getting-started Configuration for the .env.local file, setting essential environment variables for QStash URL, QStash Token, and OpenAI API Key required by Upstash Workflow. ```dotenv QSTASH_URL="http://127.0.0.1:8080" QSTASH_TOKEN= OPENAI_API_KEY= ``` -------------------------------- ### Python Environment Setup and Package Installation Source: https://upstash.com/docs/workflow/quickstarts/flask This snippet demonstrates how to create a Python virtual environment and install the required packages for Upstash Workflow and Flask. It ensures a clean and isolated environment for the project. Dependencies include 'fastapi', 'uvicorn', and 'upstash-workflow'. ```bash python -m venv venv source venv/bin/activate pip install fastapi uvicorn upstash-workflow ``` -------------------------------- ### Start Local QStash Server (pnpm) Source: https://upstash.com/docs/workflow/quickstarts/vercel-nextjs Starts a local QStash server using pnpx with pnpm. This command is equivalent to the npm version for starting the local development server. ```bash pnpx @upstash/qstash-cli dev ``` -------------------------------- ### Next.js Workflow Quickstart (TypeScript) Source: https://upstash.com/docs/workflow/llms-txt A basic example of initializing and running a workflow in a Next.js application using TypeScript. This requires the @upstash/workflow package to be installed. ```typescript import { Workflow } from '@upstash/workflow'; const workflow = new Workflow({ // Configuration options }); // Define workflow steps workflow.step('step1', async (input) => { // ... logic ... return { output: 'processed' }; }); workflow.run('myWorkflow', { initialInput: 'data' }); ``` -------------------------------- ### QStash Development Server Output Example (Text) Source: https://upstash.com/docs/workflow/llms-txt An example of the output logs from the QStash development server when it starts. It includes the local server URL and authentication tokens required for configuring your application. ```text Upstash QStash development server is runnning at http://127.0.0.1:8080 A default user has been created for you to authorize your requests. QSTASH_TOKEN=eyJVc2VySUQiOiJkZWZhdWx0VXNlciIsIlBhc3N3b3JkIjoiZGVmYXVsdFBhc3N3b3JkIn0= QSTASH_CURRENT_SIGNING_KEY=sig_7RvLjqfZBvP5KEUimQCE1pvpLuou QSTASH_NEXT_SIGNING_KEY=sig_7W3ZNbfKWk5NWwEs3U4ixuQ7fxwE Sample cURL request: curl -X POST http://127.0.0.1:8080/v2/publish/https://example.com -H "Authorization: Bearer eyJVc2VySUQiOiJkZWZhdWx0VXNlciIsIlBhc3N3b3JkIjoiZGVmYXVsdFBhc3N3b3JkIn0=" Check out documentation for more details: https://upstash.com/docs/qstash/howto/local-development ``` -------------------------------- ### Clone Next.js & Flask Example Project Source: https://upstash.com/docs/workflow/quickstarts/nextjs-flask Clones the Upstash Workflow example repository for Next.js and Flask. This sets up the project structure for further configuration. ```bash git clone https://github.com/upstash/workflow-py.git cd workflow-py/examples/nextjs-flask ``` -------------------------------- ### Trigger Workflow with TypeScript Client Source: https://upstash.com/docs/workflow/agents/getting-started This TypeScript code demonstrates how to initialize the Upstash Workflow client and trigger a workflow. It requires environment variables for QSTASH_URL and QSTASH_TOKEN. The client sends a POST request to the specified URL with a JSON body and logs the returned workflow run ID. ```typescript import { Client } from "@upstash/workflow"; const client = new Client({ baseUrl: process.env.QSTASH_URL, token: process.env.QSTASH_TOKEN!, }) const workflowRunId = await client.trigger({ url: "http://127.0.0.1:3000/workflow", body: { prompt: "Explain the future of space exploration" }, }) console.log(workflowRunId); ``` -------------------------------- ### Project Setup and Build with Bun Source: https://upstash.com/docs/workflow/llms-txt Installs project dependencies and builds the project using Bun, a fast JavaScript runtime and toolkit. Bun is a required tool for this project. ```Bash bun install bun run build ``` ```Bash bun install bun run build ``` -------------------------------- ### Triggering Upstash Workflow (Python SDK) Source: https://upstash.com/docs/workflow/getstarted Shows how to initiate an Upstash Workflow using the Python SDK with QStash. This snippet includes the import statement for `AsyncQStash` and the client initialization using an authentication token. Further details on triggering specific workflows would follow this setup. ```python from qstash import AsyncQStash client = AsyncQStash("") ``` -------------------------------- ### Install Upstash Workflow SDK Source: https://upstash.com/docs/workflow/quickstarts/tanstack-start Installs the Upstash Workflow SDK for TanStack Start applications. Supports pnpm, npm, and bun package managers. ```bash pnpm install @upstash/workflow ``` ```bash npm install @upstash/workflow ``` ```bash bun add @upstash/workflow ``` -------------------------------- ### Clone Next.js & FastAPI Example Source: https://upstash.com/docs/workflow/quickstarts/nextjs-fastapi Clones the Upstash Workflow Next.js and FastAPI example from GitHub. This is the initial step to get the project structure. ```bash git clone https://github.com/upstash/workflow-py.git cd workflow-py/examples/nextjs-fastapi ``` -------------------------------- ### Start ngrok Tunnel Source: https://upstash.com/docs/workflow/howto/local-development This command starts an ngrok tunnel to make your local server publicly accessible. Replace `` with your server's port number. This is essential for integrating with QStash during local development. ```bash ngrok http ``` ```bash ngrok http 3000 ``` -------------------------------- ### Install FastAPI and Upstash Workflow SDK (Bash) Source: https://upstash.com/docs/workflow/quickstarts/fastapi Installs the necessary Python packages for FastAPI, Uvicorn, and the Upstash Workflow SDK using pip. Ensure you are in a virtual environment before running this command. ```bash pip install fastapi uvicorn upstash-workflow ``` -------------------------------- ### Start Next.js Development Server Source: https://upstash.com/docs/workflow/llms-txt Starts the Next.js development server. Requires Node.js and npm to be installed and project dependencies to be set up. This command is essential for running Next.js applications locally. ```Shell npm run dev ``` ```Bash npm run dev ``` -------------------------------- ### Define Next.js Endpoint for Upstash Workflow Source: https://upstash.com/docs/workflow/agents/getting-started TypeScript code defining a Next.js API route for Upstash Workflow. It configures an agent with OpenAI, a communication tool, and sets up a task to process a prompt. ```typescript import { z } from "zod"; import { tool } from "ai"; import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve<{ prompt: string }>(async (context) => { const prompt = context.requestPayload.prompt const model = context.agents.openai('gpt-3.5-turbo') const communicatorAgent = context.agents.agent({ model, name: 'communicatorAgent', maxSteps: 2, tools: { communicationTool: tool({ description: 'A tool for informing the caller about your inner thoughts', parameters: z.object({ message: z.string() }), execute: async ({ message }) => { console.log("Inner thought:", message) return "success" } }) }, background: 'Answer questions directed towards you.' + ' You have access to a tool to share your inner thoughts' + ' with the caller. Utilize this tool at least once before' + ' answering the prompt. In your inner thougts, briefly' + ' explain what you will talk about and why. Keep your' + ' answers brief.', }) const task = context.agents.task({ agent: communicatorAgent, prompt }) const { text } = await task.run() console.log("Final response:", text); }) ``` -------------------------------- ### Development Setup and Testing for workflow-py (Shell) Source: https://upstash.com/docs/workflow/sdk/workflow-py This section outlines the development setup for the `workflow-py` project, including cloning the repository, installing dependencies with Poetry, setting environment variables, and running tests, formatting, and type checking. ```shell poetry install cp .env.example .env poetry run pytest poetry run ruff format . poetry run ruff check . poetry run mypy --show-error-codes . ``` -------------------------------- ### Install Dependencies and Build Project with Bun Source: https://upstash.com/docs/workflow/sdk/workflow-js These bash commands are used for setting up the project environment. `bun install` installs all necessary dependencies, and `bun run build` compiles the project, typically for production. ```bash bun install bun run build ``` -------------------------------- ### Source Environment Variables and Start Development Server Source: https://upstash.com/docs/workflow/quickstarts/nextjs-flask This snippet shows how to load environment variables from a .env file using the 'source' command and then start the development server for your application using 'npm run dev'. These commands are essential for local development with Upstash Workflow, ensuring your application has the necessary configurations to run. ```bash source .env npm run dev ``` -------------------------------- ### Run FastAPI Application with Uvicorn Source: https://upstash.com/docs/workflow/quickstarts/fastapi This command starts the FastAPI application for running workflows. It uses `uvicorn` to serve the `main` application, enabling live reloading for development. Ensure your environment variables are sourced before running. ```bash uvicorn main:app --reload ``` -------------------------------- ### Install Node.js Dependencies Source: https://upstash.com/docs/workflow/quickstarts/nextjs-flask Installs the necessary Node.js dependencies for the Next.js frontend using npm. This command should be run in the project's root directory. ```bash npm install ``` -------------------------------- ### Create TanStack Start Project Source: https://upstash.com/docs/workflow/quickstarts/tanstack-start Command to initiate a new TanStack Start project using pnpm. This sets up the basic project structure for a new application. ```bash pnpm create @tanstack/start@latest ``` -------------------------------- ### Clone Upstash Workflow Example Repository Source: https://upstash.com/docs/workflow/llms-txt Clones the Upstash Workflow Python example repository from GitHub and navigates into the Next.js & FastAPI example directory. This is the first step to set up the project locally. ```bash git clone https://github.com/upstash/workflow-py.git cd workflow-py/examples/nextjs-fastapi ``` ```shell git clone https://github.com/upstash/workflow-py.git cd workflow-py/examples/nextjs-fastapi ``` -------------------------------- ### Configure and Start Local QStash Server (Bash & .env) Source: https://upstash.com/docs/workflow/llms-txt These snippets show how to start a local QStash server for development and configure the necessary environment variables. The `npx @upstash/qstash-cli dev` command starts the server, and the provided `.env` format shows the `QSTASH_URL` and `QSTASH_TOKEN` that should be set. This is crucial for local testing of Upstash Workflows. ```Bash npx @upstash/qstash-cli dev ``` ```dotenv QSTASH_URL="http://127.0.0.1:8080" QSTASH_TOKEN= ``` -------------------------------- ### Python Virtual Environment Setup Source: https://upstash.com/docs/workflow/quickstarts/nextjs-flask Creates and activates a Python virtual environment to manage project dependencies. This ensures a clean isolated environment for the Flask backend. ```bash python -m venv venv source venv/bin/activate ``` -------------------------------- ### Start Trial in Stripe (TypeScript, Python) Source: https://upstash.com/docs/workflow/examples/authWebhook Initiates a trial period for a user in Stripe. This function is typically called during user sign-up or onboarding. It relies on an external service or function `startTrialInStripe` and requires the user's email address. ```typescript await context.run("start trial in Stripe", async () => { await startTrialInStripe(email); }); ``` ```python async def _start_trial_in_stripe() -> None: await start_trial_in_stripe(email) await context.run("start trial in Stripe", _start_trial_in_stripe) ``` -------------------------------- ### Customer Onboarding Workflow (FastAPI Python) Source: https://upstash.com/docs/workflow/getstarted Implements a customer onboarding workflow using Upstash Workflow within a FastAPI application. This Python example mirrors the TypeScript functionality, handling welcome emails, timed delays, AI-driven personalization, and follow-up emails. It utilizes the `upstash_workflow.fastapi` library. ```python from fastapi import FastAPI from typing import Dict, TypedDict from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext, CallResponse from email_utils import send_email app = FastAPI() serve = Serve(app) # Type-safety for starting our workflow class InitialData(TypedDict): user_id: str email: str name: str @serve.post("/api/onboarding") async def onboarding_workflow(context: AsyncWorkflowContext[InitialData]) -> None: data = context.request_payload user_id = data["user_id"] email = data["email"] name = data["name"] # Step 1: Send welcome email async def _send_welcome_email() -> None: await send_email(email, "Welcome to our service!") await context.run("send-welcome-email", _send_welcome_email) # Step 2: Wait for 3 days (in seconds) await context.sleep("sleep-until-follow-up", 60 * 60 * 24 * 3) # Step 3: AI-generate personalized follow-up message ai_response: CallResponse[Dict[str, str]] = await context.call( "generate-personalized-message", url="https://api.openai.com/v1/chat/completions", method="POST", headers={...}, body={ "model": "gpt-3.5-turbo", "messages": [ { "role": "system", "content": "You are an assistant creating personalized follow-up messages.", }, { "role": "user", "content": f"Create a short, friendly follow-up message for {name} who joined our service 3 days ago.", }, ], }, ) personalized_message = ai_response.body["choices"][0]["message"]["content"] # Step 4: Send personalized follow-up email async def _send_follow_up_email() -> None: await send_email(email, personalized_message) await context.run("send-follow-up-email", _send_follow_up_email) ``` -------------------------------- ### Customer Onboarding Workflow (Next.js TypeScript) Source: https://upstash.com/docs/workflow/getstarted Implements a customer onboarding workflow using Upstash Workflow within a Next.js application. It handles sending welcome emails, scheduling follow-ups with a delay, and generating personalized messages using OpenAI. This example leverages the `@upstash/workflow/nextjs` library for seamless integration. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { sendEmail } from "./emailUtils"; // Type-safety for starting our workflow interface InitialData { userId: string email: string name: string } export const { POST } = serve(async (context) => { const { userId, email, name } = context.requestPayload; // Step 1: Send welcome email await context.run("send-welcome-email", async () => { await sendEmail(email, "Welcome to our service!"); }); // Step 2: Wait for 3 days (in seconds) await context.sleep("sleep-until-follow-up", 60 * 60 * 24 * 3); // Step 3: AI-generate personalized follow-up message const { body: aiResponse } = await context.api.openai.call( "generate-personalized-message", { token: "", operation: "chat.completions.create", body: { model: "gpt-3.5-turbo", messages: [ { role: "system", content: "You are an assistant creating personalized follow-up messages." }, { role: "user", content: `Create a short, friendly follow-up message for ${name} who joined our service 3 days ago.` } ] }, } ); const personalizedMessage = aiResponse.choices[0].message.content; // Step 4: Send personalized follow-up email await context.run("send-follow-up-email", async () => { await sendEmail(email, personalizedMessage); }); }); ``` -------------------------------- ### Setup Flask Environment with Upstash Workflow (Python) Source: https://upstash.com/docs/workflow/llms-txt Sets up a Python virtual environment, activates it, and installs required packages (FastAPI, Uvicorn, Upstash Workflow) for a Flask project using Upstash Workflow. ```bash python -m venv venv source venv/bin/activate pip install fastapi uvicorn upstash-workflow ``` -------------------------------- ### Local QStash Server Configuration Source: https://upstash.com/docs/workflow/quickstarts/flask This code sets up a local QStash server for development purposes. It involves starting the CLI tool and then configuring environment variables with the provided QStash URL and token. This method allows for local testing without impacting production billing or logs. ```bash npx @upstash/qstash-cli dev # Add the following to your .env file: export QSTASH_URL="http://127.0.0.1:8080" export QSTASH_TOKEN= ``` -------------------------------- ### Install Upstash Workflow SDK for Express.js Source: https://upstash.com/docs/workflow/quickstarts/express Installs the Upstash Workflow SDK for Node.js projects using npm, pnpm, or bun. This is the initial step to integrate workflow capabilities into your Express.js application. ```bash npm install @upstash/workflow ``` ```bash pnpm install @upstash/workflow ``` ```bash bun add @upstash/workflow ``` -------------------------------- ### Configure Local QStash Server in .env.local Source: https://upstash.com/docs/workflow/quickstarts/nuxt Configures the `.env.local` file with connection details for a local QStash server. It requires the QSTASH_URL and QSTASH_TOKEN obtained from running the local QStash CLI. This setup is for local testing only. ```txt QSTASH_URL="http://127.0.0.1:8080" QSTASH_TOKEN= ``` -------------------------------- ### Start TanStack Start Development Server Source: https://upstash.com/docs/workflow/quickstarts/tanstack-start These are shell commands used to start the TanStack Start development server. They provide options for package managers pnpm and npm. Running one of these commands will launch the development server, typically accessible at http://localhost:3000, allowing you to test your application and workflow endpoints locally. ```bash pnpm dev ``` ```bash npm run dev ``` -------------------------------- ### Run Local QStash Server Source: https://upstash.com/docs/workflow/quickstarts/tanstack-start Starts a local QStash server for development purposes using either pnpm or npm. This server mimics production QStash behavior without external network calls. ```bash pnpx @upstash/qstash-cli dev ``` ```bash npx @upstash/qstash-cli dev ``` -------------------------------- ### Define TanStack Start API Route for Upstash Workflow Source: https://upstash.com/docs/workflow/quickstarts/tanstack-start This TypeScript code defines an API route for Upstash Workflow within a TanStack Start application. It uses the `serve` function to expose a workflow endpoint that processes string inputs through two sequential steps. The function `someWork` is a helper for processing the input, and the `context.run` method executes distinct workflow steps, logging their inputs and outputs. Ensure `@upstash/workflow/tanstack` is installed. ```typescript import { createFileRoute } from '@tanstack/react-router' import { serve } from '@upstash/workflow/tanstack' const someWork = (input: string) => { return `processed '${JSON.stringify(input)}'` } export const Route = createFileRoute('/api/workflow')({ server: { handlers: serve(async (context) => { const input = context.requestPayload const result1 = await context.run('step1', () => { const output = someWork(input) console.log('step 1 input', input, 'output', output) return output }) await context.run('step2', () => { const output = someWork(result1) console.log('step 2 input', result1, 'output', output) }) }), }, }) ``` -------------------------------- ### Run Flask Application and Trigger Workflow Source: https://upstash.com/docs/workflow/quickstarts/flask Provides bash commands to set up and run a Flask application that utilizes Upstash Workflow. It includes sourcing environment variables, starting the Flask development server, and triggering the workflow endpoint using cURL. ```bash source .env ``` ```bash flask --app main run -p 8000 ``` ```bash curl -X POST https://localhost:8000/api/workflow ``` -------------------------------- ### Customer Onboarding Workflow with Upstash Source: https://upstash.com/docs/workflow/examples/customerOnboarding Implements a customer onboarding workflow that registers users, sends welcome emails, waits for a period, and then periodically checks user activity to send appropriate follow-up emails. This workflow handles user state and email notifications. Dependencies include Upstash Workflow SDK and a web framework (Next.js or FastAPI). ```typescript import { serve } from "@upstash/workflow/nextjs" type InitialData = { email: string } export const { POST } = serve(async (context) => { const { email } = context.requestPayload await context.run("new-signup", async () => { await sendEmail("Welcome to the platform", email) }) await context.sleep("wait-for-3-days", 60 * 60 * 24 * 3) while (true) { const state = await context.run("check-user-state", async () => { return await getUserState() }) if (state === "non-active") { await context.run("send-email-non-active", async () => { await sendEmail("Email to non-active users", email) }) } else if (state === "active") { await context.run("send-email-active", async () => { await sendEmail("Send newsletter to active users", email) }) } await context.sleep("wait-for-1-month", 60 * 60 * 24 * 30) } }) async function sendEmail(message: string, email: string) { // Implement email sending logic here console.log(`Sending ${message} email to ${email}`) } type UserState = "non-active" | "active" const getUserState = async (): Promise => { // Implement user state logic here return "non-active" } ``` ```python from fastapi import FastAPI from typing import Literal, TypedDict from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) UserState = Literal["non-active", "active"] class InitialData(TypedDict): email: str async def send_email(message: str, email: str) -> None: # Implement email sending logic here print(f"Sending {message} email to {email}") async def get_user_state() -> UserState: # Implement user state logic here return "non-active" @serve.post("/customer-onboarding") async def customer_onboarding(context: AsyncWorkflowContext[InitialData]) -> None: email = context.request_payload["email"] async def _new_signup() -> None: await send_email("Welcome to the platform", email) await context.run("new-signup", _new_signup) await context.sleep("wait-for-3-days", 60 * 60 * 24 * 3) while True: async def _check_user_state() -> UserState: return await get_user_state() state: UserState = await context.run("check-user-state", _check_user_state) if state == "non-active": async def _send_email_non_active() -> None: await send_email("Email to non-active users", email) await context.run("send-email-non-active", _send_email_non_active) else: async def _send_email_active() -> None: await send_email("Send newsletter to active users", email) await context.run("send-email-active", _send_email_active) await context.sleep("wait-for-1-month", 60 * 60 * 24 * 30) ``` -------------------------------- ### Example Workflow Run Result Source: https://upstash.com/docs/workflow/llms-txt This snippet shows an example of the JSON result returned when starting a workflow, containing a workflowRunId. This ID is used to track the workflow's progress and view its details in the QStash workflow dashboard. ```JSON { "workflowRunId": "wfr_xxxxxx" } ``` -------------------------------- ### Download Dataset with Context Call (Python) Source: https://upstash.com/docs/workflow/examples/allInOne Fetches a dataset URL and downloads the dataset via an HTTP GET request using `context.call`. This approach is designed to handle potentially long-running downloads. It requires a dataset ID and returns the dataset content. ```python async def _get_dataset_url() -> str: return await get_dataset_url(dataset_id) dataset_url = await context.run("get-dataset-url", _get_dataset_url) response: CallResponse[Any] = await context.call( "download-dataset", url=dataset_url, method="GET" ) dataset = response.body ``` -------------------------------- ### Configure QStash Environment Variables Source: https://upstash.com/docs/workflow/quickstarts/tanstack-start Sets up essential environment variables in a .env file to connect a TanStack Start application to a local QStash server. These variables include the QStash URL, token, and signing keys. ```text QSTASH_URL="http://127.0.0.1:8080" QSTASH_TOKEN="eyJVc2VySUQiOiJkZWZhdWx0VXNlciIsIlBhc3N3b3JkIjoiZGVmYXVsdFBhc3N3b3JkIn0=" QSTASH_CURRENT_SIGNING_KEY="sig_7kYjw48mhY7kAjqNGcy6cr29RJ6r" QSTASH_NEXT_SIGNING_KEY="sig_5ZB6DVzB1wjE8S6rZ7eenA8Pdnhs" ``` -------------------------------- ### Example Quantum Mechanics Explanation Source: https://upstash.com/docs/workflow/llms-txt Demonstrates an example of AI-generated text explaining quantum mechanics. This output showcases the successful operation of a generator-evaluator workflow within Upstash. ```plain text Quantum mechanics is a branch of physics that describes the behavior of particles at the smallest scales, such as atoms and subatomic particles. It introduces the concept of quantized energy levels, wave-particle duality, and probabilistic nature of particles. In quantum mechanics, particles can exist in multiple states simultaneously until measured, and their behavior is governed by mathematical equations known as wave functions. This theory has revolutionized our understanding of the fundamental building blocks of the universe and has led to the development of technologies like quantum computing and quantum cryptography. ``` -------------------------------- ### Triggering Upstash Workflow (TypeScript SDK) Source: https://upstash.com/docs/workflow/getstarted Demonstrates how to trigger an Upstash Workflow using the official TypeScript SDK. This code snippet shows the necessary imports, client initialization with authentication, and the `client.trigger` method for initiating a workflow run with optional parameters like body, headers, and retries. ```typescript import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }); const { workflowRunId } = await client.trigger({ url: "https:///", body: "hello there!", // Optional body headers: { ... }, // Optional headers workflowRunId: "my-workflow", // Optional workflow run ID retries: 3 // Optional retries for the initial request }); ``` -------------------------------- ### New User Signup Notification with Upstash Source: https://upstash.com/docs/workflow/examples/customerOnboarding Handles the initial step of a new user signup by sending a welcome email. This function is part of a larger onboarding workflow and uses the Upstash Workflow context to run the email sending task. Dependencies include the Upstash Workflow SDK. ```typescript await context.run("new-signup", async () => { await sendEmail("Welcome to the platform", email) }) ``` ```python async def _new_signup() -> None: await send_email("Welcome to the platform", email) await context.run("new-signup", _new_signup) ``` -------------------------------- ### Install Project Dependencies (Poetry) Source: https://upstash.com/docs/workflow/llms-txt Installs project dependencies defined in pyproject.toml using Poetry. This command is essential for setting up a Python development environment. ```bash poetry install ``` -------------------------------- ### Start Local QStash Server (npm) Source: https://upstash.com/docs/workflow/llms-txt Starts the local QStash server using the @upstash/qstash-cli tool, which is required for local development and testing of Upstash Workflow endpoints. It provides local QSTASH_URL and QSTASH_TOKEN values. ```bash npx @upstash/qstash-cli dev ``` -------------------------------- ### Install Upstash Workflow SDK and FastAPI (pip) Source: https://upstash.com/docs/workflow/llms-txt Installs essential Python packages for building web APIs with FastAPI and integrating Upstash Workflow. This command uses pip to install `fastapi`, `uvicorn` (for running the ASGI application), and the `upstash-workflow` SDK. ```bash pip install fastapi uvicorn upstash-workflow ``` -------------------------------- ### Auth Provider Webhook Handler - Python Source: https://upstash.com/docs/workflow/examples/authWebhook Handles incoming webhooks from an authentication provider. It synchronizes the user, creates them in Stripe, starts their trial, sends a welcome email, and then schedules subsequent emails based on trial progress and user statistics. It utilizes `context.run` for managing asynchronous operations and `context.sleep` for timed events. ```python @serve.post("/auth-provider-webhook") async def auth_provider_webhook( context: AsyncWorkflowContext[UserCreatedPayload], ) -> None: payload = context.request_payload name = payload["name"] email = payload["email"] async def _sync_user() -> str: return await create_user_in_database(name, email) result = await context.run("sync user", _sync_user) userid = result["userid"] async def _create_new_user_in_stripe() -> None: await create_new_user_in_stripe(email) await context.run("create new user in stripe", _create_new_user_in_stripe) async def _start_trial_in_stripe() -> None: await start_trial_in_stripe(email) await context.run("start trial in Stripe", _start_trial_in_stripe) async def _send_welcome_email() -> None: await send_email( email, "Welcome to our platform!, You have 14 days of free trial." ) await context.run("send welcome email", _send_welcome_email) await context.sleep("wait", 7 * 24 * 60 * 60) # get user stats and send email with them async def _get_user_stats() -> UserStats: return await get_user_stats(userid) stats: UserStats = await context.run("get user stats", _get_user_stats) await send_problem_solved_email(context, email, stats) # wait until there are two days to the end of trial period and check upgrade status await context.sleep("wait for trial warning", 5 * 24 * 60 * 60) async def _check_upgraded_plan() -> bool: return await check_upgraded_plan(email) is_upgraded = await context.run("check upgraded plan", _check_upgraded_plan) # end the workflow if upgraded if is_upgraded: return async def _send_trial_warning_email() -> None: await send_email( email, "Your trial is about to end in 2 days. Please upgrade your plan to keep using our platform.", ) await context.run("send trial warning email", _send_trial_warning_email) await context.sleep("wait for trial end", 2 * 24 * 60 * 60) async def _send_trial_end_email() -> None: await send_email( email, "Your trial has ended. Please upgrade your plan to keep using our platform.", ) await context.run("send trial end email", _send_trial_end_email) ``` -------------------------------- ### Install Upstash Workflow SDK with FastAPI (Python) Source: https://upstash.com/docs/workflow/llms-txt Installs Python packages for Upstash Workflow, FastAPI, and Uvicorn. It also demonstrates setting up a virtual environment and configuring the QStash token, essential for secure communication. ```shell python -m venv venv source venv/bin/activate pip install fastapi uvicorn upstash-workflow ``` ```shell export QSTASH_TOKEN= ``` -------------------------------- ### Upstash Workflow Setup with Retry Loop (Python) Source: https://upstash.com/docs/workflow/llms-txt Initializes an Upstash Workflow endpoint and sets up a loop for retrying external API calls up to 10 times. This example uses Python and the `serve.post` decorator, with a placeholder for the actual API call. ```python @serve.post("/custom-retry-logic") async def custom_retry_logic(context: AsyncWorkflowContext[InitialData]) -> None: for attempt in range(10): # TODO: call API in here ``` -------------------------------- ### Fetch Workflow Logs with Python requests Source: https://upstash.com/docs/workflow/rest/runs/logs This Python example demonstrates fetching workflow logs using the `requests` library. It constructs the necessary headers, including the Bearer token, and sends a GET request to the Upstash Workflows logs endpoint. ```python import requests headers = { 'Authorization': 'Bearer ', } response = requests.get( 'https://qstash.upstash.io/v2/workflows/logs', headers=headers ) ``` -------------------------------- ### Install Upstash Workflow SDK for Python (FastAPI) Source: https://upstash.com/docs/workflow/llms-txt Sets up a Python virtual environment and installs the necessary packages for Upstash Workflow with FastAPI. This includes fastapi, uvicorn for the ASGI server, and the upstash-workflow SDK. ```bash python -m venv venv source venv/bin/activate pip install fastapi uvicorn upstash-workflow ``` -------------------------------- ### Create Minimal Workflow Endpoint with Hono Source: https://upstash.com/docs/workflow/quickstarts/hono Defines a basic workflow endpoint using Hono and the Upstash Workflow Hono adapter. It sets up two sequential steps for the workflow. No external dependencies beyond Hono and Upstash Workflow are required for this minimal setup. ```typescript import { Hono } from "hono" import { serve } from "@upstash/workflow/hono" const app = new Hono() app.post("/workflow", serve(async (context) => { await context.run("initial-step", () => { console.log("initial step ran") }) await context.run("second-step", () => { console.log("second step ran") }) }) ) export default app ``` -------------------------------- ### Upstash Workflow Setup with Retry Loop (TypeScript) Source: https://upstash.com/docs/workflow/llms-txt Initializes an Upstash Workflow endpoint and sets up a loop for retrying external API calls up to 10 times. This example uses TypeScript and the `serve` function, with a placeholder for the actual API call. ```typescript export const { POST } = serve<{ userData: string }>(async (context) => { for (let attempt = 0; attempt < 10; attempt++) { // TODO: call API in here } }) ``` -------------------------------- ### Install Upstash Workflow SDK Source: https://upstash.com/docs/workflow/llms-txt Installs the Upstash Workflow SDK using npm, pnpm, or bun. This is a prerequisite for using Upstash Workflow in your projects and makes the necessary libraries available. ```bash npm install @upstash/workflow ``` ```bash pnpm install @upstash/workflow ``` ```bash bun add @upstash/workflow ``` -------------------------------- ### Implement Workflow Step (Python) Source: https://upstash.com/docs/workflow/llms-txt Demonstrates implementing a workflow step in Python using `context.run`. This example includes a dummy step for authentication and execution validation. ```APIDOC ## POST /api/example ### Description Implements a workflow step in Python using `context.run`. ### Method POST ### Endpoint `/api/example` ### Parameters #### Path Parameters None #### Query Parameters None #### Request Body - **request_payload** (str) - The input payload for the workflow. ### Request Example ```python from upstash_workflow import AsyncWorkflowContext, serve @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload # πŸ‘‡ At least one step is required async def _dummy_step(): return await context.run("dummy-step", _dummy_step) ``` ### Response #### Success Response (200) - **status** (string) - Indicates the successful execution of the workflow step. #### Response Example ```json { "status": "step executed" } ``` ``` -------------------------------- ### Send Welcome Email (TypeScript, Python) Source: https://upstash.com/docs/workflow/examples/authWebhook Sends a welcome email to a new user, informing them about their successful registration and the duration of their free trial. This function requires the user's email and utilizes a `sendEmail` function. ```typescript await context.run("send welcome email", async () => { await sendEmail( email, "Welcome to our platform!, You have 14 days of free trial." ); }); ``` ```python async def _send_welcome_email() -> None: await send_email( email, "Welcome to our platform!, You have 14 days of free trial." ) await context.run("send welcome email", _send_welcome_email) ``` -------------------------------- ### Install @upstash/workflow Package Source: https://upstash.com/docs/workflow/migration Installs the new @upstash/workflow package using different package managers (npm, pnpm, bun). This is the first step in migrating your project. ```bash npm install @upstash/workflow ``` ```bash pnpm install @upstash/workflow ``` ```bash bun add @upstash/workflow ``` -------------------------------- ### Fetch Workflow Logs: Go and Python Source: https://upstash.com/docs/workflow/llms-txt Demonstrates fetching workflow logs using Go and Python. Both examples utilize the `requests` library or standard HTTP packages to make a GET request to the workflow logs endpoint, including an Authorization header with a bearer token. ```go req, err := http.NewRequest("GET", "https://qstash.upstash.io/v2/workflows/messageLogs", nil) if err != nil { log.Fatal(err) } req.Header.Set("Authorization", "Bearer ") resp, err := http.DefaultClient.Do(req) if err != nil { log.Fatal(err) } def resp.Body.Close() ``` ```python import requests headers = { 'Authorization': 'Bearer ', } response = requests.get( 'https://qstash.upstash.io/v2/workflows/logs', headers=headers ) ``` -------------------------------- ### Post Request to Workflow Endpoint Source: https://upstash.com/docs/workflow/quickstarts/hono Makes a POST request to a locally running Hono workflow endpoint. This is used to initiate a workflow execution and test the endpoint. The example uses `curl` and assumes the endpoint is accessible at `https://localhost:8787/workflow`. ```bash curl -X POST https://localhost:8787/workflow ``` -------------------------------- ### Install Upstash Workflow Packages (npm) Source: https://upstash.com/docs/workflow/llms-txt Installs the necessary Upstash Workflow, AI, and Zod packages for a Next.js project using npm. Ensure you have Node.js and npm installed. ```bash npx create-next-app@latest [project-name] [options] npm i @upstash/workflow ai zod ``` -------------------------------- ### List Message Logs with curl Source: https://upstash.com/docs/workflow/llms-txt An example using curl to request the message logs for workflow runs from the Upstash QStash API. This command sends an HTTP GET request to the specified endpoint with an authorization token. ```curl curl https://qstash.upstash.io/v2/workflows/messageLogs \ -H "Authorization: Bearer " ``` -------------------------------- ### Full Upstash Workflow Example: Data Processing Source: https://upstash.com/docs/workflow/llms-txt A comprehensive TypeScript example demonstrating a data processing and reporting workflow using Upstash. It includes downloading data, processing it in chunks with OpenAI, generating a report, and sending it. Utilizes `serve`, `context.run`, and `context.call` for workflow management. ```typescript import { serve } from "@upstash/workflow/nextjs" import { downloadData, aggregateResults, generateReport, sendReport, getDatasetUrl, splitIntoChunks, } from "./utils" type OpenAiResponse = { choices: { message: { role: string, content: string } }[] } export const { POST } = serve<{ datasetId: string; userId: string }> ( async (context) => { const request = context.requestPayload // Step 1: Download the dataset const datasetUrl = await context.run("get-dataset-url", async () => { return await getDatasetUrl(request.datasetId) }) // HTTP request with much longer timeout (2hrs) const { body: dataset } = await context.call("download-dataset", { url: datasetUrl, method: "GET" }) // Step 2: Process data in chunks using OpenAI const chunkSize = 1000 const chunks = splitIntoChunks(dataset, chunkSize) const processedChunks: string[] = [] for (let i = 0; i < chunks.length; i++) { const { body: processedChunk } = await context.api.openai.call (`process-chunk-${i}`, { token: process.env.OPENAI_API_KEY, operation: "chat.completions.create", body: { model: "gpt-4", messages: [ { role: "system", content: "You are an AI assistant tasked with analyzing data chunks. Provide a brief summary and key insights for the given data.", }, { role: "user", ``` -------------------------------- ### Implement Workflow Step (Python) Source: https://upstash.com/docs/workflow/llms-txt Python example demonstrating how to implement a workflow step using `context.run`. It includes a dummy step to ensure workflow authentication and successful execution within an asynchronous function decorated with `serve.post`. ```python @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload # πŸ‘‡ At least one step is required async def _dummy_step(): return await context.run("dummy-step", _dummy_step) ``` -------------------------------- ### Workflow Setup with API Call Loop (TypeScript) Source: https://upstash.com/docs/workflow/llms-txt Illustrates the initial setup for a workflow that includes a loop for making API calls. It defines the `POST` endpoint using Upstash Workflow's `serve` function and sets up a loop to attempt an API call up to 10 times. The `TODO` comment indicates where the actual API call logic should be implemented. ```typescript export const { POST } = serve<{ userData: string }>(async (context) => { for (let attempt = 0; attempt < 10; attempt++) { // TODO: call API in here } }) ``` -------------------------------- ### Install Upstash Workflow SDK (bun) Source: https://upstash.com/docs/workflow/quickstarts/cloudflare-workers Installs the Upstash Workflow SDK using bun. This is another package manager option for setting up your worker project. ```bash bun add @upstash/workflow ``` -------------------------------- ### Run Workflow Endpoint (Bash) Source: https://upstash.com/docs/workflow/quickstarts/express Demonstrates how to start the Express.js application and trigger the defined workflow endpoint using `curl`. This involves running the development server and then sending a POST request with a JSON payload to 'http://localhost:3000/workflow'. The output includes a unique workflow run ID. ```bash npm run dev curl -X POST http://localhost:3000/workflow \ -H "Content-Type: application/json" \ -d '{"message": "Hello from the workflow!"}' ``` -------------------------------- ### Missing Workflow Step Error Example (Python) Source: https://upstash.com/docs/workflow/llms-txt Illustrates an incorrect Python workflow setup where `context.run` is omitted, leading to a 'Failed to authenticate Workflow request.' error. Upstash workflows require at least one step execution for successful authentication. ```python from upstash_workflow import AsyncWorkflowContext, serve @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload # πŸ‘‡ Problem: No context.run call print("Processing input:", input) # This workflow will fail with "Failed to authenticate Workflow request." ``` -------------------------------- ### Start Local QStash Server (pnpm) Source: https://upstash.com/docs/workflow/llms-txt Starts the local QStash server using pnpm dlx for users preferring pnpm. This provides the necessary local environment for Upstash Workflow development and testing, yielding local QSTASH_URL and QSTASH_TOKEN values. ```bash pnpm dlx @upstash/qstash-cli dev ``` -------------------------------- ### Bash: Configure QStash Environment Variables Source: https://upstash.com/docs/workflow/llms-txt Sets up the .env file for local QStash development, including the QStash URL and token. It also shows how to start a local QStash server using the QStash CLI. ```bash touch .env npx @upstash/qstash-cli dev ``` -------------------------------- ### Get User Stats - Python Source: https://upstash.com/docs/workflow/examples/authWebhook Asynchronously retrieves user statistics, such as the total number of problems solved and the most interested topic. It takes a user ID as input and returns a UserStats object. This is used for personalized user communication. ```python async def get_user_stats(userid: str) -> UserStats: # Implement logic to get user stats print("Getting user stats for", userid) return {"total_problems_solved": 10000, "most_interested_topic": "Python"} ``` -------------------------------- ### GET /v1/workflows/{workflow_id}/executions/{execution_id} Source: https://upstash.com/docs/workflow/llms-txt Retrieves the status and results of a specific workflow execution. This endpoint allows you to monitor the progress and outcomes of a previously started workflow. ```APIDOC ## GET /v1/workflows/{workflow_id}/executions/{execution_id} ### Description Retrieves the status and results of a specific workflow execution. This endpoint allows you to monitor the progress and outcomes of a previously started workflow. ### Method GET ### Endpoint /v1/workflows/{workflow_id}/executions/{execution_id} ### Parameters #### Path Parameters - **workflow_id** (string) - Required - The ID of the workflow. - **execution_id** (string) - Required - The ID of the workflow execution. #### Headers - **Authorization** (string) - Required - Bearer token for authentication (e.g., `Bearer `) ### Response #### Success Response (200 OK) - **execution_id** (string) - The ID of the workflow execution. - **workflow_id** (string) - The ID of the workflow. - **status** (string) - The current status of the execution (`completed`, `running`, or `failed`). - **result** (any) - Optional - The final result of the workflow if completed. - **error** (string) - Optional - Error message if the execution failed. #### Response Example ```json { "execution_id": "", "workflow_id": "", "status": "completed", "result": { "message": "Workflow finished successfully." } } ``` #### Error Responses - **401 Unauthorized**: Invalid API key. - **404 Not Found**: Workflow or execution ID not found. ``` -------------------------------- ### Install Upstash Workflow SDK with pnpm (Bash) Source: https://upstash.com/docs/workflow/llms-txt This command installs the Upstash Workflow SDK into your project using the `pnpm` package manager. `pnpm` is an alternative package manager known for its efficient disk space usage and fast installations. ```bash pnpm install @upstash/workflow ``` -------------------------------- ### Fetch Message Logs using Node.js Source: https://upstash.com/docs/workflow/rest/runs/message-logs This Node.js example shows how to retrieve message logs from the Upstash Workflow API. It utilizes the fetch API to make a GET request to the /v2/workflows/messageLogs endpoint, including an Authorization header. ```javascript const response = await fetch( "https://qstash.upstash.io/v2/workflows/messageLogs", { headers: { Authorization: "Bearer ", }, } ); ``` -------------------------------- ### Add Agent Examples (JavaScript) Source: https://upstash.com/docs/workflow/llms-txt Includes new examples demonstrating the use of agents within the workflow system, showcasing advanced use cases and integration patterns. This commit adds specific functionality related to agent capabilities. ```JavaScript DX-1580: Add Agent Examples (#59) ``` -------------------------------- ### Install Upstash Workflow and Flask with pip Source: https://upstash.com/docs/workflow/llms-txt Installs the Upstash Workflow SDK and the Flask framework using pip. This command also typically sets up a virtual environment for the project's dependencies. ```bash pip install @upstash/workflow flask ``` -------------------------------- ### Install Upstash Workflow SDK and Dependencies (Python/Bun) Source: https://upstash.com/docs/workflow/llms-txt Installs necessary packages for building web APIs and integrating Upstash Workflow. This includes FastAPI and Uvicorn for Python, and the Upstash Workflow SDK for Node.js/Bun projects. ```bash pip install fastapi uvicorn upstash-workflow ``` ```bash bun add @upstash/workflow ``` -------------------------------- ### Implement Sleep and Sequential Steps in FastAPI Workflow Source: https://upstash.com/docs/workflow/quickstarts/fastapi This example shows how to incorporate sleep functionalities and sequential step execution within a FastAPI workflow. It defines three steps that process input, use `sleep_until` and `sleep` for timed delays, and return processed strings. It requires `fastapi`, `time`, and `upstash_workflow` libraries. ```python from fastapi import FastAPI import time from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) def some_work(input: str) -> str: return f"processed '{input}'" @serve.post("/sleep") async def sleep(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload async def _step1() -> str: output = some_work(input) print("step 1 input", input, "output", output) return output result1: str = await context.run("step1", _step1) await context.sleep_until("sleep1", time.time() + 3) async def _step2() -> str: output = some_work(result1) print("step 2 input", result1, "output", output) return output result2: str = await context.run("step2", _step2) await context.sleep("sleep2", 2) async def _step3() -> None: output = some_work(result2) print("step 3 input", result2, "output", output) await context.run("step3", _step3) ``` -------------------------------- ### Install Node.js Packages (npm) Source: https://upstash.com/docs/workflow/llms-txt Installs all required Node.js packages and their dependencies from the 'package.json' file using npm. This command is crucial for ensuring that all necessary libraries for applications, such as Next.js, are available. ```bash npm install ``` -------------------------------- ### Setup FastAPI Workflow with Upstash (Python) Source: https://upstash.com/docs/workflow/llms-txt Demonstrates the initial setup for integrating Upstash Workflow with a FastAPI application in Python. It imports necessary modules and initializes the FastAPI app and Upstash Workflow Serve instance. ```Python from fastapi import FastAPI from typing import Dict, TypedDict from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext, CallResponse from email_utils import send_email app = FastAPI() serve = Serve(app) ``` -------------------------------- ### Configure Local QStash Server Source: https://upstash.com/docs/workflow/quickstarts/express Sets up environment variables for local development using a local QStash server. This involves creating a .env file and adding QSTASH_URL and QSTASH_TOKEN, which are obtained by running the QStash CLI. ```bash touch .env ``` ```bash npx @upstash/qstash-cli dev ``` ```text QSTASH_URL="http://127.0.0.1:8080" QSTASH_TOKEN= ``` -------------------------------- ### Trigger SolidJS Workflow Endpoint with cURL Source: https://upstash.com/docs/workflow/quickstarts/solidjs Demonstrates how to trigger the previously defined SolidJS workflow endpoint using a cURL command. This example sends a POST request to `https://localhost:3000/api/workflow` and shows the expected JSON response containing a unique `workflowRunId`. ```bash curl -X POST https://localhost:3000/api/workflow # result: {"workflowRunId":"wfr_xxxxxx"} ``` -------------------------------- ### Fetch Upstash Workflow Logs with cURL (Shell) Source: https://upstash.com/docs/workflow/llms-txt Retrieves workflow execution logs using a cURL command. This example shows the necessary GET request to the QStash API logs endpoint, including the required Authorization header with a bearer token. ```sh curl https://qstash.upstash.io/v2/workflows/logs \ -H "Authorization: Bearer " ``` -------------------------------- ### Run Nuxt.js App and Trigger Workflow Endpoint Source: https://upstash.com/docs/workflow/quickstarts/nuxt This section provides bash commands to run a Nuxt.js development server and then trigger a POST request to the defined workflow endpoint. The `npm run dev` command starts the local development environment. A `curl` command is used to send a POST request to `https://localhost:3000/api/workflow`, which returns a unique `workflowRunId` upon successful execution. ```bash npm run dev ``` ```bash curl -X POST https://localhost:3000/api/workflow # result: {"workflowRunId":"wfr_xxxxxx"} ``` -------------------------------- ### Run Upstash Workflow Server with ngrok and uvicorn Source: https://upstash.com/docs/workflow/llms-txt Instructions to run the Upstash Workflow server. This involves creating a public URL using ngrok, setting the UPSTASH_WORKFLOW_URL environment variable, and starting the server with uvicorn. ```bash ngrok http localhost:8000 ``` ```bash export UPSTASH_WORKFLOW_URL= ``` ```bash source .env ``` ```bash uvicorn main:app --reload ``` -------------------------------- ### Define Researcher Agent with OpenAI and Wikipedia Tool (TypeScript) Source: https://upstash.com/docs/workflow/llms-txt Defines a `researcherAgent` using an OpenAI model and a Wikipedia tool, specifying its name, maximum steps, and a background prompt to guide its behavior. This setup allows the agent to perform research tasks using Wikipedia. ```typescript import { serve } from "@upstash/workflow/nextjs" import { WikipediaQueryRun } from "@langchain/community/tools/wikipedia_query_run" export const { POST } = serve(async (context) => { const model = context.agents.openai('gpt-3.5-turbo') const researcherAgent = context.agents.agent({ model, name: 'academic', maxSteps: 2, tools: { wikiTool: new WikipediaQueryRun({ topKResults: 1, maxDocContentLength: 500, }) }, background: 'You are researcher agent with access to Wikipedia. ' 'Utilize Wikipedia as much as possible for correct information', }) }) ``` ```typescript import { serve } from "@upstash/workflow/nextjs"; import { WikipediaQueryRun } from "@langchain/community/tools/wikipedia_query_run"; export const { POST } = serve(async (context) => { ``` -------------------------------- ### Install Upstash Workflow and AI SDK Packages Source: https://upstash.com/docs/workflow/llms-txt Installs the necessary npm packages for Upstash Workflow and AI SDK integration, including `@ai-sdk/openai` and `ai`. This is a prerequisite for implementing AI features with Upstash Workflow. ```bash npm install @ai-sdk/openai ai zod ``` -------------------------------- ### Implement Authentication in FastAPI Workflow Endpoint Source: https://upstash.com/docs/workflow/quickstarts/fastapi This code example demonstrates how to add authentication to a FastAPI workflow endpoint. It checks for a specific 'authentication' header before proceeding with the workflow steps. The workflow consists of two simple steps. This requires `fastapi` and `upstash_workflow`. ```python from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) def some_work(input: str) -> str: return f"processed '{input}'" @serve.post("/auth") async def auth(context: AsyncWorkflowContext[str]) -> None: if context.headers.get("authentication") != "Bearer secret_password": print("Authentication failed.") return async def _step1() -> str: return "output 1" await context.run("step1", _step1) async def _step2() -> str: return "output 2" await context.run("step2", _step2) ``` -------------------------------- ### Define Multi-Step Onboarding Workflow (Python) Source: https://upstash.com/docs/workflow/llms-txt Defines an asynchronous onboarding workflow using Upstash Workflow's AsyncWorkflowContext. It orchestrates steps like sending a welcome email, pausing, generating a personalized follow-up message via an AI API call, and sending the follow-up email. This example demonstrates context.run, context.sleep, and context.call. It requires the 'upstash_workflow' library. ```Python from typing import TypedDict, Dict from upstash_workflow import AsyncWorkflowContext, serve, CallResponse class InitialData(TypedDict): user_id: str email: str name: str @serve.post("/api/onboarding") async def onboarding_workflow(context: AsyncWorkflowContext[InitialData]) -> None: data = context.request_payload user_id = data["user_id"] email = data["email"] name = data["name"] # Step 1: Send welcome email async def _send_welcome_email() -> None: await send_email(email, "Welcome to our service!") await context.run("send-welcome-email", _send_welcome_email) # Step 2: Wait for 3 days (in seconds) await context.sleep("sleep-until-follow-up", 60 * 60 * 24 * 3) # Step 3: AI-generate personalized follow-up message ai_response: CallResponse[Dict[str, str]] = await context.call( "generate-personalized-message", url="https://api.openai.com/v1/chat/completions", method="POST", headers={...}, body={ "model": "gpt-3.5-turbo", "messages": [ { "role": "system", "content": "You are an assistant creating personalized follow-up messages.", }, { "role": "user", "content": f"Create a short, friendly follow-up message for {name} who joined our service 3 days ago.", } ] } ) personalized_message = ai_response.body["choices"][0]["message"]["content"] # Step 4: Send personalized follow-up email async def _send_follow_up_email() -> None: await send_email(email, personalized_message) await context.run("send-follow-up-email", _send_follow_up_email) ``` -------------------------------- ### Install Upstash Workflow SDK (npm) Source: https://upstash.com/docs/workflow/llms-txt Installs the `@upstash/workflow` SDK into a Next.js project using `npm`. This command enables the development and deployment of Upstash Workflows within a Next.js application. ```bash npm install @upstash/workflow ``` -------------------------------- ### Python: FastAPI with Upstash Workflow for Event Handling Source: https://upstash.com/docs/workflow/examples/authWebhook This Python example shows how to integrate Upstash Workflow with FastAPI to handle asynchronous tasks triggered by events. It defines types for payloads and includes placeholder functions for database operations, Stripe integration, and email sending, similar to the TypeScript example. It requires 'fastapi', 'typing', and 'upstash_workflow.fastapi'. ```python from fastapi import FastAPI from typing import Dict, TypedDict from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) class UserCreatedPayload(TypedDict): name: str email: str class UserStats(TypedDict): total_problems_solved: int most_interested_topic: str async def create_user_in_database(name: str, email: str) -> Dict[str, str]: print("Creating a user in the database:", name, email) return {"userid": "12345"} async def create_new_user_in_stripe(email: str) -> None: # Implement logic to create a new user in Stripe print("Creating a user in Stripe for", email) async def start_trial_in_stripe(email: str) -> None: # Implement logic to start a trial in Stripe print("Starting a trial of 14 days in Stripe for", email) ``` -------------------------------- ### Browser Automation Example (JavaScript) Source: https://upstash.com/docs/workflow/llms-txt Provides a conceptual example of browser automation using Upstash Agents in JavaScript. It outlines functions for navigating websites, extracting content, filling forms, and submitting them, demonstrating autonomous web interaction capabilities. ```JavaScript /* * Example: Browser Automation * This example demonstrates how to automate web navigation and interaction. */ // Assume 'agent' is an instance of the Upstash Agent // const agent = new UpstashAgent(); async function automateBrowser() { console.log('Starting browser automation...'); // Example: Navigate to a website // await agent.navigate('https://example.com'); // Example: Extract content from a page // const pageContent = await agent.extractContent('h1'); // console.log('Page Title:', pageContent); // Example: Fill out a form // await agent.fillForm('input[name="username"]', 'testuser'); // await agent.fillForm('input[name="password"]', 'password123'); // await agent.submitForm('form'); console.log('Browser automation finished.'); } // automateBrowser(); ``` -------------------------------- ### Notify Workflow Request Examples (cURL, JavaScript, Python, Go) Source: https://upstash.com/docs/workflow/rest/runs/notify Demonstrates how to notify workflows across different programming languages and tools. Includes examples for cURL, Node.js, Python, and Go, showing how to send a POST request with necessary headers and body. ```sh curl -X POST https://qstash.upstash.io/v2/notify/myEvent \ -H "Authorization: Bearer " \ -d "Hello World!" ``` ```js const response = await fetch('https://qstash.upstash.io/v2/notify/myEvent', { method: 'POST', body: "Hello world!", headers: { 'Authorization': 'Bearer ' } }); ``` ```python import requests headers = { 'Authorization': 'Bearer ', } response = requests.post( 'https://qstash.upstash.io/v2/notify/myEvent', headers=headers ) ``` ```go req, err := http.NewRequest("POST", "https://qstash.upstash.io/v2/notify/myEvent", nil) if err != nil { log.Fatal(err) } req.Header.Set("Authorization", "Bearer ") resp, err := http.DefaultClient.Do(req) if err != nil { log.Fatal(err) } defer resp.Body.Close() ``` -------------------------------- ### Create Flask Workflow Endpoint with External Call Source: https://upstash.com/docs/workflow/quickstarts/flask Illustrates a Flask workflow endpoint '/call' that makes an external HTTP POST request using `context.call`. This example demonstrates calling a separate '/get-data' endpoint, processing its response, and continuing the workflow. It requires Flask, typing, and Upstash Workflow libraries. ```python from flask import Flask from typing import Dict from upstash_workflow.flask import Serve from upstash_workflow import WorkflowContext, CallResponse app = Flask(__name__) serve = Serve(app) def some_work(input: str) -> str: return f"processed '{input}'" @app.route("/get-data", methods=["POST"]) def get_data() -> Dict[str, str]: return {"message": "get data response"} @serve.route("/call") def call(context: WorkflowContext[str]) -> None: input = context.request_payload def _step1() -> str: output = some_work(input) print("step 1 input", input, "output", output) return output result1: str = context.run("step1", _step1) response: CallResponse[Dict[str, str]] = context.call( "get-data", url=f"{context.env.get('UPSTASH_WORKFLOW_URL', 'http://localhost:8000')}/get-data", method="POST", body={"message": result1}, ) def _step2() -> str: output = some_work(response.body["message"]) print("step 2 input", response, "output", output) return output context.run("step2", _step2) ``` -------------------------------- ### Register New User and Send Welcome Email Source: https://upstash.com/docs/workflow/llms-txt Demonstrates registering a new user and sending a welcome email as the initial step in a customer onboarding workflow. It uses `context.run` to execute an asynchronous function for sending the email, ensuring tracking and retries by the workflow engine. ```typescript await context.run("new-signup", async () => { await sendEmail("Welcome to the platform", email) }) ``` -------------------------------- ### TypeScript: Handle User Creation Event with Upstash Workflow Source: https://upstash.com/docs/workflow/examples/authWebhook This TypeScript example demonstrates setting up a Next.js API route to handle a 'UserCreated' event using Upstash Workflow. It outlines a workflow that creates a user in a database, sets them up in Stripe, sends welcome and trial-ending emails, and updates user stats. It includes placeholder functions for external service interactions. Dependencies include '@upstash/workflow/nextjs' and '@upstash/qstash/workflow'. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { WorkflowContext } from '@upstash/qstash/workflow' /** * This can be the payload of the user created webhook event coming from your * auth provider (e.g. Firebase, Auth0, Clerk etc.) */ type UserCreatedPayload = { name: string; email: string; }; export const { POST } = serve(async (context) => { const { name, email } = context.requestPayload; const { userid } = await context.run("sync user", async () => { return await createUserInDatabase({ name, email }); }); await context.run("create new user in stripe", async () => { await createNewUserInStripe(email); }); await context.run("start trial in Stripe", async () => { await startTrialInStripe(email); }); await context.run("send welcome email", async () => { await sendEmail( email, "Welcome to our platform!, You have 14 days of free trial." ); }); await context.sleep("wait", 7 * 24 * 60 * 60); // get user stats and send email with them const stats = await context.run("get user stats", async () => { return await getUserStats(userid); }); await sendProblemSolvedEmail({context, email, stats}); // wait until there are two days to the end of trial period // and check upgrade status await context.sleep("wait for trial warning", 5 * 24 * 60 * 60); const isUpgraded = await context.run("check upgraded plan", async () => { return await checkUpgradedPlan(email); }); // end the workflow if upgraded if (isUpgraded) return; await context.run("send trial warning email", async () => { await sendEmail( email, "Your trial is about to end in 2 days. Please upgrade your plan to keep using our platform." ); }); await context.sleep("wait for trial end", 2 * 24 * 60 * 60); await context.run("send trial end email", async () => { await sendEmail( email, "Your trial has ended. Please upgrade your plan to keep using our platform." ); }); }); async function sendProblemSolvedEmail({ context: WorkflowContext email: string, stats: { totalProblemsSolved: number } }) { if (stats.totalProblemsSolved === 0) { await context.run("send no answers email", async () => { await sendEmail( email, "Hey, you haven't solved any questions in the last 7 days..." ); }); } else { await context.run("send stats email", async () => { await sendEmail( email, `You have solved ${stats.totalProblemsSolved} problems in the last 7 days. Keep it up!` ); }); } } async function createUserInDatabase({ name, email, }: { name: string; email: string; }) { console.log("Creating a user in the database:", name, email); return { userid: "12345" }; } async function createNewUserInStripe(email: string) { // Implement logic to create a new user in Stripe console.log("Creating a user in Stripe for", email); } async function startTrialInStripe(email: string) { // Implement logic to start a trial in Stripe console.log("Starting a trial of 14 days in Stripe for", email); } async function getUserStats(userid: string) { // Implement logic to get user stats console.log("Getting user stats for", userid); return { totalProblemsSolved: 10_000, mostInterestedTopic: "JavaScript", }; } async function checkUpgradedPlan(email: string) { // Implement logic to check if the user has upgraded the plan console.log("Checking if the user has upgraded the plan", email); return false; } async function sendEmail(email: string, content: string) { // Implement logic to send an email console.log("Sending email to", email, content); } ``` -------------------------------- ### Install Upstash Workflow SDK with pnpm Source: https://upstash.com/docs/workflow/llms-txt Installs the new `@upstash/workflow` package using the pnpm package manager. This method is recommended for managing dependencies and migrating from older SDK versions. ```bash pnpm install @upstash/workflow ``` -------------------------------- ### Get Image URL with context.run (Python) Source: https://upstash.com/docs/workflow/llms-txt Shows how to retrieve an image URL in Python using Upstash Workflow's `context.run`. It defines an asynchronous helper function to fetch the URL and then executes it within the workflow context. This example assumes the existence of a `get_image_url` function and an `image_id` variable. ```python async def _get_image_url() -> str: return await get_image_url(image_id) image_url: str = await context.run("get-image-url", _get_image_url) ``` -------------------------------- ### Register User and Send Welcome Email (Python) Source: https://upstash.com/docs/workflow/llms-txt Demonstrates user registration and sending a welcome email using Python. It utilizes `context.run` for asynchronous execution and workflow tracking, and assumes `send_email` and `context` are available within the environment. ```python async def _new_signup() -> None: await send_email("Welcome to the platform", email) await context.run("new-signup", _new_signup) ``` -------------------------------- ### Dynamically Set Workflow Endpoint URL Source: https://upstash.com/docs/workflow/quickstarts/tanstack-start This JavaScript example shows how to dynamically set the URL for triggering an Upstash Workflow run, adapting to the environment. It defines a `BASE_URL` constant that checks the `NODE_ENV` to use either a production URL or a local development URL. This approach prevents hardcoding URLs and ensures the correct endpoint is targeted in different deployment stages. The `client.trigger` method then uses this constructed URL. ```javascript const BASE_URL = process.env.NODE_ENV === 'production' ? 'https://yourapp.com' : 'http://localhost:3000' const { workflowRunId } = await client.trigger({ url: `${BASE_URL}/api/workflow`, body: "Hello World!", retries: 3, }); ``` -------------------------------- ### Start Flask Application with Upstash Workflow Source: https://upstash.com/docs/workflow/llms-txt Starts a Flask application using the Flask development server on port 8000. The `--app main` flag points to the main application file. ```bash flask --app main run -p 8000 ``` -------------------------------- ### Python Customer Onboarding Workflow with Upstash Source: https://upstash.com/docs/workflow/llms-txt A comprehensive Python workflow example using Upstash. It sends a welcome email, pauses for three days, generates a personalized message using OpenAI, and sends a follow-up email. Utilizes context.run for retries and context.call for external API integration. ```Python class InitialData(TypedDict): user_id: str email: str name: str @serve.post("/api/onboarding") async def onboarding_workflow(context: AsyncWorkflowContext[InitialData]) -> None: data = context.request_payload user_id = data["user_id"] email = data["email"] name = data["name"] # Step 1: Send welcome email async def _send_welcome_email() -> None: await send_email(email, "Welcome to our service!") await context.run("send-welcome-email", _send_welcome_email) # Step 2: Wait for 3 days (in seconds) await context.sleep("sleep-until-follow-up", 60 * 60 * 24 * 3) # Step 3: AI-generate personalized follow-up message ai_response: CallResponse[Dict[str, str]] = await context.call( "generate-personalized-message", url="https://api.openai.com/v1/chat/completions", method="POST", headers={...}, body={ "model": "gpt-3.5-turbo", "messages": [ { "role": "system", "content": "You are an assistant creating personalized follow-up messages.", }, { "role": "user", "content": f"Create a short, friendly follow-up message for {name} who joined our service 3 days ago.", } ] } ) personalized_message = ai_response.body["choices"][0]["message"]["content"] # Step 4: Send personalized follow-up email async def _send_follow_up_email() -> None: await send_email(email, personalized_message) await context.run("send-follow-up-email", _send_follow_up_email) ``` -------------------------------- ### Basic Webhook Endpoint Setup (TypeScript) Source: https://upstash.com/docs/workflow/llms-txt Sets up a basic webhook endpoint using the `serve` function from `@upstash/workflow/nextjs`. This function handles incoming requests and allows for custom logic within the callback. ```typescript import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve( async (context) => { // Your webhook handling logic here }, { initialPayloadParser: (payload) => { return payload; }, } ); ``` -------------------------------- ### Install Upstash Workflow SDK (bun) Source: https://upstash.com/docs/workflow/llms-txt Installs the Upstash Workflow SDK using the `bun` package manager. This command is suitable for projects using bun, such as Express.js or Nuxt.js applications, to integrate Upstash Workflow capabilities. ```bash bun add @upstash/workflow ``` -------------------------------- ### Define Workflow Endpoint with Request Object (Next.js App Router) Source: https://upstash.com/docs/workflow/quickstarts/vercel-nextjs This example illustrates how to create a Next.js App Router workflow endpoint while retaining access to the native `NextRequest` object. It allows for pre-processing or inspecting the incoming request before invoking the workflow logic defined by Upstash Workflow. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { NextRequest } from "next/server"; export const POST = async (request: NextRequest) => { // do something with the native request object const { POST: handler } = serve(async (context) => { // Your workflow steps }); return await handler(request); } ``` -------------------------------- ### Deploy Hono App to Production with Cloudflare CLI Source: https://upstash.com/docs/workflow/quickstarts/hono This command initiates the deployment process for a Hono application to a production environment using the Cloudflare CLI. Ensure your environment variables and configurations are correctly set before running this command. ```bash wrangler deploy ``` -------------------------------- ### Run Next.js Development Server Source: https://upstash.com/docs/workflow/llms-txt Starts the Next.js development server using npm. This command is used to run the application locally after setting up the workflow endpoint. ```bash npm run dev ``` -------------------------------- ### Serve Method in Python for Upstash Workflow Source: https://upstash.com/docs/workflow/llms-txt Illustrates the fundamental `serve` method in Python for integrating Upstash Workflow with custom platforms. This method is adaptable for platforms not covered in official quickstarts. ```python from upstash_workflow import serve ``` -------------------------------- ### Register User and Send Welcome Email (Python) Source: https://upstash.com/docs/workflow/llms-txt Demonstrates registering a new user and sending a welcome email as the initial step in a customer onboarding workflow. It uses context.run to execute the email sending operation, ensuring it's tracked and retried by the workflow engine. ```Python async def _new_signup() -> None: await send_email("Welcome to the platform", email) await context.run("new-signup", _new_signup) ``` -------------------------------- ### Install Upstash Workflow SDK (pnpm) Source: https://upstash.com/docs/workflow/quickstarts/cloudflare-workers Installs the Upstash Workflow SDK using pnpm. This command is an alternative to npm for package management in your worker project. ```bash pnpm install @upstash/workflow ``` -------------------------------- ### Bulk Restart Workflow Runs Response Example (JSON) Source: https://upstash.com/docs/workflow/rest/dlq/bulk-restart Example JSON response for a bulk workflow run restart. It includes a cursor for pagination and a list of resumed workflow runs with their new IDs and creation timestamps. ```json { "cursor": "", "workflowRuns": [ { "workflowRunId": "wfr_resumed_A", "workflowCreatedAt": 1748527971000 }, { "workflowRunId": "wfr_resumed_B", "workflowCreatedAt": 1748527971000 } ] } ``` -------------------------------- ### TypeScript: Start Trial in Stripe Step Source: https://upstash.com/docs/workflow/llms-txt Represents the 'start trial in Stripe' step within the Upstash Workflow. It calls a function to initiate a 14-day free trial for the user in Stripe. ```TypeScript await context.run("start trial in Stripe", async () => { await startTrialInStripe(email); }); ``` -------------------------------- ### Install Workflow Packages (npm) Source: https://upstash.com/docs/workflow/llms-txt Installs essential packages for Upstash Workflow Agents API, including AI SDK, MathJS, Zod, and LangChain integrations. These are required for building and running AI-powered workflows. ```shell npm i ai mathjs zod @agentic.ai-sdk @agentic.ai-weather @langchain/core @langchain/community ``` -------------------------------- ### Install AI SDK Packages (npm, pnpm, bun) Source: https://upstash.com/docs/workflow/integrations/aisdk These commands install the necessary packages for integrating Upstash Workflow with the Vercel AI SDK using different package managers. Ensure you have the correct version of Vercel AI SDK (4.0.12 or higher) for ToolExecutionError handling. ```bash npm install @ai-sdk/openai ai zod ``` ```bash pnpm install @ai-sdk/openai ai zod ``` ```bash bun install @ai-sdk/openai ai zod ``` -------------------------------- ### Install Upstash Workflow SDK Source: https://upstash.com/docs/workflow/llms-txt Installs the Upstash Workflow SDK using different package managers. This is a prerequisite for integrating Upstash Workflows into a project. Supported managers include npm, pnpm, and bun. ```npm npm install @upstash/workflow ``` ```pnpm pnpm install @upstash/workflow ``` ```bun bun add @upstash/workflow ``` ```bash bun add @upstash/workflow ``` -------------------------------- ### Set Up Python Virtual Environment (Bash) Source: https://upstash.com/docs/workflow/llms-txt This command sequence demonstrates how to create and activate a Python virtual environment using the built-in `venv` module. This is a standard practice for Python development to manage project dependencies and isolate them from the global Python installation. ```bash python -m venv venv source venv/bin/activate ``` -------------------------------- ### Install Upstash Workflow SDK (npm) Source: https://upstash.com/docs/workflow/quickstarts/cloudflare-workers Installs the Upstash Workflow SDK using npm. This is the first step in integrating Upstash Workflow into your Cloudflare Worker project. ```bash npm install @upstash/workflow ``` -------------------------------- ### Make REST API Call to Workflow Endpoint Source: https://upstash.com/docs/workflow/getstarted This snippet shows how to invoke a workflow endpoint using a cURL command. It demonstrates a POST request with a JSON payload. This is useful for testing or triggering workflows from the command line or other services. ```bash curl -X POST https:/// -b '{"hello": "there!"}' ``` -------------------------------- ### Start Stripe Trial (TypeScript/Python) Source: https://upstash.com/docs/workflow/llms-txt Initiates a user's trial period in Stripe using the Upstash Workflow context. The `startTrialInStripe` function is executed within `context.run` for reliable asynchronous operations. Requires the user's email as input. ```TypeScript await context.run("start trial in Stripe", async () => { await startTrialInStripe(email); }); ``` ```Python async def _start_trial_in_stripe() -> None: await start_trial_in_stripe(email) await context.run("start trial in Stripe", _start_trial_in_stripe) ``` -------------------------------- ### Initial Waiting Period with Upstash Sleep Source: https://upstash.com/docs/workflow/examples/customerOnboarding Implements an initial waiting period in the onboarding workflow using `context.sleep`. This pause allows time for the user to engage with the platform before further actions are taken. The duration is set to 3 days. Dependencies include the Upstash Workflow SDK. ```typescript await context.sleep("wait-for-3-days", 60 * 60 * 24 * 3) ``` ```python await context.sleep("wait-for-3-days", 60 * 60 * 24 * 3) ``` -------------------------------- ### Create New User in Stripe - Python Source: https://upstash.com/docs/workflow/examples/authWebhook Asynchronously creates a new user entry within the Stripe payment processing system. This function is essential for managing user subscriptions and payments. ```python async def _create_new_user_in_stripe() -> None: await create_new_user_in_stripe(email) await context.run("create new user in stripe", _create_new_user_in_stripe) ``` -------------------------------- ### Python Customer Onboarding Workflow with FastAPI and Upstash Workflow Source: https://upstash.com/docs/workflow/llms-txt Implements a customer onboarding workflow in Python using FastAPI and Upstash Workflow. It handles user registration, welcome emails, and periodic activity checks. Uses `context.run` for atomic operations and `context.sleep` for non-blocking delays. Requires `fastapi` and `upstash-workflow`. ```python from fastapi import FastAPI from typing import Literal, TypedDict from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) UserState = Literal["non-active", "active"] class InitialData(TypedDict): email: str async def send_email(message: str, email: str) -> None: # Implement email sending logic here print(f"Sending {message} email to {email}") async def get_user_state() -> UserState: # Implement user state logic here return "non-active" @serve.post("/customer-onboarding") async def customer_onboarding(context: AsyncWorkflowContext[InitialData]) -> None: email = context.request_payload["email"] async def _new_signup() -> None: await send_email("Welcome to the platform", email) await context.run("new-signup", _new_signup) await context.sleep("wait-for-3-days", 60 * 60 * 24 * 3) while True: async def _check_user_state() -> UserState: return await get_user_state() state: UserState = await context.run("check-user-state", _check_user_state) if state == "non-active": async def _send_email_non_active() -> None: await send_email("Email to non-active users", email) await context.run("send-email-non-active", _send_email_non_active) else: async def _send_email_active() -> None: await send_email("Send newsletter to active users", email) await context.run("send-email-active", _send_email_active) await context.sleep("wait-for-1-month", 60 * 60 * 24 * 30) ``` -------------------------------- ### Start Stripe Trial Source: https://upstash.com/docs/workflow/llms-txt Initiates a trial period for a user within the Stripe payment system. This function is typically wrapped in a context.run for reliable execution and may be followed by email notifications. ```TypeScript await context.run("start trial in Stripe", async () => { await startTrialInStripe(email); }); ``` ```Python async def _start_trial_in_stripe() -> None: await start_trial_in_stripe(email) ``` -------------------------------- ### Install Upstash Workflow and AI SDK Packages (npm) Source: https://upstash.com/docs/workflow/llms-txt Installs the Upstash Workflow SDK along with AI SDK and Zod for schema validation using npm. These packages are essential for integrating Upstash Workflow with AI functionalities. ```shell npm install @ai-sdk/openai ai zod ``` -------------------------------- ### Configure Environment Variables for Upstash Workflow Source: https://upstash.com/docs/workflow/llms-txt Copies the example environment file (`.env.example`) to `.env`. This `.env` file is used to store sensitive information and configuration settings required for the application's operation. ```bash cp .env.example .env ``` -------------------------------- ### Example Agent Output Logs Source: https://upstash.com/docs/workflow/llms-txt Illustrates the expected console output from a running workflow agent, detailing its internal thought process and the final response generated. ```text Inner thought: I will discuss the future of space exploration and the potential advancements in technology and missions. Final response: The future of space exploration holds exciting possibilities with advancements in technology, potential manned missions to Mars, increased commercial space travel, and exploration of distant celestial bodies. ``` -------------------------------- ### Configure .env.local for Local Tunnel (Text) Source: https://upstash.com/docs/workflow/llms-txt An example of the content for a `.env.local` file when utilizing a local tunnel. It requires the QSTASH_TOKEN from the Upstash Console and the public URL provided by the local tunnel service. ```text QSTASH_TOKEN="***" UPSTASH_WORKFLOW_URL= ``` -------------------------------- ### Default Qstash Client Initialization Source: https://upstash.com/docs/workflow/basics/serve Demonstrates the default initialization of a Qstash Client using base URL and token from environment variables. ```typescript new Client({ baseUrl: process.env.QSTASH_URL!, token: process.env.QSTASH_TOKEN!, }); ``` ```python AsyncQStash(os.environ["QSTASH_TOKEN"]) ``` -------------------------------- ### Install Workflow Agent Packages (Bash) Source: https://upstash.com/docs/workflow/agents/features Installs necessary npm packages for defining tools in the Workflow Agents API. Includes packages for AI, math, Zod, and specific SDKs. ```bash npm i ai mathjs zod @agentic/ai-sdk @agentic/weather @langchain/core @langchain/community ``` -------------------------------- ### JavaScript Social Media Manager Example with Upstash Agents Source: https://upstash.com/docs/workflow/llms-txt Illustrates a multi-platform social media management system for content moderation and engagement automation using Upstash Agents. This example is illustrative and requires an instantiated `UpstashAgent` object to function. ```javascript /* * Example: Social Media Manager * This example demonstrates social media management automation. */ // Assume 'agent' is an instance of the Upstash Agent // const agent = new UpstashAgent(); async function manageSocialMedia(postContent, platform) { console.log(`Managing social media on ${platform}...`); // Example: Post content to a platform // await agent.postContent(postContent, platform); // console.log('Content posted successfully.'); // Example: Moderate comments // const comments = await agent.getComments(platform); // const moderatedComments = await agent.moderateContent(comments); // console.log('Moderated comments:', moderatedComments); // Example: Automate engagement // await agent.automateEngagement(platform); // console.log('Engagement automated.'); console.log('Social media management complete.'); } // manageSocialMedia('Check out our new blog post!', 'twitter'); ``` -------------------------------- ### Configure ngrok Auth Token Source: https://upstash.com/docs/workflow/howto/local-development This command adds your ngrok authentication token to the configuration file. Replace '' with your actual token obtained from the ngrok dashboard. This step is crucial for authenticating ngrok with your account. ```bash ngrok config add-authtoken ``` -------------------------------- ### Get Flow-Control Information using Curl Source: https://upstash.com/docs/workflow/rest/flow-control/get This snippet demonstrates how to retrieve flow control information using a GET request with curl. It requires the flow control key and an authorization token. The response provides the flow control key and its wait list size. ```sh curl -X GET https://qstash.upstash.io/v2/flowControl/YOUR_FLOW_CONTROL_KEY -H "Authorization: Bearer " ``` -------------------------------- ### Publish JSON Message to Workflow Endpoint Source: https://upstash.com/docs/workflow/getstarted This snippet demonstrates how to publish a JSON message to a workflow endpoint using the Upstash client library. It specifies the target URL, request body, headers, and retry attempts. Ensure the workflow endpoint and route are correctly configured. ```javascript res = await client.message.publish_json( url="https:///", body={"hello": "there!"}, headers={}, retries=3, ) ``` -------------------------------- ### Create New User in Stripe - TypeScript Source: https://upstash.com/docs/workflow/examples/authWebhook Asynchronously creates a new user entry within the Stripe payment processing system. This function is essential for managing user subscriptions and payments. ```typescript await context.run("create new user in stripe", async () => { await createNewUserInStripe(email); }); ``` -------------------------------- ### Get Flow-Control Source: https://upstash.com/docs/workflow/rest/flow-control/get Retrieves information about a specific flow control setting using its key. ```APIDOC ## GET /v2/flowControl/{flowControlKey} ### Description Get Information on Flow-Control ### Method GET ### Endpoint /v2/flowControl/{flowControlKey} ### Parameters #### Path Parameters - **flowControlKey** (string) - Required - The key of the flow control. See the [flow control](/workflow/howto/flow-control) for more details. ### Request Example ```sh curl -X GET https://qstash.upstash.io/v2/flowControl/YOUR_FLOW_CONTROL_KEY -H "Authorization: Bearer " ``` ### Response #### Success Response (200) - **flowControlKey** (string) - The key of of the flow control. - **waitListSize** (integer) - The number of messages in the wait list that waits for `parallelism` set in the flow control. #### Response Example ```json { "flowControlKey": "YOUR_FLOW_CONTROL_KEY", "waitListSize": 10 } ``` ``` -------------------------------- ### Email Analyzer Example using Upstash Agents Source: https://upstash.com/docs/workflow/llms-txt Demonstrates an intelligent email processing system using Upstash Agents for content analysis, classification, and automated responses. This JavaScript example outlines the potential functionalities, though actual agent interactions are commented out. ```javascript /* * Example: Email Analyzer * This example demonstrates intelligent email processing. */ // Assume 'agent' is an instance of the Upstash Agent // const agent = new UpstashAgent(); async function analyzeEmail(emailContent) { console.log('Analyzing email content...'); // Example: Classify email content // const classification = await agent.classifyEmail(emailContent); // console.log('Email Classification:', classification); // Example: Extract key information // const extractedInfo = await agent.extractInformation(emailContent); // console.log('Extracted Information:', extractedInfo); // Example: Generate an automated response // const response = await agent.generateResponse(emailContent, classification); // console.log('Automated Response:', response); console.log('Email analysis complete.'); } // const sampleEmail = "Subject: Meeting Reminder\n\nHi Team, just a reminder about our meeting tomorrow at 10 AM."; // analyzeEmail(sampleEmail); ``` -------------------------------- ### Example Response from Multi-Agent Workflow Source: https://upstash.com/docs/workflow/llms-txt An example of a text response generated by a multi-agent workflow. This output details information about three Japanese cities, including their respective populations and a combined total, showcasing the workflow's ability to synthesize information. ```text Here is a brief overview of three cities in Japan: ``` -------------------------------- ### Image Processing Workflow in Python (FastAPI) Source: https://upstash.com/docs/workflow/examples/imageProcessing Implements an image processing workflow using Upstash Workflow with FastAPI. It handles image retrieval, resizing, filtering, and storage. This Python snippet showcases the use of `Serve`, `run`, and `call` for orchestrating tasks and interacting with external services. ```python from fastapi import FastAPI from typing import List, TypedDict from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext, CallResponse from utils import store_image, get_image_url app = FastAPI() serve = Serve(app) class ImageResult(TypedDict): image_url: str class ImageProcessingPayload(TypedDict): image_id: str user_id: str @serve.post("/process-image") async def process_image(context: AsyncWorkflowContext[ImageProcessingPayload]) -> None: payload = context.request_payload image_id = payload["image_id"] user_id = payload["user_id"] # Step 1: Retrieve the uploaded image async def _get_image_url() -> str: return await get_image_url(image_id) image_url: str = await context.run("get-image-url", _get_image_url) # Step 2: Resize the image to multiple resolutions resolutions = [640, 1280, 1920] resize_responses = [] for resolution in resolutions: response: CallResponse[ImageResult] = await context.call( f"resize-image-{resolution}", # endpoint which returns ImageResult type in response url="https://image-processing-service.com/resize", method="POST", body={"imageUrl": image_url, "width": resolution}, ) resize_responses.append(response) resized_images = [response.body for response in resize_responses] # Step 3: Apply filters to each resized image filters = ["grayscale", "sepia", "contrast"] filter_responses = [] for resized_image in resized_images: for filter in filters: response: CallResponse[ImageResult] = await context.call( f"apply-filter-{filter}", # endpoint which returns ImageResult type in response url="https://image-processing-service.com/filter", method="POST", body={"imageUrl": resized_image["imageUrl"], "filter": filter}, ) filter_responses.append(response) processed_images = [response.body for response in filter_responses] # Step 4: Store processed images in cloud storage async def _store_image() -> str: return await store_image(processed_image["imageUrl"]) stored_image_urls: List[str] = [] for processed_image in processed_images: stored_image_url = await context.run("store-image", _store_image) stored_image_urls.append(stored_image_url) ``` -------------------------------- ### Add Email Analyzer Example Commit Source: https://upstash.com/docs/workflow/llms-txt This commit message indicates the addition of an example for an email analyzer, demonstrating how to process and analyze email data within the workflow system. ```git commit add email analyzer example (#52) ``` -------------------------------- ### Run Project Tests with Bun Source: https://upstash.com/docs/workflow/llms-txt Details the process for running project tests using the Bun runtime. It involves setting up environment variables by copying from a `.env.template` file to a `.env` file, and then executing the test command. This assumes a `test` script is defined in the project's `package.json`. ```bash bun run test ``` -------------------------------- ### GET /v2/workflows/dlq/{dlqId} Source: https://upstash.com/docs/workflow/rest/dlq/get Retrieves a single failed workflow run from the Dead Letter Queue (DLQ). ```APIDOC ## GET /v2/workflows/dlq/{dlqId} ### Description Get a single failed workflow run from the Dead Letter Queue (DLQ). ### Method GET ### Endpoint /v2/workflows/dlq/{dlqId} ### Parameters #### Path Parameters - **dlqId** (string) - Required - The DLQ id of the failed workflow run you want to retrieve. You will see this id when listing all workflow runs in the DLQ with the [/v2/workflows/dlq](/workflow/rest/dlq/list) endpoint. ### Request Example ```sh curl -X GET https://qstash.upstash.io/v2/workflows/dlq/my-dlq-id \ -H "Authorization: Bearer " ``` ### Response #### Success Response (200) - **messageId** (string) - The unique identifier for the message. - **url** (string) - The URL the workflow was directed to. - **method** (string) - The HTTP method used for the request. - **header** (object) - Headers associated with the workflow request. - **maxRetries** (integer) - The maximum number of retries allowed for the workflow. - **notBefore** (integer) - Timestamp indicating when the workflow can be retried. - **createdAt** (integer) - Timestamp when the workflow was created. - **failureCallback** (string) - URL for the failure callback. - **callerIP** (string) - The IP address of the caller. - **workflowRunId** (string) - The ID of the workflow run. - **workflowCreatedAt** (integer) - Timestamp when the workflow run was created. - **workflowUrl** (string) - The URL of the workflow. - **responseStatus** (integer) - The HTTP status code of the workflow response. - **responseHeader** (object) - Headers from the workflow response. - **responseBody** (string) - The body of the workflow response. - **failureCallbackInfo** (object) - Information about the failure callback. - **state** (string) - The state of the callback. - **responseStatus** (integer) - The HTTP status code of the callback response. - **responseBody** (string) - The body of the callback response. - **responseHeaders** (object) - Headers from the callback response. - **dlqId** (string) - The DLQ ID of the failed workflow run. #### Error Response (404) - **message** (string) - "Workflow run not found in DLQ." ### Response Example ```json { "messageId":"msg_26hZCxZCuWyyTWPmSVBrNC1RADwpgWxPcak2rQD51EMjFMuzcW7qYXpPiDyw8Gd", "url":"https://my.app/workflow", "method":"POST", "header":{ "Content-Type":[ "application/json" ] }, "maxRetries":10, "notBefore":1752829294505, "createdAt":1752829294505, "failureCallback":"https://my.app/workflow", "callerIP":"88.240.188.2", "workflowRunId":"wfr_5XAx4IJergqkGK1v23VzR", "workflowCreatedAt":1752829293531, "workflowUrl":"https://my.app/workflow", "responseStatus":489, "responseHeader":{ "Content-Type":[ "text/plain;charset=UTF-8" ] }, "responseBody":"{\"error\":\"WorkflowNonRetryableError\",\"message\":\"this workflow has stopped\"}", "failureCallbackInfo":{ "state":"CALLBACK_SUCCESS", "responseStatus":200, "responseBody":"{\"workflowRunId\":\"wfr_Q_khHG-a414M-xKRh2kNI\"}", "responseHeaders":{ "Content-Type":[ "text/plain;charset=UTF-8" ] } }, "dlqId":"1752829295505-0" } ``` ``` -------------------------------- ### Periodic User State Check and Emailing (TypeScript, Python) Source: https://upstash.com/docs/workflow/examples/customerOnboarding This code snippet demonstrates a recurring task that checks user engagement status every month. Based on the status ('active' or 'non-active'), it triggers different email notifications. The implementation uses a non-blocking sleep mechanism provided by `context.sleep` to optimize resource usage, making it suitable for long-running, serverless environments. ```typescript while (true) { const state = await context.run("check-user-state", async () => { return await getUserState() }) if (state === "non-active") { await context.run("send-email-non-active", async () => { await sendEmail("Email to non-active users", email) }) } else if (state === "active") { await context.run("send-email-active", async () => { await sendEmail("Send newsletter to active users", email) }) } await context.sleep("wait-for-1-month", 60 * 60 * 24 * 30) } ``` ```python while True: async def _check_user_state() -> UserState: return await get_user_state() state: UserState = await context.run("check-user-state", _check_user_state) if state == "non-active": async def _send_email_non_active() -> None: await send_email("Email to non-active users", email) await context.run("send-email-non-active", _send_email_non_active) else: async def _send_email_active() -> None: await send_email("Send newsletter to active users", email) await context.run("send-email-active", _send_email_active) await context.sleep("wait-for-1-month", 60 * 60 * 24 * 30) ``` -------------------------------- ### Upstash Workflow Data Processing with Python (FastAPI) Source: https://upstash.com/docs/workflow/examples/allInOne This Python code defines an API endpoint for a FastAPI application using Upstash Workflow. It mirrors the functionality of the TypeScript example, orchestrating data download, chunking, OpenAI processing, and report generation/sending. It leverages Upstash Workflow's Python SDK and requires 'upstash-workflow' and 'fastapi'. ```python from fastapi import FastAPI import json import os from typing import Dict, List, Any, TypedDict from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext, CallResponse from utils import ( aggregate_results, generate_report, send_report, get_dataset_url, split_into_chunks, ) app = FastAPI() serve = Serve(app) class RequestPayload(TypedDict): dataset_id: str user_id: str @serve.post("/ai-generation") async def ai_generation(context: AsyncWorkflowContext[RequestPayload]) -> None: request = context.request_payload dataset_id = request["dataset_id"] user_id = request["user_id"] # Step 1: Download the dataset async def _get_dataset_url() -> str: return await get_dataset_url(dataset_id) dataset_url = await context.run("get-dataset-url", _get_dataset_url) # HTTP request with much longer timeout (2hrs) response: CallResponse[Any] = await context.call( "download-dataset", url=dataset_url, method="GET" ) dataset = response.body # Step 2: Process data in chunks using OpenAI chunk_size = 1000 chunks = split_into_chunks(dataset, chunk_size) processed_chunks: List[str] = [] for i, chunk in enumerate(chunks): openai_response: CallResponse[Dict[str, str]] = await context.call( f"process-chunk-{i}", url="https://api.openai.com/v1/chat/completions", method="POST", headers={ "authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}", }, body={ "model": "gpt-4", "messages": [ { "role": "system", "content": "You are an AI assistant tasked with analyzing data chunks. Provide a brief summary and key insights for the given data.", }, { "role": "user", "content": f"Analyze this data chunk: {json.dumps(chunk)}", }, ], "max_tokens": 150, }, ) processed_chunks.append( ``` -------------------------------- ### Bulk Cancel Workflows Response Example Source: https://upstash.com/docs/workflow/llms-txt Example JSON response after successfully calling the Bulk Cancel Workflows API endpoint, indicating the number of workflow runs canceled. ```APIDOC ## Bulk Cancel Workflows Response Example ### Description The expected JSON response after successfully calling the Bulk Cancel Workflows API endpoint. It indicates the number of workflow runs that were canceled. ### Method (Not specified, assumed to be part of a larger API) ### Endpoint (Not specified) ### Parameters None ### Request Example (Not provided) ### Response #### Success Response (200) - **cancelled** (integer) - The number of workflow runs that were canceled. #### Response Example ```json { "cancelled": 10 } ``` ``` -------------------------------- ### Trigger Workflow via HTTP POST (curl) Source: https://upstash.com/docs/workflow/howto/start Starts a workflow by sending an HTTP POST request directly to the workflow endpoint using `curl`. This method is suitable for quick testing and requires specifying the endpoint URL, custom headers, and the request body as JSON. Note that this method is not supported if the endpoint is secured with signing keys. ```bash curl -X POST https:/// \ -H "my-header: foo" \ -d '{"foo": "bar"}' ``` -------------------------------- ### TypeScript: Customer Onboarding Workflow with Upstash Source: https://upstash.com/docs/workflow/llms-txt Demonstrates a multi-step customer onboarding workflow using Upstash Workflow. It sends a welcome email, pauses execution, generates a personalized follow-up message using OpenAI, and sends a follow-up email. It utilizes `context.run` for retries and `context.api.openai.call` for external service integration. ```TypeScript import { serve } from "@upstash/workflow/nextjs"; import { sendEmail } from "./emailUtils"; // Type-safety for starting our workflow interface InitialData { userId: string email: string name: string } export const { POST } = serve(async (context) => { const { userId, email, name } = context.requestPayload; // Step 1: Send welcome email await context.run("send-welcome-email", async () => { await sendEmail(email, "Welcome to our service!"); }); // Step 2: Wait for 3 days (in seconds) await context.sleep("sleep-until-follow-up", 60 * 60 * 24 * 3); // Step 3: AI-generate personalized follow-up message const { body: aiResponse } = await context.api.openai.call( "generate-personalized-message", { token: "", operation: "chat.completions.create", body: { model: "gpt-3.5-turbo", messages: [ { role: "system", content: "You are an assistant creating personalized follow-up messages." }, { role: "user", content: `Create a short, friendly follow-up message for ${name} who joined our service 3 days ago.` } ] } } ); const personalizedMessage = aiResponse.choices[0].message.content; // Step 4: Send personalized follow-up email await context.run("send-follow-up-email", async () => { await sendEmail(email, personalizedMessage); }); }); ``` -------------------------------- ### Upstash Workflow Integration with Next.js and FastAPI (Python) Source: https://upstash.com/docs/workflow/llms-txt This Python example shows how to define and run a workflow using Upstash Workflow, intended for integration with a Next.js frontend and FastAPI backend. It includes an asynchronous handler function `process_data` and defines a simple workflow `my_python_workflow` with a single processing step. The example requires the `upstash_workflow` library and `asyncio` for asynchronous operations. ```Python from upstash_workflow import Workflow, Step import asyncio # Example workflow definition in Python async def process_data(data): print(f"Processing: {data}") # Simulate some processing await asyncio.sleep(1) return {"result": f"processed_{data}"} my_python_workflow = Workflow( name="my-python-fastapi-workflow", steps=[ Step(id="process", handler=process_data) ] ) # To run the workflow (example): # await my_python_workflow.run(data="sample input") ``` -------------------------------- ### Run Flask Application Source: https://upstash.com/docs/workflow/llms-txt Starts the Flask development server on port 8000. This command is used to run the application defined in 'main.py'. ```bash flask --app main run -p 8000 ``` -------------------------------- ### Bulk Restart Workflow Runs Request Example (cURL) Source: https://upstash.com/docs/workflow/rest/dlq/bulk-restart Example of how to restart multiple failed workflow runs using cURL. This command sends a POST request to the Upstash API, specifying various headers for flow control and retries. ```sh curl -X POST https://qstash.upstash.io/v2/workflows/dlq/restart \ -H "Authorization: Bearer " \ -H "Upstash-Flow-Control-Key: custom-key" \ -H "Upstash-Flow-Control-Value: parallelism=1" \ -H "Upstash-Retries: 3" ``` -------------------------------- ### Deploy Project with Vercel CLI Source: https://upstash.com/docs/workflow/quickstarts/nextjs-fastapi Deploys the project using the Vercel command-line interface. This command initiates the deployment process, preparing your project to be hosted on Vercel. ```bash vercel ``` -------------------------------- ### Deploy Project to Vercel Source: https://upstash.com/docs/workflow/quickstarts/nextjs-flask This snippet demonstrates the commands to deploy a project to Vercel. First, 'vercel' is used to set up the project for deployment, and then 'vercel --prod' is used to deploy the project to production. It's noted that the project requires production deployment for proper authentication and functionality. ```bash vercel vercel --prod ``` -------------------------------- ### JSON: Workflow Message Logs Example Source: https://upstash.com/docs/workflow/llms-txt This JSON example illustrates a successful response from the Upstash Workflow message logs API. It includes pagination details and event information such as state, IDs, and timestamps for a workflow run. ```json { "cursor": "", "events": [ { "time": 1738788333107, "state": "CREATED", "workflowRunId": "wfr_6MXE3GfddpBMWJM7s5WSRPqwcFm8", "workflowUrl": "http://my-workflow-server.com/workflowEndpoint", "workflowCreatedAt": 1738788333105, "stepInfo": { "stepName": "init", "stepType": "Initial", "callType": "step", "messageId": "msg_2KxeAKGVEjwDjNK1TVPormoRf7shRyNBpPThVbpvkuZNqri4cXp5nwSajNzAs6UWakvbco3qEPvtjQU3qxqjWarm2kisK", "concurrent": 1, "createdAt": 1738788333106 }, "nextDeliveryTime": 1738788333106 }, { } } ``` -------------------------------- ### Secure Workflow Serve Function (Python) Source: https://upstash.com/docs/workflow/howto/security Configure the Upstash workflow serve function using Python. This example shows how to use the `Receiver` with signing keys fetched from environment variables, integrated with a web framework like FastAPI. ```python from qstash import Receiver @serve.post( "/api/example", receiver=Receiver( current_signing_key=os.environ["QSTASH_CURRENT_SIGNING_KEY"], next_signing_key=os.environ["QSTASH_NEXT_SIGNING_KEY"], ), ) async def example(context: AsyncWorkflowContext[str]) -> None: ... ``` -------------------------------- ### Sync User - Python Source: https://upstash.com/docs/workflow/examples/authWebhook Asynchronously synchronizes a user by creating them in the database. It returns a dictionary containing the user's ID. This function is a prerequisite for subsequent user-related operations. ```python async def _sync_user() -> str: return await create_user_in_database(name, email) result = await context.run("sync user", _sync_user) userid = result["userid"] ``` -------------------------------- ### Start ngrok HTTP Tunnel (Command Line) Source: https://upstash.com/docs/workflow/llms-txt Initiates an ngrok HTTP tunnel to expose a local server to the public internet. The `` parameter specifies the local port your application is running on, enabling ngrok to forward external requests to your local development environment. ```bash ngrok http ``` -------------------------------- ### Start Trial in Stripe with Upstash Workflow (Python) Source: https://upstash.com/docs/workflow/llms-txt Shows how to initiate a trial for a user in Stripe using Upstash Workflow. It defines an asynchronous function `_start_trial_in_stripe` that calls `start_trial_in_stripe` with the user's email, and then executes this function within the workflow context. This assumes `start_trial_in_stripe` and `context` are available. ```python async def _start_trial_in_stripe() -> None: await start_trial_in_stripe(email) await context.run("start trial in Stripe", _start_trial_in_stripe) ``` -------------------------------- ### Complete Workflow Example with Parallel Checks (TypeScript) Source: https://upstash.com/docs/workflow/howto/parallel-runs A full TypeScript example for a Next.js API route using Upstash Workflow. It showcases parallel execution of inventory checks for coffee beans, cups, and milk. If all items are available, it proceeds to brew coffee and print a receipt. Dependencies include `@upstash/workflow/nextjs` and utility functions `checkInventory`, `brewCoffee`, and `printReceipt`. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { checkInventory, brewCoffee, printReceipt } from "@/utils"; export const { POST } = serve(async (ctx) => { const [coffeeBeansAvailable, cupsAvailable, milkAvailable] = await Promise.all([ ctx.run("check-coffee-beans", () => checkInventory("coffee-beans")), ctx.run("check-cups", () => checkInventory("cups")), ctx.run("check-milk", () => checkInventory("milk")), ]); // If all ingedients available, brew coffee if (coffeeBeansAvailable && cupsAvailable && milkAvailable) { const price = await ctx.run("brew-coffee", async () => { return await brewCoffee({ style: "cappuccino" }); }); await printReceipt(price); } }); ``` -------------------------------- ### Set Up Basic Webhook Endpoint (Python) Source: https://upstash.com/docs/workflow/llms-txt Demonstrates setting up a basic webhook endpoint using the `Serve` class in Python with FastAPI. It shows how to define an endpoint that accepts POST requests and uses `initial_payload_parser` for custom payload handling. This requires the `upstash-workflow` and `fastapi` libraries. ```Python from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) def initial_payload_parser(payload): return payload @serve.post("/api/example", initial_payload_parser=initial_payload_parser) async def example(context: AsyncWorkflowContext[str]) -> None: # Your webhook handling logic here ``` -------------------------------- ### Trigger Multiple Workflows with Upstash Client (TypeScript) Source: https://upstash.com/docs/workflow/howto/start Initiates multiple workflow runs concurrently using the Upstash client's `trigger` method with an array of workflow configurations. Each configuration can specify the workflow URL and other optional parameters. This is useful for batch processing or starting several independent workflows simultaneously. ```typescript import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }) const results = await client.trigger([ { url: "", // other options... }, { url: "", // other options... }, ]) console.log(results[0].workflowRunId) // prints wfr_my-workflow ``` -------------------------------- ### Basic Webhook Endpoint Setup (Next.js) Source: https://upstash.com/docs/workflow/llms-txt Sets up a basic webhook endpoint in a Next.js application using `@upstash/workflow/nextjs`. The `serve` function handles incoming requests, and `initialPayloadParser` can be configured for custom payload processing. ```typescript import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve( async (context) => { // Your webhook handling logic here }, { initialPayloadParser: (payload) => { return payload; }, } ); ``` -------------------------------- ### Trigger Workflow via HTTP POST (requests Python) Source: https://upstash.com/docs/workflow/howto/start Starts a workflow by sending an HTTP POST request using the `requests` library in Python. This method requires the workflow endpoint URL, the request body formatted as JSON, and any custom headers. It's a common way to interact with external APIs, including workflow endpoints, from Python applications. ```python import requests requests.post( "https:///", json={"foo": "bar"}, headers={"my-header": "foo"} ) ``` -------------------------------- ### Fetch Workflow Logs with Go http client Source: https://upstash.com/docs/workflow/rest/runs/logs This Go code snippet illustrates how to fetch workflow logs using the standard `net/http` package. It creates a GET request, sets the Authorization header, sends the request, and handles the response. ```go req, err := http.NewRequest("GET", "https://qstash.upstash.io/v2/workflows/logs", nil) if err != nil { log.Fatal(err) } req.Header.Set("Authorization", "Bearer ") resp, err := http.DefaultClient.Do(req) if err != nil { log.Fatal(err) } defer resp.Body.Close() ``` -------------------------------- ### Generate and Send Report (TypeScript, Python) Source: https://upstash.com/docs/workflow/examples/allInOne This snippet demonstrates generating a report from aggregated data and sending it to a user. It utilizes asynchronous operations via `context.run` for background processing, suitable for serverless functions. Dependencies include `generateReport` and `sendReport` functions. ```typescript const report = await context.run("generate-report", async () => { return await generateReport(request.datasetId) }) await context.run("send-report", async () => { await sendReport(report, request.userId) }) ``` ```python async def _generate_report() -> Any: return await generate_report(dataset_id) report = await context.run("generate-report", _generate_report) async def _send_report() -> None: await send_report(report, user_id) await context.run("send-report", _send_report) ``` -------------------------------- ### Send Welcome Email in Upstash Workflow (TypeScript) Source: https://upstash.com/docs/workflow/llms-txt This snippet demonstrates the initial step in an Upstash Workflow for customer onboarding. It uses `context.run` to execute the logic for sending a welcome email to a new user. No external dependencies are explicitly shown, but `sendEmail` is assumed to be available within the workflow context. ```TypeScript await context.run("new-signup", async () => { await sendEmail("Welcome to the platform", email) }) ``` -------------------------------- ### Configure .env for Local QStash Server Source: https://upstash.com/docs/workflow/quickstarts/nextjs-flask Sets environment variables for connecting to a local QStash server. The QSTASH_URL is typically set to the local development server address. ```bash export QSTASH_URL="http://127.0.0.1:8080" export QSTASH_TOKEN= ``` -------------------------------- ### Configure Local QStash Environment Variables (.env.local) Source: https://upstash.com/docs/workflow/llms-txt Adds QSTASH_URL and QSTASH_TOKEN to the .env.local file when using a local QStash server. This is a common setup step for local development to provide necessary credentials to the application. ```dotenv QSTASH_URL="http://127.0.0.1:8080" QSTASH_TOKEN= ``` -------------------------------- ### JSON: Workflow Run Details Source: https://upstash.com/docs/workflow/llms-txt Example JSON response detailing a workflow run. It includes the workflow run ID, URL, state, creation and completion timestamps, and a breakdown of steps executed in sequential and parallel execution flows. ```json { "cursor": "1686652644442-12", "runs": [ { "workflowRunId": "wfr_rj0Upr1rvdzGfz96fXNHh", "workflowUrl": "https://feasible-eft-notably.ngrok-free.app/api/call", "workflowState": "RUN_SUCCESS", "workflowRunCreatedAt": 1736340463061, "workflowRunCompletedAt": 1736340464684, "steps": [ { "steps": [ { "stepName": "init", "stepType": "Initial", "callType": "step", "messageId": "msg_7YoJxFpwkEy5zBp378JgvD6YBDPBEqkBPje2JGTCEUiASMJQ1FwY9", "concurrent": 1, "state": "STEP_SUCCESS", "createdAt": 1736340463064 } ], "type": "sequential" }, { "steps": [ { "stepId": 1, "stepName": "external call", "stepType": "Run", "callType": "step", "messageId": "msg_26hZCxZCuWyyTWPmSVBrNCtiJGNsULmt63vFfcZxQ3sfYFKLZe2dKww4BSb2kVF", "out": "1", "concurrent": 2, "state": "STEP_SUCCESS", "createdAt": 1736340464111 }, { "stepId": 2, "stepName": "external call 2", "stepType": "Run", "callType": "step", "messageId": "msg_26hZCxZCuWyyTWPmSVBrNB882AMRP1TsgzpygELRcLWep4ACNTTsCHhrZuaNLij", "out": "2", "concurrent": 2, "state": "STEP_SUCCESS", "createdAt": 1736340463895 } ], "type": "parallel" } ] } ] } ``` -------------------------------- ### Handle Webhook Events with context.run (TypeScript) Source: https://upstash.com/docs/workflow/llms-txt Processes incoming webhook events within discrete, trackable steps using `context.run`. This example demonstrates handling a 'user.created' event and extracting user data. ```TypeScript import { serve } from "@upstash/workflow/nextjs" type UserPayload = { event: string userId: string email: string firstName: string } export const { POST } = serve(async (context) => { // ... Parse and validate the incoming request const user = await context.run( "handle-webhook-event", async () => { if (event.type === "user.created") { const { id: clerkUserId, email_addresses, first_name } = event.data const primaryEmail = email_addresses.find( (email) => (email.id = event.data.primary_email_address_id) ) if (!primaryEmail) { return false } return { event: event.type, userId: clerkUserId, email: primaryEmail.email_address, firstName: first_name, } as UserPayload } return false } ) }); ``` -------------------------------- ### Run SvelteKit App and Trigger Workflow Endpoint Source: https://upstash.com/docs/workflow/quickstarts/svelte Demonstrates how to start a SvelteKit development server and then trigger the defined workflow endpoint using `curl`. This process initiates a workflow run, and the endpoint responds with a unique `workflowRunId` for tracking purposes. ```bash npm run dev curl -X POST https://localhost:5000/api/workflow \ -H "content-type: application/json" # result: {"workflowRunId":"wfr_xxxxxx"} ``` -------------------------------- ### List DLQ with cURL Source: https://upstash.com/docs/workflow/llms-txt This example demonstrates how to list failed workflow runs from the Dead Letter Queue (DLQ) using a cURL command, including authorization. ```APIDOC ## List DLQ with cURL ### Description Example cURL command to list failed workflow runs from the DLQ, including the necessary authorization header. ### Method GET ### Endpoint `https://qstash.upstash.io/v2/workflows/dlq` ### Headers - `Authorization`: `Bearer ` (Required) ### Request Example ```bash curl https://qstash.upstash.io/v2/workflows/dlq \ -H "Authorization: Bearer " ``` ``` -------------------------------- ### Cloudflare Workers Integration with Upstash Workflow (TypeScript) Source: https://upstash.com/docs/workflow/llms-txt This TypeScript example shows how to define and trigger an Upstash Workflow within a Cloudflare Workers application. It includes a basic workflow definition with a single step and an example of how to initiate the workflow from a worker's fetch handler. ```typescript import { Workflow } from '@upstash/workflow'; // Example workflow definition for Cloudflare Workers const cfWorkflow = new Workflow({ name: 'my-cloudflare-workflow', steps: [ { id: 'worker-step', handler: async ({ input }) => { console.log('Cloudflare Worker received:', input); // Perform actions specific to Cloudflare Workers return { message: 'Cloudflare Worker step done' }; }, }, ], }); // Example of triggering the workflow from a worker route // export default { // async fetch(request) { // await cfWorkflow.run({ data: 'trigger data' }); // return new Response('Workflow triggered!'); // }, // }; ``` -------------------------------- ### Apply Filters to Resized Images - Python Source: https://upstash.com/docs/workflow/examples/imageProcessing Applies predefined filters (grayscale, sepia, contrast) to resized images. It iterates through each resized image and applies each filter by calling an external service via context.call. The results are collected for further processing. ```python filters = ["grayscale", "sepia", "contrast"] filter_responses = [] for resized_image in resized_images: for filter in filters: response: CallResponse[ImageResult] = await context.call( f"apply-filter-{filter}", # endpoint which returns ImageResult type in response url="https://image-processing-service.com/filter", method="POST", body={"imageUrl": resized_image["imageUrl"], "filter": filter}, ) filter_responses.append(response) processed_images = [response.body for response in filter_responses] ``` -------------------------------- ### Trigger Single Workflow with Upstash Client (TypeScript) Source: https://upstash.com/docs/workflow/howto/start Initiates a single workflow run using the Upstash client's `trigger` method. This method allows for specifying the workflow URL, request body, headers, a unique workflow run ID, retry attempts, delay, a failure URL, and whether a failure function is defined. It's the recommended programmatic approach for starting workflows. ```typescript import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }) const { workflowRunId } = await client.trigger({ url: "https:///", body: "hello there!", // optional body headers: { ... }, // optional headers workflowRunId: "my-workflow", // optional workflow run id retries: 3, // optional retries in the initial request delay: "10s", // optional delay value failureUrl: "https://", // optional failure url useFailureFunction: true, // whether a failure function is defined in the endpoint flowControl: { ... } // optional flow control }) console.log(workflowRunId) // prints wfr_my-workflow ``` -------------------------------- ### Implement 'Ask AI' Tool with context.run() (JavaScript) Source: https://upstash.com/docs/workflow/llms-txt Shows how to implement an 'Ask AI' tool by wrapping its execution logic within a `context.run()` call in JavaScript. This snippet includes a placeholder for a weather API call and returns mock weather data, illustrating a common pattern for integrating tools within workflows. ```javascript execute: ({ location }) => context.run("weather tool", () => { // Mock data, replace with actual weather API call return { location, temperature: 72 + Math.floor(Math.random() * 21) - 10, }; }) ``` -------------------------------- ### Download Dataset with Context Call (TypeScript) Source: https://upstash.com/docs/workflow/examples/allInOne Retrieves a dataset URL and downloads the dataset using `context.call` for HTTP requests. This method is suitable for long-running operations that might exceed standard serverless function limits. It takes a dataset ID as input and returns the dataset content. ```typescript const datasetUrl = await context.run("get-dataset-url", async () => { return await getDatasetUrl(request.datasetId) }) const { body: dataset } = await context.call("download-dataset", { url: datasetUrl, method: "GET" }) ``` -------------------------------- ### Correct Step Execution Order (TypeScript) Source: https://upstash.com/docs/workflow/llms-txt Demonstrates the correct way to structure a workflow step before calling `generateText`. This example ensures the prompt is retrieved from the request payload, preventing a 'prompt is undefined' error. ```typescript export const { POST } = serve<{ prompt: string }>(async (context) => { const openai = createWorkflowOpenAI(context); // Will throw "prompt is undefined" const result = await generateText({ model: openai('gpt-3.5-turbo'), prompt: context.requestPayload.prompt }); }); ``` -------------------------------- ### Generate and Send Data Report (Python) Source: https://upstash.com/docs/workflow/examples/allInOne This section outlines the final steps of the workflow: generating a comprehensive data report using `generate_report` and then sending this report to the user via `send_report`. Both operations are executed asynchronously using `context.run` to manage long-running tasks. ```python # Step 3: Generate and send data report async def _generate_report() -> Any: return await generate_report(dataset_id) report = await context.run("generate-report", _generate_report) async def _send_report() -> None: await send_report(report, user_id) await context.run("send-report", _send_report) ``` -------------------------------- ### Create .env.local File for Nuxt.js Source: https://upstash.com/docs/workflow/quickstarts/nuxt Creates a `.env.local` file in the project root to store environment variables, specifically for local development configurations. This is a standard practice for managing secrets and configurations in Node.js applications. ```bash touch .env.local ``` -------------------------------- ### Set Up Basic Webhook Endpoint (TypeScript) Source: https://upstash.com/docs/workflow/llms-txt Demonstrates setting up a basic webhook endpoint using the `serve` function in TypeScript for Next.js applications. It initializes an endpoint that receives webhook payloads and uses `initialPayloadParser` for custom parsing. This requires the `@upstash/workflow/nextjs` package. ```TypeScript import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve( async (context) => { // Your webhook handling logic here }, { initialPayloadParser: (payload) => { return payload; }, } ); ``` -------------------------------- ### Sync User - TypeScript Source: https://upstash.com/docs/workflow/examples/authWebhook Asynchronously synchronizes a user by creating them in the database. It returns an object containing the user's ID. This function is a prerequisite for subsequent user-related operations. ```typescript const { userid } = await context.run("sync user", async () => { return await createUserInDatabase({ name, email }); }); ``` -------------------------------- ### Send Welcome Email (Python) Source: https://upstash.com/docs/workflow/llms-txt This snippet demonstrates sending a welcome email within a Python Upstash Workflow. It defines an asynchronous function to send the email and then executes it using `context.run`, similar to TypeScript examples. ```python # Python equivalent would involve similar context.run calls if a Python SDK is used for workflow execution. # Example placeholder: # report = context.run("generate-report", lambda: generate_report(request.dataset_id)) async def _send_welcome_email() -> None: await send_email( email, "Welcome to our platform!, You have 14 days of free trial." ) await context.run("send welcome email", _send_welcome_email) ``` -------------------------------- ### Send Email - Python Source: https://upstash.com/docs/workflow/examples/authWebhook Asynchronously sends an email to a specified recipient with given content. This is a general-purpose utility function used across the workflow for various notifications. ```python async def send_email(email: str, content: str) -> None: # Implement logic to send an email print("Sending email to", email, content) ``` -------------------------------- ### Define SolidJS Workflow Endpoint Source: https://upstash.com/docs/workflow/quickstarts/solidjs Defines a workflow endpoint in a SolidJS application using the `@upstash/workflow/solidjs` library. This endpoint, when triggered, executes a series of defined steps ('initial-step', 'second-step') with console logging. It requires the `@upstash/workflow/solidjs` package. ```typescript import { serve } from "@upstash/workflow/solidjs" export const { POST } = serve(async (context) => { await context.run("initial-step", () => { console.log("initial step ran") }) await context.run("second-step", () => { console.log("second step ran") }) }) ``` -------------------------------- ### Basic Webhook Endpoint Setup - TypeScript Source: https://upstash.com/docs/workflow/howto/use-webhooks Sets up a basic webhook endpoint using the `serve` function from `@upstash/workflow/nextjs`. This function handles incoming requests and allows for custom logic within an asynchronous callback. It also includes an `initialPayloadParser` for pre-processing the incoming payload. ```typescript import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve( async (context) => { // Your webhook handling logic here }, { initialPayloadParser: (payload) => { return payload; }, } ); ``` -------------------------------- ### Image Processing Workflow in TypeScript (Next.js) Source: https://upstash.com/docs/workflow/examples/imageProcessing Implements an image processing workflow using Upstash Workflow with Next.js. It handles image retrieval, resizing, filtering, and storage. This snippet utilizes `serve`, `run`, and `call` for orchestration and external service interaction. ```typescript import { serve } from "@upstash/workflow/nextjs" import { resizeImage, applyFilters, storeImage, getImageUrl, } from "./utils" type ImageResult = { imageUrl: string } export const { POST } = serve<{ imageId: string; userId: string }>( async (context) => { const { imageId, userId } = context.requestPayload // Step 1: Retrieve the uploaded image const imageUrl = await context.run("get-image-url", async () => { return await getImageUrl(imageId) }) // Step 2: Resize the image to multiple resolutions const resolutions = [640, 1280, 1920] const resizedImages: { body: ImageResult }[] = await Promise.all(resolutions.map( resolution => context.call( `resize-image-${resolution}`, { // endpoint which returns ImageResult type in response url: "https://image-processing-service.com/resize", method: "POST", body: { imageUrl, width: resolution, } } ) )) // Step 3: Apply filters to each resized image const filters = ["grayscale", "sepia", "contrast"] const processedImagePromises: Promise[] = [] for (const resizedImage of resizedImages) { for (const filter of filters) { const processedImagePromise = context.call( `apply-filter-${filter}`, { // endpoint which returns ImageResult type in response url: "https://image-processing-service.com/filter", method: "POST", body: { imageUrl: resizedImage.body.imageUrl, filter, } } ) processedImagePromises.push(processedImagePromise) } } const processedImages: { body: ImageResult }[] = await Promise.all(processedImagePromises) // Step 4: Store processed images in cloud storage const storedImageUrls: string[] = await Promise.all( processedImages.map( processedImage => context.run(`store-image`, async () => { return await storeImage(processedImage.body.imageUrl) }) ) ) } ) ``` -------------------------------- ### Get Failed Workflow Run from DLQ Source: https://upstash.com/docs/workflow/llms-txt Retrieves a specific failed workflow run from the Dead Letter Queue (DLQ) using its DLQ ID. Requires an authorization token. The API endpoint is GET /v2/workflows/dlq/{dlqId}. ```APIDOC ## GET /v2/workflows/dlq/{dlqId} ### Description Get a single failed workflow run from the Dead Letter Queue (DLQ). ### Method GET ### Endpoint `/v2/workflows/dlq/{dlqId}` ### Parameters #### Path Parameters - **dlqId** (string) - Required - The DLQ id of the failed workflow run you want to retrieve. This ID can be obtained from the [/v2/workflows/dlq](https://upstash.com/docs/workflow/rest/dlq/list) endpoint. ### Request Example ```bash curl -X GET https://qstash.upstash.io/v2/workflows/dlq/my-dlq-id \ -H "Authorization: Bearer " ``` ### Response #### Success Response (200) - **messageId** (string) - The ID of the message. - **url** (string) - The URL the workflow was intended to be sent to. - **method** (string) - The HTTP method for the request. - **header** (object) - The headers for the request. - **maxRetries** (number) - The maximum number of retries for the workflow. - **notBefore** (number) - The timestamp before which the message should not be delivered. - **createdAt** (number) - The timestamp when the workflow was created. - **failureCallback** (string) - The URL to call back upon failure. - **callerIP** (string) - The IP address of the caller. - **workflowRunId** (string) - The ID of the workflow run. - **workflowCreatedAt** (number) - The timestamp when the workflow run was created. - **workflowUrl** (string) - The URL of the workflow. - **responseStatus** (number) - The HTTP status code of the response. - **responseHeader** (object) - The headers of the response. - **responseBody** (string) - The body of the response. - **failureCallbackInfo** (object) - Information about the failure callback. - **dlqId** (string) - The ID of the Dead Letter Queue entry. #### Response Example ```json { "messageId":"msg_26hZCxZCuWyyTWPmSVBrNC1RADwpgWxPcak2rQD51EMjFMuzcW7qYXpPiDyw8Gd", "url":"https://my.app/workflow", "method":"POST", "header":{ "Content-Type":[ "application/json" ] }, "maxRetries":10, "notBefore":1752829294505, "createdAt":1752829294505, "failureCallback":"https://my.app/workflow", "callerIP":"88.240.188.2", "workflowRunId":"wfr_5XAx4IJergqkGK1v23VzR", "workflowCreatedAt":1752829293531, "workflowUrl":"https://my.app/workflow", "responseStatus":489, "responseHeader":{ "Content-Type":[ "text/plain;charset=UTF-8" ] }, "responseBody":"{\"error\":\"WorkflowNonRetryableError\",\"message\":\"this workflow has stopped\"}", "failureCallbackInfo":{ "state":"CALLBACK_SUCCESS", "responseStatus":200, "responseBody":"{\"workflowRunId\":\"wfr_Q_khHG-a414M-xKRh2kNI\"}", "responseHeaders":{ "Content-Type":[ "text/plain;charset=UTF-8" ] } }, "dlqId":"1752829295505-0" } ``` ``` -------------------------------- ### GET /websites/upstash-workflow/runs Source: https://upstash.com/docs/workflow/rest/runs/logs Fetches details about workflow runs, allowing filtering by various parameters and pagination. ```APIDOC ## GET /websites/upstash-workflow/runs ### Description Fetch details about workflow runs, including their state, completed and in-progress steps, and step details. You can fetch details about workflow runs, including their state, completed and in-progress steps, and step details. The retention duration for completed workflow runs depends on your quota. Please check the [pricing](https://upstash.com/pricing/workflow) page for details. **Note:** If you have executed multiple workflow runs with the same workflowRunId, the `workflowRunId` filter will return all of them. To uniquely identify a single workflow run, include the `workflowCreatedAt` timestamp in your filter. ### Method GET ### Endpoint /websites/upstash-workflow/runs ### Parameters #### Query Parameters - **cursor** (string) - Optional - By providing a cursor you can paginate through all of the workflow runs. - **workflowRunId** (string) - Optional - Filter workflow runs by run id. - **workflowUrl** (string) - Optional - Filter workflow runs by workflow url. - **workflowCreatedAt** (number) - Optional - Filter workflow runs by the unix milliseconds value of creation timestamp. - **state** (string) - Optional - Filter workflow runs by state. Possible values: `RUN_STARTED`, `RUN_SUCCESS`, `RUN_FAILED`, `RUN_CANCELED`. - **fromDate** (number) - Optional - Filter workflow runs by starting date, in milliseconds (Unix timestamp). This is inclusive. - **toDate** (number) - Optional - Filter workflow runs by ending date, in milliseconds (Unix timestamp). This is inclusive. - **count** (number) - Optional - The number of workflow runs to return. Default and max is 10. ### Request Example ```json { "message": "This endpoint does not have a request body. Parameters are passed via query string." } ``` ### Response #### Success Response (200) - **label** (string) - Filter workflow run by the label assigned by the user. (Note: This field appears to be miscategorized in the source and is listed as a response field but described as a filter. Assuming it's intended as a response field for now). (Further response fields are not detailed in the provided text.) #### Response Example ```json { "runs": [ { "workflowRunId": "run_abc123", "workflowUrl": "http://example.com/workflow", "workflowCreatedAt": 1678886400000, "state": "RUN_SUCCESS", "steps": [ { "name": "step1", "status": "COMPLETED", "startTime": 1678886401000, "endTime": 1678886405000 } ] } ], "nextCursor": "cursor_xyz789" } ``` ``` -------------------------------- ### Basic Webhook Endpoint Setup - Python Source: https://upstash.com/docs/workflow/howto/use-webhooks Sets up a basic webhook endpoint using the `Serve` class from `upstash_workflow.fastapi`. This integrates with FastAPI to create an asynchronous endpoint that accepts a specific payload parser. The `initial_payload_parser` is used to transform the incoming data before it's processed. ```python from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) def initial_payload_parser(payload): return payload @serve.post("/api/example", initial_payload_parser=initial_payload_parser) async def example(context: AsyncWorkflowContext[str]) -> None: # Your webhook handling logic here ``` -------------------------------- ### Send Trial Warning Email (TypeScript, Python) Source: https://upstash.com/docs/workflow/examples/authWebhook Sends a warning email to users 2 days before their trial ends, prompting them to upgrade. The workflow first waits for the appropriate time, checks if the user has already upgraded, and then sends the warning email if they haven't. It depends on `context.sleep`, `checkUpgradedPlan`/`check_upgraded_plan`, and `sendEmail`. ```typescript await context.sleep("wait for trial warning", 5 * 24 * 60 * 60); const isUpgraded = await context.run("check upgraded plan", async () => { return await checkUpgradedPlan(email); }); if (isUpgraded) return; await context.run("send trial warning email", async () => { await sendEmail( email, "Your trial is about to end in 2 days. Please upgrade your plan to keep using our platform." ); }); ``` ```python await context.sleep("wait for trial warning", 5 * 24 * 60 * 60) async def _check_upgraded_plan() -> bool: return await check_upgraded_plan(email) is_upgraded = await context.run("check upgraded plan", _check_upgraded_plan) # end the workflow if upgraded if is_upgraded: return async def _send_trial_warning_email() -> None: await send_email( email, "Your trial is about to end in 2 days. Please upgrade your plan to keep using our platform.", ) await context.run("send trial warning email", _send_trial_warning_email) ``` -------------------------------- ### Workflow Execution Response Example (JSON) Source: https://upstash.com/docs/workflow/rest/runs/logs This JSON snippet shows a sample response from a workflow execution. It details the overall status and the results of individual steps, including parallel execution. The data includes message IDs, output values, concurrency settings, and creation timestamps. ```json { "messageId": "msg_26hZCxZCuWyyTWPmSVBrNB882AMRP1TsgzpygELRcLWep4ACNTTsCHhrZuaNLij", "out": "2", "concurrent": 2, "state": "STEP_SUCCESS", "createdAt": 1736340463895 } ``` -------------------------------- ### Resume Bulk Workflow Runs with cURL Source: https://upstash.com/docs/workflow/rest/dlq/bulk-resume This example demonstrates how to resume multiple failed workflow runs using a cURL command. It shows how to specify authorization, flow control headers, and retry configurations for the bulk resume operation. ```sh curl -X POST https://qstash.upstash.io/v2/workflows/dlq/resume \ -H "Authorization: Bearer " \ -H "Upstash-Flow-Control-Key: custom-key" \ -H "Upstash-Flow-Control-Value: parallelism=1" \ -H "Upstash-Retries: 3" ``` -------------------------------- ### Create SvelteKit Workflow Endpoint with Upstash Source: https://upstash.com/docs/workflow/quickstarts/svelte Defines a workflow endpoint using the `@upstash/workflow/svelte` library in a SvelteKit `+server.ts` file. It utilizes environment variables for configuration and registers two sequential steps, 'initial-step' and 'second-step', within the workflow. The `serve` function handles the workflow execution and exposes it as a POST endpoint. ```typescript import { serve } from "@upstash/workflow/svelte" import { env } from "$env/dynamic/private" export const { POST } = serve( async (context) => { await context.run("initial-step", () => { console.log("initial step ran") }) await context.run("second-step", () => { console.log("second step ran") }) }, { env } ) ``` -------------------------------- ### Charge Customer in TypeScript and Python Source: https://upstash.com/docs/workflow/examples/paymentRetry Demonstrates how to charge a customer using a 'chargeCustomer' function. Includes error handling with a try-catch block to prevent workflow retries on payment failure and allows for custom logic. Returns a result object or null on error. ```typescript const result = await context.run("charge customer", async () => { try { return await chargeCustomer(i + 1) } catch (e) { console.error(e) return } }); ``` ```python async def _charge_customer() -> Optional[ChargeResult]: try: return await charge_customer(i + 1) except Exception as e: print(f"Error: {e}") return None result = await context.run("charge customer", _charge_customer) ``` -------------------------------- ### Get Failed Workflow Run from DLQ using cURL Source: https://upstash.com/docs/workflow/rest/dlq/get This snippet demonstrates how to retrieve a failed workflow run from the Dead Letter Queue (DLQ) using the cURL command-line tool. It requires the DLQ ID and an authorization token. ```sh curl -X GET https://qstash.upstash.io/v2/workflows/dlq/my-dlq-id \ -H "Authorization: Bearer " ``` -------------------------------- ### GET /v1/workflows Source: https://upstash.com/docs/workflow/llms-txt Retrieves a list of all available workflows. This endpoint provides an overview of the workflows managed within the Upstash Workflow service. ```APIDOC ## GET /v1/workflows ### Description Retrieves a list of all available workflows. This endpoint provides an overview of the workflows managed within the Upstash Workflow service. ### Method GET ### Endpoint /v1/workflows ### Parameters #### Query Parameters (No query parameters defined in the provided documentation) #### Headers - **Authorization** (string) - Required - Bearer token for authentication (e.g., `Bearer `) ### Response #### Success Response (200 OK) (Response schema not provided in the documentation) #### Error Responses (Error responses not specified in the documentation for this endpoint, but typically include 401 Unauthorized) ``` -------------------------------- ### QStash CLI Development Server Output (Plaintext) Source: https://upstash.com/docs/workflow/llms-txt Displays the startup output of the QStash local development server. This output includes the server's address and default authorization credentials (token and signing keys), which are essential for configuring applications to connect to the local server. ```plaintext Upstash QStash development server is runnning at http://127.0.0.1:8080 A default user has been created for you to authorize your requests. QSTASH_TOKEN=eyJVc2VySUQiOiJkZWZhdWx0VXNlciIsIlBhc3N3b3JkIjoiZGVmYXVsdFBhc3N3b3JkIn0= QSTASH_CURRENT_SIGNING_KEY=sig_7RvLjqfZBvP5KEUimQCE1pvpLuou QSTASH_NEXT_SIGNING_KEY=sig_7W3ZNbfKWk5NWwEs3U4ixuQ7fxwE Sample cURL request: curl -X POST http://127.0.0.1:8080/v2/publish/https://example.com -H "Authorization: Bearer eyJVc2VySUQiOiJkZWZhdWx0VXNlciIsIlBhc3N3b3JkIjoiZGVmYXVsdFBhc3N3b3JkIn0=" Check out documentation for more details: https://upstash.com/docs/qstash/howto/local-development ``` -------------------------------- ### Create Workflow Step with Upstash JS SDK (JavaScript) Source: https://upstash.com/docs/workflow/llms-txt An example using the Upstash Workflow JavaScript library to create a new step. It shows how to instantiate the `Workflow` object with credentials and then use the `create_step` method to define a step's name, handler logic (as a string), and the next step in the sequence. ```javascript const workflow = new Workflow('your-workflow-id', 'your-api-key'); const stepData = { name: 'my-first-step', handler: 'console.log(\'Hello, Upstash!\')', next: 'another-step-id' }; workflow.create_step(stepData) .then(step => { console.log('Step created:', step); }) .catch(error => { console.error('Error creating step:', error); }); ``` -------------------------------- ### TypeScript: Custom OpenAI Client for Text Generation Source: https://upstash.com/docs/workflow/llms-txt This TypeScript example demonstrates integrating a custom OpenAI client within an Upstash Workflow for text generation. It includes steps for obtaining the prompt, generating text, logging the output, and handling potential `WorkflowAbort` errors. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { WorkflowAbort } from '@upstash/workflow'; import { generateText, ToolExecutionError } from 'ai'; import { createWorkflowOpenAI } from './model'; export const { POST } = serve < { prompt: string } > (async (context) => { const openai = createWorkflowOpenAI(context); // Important: Must have a step before generateText const prompt = await context.run("get prompt", async () => { return context.requestPayload.prompt; }); try { const result = await generateText({ model: openai('gpt-3.5-turbo'), maxTokens: 2048, prompt, }); await context.run("text", () => { console.log(`TEXT: ${result.text}`); return result.text; }); } catch (error) { if (error instanceof ToolExecutionError && error.cause instanceof WorkflowAbort) { throw error.cause; } else { throw error; } } }); ``` -------------------------------- ### Define Workflow Endpoint with Next.js Source: https://upstash.com/docs/workflow/llms-txt This example shows how to define a workflow endpoint using the `serve` method in a Next.js application with TypeScript. It includes handling sequential steps and string payloads. ```APIDOC ## POST /api/workflow ### Description Defines a workflow endpoint using the `serve` method in a Next.js application. ### Method POST ### Endpoint `/api/workflow` ### Parameters #### Path Parameters None #### Query Parameters None #### Request Body - **input** (string) - The input payload for the workflow step. ### Request Example ```typescript import { serve } from "@upstash/workflow/nextjs"; // mock function const someWork = (input: string) => { // ... implementation ... }; export const { POST } = serve(async (context) => { const payload = context.requestPayload; await context.run("do-some-work", () => someWork(payload)); }); ``` ### Response #### Success Response (200) - **result** (any) - The result of the workflow execution. #### Response Example ```json { "result": "workflow completed successfully" } ``` ``` -------------------------------- ### FastAPI Workflow Endpoint using Upstash Source: https://upstash.com/docs/workflow/llms-txt Defines a workflow endpoint in a FastAPI application using the `upstash_workflow.fastapi.Serve` class. This example illustrates creating a workflow with two sequential steps, `initial-step` and `second-step`. ```python from fastapi import FastAPI from upstash_workflow.fastapi import Serve app = FastAPI() serve = Serve(app) @serve.post("/api/workflow") async def workflow(context): async def _step1() -> None: print("initial step ran") await context.run("initial-step", _step1) async def _step2() -> None: print("second step ran") await context.run("second-step", _step2) ``` -------------------------------- ### Fetch Message Logs using Go Source: https://upstash.com/docs/workflow/rest/runs/message-logs This Go program demonstrates how to fetch message logs from the Upstash Workflow API. It constructs an HTTP GET request, sets the Authorization header, and sends the request using the default HTTP client. ```go req, err := http.NewRequest("GET", "https://qstash.upstash.io/v2/workflows/messageLogs", nil) if err != nil { log.Fatal(err) } req.Header.Set("Authorization", "Bearer ") resp, err := http.DefaultClient.Do(req) if err != nil { log.Fatal(err) } defer resp.Body.Close() ``` -------------------------------- ### Configure Local QStash Server Environment Variables Source: https://upstash.com/docs/workflow/llms-txt Sets environment variables for connecting to a local QStash server during development. `QSTASH_URL` specifies the address of the local QStash instance, and `QSTASH_TOKEN` provides the authentication token. This setup allows for local testing of Upstash Workflows without using the production QStash service. ```txt QSTASH_URL="http://127.0.0.1:8080" QSTASH_TOKEN= ``` ```bash QSTASH_URL="http://127.0.0.1:8080" QSTASH_TOKEN= ``` -------------------------------- ### Resize Image to Multiple Resolutions - Python Source: https://upstash.com/docs/workflow/examples/imageProcessing Resizes an image to specified resolutions (640, 1280, 1920) using an external service. It iterates through resolutions, calling the service via context.call for each. The responses are collected and their body parts extracted. ```python resolutions = [640, 1280, 1920] resize_responses = [] for resolution in resolutions: response: CallResponse[ImageResult] = await context.call( f"resize-image-{resolution}", # endpoint which returns ImageResult type in response url="https://image-processing-service.com/resize", method="POST", body={"imageUrl": image_url, "width": resolution}, ) resize_responses.append(response) resized_images = [response.body for response in resize_responses] ``` -------------------------------- ### Client.getWaiters - Get Waiters for an Event Source: https://upstash.com/docs/workflow/llms-txt Retrieves a list of all workflow runs that are currently waiting for a specific event ID. ```APIDOC ## Client.getWaiters - Get Waiters for an Event ### Description Retrieve a list of all workflow runs that are currently waiting for a specific event ID. This helps in understanding which workflows are pending a particular event. ### Method (Client method) ### Endpoint (N/A - Client method) ### Parameters #### Path Parameters None #### Query Parameters - **eventId** (string) - Required - The ID of the event to get waiters for. #### Request Body None ### Request Example ```javascript import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }); const result = await client.getWaiters({ eventId: "my-event-id", }); ``` ### Response #### Success Response - **waiters** (array) - A list of workflow runs waiting for the event. - **runId** (string) - The ID of the waiting workflow run. - **workflowId** (string) - The ID of the workflow. #### Response Example ```json { "waiters": [ { "runId": "run_jkl789ghi", "workflowId": "wf_mno012def" } ] } ``` ``` -------------------------------- ### Configure .env for Local Tunnel Source: https://upstash.com/docs/workflow/quickstarts/nextjs-flask Sets environment variables for using a local tunnel with Upstash Workflow. This requires your QStash token and the public URL provided by your tunneling service. ```bash export QSTASH_TOKEN="***" export UPSTASH_WORKFLOW_URL= ``` -------------------------------- ### Store Processed Images - TypeScript Source: https://upstash.com/docs/workflow/examples/imageProcessing Stores the processed image URLs in cloud storage. It uses Promise.all to concurrently run the storage operation for all processed images, ensuring efficiency. The storeImage function is called within a context run command. ```typescript const storedImageUrls: string[] = await Promise.all( processedImages.map( processedImage => context.run(`store-image`, async () => { return await storeImage(processedImage.body.imageUrl) }) ) ) ``` -------------------------------- ### Run Project Tests with Bun Source: https://upstash.com/docs/workflow/sdk/workflow-js This command executes the project's test suite using Bun. It assumes that environment variables required for testing are configured, typically in a `.env` file. ```bash bun run test ``` -------------------------------- ### Dispatch Order (TypeScript, Python) Source: https://upstash.com/docs/workflow/examples/eCommerceOrderFulfillment Following successful payment confirmation, this snippet initiates the order dispatch process. It utilizes the 'dispatch-order' context to call the relevant function, which typically involves preparing the items for shipment. This is a crucial step in fulfilling the customer's order. ```typescript await context.run("dispatch-order", async () => { return await dispatchOrder(orderId, items) }) ``` ```python async def _dispatch_order(): return await dispatch_order(order_id, items) await context.run("dispatch-order", _dispatch_order) ``` -------------------------------- ### Ensure Idempotency in `context.run` (Python) Source: https://upstash.com/docs/workflow/basics/caveats Provides examples of how to make business logic idempotent when using `context.run` in Python workflows. This practice is essential for reliability, especially when dealing with potential system retries. ```python @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload async def _step_1() -> None: return await some_work(input) await context.run("step-1", _step_1) ``` -------------------------------- ### Create User, Integrate with Stripe, and Send Welcome Email (Python) Source: https://upstash.com/docs/workflow/llms-txt Handles user creation in a database and Stripe, initiates a trial, and sends a welcome email. It also includes a webhook endpoint for authentication provider events. This function orchestrates several asynchronous operations. ```python class UserCreatedPayload(TypedDict): name: str email: str async def create_user_in_database(name: str, email: str) -> Dict[str, str]: print("Creating a user in the database:", name, email) return {"userid": "12345"} async def create_new_user_in_stripe(email: str) -> None: print("Creating a user in Stripe for", email) async def start_trial_in_stripe(email: str) -> None: print("Starting a trial of 14 days in Stripe for", email) async def send_email(email: str, content: str) -> None: print("Sending email to", email, content) @serve.post("/auth-provider-webhook") async def auth_provider_webhook( context: AsyncWorkflowContext[UserCreatedPayload], ) -> None: payload = context.request_payload name = payload["name"] email = payload["email"] async def _sync_user() -> str: return await create_user_in_database(name, email) result = await context.run("sync user", _sync_user) userid = result["userid"] async def _create_new_user_in_stripe() -> None: ``` -------------------------------- ### Check Upgraded Plan - Python Source: https://upstash.com/docs/workflow/examples/authWebhook Asynchronously checks if a user has upgraded their plan based on their email address. It returns a boolean value indicating the upgrade status. This is used to determine workflow continuation. ```python async def check_upgraded_plan(email: str) -> bool: # Implement logic to check if the user has upgraded the plan print("Checking if the user has upgraded the plan", email) return False ``` -------------------------------- ### Send Trial Ended Email (TypeScript, Python) Source: https://upstash.com/docs/workflow/examples/authWebhook Sends a trial ended email to users who have not upgraded their plan. This snippet first introduces a delay using `context.sleep` to wait for the trial to end, then executes the email sending logic within `context.run`. It requires the user's email and the email content as input. ```typescript await context.sleep("wait for trial end", 2 * 24 * 60 * 60); await context.run("send trial end email", async () => { await sendEmail( email, "Your trial has ended. Please upgrade your plan to keep using our platform." ); }); ``` ```python await context.sleep("wait for trial end", 2 * 24 * 60 * 60) async def _send_trial_end_email() -> None: await send_email( email, "Your trial has ended. Please upgrade your plan to keep using our platform.", ) await context.run("send trial end email", _send_trial_end_email) ``` -------------------------------- ### Parallel Inventory Check and Coffee Brewing Workflow (TypeScript) Source: https://upstash.com/docs/workflow/llms-txt This TypeScript example demonstrates running parallel inventory checks for coffee beans, cups, and milk using `Promise.all`. If all items are available, it proceeds to brew coffee and print a receipt. It leverages `ctx.run` to execute individual workflow steps and is designed for Next.js applications. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { checkInventory, brewCoffee, printReceipt } from "@/utils"; export const { POST } = serve(async (ctx) => { const [coffeeBeansAvailable, cupsAvailable, milkAvailable] = await Promise.all([ ctx.run("check-coffee-beans", () => checkInventory("coffee-beans")), ctx.run("check-cups", () => checkInventory("cups")), ctx.run("check-milk", () => checkInventory("milk")), ]); // If all ingedients available, brew coffee if (coffeeBeansAvailable && cupsAvailable && milkAvailable) { const price = await ctx.run("brew-coffee", async () => { return await brewCoffee({ style: "cappuccino" }); }); await printReceipt(price); } }); ``` -------------------------------- ### Include At Least One Step in Upstash Workflow Source: https://upstash.com/docs/workflow/basics/caveats This code illustrates the requirement for every Upstash Workflow to contain at least one step executed using `context.run`. Workflows without any steps will fail authentication and return a `Failed to authenticate Workflow request.` error. Examples are provided for both incorrect (missing steps) and correct (including a dummy step) implementations in TypeScript and Python. ```typescript export const { POST } = serve(async (context) => { const input = context.requestPayload // πŸ‘‡ Problem: No context.run call console.log("Processing input:", input) // This workflow will fail with "Failed to authenticate Workflow request." }) ``` ```typescript export const { POST } = serve(async (context) => { const input = context.requestPayload // πŸ‘‡ At least one step is required await context.run("dummy-step", async () => { return }) }) ``` ```python @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload # πŸ‘‡ Problem: No context.run call print("Processing input:", input) # This workflow will fail with "Failed to authenticate Workflow request." ``` ```python @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload # πŸ‘‡ At least one step is required async def _dummy_step(): return await context.run("dummy-step", _dummy_step) ``` -------------------------------- ### GET /v2/workflows/logs Source: https://upstash.com/docs/workflow/rest/runs/logs Retrieves logs for workflow runs. This endpoint allows filtering by workflow run ID, workflow server URL, or workflow state. ```APIDOC ## GET /v2/workflows/logs ### Description Retrieves logs for workflow runs. This endpoint allows filtering by workflow run ID, workflow server URL, or workflow state. ### Method GET ### Endpoint /v2/workflows/logs ### Parameters #### Query Parameters - **workflowRunId** (string) - Optional - Filter logs by a specific workflow run ID. - **workflowUrl** (string) - Optional - Filter logs by the URL of the workflow server. - **state** (string) - Optional - Filter logs by the state of the workflow run (e.g., RUN_SUCCESS, RUN_FAILED). ### Request Example ```sh curl curl https://qstash.upstash.io/v2/workflows/logs \ -H "Authorization: Bearer " ``` ```javascript import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }); // Filter by workflow run ID const { runs } = await client.logs({ workflowRunId: ""}); // Filter by workflow server url const { runs } = await client.logs({ workflowUrl: ""}); // Filter by state const { runs } = await client.logs({ state: "RUN_SUCCESS"}); ``` ### Response #### Success Response (200) - **cursor** (string) - A cursor for pagination, used to retrieve the next set of results. - **runs** (array) - An array of workflow run objects, each containing details about a specific run. - **workflowRunId** (string) - The unique identifier for the workflow run. - **workflowUrl** (string) - The URL of the workflow server for this run. - **workflowState** (string) - The current state of the workflow run (e.g., RUN_SUCCESS, RUN_FAILED). - **workflowRunCreatedAt** (number) - The Unix timestamp (in milliseconds) when the workflow run was created. - **workflowRunCompletedAt** (number) - The Unix timestamp (in milliseconds) when the workflow run was completed. - **steps** (array) - An array of step objects representing the execution flow of the workflow. - **stepName** (string) - The name of the step. - **stepType** (string) - The type of the step (e.g., Initial, Run, Call). - **callType** (string) - The type of call made by the step (e.g., step). - **messageId** (string) - The message ID associated with the step. - **concurrent** (number) - Indicates if the step was executed concurrently. - **state** (string) - The state of the individual step. - **createdAt** (number) - The Unix timestamp (in milliseconds) when the step was created. - **callHeaders** (string) - The HTTP headers of the request sent to the external address. Available only if stepType is `Call`. - **callBody** (string) - The body of the request sent to the external address. Available only if stepType is `Call`. - **callResponseStatus** (number) - The HTTP status returned by the external call. Available only if stepType is `Call`. - **callResponseBody** (string) - The body returned by the external call. Available only if stepType is `Call`. - **callResponseHeaders** (array) - The HTTP headers returned by the external call. Available only if stepType is `Call`. - **invokedWorkflowRunId** (string) - The ID of the invoked workflow run if this step is an invoke step. - **invokedWorkflowUrl** (string) - The URL address of the workflow server of invoked workflow run if this step is an invoke step. - **invokedWorkflowCreatedAt** (number) - The Unix timestamp (in milliseconds) when the invoked workflow was started if this step is an invoke step. - **invokedWorkflowRunBody** (string) - The body passed to the invoked workflow if this step is an invoke step. - **invokedWorkflowRunHeaders** (string) - The HTTP headers passed to invoked workflow if this step is an invoke step. #### Response Example ```json { "cursor": "1686652644442-12", "runs": [ { "workflowRunId": "wfr_rj0Upr1rvdzGfz96fXNHh", "workflowUrl": "https://feasible-eft-notably.ngrok-free.app/api/call", "workflowState": "RUN_SUCCESS", "workflowRunCreatedAt": 1736340463061, "workflowRunCompletedAt": 1736340464684, "steps": [ { "steps": [ { "stepName": "init", "stepType": "Initial", "callType": "step", "messageId": "msg_7YoJxFpwkEy5zBp378JgvD6YBDPBEqkBPje2JGTCEUiASMJQ1FwY9", "concurrent": 1, "state": "STEP_SUCCESS", "createdAt": 1736340463064 } ], "type": "sequential" }, { "steps": [ { "stepId": 1, "stepName": "external call", "stepType": "Run", "callType": "step", "messageId": "msg_26hZCxZCuWyyTWPmSVBrNCtiJGNsULmt63vFfcZxQ3sfYFKLZe2dKww4BSb2kVF", "out": "1", "concurrent": 2, "state": "STEP_SUCCESS", "createdAt": 1736340464111 }, { "stepId": 2, "stepName": "external call 2", "stepType": "Run", "callType": "step" } ], "type": "sequential" } ] } ] } ``` ``` -------------------------------- ### GET /v2/workflows/messageLogs Source: https://upstash.com/docs/workflow/rest/runs/message-logs Retrieves a log of messages associated with Upstash Workflows. This endpoint is useful for monitoring workflow execution and debugging. ```APIDOC ## GET /v2/workflows/messageLogs ### Description Retrieves a log of messages associated with Upstash Workflows. This endpoint is useful for monitoring workflow execution and debugging. ### Method GET ### Endpoint /v2/workflows/messageLogs ### Parameters #### Query Parameters - **limit** (number) - Optional - The maximum number of messages to return. - **offset** (string) - Optional - The cursor for pagination, indicating the starting point for the next set of results. ### Request Example ```sh curl curl https://qstash.upstash.io/v2/workflows/messageLogs \ -H "Authorization: Bearer " ``` ### Response #### Success Response (200) - **cursor** (string) - A cursor for paginating through results. - **events** (array) - An array of workflow event objects. - **time** (number) - The timestamp (in milliseconds) when the event occurred. - **state** (string) - The state of the workflow run (e.g., CREATED, RUN_STARTED). - **workflowRunId** (string) - The unique identifier for the workflow run. - **workflowUrl** (string) - The URL of the workflow server. - **workflowCreatedAt** (number) - The Unix timestamp (in milliseconds) when the workflow was started. - **stepInfo** (object) - Information about the current step in the workflow. - **stepName** (string) - The name of the step. - **stepType** (string) - The type of the step (e.g., Initial, SleepUntil, SleepFor, Call, Wait, Invoke). - **callType** (string) - The type of call for the step. - **messageId** (string) - The ID of the message associated with the step. - **concurrent** (number) - Concurrency level of the step. - **createdAt** (number) - The Unix timestamp (in milliseconds) when the step was created. - **nextDeliveryTime** (number) - The next scheduled delivery time for the message (in milliseconds). - **sleepUntil** (number) - The unix timestamp (in milliseconds) until which the step will sleep. Only set if stepType is `SleepUntil`. - **sleepFor** (number) - The duration in milliseconds for which the step will sleep. Only set if stepType is `SleepFor`. - **callUrl** (string) - The URL of the external address. Available only if stepType is `Call`. - **callMethod** (string) - The HTTP method of the request sent to the external address. Available only if stepType is `Call`. - **callHeaders** (string) - The HTTP headers of the request sent to the external address. Available only if stepType is `Call`. - **callBody** (string) - The body of the request sent to the external address. Available only if stepType is `Call`. - **callResponseStatus** (number) - The HTTP status returned by the external call. Available only if stepType is `Call`. - **callResponseBody** (string) - The body returned by the external call. Available only if stepType is `Call`. - **callResponseHeaders** (array) - The HTTP headers returned by the external call. Available only if stepType is `Call`. - **waitEventId** (string) - The event ID of the wait step. Only set if stepType is `Wait`. - **waitTimeoutDeadline** (string) - The Unix timestamp (in milliseconds) when the wait will time out. - **waitTimeoutDuration** (string) - The duration of timeout in human-readable format (e.g., 120s, 1m, 1h). - **waitTimeout** (string) - Set to true if this step is causing a wait timeout rather than notifying the waiter. - **invokedWorkflowRunId** (string) - The ID of the invoked workflow run if this step is an invoke step. - **invokedWorkflowUrl** (string) - The URL address of the workflow server of the invoked workflow run if this step is an invoke step. - **invokedWorkflowCreatedAt** (number) - The Unix timestamp (in milliseconds) when the invoked workflow was started if this step is an invoke step. - **invokedWorkflowRunBody** (string) - The body passed to the invoked workflow if this step is an invoke step. - **invokedWorkflowRunHeaders** (string) - The HTTP headers passed to the invoked workflow if this step is an invoke step. #### Response Example ```json { "cursor": "", "events": [ { "time": 1738788333107, "state": "CREATED", "workflowRunId": "wfr_6MXE3GfddpBMWJM7s5WSRPqwcFm8", "workflowUrl": "http://my-workflow-server.com/workflowEndpoint", "workflowCreatedAt": 1738788333105, "stepInfo": { "stepName": "init", "stepType": "Initial", "callType": "step", "messageId": "msg_2KxeAKGVEjwDjNK1TVPormoRf7shRyNBpPThVbpvkuZNqri4cXp5nwSajNzAs6UWakvbco3qEPvtjQU3qxqjWarm2kisK", "concurrent": 1, "createdAt": 1738788333106 }, "nextDeliveryTime": 1738788333106 } ] } ``` ``` -------------------------------- ### Process Data Chunks with OpenAI GPT-4 (TypeScript) Source: https://upstash.com/docs/workflow/examples/allInOne Processes individual data chunks using OpenAI's GPT-4 model via the `context.api.openai.call` method. Each chunk is sent with a system prompt defining the AI's role and a user prompt containing the data to analyze. It specifies the model, messages, and maximum completion tokens. The results are stored in `processedChunks`. ```typescript for (let i = 0; i < chunks.length; i++) { const { body: processedChunk } = await context.api.openai.call( `process-chunk-${i}`, { token: process.env.OPENAI_API_KEY!, operation: "chat.completions.create", body: { model: "gpt-4", messages: [ { role: "system", content: "You are an AI assistant tasked with analyzing data chunks. Provide a brief summary and key insights for the given data.", }, { role: "user", content: `Analyze this data chunk: ${JSON.stringify(chunks[i])}`, }, ], max_completion_tokens: 150, }, } ) } ``` -------------------------------- ### Send Confirmation and Dispatch Emails (TypeScript, Python) Source: https://upstash.com/docs/workflow/examples/eCommerceOrderFulfillment This section covers sending essential communications to the customer. It includes sending an order confirmation email immediately after order placement and a notification email when the order has been dispatched. These actions enhance customer experience by providing timely updates. ```typescript await context.run("send-confirmation", async () => { return await sendOrderConfirmation(userId, orderId) }) await context.run("send-dispatch-notification", async () => { return await sendDispatchNotification(userId, orderId) }) ``` ```python async def _send_confirmation(): return await send_order_confirmation(user_id, order_id) await context.run("send-confirmation", _send_confirmation) async def _send_dispatch_notification(): return await send_dispatch_notification(user_id, order_id) await context.run("send-dispatch-notification", _send_dispatch_notification) ``` -------------------------------- ### Upstash Workflow Next.js Example Source: https://upstash.com/docs/workflow/llms-txt Demonstrates implementing a workflow endpoint with the Upstash SDK for Next.js. It uses `serve` to handle a request payload and executes asynchronous tasks like `retrieveEmail` and `fetchFromLLm` in parallel, including a `sleep` function. ```javascript import { serve } from "@upstash/workflow/nextjs"; import { retrieveEmail, fetchFromLLm, UserRequest} from "../../../lib/util"; export const { POST } = serve( async (context) => { const input = context.requestPayload; await context.sleep("sleep", 10); const p1 = context.run("retrieveEmail", async () => { return retrieveEmail(input.id); }); const p2 = context.run("askllm", async () => { return fetchFromLLm(input.question); }); await Promise.all([p1, p2]) }, ); ``` -------------------------------- ### Access Request Payload in TypeScript Source: https://upstash.com/docs/workflow/llms-txt TypeScript example demonstrating how to access `context.requestPayload`. It highlights that accessing `context.requestPayload` directly before any steps, particularly during `context.call` execution, might result in it being undefined. ```typescript export const { POST } = serve(async (context) => { // Will print undefined while executing context.call const payload = context.requestPayload console.log(payload) // ... steps or any other code context.call( ... ) }) ``` -------------------------------- ### Create Astro Workflow Endpoint with Upstash Source: https://upstash.com/docs/workflow/quickstarts/astro Defines a workflow endpoint using the `@upstash/workflow/astro` library. This example demonstrates running two sequential steps within a workflow, passing data between them. It requires environment variables for configuration, which can be sourced from `import.meta.env` for local development and `process.env` for deployment. ```typescript import { serve } from "@upstash/workflow/astro"; export const { POST } = serve(async (context) => { const result1 = await context.run("initial-step", () => { console.log("initial step ran") return "hello world!" }) await context.run("second-step", () => { console.log(`second step ran with value ${result1}`) }) }, { // env must be passed in astro. // for local dev, we need import.meta.env. // For deployment, we need process.env: env: { ...process.env, ...import.meta.env } }) ``` -------------------------------- ### Verify Production Workflow Endpoint Source: https://upstash.com/docs/workflow/quickstarts/hono After deploying your Hono app to production, use this cURL command to make a POST request to your workflow endpoint. This verifies that the endpoint is accessible and functioning correctly in the production environment. ```bash curl -X POST https:///workflow ``` -------------------------------- ### Fetch Upstash Workflow Logs with Go net/http Source: https://upstash.com/docs/workflow/llms-txt Demonstrates fetching workflow logs from Upstash using Go's `net/http` package. It constructs a GET request to the QStash API, sets the necessary authorization header, and executes the request with basic error handling. Requires a bearer token. ```go req, err := http.NewRequest("GET", "https://qstash.upstash.io/v2/workflows/logs", nil) if err != nil { log.Fatal(err) } req.Header.Set("Authorization", "Bearer ") resp, err := http.DefaultClient.Do(req) if err != nil { log.Fatal(err) } def resp.Body.Close() ``` -------------------------------- ### GET /v2/workflows/dlq - List Workflow Runs in DLQ Source: https://upstash.com/docs/workflow/llms-txt Lists workflow runs currently in the Dead Letter Queue (DLQ) using a cURL request. Requires an authorization token. ```APIDOC ## GET /v2/workflows/dlq ### Description Lists workflow runs currently in the Dead Letter Queue (DLQ) using a cURL request. Includes the necessary authorization header. ### Method GET ### Endpoint https://qstash.upstash.io/v2/workflows/dlq ### Parameters #### Path Parameters None #### Query Parameters None #### Request Body None ### Request Example ```bash curl https://qstash.upstash.io/v2/workflows/dlq \ -H "Authorization: Bearer " ``` ### Response #### Success Response (200) - **runs** (array) - A list of workflow runs in the DLQ. - **runId** (string) - The ID of the workflow run. - **workflowId** (string) - The ID of the workflow. - **error** (object) - Information about the error that caused the run to be placed in the DLQ. - **message** (string) - The error message. - **details** (string) - Additional error details. #### Response Example ```json { "runs": [ { "runId": "run_def456uvw", "workflowId": "wf_ghi789rst", "error": { "message": "Failed to process message", "details": "Timeout occurred during execution" } } ] } ``` ``` -------------------------------- ### Execute User Creation Workflow with TypeScript Source: https://upstash.com/docs/workflow/llms-txt Demonstrates a user onboarding workflow using TypeScript and Upstash Workflow. It includes creating a user, setting up Stripe accounts, sending welcome and trial-ending emails, and checking upgrade status. The workflow relies on asynchronous operations and timed delays. ```TypeScript import { serve } from "@upstash/workflow/nextjs"; import { WorkflowContext } from '@upstash/qstash/workflow' /** * This can be the payload of the user created webhook event coming from your * auth provider (e.g. Firebase, Auth0, Clerk etc.) */ type UserCreatedPayload = { name: string; email: string; }; export const { POST } = serve(async (context) => { const { name, email } = context.requestPayload; const { userid } = await context.run("sync user", async () => { return await createUserInDatabase({ name, email }); }); await context.run("create new user in stripe", async () => { await createNewUserInStripe(email); }); await context.run("start trial in Stripe", async () => { await startTrialInStripe(email); }); await context.run("send welcome email", async () => { await sendEmail( email, "Welcome to our platform!, You have 14 days of free trial." ); }); await context.sleep("wait", 7 * 24 * 60 * 60); // get user stats and send email with them const stats = await context.run("get user stats", async () => { return await getUserStats(userid); }); await sendProblemSolvedEmail({context, email, stats}); // wait until there are two days to the end of trial period // and check upgrade status await context.sleep("wait for trial warning", 5 * 24 * 60 * 60); const isUpgraded = await context.run("check upgraded plan", async () => { return await checkUpgradedPlan(email); }); // end the workflow if upgraded if (isUpgraded) return; await context.run("send trial warning email", async () => { await sendEmail( email, "Your trial is about to end in 2 days. Please upgrade your plan to keep using our platform." ); }); await context.sleep("wait for trial end", 2 * 24 * 60 * 60); await context.run("send trial end email", async () => { await sendEmail( email, "Your trial has ended. Please upgrade your plan to keep using our platform." ); }); }); async function sendProblemSolvedEmail({ context: WorkflowContext email: string, stats: { totalProblemsSolved: number } }) { if (stats.totalProblemsSolved === 0) { await context.run("send no answers email", async () => { await sendEmail( email, "Hey, you haven't solved any questions in the last 7 days..." ); }); } else { await context.run("send stats email", async () => { await sendEmail( email, `You have solved ${stats.totalProblemsSolved} problems in the last 7 days. Keep it up!` ); }); } } async function createUserInDatabase({ name, email, }: { name: string; email: string; }) { console.log("Creating a user in the database:", name, email); return { userid: "12345" }; } async function createNewUserInStripe(email: string) { // Implement logic to create a new user in Stripe console.log("Creating a user in Stripe for", email); } async function startTrialInStripe(email: string) { // Implement logic to start a trial in Stripe console.log("Starting a trial of 14 days in Stripe for", email); } async function getUserStats(userid: string) { // Implement logic to get user stats console.log("Getting user stats for", userid); return { totalProblemsSolved: 10000, mostInterestedTopic: "JavaScript", }; } async function checkUpgradedPlan(email: string) { // Implement logic to check if the user has upgraded the plan console.log("Checking if the user has upgraded the plan", email); return false; } async function sendEmail(email: string, content: string) { // Implement logic to send an email console.log("Sending email to", email, content); } ``` -------------------------------- ### Parallel Agent Execution and Aggregation in TypeScript Source: https://upstash.com/docs/workflow/agents/patterns/parallelization This TypeScript code defines a workflow that simultaneously invokes three distinct agent tasks (worker1, worker2, worker3) and then uses a fourth agent (aggregator) to synthesize their results. It leverages `Promise.all` for efficient parallel execution and `context.agents.task().run()` to manage agent interactions. The primary dependencies are the `@upstash/workflow` library and Next.js for serving the workflow. ```typescript import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve(async (context) => { const model = context.agents.openai('gpt-3.5-turbo'); // Define worker agents const worker1 = context.agents.agent({ model, name: 'worker1', maxSteps: 1, background: 'You are an agent that explains quantum physics.', tools: {} }); const worker2 = context.agents.agent({ model, name: 'worker2', maxSteps: 1, background: 'You are an agent that explains relativity.', tools: {} }); const worker3 = context.agents.agent({ model, name: 'worker3', maxSteps: 1, background: 'You are an agent that explains string theory.', tools: {} }); // Await results const [result1, result2, result3] = await Promise.all([ context.agents.task({ agent: worker1, prompt: "Explain quantum physics." }).run(), context.agents.task({ agent: worker2, prompt: "Explain relativity." }).run(), context.agents.task({ agent: worker3, prompt: "Explain string theory." }).run(), ]); // Aggregating results const aggregator = context.agents.agent({ model, name: 'aggregator', maxSteps: 1, background: 'You are an agent that summarizes multiple answers.', tools: {} }); const task = await context.agents.task({ agent: aggregator, prompt: `Summarize these three explanations: ${result1.text}, ${result2.text}, ${result3.text}` }) const finalSummary = await task.run(); console.log(finalSummary.text); }); ``` -------------------------------- ### Example Upstash Workflow API Request (Bash) Source: https://upstash.com/docs/workflow/llms-txt Sends a POST request to an Upstash Workflow API endpoint using `curl`. It includes a JSON payload with an ID and a question, demonstrating a typical interaction pattern. ```bash curl -XPOST https://qstash-workflow.vercel.app/api/example -d '{"id": "id_123", "question": "what is the meaning of life?"}' ``` -------------------------------- ### Send Invoice Email in TypeScript and Python Source: https://upstash.com/docs/workflow/examples/paymentRetry Demonstrates sending a payment confirmation email to the user, including invoice details and total cost obtained from the payment result. This step is performed after a successful charge and user unsuspension. ```typescript await context.run("send invoice email", async () => { await sendEmail( email, `Payment successful. Invoice: ${result.invoiceId}, Total cost: $${result.totalCost}` ); }); ``` ```python async def _send_invoice_email() -> None: await send_email( email, f"Payment successful. Invoice: {result.invoice_id}, Total cost: ${result.total_cost}", ) await context.run("send invoice email", _send_invoice_email) ``` -------------------------------- ### Create Nuxt.js Workflow Endpoint Source: https://upstash.com/docs/workflow/llms-txt Defines a workflow endpoint in a Nuxt.js project's server/api directory using the Upstash Workflow SDK. It includes two example steps that are executed sequentially by the workflow. ```typescript import { serve } from "@upstash/workflow/h3" const { handler } = serve( async (context) => { await context.run("initial-step", () => { console.log("initial step ran") }) await context.run("second-step", () => { console.log("second step ran") }) }, ) export default handler; ``` -------------------------------- ### Example Notify Response Structure (JSON) Source: https://upstash.com/docs/workflow/llms-txt A sample JSON structure illustrating a successful response from the notify endpoint. It includes an array containing an object with a `waiter` object (detailing the resumed workflow) and a `messageId` for the sent message. ```json [ { "waiter": { "url": "https://my-workflow.com/path", "headers": { "Upstash-Workflow-Runid": [ "wfr_melCHYvPkVhDqIhhk2" ], "Upstash-Workflow-Url": [ "https://my-workflow.com/path" ] }, "deadline": 1729869206 }, "messageId": "msg_26hZCxxbG2TT3AnHEr1w" } ] ``` -------------------------------- ### Bulk Cancel Workflows (Go) Source: https://upstash.com/docs/workflow/rest/runs/bulk-cancel Go language example for initiating a bulk workflow run cancellation. It constructs an HTTP DELETE request with a JSON body containing workflow run IDs. ```go var data = strings.NewReader(`{ "workflowRunIds": [ "run_id_1", "run_id_2", "run_id_3" ] }`) req, err := http.NewRequest("DELETE", "https://qstash.upstash.io/v2/workflows/runs", data) if err != nil { log.Fatal(err) } req.Header.Set("Authorization", "Bearer ") req.Header.Set("Content-Type", "application/json") resp, err := http.DefaultClient.Do(req) if err != nil { log.Fatal(err) } defer resp.Body.Close() ``` -------------------------------- ### Minimal Workflow Endpoint (Next.js Pages Router - TypeScript) Source: https://upstash.com/docs/workflow/llms-txt A minimal example for defining a workflow endpoint in a Next.js Pages Router using `servePagesRouter` from `@upstash/workflow/nextjs`. It sets up a basic workflow with two sequential steps: `initial-step` and `second-step`. ```typescript import { servePagesRouter } from "@upstash/workflow/nextjs"; const { handler } = servePagesRouter( async (context) => { await context.run("initial-step", () => { console.log("initial step ran") }) await context.run("second-step", () => { console.log("second step ran") }) } ) export default handler; ``` -------------------------------- ### Store Processed Images and Get URLs Source: https://upstash.com/docs/workflow/llms-txt Stores processed images in cloud storage and retrieves their URLs. This code snippet utilizes `Promise.all` for parallel processing and `context.run` for executing the `storeImage` function. ```APIDOC ```typescript const storedImageUrls: string[] = await Promise.all( processedImages.map( processedImage => context.run(`store-image`, async () => { return await storeImage(processedImage.body.imageUrl) }) ) ) ``` ``` -------------------------------- ### Send Welcome Email (Python) Source: https://upstash.com/docs/workflow/llms-txt Sends a welcome email to a user upon trial initiation using Python. It leverages `context.run` to execute an asynchronous `send_email` function, passing the user's email and a welcome message. This function is defined within an async context. ```python async def _send_welcome_email() -> None: await send_email( email, "Welcome to our platform!, You have 14 days of free trial." ) await context.run("send welcome email", _send_welcome_email) ``` -------------------------------- ### TypeScript: Validate Webhook Requests Source: https://upstash.com/docs/workflow/llms-txt Illustrates a TypeScript example for validating incoming webhook requests using a `validateRequest` function. This is a critical security measure to ensure the authenticity of webhook sources. ```typescript export const { POST } = serve(async (context) => { const payloadString = context.requestPayload; const headerPayload = context.headers; let event: WebhookEvent; try { event = await validateRequest(payloadString, headerPayload); } catch { return } // Next steps based on the event }) ``` -------------------------------- ### Avoid Non-Idempotent Functions Outside context.run (TypeScript) Source: https://upstash.com/docs/workflow/llms-txt Demonstrates an incorrect pattern of calling non-idempotent functions outside of `context.run`, which can lead to inconsistencies. This example highlights the potential issue in a TypeScript workflow. ```typescript export const { POST } = serve<{ entryId: string }>(async (context) => { const { entryId } = context.requestPayload; // πŸ‘‡ Problem: Non-idempotent function outside context.run: const result = await getResultFromDb(entryId); if (result.return) { return; } // ... }) ``` -------------------------------- ### Apply Filters to Resized Images - TypeScript Source: https://upstash.com/docs/workflow/examples/imageProcessing Applies specified filters (grayscale, sepia, contrast) to resized images. It iterates through each resized image and each filter, making parallel calls to an external filtering service using Promise.all. The input includes the image URL and the filter name. ```typescript const filters = ["grayscale", "sepia", "contrast"] const processedImagePromises: Promise[] = [] for (const resizedImage of resizedImages) { for (const filter of filters) { const processedImagePromise = context.call( `apply-filter-${filter}`, { // endpoint which returns ImageResult type in response url: "https://image-processing-service.com/filter", method: "POST", body: { imageUrl: resizedImage.body.imageUrl, filter, } } ) processedImagePromises.push(processedImagePromise) } } const processedImages: { body: ImageResult }[] = await Promise.all(processedImagePromises) ``` -------------------------------- ### Customer Onboarding Workflow (TypeScript) Source: https://upstash.com/docs/workflow/llms-txt Initializes a customer onboarding workflow in TypeScript using Upstash Workflow SDK. It handles user sign-up, email notifications, and periodic checks for user activity, utilizing context.run for atomic operations and context.sleep for delays. ```typescript import { serve } from "@upstash/workflow/nextjs" type InitialData = { email: string } export const { POST } = serve(async (context) => { const { email } = context.requestPayload await context.run("new-signup", async () => { await sendEmail("Welcome to the platform", email) }) await context.sleep("wait-for-3-days", 60 * 60 * 24 * 3) while (true) { const state = await context.run("check-user-state", async () => { return await getUserState() }) if (state === "non-active") { await context.run("send-email-non-active", async () => { await sendEmail("Email to non-active users", email) }) } else if (state === "active") { await context.run("send-email-active", async () => { await sendEmail("Send newsletter to active users", email) }) } await context.sleep("wait-for-1-month", 60 * 60 * 24 * 30) } }) async function sendEmail(message: string, email: string) { // Implement email sending logic here console.log(`Sending ${message} email to ${email}`) } type UserState = "non-active" | "active" const getUserState = async (): Promise => { // Implement user state logic here return "non-active" } ``` -------------------------------- ### Pause Workflow Until Timestamp Source: https://upstash.com/docs/workflow/llms-txt Demonstrates pausing a workflow until a specific future timestamp using `context.sleepUntil`. This example pauses execution until one week from the current date before sending an email. ```APIDOC ```typescript import { serve } from "@upstash/workflow/nextjs"; import { signIn, sendEmail } from "@/utils/onboarding-utils"; export const { POST } = serve(async (context) => { const userData = context.requestPayload; const user = await context.run("sign-in", async () => { return signIn(userData); }); // πŸ‘‡ Calculate the date for one week from now const oneWeekFromNow = new Date(); oneWeekFromNow.setDate(oneWeekFromNow.getDate() + 7); // πŸ‘‡ Wait until the calculated date ``` ``` -------------------------------- ### Cancel Workflow Runs (TypeScript) Source: https://upstash.com/docs/workflow/basics/client Provides methods to cancel workflow runs. Runs can be cancelled individually by ID, by a URL prefix to cancel all runs starting with that URL, or all pending/active runs. ```typescript // cancel a single workflow await client.cancel({ ids: "" }); // cancel a set of workflow runs await client.cancel({ ids: ["", ""] }); ``` ```typescript await client.cancel({ urlStartingWith: "https://your-endpoint.com" }); ``` ```typescript await client.cancel({ all: true }); ``` -------------------------------- ### Pause Workflow Until Timestamp (TypeScript) Source: https://upstash.com/docs/workflow/llms-txt Demonstrates pausing a workflow execution until a specific future timestamp using `context.sleepUntil`. This example pauses until one week from the current date before proceeding. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { signIn, sendEmail } from "@/utils/onboarding-utils"; export const { POST } = serve(async (context) => { const userData = context.requestPayload; const user = await context.run("sign-in", async () => { return signIn(userData); }); // πŸ‘‡ Calculate the date for one week from now const oneWeekFromNow = new Date(); oneWeekFromNow.setDate(oneWeekFromNow.getDate() + 7); // πŸ‘‡ Wait until the calculated date ``` -------------------------------- ### Customize Initial Payload Parsing (TypeScript) Source: https://upstash.com/docs/workflow/llms-txt Defines a custom function for processing the initial request payload when an Upstash Workflow starts in TypeScript. The parsed data is then accessible via `context.requestPayload`. ```typescript type InitialPayload = { foo: string; bar: number; }; // πŸ‘‡ 1: provide initial payload type export const { POST } = serve( async (context) => { ``` -------------------------------- ### Cancel Workflow Runs by URL Prefix Source: https://upstash.com/docs/workflow/llms-txt Cancel all workflow runs that start with a given URL prefix. This method is effective for stopping all runs associated with a particular endpoint or service. ```APIDOC ```javascript await client.cancel({ urlStartingWith: "https://your-endpoint.com" }); ``` ``` -------------------------------- ### Fetch Workflow Logs with cURL Source: https://upstash.com/docs/workflow/rest/runs/logs This snippet demonstrates how to retrieve workflow logs using a cURL command. It requires an authorization token and targets the Upstash Workflows logs endpoint. No specific filtering is applied in this basic example. ```sh curl https://qstash.upstash.io/v2/workflows/logs \ -H "Authorization: Bearer " ``` -------------------------------- ### Notify Endpoint Response Example (JSON) Source: https://upstash.com/docs/workflow/llms-txt Illustrates a successful response from the notify endpoint, which is an array containing an object. This object includes a `waiter` detailing the resumed workflow and a `messageId` for the sent message. ```json [ { "waiter": { "url": "https://my-workflow.com/path", "headers": { "Upstash-Workflow-Runid": [ "wfr_melCHYvPkVhDqIhhk2" ], "Upstash-Workflow-Url": [ "https://my-workflow.com/path" ] }, "deadline": 1729869206 }, "messageId": "msg_26hZCxxbG2TT3AnHEr1w" } ] ``` -------------------------------- ### Process Payment (TypeScript, Python) Source: https://upstash.com/docs/workflow/examples/eCommerceOrderFulfillment This code block handles the payment processing for an order after stock availability has been confirmed. It uses the 'process-payment' context to execute the payment logic. Successful payment is a prerequisite for proceeding to the dispatch stage. ```typescript await context.run("process-payment", async () => { return await processPayment(orderId) }) ``` ```python async def _process_payment(): return await process_payment(order_id) await context.run("process-payment", _process_payment) ``` -------------------------------- ### Get Workflow Logs Source: https://upstash.com/docs/workflow/basics/client Retrieves logs for workflow runs. This method supports filtering by workflow run ID, count, state, URL, creation timestamp, and allows for cursor-based pagination. ```APIDOC ## GET /workflow/runs/logs ### Description Retrieves logs for workflow runs with optional filtering and pagination. ### Method GET ### Endpoint /workflow/runs/logs ### Parameters #### Query Parameters - **workflowRunId** (string) - Optional - ID of the workflow run to get. - **count** (number) - Optional - Number of workflows to retrieve. - **state** (string) - Optional - Workflow state to filter for. One of "RUN_STARTED", "RUN_SUCCESS", "RUN_FAILED", "RUN_CANCELED". - **workflowUrl** (string) - Optional - Workflow URL to search for (exact match). - **workflowCreatedAt** (number) - Optional - Unix timestamp when the run was created. - **cursor** (string) - Optional - Cursor from a previous request to continue the search. ### Response #### Success Response (200) - **cursor** (string) - A cursor for fetching the next page of results. - **runs** (array) - A list of workflow run objects. Each run object contains fields like `workflowRunId`, `workflowUrl`, `workflowState`, `workflowRunCreatedAt`, `workflowRunCompletedAt`, `failureFunction`, `dlqId`, `workflowRunResponse`, `invoker`, and `steps`. ### Response Example ```json { "cursor": "some-cursor-value", "runs": [ { "workflowRunId": "run-123", "workflowUrl": "https://example.com/workflow", "workflowState": "RUN_SUCCESS", "workflowRunCreatedAt": 1678886400, "workflowRunCompletedAt": 1678886460, "failureFunction": null, "dlqId": null, "workflowRunResponse": { "output": "Success!" }, "invoker": null, "steps": [ { "type": "single", "steps": [ { "name": "step1", "status": "SUCCESS" } ] } ] } ] } ``` ``` -------------------------------- ### Create Upstash Workflow Endpoint (Next.js) Source: https://upstash.com/docs/workflow/llms-txt This TypeScript example demonstrates how to serve an Upstash Workflow endpoint using the `serve` function from `@upstash/workflow/nextjs`. It defines sequential workflow steps within the `serve` callback, allowing you to define business logic for each step. This is useful for creating backend endpoints that execute workflows in response to requests. ```TypeScript import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve(async (context) => { const result = await context.run("step-1", async () => { // define a piece of business logic as step 1 }); await context.run("step-2", async () => { // define another piece of business logic as step 2 }); }); ``` -------------------------------- ### Process Data Chunks with OpenAI GPT-4 (Python) Source: https://upstash.com/docs/workflow/examples/allInOne Iterates through data chunks, sending each to OpenAI's GPT-4 API using `context.call` for analysis. It includes a system message to define the AI's task and a user message with the chunk data. The API call specifies the model, messages, and `max_tokens`. Processed chunks are stored for aggregation. ```python for i, chunk in enumerate(chunks): openai_response: CallResponse[Dict[str, str]] = await context.call( f"process-chunk-{i}", url="https://api.openai.com/v1/chat/completions", method="POST", headers={ "authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}", }, body={ "model": "gpt-4", "messages": [ { "role": "system", "content": "You are an AI assistant tasked with analyzing data chunks. Provide a brief summary and key insights for the given data.", }, { "role": "user", "content": f"Analyze this data chunk: {json.dumps(chunk)}", }, ], "max_tokens": 150, }, ) ``` -------------------------------- ### Get List of Waiters for an Event Source: https://upstash.com/docs/workflow/basics/client This method retrieves a list of all workflow runs that are currently waiting for a specific event. It requires the `eventId` to query for waiters and returns a list of `Waiter` objects. ```javascript import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }); const result = await client.getWaiters({ eventId: "my-event-id", }); ``` -------------------------------- ### Retrieve Image URL - TypeScript Source: https://upstash.com/docs/workflow/examples/imageProcessing Retrieves the URL of an uploaded image using its ID. It utilizes a context run command for asynchronous execution. No external dependencies are explicitly mentioned beyond the getImageUrl function. ```typescript const imageUrl = await context.run("get-image-url", async () => { return await getImageUrl(imageId) }) ``` -------------------------------- ### TypeScript: Send Welcome Email Source: https://upstash.com/docs/workflow/llms-txt This snippet sends a welcome email to the user upon trial initiation using TypeScript. It utilizes `context.run` to execute the `sendEmail` function, providing the user's email and a welcome message. ```typescript await context.run("send welcome email", async () => { await sendEmail( email, "Welcome to our platform!, You have 14 days of free trial." ); }); ``` -------------------------------- ### Get Flow Control Information Source: https://upstash.com/docs/workflow/llms-txt Retrieves information about a specific flow control key, including its size in the wait list. This is useful for monitoring and managing message processing parallelism. ```APIDOC ## GET /v2/flowControl/{flowControlKey} ### Description Get Information on Flow-Control ### Parameters #### Path Parameters - **flowControlKey** (string, required) - The key of the flow control. See the flow control documentation for more details. ### Response #### Success Response (200) { "flowControlKey": "", "waitListSize": 123 } - **flowControlKey** (string) - The key of of the flow control. - **waitListSize** (integer) - The number of messages in the wait list that waits for `parallelism` set in the flow control. ### Request Example ```bash curl -X GET https://qstash.upstash.io/v2/flowControl/YOUR_FLOW_CONTROL_KEY -H "Authorization: Bearer " ``` ``` -------------------------------- ### Create Workflow Endpoint with Hono Context Source: https://upstash.com/docs/workflow/quickstarts/hono Demonstrates creating a workflow endpoint with Hono that leverages Hono's native context. This allows access to environment variables and other Hono-specific features within the workflow handler. It requires `hono/adapter` for environment variable access and `WorkflowBindings` for workflow-specific types. ```typescript import { Hono } from "hono" import { serve, WorkflowBindings } from "@upstash/workflow/hono" import { env } from "hono/adapter" interface Bindings extends WorkflowBindings { ENVIRONMENT: "development" | "production" } const app = new Hono<{ Bindings: Bindings }>() app.post("/workflow", (c) => { // πŸ‘‡ access Honos native context, i.e. getting an env variable const { ENVIRONMENT } = env(c) // πŸ‘‡ `unknown` represents your initial payload data type const handler = serve( async (context) => { ... } ) return await handler(c) }) ``` -------------------------------- ### Create Hono Workflow Endpoint (TypeScript) Source: https://upstash.com/docs/workflow/llms-txt Defines a workflow endpoint using Hono and the Upstash Workflow serve function. It includes two example steps that log messages to the console. This requires the 'hono' and '@upstash/workflow/hono' packages. ```typescript import { Hono } from "hono" import { serve } from "@upstash/workflow/hono" const app = new Hono() app.post("/workflow", serve(async (context) => { await context.run("initial-step", () => { console.log("initial step ran") }) await context.run("second-step", () => { console.log("second step ran") }) }) ) export default app ``` -------------------------------- ### Send Welcome Email (TypeScript and Python) Source: https://upstash.com/docs/workflow/llms-txt Demonstrates sending a welcome email upon trial initiation using Upstash Workflow's `context.run` method. This functionality is available in both TypeScript and Python, ensuring cross-language compatibility for user onboarding. ```typescript await context.run("send welcome email", async () => { await sendEmail( email, "Welcome to our platform!, You have 14 days of free trial." ); }); ``` ```python async def _send_welcome_email() -> None: await send_email( email, "Welcome to our platform!, You have 14 days of free trial." ) await context.run("send welcome email", _send_welcome_email) ``` -------------------------------- ### Workflow Update Error Example Source: https://upstash.com/docs/workflow/llms-txt Illustrates the error message received when an in-progress workflow encounters an incompatible step name due to code changes. This typically results in a 400 Bad Request. ```APIDOC ## Workflow Update Error Example ### Description Illustrates the error message received when an in-progress workflow encounters an incompatible step name due to code changes. This typically results in a 400 Bad Request. ### Method (Not applicable, this describes an error response) ### Endpoint (Not applicable, this describes an error response) ### Response #### Error Response (400 Bad Request) - **Error Message**: Incompatible step name. Expected ``, got `` #### Response Example ``` HTTP status 400. Incompatible step name. Expected , got ``` ``` -------------------------------- ### Upstash Workflow Logs API Response Example (JSON) Source: https://upstash.com/docs/workflow/llms-txt A sample successful response from the Upstash Workflow logs API. This JSON object details a single workflow run, including its ID, URL, state, timestamps, and a breakdown of its execution steps. ```json { "cursor": "1686652644442-12", "runs": [ { "workflowRunId": "wfr_rj0Upr1rvdzGfz96fXNHh", "workflowUrl": "https://feasible-eft-notably.ngrok-free.app/api/call", "workflowState": "RUN_SUCCESS", "workflowRunCreatedAt": 1736340463061, "workflowRunCompletedAt": 1736340464684, "steps": [ { "steps": [ { "stepName": "init", "stepType": "Initial", "callType": "step", "messageId": "msg_7YoJxFpwkEy5zBp378JgvD6YBDPBEqkBPje2JGTCEUiASMJQ1FwY9", "concurrent": 1, "state": "STEP_SUCCESS", "createdAt": 1736340463064 } ], "type": "sequential" }, { "steps": [ { "stepId": 1, "stepName": "external call", "stepType": "Run", "callType": "step", "messageId": "msg_26hZCxZCuWyyTWPmSVBrNCtiJGNsULmt63vFfcZxQ3sfYFKLZe2dKww4BSb2kVF", "out": "1", "concurrent": 2, "state": "STEP_SUCCESS", "createdAt": 1736340464111 }, { "stepId": 2, "stepName": "external call 2", "stepType": "Run", "callType": "step", "messageId": "msg_26hZCxZCuWyyTWPmSVBrNB882AMRP1TsgzpygELRcLWep4ACNTTsCHhrZuaNLij", "out": "2", "concurrent": 2, "state": "STEP_SUCCESS", "createdAt": 1736340463895 } ], "type": "parallel" } ] } ] } ``` -------------------------------- ### Verify Production Workflow Endpoint with Curl Source: https://upstash.com/docs/workflow/quickstarts/express This snippet demonstrates how to verify your deployed workflow endpoint using `curl`. It sends a POST request with JSON content to your production URL. Ensure that your deployment URL is correctly substituted. ```bash curl -X POST /workflow \ -H "Content-Type: application/json" \ -d '{"message": "Hello from the workflow!"}' ``` -------------------------------- ### TypeScript: Customer Onboarding Workflow with Email and Delays Source: https://upstash.com/docs/workflow/llms-txt Defines a customer onboarding workflow that sends a welcome email, waits for 3 days, and then enters a loop to check user activity. It utilizes `context.run` for atomic operations and `context.sleep` for introducing delays. ```typescript import { serve } from "@upstash/workflow/nextjs" type InitialData = { email: string } export const { POST } = serve(async (context) => { const { email } = context.requestPayload await context.run("new-signup", async () => { await sendEmail("Welcome to the platform", email) }) await context.sleep("wait-for-3-days", 60 * 60 * 24 * 3) ``` -------------------------------- ### TypeScript: Pause Workflow Until Timestamp Source: https://upstash.com/docs/workflow/llms-txt This TypeScript example showcases how to use `context.sleepUntil` to pause workflow execution until a specified timestamp. It calculates a future date and waits until that time before sending a welcome email. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { signIn, sendEmail } from "@/utils/onboarding-utils"; export const { POST } = serve < User > (async (context) => { const userData = context.requestPayload; const user = await context.run("sign-in", async () => { return signIn(userData); }); // πŸ‘‡ Calculate the date for one week from now const oneWeekFromNow = new Date(); oneWeekFromNow.setDate(oneWeekFromNow.getDate() + 7); // πŸ‘‡ Wait until the calculated date await context.sleepUntil("wait-for-one-week", oneWeekFromNow); await context.run("send-welcome-email", async () => { return sendEmail(user.name, user.email); }); }); ``` -------------------------------- ### Define Basic Workflow Endpoint in FastAPI Source: https://upstash.com/docs/workflow/quickstarts/fastapi This snippet demonstrates how to create a basic workflow endpoint in a FastAPI application using Upstash Workflow. It defines two sequential steps, 'initial-step' and 'second-step', to be executed within the workflow. No external dependencies are explicitly required beyond FastAPI and Upstash Workflow. ```python from fastapi import FastAPI from upstash_workflow.fastapi import Serve app = FastAPI() serve = Serve(app) @serve.post("/api/workflow") async def workflow(context) -> None: async def _step1() -> None: print("initial step ran") await context.run("initial-step", _step1) async def _step2() -> None: print("second step ran") await context.run("second-step", _step2) ``` -------------------------------- ### Configure Local Tunnel in .env.local for Nuxt.js Source: https://upstash.com/docs/workflow/quickstarts/nuxt Configures the `.env.local` file for use with a local tunnel, connecting to the production QStash service. It requires your QStash token and the public URL provided by the local tunnel service (UPSTASH_WORKFLOW_URL). This allows logs to appear in the Upstash Console. ```txt QSTASH_TOKEN="***" UPSTASH_WORKFLOW_URL= ``` -------------------------------- ### Fetch Workflow Logs with Go Source: https://upstash.com/docs/workflow/llms-txt Demonstrates fetching workflow logs using Go's standard `net/http` package. It constructs a GET request to the Upstash QStash workflows logs endpoint, sets the Authorization header, and executes the call with basic error handling for network operations. ```go req, err := http.NewRequest("GET", "https://qstash.upstash.io/v2/workflows/logs", nil) if err != nil { log.Fatal(err) } req.Header.Set("Authorization", "Bearer ") resp, err := http.DefaultClient.Do(req) ``` -------------------------------- ### Expose Multiple Workflows with serveMany in Next.js (TypeScript) Source: https://upstash.com/docs/workflow/llms-txt Demonstrates using `serveMany` in a Next.js catch-all route to expose multiple workflows. It shows an example where `workflowOne` invokes `workflowTwo`, requiring both to be exposed in the same `serveMany` endpoint for proper interaction. ```typescript import { WorkflowContext } from "@upstash/workflow"; import { createWorkflow, serveMany } from "@upstash/workflow/nextjs"; const workflowOne = createWorkflow(async (context) => { await context.run("say hi", () => { console.log("workflow one says hi!") }) const { body, isCanceled, isFailed } = await context.invoke("invoking other", { workflow: workflowTwo, body: "hello from workflow one", }) console.log(`received response from workflowTwo: ${body}`) }) const workflowTwo = createWorkflow(async (context: WorkflowContext) => { await context.run("say hi", () => { console.log("workflowTwo says hi!") console.log(`received: '${context.requestPayload}' in workflowTwo`) }) return "Workflow two finished!" }) // Example of exposing them: // export const { POST } = serveMany([ // workflowOne, // workflowTwo // ]); ``` -------------------------------- ### Connect and Use Redis CLI with Upstash Redis Source: https://upstash.com/docs/workflow/llms-txt Demonstrates how to connect to an Upstash Redis database using the redis-cli and execute basic commands like SET, GET, and INCR. This requires the redis-cli tool and your database credentials (password, endpoint, port). ```Redis CLI > redis-cli --tls -a PASSWORD -h ENDPOINT -p PORT ENDPOINT:PORT> set counter 0 OK ENDPOINT:PORT> get counter "0" ENDPOINT:PORT> incr counter (int) 1 ENDPOINT:PORT> incr counter (int) 2 ``` -------------------------------- ### Fetch Workflow Logs with Node.js Fetch API Source: https://upstash.com/docs/workflow/llms-txt Retrieves workflow logs using the native Fetch API by sending a GET request to the QStash API's logs endpoint with an authorization token. ```APIDOC ```javascript const response = await fetch("https://qstash.upstash.io/v2/workflows/logs", { headers: { Authorization: "Bearer ", }, }); ``` ``` -------------------------------- ### Python Workflow Endpoint Example Source: https://upstash.com/docs/workflow/llms-txt Defines a Python API endpoint for an Upstash Workflow using `@serve.post`. It demonstrates running asynchronous steps within the workflow, logging, and accessing results from previous steps. Uses `AsyncWorkflowContext` for managing workflow execution. ```python @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload async def _step_1() -> Dict: return {"success": True} result = await context.run("step-1", _step_1) print("This log will appear multiple times") async def _step_2() -> None: print("This log will appear just once") print("Step 1 status is:", result["success"]) await context.run("step-2", _step_2) ``` -------------------------------- ### Store Processed Images - Python Source: https://upstash.com/docs/workflow/examples/imageProcessing Stores processed image URLs in cloud storage. It defines an internal async function to handle the storage of a single image and then iterates through all processed images, calling this function via context.run to store each one and collect their URLs. ```python async def _store_image() -> str: return await store_image(processed_image["imageUrl"]) stored_image_urls: List[str] = [] for processed_image in processed_images: stored_image_url = await context.run("store-image", _store_image) stored_image_urls.append(stored_image_url) ``` -------------------------------- ### Create Express.js Workflow Endpoint (TypeScript) Source: https://upstash.com/docs/workflow/llms-txt Example of creating a workflow endpoint using Express.js and the @upstash/workflow library. This defines a '/workflow' POST route that executes a two-step workflow. It requires 'express', 'dotenv', and '@upstash/workflow/express'. ```typescript import { serve } from "@upstash/workflow/express"; import express from 'express'; import { config } from 'dotenv'; config(); const app = express(); app.use( express.json() ); app.post( '/workflow', serve<{ message: string } >( async (context) => { const res1 = await context.run("step1", async () => { const message = context.requestPayload.message; return message; }) await context.run("step2", async () => { console.log(res1); }) } ) ); app.listen(3000, () => { console.log('Server running on port 3000'); }); ``` -------------------------------- ### Run Local Cloudflare Worker Development Server Source: https://upstash.com/docs/workflow/quickstarts/cloudflare-workers This command starts a local development server for your Cloudflare Worker project using `wrangler`. It allows you to test your workflow endpoints locally before deploying. The output will typically include a local URL for your endpoint and information about environment variable bindings like `QSTASH_TOKEN`. ```bash npm run wrangler dev ``` -------------------------------- ### Configure Workflow Retry Delay Source: https://upstash.com/docs/workflow/basics/serve Shows how to specify the delay between retries for a workflow using the `retryDelay` option. The delay is specified in milliseconds and can be a static value or a dynamic expression. This example is for Next.js (TypeScript). ```typescript export const { POST } = serve( async (context) => { ... }, { retryDelay: "(retried + 1) * 1000" // delay in milliseconds } ); ``` -------------------------------- ### Dynamic Base URL for Workflow Endpoint (JavaScript) Source: https://upstash.com/docs/workflow/quickstarts/vercel-nextjs Dynamically constructs the base URL for the workflow endpoint based on the environment. It uses VERCEL_URL for production deployments and falls back to localhost for local development. This helps in managing different environment configurations for workflow triggers. ```javascript const BASE_URL = process.env.VERCEL_URL ? `https://${process.env.VERCEL_URL}` : `http://localhost:3000` const { workflowRunId } = await client.trigger({ url: `${BASE_URL}/api/workflow`, retries: 3, }); ``` -------------------------------- ### Verify Stock Availability (TypeScript, Python) Source: https://upstash.com/docs/workflow/examples/eCommerceOrderFulfillment This snippet checks if items in an order are available in stock. It first generates an order ID and then calls a stock checking function. If stock is not available, it logs a warning and halts the process. This prevents orders from being placed for out-of-stock items. ```typescript const orderId = await context.run("create-order-id", async () => { return await createOrderId(userId); }); const stockAvailable = await context.run("check-stock", async () => { return await checkStockAvailability(items) }) if (!stockAvailable) { console.warn("Some items are out of stock") return; } ``` ```python async def _create_order_id(): return await create_order_id(user_id) order_id: str = await context.run("create-order-id", _create_order_id) async def _check_stock(): return await check_stock_availability(items) stock_available: bool = await context.run("check-stock", _check_stock) if not stock_available: print("Some items are out of stock") return ``` -------------------------------- ### Orchestrator-Workers Setup with Upstash Workflow Source: https://upstash.com/docs/workflow/agents/patterns/orchestrator-workers This TypeScript code sets up an orchestrator workflow using Upstash Workflow and Next.js. It defines multiple worker agents, each specialized in a different domain (general physics, quantum mechanics, relativity), and uses a Wikipedia tool for information retrieval. The orchestrator then synthesizes the results from these workers into a Q&A format. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { WikipediaQueryRun } from "@langchain/community/tools/wikipedia_query_run"; const wikiTool = new WikipediaQueryRun({ topKResults: 1, maxDocContentLength: 500, }) export const { POST } = serve(async (context) => { const model = context.agents.openai('gpt-4o'); // Worker agents const worker1 = context.agents.agent({ model, name: 'worker1', tools: { wikiTool }, maxSteps: 3, background: 'You are a worker agent that answers general questions about advanced physics.' }); const worker2 = context.agents.agent({ model, name: 'worker2', tools: { wikiTool }, maxSteps: 3, background: 'You are a worker agent that answers questions about quantum mechanics.' }); const worker3 = context.agents.agent({ model, name: 'worker3', tools: { wikiTool }, maxSteps: 3, background: 'You are a worker agent that answers questions about relativity.' }); // Synthesizing results const task = context.agents.task({ model, prompt: `Create a Q&A for advanced topics in physics`, agents: [worker1, worker2, worker3], maxSteps: 3, }) const { text } = await task.run(); console.log(text); }); ``` -------------------------------- ### Next.js API Route with Workflow Steps (TypeScript) Source: https://upstash.com/docs/workflow/llms-txt This TypeScript example shows how to define a Next.js API route using the Upstash Workflow SDK. It includes executing workflow steps like 'create-backup' and 'upload-backup', and demonstrates how to configure a `failureFunction` for error handling in your workflows. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { createBackup, uploadBackup } from "./utils"; export const { POST } = serve( async (ctx) => { const backup = await ctx.run("create-backup", async () => { return await createBackup(); }); await ctx.run("upload-backup", async () => { await uploadBackup(backup); }); }, { failureFunction({ context, failStatus, failResponse, failHeader }) { // immediately get notified for failed backups // i.e. send an email, log to Sentry }, } ); ``` -------------------------------- ### Monitor Workflow Execution Concurrently (TypeScript) Source: https://upstash.com/docs/workflow/llms-txt Shows how to monitor and execute multiple workflow tasks concurrently in TypeScript. This example retrieves emails and fetches data from an LLM in parallel using `Promise.all`, optimizing workflow performance. ```typescript import { serve } from "@upstash/workflow/nextjs"; import { retrieveEmail, fetchFromLLm, UserRequest} from "../../../lib/util"; export const { POST } = serve( async (context) => { const input = context.requestPayload; await context.sleep("sleep", 10); const p1 = context.run("retrieveEmail", async () => { return retrieveEmail(input.id); }); const p2 = context.run("askllm", async () => { return fetchFromLLm(input.question); }); await Promise.all([p1, p2]) }, ); ``` -------------------------------- ### Fetch Workflow Logs with Node.js fetch API Source: https://upstash.com/docs/workflow/rest/runs/logs This snippet shows how to retrieve workflow logs using the native `fetch` API in a Node.js environment. It makes a GET request to the logs endpoint and includes the necessary Authorization header. ```javascript const response = await fetch("https://qstash.upstash.io/v2/workflows/logs", { headers: { Authorization: "Bearer ", }, }); ``` -------------------------------- ### Define Basic Workflow Endpoint with Flask Source: https://upstash.com/docs/workflow/quickstarts/flask Creates a Flask application and defines a basic workflow endpoint '/api/workflow'. This endpoint consists of two sequential steps, '_step1' and '_step2', which are automatically retried on failure. Dependencies include Flask and Upstash Workflow's Flask integration. ```python from flask import Flask from upstash_workflow.flask import Serve app = Flask(__name__) serve = Serve(app) @serve.route("/api/workflow") def workflow(context) -> None: def _step1() -> None: print("initial step ran") context.run("initial-step", _step1) def _step2() -> None: print("second step ran") context.run("second-step", _step2) ``` -------------------------------- ### Cancel Workflow Runs (JavaScript) Source: https://upstash.com/docs/workflow/llms-txt Provides JavaScript examples for canceling Upstash workflow runs. It supports canceling individual runs by their ID or canceling multiple runs based on a URL prefix. ```javascript // cancel a single workflow await client.cancel({ ids: "" }); // cancel a set of workflow runs await client.cancel({ ids: ["", ""] }); ``` ```javascript await client.cancel({ urlStartingWith: "https://your-endpoint.com" }); ``` -------------------------------- ### Running Cloudflare Worker Locally with Wrangler Source: https://upstash.com/docs/workflow/llms-txt This command starts your Cloudflare Worker locally using `wrangler dev`. It makes your workflow endpoint accessible, typically at `http://localhost:8787`, allowing you to test your workflow during development. ```APIDOC ```bash npm run wrangler dev ``` ``` -------------------------------- ### E-commerce Order Fulfillment Workflow (TypeScript) Source: https://upstash.com/docs/workflow/examples/eCommerceOrderFulfillment Implements an e-commerce order fulfillment workflow using Upstash Workflow in a Next.js environment. It defines a series of steps including order creation, stock verification, payment processing, dispatch, and customer notifications. Dependencies include @upstash/workflow/nextjs. ```typescript import { serve } from "@upstash/workflow/nextjs" import { createOrderId, checkStockAvailability, processPayment, dispatchOrder, sendOrderConfirmation, sendDispatchNotification, } from "./utils" type OrderPayload = { userId: string items: { productId: string, quantity: number }[] } export const { POST } = serve(async (context) => { const { userId, items } = context.requestPayload; // Step 1: Create Order Id const orderId = await context.run("create-order-id", async () => { return await createOrderId(userId); }); // Step 2: Verify stock availability const stockAvailable = await context.run("check-stock", async () => { return await checkStockAvailability(items); }); if (!stockAvailable) { console.warn("Some items are out of stock"); return; }; // Step 3: Process payment await context.run("process-payment", async () => { return await processPayment(orderId) }) // Step 4: Dispatch the order await context.run("dispatch-order", async () => { return await dispatchOrder(orderId, items) }) // Step 5: Send order confirmation email await context.run("send-confirmation", async () => { return await sendOrderConfirmation(userId, orderId) }) // Step 6: Send dispatch notification await context.run("send-dispatch-notification", async () => { return await sendDispatchNotification(userId, orderId) }) }) ``` -------------------------------- ### Retrieve Image URL - Python Source: https://upstash.com/docs/workflow/examples/imageProcessing Retrieves the URL of an uploaded image using its ID asynchronously. It defines an internal async function for the retrieval and executes it via context.run. Type hints indicate a string return type for the URL. ```python async def _get_image_url() -> str: return await get_image_url(image_id) image_url: str = await context.run("get-image-url", _get_image_url) ``` -------------------------------- ### Implement Evaluator-Optimizer LLM Workflow (JavaScript) Source: https://upstash.com/docs/workflow/llms-txt Demonstrates the Evaluator-Optimizer pattern using JavaScript in Upstash Workflow. It sets up generator and evaluator agents to iteratively refine text generation based on feedback. This example requires OpenAI API access and the '@upstash/workflow/nextjs' package. ```javascript import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve(async (context) => { const model = context.agents.openai('gpt-3.5-turbo'); // Generator agent that generates content const generator = context.agents.agent({ model, name: 'generator', maxSteps: 1, background: 'You are an agent that generates text based on a prompt.', tools: {} }); // Evaluator agent that evaluates the text and gives corrections const evaluator = context.agents.agent({ model, name: 'evaluator', maxSteps: 1, background: 'You are an agent that evaluates the generated text and provides corrections if needed.', tools: {} }); let generatedText = ''; let evaluationResult = ''; const prompt = "Generate a short explanation of quantum mechanics."; let nextPrompt = prompt; for (let i = 0; i < 3; i++) { // Construct prompt for generator: // - If there's no evaluation, use the original prompt // - If there's an evaluation, provide the prompt, the last generated text, and the evaluator's feedback if (evaluationResult && evaluationResult !== "PASS") { nextPrompt = `Please revise the answer to the question "${prompt}". Previous answer was: "${generatedText}", which received this feedback: "${evaluationResult}".`; } // Generate content const generatedResponse = await context.agents.task({ agent: generator, prompt: nextPrompt }).run(); generatedText = generatedResponse.text // Evaluate the generated content const evaluationResponse = await context.agents.task({ agent: evaluator, prompt: `Evaluate and provide feedback for the following text: ${generatedText}` }).run(); evaluationResult = evaluationResponse.text // If the evaluator accepts the content (i.e., "PASS"), stop if (evaluationResult.includes("PASS")) { break; } } console.log(generatedText); }); ``` -------------------------------- ### Implement Image Processing Workflow with Upstash Workflow (TypeScript) Source: https://upstash.com/docs/workflow/llms-txt This TypeScript example defines an Upstash Workflow for image processing. It retrieves an image, resizes it to multiple resolutions, applies filters, and stores the processed images. It uses `context.run` for internal operations and `context.call` for external service interactions, demonstrating parallel execution. ```typescript import { serve } from "@upstash/workflow/nextjs" import { resizeImage, applyFilters, storeImage, getImageUrl, } from "./utils" type ImageResult = { imageUrl: string } export const { POST } = serve<{ imageId: string; userId: string }> async (context) => { const { imageId, userId } = context.requestPayload // Step 1: Retrieve the uploaded image const imageUrl = await context.run("get-image-url", async () => { return await getImageUrl(imageId) }) // Step 2: Resize the image to multiple resolutions const resolutions = [640, 1280, 1920] const resizedImages: { body: ImageResult }[] = await Promise.all(resolutions.map( resolution => context.call( `resize-image-${resolution}`, { // endpoint which returns ImageResult type in response url: "https://image-processing-service.com/resize", method: "POST", body: { imageUrl, width: resolution, } } ) )) // Step 3: Apply filters to each resized image const filters = ["grayscale", "sepia", "contrast"] const processedImagePromises: Promise[] = [] for (const resizedImage of resizedImages) { for (const filter of filters) { const processedImagePromise = context.call( `apply-filter-${filter}`, { // endpoint which returns ImageResult type in response url: "https://image-processing-service.com/filter", method: "POST", body: { imageUrl: resizedImage.body.imageUrl, filter, } } ) processedImagePromises.push(processedImagePromise) } } const processedImages: { body: ImageResult }[] = await Promise.all(processedImagePromises) // Step 4: Store processed images in cloud storage const storedImageUrls: string[] = await Promise.all( processedImages.map( processedImage => context.run(`store-image`, async () => { ``` -------------------------------- ### Bulk Cancel Workflows (Node.js) Source: https://upstash.com/docs/workflow/rest/runs/bulk-cancel Node.js example for bulk canceling workflow runs using the fetch API. It sends a DELETE request to the QStash API with workflow run IDs in the JSON body. ```js const response = await fetch('https://qstash.upstash.io/v2/workflows/runs', { method: 'DELETE', headers: { 'Authorization': 'Bearer ', 'Content-Type': 'application/json', body: { workflowRunIds: [ "run_id_1", "run_id_2", "run_id_3", ], }, } }); ``` -------------------------------- ### QStash CLI Development Server Output and Usage Source: https://upstash.com/docs/workflow/llms-txt Shows typical output from the QStash CLI when its local development server is running, including URLs, credentials, and a sample publish request. ```APIDOC ## QStash CLI Development Server Output and Usage ### Description This snippet shows the typical output from the QStash CLI when its local development server is running. It provides the URL, default user credentials (QSTASH_TOKEN), and signing keys necessary for local testing. It also includes a sample cURL request for publishing messages. ### Method (N/A - CLI Output) ### Endpoint (N/A - CLI Output) ### Parameters (N/A - CLI Output) ### Request Example (Publish Message) ```bash curl -X POST http://127.0.0.1:8080/v2/publish/https://example.com -H "Authorization: Bearer eyJVc2VySUQiOiJkZWZhdWx0VXNlciIsIlBhc3N3b3JkIjoiZGVmYXVsdFBhc3N3b3JkIn0=" ``` ### Response #### Success Response (CLI Output) - **Server URL**: http://127.0.0.1:8080 - **QSTASH_TOKEN**: eyJVc2VySUQiOiJkZWZhdWx0VXNlciIsIlBhc3N3b3JkIjoiZGVmYXVsdFBhc3N3b3JkIn0= - **QSTASH_CURRENT_SIGNING_KEY**: sig_7RvLjqfZBvP5KEUimQCE1pvpLuou - **QSTASH_NEXT_SIGNING_KEY**: sig_7W3ZNbfKWk5NWwEs3U4ixuQ7fxwE #### Response Example (Publish Message) (Success response for publish message is typically a status confirmation and message ID, but not detailed here.) ``` -------------------------------- ### Fetch Workflow Logs with Node.js Fetch API (JavaScript) Source: https://upstash.com/docs/workflow/llms-txt Retrieves workflow logs by making a GET request to the QStash API logs endpoint using the native Fetch API in Node.js. Requires an authorization token. ```javascript const response = await fetch("https://qstash.upstash.io/v2/workflows/logs", { headers: { Authorization: "Bearer ", }, }); ``` -------------------------------- ### Create Workflow Endpoint in Next.js (TypeScript) Source: https://upstash.com/docs/workflow/llms-txt Defines a workflow endpoint within a Next.js application using the `@upstash/workflow/nextjs` library. This example illustrates two sequential steps, 'initial-step' and 'second-step', executed within the workflow context. ```typescript import { serve } from "@upstash/workflow/nextjs" export const { POST } = serve( async (context) => { await context.run("initial-step", () => { console.log("initial step ran") }) await context.run("second-step", () => { console.log("second step ran") }) } ) ``` -------------------------------- ### AI Data Processing with Upstash Workflow (JavaScript) Source: https://upstash.com/docs/workflow/llms-txt This example demonstrates advanced AI data processing using Upstash Workflow. The workflow downloads a dataset, processes it in chunks using OpenAI's GPT-4 model, aggregates results, and generates a report. It serves as a high-level overview with a placeholder for the actual implementation. ```javascript /* This example demonstrates advanced AI data processing using Upstash Workflow. The following example workflow downloads a large dataset, processes it in chunks using OpenAI’s GPT-4 model, aggregates the results and generates a report. Use Case: Our workflow will: 1. Receive a request to process a dataset 2. Download the dataset from a remote source 3. Process the data in chunks using OpenAI 4. Aggregate results 5. Generate and send a final report */ // Placeholder for actual workflow implementation console.log('Upstash Workflow AI Generation Example'); ``` -------------------------------- ### Suspend User in Python Source: https://upstash.com/docs/workflow/examples/paymentRetry This Python code snippet demonstrates how to suspend a user's account if their payment fails. It first checks the suspension status and then proceeds to suspend the user and send a notification email if they are not already suspended. ```python # suspend user if the user isn't suspended is_suspended = await context.run("check suspension", _check_suspension) if not is_suspended: async def _suspend_user() -> None: await suspend_user(email) await context.run("suspend user", _suspend_user) async def _send_suspended_email() -> None: await send_email( email, "Your account has been suspended due to payment failure. Please update your payment method.", ) await context.run("send suspended email", _send_suspended_email) ``` -------------------------------- ### Workflow Logs API Response Example (JSON) Source: https://upstash.com/docs/workflow/llms-txt A sample JSON response from the Upstash Workflow logs API, detailing a single workflow run. This structure is useful for understanding workflow execution data, including ID, URL, state, timestamps, and steps. ```json { "cursor": "1686652644442-12", "runs": [ { "workflowRunId": "wfr_rj0Upr1rvdzGfz96fXNHh", "workflowUrl": "https://feasible-eft-notably.ngrok-free.app/api/call", "workflowState": "RUN_SUCCESS", "workflowRunCreatedAt": 1736340463061 } ] } ``` -------------------------------- ### Cancel Workflow Run via HTTP Request (Python) Source: https://upstash.com/docs/workflow/rest/runs/cancel Provides a Python example for cancelling a workflow run using the 'requests' library. It constructs a DELETE request with the necessary authorization header. ```python import requests headers = { 'Authorization': 'Bearer ', } response = requests.delete( 'https://qstash.upstash.io/v2/workflows/runs/wfr_TzazoUCuZmFGp2u9cdy5K', headers=headers ) ``` -------------------------------- ### E-commerce Order Fulfillment Workflow (Python) Source: https://upstash.com/docs/workflow/examples/eCommerceOrderFulfillment Implements an e-commerce order fulfillment workflow using Upstash Workflow with FastAPI. It defines sequential steps for order processing, including ID creation, stock checks, payment, dispatch, and notifications. Requires upstash_workflow and fastapi libraries. ```python from fastapi import FastAPI from typing import List, TypedDict from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext from utils import ( create_order_id, check_stock_availability, process_payment, dispatch_order, send_order_confirmation, send_dispatch_notification, ) app = FastAPI() serve = Serve(app) class OrderItem(TypedDict): product_id: str quantity: int class OrderPayload(TypedDict): user_id: str items: List[OrderItem] @serve.post("/order-fulfillment") async def order_fulfillment(context: AsyncWorkflowContext[OrderPayload]) -> None: # Get the order payload from the request payload = context.request_payload user_id = payload["user_id"] items = payload["items"] # Step 1: Create Order Id async def _create_order_id(): return await create_order_id(user_id) order_id: str = await context.run("create-order-id", _create_order_id) # Step 2: Verify stock availability async def _check_stock(): return await check_stock_availability(items) stock_available: bool = await context.run("check-stock", _check_stock) if not stock_available: print("Some items are out of stock") return # Step 3: Process payment async def _process_payment(): return await process_payment(order_id) await context.run("process-payment", _process_payment) # Step 4: Dispatch the order async def _dispatch_order(): return await dispatch_order(order_id, items) await context.run("dispatch-order", _dispatch_order) # Step 5: Send order confirmation email async def _send_confirmation(): return await send_order_confirmation(user_id, order_id) await context.run("send-confirmation", _send_confirmation) # Step 6: Send dispatch notification async def _send_dispatch_notification(): return await send_dispatch_notification(user_id, order_id) await context.run("send-dispatch-notification", _send_dispatch_notification) ``` -------------------------------- ### Resize Image to Multiple Resolutions - TypeScript Source: https://upstash.com/docs/workflow/examples/imageProcessing Resizes an image to predefined resolutions (640, 1280, 1920) by calling an external image processing service. It uses Promise.all to execute these calls in parallel for efficiency. The input requires the image URL and desired width, and the output is expected to be of type ImageResult. ```typescript const resolutions = [640, 1280, 1920] const resizedImages: { body: ImageResult }[] = await Promise.all(resolutions.map( resolution => context.call( `resize-image-${resolution}`, { // endpoint which returns ImageResult type in response url: "https://image-processing-service.com/resize", method: "POST", body: { imageUrl, width: resolution, } } ) )) ``` -------------------------------- ### Schedule Weekly User Summary on Signup (Python) Source: https://upstash.com/docs/workflow/howto/schedule Schedules a weekly summary email for a user to be sent starting 7 days after their signup using Python and FastAPI. It calculates the first summary date and creates a QStash schedule with a unique ID for idempotency. Dependencies include QStash Python SDK and FastAPI. ```python from fastapi import FastAPI, Request from fastapi.responses import JSONResponse from qstash import AsyncQStash from datetime import datetime, timedelta app = FastAPI() client = AsyncQStash("") @app.post("/api/sign-up") async def sign_up(request: Request): user_data = await request.json() # Simulate user registration user = await sign_up(user_data) # Calculate the date for the first summary (7 days from now) first_summary_date = datetime.now() + timedelta(days=7) # Create cron expression for weekly summaries starting 7 days from signup cron = f"{first_summary_date.minute} {first_summary_date.hour} * * {first_summary_date.day}" # Schedule weekly account summary await client.schedule.create_json( schedule_id=f"user-summary-{user.email}", destination="https:///api/send-weekly-summary", body={"userId": user.id}, cron=cron, ) return JSONResponse( content={"success": True, "message": "User registered and summary scheduled"}, status_code=201, ) ``` -------------------------------- ### Bulk Cancel Workflows (cURL) Source: https://upstash.com/docs/workflow/rest/runs/bulk-cancel Example of how to cancel multiple workflow runs using cURL. This command targets the QStash API endpoint for canceling workflow runs, specifying criteria in the request body. ```sh curl -XDELETE https://qstash.upstash.io/v2/workflows/runs \ -H "Content-Type: application/json" \ -H "Authorization: Bearer " \ -d '{"workflowUrl": "https://example.com"}' ``` -------------------------------- ### Trigger Upstash Workflow via HTTP POST Source: https://upstash.com/docs/workflow/llms-txt This example demonstrates how to trigger an Upstash workflow using an HTTP POST request with the Fetch API in JavaScript. It shows how to include a JSON body and custom headers. ```APIDOC ## POST // ### Description Triggers an Upstash workflow via an HTTP POST request. ### Method POST ### Endpoint `//` ### Parameters #### Path Parameters - **YOUR_WORKFLOW_ENDPOINT** (string) - Required - The endpoint for your workflow. - **YOUR-WORKFLOW-ROUTE** (string) - Required - The specific route for your workflow. #### Query Parameters None #### Request Body - **foo** (string) - Example field for the request body. ### Request Example ```javascript await fetch("https:///", { method: "POST", body: JSON.stringify({ "foo": "bar" }), headers: { "my-header": "foo" } }); ``` ### Response #### Success Response (200) - **status** (string) - Indicates the status of the workflow trigger. #### Response Example ```json { "status": "triggered" } ``` ``` -------------------------------- ### Trigger Workflow via HTTP Request (curl) Source: https://upstash.com/docs/workflow/llms-txt Provides an example of how to trigger a workflow endpoint using a `curl` command. This method is suitable for quick testing and demonstrates sending a POST request with custom headers and a JSON body. ```bash curl -X POST https:/// \ -H "my-header: foo" \ -d '{"foo": "bar"}' ``` -------------------------------- ### Trigger Workflow via HTTP POST (fetch TypeScript) Source: https://upstash.com/docs/workflow/howto/start Initiates a workflow run by sending an HTTP POST request using the browser's `fetch` API in TypeScript. This involves specifying the workflow endpoint URL, setting the method to POST, providing the request body as a JSON string, and including any necessary headers. This approach is useful for testing workflows from a web application. ```javascript await fetch("https:///", { method: "POST", body: JSON.stringify({ "foo": "bar" }), headers: { "my-header": "foo" } }); ``` -------------------------------- ### TypeScript: Upstash Workflow Image Processing Example Source: https://upstash.com/docs/workflow/llms-txt A TypeScript workflow for image processing that retrieves an image URL, resizes it to multiple resolutions, applies filters, and stores the processed images. It is designed for sequential execution due to current limitations and uses `context.call` for external service interactions. ```typescript import { serve } from "@upstash/workflow/nextjs" import { resizeImage, applyFilters, storeImage, getImageUrl, } from "./utils" type ImageResult = { imageUrl: string } export const { POST } = serve<{ imageId: string; userId: string }>( async (context) => { const { imageId, userId } = context.requestPayload // Step 1: Retrieve the uploaded image const imageUrl = await context.run("get-image-url", async () => { return await getImageUrl(imageId) }) // Step 2: Resize the image to multiple resolutions const resolutions = [640, 1280, 1920] const resizedImages: { body: ImageResult }[] = await Promise.all(resolutions.map( resolution => context.call( `resize-image-${resolution}`, { // endpoint which returns ImageResult type in response url: "https://image-processing-service.com/resize", method: "POST", body: { imageUrl, width: resolution, } } ) )) // Step 3: Apply filters to each resized image const filters = ["grayscale", "sepia", "contrast"] const processedImagePromises: Promise[] = [] for (const resizedImage of resizedImages) { for (const filter of filters) { const processedImagePromise = context.call( `apply-filter-${filter}`, { // endpoint which returns ImageResult type in response url: "https://image-processing-service.com/filter", method: "POST", body: { imageUrl: resizedImage.body.imageUrl, filter, } } ) processedImagePromises.push(processedImagePromise) } } const processedImages: { body: ImageResult }[] = await Promise.all(processedImagePromises) // Step 4: Store processed images in cloud storage const storedImageUrls: string[] = await Promise.all( processedImages.map( processedImage => context.run(`store-image`, async () => { return await storeImage(processedImage.body.imageUrl) }) ) ) } ) ``` -------------------------------- ### Email Analyzer Agent Example (TypeScript) Source: https://upstash.com/docs/workflow/llms-txt Demonstrates creating an email analysis agent using Upstash Workflow. This agent can classify emails and suggest responses by defining and using tools for analysis and response generation. It requires the '@upstash/workflow' package. ```typescript import { Agent } from '@upstash/workflow'; // Example agent for email analysis const emailAnalyzerAgent = new Agent({ name: 'email-analyzer', description: 'Analyzes email content, classifies it, and suggests responses.', // ... other configurations }); // Define tools for email analysis emailAnalyzerAgent.addTool({ name: 'analyzeEmailContent', description: 'Analyzes the content of an email.', parameters: { type: 'object', properties: { emailBody: { type: 'string', description: 'The body of the email' }, }, required: ['emailBody'], }, handler: async ({ emailBody }) => { // Implementation for email content analysis console.log('Analyzing email content...'); return { classification: 'Inquiry', sentiment: 'Positive' }; }, }); emailAnalyzerAgent.addTool({ name: 'generateResponse', description: 'Generates a response to an email.', parameters: { type: 'object', properties: { analysis: { type: 'object', description: 'The analysis result of the email' }, }, required: ['analysis'], }, handler: async ({ analysis }) => { // Implementation for generating response console.log('Generating response...'); return 'Thank you for your inquiry. We will get back to you shortly.'; }, }); // Example of using the agent async function processEmail() { const emailContent = 'Hello, I have a question about your product.'; const analysis = await emailAnalyzerAgent.call('analyzeEmailContent', { emailBody: emailContent }); const response = await emailAnalyzerAgent.call('generateResponse', { analysis }); console.log('Response:', response); } ``` -------------------------------- ### Schedule Workflows with Cron Expressions (TypeScript) Source: https://context7.com/context7/upstash-workflow/llms.txt This example shows how to schedule recurring workflow runs using cron expressions with the Upstash QStash client. It demonstrates creating schedules for daily backups, weekly reports, and per-user recurring tasks. Requires `@upstash/qstash` package. ```typescript import { Client } from "@upstash/qstash"; const client = new Client({ token: process.env.QSTASH_TOKEN! }); // Schedule daily backup at 2:00 AM await client.schedules.create({ scheduleId: "daily-backup", destination: "https://myapp.com/api/workflow/backup", cron: "0 2 * * *", body: { backupType: "full", retention: 30 } }); // Schedule weekly report every Monday at 9:00 AM await client.schedules.create({ scheduleId: "weekly-report", destination: "https://myapp.com/api/workflow/generate-report", cron: "0 9 * * 1", body: { reportType: "weekly_summary" } }); // Per-user schedule: weekly summary starting 7 days after signup const user = await signUpUser({ email: "user@example.com", name: "Jane" }); const firstSummaryDate = new Date(Date.now() + 7 * 24 * 60 * 60 * 1000); const userCron = `${firstSummaryDate.getMinutes()} ${firstSummaryDate.getHours()} * * ${firstSummaryDate.getDay()}`; await client.schedules.create({ scheduleId: `user-summary-${user.email}`, destination: "https://myapp.com/api/workflow/user-summary", body: { userId: user.id, email: user.email }, cron: userCron }); ``` -------------------------------- ### Define Python Onboarding Workflow (Python) Source: https://upstash.com/docs/workflow/llms-txt Defines an asynchronous onboarding workflow using Upstash Workflow and FastAPI. It includes sending welcome emails, pausing for three days, calling an AI service, and sending follow-up emails. ```python from fastapi import FastAPI from typing import Dict, TypedDict from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext, CallResponse from email_utils import send_email app = FastAPI() serve = Serve(app) class UserProfile(TypedDict): name: str email: str async def onboarding_workflow(context: AsyncWorkflowContext, user_profile: UserProfile): await context.run(send_email(user_profile['email'], 'Welcome!', 'Welcome to our service!')) await context.sleep(days=3) ai_message = await context.call("ai_service", "generate_message", name=user_profile['name']) await context.run(send_email(user_profile['email'], 'Follow-up', ai_message)) serve.add_workflow(onboarding_workflow) ``` -------------------------------- ### Implement Generator-Evaluator Loop with Upstash Workflow (TypeScript) Source: https://upstash.com/docs/workflow/agents/patterns/evaluator-optimizer This TypeScript code defines a workflow using Upstash agents for generation and evaluation. It iteratively generates content, evaluates it, and revises based on feedback until a passing evaluation is achieved. Dependencies include '@upstash/workflow/nextjs'. ```typescript import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve(async (context) => { const model = context.agents.openai('gpt-3.5-turbo'); // Generator agent that generates content const generator = context.agents.agent({ model, name: 'generator', maxSteps: 1, background: 'You are an agent that generates text based on a prompt.', tools: {} }); // Evaluator agent that evaluates the text and gives corrections const evaluator = context.agents.agent({ model, name: 'evaluator', maxSteps: 1, background: 'You are an agent that evaluates the generated text and provides corrections if needed.', tools: {} }); let generatedText = ''; let evaluationResult = ''; const prompt = "Generate a short explanation of quantum mechanics."; let nextPrompt = prompt; for (let i = 0; i < 3; i++) { // Construct prompt for generator: // - If there's no evaluation, use the original prompt // - If there's an evaluation, provide the prompt, the last generated text, and the evaluator's feedback if (evaluationResult && evaluationResult !== "PASS") { nextPrompt = `Please revise the answer to the question "${prompt}". Previous answer was: "${generatedText}", which received this feedback: "${evaluationResult}".`; } // Generate content const generatedResponse = await context.agents.task({ agent: generator, prompt: nextPrompt }).run(); generatedText = generatedResponse.text // Evaluate the generated content const evaluationResponse = await context.agents.task({ agent: evaluator, prompt: `Evaluate and provide feedback for the following text: ${generatedText}` }).run(); evaluationResult = evaluationResponse.text // If the evaluator accepts the content (i.e., "PASS"), stop if (evaluationResult.includes("PASS")) { break; } } console.log(generatedText); }); ``` -------------------------------- ### Fetch Message Logs using Python Source: https://upstash.com/docs/workflow/rest/runs/message-logs This Python code snippet illustrates fetching message logs from the Upstash Workflow API using the requests library. It configures the necessary headers, including the authorization token, and sends a GET request to the specified endpoint. ```python import requests headers = { 'Authorization': 'Bearer ', } response = requests.get( 'https://qstash.upstash.io/v2/workflows/messageLogs', headers=headers ) ``` -------------------------------- ### Schedule Weekly User Summary on Signup (TypeScript) Source: https://upstash.com/docs/workflow/howto/schedule Schedules a weekly summary email for a user to be sent starting 7 days after their signup. It calculates the first summary date and creates a QStash schedule with a unique ID for idempotency. Dependencies include Upstash QStash client and utility functions for user sign-up. ```typescript import { signUp } from "@/utils/auth-utils"; import { Client } from "@upstash/qstash"; const client = new Client({ token: process.env.QSTASH_TOKEN! }); export async function POST(request: Request) { const userData: UserData = await request.json(); // Simulate user registration const user = await signUp(userData); // Calculate the date for the first summary (7 days from now) const firstSummaryDate = new Date(Date.now() + 7 * 24 * 60 * 60 * 1000); // Create cron expression for weekly summaries starting 7 days from signup const cron = `${firstSummaryDate.getMinutes()} ${firstSummaryDate.getHours()} * * ${firstSummaryDate.getDay()}`; // Schedule weekly account summary await client.schedules.create({ scheduleId: `user-summary-${user.email}`, destination: "https:///api/send-weekly-summary", body: { userId: user.id }, cron: cron, }); return NextResponse.json( { success: true, message: "User registered and summary scheduled" }, { status: 201 } ); } ```