TITLE: Implementing Image Processing Workflow with Upstash Workflow in TypeScript
DESCRIPTION: This TypeScript example defines an Upstash Workflow that orchestrates image processing. It retrieves an image, resizes it to multiple resolutions, applies several filters, and stores the processed images in cloud storage. It uses `context.run` for internal operations and `context.call` for requests to external image processing services, and runs steps in parallel with `Promise.all`.
SOURCE: https://upstash.com/docs/workflow/examples/imageProcessing.mdx
LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs"
import {
  resizeImage,
  applyFilters,
  storeImage,
  getImageUrl,
} from "./utils"

type ImageResult = {
  imageUrl: string
}

export const { POST } = serve<{ imageId: string; userId: string }>(
  async (context) => {
    const { imageId, userId } = context.requestPayload

    // Step 1: Retrieve the uploaded image
    const imageUrl = await context.run("get-image-url", async () => {
      return await getImageUrl(imageId)
    })

    // Step 2: Resize the image to multiple resolutions
    const resolutions = [640, 1280, 1920]

    const resizedImages: { body: ImageResult }[] = await Promise.all(resolutions.map(
      resolution => context.call<ImageResult>(
        `resize-image-${resolution}`,
        {
          // endpoint which returns ImageResult type in response
          url: "https://image-processing-service.com/resize",
          method: "POST",
          body: {
            imageUrl,
            width: resolution,
          }
        }
      )
    ))

    // Step 3: Apply filters to each resized image
    const filters = ["grayscale", "sepia", "contrast"]
    const processedImagePromises: Promise<{ body: ImageResult }>[] = []

    for (const resizedImage of resizedImages) {
      for (const filter of filters) {
        const processedImagePromise = context.call<ImageResult>(
          `apply-filter-${filter}`,
          {
            // endpoint which returns ImageResult type in response
            url: "https://image-processing-service.com/filter",
            method: "POST",
            body: {
              imageUrl: resizedImage.body.imageUrl,
              filter,
            }
          }
        )
        processedImagePromises.push(processedImagePromise)
      }
    }
    const processedImages: { body: ImageResult }[] = await Promise.all(processedImagePromises)

    // Step 4: Store processed images in cloud storage
    const storedImageUrls: string[] = await Promise.all(
      processedImages.map(
        processedImage => context.run(`store-image`, async () => {
          return await storeImage(processedImage.body.imageUrl)
        })
      )
    )
  }
)
```

----------------------------------------

TITLE: Upstash Workflow for Payment Retries and Account Management
DESCRIPTION: This Upstash Workflow orchestrates a payment retry process. It makes up to three charge attempts, waiting 24 hours after each failed attempt, and manages account suspension and invoice emails based on the payment outcome. It uses Upstash Workflow's `serve` and `context` APIs for durable execution and state management.
SOURCE: https://upstash.com/docs/workflow/examples/paymentRetry.mdx
LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs";

type ChargeUserPayload = {
  email: string;
};

export const { POST } = serve<ChargeUserPayload>(async (context) => {
  const { email } = context.requestPayload;

  for (let i = 0; i < 3; i++) {
    // attempt to charge the user
    const result = await context.run("charge customer", async () => {
      try {
        return await chargeCustomer(i + 1);
      } catch (e) {
        console.error(e);
        return;
      }
    });

    if (!result) {
      // Wait for a day
      await context.sleep("wait for retry", 24 * 60 * 60);
    } else {
      // Unsuspend User
      const isSuspended = await context.run("check suspension", async () => {
        return await checkSuspension(email);
      });
      if (isSuspended) {
        await context.run("unsuspend user", async () => {
          await unsuspendUser(email);
        });
      }

      // send invoice email
      await context.run("send invoice email", async () => {
        await sendEmail(
          email,
          `Payment successful. Invoice: ${result.invoiceId}, Total cost: $${result.totalCost}`
        );
      });

      // by returning, we end the workflow run
      return;
    }
  }

  // suspend user if the user isn't suspended
  const isSuspended = await context.run("check suspension", async () => {
    return await checkSuspension(email);
  });

  if (!isSuspended) {
    await context.run("suspend user", async () => {
      await suspendUser(email);
    });

    await context.run("send suspended email", async () => {
      await sendEmail(
        email,
        "Your account has been suspended due to payment failure. Please update your payment method."
      );
    });
  }
});

async function sendEmail(email: string, content: string) {
  // Implement the logic to send an email
  console.log("Sending email to", email, "with content:", content);
}

async function checkSuspension(email: string) {
  // Implement the logic to check if the user is suspended
  console.log("Checking suspension status for", email);
  return true;
}

async function suspendUser(email: string) {
  // Implement the logic to suspend the user
  console.log("Suspending the user", email);
}

async function unsuspendUser(email: string) {
  // Implement the logic to unsuspend the user
  console.log("Unsuspending the user", email);
}

async function chargeCustomer(attempt: number) {
  // Implement the logic to charge the customer
  console.log("Charging the customer");

  // Simulate a failure on the first two attempts
  if (attempt <= 2) {
    throw new Error("Payment failed");
  }

  return {
    invoiceId: "INV123",
    totalCost: 100,
  } as const;
}
```

LANGUAGE: python
CODE:
```
from fastapi import FastAPI
from typing import TypedDict, Optional
from dataclasses import dataclass
from upstash_workflow.fastapi import Serve
from upstash_workflow import AsyncWorkflowContext

app = FastAPI()
serve = Serve(app)


@dataclass
class ChargeResult:
    invoice_id: str
    total_cost: float


class ChargeUserPayload(TypedDict):
    email: str


async def send_email(email: str, content: str) -> None:
    # Implement the logic to send an email
    print("Sending email to", email, "with content:", content)


async def check_suspension(email: str) -> bool:
    # Implement the logic to check if the user is suspended
    print("Checking suspension status for", email)
    return True


async def suspend_user(email: str) -> None:
    # Implement the logic to suspend the user
    print("Suspending the user", email)


async def unsuspend_user(email: str) -> None:
    # Implement the logic to unsuspend the user
    print("Unsuspending the user", email)


async def charge_customer(attempt: int) -> Optional[ChargeResult]:
    # Implement the logic to charge the customer
    print("Charging the customer")

    # Simulate a failure on the first two attempts
    if attempt <= 2:
        raise Exception("Payment failed")

    return ChargeResult(invoice_id="INV123", total_cost=100)


@serve.post("/payment-retries")
async def payment_retries(context: AsyncWorkflowContext[ChargeUserPayload]) -> None:
    email = context.request_payload["email"]

    async def _check_suspension() -> bool:
        return await check_suspension(email)

    for i in range(3):
        # attempt to charge the user
        async def _charge_customer() -> Optional[ChargeResult]:
            try:
                return await charge_customer(i + 1)
            except Exception as e:
                print(f"Error: {e}")
                return None

        result = await context.run("charge customer", _charge_customer)

        if not result:
            # Wait for a day
            await context.sleep("wait for retry", 24 * 60 * 60)
        else:
            # Unsuspend User
            is_suspended = await context.run("check suspension", _check_suspension)

            if is_suspended:

                async def _unsuspend_user() -> None:
                    await unsuspend_user(email)

                await context.run("unsuspend user", _unsuspend_user)

            # send invoice email
            async def _send_invoice_email() -> None:
                await send_email(
                    email,
                    f"Payment successful. Invoice: {result.invoice_id}, Total cost: ${result.total_cost}",
                )

            await context.run("send invoice email", _send_invoice_email)

            # by returning, we end the workflow run
            return

    # suspend user if the user isn't suspended
    is_suspended = await context.run("check suspension", _check_suspension)

    if not is_suspended:

        async def _suspend_user() -> None:
            await suspend_user(email)

        await context.run("suspend user", _suspend_user)

        async def _send_suspended_email() -> None:
            await send_email(
                email,
                "Your account has been suspended due to payment failure. Please update your payment method.",
            )

        await context.run("send suspended email", _send_suspended_email)
```

----------------------------------------

TITLE: Complete AI Data Processing Workflow with Upstash (Python)
DESCRIPTION: This Python snippet implements an Upstash Workflow using FastAPI to manage an AI data processing pipeline. It downloads a dataset, processes the data in chunks with OpenAI's GPT-4, aggregates results every 10 chunks, and generates and sends a final report. It uses `context.run` for idempotent operations and `context.call` for HTTP requests with long timeouts.
SOURCE: https://upstash.com/docs/workflow/examples/allInOne.mdx
LANGUAGE: python
CODE:
```
from fastapi import FastAPI
import json
import os
from typing import Dict, List, Any, TypedDict
from upstash_workflow.fastapi import Serve
from upstash_workflow import AsyncWorkflowContext, CallResponse
from utils import (
    aggregate_results,
    generate_report,
    send_report,
    get_dataset_url,
    split_into_chunks,
)

app = FastAPI()
serve = Serve(app)


class RequestPayload(TypedDict):
    dataset_id: str
    user_id: str


@serve.post("/ai-generation")
async def ai_generation(context: AsyncWorkflowContext[RequestPayload]) -> None:
    request = context.request_payload
    dataset_id = request["dataset_id"]
    user_id = request["user_id"]

    # Step 1: Download the dataset
    async def _get_dataset_url() -> str:
        return await get_dataset_url(dataset_id)

    dataset_url = await context.run("get-dataset-url", _get_dataset_url)

    # HTTP request with much longer timeout (2hrs)
    response: CallResponse[Any] = await context.call(
        "download-dataset", url=dataset_url, method="GET"
    )

    dataset = response.body

    # Step 2: Process data in chunks using OpenAI
    chunk_size = 1000
    chunks = split_into_chunks(dataset, chunk_size)
    processed_chunks: List[str] = []

    for i, chunk in enumerate(chunks):
        openai_response: CallResponse[Dict[str, Any]] = await context.call(
            f"process-chunk-{i}",
            url="https://api.openai.com/v1/chat/completions",
            method="POST",
            headers={
                "authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}",
            },
            body={
                "model": "gpt-4",
                "messages": [
                    {
                        "role": "system",
                        "content": "You are an AI assistant tasked with analyzing data chunks. Provide a brief summary and key insights for the given data.",
                    },
                    {
                        "role": "user",
                        "content": f"Analyze this data chunk: {json.dumps(chunk)}",
                    },
                ],
                "max_tokens": 150,
            },
        )

        processed_chunks.append(
            openai_response.body["choices"][0]["message"]["content"]
        )

        # Every 10 chunks, we'll aggregate intermediate results
        if i % 10 == 9 or i == len(chunks) - 1:

            async def _aggregate_results() -> None:
                await aggregate_results(processed_chunks)
                processed_chunks.clear()

            await context.run(f"aggregate-results{i}", _aggregate_results)

    # Step 3: Generate and send data report
    async def _generate_report() -> Any:
        return await generate_report(dataset_id)

    report = await context.run("generate-report", _generate_report)

    async def _send_report() -> None:
        await send_report(report, user_id)

    await context.run("send-report", _send_report)
```

----------------------------------------

TITLE: Executing Business Logic in context.run - TypeScript
DESCRIPTION: Demonstrates placing business logic inside `context.run` for each step in an Upstash Workflow. Code outside `context.run` is re-executed every time the workflow endpoint is invoked, so it can run multiple times, while code inside `context.run` runs once per step. Also shows how to pass results between steps.
SOURCE: https://upstash.com/docs/workflow/basics/caveats.mdx
LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs"

export const { POST } = serve(async (context) => {
  const input = context.requestPayload

  const result = await context.run("step-1", () => {
    return { success: true }
  })

  console.log("This log will appear multiple times")

  await context.run("step-2", () => {
    console.log("This log will appear just once")
    console.log("Step 1 status is:", result.success)
  })
})
```

----------------------------------------

TITLE: Implementing Auth Provider Webhook Workflow with Upstash
DESCRIPTION: This workflow processes webhook events from an authentication provider to manage user accounts. It creates the user in a database and in Stripe, starts a 14-day trial, and sends a series of automated emails: a welcome message, an activity summary after 7 days, a warning 2 days before the trial ends, and a trial-end notification. The workflow uses `context.run` for atomic operations and `context.sleep` for timed delays.
SOURCE: https://upstash.com/docs/workflow/examples/authWebhook.mdx
LANGUAGE: TypeScript
CODE:
```
import { serve } from "@upstash/workflow/nextjs";
import { WorkflowContext } from "@upstash/workflow";

/**
 * This can be the payload of the user created webhook event coming from your
 * auth provider (e.g. Firebase, Auth0, Clerk etc.)
 */
type UserCreatedPayload = {
  name: string;
  email: string;
};

export const { POST } = serve<UserCreatedPayload>(async (context) => {
  const { name, email } = context.requestPayload;

  const { userid } = await context.run("sync user", async () => {
    return await createUserInDatabase({ name, email });
  });

  await context.run("create new user in stripe", async () => {
    await createNewUserInStripe(email);
  });

  await context.run("start trial in Stripe", async () => {
    await startTrialInStripe(email);
  });

  await context.run("send welcome email", async () => {
    await sendEmail(
      email,
      "Welcome to our platform! You have 14 days of free trial."
    );
  });

  await context.sleep("wait", 7 * 24 * 60 * 60);

  // get user stats and send email with them
  const stats = await context.run("get user stats", async () => {
    return await getUserStats(userid);
  });
  await sendProblemSolvedEmail({ context, email, stats });

  // wait until there are two days to the end of trial period
  // and check upgrade status
  await context.sleep("wait for trial warning", 5 * 24 * 60 * 60);

  const isUpgraded = await context.run("check upgraded plan", async () => {
    return await checkUpgradedPlan(email);
  });

  // end the workflow if upgraded
  if (isUpgraded) return;

  await context.run("send trial warning email", async () => {
    await sendEmail(
      email,
      "Your trial is about to end in 2 days. Please upgrade your plan to keep using our platform."
    );
  });

  await context.sleep("wait for trial end", 2 * 24 * 60 * 60);

  await context.run("send trial end email", async () => {
    await sendEmail(
      email,
      "Your trial has ended. Please upgrade your plan to keep using our platform."
    );
  });
});

async function sendProblemSolvedEmail({
  context,
  email,
  stats,
}: {
  context: WorkflowContext<UserCreatedPayload>;
  email: string;
  stats: { totalProblemsSolved: number };
}) {
  if (stats.totalProblemsSolved === 0) {
    await context.run("send no answers email", async () => {
      await sendEmail(
        email,
        "Hey, you haven't solved any questions in the last 7 days..."
      );
    });
  } else {
    await context.run("send stats email", async () => {
      await sendEmail(
        email,
        `You have solved ${stats.totalProblemsSolved} problems in the last 7 days. Keep it up!`
      );
    });
  }
}

async function createUserInDatabase({
  name,
  email,
}: {
  name: string;
  email: string;
}) {
  console.log("Creating a user in the database:", name, email);
  return { userid: "12345" };
}

async function createNewUserInStripe(email: string) {
  // Implement logic to create a new user in Stripe
  console.log("Creating a user in Stripe for", email);
}

async function startTrialInStripe(email: string) {
  // Implement logic to start a trial in Stripe
  console.log("Starting a trial of 14 days in Stripe for", email);
}

async function getUserStats(userid: string) {
  // Implement logic to get user stats
  console.log("Getting user stats for", userid);
  return {
    totalProblemsSolved: 10_000,
    mostInterestedTopic: "JavaScript",
  };
}

async function checkUpgradedPlan(email: string) {
  // Implement logic to check if the user has upgraded the plan
  console.log("Checking if the user has upgraded the plan", email);
  return false;
}

async function sendEmail(email: string, content: string) {
  // Implement logic to send an email
  console.log("Sending email to", email, content);
}
```

LANGUAGE: Python
CODE:
```
from fastapi import FastAPI
from typing import Dict, TypedDict
from upstash_workflow.fastapi import Serve
from upstash_workflow import AsyncWorkflowContext

app = FastAPI()
serve = Serve(app)


class UserCreatedPayload(TypedDict):
    name: str
    email: str


class UserStats(TypedDict):
    total_problems_solved: int
    most_interested_topic: str


async def create_user_in_database(name: str, email: str) -> Dict[str, str]:
    print("Creating a user in the database:", name, email)
    return {"userid": "12345"}


async def create_new_user_in_stripe(email: str) -> None:
    # Implement logic to create a new user in Stripe
    print("Creating a user in Stripe for", email)


async def start_trial_in_stripe(email: str) -> None:
    # Implement logic to start a trial in Stripe
    print("Starting a trial of 14 days in Stripe for", email)


async def get_user_stats(userid: str) -> UserStats:
    # Implement logic to get user stats
    print("Getting user stats for", userid)
    return {"total_problems_solved": 10000, "most_interested_topic": "Python"}


async def check_upgraded_plan(email: str) -> bool:
    # Implement logic to check if the user has upgraded the plan
    print("Checking if the user has upgraded the plan", email)
    return False


async def send_email(email: str, content: str) -> None:
    # Implement logic to send an email
    print("Sending email to", email, content)


async def send_problem_solved_email(
    context: AsyncWorkflowContext[UserCreatedPayload], email: str, stats: UserStats
) -> None:
    if stats["total_problems_solved"] == 0:

        async def _send_no_answers_email() -> None:
            await send_email(
                email, "Hey, you haven't solved any questions in the last 7 days..."
            )

        await context.run("send no answers email", _send_no_answers_email)
    else:

        async def _send_stats_email() -> None:
            await send_email(
                email,
                f"You have solved {stats['total_problems_solved']} problems in the last 7 days. Keep it up!",
            )

        await context.run("send stats email", _send_stats_email)


@serve.post("/auth-provider-webhook")
async def auth_provider_webhook(
    context: AsyncWorkflowContext[UserCreatedPayload],
) -> None:
    payload = context.request_payload
    name = payload["name"]
    email = payload["email"]

    async def _sync_user() -> Dict[str, str]:
        return await create_user_in_database(name, email)

    result = await context.run("sync user", _sync_user)
    userid = result["userid"]

    async def _create_new_user_in_stripe() -> None:
        await create_new_user_in_stripe(email)

    await context.run("create new user in stripe", _create_new_user_in_stripe)

    async def _start_trial_in_stripe() -> None:
        await start_trial_in_stripe(email)

    await context.run("start trial in Stripe", _start_trial_in_stripe)

    async def _send_welcome_email() -> None:
        await send_email(
            email, "Welcome to our platform! You have 14 days of free trial."
        )

    await context.run("send welcome email", _send_welcome_email)

    await context.sleep("wait", 7 * 24 * 60 * 60)

    # get user stats and send email with them
    async def _get_user_stats() -> UserStats:
        return await get_user_stats(userid)

    stats: UserStats = await context.run("get user stats", _get_user_stats)

    await send_problem_solved_email(context, email, stats)

    # wait until there are two days to the end of trial period and check upgrade status
    await context.sleep("wait for trial warning", 5 * 24 * 60 * 60)

    async def _check_upgraded_plan() -> bool:
        return await check_upgraded_plan(email)

    is_upgraded = await context.run("check upgraded plan", _check_upgraded_plan)

    # end the workflow if upgraded
    if is_upgraded:
        return

    async def _send_trial_warning_email() -> None:
        await send_email(
            email,
            "Your trial is about to end in 2 days. Please upgrade your plan to keep using our platform.",
        )

    await context.run("send trial warning email", _send_trial_warning_email)

    await context.sleep("wait for trial end", 2 * 24 * 60 * 60)

    async def _send_trial_end_email() -> None:
        await send_email(
            email,
            "Your trial has ended. Please upgrade your plan to keep using our platform.",
        )

    await context.run("send trial end email", _send_trial_end_email)
```

----------------------------------------

TITLE: Scheduling Weekly User Summaries on Sign-Up (TypeScript)
DESCRIPTION: This TypeScript snippet defines a Next.js API route for user sign-up. It registers a user, calculates a date 7 days after sign-up, builds a cron expression from that date's minute, hour, and weekday, and schedules a weekly account summary with the `@upstash/qstash` client, so each user receives their first report 7 days after signing up.
SOURCE: https://upstash.com/docs/workflow/howto/schedule.mdx
LANGUAGE: typescript
CODE:
```
import { NextResponse } from "next/server";
import { signUp } from "@/utils/auth-utils";
import { Client } from "@upstash/qstash";

const client = new Client({ token: process.env.QSTASH_TOKEN! });

export async function POST(request: Request) {
  const userData: UserData = await request.json();

  // Simulate user registration
  const user = await signUp(userData);

  // Calculate the date for the first summary (7 days from now)
  const firstSummaryDate = new Date(Date.now() + 7 * 24 * 60 * 60 * 1000);

  // Create cron expression for weekly summaries starting 7 days from signup
  const cron = `${firstSummaryDate.getMinutes()} ${firstSummaryDate.getHours()} * * ${firstSummaryDate.getDay()}`;

  // Schedule weekly account summary
  await client.schedules.create({
    scheduleId: `user-summary-${user.email}`,
    destination: "https:///api/send-weekly-summary",
    body: { userId: user.id },
    cron: cron,
  });

  return NextResponse.json(
    { success: true, message: "User registered and summary scheduled" },
    { status: 201 }
  );
}
```

----------------------------------------

TITLE: Chaining LLM Agents for Sequential Tasks in Upstash Workflow (TypeScript)
DESCRIPTION: This TypeScript code demonstrates how to chain multiple Large Language Model (LLM) agents within an Upstash Workflow to perform a sequence of tasks. It initializes three distinct OpenAI `gpt-3.5-turbo` agents, each with a specific role: listing physicists, describing their work using a Wikipedia tool, and summarizing the descriptions. The output of each agent's task is used as the input for the next agent, illustrating a prompt chaining pattern.
SOURCE: https://upstash.com/docs/workflow/agents/patterns/prompt-chaining.mdx
LANGUAGE: TypeScript
CODE:
```
import { serve } from "@upstash/workflow/nextjs";
import { WikipediaQueryRun } from "@langchain/community/tools/wikipedia_query_run";

export const { POST } = serve(async (context) => {
  const model = context.agents.openai('gpt-3.5-turbo');

  const agent1 = context.agents.agent({
    model,
    name: 'firstAgent',
    maxSteps: 1,
    background: 'You are an agent that lists famous physicists.',
    tools: {}
  });

  const agent2 = context.agents.agent({
    model,
    name: 'secondAgent',
    // set to 2 as this agent will first request tools
    // and then summarize them:
    maxSteps: 2,
    background:
      'You are an agent that describes the work of' +
      ' the physicists listed in the previous prompt.',
    tools: {
      wikiTool: new WikipediaQueryRun({
        topKResults: 1,
        maxDocContentLength: 500,
      })
    }
  });

  const agent3 = context.agents.agent({
    model,
    name: 'thirdAgent',
    maxSteps: 1,
    background:
      'You are an agent that summarizes the ' +
      'works of the physicists mentioned previously.',
    tools: {}
  });

  // Chaining agents
  const firstOutput = await context.agents.task({
    agent: agent1,
    prompt: "List 3 famous physicists."
  }).run();

  const secondOutput = await context.agents.task({
    agent: agent2,
    prompt: `Describe the work of: ${firstOutput.text}`
  }).run();

  const { text } = await context.agents.task({
    agent: agent3,
    prompt: `Summarize: ${secondOutput.text}`
  }).run();

  console.log(text);
});
```

----------------------------------------

TITLE: Customer Onboarding Workflow Initialization - TypeScript/Python
DESCRIPTION: This snippet defines the entire customer onboarding workflow. It starts by sending a welcome email to the newly signed-up user, then pauses for three days. It then enters an infinite loop that checks the user's activity state (active/non-active), sends the matching follow-up email, and pauses for a month between checks. It uses `context.run` for atomic operations and `context.sleep` for non-blocking delays.
SOURCE: https://upstash.com/docs/workflow/examples/customerOnboarding.mdx
LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs"

type InitialData = {
  email: string
}

export const { POST } = serve<InitialData>(async (context) => {
  const { email } = context.requestPayload

  await context.run("new-signup", async () => {
    await sendEmail("Welcome to the platform", email)
  })

  await context.sleep("wait-for-3-days", 60 * 60 * 24 * 3)

  while (true) {
    const state = await context.run("check-user-state", async () => {
      return await getUserState()
    })

    if (state === "non-active") {
      await context.run("send-email-non-active", async () => {
        await sendEmail("Email to non-active users", email)
      })
    } else if (state === "active") {
      await context.run("send-email-active", async () => {
        await sendEmail("Send newsletter to active users", email)
      })
    }

    await context.sleep("wait-for-1-month", 60 * 60 * 24 * 30)
  }
})

async function sendEmail(message: string, email: string) {
  // Implement email sending logic here
  console.log(`Sending ${message} email to ${email}`)
}

type UserState = "non-active" | "active"

const getUserState = async (): Promise<UserState> => {
  // Implement user state logic here
  return "non-active"
}
```

LANGUAGE: python
CODE:
```
from fastapi import FastAPI
from typing import Literal, TypedDict
from upstash_workflow.fastapi import Serve
from upstash_workflow import AsyncWorkflowContext

app = FastAPI()
serve = Serve(app)

UserState = Literal["non-active", "active"]


class InitialData(TypedDict):
    email: str


async def send_email(message: str, email: str) -> None:
    # Implement email sending logic here
    print(f"Sending {message} email to {email}")


async def get_user_state() -> UserState:
    # Implement user state logic here
    return "non-active"


@serve.post("/customer-onboarding")
async def customer_onboarding(context: AsyncWorkflowContext[InitialData]) -> None:
    email = context.request_payload["email"]

    async def _new_signup() -> None:
        await send_email("Welcome to the platform", email)

    await context.run("new-signup", _new_signup)

    await context.sleep("wait-for-3-days", 60 * 60 * 24 * 3)

    while True:

        async def _check_user_state() -> UserState:
            return await get_user_state()

        state: UserState = await context.run("check-user-state", _check_user_state)

        if state == "non-active":

            async def _send_email_non_active() -> None:
                await send_email("Email to non-active users", email)

            await context.run("send-email-non-active", _send_email_non_active)
        else:

            async def _send_email_active() -> None:
                await send_email("Send newsletter to active users", email)

            await context.run("send-email-active", _send_email_active)

        await context.sleep("wait-for-1-month", 60 * 60 * 24 * 30)
```

----------------------------------------

TITLE: Performing HTTP Calls with context.call - Upstash Workflow - TypeScript
DESCRIPTION: This snippet demonstrates how to use `context.call` within an Upstash Workflow to make an HTTP POST request to the OpenAI API. It highlights the ability to handle long response times (up to 15 minutes or 2 hours depending on plan) and shows how to pass the request body and headers and how to read the response status, headers, and body. This method suits operations that would cause a normal serverless function to time out.
SOURCE: https://upstash.com/docs/workflow/basics/context.mdx
LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs";

export const { POST } = serve<{ topic: string }>(async (context) => {
  const request = context.requestPayload;

  const {
    status, // response status
    headers, // response headers
    body, // response body
  } = await context.call(
    "generate-long-essay", // Step name
    {
      url: "https://api.openai.com/v1/chat/completions", // Endpoint URL
      method: "POST",
      body: {
        // Request body
        model: "gpt-4o",
        messages: [
          {
            role: "system",
            content:
              "You are a helpful assistant writing really long essays that would cause a normal serverless function to timeout.",
          },
          { role: "user", content: request.topic },
        ],
      },
      headers: {
        // request headers
        authorization: `Bearer ${process.env.OPENAI_API_KEY}`,
      },
    }
  );
});
```

----------------------------------------

TITLE: Triggering Workflow Runs with Upstash Workflow Client (TypeScript)
DESCRIPTION: This snippet demonstrates how to start a new workflow run using the `client.trigger` method. It shows how to specify the workflow URL, an optional request body, headers, a custom workflow run ID, retry attempts for the initial request, a delay, and flow control parameters for rate limiting and parallelism. The method returns the `workflowRunId` of the newly started run.
SOURCE: https://upstash.com/docs/workflow/basics/client.mdx
LANGUAGE: typescript
CODE:
```
import { Client } from "@upstash/workflow";

const client = new Client({ token: "" })
const { workflowRunId } = await client.trigger({
  url: "https:///",
  body: "hello there!",         // optional body
  headers: { ... },             // optional headers
  workflowRunId: "my-workflow", // optional workflow run id
  retries: 3,                   // optional retries in the initial request
  delay: "10s",                 // optional delay value
  flowControl: {                // optional flow control
    key: "USER_GIVEN_KEY",
    rate: 10,
    parallelism: 5,
    period: "10m"
  },
})

console.log(workflowRunId)
// prints wfr_my-workflow
```

----------------------------------------

TITLE: Defining an Upstash Workflow Route with Next.js SDK
DESCRIPTION: This snippet demonstrates how to define an Upstash Workflow route using the `@upstash/workflow/nextjs` SDK. It uses `context.sleep` for a delay and `context.run` to execute asynchronous operations like `retrieveEmail` and `fetchFromLLm` in parallel with `Promise.all`. It processes an input payload of type `UserRequest`.
SOURCE: https://upstash.com/docs/workflow/howto/monitor.mdx
LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs";
import { retrieveEmail, fetchFromLLm, UserRequest } from "../../../lib/util";

export const { POST } = serve<UserRequest>(
  async (context) => {
    const input = context.requestPayload;

    await context.sleep("sleep", 10);

    // Start both steps without awaiting so they run in parallel
    const p1 = context.run("retrieveEmail", async () => {
      return retrieveEmail(input.id);
    });

    const p2 = context.run("askllm", async () => {
      return fetchFromLLm(input.question);
    });

    await Promise.all([p1, p2])
  },
);
```

----------------------------------------

TITLE: Implementing E-commerce Order Fulfillment Workflow
DESCRIPTION: This code example demonstrates a full e-commerce order fulfillment workflow using Upstash Workflow. It runs six steps in sequence: order ID creation, stock verification, payment processing, order dispatch, order confirmation, and dispatch notification. The workflow ends early if any item is out of stock. Each step relies on a utility function imported from a separate module.
SOURCE: https://upstash.com/docs/workflow/examples/eCommerceOrderFulfillment.mdx LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/nextjs" import { createOrderId, checkStockAvailability, processPayment, dispatchOrder, sendOrderConfirmation, sendDispatchNotification, } from "./utils" type OrderPayload = { userId: string items: { productId: string, quantity: number }[] } export const { POST } = serve(async (context) => { const { userId, items } = context.requestPayload; // Step 1: Create Order Id const orderId = await context.run("create-order-id", async () => { return await createOrderId(userId); }); // Step 2: Verify stock availability const stockAvailable = await context.run("check-stock", async () => { return await checkStockAvailability(items); }); if (!stockAvailable) { console.warn("Some items are out of stock"); return; }; // Step 3: Process payment await context.run("process-payment", async () => { return await processPayment(orderId) }) // Step 4: Dispatch the order await context.run("dispatch-order", async () => { return await dispatchOrder(orderId, items) }) // Step 5: Send order confirmation email await context.run("send-confirmation", async () => { return await sendOrderConfirmation(userId, orderId) }) // Step 6: Send dispatch notification await context.run("send-dispatch-notification", async () => { return await sendDispatchNotification(userId, orderId) }) }) ``` LANGUAGE: python CODE: ``` from fastapi import FastAPI from typing import List, TypedDict from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext from utils import ( create_order_id, check_stock_availability, process_payment, dispatch_order, send_order_confirmation, send_dispatch_notification, ) app = FastAPI() serve = Serve(app) class OrderItem(TypedDict): product_id: str quantity: int class OrderPayload(TypedDict): user_id: str items: List[OrderItem] @serve.post("/order-fulfillment") async def order_fulfillment(context: 
AsyncWorkflowContext[OrderPayload]) -> None: # Get the order payload from the request payload = context.request_payload user_id = payload["user_id"] items = payload["items"] # Step 1: Create Order Id async def _create_order_id(): return await create_order_id(user_id) order_id: str = await context.run("create-order-id", _create_order_id) # Step 2: Verify stock availability async def _check_stock(): return await check_stock_availability(items) stock_available: bool = await context.run("check-stock", _check_stock) if not stock_available: print("Some items are out of stock") return # Step 3: Process payment async def _process_payment(): return await process_payment(order_id) await context.run("process-payment", _process_payment) # Step 4: Dispatch the order async def _dispatch_order(): return await dispatch_order(order_id, items) await context.run("dispatch-order", _dispatch_order) # Step 5: Send order confirmation email async def _send_confirmation(): return await send_order_confirmation(user_id, order_id) await context.run("send-confirmation", _send_confirmation) # Step 6: Send dispatch notification async def _send_dispatch_notification(): return await send_dispatch_notification(user_id, order_id) await context.run("send-dispatch-notification", _send_dispatch_notification) ``` ---------------------------------------- TITLE: Complete AI Data Processing Workflow with Upstash (TypeScript) DESCRIPTION: This TypeScript snippet defines an Upstash Workflow that orchestrates a multi-step AI data processing pipeline. It handles downloading large datasets, processing them in chunks using OpenAI's GPT-4, aggregating intermediate results, and finally generating and sending a report. It leverages `context.run` for idempotent operations and `context.call` for long-running HTTP requests. 
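The TypeScript example that follows relies on a `splitIntoChunks` helper it does not define, and it aggregates results after every tenth chunk and after the last one. A minimal Python sketch of both pieces, assuming the dataset is a list. The helper bodies and the name `aggregation_points` are hypothetical and not part of the SDK.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def split_into_chunks(data: Sequence[T], chunk_size: int) -> List[List[T]]:
    """Split data into consecutive chunks of at most chunk_size items."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    return [list(data[i : i + chunk_size]) for i in range(0, len(data), chunk_size)]


def aggregation_points(num_chunks: int, every: int = 10) -> List[int]:
    """Chunk indices after which results are aggregated.

    Mirrors the condition `i % 10 === 9 || i === chunks.length - 1`:
    every `every`-th chunk, plus the final chunk.
    """
    return [
        i
        for i in range(num_chunks)
        if i % every == every - 1 or i == num_chunks - 1
    ]
```

For 25 chunks, aggregation happens after indices 9, 19, and 24, so no partial batch is left unaggregated at the end.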
SOURCE: https://upstash.com/docs/workflow/examples/allInOne.mdx LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/nextjs" import { downloadData, aggregateResults, generateReport, sendReport, getDatasetUrl, splitIntoChunks, } from "./utils" type OpenAiResponse = { choices: { message: { role: string, content: string } }[] } export const { POST } = serve<{ datasetId: string; userId: string }>( async (context) => { const request = context.requestPayload // Step 1: Download the dataset const datasetUrl = await context.run("get-dataset-url", async () => { return await getDatasetUrl(request.datasetId) }) // HTTP request with much longer timeout (2hrs) const { body: dataset } = await context.call("download-dataset", { url: datasetUrl, method: "GET" }) // Step 2: Process data in chunks using OpenAI const chunkSize = 1000 const chunks = splitIntoChunks(dataset, chunkSize) const processedChunks: string[] = [] for (let i = 0; i < chunks.length; i++) { const { body: processedChunk } = await context.api.openai.call( `process-chunk-${i}`, { token: process.env.OPENAI_API_KEY, operation: "chat.completions.create", body: { model: "gpt-4", messages: [ { role: "system", content: "You are an AI assistant tasked with analyzing data chunks. Provide a brief summary and key insights for the given data.", }, { role: "user", content: `Analyze this data chunk: ${JSON.stringify(chunks[i])}`, }, ], max_completion_tokens: 150, }, } ) processedChunks.push(processedChunk.choices[0].message.content!) 
// Every 10 chunks, we'll aggregate intermediate results if (i % 10 === 9 || i === chunks.length - 1) { await context.run(`aggregate-results${i}`, async () => { await aggregateResults(processedChunks) processedChunks.length = 0 }) } } // Step 3: Generate and send data report const report = await context.run("generate-report", async () => { return await generateReport(request.datasetId) }) await context.run("send-report", async () => { await sendReport(report, request.userId) }) } ) ``` ---------------------------------------- TITLE: Processing Data Chunks with OpenAI in Upstash Workflow (TypeScript) DESCRIPTION: This TypeScript snippet illustrates how to iterate through data chunks and process each one using OpenAI's GPT-4 model within an Upstash Workflow. It utilizes `context.api.openai.call` to interact with the OpenAI API, sending a system and user message for analysis and specifying `max_completion_tokens`. SOURCE: https://upstash.com/docs/workflow/examples/allInOne.mdx LANGUAGE: typescript CODE: ``` for (let i = 0; i < chunks.length; i++) { const { body: processedChunk } = await context.api.openai.call( `process-chunk-${i}`, { token: process.env.OPENAI_API_KEY!, operation: "chat.completions.create", body: { model: "gpt-4", messages: [ { role: "system", content: "You are an AI assistant tasked with analyzing data chunks. Provide a brief summary and key insights for the given data.", }, { role: "user", content: `Analyze this data chunk: ${JSON.stringify(chunks[i])}`, }, ], max_completion_tokens: 150, }, } ) } ``` ---------------------------------------- TITLE: Implementing Customer Onboarding Workflow with Upstash Workflow in TypeScript DESCRIPTION: This TypeScript snippet demonstrates a multi-step customer onboarding workflow using Upstash Workflow. 
It includes sending an initial welcome email, pausing execution for three days using `context.sleep`, generating a personalized follow-up message via OpenAI's API, and finally sending that message as a follow-up email. The `context.run` function ensures individual steps are retried on failure, and `context.api.openai.call` integrates with external services. SOURCE: https://upstash.com/docs/workflow/getstarted.mdx LANGUAGE: TypeScript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; import { sendEmail } from "./emailUtils"; // Type-safety for starting our workflow interface InitialData { userId: string; email: string; name: string } export const { POST } = serve<InitialData>(async (context) => { const { userId, email, name } = context.requestPayload; // Step 1: Send welcome email await context.run("send-welcome-email", async () => { await sendEmail(email, "Welcome to our service!"); }); // Step 2: Wait for 3 days (in seconds) await context.sleep("sleep-until-follow-up", 60 * 60 * 24 * 3); // Step 3: AI-generate personalized follow-up message const { body: aiResponse } = await context.api.openai.call( "generate-personalized-message", { token: "", operation: "chat.completions.create", body: { model: "gpt-3.5-turbo", messages: [ { role: "system", content: "You are an assistant creating personalized follow-up messages." }, { role: "user", content: `Create a short, friendly follow-up message for ${name} who joined our service 3 days ago.` } ] } } ); const personalizedMessage = aiResponse.choices[0].message.content; // Step 4: Send personalized follow-up email await context.run("send-follow-up-email", async () => { await sendEmail(email, personalizedMessage); }); }); ``` ---------------------------------------- TITLE: Implementing a Single Agent with Wikipedia Tool in TypeScript DESCRIPTION: This snippet demonstrates how to set up a single AI agent using Upstash Workflow.
The `researcherAgent` is configured with an OpenAI model and a `WikipediaQueryRun` tool, enabling it to query Wikipedia for information. The agent is then assigned a task to research advanced physics topics, and its output is logged to the console. SOURCE: https://upstash.com/docs/workflow/agents/features.mdx LANGUAGE: TypeScript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; import { WikipediaQueryRun } from "@langchain/community/tools/wikipedia_query_run"; export const { POST } = serve(async (context) => { const model = context.agents.openai('gpt-3.5-turbo'); const researcherAgent = context.agents.agent({ model, name: 'academic', maxSteps: 2, tools: { wikiTool: new WikipediaQueryRun({ topKResults: 1, maxDocContentLength: 500, }) }, background: 'You are researcher agent with access to Wikipedia. ' + 'Utilize Wikipedia as much as possible for correct information', }); const task = context.agents.task({ agent: researcherAgent, prompt: "Tell me about 5 topics in advanced physics.", }); const { text } = await task.run(); console.log("result:", text) }) ``` ---------------------------------------- TITLE: Ensuring Idempotency in context.run - TypeScript DESCRIPTION: Emphasizes that business logic within `context.run` must be idempotent. This means that executing the `someWork` function multiple times with the same input should produce the same result as executing it once, crucial for reliability in distributed systems with retries. SOURCE: https://upstash.com/docs/workflow/basics/caveats.mdx LANGUAGE: typescript CODE: ``` export const { POST } = serve(async (context) => { const input = context.requestPayload await context.run("step-1", async () => { return someWork(input) }) }) ``` ---------------------------------------- TITLE: Triggering Upstash Workflow with TypeScript SDK DESCRIPTION: This TypeScript snippet demonstrates how to programmatically trigger an Upstash Workflow endpoint using the `@upstash/workflow` SDK. 
It shows how to initialize the client with a QStash token and then use the `client.trigger` method to send a request, specifying the workflow URL, optional body, headers, a custom workflow run ID, and retry attempts for the initial request. SOURCE: https://upstash.com/docs/workflow/getstarted.mdx LANGUAGE: TypeScript CODE: ``` import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }); const { workflowRunId } = await client.trigger({ url: "https:///", body: "hello there!", // Optional body headers: { ... }, // Optional headers workflowRunId: "my-workflow", // Optional workflow run ID retries: 3 // Optional retries for the initial request }); ``` ---------------------------------------- TITLE: Creating Custom OpenAI Client with Upstash Workflow in TypeScript DESCRIPTION: This function `createWorkflowOpenAI` customizes the OpenAI client's fetch implementation to use `context.call` from Upstash Workflow. This ensures that HTTP requests are handled by the workflow, providing durability and extended timeouts for LLM interactions. It can be generalized for other LLM SDKs. SOURCE: https://upstash.com/docs/workflow/integrations/aisdk.mdx LANGUAGE: typescript CODE: ``` import { createOpenAI } from '@ai-sdk/openai'; import { HTTPMethods } from '@upstash/qstash'; import { WorkflowAbort, WorkflowContext } from '@upstash/workflow'; export const createWorkflowOpenAI = (context: WorkflowContext) => { return createOpenAI({ compatibility: "strict", fetch: async (input, init) => { try { // Prepare headers from init.headers const headers = init?.headers ? Object.fromEntries(new Headers(init.headers).entries()) : {}; // Prepare body from init.body const body = init?.body ? 
JSON.parse(init.body as string) : undefined; // Make network call const responseInfo = await context.call("openai-call-step", { url: input.toString(), method: init?.method as HTTPMethods, headers, body, }); // Construct headers for the response const responseHeaders = new Headers( Object.entries(responseInfo.header).reduce((acc, [key, values]) => { acc[key] = values.join(", "); return acc; }, {} as Record<string, string>) ); // Return the constructed response return new Response(JSON.stringify(responseInfo.body), { status: responseInfo.status, headers: responseHeaders, }); } catch (error) { if (error instanceof WorkflowAbort) { throw error } else { console.error("Error in fetch implementation:", error); throw error; // Rethrow error for further handling } } }, }); }; ``` ---------------------------------------- TITLE: Notifying Waiting Workflows with context.notify - Upstash Workflow - JavaScript DESCRIPTION: This snippet demonstrates how `context.notify` is used to send a payload to workflows that are currently waiting for a specific event ID. It shows how to define a step name, the event ID, and the payload to be sent, returning a list of `NotifyResponse` objects indicating the result of the notification for each waiting workflow. SOURCE: https://upstash.com/docs/workflow/basics/context.mdx LANGUAGE: javascript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve<{ topic: string }>(async (context) => { const payload = context.requestPayload; const { notifyResponse, // result of notify, which is a list of notified waiters } = await context.notify("notify step", "my-event-Id", payload); }); ``` ---------------------------------------- TITLE: Performing Subsequent Operations After Webhook Event Processing DESCRIPTION: Following initial webhook event processing, this example demonstrates how to perform additional operations, such as creating a customer record in Stripe, using `context.run`.
This ensures that each external API call or significant step is trackable and retryable within the Upstash Workflow, leveraging previously extracted user data. SOURCE: https://upstash.com/docs/workflow/howto/use-webhooks.mdx LANGUAGE: TypeScript CODE: ``` export const { POST } = serve(async (context) => { // ... Previous validation and user data extraction if (!user) { return; } const customer = await context.run("create-stripe-customer", async () => { return await stripe.customers.create({ email: user.email, name: `${user.firstName} ${user.lastName}`, metadata: { userId: user.userId, }, }); }); /// ... Additional steps }); ``` LANGUAGE: Python CODE: ``` @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: # ... Previous validation and user data extraction if not user: return async def _create_stripe_customer(): return await stripe.customers.create( email=user["email"], name=f"{user['first_name']} {user['last_name']}", metadata={"user_id": user["user_id"]}, ) customer = await context.run("create-stripe-customer", _create_stripe_customer) # ... Additional steps ``` ---------------------------------------- TITLE: Orchestrating Worker Agents with Upstash Workflow in TypeScript DESCRIPTION: This snippet demonstrates how to set up an Upstash Workflow orchestrator to manage multiple worker agents. It initializes a Wikipedia tool, defines three specialized worker agents for different physics topics (advanced physics, quantum mechanics, relativity), and then uses a task agent to synthesize their responses into a Q&A format. It showcases the use of @upstash/workflow/nextjs for serving the workflow and @langchain/community/tools/wikipedia_query_run for tool integration. 
SOURCE: https://upstash.com/docs/workflow/agents/patterns/orchestrator-workers.mdx LANGUAGE: TypeScript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; import { WikipediaQueryRun } from "@langchain/community/tools/wikipedia_query_run"; const wikiTool = new WikipediaQueryRun({ topKResults: 1, maxDocContentLength: 500, }) export const { POST } = serve(async (context) => { const model = context.agents.openai('gpt-4o'); // Worker agents const worker1 = context.agents.agent({ model, name: 'worker1', tools: { wikiTool }, maxSteps: 3, background: 'You are a worker agent that answers general questions about advanced physics.' }); const worker2 = context.agents.agent({ model, name: 'worker2', tools: { wikiTool }, maxSteps: 3, background: 'You are a worker agent that answers questions about quantum mechanics.' }); const worker3 = context.agents.agent({ model, name: 'worker3', tools: { wikiTool }, maxSteps: 3, background: 'You are a worker agent that answers questions about relativity.' }); // Synthesizing results const task = context.agents.task({ model, prompt: `Create a Q&A for advanced topics in physics`, agents: [worker1, worker2, worker3], maxSteps: 3, }) const { text } = await task.run(); console.log(text); }); ``` ---------------------------------------- TITLE: Setting Up Python Virtual Environment DESCRIPTION: These commands create a new Python virtual environment named 'venv' and then activate it. This practice isolates project dependencies, preventing conflicts with other Python projects on your system. SOURCE: https://upstash.com/docs/workflow/quickstarts/flask.mdx LANGUAGE: Bash CODE: ``` python -m venv venv source venv/bin/activate ``` ---------------------------------------- TITLE: Validating Webhook Requests within Upstash Workflow DESCRIPTION: This code illustrates how to integrate webhook request validation directly into an Upstash Workflow endpoint. 
It retrieves the raw payload and headers, then attempts to validate them using a separate `validateRequest` function. If validation fails, the workflow returns early, preventing unauthorized processing. SOURCE: https://upstash.com/docs/workflow/howto/use-webhooks.mdx LANGUAGE: TypeScript CODE: ``` export const { POST } = serve(async (context) => { const payloadString = context.requestPayload; const headerPayload = context.headers; let event: WebhookEvent; try { event = await validateRequest(payloadString, headerPayload); } catch { return } // Next steps based on the event }) ``` LANGUAGE: Python CODE: ``` async def validate_request(payload_string: str, header_payload: dict): # Validate the request pass @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: payload_string = context.request_payload header_payload = context.headers try: event = await validate_request(payload_string, header_payload) except: return # Next steps based on the event ``` ---------------------------------------- TITLE: Limiting External API Calls with context.call (JavaScript) DESCRIPTION: This snippet demonstrates how to apply rate and parallelism limits to specific external API calls made within a workflow step using `context.call`. By configuring `flowControl` with a `key`, `parallelism`, and `rate` for the API request, it prevents excessive requests to third-party services like OpenAI, ensuring compliance with their rate limits and stable integration. 
SOURCE: https://upstash.com/docs/workflow/howto/flow-control.mdx LANGUAGE: JavaScript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve<{ topic: string }>(async (context) => { const request = context.requestPayload; const response = await context.call( "generate-long-essay", { url: "https://api.openai.com/v1/chat/completions", method: "POST", body: {/*****/}, flowControl: { key: "openai-call", parallelism: 3, rate: 10 } } ); }); ``` ---------------------------------------- TITLE: Calling OpenAI API with context.api - Upstash Workflow - TypeScript DESCRIPTION: This snippet demonstrates using `context.api.openai.call` for type-safe integration with the OpenAI API within an Upstash Workflow. It shows how to specify the API key, the operation (e.g., `chat.completions.create`), and the request body for interacting with OpenAI's models. SOURCE: https://upstash.com/docs/workflow/basics/context.mdx LANGUAGE: typescript CODE: ``` const { status, body } = await context.api.openai.call("Call OpenAI", { token: "", operation: "chat.completions.create", body: { model: "gpt-4o", messages: [ { role: "system", content: "Assistant says 'hello!'", }, { role: "user", content: "User shouts back 'hi!'" }, ], }, }); ``` ---------------------------------------- TITLE: Correctly Returning Results from context.run - TypeScript DESCRIPTION: Demonstrates the correct pattern for returning results from `context.run` by directly assigning the awaited result of the `context.run` call to a variable. This ensures the result is available for subsequent steps, even if the workflow endpoint is called multiple times.
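To see why the return value matters, here is a toy model in Python. It is not the SDK's implementation: it assumes that each call to the endpoint re-runs the handler from the top, replays the stored results of finished steps, and executes at most one new step per call. `ToyContext`, `StepDone`, `run_to_completion`, and `demo` are hypothetical names.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Tuple


class StepDone(Exception):
    """Raised after a new step executes, ending the current endpoint call."""


class ToyContext:
    def __init__(self, store: Dict[str, Any]) -> None:
        self.store = store  # step results that survive across endpoint calls

    async def run(self, name: str, fn: Callable[[], Awaitable[Any]]) -> Any:
        if name in self.store:
            return self.store[name]  # replay: reuse stored result, skip fn
        self.store[name] = await fn()
        raise StepDone()


async def run_to_completion(handler, store: Dict[str, Any]) -> int:
    """Call the handler until it finishes; return how many calls it took."""
    calls = 0
    while True:
        calls += 1
        try:
            await handler(ToyContext(store))
            return calls
        except StepDone:
            continue


async def demo() -> Tuple[List[Any], List[Any]]:
    good_seen: List[Any] = []
    bad_seen: List[Any] = []

    async def some_work() -> int:
        return 42

    async def good_handler(context: ToyContext) -> None:
        result = await context.run("step-1", some_work)  # returned, so stored

        async def step_2() -> None:
            good_seen.append(result)

        await context.run("step-2", step_2)

    async def bad_handler(context: ToyContext) -> None:
        result = None

        async def step_1() -> None:
            nonlocal result
            result = await some_work()  # assigned as a side effect, not returned

        await context.run("step-1", step_1)

        async def step_2() -> None:
            bad_seen.append(result)

        await context.run("step-2", step_2)

    await run_to_completion(good_handler, {})
    await run_to_completion(bad_handler, {})
    return good_seen, bad_seen  # ([42], [None])
```

In the toy model the second handler loses its value because `step_1` is skipped on the call that executes `step_2`, so the local variable is never reassigned.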
SOURCE: https://upstash.com/docs/workflow/basics/caveats.mdx LANGUAGE: typescript CODE: ``` export const { POST } = serve(async (context) => { const input = context.requestPayload const result = await context.run("step-1", async () => { return await someWork(input) }) await context.run("step-2", async () => { someOtherWork(result) }) }) ``` ---------------------------------------- TITLE: Implementing Parallel Agent Execution with Upstash Workflow (TypeScript) DESCRIPTION: This TypeScript snippet demonstrates how to implement parallel processing using Upstash Workflow. It defines three distinct AI agents (worker1, worker2, worker3) with specific backgrounds, runs their tasks concurrently using Promise.all, and then uses an 'aggregator' agent to summarize their combined results, showcasing efficient task distribution and consolidation. It depends on '@upstash/workflow/nextjs' and 'openai' agents. SOURCE: https://upstash.com/docs/workflow/agents/patterns/parallelization.mdx LANGUAGE: TypeScript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve(async (context) => { const model = context.agents.openai('gpt-3.5-turbo'); // Define worker agents const worker1 = context.agents.agent({ model, name: 'worker1', maxSteps: 1, background: 'You are an agent that explains quantum physics.', tools: {} }); const worker2 = context.agents.agent({ model, name: 'worker2', maxSteps: 1, background: 'You are an agent that explains relativity.', tools: {} }); const worker3 = context.agents.agent({ model, name: 'worker3', maxSteps: 1, background: 'You are an agent that explains string theory.', tools: {} }); // Await results const [result1, result2, result3] = await Promise.all([ context.agents.task({ agent: worker1, prompt: "Explain quantum physics." }).run(), context.agents.task({ agent: worker2, prompt: "Explain relativity." }).run(), context.agents.task({ agent: worker3, prompt: "Explain string theory." 
}).run(), ]); // Aggregating results const aggregator = context.agents.agent({ model, name: 'aggregator', maxSteps: 1, background: 'You are an agent that summarizes multiple answers.', tools: {} }); const task = await context.agents.task({ agent: aggregator, prompt: `Summarize these three explanations: ${result1.text}, ${result2.text}, ${result3.text}` }) const finalSummary = await task.run(); console.log(finalSummary.text); }); ``` ---------------------------------------- TITLE: Executing Business Logic in context.run - Python DESCRIPTION: Illustrates placing business logic within `context.run` for each step in an Upstash Workflow. Code outside `context.run` executes multiple times, whereas code inside runs only once per step. Shows how to pass results between steps. SOURCE: https://upstash.com/docs/workflow/basics/caveats.mdx LANGUAGE: python CODE: ``` @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload async def _step_1() -> Dict: return {"success": True} result = await context.run("step-1", _step_1) print("This log will appear multiple times") async def _step_2() -> None: print("This log will appear just once") print("Step 1 status is:", result["success"]) await context.run("step-2", _step_2) ``` ---------------------------------------- TITLE: Triggering Upstash Workflow with Client (TypeScript) DESCRIPTION: This snippet demonstrates the recommended way to start an Upstash workflow using the `@upstash/workflow` client. It returns the workflow run ID and results in fewer QStash publishes. Parameters include the workflow URL, optional body, headers, a custom workflow run ID, and retries for the initial request. 
SOURCE: https://upstash.com/docs/workflow/howto/start.mdx LANGUAGE: TypeScript CODE: ``` import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }); const { workflowRunId } = await client.trigger({ url: "https:///", body: "hello there!", // Optional body headers: { ... }, // Optional headers workflowRunId: "my-workflow", // Optional workflow run ID retries: 3 // Optional retries for the initial request }); ``` ---------------------------------------- TITLE: Implementing Custom Retry Logic with OpenAI in Upstash Workflow DESCRIPTION: This comprehensive example demonstrates how to implement custom retry logic for OpenAI API calls within an Upstash Workflow. It attempts API calls up to 10 times, dynamically adjusts delays based on rate limit headers or status codes, and asynchronously stores successful responses using `context.run`. SOURCE: https://upstash.com/docs/workflow/examples/customRetry.mdx LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/nextjs" import { storeResponse } from "@/lib/utils" const BASE_DELAY = 10; const createSystemMessage = () => ({ role: "system", content: "You are an AI assistant providing a brief summary and key insights for any given data.", }) const createUserMessage = (data: string) => ({ role: "user", content: `Analyze this data chunk: ${data}`, }) export const { POST } = serve<{ userData: string }>(async (context) => { // 👇 initial data sent along when triggering the workflow const { userData } = context.requestPayload for (let attempt = 0; attempt < 10; attempt++) { const response = await context.api.openai.call(`call-openai`, { token: process.env.OPENAI_API_KEY!, operation: "chat.completions.create", body: { model: "gpt-3.5-turbo", messages: [createSystemMessage(), createUserMessage(userData)], max_completion_tokens: 150, }, }) // Success case if (response.status < 300) { await context.run("store-response-in-db", () => storeResponse(response.body)) return } // Rate limit case - wait and 
retry if (response.status === 429) { const resetTime = response.header["x-ratelimit-reset-tokens"]?.[0] || response.header["x-ratelimit-reset-requests"]?.[0] || BASE_DELAY // assuming `resetTime` is in seconds await context.sleep("sleep-until-retry", Number(resetTime)) continue } // Any other scenario - pause for 5 seconds to avoid overloading OpenAI API await context.sleep("pause-to-avoid-spam", 5) } }) ``` LANGUAGE: python CODE: ``` from fastapi import FastAPI from typing import Dict, Any, TypedDict import os from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext, CallResponse from utils import store_response app = FastAPI() serve = Serve(app) class InitialData(TypedDict): user_data: str def create_system_message() -> Dict[str, str]: return { "role": "system", "content": "You are an AI assistant providing a brief summary and key insights for any given data.", } def create_user_message(data: str) -> Dict[str, str]: return {"role": "user", "content": f"Analyze this data chunk: {data}"} @serve.post("/custom-retry-logic") async def custom_retry_logic(context: AsyncWorkflowContext[InitialData]) -> None: # 👇 initial data sent along when triggering the workflow user_data = context.request_payload["user_data"] for attempt in range(10): response: CallResponse[Dict[str, Any]] = await context.call( "call-openai", url="https://api.openai.com/v1/chat/completions", method="POST", headers={ "authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}", }, body={ "model": "gpt-4", "messages": [create_system_message(), create_user_message(user_data)], "max_tokens": 150, }, ) # Success case if response.status_code < 300: async def _store_response_in_db() -> None: await store_response(response.body) await context.run("store-response-in-db", _store_response_in_db) return # Rate limit case - wait and retry if response.status_code == 429: ratelimit_tokens_header = response.header.get("x-ratelimit-reset-tokens") ratelimit_requests_header = 
response.header.get( "x-ratelimit-reset-requests" ) reset_time = ( (ratelimit_tokens_header[0] if ratelimit_tokens_header else None) or (ratelimit_requests_header[0] if ratelimit_requests_header else None) or 10 ) # assuming `reset_time` is in seconds await context.sleep("sleep-until-retry", float(reset_time)) continue # Any other scenario - pause for 5 seconds to avoid overloading OpenAI API await context.sleep("pause-to-avoid-spam", 5) ``` ---------------------------------------- TITLE: Implementing Order Processing Workflow with Upstash DESCRIPTION: This comprehensive TypeScript example demonstrates an Upstash Workflow for handling order processing. It initiates an order request, pauses execution to await an external processing event with a 10-minute timeout, and then resumes to log processed data and send a confirmation email upon event reception or handles timeout scenarios. SOURCE: https://upstash.com/docs/workflow/examples/waitForEvent.mdx LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve(async (context) => { const { orderId, userEmail } = context.requestPayload; // Step 1: request order processing await context.run("request order processing", async () => { await requestProcessing(orderId) }) // Step 2: Wait for the order to be processed const { eventData, timeout } = await context.waitForEvent( "wait for order processing", `order-${orderId}`, { timeout: "10m" // 10 minutes timeout } ); if (timeout) { // end workflow in case of timeout return; } const processedData = eventData; // Step 3: Log the processed order await context.run("process-order", async () => { console.log(`Order ${orderId} processed:`, processedData); }); // Step 4: Send a confirmation email await context.run("send-confirmation-email", async () => { await sendEmail( userEmail, "Your order has been processed!", processedData ); }); }); ``` ---------------------------------------- TITLE: Sending Batch Emails with Resend using Upstash 
Workflow (TypeScript) DESCRIPTION: This snippet illustrates how to send multiple emails in a single batch request using `context.api.resend.call`. With `batch: true`, the `body` field expects an array of email objects, each containing sender, recipient, subject, and HTML content, so several emails go out in one API call. SOURCE: https://upstash.com/docs/workflow/integrations/resend.mdx LANGUAGE: typescript CODE: ``` const { status, body } = await context.api.resend.call( "Call Resend", { batch: true, token: "", body: [ { from: "Acme ", to: ["delivered@resend.dev"], subject: "Hello World", html: "It works!", }, { from: "Acme ", to: ["delivered@resend.dev"], subject: "Hello World", html: "It works!", } ], headers: { "content-type": "application/json", }, } ); ``` ---------------------------------------- TITLE: Ensuring Idempotency in context.run - Python DESCRIPTION: Highlights that business logic within `context.run` must be idempotent. This implies that executing the `some_work` function multiple times with the same input should yield the same outcome as a single execution, which is vital for reliability in distributed systems with retries. SOURCE: https://upstash.com/docs/workflow/basics/caveats.mdx LANGUAGE: python CODE: ``` @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload async def _step_1() -> None: return await some_work(input) await context.run("step-1", _step_1) ``` ---------------------------------------- TITLE: Avoiding Non-Idempotent Functions Outside context.run - TypeScript DESCRIPTION: Shows an incorrect example where a non-idempotent function (`getResultFromDb`) is called outside `context.run`. This can lead to inconsistent behavior or `Failed to authenticate Workflow request` errors if the workflow endpoint is re-executed. SOURCE: https://upstash.com/docs/workflow/basics/caveats.mdx LANGUAGE: typescript CODE: ``` export const { POST } = serve<{ entryId: string }>(async (context) => { const { entryId } = context.requestPayload; // 👇 Problem: Non-idempotent function outside context.run: const result = await getResultFromDb(entryId); if (result.return) { return; } // ... }) ``` ---------------------------------------- TITLE: Avoiding Time-Dependent Code Outside context.run - Python DESCRIPTION: Illustrates an incorrect use of time-dependent code (`time.time()`) outside `context.run`. This can cause non-deterministic workflow execution, as the condition might evaluate differently across multiple calls to the workflow endpoint.
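One way to avoid the problem in the incorrect example that follows is to read the clock inside a step and branch on the step's return value. The sketch below assumes that a step's return value is reused when the endpoint is called again. `StubContext` is a hypothetical stand-in so the code runs without the SDK, and the `now` parameter exists only to make the clock injectable. The truncation with `int()` is also an addition here, since comparing a float timestamp modulo 5 to 2 would almost never match.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, List


class StubContext:
    """Hypothetical stand-in for the workflow context; records step names."""

    def __init__(self) -> None:
        self.steps: List[str] = []

    async def run(self, name: str, fn: Callable[[], Awaitable[Any]]) -> Any:
        self.steps.append(name)
        return await fn()


async def example(
    context: StubContext, now: Callable[[], float] = time.time
) -> None:
    # Read the clock inside a step, so the branch below depends only on
    # the value that step returned.
    async def _get_bucket() -> int:
        return int(now()) % 5

    bucket = await context.run("get-time-bucket", _get_bucket)

    async def _noop() -> None:
        return None

    if bucket == 2:
        await context.run("step-1", _noop)
    else:
        await context.run("step-2", _noop)
```

With a fixed clock such as `now=lambda: 12.0`, the handler takes the `step-1` branch every time, which is the property the workflow needs across re-executions.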
SOURCE: https://upstash.com/docs/workflow/basics/caveats.mdx LANGUAGE: python CODE: ``` @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload # 👇 Problem: time-dependent code if time.time() % 5 == 2: await context.run("step-1", lambda: ...) else: await context.run("step-2", lambda: ...) ``` ---------------------------------------- TITLE: Avoiding Random Code Outside context.run - Python DESCRIPTION: Illustrates an incorrect example of using random code (`random.randint()`) outside `context.run`. Randomness makes the workflow non-deterministic, meaning it might produce different results or take different paths on re-execution, which is problematic for reliable workflows. SOURCE: https://upstash.com/docs/workflow/basics/caveats.mdx LANGUAGE: python CODE: ``` @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload # 👇 Problem: random code if random.randint(0, 9) % 5 == 2: await context.run("step-1", lambda: ...) else: await context.run("step-2", lambda: ...) ``` ---------------------------------------- TITLE: Defining a Workflow Endpoint with `serve` DESCRIPTION: This snippet demonstrates how to use the `serve` method to define an endpoint for a workflow. It provides a `context` object, which is used to define sequential business logic steps with `context.run`, ensuring proper execution flow within the workflow. 
SOURCE: https://upstash.com/docs/workflow/basics/serve.mdx LANGUAGE: TypeScript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve(async (context) => { const result = await context.run("step-1", async () => { // define a piece of business logic as step 1 }); await context.run("step-2", async () => { // define another piece of business logic as step 2 }); }); ``` LANGUAGE: Python CODE: ``` from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: async def _step1() -> str: # define a piece of business logic as step 1 return "step 1 result" result = await context.run("step-1", _step1) async def _step2() -> None: # define another piece of business logic as step 2 pass await context.run("step-2", _step2) ``` ---------------------------------------- TITLE: Implementing Payment Retry Loop with 24-Hour Delay DESCRIPTION: This code block outlines the core retry mechanism for payments. It iterates up to three times, attempting to charge the customer. If the payment fails (`!result`), it uses `context.sleep` to introduce a 24-hour delay before the next attempt, ensuring a persistent retry strategy. If successful, the workflow concludes. 
SOURCE: https://upstash.com/docs/workflow/examples/paymentRetry.mdx LANGUAGE: typescript CODE: ``` for (let i = 0; i < 3; i++) { // attempt to charge the customer if (!result) { // Wait for a day await context.sleep("wait for retry", 24 * 60 * 60); } else { // Payment succeeded // Unsuspend user, send invoice email // end the workflow: return; } } ``` LANGUAGE: python CODE: ``` for i in range(3): # attempt to charge the customer if not result: # Wait for a day await context.sleep("wait for retry", 24 * 60 * 60) else: # Payment succeeded # Unsuspend user, send invoice email # end the workflow: return ``` ---------------------------------------- TITLE: Correct Step Execution Order in Upstash Workflow (TypeScript) DESCRIPTION: This example demonstrates the correct pattern for calling `generateText` within an Upstash Workflow. It ensures that the `context.requestPayload.prompt` is first retrieved within a `context.run` step, satisfying the prerequisite for `generateText` and preventing errors. SOURCE: https://upstash.com/docs/workflow/integrations/aisdk.mdx LANGUAGE: typescript CODE: ``` export const { POST } = serve<{ prompt: string }>(async (context) => { const openai = createWorkflowOpenAI(context); // Get prompt in a step first const prompt = await context.run("get prompt", async () => { return context.requestPayload.prompt; }); const result = await generateText({ model: openai('gpt-3.5-turbo'), prompt }); }); ``` ---------------------------------------- TITLE: Implementing Sleep and Delay in Upstash Workflow DESCRIPTION: This workflow demonstrates how to introduce delays using `context.sleep_until` and `context.sleep`. It processes an input, waits for a specific time, processes again, waits for a duration, and then performs a final step, showcasing time-based workflow control. 
SOURCE: https://upstash.com/docs/workflow/quickstarts/fastapi.mdx LANGUAGE: python CODE: ``` from fastapi import FastAPI import time from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) def some_work(input: str) -> str: return f"processed '{input}'" @serve.post("/sleep") async def sleep(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload async def _step1() -> str: output = some_work(input) print("step 1 input", input, "output", output) return output result1: str = await context.run("step1", _step1) await context.sleep_until("sleep1", time.time() + 3) async def _step2() -> str: output = some_work(result1) print("step 2 input", result1, "output", output) return output result2: str = await context.run("step2", _step2) await context.sleep("sleep2", 2) async def _step3() -> None: output = some_work(result2) print("step 3 input", result2, "output", output) await context.run("step3", _step3) ``` ---------------------------------------- TITLE: Executing Workflow Steps in Parallel with context.run (TypeScript) DESCRIPTION: Illustrates how to execute multiple workflow steps concurrently using `context.run` combined with `Promise.all`. This allows `someWork` and `someOtherWork` to run in parallel, improving efficiency. 
SOURCE: https://upstash.com/docs/workflow/basics/context.mdx LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/nextjs" export const { POST } = serve( async (context) => { const input = context.requestPayload; const promise1 = context.run("step-1", async () => { return someWork(input); }); const promise2 = context.run("step-2", async () => { return someOtherWork(input); }); await Promise.all([promise1, promise2]); }, ); ``` ---------------------------------------- TITLE: Implementing Tools with OpenAI Client in Upstash Workflow (TypeScript) DESCRIPTION: This advanced workflow endpoint demonstrates how to integrate tools with the Vercel AI SDK and Upstash Workflow. It defines a 'weather' tool that uses `context.run` for its execution, ensuring durability for tool calls. The `maxSteps` parameter is set to accommodate tool processing, and the standard error handling pattern is applied. SOURCE: https://upstash.com/docs/workflow/integrations/aisdk.mdx LANGUAGE: typescript CODE: ``` import { z } from 'zod'; import { serve } from "@upstash/workflow/nextjs"; import { WorkflowAbort } from '@upstash/workflow'; import { generateText, ToolExecutionError, tool } from 'ai'; import { createWorkflowOpenAI } from './model'; export const { POST } = serve<{ prompt: string }>(async (context) => { const openai = createWorkflowOpenAI(context); const prompt = await context.run("get prompt", async () => { return context.requestPayload.prompt; }); try { const result = await generateText({ model: openai('gpt-3.5-turbo'), tools: { weather: tool({ description: 'Get the weather in a location', parameters: z.object({ location: z.string().describe('The location to get the weather for'), }), execute: ({ location }) => context.run("weather tool", () => { // Mock data, replace with actual weather API call return { location, temperature: 72 + Math.floor(Math.random() * 21) - 10, }; }) }), }, maxSteps: 2, prompt, }); await context.run("text", () => { console.log(`TEXT: 
${result.text}`); return result.text; }); } catch (error) { if (error instanceof ToolExecutionError && error.cause instanceof WorkflowAbort) { throw error.cause; } else { throw error; } } }); ``` ---------------------------------------- TITLE: Calling OpenAI Chat Completions in Upstash Workflow (TypeScript) DESCRIPTION: This snippet demonstrates how to use the `context.api.openai.call` method to interact with the OpenAI chat completions API. It shows how to pass the API token, specify the operation, model, and define user/system messages for text generation, and then how to access the generated content. SOURCE: https://upstash.com/docs/workflow/integrations/openai.mdx LANGUAGE: TypeScript CODE: ``` const { status, body } = await context.api.openai.call( "Call OpenAI", { token: "", operation: "chat.completions.create", body: { model: "gpt-4o", messages: [ { role: "system", content: "Assistant says 'hello!'" }, { role: "user", content: "User shouts back 'hi!'" } ] } } ); // get text (chat completions return the generated message under `choices`): console.log(body.choices[0].message.content) ``` ---------------------------------------- TITLE: Handling OpenAI Rate Limits with Upstash Workflow Sleep DESCRIPTION: This snippet addresses API rate limits (status code 429) by extracting reset times from response headers. It then uses `context.sleep` to pause the workflow for the specified duration, allowing the rate limit to reset before retrying the API call. 
SOURCE: https://upstash.com/docs/workflow/examples/customRetry.mdx LANGUAGE: typescript CODE: ``` if (response.status === 429) { const resetTime = response.header["x-ratelimit-reset-tokens"]?.[0] || response.header["x-ratelimit-reset-requests"]?.[0] || BASE_DELAY // assuming `resetTime` is in seconds await context.sleep("sleep-until-retry", Number(resetTime)) continue } ``` LANGUAGE: python CODE: ``` if response.status_code == 429: ratelimit_tokens_header = response.header.get("x-ratelimit-reset-tokens") ratelimit_requests_header = response.header.get( "x-ratelimit-reset-requests" ) reset_time = ( (ratelimit_tokens_header[0] if ratelimit_tokens_header else None) or (ratelimit_requests_header[0] if ratelimit_requests_header else None) or 10 ) # assuming `reset_time` is in seconds await context.sleep("sleep-until-retry", float(reset_time)) continue ``` ---------------------------------------- TITLE: Defining a Multi-Step Onboarding Workflow in Python DESCRIPTION: This Python snippet defines an asynchronous onboarding workflow using Upstash Workflow's `AsyncWorkflowContext`. It orchestrates a series of steps including sending a welcome email, pausing for three days, generating a personalized follow-up message via an AI API call, and finally sending a follow-up email. It demonstrates the use of `context.run`, `context.sleep`, and `context.call` for managing workflow steps and external interactions. 
SOURCE: https://upstash.com/docs/workflow/getstarted.mdx LANGUAGE: Python CODE: ``` class InitialData(TypedDict): user_id: str email: str name: str @serve.post("/api/onboarding") async def onboarding_workflow(context: AsyncWorkflowContext[InitialData]) -> None: data = context.request_payload user_id = data["user_id"] email = data["email"] name = data["name"] # Step 1: Send welcome email async def _send_welcome_email() -> None: await send_email(email, "Welcome to our service!") await context.run("send-welcome-email", _send_welcome_email) # Step 2: Wait for 3 days (in seconds) await context.sleep("sleep-until-follow-up", 60 * 60 * 24 * 3) # Step 3: AI-generate personalized follow-up message ai_response: CallResponse[Dict[str, str]] = await context.call( "generate-personalized-message", url="https://api.openai.com/v1/chat/completions", method="POST", headers={...}, body={ "model": "gpt-3.5-turbo", "messages": [ { "role": "system", "content": "You are an assistant creating personalized follow-up messages.", }, { "role": "user", "content": f"Create a short, friendly follow-up message for {name} who joined our service 3 days ago.", } ] } ) personalized_message = ai_response.body["choices"][0]["message"]["content"] # Step 4: Send personalized follow-up email async def _send_follow_up_email() -> None: await send_email(email, personalized_message) await context.run("send-follow-up-email", _send_follow_up_email) ``` ---------------------------------------- TITLE: Processing Data Chunks with OpenAI in Upstash Workflow (Python) DESCRIPTION: This Python snippet demonstrates iterating over data chunks and processing each using the OpenAI GPT-4 model within an Upstash Workflow. It makes an HTTP `POST` request to the OpenAI chat completions endpoint via `context.call`, including necessary headers and a structured request body with system and user messages. 
SOURCE: https://upstash.com/docs/workflow/examples/allInOne.mdx LANGUAGE: python CODE: ``` for i, chunk in enumerate(chunks): openai_response: CallResponse[Dict[str, str]] = await context.call( f"process-chunk-{i}", url="https://api.openai.com/v1/chat/completions", method="POST", headers={ "authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}", }, body={ "model": "gpt-4", "messages": [ { "role": "system", "content": "You are an AI assistant tasked with analyzing data chunks. Provide a brief summary and key insights for the given data.", }, { "role": "user", "content": f"Analyze this data chunk: {json.dumps(chunk)}", }, ], "max_tokens": 150, }, ) ``` ---------------------------------------- TITLE: Generating Text with OpenAI Client in Upstash Workflow (TypeScript) DESCRIPTION: This workflow endpoint uses the custom `createWorkflowOpenAI` client to generate text based on a provided prompt. It demonstrates the basic text generation process within an Upstash Workflow, ensuring the prompt is retrieved via a workflow step and includes the required error handling pattern for `ToolExecutionError` and `WorkflowAbort`. 
SOURCE: https://upstash.com/docs/workflow/integrations/aisdk.mdx LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; import { WorkflowAbort } from '@upstash/workflow'; import { generateText, ToolExecutionError } from 'ai'; import { createWorkflowOpenAI } from './model'; export const { POST } = serve<{ prompt: string }>(async (context) => { const openai = createWorkflowOpenAI(context); // Important: Must have a step before generateText const prompt = await context.run("get prompt", async () => { return context.requestPayload.prompt; }); try { const result = await generateText({ model: openai('gpt-3.5-turbo'), maxTokens: 2048, prompt, }); await context.run("text", () => { console.log(`TEXT: ${result.text}`); return result.text; }); } catch (error) { if (error instanceof ToolExecutionError && error.cause instanceof WorkflowAbort) { throw error.cause; } else { throw error; } } }); ``` ---------------------------------------- TITLE: Defining an Upstash Workflow Endpoint with Authentication DESCRIPTION: This Flask workflow endpoint demonstrates how to implement a basic authentication check by inspecting the `Authentication` header of the incoming request. If the authentication fails, the workflow steps are not executed, providing a simple security mechanism for your workflow endpoints. 
SOURCE: https://upstash.com/docs/workflow/quickstarts/flask.mdx LANGUAGE: Python CODE: ``` from flask import Flask from upstash_workflow.flask import Serve from upstash_workflow import WorkflowContext app = Flask(__name__) serve = Serve(app) def some_work(input: str) -> str: return f"processed '{input}'" @serve.route("/auth") def auth(context: WorkflowContext[str]) -> None: if context.headers.get("Authentication") != "Bearer secret_password": print("Authentication failed.") return def _step1() -> str: return "output 1" context.run("step1", _step1) def _step2() -> str: return "output 2" context.run("step2", _step2) ``` ---------------------------------------- TITLE: Implementing Authentication in Upstash Workflow Endpoint DESCRIPTION: This workflow demonstrates how to implement basic authentication by checking a `Bearer` token in the request headers. If the authentication fails, the workflow stops; otherwise, it proceeds to execute subsequent steps, ensuring secure access. SOURCE: https://upstash.com/docs/workflow/quickstarts/fastapi.mdx LANGUAGE: python CODE: ``` from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) def some_work(input: str) -> str: return f"processed '{input}'" @serve.post("/auth") async def auth(context: AsyncWorkflowContext[str]) -> None: if context.headers.get("authentication") != "Bearer secret_password": print("Authentication failed.") return async def _step1() -> str: return "output 1" await context.run("step1", _step1) async def _step2() -> str: return "output 2" await context.run("step2", _step2) ``` ---------------------------------------- TITLE: Setting Up Basic Webhook Endpoint with Upstash Workflow DESCRIPTION: This snippet demonstrates how to set up a basic webhook endpoint using the `serve` function from `@upstash/workflow/nextjs` for TypeScript or `Serve` from `upstash_workflow.fastapi` for Python. 
It initializes an endpoint that can receive incoming webhook payloads, which are then parsed by `initialPayloadParser`. SOURCE: https://upstash.com/docs/workflow/howto/use-webhooks.mdx LANGUAGE: TypeScript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve( async (context) => { // Your webhook handling logic here }, { initialPayloadParser: (payload) => { return payload; }, } ); ``` LANGUAGE: Python CODE: ``` from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) def initial_payload_parser(payload): return payload @serve.post("/api/example", initial_payload_parser=initial_payload_parser) async def example(context: AsyncWorkflowContext[str]) -> None: # Your webhook handling logic here ``` ---------------------------------------- TITLE: Defining Type-Safe Workflows with createWorkflow and Invoking Them (TypeScript) DESCRIPTION: This example illustrates the use of `createWorkflow` to define a workflow object, ensuring type safety for both the request body and the response. It shows `anotherWorkflow` returning a structured message and `someWorkflow` invoking it with a string body, demonstrating how types are inferred and maintained across workflow invocations. SOURCE: https://upstash.com/docs/workflow/howto/invoke.mdx LANGUAGE: TypeScript CODE: ``` import { WorkflowContext } from "@upstash/workflow"; import { createWorkflow } from "@upstash/workflow/nextjs"; const anotherWorkflow = createWorkflow( // Define the workflow logic, specifying the type of the initial request body. // In this case, the body is a string: async (context: WorkflowContext) => { await context.sleep("wait 1 second", 1) // Return a response from the workflow. The type of this // response will be available when `context.invoke` is // called with `anotherWorkflow`. 
return { message: "This is the data returned by the workflow" }; } ); const someWorkflow = createWorkflow(async (context) => { // Invoke anotherWorkflow with a string body and get the response // The types of the body parameter and the response are // type-safe and inferred from anotherWorkflow const { body } = await context.invoke( "invoke anotherWorkflow", { workflow: anotherWorkflow, body: "user-1" } ); }); ``` ---------------------------------------- TITLE: Executing Workflow Steps Serially with context.run (TypeScript) DESCRIPTION: Shows how to define and execute workflow steps sequentially using `context.run`. Each step awaits the completion of the previous one, ensuring ordered execution of `someWork` and `someOtherWork`. SOURCE: https://upstash.com/docs/workflow/basics/context.mdx LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve(async (context) => { const input = context.requestPayload; const result1 = await context.run("step-1", async () => { return someWork(input); }); await context.run("step-2", async () => { someOtherWork(result1); }); }); ``` ---------------------------------------- TITLE: Storing Processed Images in Cloud Storage in Upstash Workflow (TypeScript) DESCRIPTION: This TypeScript snippet handles storing the final processed images in cloud storage. It maps over the processed images, using `context.run` for each storage operation, and executes these operations in parallel with `Promise.all` to obtain an array of stored image URLs. 
SOURCE: https://upstash.com/docs/workflow/examples/imageProcessing.mdx LANGUAGE: typescript CODE: ``` const storedImageUrls: string[] = await Promise.all( processedImages.map( processedImage => context.run(`store-image`, async () => { return await storeImage(processedImage.body.imageUrl) }) ) ) ``` ---------------------------------------- TITLE: Executing Customer Charge Step in Upstash Workflow DESCRIPTION: This snippet demonstrates how to execute the customer charging logic within an Upstash Workflow step using `context.run`. It includes a `try-catch` block to gracefully handle payment failures, allowing the workflow to proceed with custom error handling and retry logic instead of immediately failing the step. SOURCE: https://upstash.com/docs/workflow/examples/paymentRetry.mdx LANGUAGE: typescript CODE: ``` const result = await context.run("charge customer", async () => { try { return await chargeCustomer(i + 1); } catch (e) { console.error(e); return } }); ``` LANGUAGE: python CODE: ``` async def _charge_customer() -> Optional[ChargeResult]: try: return await charge_customer(i + 1) except Exception as e: print(f"Error: {e}") return None result = await context.run("charge customer", _charge_customer) ``` ---------------------------------------- TITLE: Implementing Multi-Agent Collaboration with Research and Math Tools in TypeScript DESCRIPTION: This snippet illustrates a multi-agent system within Upstash Workflow. It defines two agents: a `researcherAgent` with a Wikipedia tool and a `mathAgent` with a custom `calculate` tool using `mathjs`. A single task is then assigned to both agents, prompting them to collaborate to find information about Japanese cities and calculate their combined population, demonstrating complex task delegation. 
SOURCE: https://upstash.com/docs/workflow/agents/features.mdx LANGUAGE: TypeScript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; import { WikipediaQueryRun } from "@langchain/community/tools/wikipedia_query_run"; import * as mathjs from 'mathjs' import { tool } from "ai"; import { z } from "zod"; export const { POST } = serve(async (context) => { const model = context.agents.openai('gpt-4o'); const researcherAgent = context.agents.agent({ model, name: 'academic', maxSteps: 2, tools: { wikiTool: new WikipediaQueryRun({ topKResults: 1, maxDocContentLength: 500, }) }, background: 'You are a researcher agent with access to Wikipedia. ' + 'Utilize Wikipedia as much as possible for correct information', }); const mathAgent = context.agents.agent({ model, name: "mathematician", maxSteps: 2, tools: { calculate: tool({ description: 'A tool for evaluating mathematical expressions. ' + 'Example expressions: ' + "'1.2 * (2 + 4.5)', '12.7 cm to inch', 'sin(45 deg) ^ 2'. " + "Only call this tool if you need to calculate a mathematical expression. " + "When writing an expression, don't use words like 'thousand' or 'million'", parameters: z.object({ expression: z.string() }), execute: async ({ expression }) => mathjs.evaluate(expression), }), }, background: "You are a mathematician agent which can utilize " + "a calculator to compute expressions" }) const task = context.agents.task({ model, maxSteps: 3, agents: [researcherAgent, mathAgent], prompt: "Tell me about 3 cities in Japan and calculate the sum of their populations", }); const { text } = await task.run(); console.log("result:", text) }) ``` ---------------------------------------- TITLE: Invoking Another Workflow with context.invoke (TypeScript) DESCRIPTION: This snippet demonstrates how to use `context.invoke` to call another workflow, await its execution, and retrieve its response and status. 
It details the parameters such as `workflow` (the workflow object to invoke), `body` (payload for the invoked workflow), `header`, `retries`, `flowControl`, and `workflowRunId`. SOURCE: https://upstash.com/docs/workflow/howto/invoke.mdx LANGUAGE: TypeScript CODE: ``` const { body, // response from the invoked workflow isFailed, // whether the invoked workflow failed isCanceled // whether the invoked workflow was canceled } = await context.invoke( "analyze-content", { workflow: analyzeContent, body: "test", header: {...}, // headers to pass to the invoked workflow (optional) retries, // number of retries (optional, default: 3) flowControl, // flow control settings (optional) workflowRunId // workflowRunId to set (optional) } ) ``` ---------------------------------------- TITLE: Pausing Workflow Until Specific Timestamp with context.sleepUntil (TypeScript) DESCRIPTION: Shows how to pause a workflow until a precise future timestamp using `context.sleepUntil`. This example calculates a date one week from now and pauses the workflow until that exact time before sending an email. 
SOURCE: https://upstash.com/docs/workflow/basics/context.mdx LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; import { signIn, sendEmail } from "@/utils/onboarding-utils"; export const { POST } = serve(async (context) => { const userData = context.requestPayload; const user = await context.run("sign-in", async () => { return signIn(userData); }); // 👇 Calculate the date for one week from now const oneWeekFromNow = new Date(); oneWeekFromNow.setDate(oneWeekFromNow.getDate() + 7); // 👇 Wait until the calculated date await context.sleepUntil("wait-for-one-week", oneWeekFromNow); await context.run("send-welcome-email", async () => { return sendEmail(user.name, user.email); }); }); ``` ---------------------------------------- TITLE: Applying Flow Control to Workflow Trigger (JavaScript) DESCRIPTION: This example illustrates how to enforce rate and parallelism limits when triggering a workflow using the `@upstash/workflow` client's `trigger` method. The `flowControl` object, including `key`, `parallelism`, and `rate`, ensures that calls initiated via this trigger respect the defined limits, preventing the workflow from being overwhelmed by too many concurrent or rapid invocations. SOURCE: https://upstash.com/docs/workflow/howto/flow-control.mdx LANGUAGE: JavaScript CODE: ``` import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }); const { workflowRunId } = await client.trigger({ url: "https://workflow-endpoint.com", body: "hello there!", flowControl: { key: "app1", parallelism: 3, rate: 10 } }); ``` ---------------------------------------- TITLE: Calling Anthropic Messages API with `context.api.anthropic.call` in TypeScript DESCRIPTION: This snippet demonstrates how to make a type-safe call to the Anthropic `/v1/messages` endpoint using `context.api.anthropic.call`. 
It requires an Anthropic API key, specifies the `messages.create` operation, and includes the model, max tokens, and message content in the request body. The response body's content can be accessed to retrieve the generated text. SOURCE: https://upstash.com/docs/workflow/integrations/anthropic.mdx LANGUAGE: TypeScript CODE: ``` const { status, body } = await context.api.anthropic.call( "Call Anthropic", { token: "", operation: "messages.create", body: { model: "claude-3-5-sonnet-20241022", max_tokens: 1024, messages: [ {"role": "user", "content": "Hello, world"} ] }, } ); // get text: console.log(body.content[0].text) ``` ---------------------------------------- TITLE: Implementing Iterative Text Generation with Evaluation using Upstash Workflow (TypeScript) DESCRIPTION: This TypeScript code defines an Upstash Workflow that orchestrates two AI agents: a 'generator' and an 'evaluator'. It iteratively generates text based on an initial prompt, evaluates it, and revises the generation if the evaluation is not 'PASS', demonstrating a self-correction loop for content generation. It utilizes 'context.agents.openai' for model interaction and 'context.agents.agent' to define custom agents with specific roles. 
SOURCE: https://upstash.com/docs/workflow/agents/patterns/evaluator-optimizer.mdx LANGUAGE: TypeScript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve(async (context) => { const model = context.agents.openai('gpt-3.5-turbo'); // Generator agent that generates content const generator = context.agents.agent({ model, name: 'generator', maxSteps: 1, background: 'You are an agent that generates text based on a prompt.', tools: {} }); // Evaluator agent that evaluates the text and gives corrections const evaluator = context.agents.agent({ model, name: 'evaluator', maxSteps: 1, background: 'You are an agent that evaluates the generated text and provides corrections if needed.', tools: {} }); let generatedText = ''; let evaluationResult = ''; const prompt = "Generate a short explanation of quantum mechanics."; let nextPrompt = prompt; for (let i = 0; i < 3; i++) { // Construct prompt for generator: // - If there's no evaluation, use the original prompt // - If there's an evaluation, provide the prompt, the last generated text, and the evaluator's feedback if (evaluationResult && evaluationResult !== "PASS") { nextPrompt = `Please revise the answer to the question "${prompt}". 
Previous answer was: "${generatedText}", which received this feedback: "${evaluationResult}".`; } // Generate content const generatedResponse = await context.agents.task({ agent: generator, prompt: nextPrompt }).run(); generatedText = generatedResponse.text // Evaluate the generated content const evaluationResponse = await context.agents.task({ agent: evaluator, prompt: `Evaluate and provide feedback for the following text: ${generatedText}` }).run(); evaluationResult = evaluationResponse.text // If the evaluator accepts the content (i.e., "PASS"), stop if (evaluationResult.includes("PASS")) { break; } } console.log(generatedText); }); ``` ---------------------------------------- TITLE: Implementing a Coffee Brewing Workflow with Parallel Inventory Checks in TypeScript DESCRIPTION: This complete Upstash Workflow example demonstrates a practical application of parallel execution. It concurrently checks the availability of coffee beans, cups, and milk using `ctx.run` within `Promise.all`. If all ingredients are present, it proceeds to brew coffee and print a receipt. 
SOURCE: https://upstash.com/docs/workflow/howto/parallel-runs.mdx LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; import { checkInventory, brewCoffee, printReceipt } from "@/utils"; export const { POST } = serve(async (ctx) => { const [coffeeBeansAvailable, cupsAvailable, milkAvailable] = await Promise.all([ ctx.run("check-coffee-beans", () => checkInventory("coffee-beans")), ctx.run("check-cups", () => checkInventory("cups")), ctx.run("check-milk", () => checkInventory("milk")), ]); // If all ingredients are available, brew coffee if (coffeeBeansAvailable && cupsAvailable && milkAvailable) { const price = await ctx.run("brew-coffee", async () => { return await brewCoffee({ style: "cappuccino" }); }); await printReceipt(price); } }); ``` ---------------------------------------- TITLE: Defining an Upstash Workflow Endpoint with Sleep Steps DESCRIPTION: This Flask workflow endpoint demonstrates how to introduce pauses between steps using `context.sleep_until` and `context.sleep`. It processes an input, pauses until a specific time, processes again, and then pauses for a fixed duration before a final step, showcasing time-based control flow. 
SOURCE: https://upstash.com/docs/workflow/quickstarts/flask.mdx LANGUAGE: Python CODE: ``` from flask import Flask import time from upstash_workflow.flask import Serve from upstash_workflow import WorkflowContext app = Flask(__name__) serve = Serve(app) def some_work(input: str) -> str: return f"processed '{input}'" @serve.route("/sleep") def sleep(context: WorkflowContext[str]) -> None: input = context.request_payload def _step1() -> str: output = some_work(input) print("step 1 input", input, "output", output) return output result1: str = context.run("step1", _step1) context.sleep_until("sleep1", time.time() + 3) def _step2() -> str: output = some_work(result1) print("step 2 input", result1, "output", output) return output result2: str = context.run("step2", _step2) context.sleep("sleep2", 2) def _step3() -> None: output = some_work(result2) print("step 3 input", result2, "output", output) context.run("step3", _step3) ``` ---------------------------------------- TITLE: Handling Non-Deterministic Conditions as Workflow Steps DESCRIPTION: To avoid authorization errors caused by non-deterministic early returns, this snippet demonstrates transforming the conditional check into an explicit workflow step using context.run. This ensures that the condition is evaluated within the workflow's execution flow, preventing premature termination and authentication failures. 
SOURCE: https://upstash.com/docs/workflow/troubleshooting/general.mdx LANGUAGE: TypeScript CODE: ``` export const { POST } = serve(async (context) => { const shouldReturn = await context.run("check condition", () => someCondition()) if (shouldReturn) { return; } // rest of the workflow }) ``` LANGUAGE: Python CODE: ``` @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: async def _check_condition() -> bool: return some_condition() should_return = await context.run("check condition", _check_condition) if should_return: return # rest of the workflow ``` ---------------------------------------- TITLE: Correctly Handling Non-Idempotent Code - Python DESCRIPTION: Illustrates the correct way to handle potentially non-idempotent code by placing it inside `context.run`. This ensures that even if the workflow is retried, the non-idempotent operation is managed by the workflow's retry mechanism, preventing unintended side effects. SOURCE: https://upstash.com/docs/workflow/basics/caveats.mdx LANGUAGE: python CODE: ``` async def _get_result_from_db(): return await get_result_from_db(entry_id) result = await context.run("get-result-from-db", _get_result_from_db) if result.should_return: return ``` ---------------------------------------- TITLE: Correctly Handling Non-Idempotent Code - TypeScript DESCRIPTION: Demonstrates the correct way to handle potentially non-idempotent code by placing it inside `context.run`. This ensures that even if the workflow is retried, the non-idempotent operation is managed by the workflow's retry mechanism, preventing unintended side effects. 
SOURCE: https://upstash.com/docs/workflow/basics/caveats.mdx LANGUAGE: typescript CODE: ``` const result = await context.run("get-result-from-db", async () => { return await getResultFromDb(entryId) }); if (result.return) { return; } ``` ---------------------------------------- TITLE: Sending Trial Warning Email (TypeScript/Python) DESCRIPTION: This snippet handles sending a trial warning email 2 days before the trial ends. It first waits for 5 days using `context.sleep`, then checks the user's upgrade status with `checkUpgradedPlan` via `context.run`. If the user has already upgraded, the workflow terminates; otherwise, a warning email is sent. SOURCE: https://upstash.com/docs/workflow/examples/authWebhook.mdx LANGUAGE: TypeScript CODE: ``` await context.sleep("wait for trial warning", 5 * 24 * 60 * 60); const isUpgraded = await context.run("check upgraded plan", async () => { return await checkUpgradedPlan(email); }); if (isUpgraded) return; await context.run("send trial warning email", async () => { await sendEmail( email, "Your trial is about to end in 2 days. Please upgrade your plan to keep using our platform." ); }); ``` LANGUAGE: Python CODE: ``` await context.sleep("wait for trial warning", 5 * 24 * 60 * 60) async def _check_upgraded_plan() -> bool: return await check_upgraded_plan(email) is_upgraded = await context.run("check upgraded plan", _check_upgraded_plan) # end the workflow if upgraded if is_upgraded: return async def _send_trial_warning_email() -> None: await send_email( email, "Your trial is about to end in 2 days. Please upgrade your plan to keep using our platform.", ) await context.run("send trial warning email", _send_trial_warning_email) ``` ---------------------------------------- TITLE: Implementing Webhook Request Validation with Svix (TypeScript) DESCRIPTION: This TypeScript function demonstrates how to validate incoming webhook requests using the Svix library, specifically for Clerk webhooks.
It takes the raw payload string and header payload, extracts Svix-specific headers, and uses `wh.verify` to ensure the request's authenticity against a predefined secret. SOURCE: https://upstash.com/docs/workflow/howto/use-webhooks.mdx LANGUAGE: TypeScript CODE: ``` import { Webhook } from "svix"; import { WebhookEvent } from "@clerk/nextjs/server"; const webhookSecret = "YOUR_WEBHOOK_SECRET"; async function validateRequest(payloadString: string, headerPayload: Headers) { const svixHeaders = { "svix-id": headerPayload.get("svix-id") as string, "svix-timestamp": headerPayload.get("svix-timestamp") as string, "svix-signature": headerPayload.get("svix-signature") as string, }; const wh = new Webhook(webhookSecret); return wh.verify(payloadString, svixHeaders) as WebhookEvent; } ``` ---------------------------------------- TITLE: Defining an Upstash Workflow Endpoint with External Call DESCRIPTION: This example illustrates a Flask workflow endpoint that makes an internal HTTP POST request to another endpoint (`/get-data`) within the same application using `context.call`. It demonstrates how to integrate external service calls as part of a workflow step, passing data between steps and handling responses. 
SOURCE: https://upstash.com/docs/workflow/quickstarts/flask.mdx LANGUAGE: Python CODE: ``` from flask import Flask from typing import Dict from upstash_workflow.flask import Serve from upstash_workflow import WorkflowContext, CallResponse app = Flask(__name__) serve = Serve(app) def some_work(input: str) -> str: return f"processed '{input}'" @app.route("/get-data", methods=["POST"]) def get_data() -> Dict[str, str]: return {"message": "get data response"} @serve.route("/call") def call(context: WorkflowContext[str]) -> None: input = context.request_payload def _step1() -> str: output = some_work(input) print("step 1 input", input, "output", output) return output result1: str = context.run("step1", _step1) response: CallResponse[Dict[str, str]] = context.call( "get-data", url=f"{context.env.get('UPSTASH_WORKFLOW_URL', 'http://localhost:8000')}/get-data", method="POST", body={"message": result1} ) def _step2() -> str: output = some_work(response.body["message"]) print("step 2 input", response, "output", output) return output context.run("step2", _step2) ``` ---------------------------------------- TITLE: Invoking Other Workflows with context.invoke - Upstash Workflow - TypeScript DESCRIPTION: This snippet demonstrates how `context.invoke` triggers another workflow run and waits for its completion. It shows how to specify the workflow to invoke, pass a body, optional headers, configure retries, and apply flow control settings. The method returns the invoked workflow's body, and booleans indicating if it failed or was canceled. 
SOURCE: https://upstash.com/docs/workflow/basics/context.mdx LANGUAGE: ts CODE: ``` const { body, isFailed, isCanceled } = await context.invoke( "invoke another workflow", { workflow: anotherWorkflow, body: "test", header: {}, retries, // number of retries (optional, default: 3) flowControl, // flow control settings (optional) workflowRunId // workflowRunId to set (optional) } ); ``` ---------------------------------------- TITLE: Downloading Dataset in Upstash Workflow (Python) DESCRIPTION: This Python snippet shows how to obtain a dataset URL and download the dataset within an Upstash Workflow. It uses `context.run` for the URL retrieval (for idempotency) and `context.call` for the HTTP download, which is designed to handle long-running requests beyond typical serverless execution limits. SOURCE: https://upstash.com/docs/workflow/examples/allInOne.mdx LANGUAGE: python CODE: ``` async def _get_dataset_url() -> str: return await get_dataset_url(dataset_id) dataset_url = await context.run("get-dataset-url", _get_dataset_url) response: CallResponse[Any] = await context.call( "download-dataset", url=dataset_url, method="GET" ) dataset = response.body ``` ---------------------------------------- TITLE: Implementing Weekly User Summary Workflow (TypeScript) DESCRIPTION: This TypeScript snippet defines a Next.js API route using `@upstash/workflow/nextjs` for processing weekly user summaries. It fetches user data, generates the summary, and sends an email, all orchestrated as distinct steps using `context.run`. 
SOURCE: https://upstash.com/docs/workflow/howto/schedule.mdx LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; import { getUserData, generateSummary } from "@/utils/user-utils"; import { sendEmail } from "@/utils/email-utils"; // Type-safety for starting our workflow interface WeeklySummaryData { userId: string; } export const { POST } = serve<WeeklySummaryData>(async (context) => { const { userId } = context.requestPayload; // Step 1: Fetch user data const user = await context.run("fetch-user-data", async () => { return await getUserData(userId); }); // Step 2: Generate weekly summary const summary = await context.run("generate-summary", async () => { return await generateSummary(userId); }); // Step 3: Send email with weekly summary await context.run("send-summary-email", async () => { await sendEmail(user.email, "Your Weekly Summary", summary); }); }); ``` ---------------------------------------- TITLE: Defining Minimal Workflow Endpoint (Next.js App Router) DESCRIPTION: A minimal example of defining a workflow endpoint using `@upstash/workflow/nextjs` in a Next.js App Router `route.ts` file. It demonstrates two sequential steps, `initial-step` and `second-step`, executed within the `serve` function. SOURCE: https://upstash.com/docs/workflow/quickstarts/vercel-nextjs.mdx LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/nextjs" export const { POST } = serve( async (context) => { await context.run("initial-step", () => { console.log("initial step ran") }) await context.run("second-step", () => { console.log("second step ran") }) } ) ``` ---------------------------------------- TITLE: Pausing Workflow Execution with context.sleep (TypeScript) DESCRIPTION: Demonstrates how to pause a workflow for a specified duration using `context.sleep`. This example signs in a user, then waits for one day before sending a welcome email, ensuring a timed delay in the workflow.
SOURCE: https://upstash.com/docs/workflow/basics/context.mdx LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; import { signIn, sendEmail } from "@/utils/onboarding-utils"; export const { POST } = serve(async (context) => { const userData = context.requestPayload; const user = await context.run("sign-in", async () => { const signedInUser = await signIn(userData); return signedInUser; }); // 👇 Wait for one day await context.sleep("wait-until-welcome-email", "1d"); await context.run("send-welcome-email", async () => { return sendEmail(user.name, user.email); }); }); ``` ---------------------------------------- TITLE: Configuring QStash Receiver for Request Verification in Python DESCRIPTION: This snippet shows how to configure the QStash Receiver for an Upstash Workflow endpoint in Python. It uses the `Receiver` class to validate incoming requests, ensuring they are from QStash. The `current_signing_key` and `next_signing_key` are retrieved from environment variables for secure verification. SOURCE: https://upstash.com/docs/workflow/basics/serve.mdx LANGUAGE: Python CODE: ``` import os from qstash import Receiver @serve.post( "/api/example", receiver=Receiver( current_signing_key=os.environ["QSTASH_CURRENT_SIGNING_KEY"], next_signing_key=os.environ["QSTASH_NEXT_SIGNING_KEY"], ), ) async def example(context: AsyncWorkflowContext[str]) -> None: ... ``` ---------------------------------------- TITLE: Performing HTTP Calls with context.call - Upstash Workflow - Python DESCRIPTION: This Python snippet illustrates the usage of `context.call` within an Upstash Workflow integrated with FastAPI. It demonstrates making a long-running HTTP POST request to the OpenAI API, passing the request body and headers, and then reading the status, headers, and body from the response. This method is designed for operations that require extended execution times beyond typical serverless function limits.
SOURCE: https://upstash.com/docs/workflow/basics/context.mdx LANGUAGE: python CODE: ``` import os from dataclasses import dataclass from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) @dataclass class Request: topic: str @serve.post("/api/example") async def example(context: AsyncWorkflowContext[Request]) -> None: request: Request = context.request_payload result = await context.call( "generate-long-essay", url="https://api.openai.com/v1/chat/completions", method="POST", body={ "model": "gpt-4o", "messages": [ { "role": "system", "content": "You are a helpful assistant writing really long essays that would cause a normal serverless function to timeout.", }, {"role": "user", "content": request["topic"]}, ], }, headers={ "authorization": f"Bearer {os.environ['OPENAI_API_KEY']}", }, ) status, headers, body = result.status, result.headers, result.body ``` ---------------------------------------- TITLE: Incorrect Step Execution Order in Upstash Workflow (TypeScript) DESCRIPTION: This example illustrates an incorrect pattern where `generateText` is called directly using `context.requestPayload.prompt` without wrapping the prompt retrieval in a workflow step. This will lead to an 'undefined prompt' error, highlighting the critical requirement for a preceding workflow step. SOURCE: https://upstash.com/docs/workflow/integrations/aisdk.mdx LANGUAGE: typescript CODE: ``` export const { POST } = serve<{ prompt: string }>(async (context) => { const openai = createWorkflowOpenAI(context); // Will throw "prompt is undefined" const result = await generateText({ model: openai('gpt-3.5-turbo'), prompt: context.requestPayload.prompt }); }); ``` ---------------------------------------- TITLE: Defining Workflow Endpoint with Request Object (Next.js App Router) DESCRIPTION: Example of a workflow endpoint in Next.js App Router (`route.ts`) that allows access to the native `NextRequest` object.
This enables processing of the incoming request before passing it to the workflow `serve` handler. SOURCE: https://upstash.com/docs/workflow/quickstarts/vercel-nextjs.mdx LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; import { NextRequest } from "next/server"; export const POST = async (request: NextRequest) => { // do something with the native request object const { POST: handler } = serve(async (context) => { // Your workflow steps }); return await handler(request); } ``` ---------------------------------------- TITLE: Configuring Options for createWorkflow and serveMany (TypeScript) DESCRIPTION: This snippet demonstrates how to pass configuration options to both `createWorkflow` and `serveMany`. It shows setting `retries` for a workflow and `failureUrl` for the `serveMany` endpoint. It clarifies that options provided to `createWorkflow` take precedence over those in `serveMany` if the same parameter is specified. SOURCE: https://upstash.com/docs/workflow/howto/invoke.mdx LANGUAGE: TypeScript CODE: ``` const workflowOne = createWorkflow( async (context) => { // ... }, { retries: 0 } ) export const { POST } = serveMany( { workflowOne }, { failureUrl: "https://some-url" } ) ``` ---------------------------------------- TITLE: Defining a Daily Backup Workflow with Upstash Workflow (TypeScript) DESCRIPTION: This TypeScript snippet defines a Next.js API route using `@upstash/workflow/nextjs` to create and upload daily backups. It uses `ctx.run` for orchestrating `createBackup` and `uploadBackup` steps, with a `failureFunction` for immediate notifications on backup failures. 
SOURCE: https://upstash.com/docs/workflow/howto/schedule.mdx LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; import { createBackup, uploadBackup } from "./utils"; export const { POST } = serve( async (ctx) => { const backup = await ctx.run("create-backup", async () => { return await createBackup(); }); await ctx.run("upload-backup", async () => { await uploadBackup(backup); }); }, { failureFunction({ context, failStatus, failResponse, failHeaders }) { // immediately get notified for failed backups // e.g. send an email, log to Sentry }, } ); ``` ---------------------------------------- TITLE: Exposing Multiple Workflows with serveMany in Next.js (TypeScript) DESCRIPTION: This snippet demonstrates how to use `serveMany` within a Next.js catch-all route (`app/serve-many/[...any]/route.ts`) to expose multiple workflows defined by `createWorkflow` as API endpoints. It shows an example where `workflowOne` invokes `workflowTwo`, emphasizing that both workflows must be exposed within the same `serveMany` endpoint for successful invocation. SOURCE: https://upstash.com/docs/workflow/howto/invoke.mdx LANGUAGE: TypeScript CODE: ``` import { WorkflowContext } from "@upstash/workflow"; import { createWorkflow, serveMany } from "@upstash/workflow/nextjs"; const workflowOne = createWorkflow(async (context) => { await context.run("say hi", () => { console.log("workflow one says hi!") }) const { body, isCanceled, isFailed } = await context.invoke("invoking other", { workflow: workflowTwo, body: "hello from workflow one", }) console.log(`received response from workflowTwo: ${body}`) }) const workflowTwo = createWorkflow(async (context: WorkflowContext<string>) => { await context.run("say hi", () => { console.log("workflowTwo says hi!") console.log(`received: '${context.requestPayload}' in workflowTwo`) }) return "Workflow two finished!"
}) export const { POST } = serveMany( { "workflow-one-route": workflowOne, "workflow-two-route": workflowTwo, } ) ``` ---------------------------------------- TITLE: Aggregating Intermediate Results in Upstash Workflow (TypeScript) DESCRIPTION: This TypeScript snippet shows how to conditionally aggregate processed data chunks within an Upstash Workflow. It triggers an aggregation operation using `context.run` every 10 chunks or at the end of processing, ensuring that intermediate results are saved and the `processedChunks` array is cleared to manage memory. SOURCE: https://upstash.com/docs/workflow/examples/allInOne.mdx LANGUAGE: typescript CODE: ``` if (i % 10 === 9 || i === chunks.length - 1) { await context.run(`aggregate-results${i}`, async () => { await aggregateResults(processedChunks) processedChunks.length = 0 }) } ``` ---------------------------------------- TITLE: Avoiding Non-Idempotent Functions Outside context.run - Python DESCRIPTION: Illustrates an incorrect example where a non-idempotent function (`get_result_from_db`) is called outside `context.run`. This can result in inconsistent behavior or authentication errors if the workflow endpoint is re-executed. SOURCE: https://upstash.com/docs/workflow/basics/caveats.mdx LANGUAGE: python CODE: ``` @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: entry_id = context.request_payload["entry_id"] # 👇 Problem: Non-idempotent function outside context.run: result = await get_result_from_db(entry_id) if result.should_return: return # ... ``` ---------------------------------------- TITLE: Ensuring requestPayload Persistence with context.run DESCRIPTION: This solution demonstrates how to reliably access context.requestPayload by wrapping its retrieval within a context.run step. By doing so, the payload is preserved and will never be undefined, even when context.call is executed later in the workflow, ensuring consistent data access. 
SOURCE: https://upstash.com/docs/workflow/troubleshooting/general.mdx LANGUAGE: TypeScript CODE: ``` export const { POST } = serve(async (context) => { // Payload will never be undefined const payload = await context.run("get payload", () => context.requestPayload) // ... steps or any other code context.call( ... ) }) ``` LANGUAGE: Python CODE: ``` @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: async def _get_payload() -> str: return context.request_payload # Payload will never be None payload = await context.run("get payload", _get_payload) # ... steps or any other code context.call( ... ) ``` ---------------------------------------- TITLE: Defining an Agent with Model and Tools DESCRIPTION: Defines a `researcherAgent` using an OpenAI model and a Wikipedia tool, specifying its name, maximum steps, and a background prompt to guide its behavior. SOURCE: https://upstash.com/docs/workflow/agents/features.mdx LANGUAGE: ts CODE: ``` import { serve } from "@upstash/workflow/nextjs"; import { WikipediaQueryRun } from "@langchain/community/tools/wikipedia_query_run"; export const { POST } = serve(async (context) => { const model = context.agents.openai('gpt-3.5-turbo') const researcherAgent = context.agents.agent({ model, name: 'academic', maxSteps: 2, tools: { wikiTool: new WikipediaQueryRun({ topKResults: 1, maxDocContentLength: 500, }) }, background: 'You are researcher agent with access to Wikipedia. ' + 'Utilize Wikipedia as much as possible for correct information', }) }) ``` ---------------------------------------- TITLE: Notifying Upstash Workflow from External System DESCRIPTION: This TypeScript snippet shows how an external system can notify an Upstash Workflow using the `@upstash/workflow` Client. It sends an event with a specific `eventId` and `eventData` to resume a waiting workflow, typically after an order has been processed. 
SOURCE: https://upstash.com/docs/workflow/examples/waitForEvent.mdx LANGUAGE: typescript CODE: ``` import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }) const orderId = "1324" await client.notify({ eventId: `order-${orderId}`, eventData: { deliveryTime: "2 days" } }); ``` ---------------------------------------- TITLE: Notifying Waiting Workflow Runs (JavaScript) DESCRIPTION: This snippet demonstrates how to notify a workflow run that is currently paused and waiting for a specific event. The `client.notify` method takes an `eventId` to target the waiting workflow and `eventData` which is passed as input to the workflow upon notification. SOURCE: https://upstash.com/docs/workflow/basics/client.mdx LANGUAGE: javascript CODE: ``` import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }); await client.notify({ eventId: "my-event-id", eventData: "my-data", // data passed to the workflow run }); ``` ---------------------------------------- TITLE: Notifying Workflow with client.notify (TypeScript) DESCRIPTION: This method is used by an external client to notify one or more workflows that a specific event has occurred. It requires an `eventId` to target waiting workflows and provides `eventData` which will be consumed by `context.waitForEvent`. An Upstash Workflow `Client` instance, initialized with a QStash token, is required. SOURCE: https://upstash.com/docs/workflow/howto/events.mdx LANGUAGE: TypeScript CODE: ``` import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }); await client.notify({ eventId: "event-id", eventData: { my: "data" }, }); ``` ---------------------------------------- TITLE: Required Error Handling Pattern for Upstash Workflow AI (TypeScript) DESCRIPTION: This snippet outlines the mandatory error handling pattern for AI operations within Upstash Workflow. 
It specifically catches `ToolExecutionError` where the cause is `WorkflowAbort`, rethrowing the `WorkflowAbort` to ensure proper workflow termination, while rethrowing other errors for general handling. SOURCE: https://upstash.com/docs/workflow/integrations/aisdk.mdx LANGUAGE: typescript CODE: ``` try { // Your generation code } catch (error) { if (error instanceof ToolExecutionError && error.cause instanceof WorkflowAbort) { throw error.cause; } else { throw error; } } ``` ---------------------------------------- TITLE: Configuring Failure Function in QStash Workflow (TypeScript) DESCRIPTION: This snippet demonstrates how to configure a `failureFunction` within the `serve` method of a QStash workflow. The `failureFunction` is an asynchronous callback that executes when a workflow run fails, allowing for custom error handling logic such as logging to Sentry or performing cleanup actions. It receives details about the failure including context, status, response, and headers. SOURCE: https://upstash.com/docs/workflow/howto/failures.mdx LANGUAGE: typescript CODE: ``` export const { POST } = serve( async (context) => { // Your workflow logic... }, { failureFunction: async ({ context, failStatus, failResponse, failHeaders, }) => { // Handle error, i.e. log to Sentry }, } ); ``` ---------------------------------------- TITLE: Notifying Workflow from Another Workflow with context.notify (TypeScript) DESCRIPTION: This method allows one workflow to notify another workflow about an event, enabling inter-workflow communication. It takes a `notify step name`, an `event-Id` to identify the target event, and `event data` to pass information. The response includes `notifyResponse` indicating the success of the notification. 
SOURCE: https://upstash.com/docs/workflow/howto/events.mdx LANGUAGE: TypeScript CODE: ``` const { notifyResponse } = await context.notify( "notify step", // notify step name "event-Id", // event id { my: "data" } // event data ); ``` ---------------------------------------- TITLE: Periodically Checking User Activity and Sending Emails - Upstash Workflow DESCRIPTION: This core loop of the workflow continuously monitors user engagement. It enters an infinite loop, periodically checking the user's state (active or non-active) using `context.run` and sending tailored emails based on their activity. After each check and email, the workflow pauses for one month using `context.sleep` before the next iteration, optimizing resource usage for long-running tasks. SOURCE: https://upstash.com/docs/workflow/examples/customerOnboarding.mdx LANGUAGE: typescript CODE: ``` while (true) { const state = await context.run("check-user-state", async () => { return await getUserState() }) if (state === "non-active") { await context.run("send-email-non-active", async () => { await sendEmail("Email to non-active users", email) }) } else if (state === "active") { await context.run("send-email-active", async () => { await sendEmail("Send newsletter to active users", email) }) } await context.sleep("wait-for-1-month", 60 * 60 * 24 * 30) } ``` LANGUAGE: python CODE: ``` while True: async def _check_user_state() -> UserState: return await get_user_state() state: UserState = await context.run("check-user-state", _check_user_state) if state == "non-active": async def _send_email_non_active() -> None: await send_email("Email to non-active users", email) await context.run("send-email-non-active", _send_email_non_active) else: async def _send_email_active() -> None: await send_email("Send newsletter to active users", email) await context.run("send-email-active", _send_email_active) await context.sleep("wait-for-1-month", 60 * 60 * 24 * 30) ``` ---------------------------------------- TITLE: Defining 
an Upstash Workflow Endpoint in Astro DESCRIPTION: This TypeScript code defines a workflow endpoint using `@upstash/workflow/astro`'s `serve` function. It outlines two sequential steps, 'initial-step' and 'second-step', demonstrating basic workflow logic and how to pass environment variables in Astro for both local and production environments. SOURCE: https://upstash.com/docs/workflow/quickstarts/astro.mdx LANGUAGE: typescript src/pages/api/workflow.ts CODE: ``` import { serve } from "@upstash/workflow/astro"; export const { POST } = serve(async (context) => { const result1 = await context.run("initial-step", () => { console.log("initial step ran") return "hello world!" }) await context.run("second-step", () => { console.log(`second step ran with value ${result1}`) }) }, { // env must be passed in astro. // for local dev, we need import.meta.env. // For deployment, we need process.env: env: { ...process.env, ...import.meta.env } }) ``` ---------------------------------------- TITLE: Defining SvelteKit Workflow Endpoint DESCRIPTION: This TypeScript code defines a SvelteKit API endpoint (`+server.ts`) that serves an Upstash Workflow. It uses `serve` from `@upstash/workflow/svelte` to create a POST handler, where `context.run` defines individual, retryable steps within the workflow, and `env` provides access to environment variables. SOURCE: https://upstash.com/docs/workflow/quickstarts/svelte.mdx LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/svelte" import { env } from "$env/dynamic/private" export const { POST } = serve( async (context) => { await context.run("initial-step", () => { console.log("initial step ran") }) await context.run("second-step", () => { console.log("second step ran") }) }, { env } ) ``` ---------------------------------------- TITLE: Updating `context.call` Method Parameters and Return DESCRIPTION: Shows the changes required for the `context.call` method. 
Parameters are now passed as an object, and the method returns an object containing `status`, `headers`, and `body`, instead of just the body. This also changes failure behavior, allowing workflows to continue even if the request fails. SOURCE: https://upstash.com/docs/workflow/migration.mdx LANGUAGE: javascript CODE: ``` // old const result = await context.call("call step", "", "POST", ...) // new const { status, // response status headers, // response headers body // response body } = await context.call("call step", { url: "", method: "POST", ... }) ``` ---------------------------------------- TITLE: Configuring Flow Control in Workflow Serve Method (JavaScript) DESCRIPTION: This snippet demonstrates how to apply rate and parallelism limits to the entire workflow execution environment by configuring the `flowControl` option within the `serve` method. It specifies a `key` to group limits, `parallelism` for concurrent executions, `rate` for calls per period, and `period` to define the time window (e.g., '1m' for one minute). This ensures all steps within the workflow respect the defined limits. SOURCE: https://upstash.com/docs/workflow/howto/flow-control.mdx LANGUAGE: JavaScript CODE: ``` export const { POST } = serve( async (context) => { await context.run("step-1", async () => { return someWork(); }); }, { flowControl: { key: "app1", parallelism: 3, rate: 10, period: "1m" } } ); ``` ---------------------------------------- TITLE: Defining a Daily Backup Workflow with Upstash Workflow (Python) DESCRIPTION: This Python snippet defines a FastAPI endpoint using `upstash_workflow.fastapi.Serve` to orchestrate daily data backups. It leverages `context.run` to execute `create_backup` and `upload_backup` functions as distinct workflow steps. 
SOURCE: https://upstash.com/docs/workflow/howto/schedule.mdx LANGUAGE: python CODE: ``` from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext from utils import create_backup, upload_backup app = FastAPI() serve = Serve(app) @serve.post("/api/workflow") async def workflow(context: AsyncWorkflowContext[str]) -> None: async def _step1(): return await create_backup() backup = await context.run("create_backup", _step1) async def _step2(): await upload_backup(backup) await context.run("upload_backup", _step2) ``` ---------------------------------------- TITLE: Correctly Returning Results from context.run - Python DESCRIPTION: Illustrates the correct pattern for returning results from `context.run` by directly assigning the awaited result of the `context.run` call to a variable. This ensures the result is available for subsequent steps, even if the workflow endpoint is called multiple times. SOURCE: https://upstash.com/docs/workflow/basics/caveats.mdx LANGUAGE: python CODE: ``` @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload async def _step_1() -> Dict: return await some_work(input) result = await context.run("step-1", _step_1) async def _step_2() -> None: await some_other_work(result) await context.run("step-2", _step_2) ``` ---------------------------------------- TITLE: Bypassing Vercel Deployment Protection with cURL (Bash) DESCRIPTION: This `curl` command demonstrates how to trigger a workflow on a Vercel preview deployment while bypassing its deployment protection. It requires a previously generated `x-vercel-protection-bypass` secret, which is appended as a query parameter to the workflow URL. The command sends a POST request with 'Hello world!' as data. 
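The bypass URL described above can also be assembled in code. The following is a minimal sketch using only the Python standard library; `build_bypass_url` is a hypothetical helper introduced for illustration, and the secret value shown is a placeholder rather than a real token.

```python
from urllib.parse import urlencode


def build_bypass_url(workflow_url: str, bypass_secret: str) -> str:
    """Append the protection-bypass secret as a query parameter."""
    query = urlencode({"x-vercel-protection-bypass": bypass_secret})
    # Use "&" when the workflow URL already carries a query string.
    separator = "&" if "?" in workflow_url else "?"
    return f"{workflow_url}{separator}{query}"


# Placeholder secret; substitute the value generated in the Vercel dashboard.
url = build_bypass_url("https://vercel-preview.com/workflow", "my-secret")
```

Encoding the parameter with `urlencode` keeps the URL valid if the secret contains characters that need escaping.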
SOURCE: https://upstash.com/docs/workflow/troubleshooting/vercel.mdx LANGUAGE: bash CODE: ``` curl -X POST \ 'https://vercel-preview.com/workflow?x-vercel-protection-bypass=' \ -d 'Hello world!' ``` ---------------------------------------- TITLE: Avoiding Nested Context Methods - Python DESCRIPTION: Illustrates an incorrect pattern of nesting `context.sleep`, `context.run`, or `context.call` within another `context.run` block. Upstash Workflow methods should be called directly within the main workflow function, not inside nested `context.run` callbacks. SOURCE: https://upstash.com/docs/workflow/basics/caveats.mdx LANGUAGE: python CODE: ``` @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload ``` ---------------------------------------- TITLE: Integrating Agentic AI SDK Tools DESCRIPTION: Integrates pre-built tools from the Agentic SDK, such as a `WeatherClient`, by using `createAISDKTools` to make them available to agents. SOURCE: https://upstash.com/docs/workflow/agents/features.mdx LANGUAGE: ts CODE: ``` import { createAISDKTools } from '@agentic/ai-sdk' import { WeatherClient } from '@agentic/weather' const weather = new WeatherClient() const tools = createAISDKTools(weather) ``` ---------------------------------------- TITLE: Implementing Custom Authorization in TypeScript Workflow DESCRIPTION: This TypeScript example illustrates how to implement a custom authorization mechanism within an Upstash workflow by accessing request headers via `context.headers`. It shows how to extract and validate a bearer token from the `Authorization` header, allowing for custom authentication logic before workflow execution. 
SOURCE: https://upstash.com/docs/workflow/howto/security.mdx LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve( async (context) => { const authHeader = context.headers.get("authorization"); const bearerToken = authHeader?.split(" ")[1]; if (!isValid(bearerToken)) { console.error("Authentication failed."); return; } // Your workflow steps.. }, { failureFunction: async ({ context }) => { const authHeader = context.headers.get("authorization"); const bearerToken = authHeader?.split(" ")[1]; if (!isValid(bearerToken)) { // ... } }, } ); ``` ---------------------------------------- TITLE: Defining an AI Agent Workflow Endpoint (TypeScript) DESCRIPTION: This TypeScript code defines a Next.js API route (POST handler) that serves an Upstash Workflow endpoint. It initializes an OpenAI agent with a custom communicationTool to process prompts, log inner thoughts, and return a final response. SOURCE: https://upstash.com/docs/workflow/agents/getting-started.mdx LANGUAGE: typescript CODE: ``` import { z } from "zod"; import { tool } from "ai"; import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve<{ prompt: string }>(async (context) => { const prompt = context.requestPayload.prompt const model = context.agents.openai('gpt-3.5-turbo') const communicatorAgent = context.agents.agent({ model, name: 'communicatorAgent', maxSteps: 2, tools: { communicationTool: tool({ description: 'A tool for informing the caller about your inner thoughts', parameters: z.object({ message: z.string() }), execute: async ({ message }) => { console.log("Inner thought:", message) return "success" } }) }, background: 'Answer questions directed towards you.' + ' You have access to a tool to share your inner thoughts' + ' with the caller. Utilize this tool at least once before' + ' answering the prompt. In your inner thoughts, briefly' + ' explain what you will talk about and why. 
Keep your' + ' answers brief.', }) const task = context.agents.task({ agent: communicatorAgent, prompt }) const { text } = await task.run() console.log("Final response:", text); }) ``` ---------------------------------------- TITLE: Scheduling Weekly User Summaries on Sign-Up (Python) DESCRIPTION: This Python snippet defines a FastAPI endpoint for user sign-up. It registers a user, calculates a future date (7 days from sign-up), generates a CRON expression for that weekday and time, and schedules a weekly account summary using the `qstash` client, so each user receives their first report after 7 days. SOURCE: https://upstash.com/docs/workflow/howto/schedule.mdx LANGUAGE: python CODE: ``` from fastapi import FastAPI, Request from fastapi.responses import JSONResponse from qstash import AsyncQStash from datetime import datetime, timedelta app = FastAPI() client = AsyncQStash("") @app.post("/api/sign-up") async def sign_up_endpoint(request: Request): user_data = await request.json() # Simulate user registration (sign_up is the app's own helper, defined elsewhere) user = await sign_up(user_data) # Calculate the date for the first summary (7 days from now) first_summary_date = datetime.now() + timedelta(days=7) # Create cron expression for weekly summaries starting 7 days from signup (cron day-of-week: 0 = Sunday) cron = f"{first_summary_date.minute} {first_summary_date.hour} * * {first_summary_date.isoweekday() % 7}" # Schedule weekly account summary await client.schedule.create_json( schedule_id=f"user-summary-{user.email}", destination="https:///api/send-weekly-summary", body={"userId": user.id}, cron=cron, ) return JSONResponse( content={"success": True, "message": "User registered and summary scheduled"}, status_code=201, ) ``` ---------------------------------------- TITLE: Implementing Custom Authorization in Python Workflow (FastAPI) DESCRIPTION: This Python snippet demonstrates how to integrate a custom authorization method into an Upstash workflow using FastAPI.
It accesses the `Authorization` header from the `context` object to extract and validate a bearer token, enabling custom security checks for incoming requests. SOURCE: https://upstash.com/docs/workflow/howto/security.mdx LANGUAGE: python CODE: ``` from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: auth_header = context.headers.get("authorization") bearer_token = auth_header.split(" ")[1] if auth_header else None if not is_valid(bearer_token): print("Authentication failed.") return # Your workflow steps... ``` ---------------------------------------- TITLE: Preventing Authorization Errors from Early Workflow Returns DESCRIPTION: This snippet illustrates that returning from a workflow function before any steps have been executed will result in an HTTP 400 status code and an authentication error. This behavior is part of the custom authorization mechanism, where an early return without step execution is interpreted as an authentication failure. SOURCE: https://upstash.com/docs/workflow/troubleshooting/general.mdx LANGUAGE: TypeScript CODE: ``` export const { POST } = serve(async (context) => { if (someCondition()) { return; } // rest of the workflow }) ``` LANGUAGE: Python CODE: ``` @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: if some_condition(): return # rest of the workflow ``` ---------------------------------------- TITLE: Defining a Basic Upstash Workflow Endpoint in FastAPI DESCRIPTION: This FastAPI endpoint defines a simple Upstash Workflow with two sequential asynchronous steps. Each step executes a print statement, demonstrating the basic `context.run` functionality for defining workflow logic.
SOURCE: https://upstash.com/docs/workflow/quickstarts/fastapi.mdx LANGUAGE: python CODE: ``` from fastapi import FastAPI from upstash_workflow.fastapi import Serve app = FastAPI() serve = Serve(app) @serve.post("/api/workflow") async def workflow(context) -> None: async def _step1() -> None: print("initial step ran") await context.run("initial-step", _step1) async def _step2() -> None: print("second step ran") await context.run("second-step", _step2) ``` ---------------------------------------- TITLE: Defining Express.js Workflow Endpoint DESCRIPTION: Defines an Express.js POST endpoint (`/workflow`) that serves an Upstash Workflow. It uses `serve` from `@upstash/workflow/express` to handle workflow execution, including two steps (`step1` and `step2`) that process a message from the request payload and log it. SOURCE: https://upstash.com/docs/workflow/quickstarts/express.mdx LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/express"; import express from 'express'; import { config } from 'dotenv'; config(); const app = express(); app.use( express.json() ); app.post( '/workflow', serve<{ message: string }>( async (context) => { const res1 = await context.run("step1", async () => { const message = context.requestPayload.message; return message; }) await context.run("step2", async () => { console.log(res1); }) } ) ); app.listen(3000, () => { console.log('Server running on port 3000'); }); ``` ---------------------------------------- TITLE: Making External HTTP Calls within Upstash Workflow DESCRIPTION: This workflow demonstrates how to make an external HTTP POST request to another endpoint (`/get-data`) using `context.call`. It processes an initial input, calls the external service, and then processes the response, illustrating integration with other services. 
SOURCE: https://upstash.com/docs/workflow/quickstarts/fastapi.mdx LANGUAGE: python CODE: ``` from fastapi import FastAPI from typing import Dict from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext, CallResponse app = FastAPI() serve = Serve(app) def some_work(input: str) -> str: return f"processed '{input}'" @app.post("/get-data") async def get_data() -> Dict[str, str]: return {"message": "get data response"} @serve.post("/call") async def call(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload async def _step1() -> str: output = some_work(input) print("step 1 input", input, "output", output) return output result1: str = await context.run("step1", _step1) response: CallResponse[Dict[str, str]] = await context.call( "get-data", url=f"{context.env.get('UPSTASH_WORKFLOW_URL', 'http://localhost:8000')}/get-data", method="POST", body={"message": result1}, ) async def _step2() -> str: output = some_work(response.body["message"]) print("step 2 input", response, "output", output) return output await context.run("step2", _step2) ``` ---------------------------------------- TITLE: Executing Parallel Workflow Steps in TypeScript DESCRIPTION: This snippet illustrates the core concept of running multiple Upstash Workflow steps concurrently. It uses `Promise.all` with `ctx.run` to execute three distinct steps in parallel, collecting their results simultaneously. SOURCE: https://upstash.com/docs/workflow/howto/parallel-runs.mdx LANGUAGE: typescript CODE: ``` const [result1, result2, result3] = await Promise.all([ ctx.run("parallel-step-1", async () => { ... }), ctx.run("parallel-step-2", async () => { ... }), ctx.run("parallel-step-3", async () => { ... }), ]) ``` ---------------------------------------- TITLE: Syncing User Data to Database DESCRIPTION: This snippet demonstrates the initial step of synchronizing user data received from the authentication provider webhook into the application's internal database. 
It uses `context.run` to encapsulate the database operation, so the write runs as a tracked workflow step that is retried on failure. SOURCE: https://upstash.com/docs/workflow/examples/authWebhook.mdx LANGUAGE: TypeScript CODE: ``` const { userid } = await context.run("sync user", async () => { return await createUserInDatabase({ name, email }); }); ``` LANGUAGE: Python CODE: ``` async def _sync_user() -> dict: return await create_user_in_database(name, email) result = await context.run("sync user", _sync_user) userid = result["userid"] ``` ---------------------------------------- TITLE: Pausing Workflow Execution with context.sleep (Python) DESCRIPTION: Illustrates pausing a workflow for a specific duration using `context.sleep` in Python. After user sign-in, the workflow pauses for one day before proceeding to send a welcome email, managing timed events. SOURCE: https://upstash.com/docs/workflow/basics/context.mdx LANGUAGE: python CODE: ``` from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext from onboarding_utils import sign_in, send_email app = FastAPI() serve = Serve(app) @serve.post("/api/onboarding") async def onboarding(context: AsyncWorkflowContext[User]) -> None: user_data = context.request_payload async def _sign_in(): return await sign_in(user_data) user = await context.run("sign-in", _sign_in) # 👇 Wait for one day (duration string "1d") await context.sleep("wait-until-welcome-email", "1d") async def _send_email(): return await send_email(user.name, user.email) await context.run("send-welcome-email", _send_email) ``` ---------------------------------------- TITLE: Executing Workflow Steps Serially with context.run (Python) DESCRIPTION: Demonstrates serial execution of workflow steps in Python using `context.run`. Asynchronous functions are defined and passed to `context.run` to ensure sequential processing of `some_work` and `some_other_work`.
SOURCE: https://upstash.com/docs/workflow/basics/context.mdx LANGUAGE: python CODE: ``` from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload async def _step1(): return some_work(input) result1 = await context.run("step-1", _step1) async def _step2(): return some_other_work(result1) await context.run("step-2", _step2) ``` ---------------------------------------- TITLE: Configuring QStash Receiver for Request Verification in TypeScript DESCRIPTION: This snippet demonstrates how to integrate the QStash Receiver with an Upstash Workflow Next.js endpoint in TypeScript. It ensures that only requests originating from QStash can trigger the workflow by verifying the signing keys. The `currentSigningKey` and `nextSigningKey` are crucial for this verification and should be sourced from the QStash dashboard. SOURCE: https://upstash.com/docs/workflow/basics/serve.mdx LANGUAGE: TypeScript CODE: ``` import { Receiver } from "@upstash/qstash"; import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve( async (context) => { ... }, { receiver: new Receiver({ // 👇 grab these variables from your QStash dashboard currentSigningKey: process.env.QSTASH_CURRENT_SIGNING_KEY!, nextSigningKey: process.env.QSTASH_NEXT_SIGNING_KEY!, }) } ); ``` ---------------------------------------- TITLE: Defining Workflow Endpoint with Request Object (Next.js Pages Router) DESCRIPTION: Example of a workflow endpoint in Next.js Pages Router (`src/pages/api/workflow.ts`) that provides access to the native `NextApiRequest` and `NextApiResponse` objects. This allows for custom request/response handling before the workflow steps are processed. 
SOURCE: https://upstash.com/docs/workflow/quickstarts/vercel-nextjs.mdx LANGUAGE: typescript CODE: ``` import type { NextApiRequest, NextApiResponse } from "next"; import { servePagesRouter } from "@upstash/workflow/nextjs"; export default async function handler( req: NextApiRequest, res: NextApiResponse ) { // do something with the native request object const { handler } = servePagesRouter( async (context) => { // Your workflow steps } ) await handler(req, res) } ``` ---------------------------------------- TITLE: Waiting for External Events with context.waitForEvent - Upstash Workflow - JavaScript DESCRIPTION: This snippet illustrates how `context.waitForEvent` pauses a workflow run until an external notification is received or a specified timeout occurs. It demonstrates defining a step name, an event ID, and an optional timeout duration, returning `eventData` and a `timeout` boolean indicating if the step timed out. SOURCE: https://upstash.com/docs/workflow/basics/context.mdx LANGUAGE: javascript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve<{ topic: string }>(async (context) => { const request = context.requestPayload; const { eventData, // data passed in notify timeout, // boolean denoting whether the step was notified or timed out } = await context.waitForEvent("wait for some event", "my-event-id", { timeout: "1000s", // 1000 second timeout }); }); ``` ---------------------------------------- TITLE: Pausing Workflow for Event with context.waitForEvent (TypeScript) DESCRIPTION: This method pauses the execution of an Upstash workflow, waiting for an external event identified by an `eventId`. It returns the `eventData` when the event is received or a `timeout` flag if the specified duration is exceeded. The `timeout` parameter defines the maximum waiting time in seconds. 
SOURCE: https://upstash.com/docs/workflow/howto/events.mdx LANGUAGE: TypeScript CODE: ``` const { eventData, timeout } = await context.waitForEvent( "description of event", "event-id", { timeout: timeoutInSeconds, } ); ``` ---------------------------------------- TITLE: Pausing Workflow to Wait for External Event DESCRIPTION: This TypeScript code demonstrates how an Upstash Workflow pauses execution using `context.waitForEvent` to await an external event. It specifies an `eventId` and includes a 10-minute timeout, ensuring the workflow resumes upon event reception or gracefully handles a timeout. SOURCE: https://upstash.com/docs/workflow/examples/waitForEvent.mdx LANGUAGE: typescript CODE: ``` const { eventData, timeout } = await context.waitForEvent( "wait for order processing", `order-${orderId}`, { timeout: "10m" // 10 minutes timeout } ); if (timeout) { // end workflow in case of timeout return; } ``` ---------------------------------------- TITLE: Triggering Upstash Workflow via QStash Publish API (Bash) DESCRIPTION: This `curl` command demonstrates how to trigger an Upstash workflow endpoint using the QStash publish API. It includes setting the `Authorization` header with a bearer token and the `Content-type` header for JSON payload delivery to the specified workflow URL. SOURCE: https://upstash.com/docs/workflow/howto/security.mdx LANGUAGE: bash CODE: ``` curl -XPOST \ -H 'Authorization: Bearer ' \ -H "Content-type: application/json" \ -d '{ "initialData": "hello world" }' \ 'https://qstash.upstash.io/v2/publish/https:///api/workflow' ``` ---------------------------------------- TITLE: Processing Webhook Events with `context.run` in Upstash Workflow DESCRIPTION: This snippet shows how to process specific webhook events, such as `user.created`, using `context.run` in Upstash Workflow. It encapsulates the event handling logic within a trackable step, extracting relevant user data like ID, email, and first name from the event payload for further processing. 
SOURCE: https://upstash.com/docs/workflow/howto/use-webhooks.mdx LANGUAGE: TypeScript CODE: ``` export const { POST } = serve(async (context) => { // ... Parse and validate the incoming request const user = await context.run( "handle-webhook-event", async () => { if (event.type === "user.created") { const { id: clerkUserId, email_addresses, first_name } = event.data; const primaryEmail = email_addresses.find( (email) => email.id === event.data.primary_email_address_id ); if (!primaryEmail) { return false; } return { event: event.type, userId: clerkUserId, email: primaryEmail.email_address, firstName: first_name, } as UserPayload; } return false; } ); }); ``` LANGUAGE: Python CODE: ``` @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: # ... Parse and validate the incoming request async def _handle_webhook_event(): if event.type == "user.created": clerk_user_id = event.data["id"] email_addresses = event.data["email_addresses"] first_name = event.data["first_name"] primary_email = next( ( email for email in email_addresses if email["id"] == event.data["primary_email_address_id"] ), None, ) if not primary_email: return False return { "event": event.type, "user_id": clerk_user_id, "email": primary_email["email_address"], "first_name": first_name, } return False user = await context.run("handle-webhook-event", _handle_webhook_event) ``` ---------------------------------------- TITLE: Configuring Workflow Flow Control (TypeScript) DESCRIPTION: The `flowControl` option allows managing the rate of requests and the maximum number of concurrent requests to the workflow endpoint. It includes `rate` and `parallelism` sub-options to control request frequency and concurrency, respectively. SOURCE: https://upstash.com/docs/workflow/basics/serve.mdx LANGUAGE: TypeScript CODE: ``` export const { POST } = serve( async (context) => { ... 
}, { flowControl: { key: "aFlowControlKey", rate: 10, parallelism: 3 } } ); ``` ---------------------------------------- TITLE: Avoiding Nested Context Methods - TypeScript DESCRIPTION: Illustrates an incorrect pattern of nesting `context.sleep`, `context.run`, or `context.call` within another `context.run` block. Upstash Workflow methods should be called directly within the main workflow function, not inside nested `context.run` callbacks. SOURCE: https://upstash.com/docs/workflow/basics/caveats.mdx LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/nextjs" export const { POST } = serve(async (context) => { const input = context.requestPayload await context.run("step-1", async () => { await context.sleep(...) // ❌ INCORRECT await context.run(...) // ❌ INCORRECT await context.call(...) // ❌ INCORRECT }) }) ``` ---------------------------------------- TITLE: Calling OpenAI Compatible Providers with Custom Base URL (TypeScript) DESCRIPTION: This snippet demonstrates how to configure `context.api.openai.call` to interact with an OpenAI-compatible API provider, such as Deepseek. It shows the use of the `baseURL` parameter to redirect API requests while maintaining the familiar `operation` and `body` structure for chat completions. SOURCE: https://upstash.com/docs/workflow/integrations/openai.mdx LANGUAGE: TypeScript CODE: ``` const { status, body } = await context.api.openai.call( "Call Deepseek", { baseURL: "https://api.deepseek.com", token: process.env.DEEPSEEK_API_KEY, operation: "chat.completions.create", body: { model: "deepseek-chat", messages: [ { role: "system", content: "Assistant says 'hello!'" }, { role: "user", content: "User shouts back 'hi!'" } ] } } ); ``` ---------------------------------------- TITLE: Example of Tool Execution within Upstash Workflow (TypeScript) DESCRIPTION: This snippet provides a concise example of how a tool's `execute` function must be wrapped in a `context.run()` call within an Upstash Workflow.
This ensures that the tool's operation is treated as a durable workflow step, allowing for proper tracking and retry mechanisms. SOURCE: https://upstash.com/docs/workflow/integrations/aisdk.mdx LANGUAGE: typescript CODE: ``` execute: ({ location }) => context.run("weather tool", () => { // Mock data, replace with actual weather API call return { location, temperature: 72 + Math.floor(Math.random() * 21) - 10, }; }) ``` ---------------------------------------- TITLE: Initializing Workflow Context with Upstash Workflow (TypeScript) DESCRIPTION: Demonstrates how to initialize the workflow context in a Next.js environment using `@upstash/workflow/nextjs`. The `serve` function provides the context object for defining workflow logic. SOURCE: https://upstash.com/docs/workflow/basics/context.mdx LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve( // 👇 the workflow context async (context) => { // ... } ); ``` ---------------------------------------- TITLE: Canceling Upstash Workflow Run with context.cancel (TypeScript) DESCRIPTION: This snippet demonstrates how to use `context.cancel` to programmatically terminate the current workflow run. It checks a condition (`result.cancel`) and, if true, calls `context.cancel()` to stop the workflow. This is useful for implementing conditional workflow termination logic. SOURCE: https://upstash.com/docs/workflow/basics/context.mdx LANGUAGE: TypeScript CODE: ``` export const { POST } = serve<{ topic: string }>(async (context) => { const payload = context.requestPayload const result = await context.run("check if canceled", () => { ... }); if (result.cancel) { await context.cancel() // cancel the workflow run } }) ``` ---------------------------------------- TITLE: Example Workflow Logs API Response DESCRIPTION: This JSON object provides a sample successful response from the workflow logs API. 
It details a single workflow run, including its ID, URL, state, timestamps, and a structured breakdown of its sequential and parallel execution steps. SOURCE: https://upstash.com/docs/workflow/rest/runs/logs.mdx LANGUAGE: json CODE: ``` { "cursor": "1686652644442-12", "runs": [ { "workflowRunId": "wfr_rj0Upr1rvdzGfz96fXNHh", "workflowUrl": "https://feasible-eft-notably.ngrok-free.app/api/call", "workflowState": "RUN_SUCCESS", "workflowRunCreatedAt": 1736340463061, "workflowRunCompletedAt": 1736340464684, "steps": [ { "steps": [ { "stepName": "init", "stepType": "Initial", "callType": "step", "messageId": "msg_7YoJxFpwkEy5zBp378JgvD6YBDPBEqkBPje2JGTCEUiASMJQ1FwY9", "concurrent": 1, "state": "STEP_SUCCESS", "createdAt": 1736340463064 } ], "type": "sequential" }, { "steps": [ { "stepId": 1, "stepName": "external call", "stepType": "Run", "callType": "step", "messageId": "msg_26hZCxZCuWyyTWPmSVBrNCtiJGNsULmt63vFfcZxQ3sfYFKLZe2dKww4BSb2kVF", "out": "1", "concurrent": 2, "state": "STEP_SUCCESS", "createdAt": 1736340464111 }, { "stepId": 2, "stepName": "external call 2", "stepType": "Run", "callType": "step", "messageId": "msg_26hZCxZCuWyyTWPmSVBrNB882AMRP1TsgzpygELRcLWep4ACNTTsCHhrZuaNLij", "out": "2", "concurrent": 2, "state": "STEP_SUCCESS", "createdAt": 1736340463895 } ], "type": "parallel" } ] } ] } ``` ---------------------------------------- TITLE: Suspending User and Sending Email - TypeScript DESCRIPTION: This TypeScript snippet checks if a user is already suspended. If not, it proceeds to suspend the user's account and sends a notification email about the payment failure. It uses `context.run` to encapsulate these operations, ensuring they are executed within the workflow's transactional context. 
SOURCE: https://upstash.com/docs/workflow/examples/paymentRetry.mdx LANGUAGE: TypeScript CODE: ``` const isSuspended = await context.run("check suspension", async () => { return await checkSuspension(email); }); if (!isSuspended) { await context.run("suspend user", async () => { await suspendUser(email); }); await context.run("send suspended email", async () => { await sendEmail( context.requestPayload.email, "Your account has been suspended due to payment failure. Please update your payment method." ); }); } ``` ---------------------------------------- TITLE: Storing Processed Images in Cloud Storage in Upstash Workflow (Python) DESCRIPTION: This Python snippet demonstrates storing each processed image in cloud storage. It iterates through the processed images, using `context.run` for each storage operation, and collects the resulting stored image URLs sequentially. SOURCE: https://upstash.com/docs/workflow/examples/imageProcessing.mdx LANGUAGE: python CODE: ``` async def _store_image() -> str: return await store_image(processed_image["imageUrl"]) stored_image_urls: List[str] = [] for processed_image in processed_images: stored_image_url = await context.run("store-image", _store_image) stored_image_urls.append(stored_image_url) ``` ---------------------------------------- TITLE: Pausing Workflow for Initial User Interaction - Upstash Workflow DESCRIPTION: This snippet shows how to introduce a non-blocking pause in the workflow using `context.sleep`. The workflow will pause for 3 days (60 * 60 * 24 * 3 seconds), allowing time for the user to engage with the platform before further automated actions are taken. This sleep is non-blocking, meaning it doesn't consume serverless execution time. 
SOURCE: https://upstash.com/docs/workflow/examples/customerOnboarding.mdx LANGUAGE: typescript CODE: ``` await context.sleep("wait-for-3-days", 60 * 60 * 24 * 3) ``` LANGUAGE: python CODE: ``` await context.sleep("wait-for-3-days", 60 * 60 * 24 * 3) ``` ---------------------------------------- TITLE: Sending Reminder Email after 7 Days (TypeScript/Python) DESCRIPTION: This snippet orchestrates sending a reminder email after 7 days. It first pauses the workflow using `context.sleep` for 7 days, then retrieves the user's activity statistics via `getUserStats` using `context.run`, and finally calls `sendProblemSolvedEmail` to determine and send the appropriate email. SOURCE: https://upstash.com/docs/workflow/examples/authWebhook.mdx LANGUAGE: TypeScript CODE: ``` await context.sleep("wait", 7 * 24 * 60 * 60); const stats = await context.run("get user stats", async () => { return await getUserStats(userid); }); await sendProblemSolvedEmail({context, email, stats}); ``` LANGUAGE: Python CODE: ``` await context.sleep("wait", 7 * 24 * 60 * 60) async def _get_user_stats() -> UserStats: return await get_user_stats(userid) stats: UserStats = await context.run("get user stats", _get_user_stats) await send_problem_solved_email(context, email, stats) ``` ---------------------------------------- TITLE: Defining a Basic Upstash Workflow Endpoint in Flask DESCRIPTION: This Python snippet demonstrates how to define a basic Upstash Workflow endpoint within a Flask application. It initializes the Flask app and the `Serve` object, then defines a workflow function decorated with `@serve.route` that executes two sequential steps using `context.run`. 
SOURCE: https://upstash.com/docs/workflow/quickstarts/flask.mdx LANGUAGE: Python CODE: ``` from flask import Flask from upstash_workflow.flask import Serve app = Flask(__name__) serve = Serve(app) @serve.route("/api/workflow") def workflow(context) -> None: def _step1() -> None: print("initial step ran") context.run("initial-step", _step1) def _step2() -> None: print("second step ran") context.run("second-step", _step2) ``` ---------------------------------------- TITLE: Defining Minimal Workflow Endpoint (Next.js Pages Router) DESCRIPTION: A minimal example of defining a workflow endpoint using `servePagesRouter` for Next.js Pages Router (`src/pages/api/workflow.ts`). It sets up a basic workflow with two steps, `initial-step` and `second-step`, similar to the App Router example. SOURCE: https://upstash.com/docs/workflow/quickstarts/vercel-nextjs.mdx LANGUAGE: typescript CODE: ``` import { servePagesRouter } from "@upstash/workflow/nextjs"; const { handler } = servePagesRouter( async (context) => { await context.run("initial-step", () => { console.log("initial step ran") }) await context.run("second-step", () => { console.log("second step ran") }) } ) export default handler; ``` ---------------------------------------- TITLE: Publishing Message to Upstash Workflow with QStash Client (TypeScript) DESCRIPTION: This snippet illustrates how to trigger an Upstash workflow by publishing a JSON message using the `@upstash/qstash` client. It sends a message to the specified workflow endpoint, allowing for an optional body, headers, and retries for the publish operation. SOURCE: https://upstash.com/docs/workflow/howto/start.mdx LANGUAGE: TypeScript CODE: ``` import { Client } from "@upstash/qstash"; const client = new Client({ token: "" }); const { messageId } = await client.publishJSON({ url: "https:///", body: { hello: "there!" }, headers: { ... 
}, retries: 3 }); ``` ---------------------------------------- TITLE: Setting QStash Signing Keys Environment Variables (Bash) DESCRIPTION: This snippet shows how to set the `QSTASH_CURRENT_SIGNING_KEY` and `QSTASH_NEXT_SIGNING_KEY` environment variables. These keys are essential for enabling QStash's built-in request verification, ensuring only authorized clients can trigger your workflow endpoint. SOURCE: https://upstash.com/docs/workflow/howto/security.mdx LANGUAGE: bash CODE: ``` QSTASH_CURRENT_SIGNING_KEY=xxxxxxxxx QSTASH_NEXT_SIGNING_KEY=xxxxxxxxx ``` ---------------------------------------- TITLE: Configuring Failure URL in QStash Workflow (TypeScript) DESCRIPTION: This snippet illustrates how to specify a `failureUrl` within the `serve` method of a QStash workflow. The `failureUrl` defines an external endpoint that will receive a notification when the workflow fails, particularly useful when the service hosting the main workflow URL is unavailable. This allows for external processing or logging of failure events. SOURCE: https://upstash.com/docs/workflow/howto/failures.mdx LANGUAGE: typescript CODE: ``` export const { POST } = serve( async (context) => { // Your workflow logic... }, { failureUrl: "https:///workflow-failure", } ); ``` ---------------------------------------- TITLE: Making Third-Party API Calls with Upstash Workflow DESCRIPTION: This code demonstrates how to make an API call to a third-party service (OpenAI) using `context.api.openai.call` (TypeScript) or `context.call` (Python) within an Upstash Workflow. It includes authentication, model parameters, and data to be processed, with the response used for subsequent retry logic. 
SOURCE: https://upstash.com/docs/workflow/examples/customRetry.mdx LANGUAGE: typescript CODE: ``` const response = await context.api.openai.call(`call-openai`, { token: process.env.OPENAI_API_KEY, operation: "chat.completions.create", body: { model: "gpt-3.5-turbo", messages: [createSystemMessage(), createUserMessage(userData)], max_completion_tokens: 150, }, }) ``` LANGUAGE: python CODE: ``` response: CallResponse[Dict[str, Any]] = await context.call( "call-openai", url="https://api.openai.com/v1/chat/completions", method="POST", headers={ "authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}", }, body={ "model": "gpt-4", "messages": [create_system_message(), create_user_message(user_data)], "max_tokens": 150, }, ) ``` ---------------------------------------- TITLE: Example Workflow Message Logs JSON Response DESCRIPTION: This JSON snippet illustrates a sample successful response (HTTP 200 OK) from the workflow message logs API. It includes a `cursor` for pagination and an `events` array, detailing individual log entries with fields like `time`, `state`, `workflowRunId`, and `stepInfo`. Note that the provided example is truncated.
SOURCE: https://upstash.com/docs/workflow/rest/runs/message-logs.mdx LANGUAGE: json CODE: ``` { "cursor": "", "events": [ { "time": 1738788333107, "state": "CREATED", "workflowRunId": "wfr_6MXE3GfddpBMWJM7s5WSRPqwcFm8", "workflowUrl": "http://my-workflow-server.com/workflowEndpoint", "workflowCreatedAt": 1738788333105, "stepInfo": { "stepName": "init", "stepType": "Initial", "callType": "step", "messageId": "msg_2KxeAKGVEjwDjNK1TVPormoRf7shRyNBpPThVbpvkuZNqri4cXp5nwSajNzAs6UWakvbco3qEPvtjQU3qxqjWarm2kisK", "concurrent": 1, "createdAt": 1738788333106 }, "nextDeliveryTime": 1738788333106 }, { "time": 1738788333107, "state": "RUN_STARTED", ``` ---------------------------------------- TITLE: Implementing Image Processing Workflow with Upstash Workflow in Python DESCRIPTION: This Python example, built with FastAPI and Upstash Workflow, demonstrates a sequential image processing workflow. It retrieves an image, resizes it to different resolutions, applies various filters, and stores the processed images. It uses `context.run` for internal operations and `context.call` for external service interactions, noting the sequential processing due to current `workflow-py` limitations for parallel steps. 
SOURCE: https://upstash.com/docs/workflow/examples/imageProcessing.mdx LANGUAGE: python CODE: ``` from fastapi import FastAPI from typing import List, TypedDict from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext, CallResponse from utils import store_image, get_image_url app = FastAPI() serve = Serve(app) class ImageResult(TypedDict): image_url: str class ImageProcessingPayload(TypedDict): image_id: str user_id: str @serve.post("/process-image") async def process_image(context: AsyncWorkflowContext[ImageProcessingPayload]) -> None: payload = context.request_payload image_id = payload["image_id"] user_id = payload["user_id"] # Step 1: Retrieve the uploaded image async def _get_image_url() -> str: return await get_image_url(image_id) image_url: str = await context.run("get-image-url", _get_image_url) # Step 2: Resize the image to multiple resolutions resolutions = [640, 1280, 1920] resize_responses = [] for resolution in resolutions: response: CallResponse[ImageResult] = await context.call( f"resize-image-{resolution}", # endpoint which returns ImageResult type in response url="https://image-processing-service.com/resize", method="POST", body={"imageUrl": image_url, "width": resolution}, ) resize_responses.append(response) resized_images = [response.body for response in resize_responses] # Step 3: Apply filters to each resized image filters = ["grayscale", "sepia", "contrast"] filter_responses = [] for resized_image in resized_images: for filter in filters: response: CallResponse[ImageResult] = await context.call( f"apply-filter-{filter}", # endpoint which returns ImageResult type in response url="https://image-processing-service.com/filter", method="POST", body={"imageUrl": resized_image["imageUrl"], "filter": filter}, ) filter_responses.append(response) processed_images = [response.body for response in filter_responses] # Step 4: Store processed images in cloud storage async def _store_image() -> str: return await 
store_image(processed_image["imageUrl"]) stored_image_urls: List[str] = [] for processed_image in processed_images: stored_image_url = await context.run("store-image", _store_image) stored_image_urls.append(stored_image_url) ``` ---------------------------------------- TITLE: Conditionally Unsuspending User Account Post-Payment DESCRIPTION: This snippet checks the user's suspension status using `checkSuspension` within a workflow step. If the user is found to be suspended, it proceeds to unsuspend their account via `unsuspendUser`, ensuring the account is reactivated upon successful payment. SOURCE: https://upstash.com/docs/workflow/examples/paymentRetry.mdx LANGUAGE: typescript CODE: ``` const isSuspended = await context.run("check suspension", async () => { return await checkSuspension(email); }); if (isSuspended) { await context.run("unsuspend user", async () => { await unsuspendUser(email); }); } ``` LANGUAGE: python CODE: ``` async def _check_suspension() -> bool: return await check_suspension(email) is_suspended = await context.run("check suspension", _check_suspension) if is_suspended: async def _unsuspend_user() -> None: await unsuspend_user(email) await context.run("unsuspend user", _unsuspend_user) ``` ---------------------------------------- TITLE: Registering New User and Sending Welcome Email - Upstash Workflow DESCRIPTION: This snippet demonstrates the initial step of the customer onboarding workflow, where a new user is registered. It uses `context.run` to execute an asynchronous function that sends a welcome email to the newly signed-up user, ensuring this operation is tracked and retried by the workflow engine if necessary. 
SOURCE: https://upstash.com/docs/workflow/examples/customerOnboarding.mdx LANGUAGE: typescript CODE: ``` await context.run("new-signup", async () => { await sendEmail("Welcome to the platform", email) }) ``` LANGUAGE: python CODE: ``` async def _new_signup() -> None: await send_email("Welcome to the platform", email) await context.run("new-signup", _new_signup) ``` ---------------------------------------- TITLE: Incorrectly Returning Results from context.run - TypeScript DESCRIPTION: Demonstrates an incorrect pattern for returning results from `context.run` where the `result` variable is declared outside the `context.run` block and assigned within it. This leads to `result` being uninitialized on subsequent workflow endpoint calls. SOURCE: https://upstash.com/docs/workflow/basics/caveats.mdx LANGUAGE: typescript CODE: ``` export const { POST } = serve(async (context) => { const input = context.requestPayload let result await context.run("step-1", async () => { result = await someWork(input) }) await context.run("step-2", async () => { await someOtherWork(result) }) }) ``` ---------------------------------------- TITLE: Publishing Message to Upstash Workflow with QStash Client (Python) DESCRIPTION: This snippet demonstrates how to trigger an Upstash workflow by publishing a JSON message using the `qstash` Python client. It sends a message to the specified workflow endpoint, allowing for an optional body, headers, and retries for the publish operation. SOURCE: https://upstash.com/docs/workflow/howto/start.mdx LANGUAGE: Python CODE: ``` from qstash import AsyncQStash client = AsyncQStash("") res = await client.message.publish_json( url="https:///", body={"hello": "there!"}, headers={...}, retries=3, ) message_id = res.message_id ``` ---------------------------------------- TITLE: Fetching Workflow Logs with Python Requests DESCRIPTION: This Python snippet uses the popular `requests` library to perform a GET request for workflow logs. 
It constructs the request with the appropriate authorization header containing the bearer token for authentication. SOURCE: https://upstash.com/docs/workflow/rest/runs/logs.mdx LANGUAGE: python CODE: ``` import requests headers = { 'Authorization': 'Bearer ', } response = requests.get( 'https://qstash.upstash.io/v2/workflows/logs', headers=headers ) ``` ---------------------------------------- TITLE: Avoiding Time-Dependent Code Outside context.run - TypeScript DESCRIPTION: Demonstrates an incorrect use of time-dependent code (`Date.now()`) outside `context.run`. Such code can lead to non-deterministic workflow execution, as the condition might evaluate differently across multiple calls to the workflow endpoint. SOURCE: https://upstash.com/docs/workflow/basics/caveats.mdx LANGUAGE: typescript CODE: ``` export const { POST } = serve(async (context) => { const input = context.requestPayload // 👇 Problem: time-dependent code if (Date.now() % 5 == 2) { await context.run("step-1", () => { // ... }) } else { await context.run("step-2", () => { // ... }) } }) ``` ---------------------------------------- TITLE: Defining a Workflow Failure Function (TypeScript) DESCRIPTION: The `failureFunction` option allows defining a custom asynchronous function that executes when a workflow fails after exhausting all retries. This function receives detailed failure context, status, response, and headers, enabling custom error handling logic. SOURCE: https://upstash.com/docs/workflow/basics/serve.mdx LANGUAGE: TypeScript CODE: ``` export const { POST } = serve( async (context) => { ... 
}, { failureFunction: async ({ context, // context during failure failStatus, // failure status failResponse, // failure message failHeaders // failure headers }) => { // handle the failure } } ); ``` ---------------------------------------- TITLE: Sending Single Email with Resend using Upstash Workflow (TypeScript) DESCRIPTION: This snippet demonstrates how to send a single email using the `context.api.resend.call` method in an Upstash Workflow. It requires a Resend API token and specifies the sender, recipient, subject, and HTML content for the email. The `headers` field ensures the correct content type for the API call. SOURCE: https://upstash.com/docs/workflow/integrations/resend.mdx LANGUAGE: typescript CODE: ``` const { status, body } = await context.api.resend.call( "Call Resend", { token: "", body: { from: "Acme ", to: ["delivered@resend.dev"], subject: "Hello World", html: "<p>It works!</p>",
}, headers: { "content-type": "application/json", }, } ); ``` ---------------------------------------- TITLE: Triggering Upstash Workflow with Client (TypeScript) DESCRIPTION: This snippet shows how to programmatically trigger an Upstash Workflow using the `@upstash/workflow` client. It sends a prompt as part of the request body to a specified workflow endpoint, initiating the text generation process. SOURCE: https://upstash.com/docs/workflow/integrations/aisdk.mdx LANGUAGE: typescript CODE: ``` import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }); const { workflowRunId } = await client.trigger({ url: "https:///", body: { "prompt": "How is the weather in San Francisco around this time?" }, }); ``` ---------------------------------------- TITLE: Calling Resend API with context.api - Upstash Workflow - TypeScript DESCRIPTION: This snippet demonstrates using `context.api.resend.call` for type-safe integration with the Resend email API within an Upstash Workflow. It shows how to specify the API key, the email body including sender, recipient, subject, and HTML content, and optional headers for sending emails. SOURCE: https://upstash.com/docs/workflow/basics/context.mdx LANGUAGE: typescript CODE: ``` const { status, body } = await context.api.resend.call("Call Resend", { token: "", body: { from: "Acme ", to: ["delivered@resend.dev"], subject: "Hello World", html: "<p>It works!</p>",
}, headers: { "content-type": "application/json", }, }); ``` ---------------------------------------- TITLE: Fetching Workflow Logs with cURL DESCRIPTION: This cURL command demonstrates how to retrieve workflow execution logs from the Upstash QStash API. It performs a GET request to the logs endpoint, requiring an authorization bearer token in the header for successful authentication. SOURCE: https://upstash.com/docs/workflow/rest/runs/logs.mdx LANGUAGE: sh CODE: ``` curl https://qstash.upstash.io/v2/workflows/logs \ -H "Authorization: Bearer " ``` ---------------------------------------- TITLE: Fetching Workflow Message Logs with cURL DESCRIPTION: This snippet demonstrates how to retrieve workflow message logs using the `curl` command-line tool. It sends a GET request to the `/v2/workflows/messageLogs` endpoint, requiring an Authorization header with a Bearer token for authentication. SOURCE: https://upstash.com/docs/workflow/rest/runs/message-logs.mdx LANGUAGE: sh CODE: ``` curl https://qstash.upstash.io/v2/workflows/messageLogs \ -H "Authorization: Bearer " ``` ---------------------------------------- TITLE: Sending Welcome Email (TypeScript/Python) DESCRIPTION: This snippet sends a welcome email to the user upon trial initiation. It utilizes `context.run` to execute the `sendEmail` function, providing the user's email and a welcome message, including details about the 14-day free trial duration. SOURCE: https://upstash.com/docs/workflow/examples/authWebhook.mdx LANGUAGE: TypeScript CODE: ``` await context.run("send welcome email", async () => { await sendEmail( email, "Welcome to our platform!, You have 14 days of free trial." ); }); ``` LANGUAGE: Python CODE: ``` async def _send_welcome_email() -> None: await send_email( email, "Welcome to our platform!, You have 14 days of free trial." 
) await context.run("send welcome email", _send_welcome_email) ``` ---------------------------------------- TITLE: Incorrectly Returning Results from context.run - Python DESCRIPTION: Illustrates an incorrect pattern for returning results from `context.run` where the `result` variable is declared outside the `context.run` block and assigned within it using `nonlocal`. This causes `result` to be uninitialized on subsequent workflow endpoint calls. SOURCE: https://upstash.com/docs/workflow/basics/caveats.mdx LANGUAGE: python CODE: ``` @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload result = None async def _step_1() -> Dict: nonlocal result result = await some_work(input) await context.run("step-1", _step_1) async def _step_2() -> None: await some_other_work(result) await context.run("step-2", _step_2) ``` ---------------------------------------- TITLE: Avoiding Random Code Outside context.run - TypeScript DESCRIPTION: Shows an incorrect example of using random code (`Math.random()`) outside `context.run`. Randomness makes the workflow non-deterministic, meaning it might produce different results or take different paths on re-execution, which is problematic for reliable workflows. SOURCE: https://upstash.com/docs/workflow/basics/caveats.mdx LANGUAGE: typescript CODE: ``` export const { POST } = serve(async (context) => { const input = context.requestPayload // 👇 Problem: random code if (Math.floor(Math.random() * 10) % 5 == 2) { await context.run("step-1", () => { // ... }) } else { await context.run("step-2", () => { // ... }) } }) ``` ---------------------------------------- TITLE: Resizing Image to Multiple Resolutions in Upstash Workflow (TypeScript) DESCRIPTION: This TypeScript snippet illustrates resizing an image to predefined resolutions (640, 1280, 1920) using an external image processing service. 
It leverages `context.call` for each resize operation and orchestrates parallel execution using `Promise.all`, sending the original image URL and desired width to the service. SOURCE: https://upstash.com/docs/workflow/examples/imageProcessing.mdx LANGUAGE: typescript CODE: ``` const resolutions = [640, 1280, 1920] const resizedImages: { body: ImageResult }[] = await Promise.all(resolutions.map( resolution => context.call( `resize-image-${resolution}`, { // endpoint which returns ImageResult type in response url: "https://image-processing-service.com/resize", method: "POST", body: { imageUrl, width: resolution, } } ) )) ``` ---------------------------------------- TITLE: Defining a Nuxt.js Workflow Endpoint DESCRIPTION: Demonstrates how to create a workflow endpoint in Nuxt.js using @upstash/workflow/h3, defining sequential steps (initial-step, second-step) that execute business logic and are automatically retried on failure. SOURCE: https://upstash.com/docs/workflow/quickstarts/nuxt.mdx LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/h3" const { handler } = serve( async (context) => { await context.run("initial-step", () => { console.log("initial step ran") }) await context.run("second-step", () => { console.log("second step ran") }) }, ) export default handler; ``` ---------------------------------------- TITLE: Defining a SolidJS Upstash Workflow endpoint DESCRIPTION: This TypeScript code defines a SolidJS API endpoint (`routes/api/workflow.ts`) that serves an Upstash Workflow. It uses `serve` from `@upstash/workflow/solidjs` to create a POST handler, demonstrating two sequential steps (`initial-step`, `second-step`) that log messages to the console. 
SOURCE: https://upstash.com/docs/workflow/quickstarts/solidjs.mdx LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/solidjs" export const { POST } = serve(async (context) => { await context.run("initial-step", () => { console.log("initial step ran") }) await context.run("second-step", () => { console.log("second step ran") }) }) ``` ---------------------------------------- TITLE: Defining Upstash Workflow Endpoint in Cloudflare Worker DESCRIPTION: This TypeScript code defines a Cloudflare Worker endpoint that serves an Upstash Workflow. It imports `serve` from `@upstash/workflow/cloudflare` and uses `context.run` to define sequential, retryable steps within the workflow, processing an initial payload and passing results between steps. SOURCE: https://upstash.com/docs/workflow/quickstarts/cloudflare-workers.mdx LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/cloudflare" interface Env { ENVIRONMENT: "development" | "production" } export default serve<{ text: string }>( async (context) => { const initialPayload = context.requestPayload.text const result = await context.run("initial-step", async () => { console.log(`Step 1 running with payload: ${initialPayload}`) return { text: "initial step ran" } } ) await context.run("second-step", async () => { console.log(`Step 2 running with result from step 1: ${result.text}`) }) } ) ``` ---------------------------------------- TITLE: Re-throwing WorkflowAbort in try/catch Blocks DESCRIPTION: This snippet demonstrates the correct way to handle WorkflowAbort errors when context.run or other steps are wrapped in a try/catch block. The WorkflowAbort error, thrown by the SDK upon successful step execution, must be re-thrown to prevent unintended error handling, while other exceptions can be processed normally. 
SOURCE: https://upstash.com/docs/workflow/troubleshooting/general.mdx LANGUAGE: TypeScript CODE: ``` import { WorkflowAbort } from '@upstash/workflow'; try { await context.run( ... ); } catch (error) { if (error instanceof WorkflowAbort) { throw error; } else { // handle other errors } } ``` LANGUAGE: Python CODE: ``` from upstash_workflow import WorkflowAbort try: await context.run( ... ) except Exception as e: if isinstance(e, WorkflowAbort): raise e else: # handle other errors ``` ---------------------------------------- TITLE: Setting Workflow Base URL DESCRIPTION: The `baseUrl` option provides an alternative to `url`, allowing modification of only the base part of the inferred URL, rather than replacing the entire URL. This is beneficial for local development with tunnels or when setting a global base URL via an environment variable like `UPSTASH_WORKFLOW_URL`. SOURCE: https://upstash.com/docs/workflow/basics/serve.mdx LANGUAGE: TypeScript CODE: ``` export const { POST } = serve( async (context) => { ... }, // options: { baseUrl: "" } ); ``` LANGUAGE: Python CODE: ``` @serve.post("/api/example", base_url="") async def example(context: AsyncWorkflowContext[str]) -> None: ... ``` ---------------------------------------- TITLE: Retrieving Workflow Run Logs with Upstash Workflow Client (TypeScript) DESCRIPTION: This snippet demonstrates how to fetch workflow run logs using the `client.logs` method. It illustrates the use of various optional parameters like `workflowRunId`, `count`, `state`, `workflowUrl`, `workflowCreatedAt`, and `cursor` to filter and paginate results. The method returns a `runs` array containing workflow details and a `cursor` for subsequent requests. 
SOURCE: https://upstash.com/docs/workflow/basics/client.mdx LANGUAGE: typescript CODE: ``` import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }) const { runs, cursor } = await client.logs({ // Id of the workflow run to get workflowRunId, // Number of workflows to get count, // Workflow state to filter for. // One of "RUN_STARTED", "RUN_SUCCESS", "RUN_FAILED", "RUN_CANCELED" state, // Workflow url to search for. should be an exact match workflowUrl, // Unix timestamp when the run was created workflowCreatedAt, // Cursor from a previous request to continue the search cursor }) ``` ---------------------------------------- TITLE: Triggering Upstash Workflow with Python SDK DESCRIPTION: This Python snippet illustrates how to trigger an Upstash Workflow endpoint using the `qstash` Python SDK. It shows the initialization of `AsyncQStash` with a token and then uses `client.message.publish_json` to send a JSON payload to the specified workflow URL, including optional headers and retry configurations. SOURCE: https://upstash.com/docs/workflow/getstarted.mdx LANGUAGE: Python CODE: ``` from qstash import AsyncQStash client = AsyncQStash("") res = await client.message.publish_json( url="https:///", body={"hello": "there!"}, headers={...}, retries=3, ) ``` ---------------------------------------- TITLE: Deploying Cloudflare Worker to Production DESCRIPTION: This command deploys your Cloudflare Worker to production using the Cloudflare CLI tool, `wrangler`. Ensure all necessary environment variables are set in your Cloudflare Worker project settings before deployment. SOURCE: https://upstash.com/docs/workflow/quickstarts/cloudflare-workers.mdx LANGUAGE: bash CODE: ``` wrangler deploy ``` ---------------------------------------- TITLE: Fetching Workflow Logs with Node.js Fetch API DESCRIPTION: This Node.js example shows how to retrieve workflow logs using the native Fetch API. 
It sends a GET request to the QStash API's logs endpoint, including the necessary authorization bearer token in the request headers. SOURCE: https://upstash.com/docs/workflow/rest/runs/logs.mdx LANGUAGE: javascript CODE: ``` const response = await fetch("https://qstash.upstash.io/v2/workflows/logs", { headers: { Authorization: "Bearer ", }, }); ``` ---------------------------------------- TITLE: Sending Order Confirmation Email in Workflow DESCRIPTION: This TypeScript code illustrates the final step in the workflow: sending a confirmation email to the user. Using `context.run`, it ensures that the email is dispatched after the order has been successfully processed and relevant data is available. SOURCE: https://upstash.com/docs/workflow/examples/waitForEvent.mdx LANGUAGE: typescript CODE: ``` await context.run("send-confirmation-email", async () => { await sendEmail( userEmail, "Your order has been processed!", processedData ); }); ``` ---------------------------------------- TITLE: Configuring OpenAI Compatible Model DESCRIPTION: Configures an OpenAI-compatible model by specifying a custom `baseURL` and `apiKey`, allowing integration with alternative providers like DeepSeek. SOURCE: https://upstash.com/docs/workflow/agents/features.mdx LANGUAGE: ts CODE: ``` const model = context.agents.openai('gpt-3.5-turbo', { baseURL: "https://api.deepseek.com", apiKey: process.env.DEEPSEEK_API_KEY }) ``` ---------------------------------------- TITLE: Sending Trial Ended Email (TypeScript/Python) DESCRIPTION: This snippet sends a final email to the user once their trial period has officially concluded. It uses `context.sleep` to wait for the remaining 2 days of the trial, then `context.run` to execute the `sendEmail` function, informing the user that their trial has ended and prompting them to upgrade their plan. 
SOURCE: https://upstash.com/docs/workflow/examples/authWebhook.mdx LANGUAGE: TypeScript CODE: ``` await context.sleep("wait for trial end", 2 * 24 * 60 * 60); await context.run("send trial end email", async () => { await sendEmail( email, "Your trial has ended. Please upgrade your plan to keep using our platform." ); }); ``` LANGUAGE: Python CODE: ``` await context.sleep("wait for trial end", 2 * 24 * 60 * 60) async def _send_trial_end_email() -> None: await send_email( email, "Your trial has ended. Please upgrade your plan to keep using our platform.", ) await context.run("send trial end email", _send_trial_end_email) ``` ---------------------------------------- TITLE: Using Anthropic Model with AI SDK DESCRIPTION: Demonstrates how to integrate an Anthropic model using the AI SDK, requiring the `createAnthropic` provider and an API key for authentication. SOURCE: https://upstash.com/docs/workflow/agents/features.mdx LANGUAGE: ts CODE: ``` import { createAnthropic } from "@ai-sdk/anthropic"; const model = context.agents.AISDKModel({ context, provider: createAnthropic, providerParams: { apiKey: "", }, }); ``` ---------------------------------------- TITLE: Configuring LLM Call Parameters DESCRIPTION: Configures advanced call settings for an LLM model, including optional timeout, retries, and flow control parameters, to manage API interactions and reliability. SOURCE: https://upstash.com/docs/workflow/agents/features.mdx LANGUAGE: ts CODE: ``` const model = context.agents.openai('gpt-3.5-turbo', { callSettings: { timeout: 1000, // optional request timeout retries: 0, // optional retries flowControl: { // optional flow control key: "flow-control-key", rate: 10, period: "10s", parallelism: 10, }, } }) ``` ---------------------------------------- TITLE: Cancelling a Workflow Run with Upstash Workflow Client (JavaScript) DESCRIPTION: This snippet demonstrates how to programmatically cancel a specific Upstash Workflow run using the `@upstash/workflow` JavaScript client. 
It initializes the client with an authentication token and then calls the `cancel` method, specifying the workflow run ID to be terminated. This method is used to stop a workflow that is currently in progress. SOURCE: https://upstash.com/docs/workflow/howto/cancel.mdx LANGUAGE: javascript CODE: ``` import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }); await client.cancel({ ids: "" }); ``` ---------------------------------------- TITLE: Downloading Dataset in Upstash Workflow (TypeScript) DESCRIPTION: This TypeScript snippet demonstrates how to retrieve a dataset URL and then download the dataset within an Upstash Workflow. It uses `context.run` for fetching the URL (ensuring idempotency) and `context.call` for the actual HTTP download, which supports much longer timeouts than standard serverless functions. SOURCE: https://upstash.com/docs/workflow/examples/allInOne.mdx LANGUAGE: typescript CODE: ``` const datasetUrl = await context.run("get-dataset-url", async () => { return await getDatasetUrl(request.datasetId) }) const { body: dataset } = await context.call("download-dataset", { url: datasetUrl, method: "GET" }) ``` ---------------------------------------- TITLE: Configuring Environment Variables for Local Tunnel (.env) DESCRIPTION: These lines configure the `.env` file for local development using a public tunnel. `QSTASH_TOKEN` authenticates with the production QStash, and `UPSTASH_WORKFLOW_URL` specifies the public URL provided by the tunnel, allowing production QStash to reach the local endpoint and log workflows in the Upstash Console. SOURCE: https://upstash.com/docs/workflow/quickstarts/nextjs-fastapi.mdx LANGUAGE: bash CODE: ``` export QSTASH_TOKEN="***" export UPSTASH_WORKFLOW_URL= ``` ---------------------------------------- TITLE: Configuring .env for Local Tunnel Development DESCRIPTION: This snippet illustrates the environment variables required in the `.env` file when using a local tunnel for development. 
It includes the QStash token and the public URL provided by the tunnel, connecting the local endpoint to production QStash for logging. SOURCE: https://upstash.com/docs/workflow/quickstarts/astro.mdx LANGUAGE: txt CODE: ``` QSTASH_TOKEN="***" UPSTASH_WORKFLOW_URL= ``` ---------------------------------------- TITLE: Sending Invoice Email After Successful Payment DESCRIPTION: This snippet sends an invoice email to the user upon successful payment. It leverages `context.run` to execute the `sendEmail` function, dynamically including invoice details like `invoiceId` and `totalCost` obtained from the successful payment result. SOURCE: https://upstash.com/docs/workflow/examples/paymentRetry.mdx LANGUAGE: typescript CODE: ``` await context.run("send invoice email", async () => { await sendEmail( email, `Payment successful. Invoice: ${result.invoiceId}, Total cost: $${result.totalCost}` ); }); ``` LANGUAGE: python CODE: ``` async def _send_invoice_email() -> None: await send_email( email, f"Payment successful. Invoice: {result.invoice_id}, Total cost: ${result.total_cost}", ) await context.run("send invoice email", _send_invoice_email) ``` ---------------------------------------- TITLE: Fetching Workflow Message Logs with Python Requests DESCRIPTION: This Python snippet utilizes the `requests` library to perform a GET request to the `/v2/workflows/messageLogs` endpoint. It sets the `Authorization` header with a Bearer token to authenticate the request and retrieve workflow logs. SOURCE: https://upstash.com/docs/workflow/rest/runs/message-logs.mdx LANGUAGE: python CODE: ``` import requests headers = { 'Authorization': 'Bearer ', } response = requests.get( 'https://qstash.upstash.io/v2/workflows/messageLogs', headers=headers ) ``` ---------------------------------------- TITLE: Implementing Weekly User Summary Workflow (Python) DESCRIPTION: This Python snippet defines a FastAPI endpoint using `upstash_workflow.fastapi.Serve` for processing weekly user summaries. 
It fetches user data, generates the summary, and sends an email, all orchestrated as distinct steps using `context.run`. SOURCE: https://upstash.com/docs/workflow/howto/schedule.mdx LANGUAGE: python CODE: ``` from dataclasses import dataclass from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext from utils import get_user_data, generate_summary, send_email app = FastAPI() serve = Serve(app) @dataclass class WeeklySummaryData: user_id: str @serve.post("/api/send-weekly-summary") async def send_weekly_summary(context: AsyncWorkflowContext[WeeklySummaryData]) -> None: user_id = context.request_payload.user_id # Step 1: Fetch user data async def _step1(): return await get_user_data(user_id) user = await context.run("fetch_user_data", _step1) # Step 2: Generate weekly summary async def _step2(): return await generate_summary(user_id) summary = await context.run("generate_summary", _step2) # Step 3: Send email with weekly summary async def _step3(): await send_email(user.email, "Your Weekly Summary", summary) await context.run("send_summary_email", _step3) ``` ---------------------------------------- TITLE: Configuring Workflow Failure URL (TypeScript) DESCRIPTION: This configuration option allows specifying a URL that the workflow will call if it exhausts all retries and ultimately fails. The specified URL will receive a failure callback payload, including the error message in its body field. SOURCE: https://upstash.com/docs/workflow/basics/serve.mdx LANGUAGE: TypeScript CODE: ``` export const { POST } = serve( async (context) => { ... }, { failureUrl: "https:///..." } ); ``` ---------------------------------------- TITLE: Pausing Workflow Until Specific Timestamp with context.sleep_until (Python) DESCRIPTION: Demonstrates pausing a workflow until a specific future datetime using `context.sleep_until` in Python. The workflow calculates a date one week ahead and waits until then before sending a welcome email, managing scheduled events. 
SOURCE: https://upstash.com/docs/workflow/basics/context.mdx LANGUAGE: python CODE: ``` from fastapi import FastAPI from datetime import datetime, timedelta from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext from onboarding_utils import sign_in, send_email app = FastAPI() serve = Serve(app) @serve.post("/api/onboarding") async def onboarding(context: AsyncWorkflowContext[User]) -> None: user_data = context.request_payload async def _sign_in(): return await sign_in(user_data) user = await context.run("sign-in", _sign_in) # 👇 Calculate the date for one week from now one_week_from_now = datetime.now() + timedelta(days=7) # 👇 Wait until the calculated date await context.sleep_until("wait-for-one-week", one_week_from_now) async def _send_email(): return await send_email(user.name, user.email) await context.run("send-welcome-email", _send_email) ``` ---------------------------------------- TITLE: Updating Next.js Serve Method Usage DESCRIPTION: Illustrates the change in how the `serve` method's return value is handled in Next.js. The new SDK requires destructuring the `POST` method from the `serve` function's return, making it easier to extend the API in the future. SOURCE: https://upstash.com/docs/workflow/migration.mdx LANGUAGE: javascript CODE: ``` // old export const POST = serve(...); // new export const { POST } = serve(...); ``` ---------------------------------------- TITLE: Notifying Workflows using Upstash Workflow SDK (JavaScript) DESCRIPTION: This JavaScript snippet uses the `@upstash/workflow` SDK to notify an event. It initializes a `Client` with a QStash token and then calls the `notify` method, providing the `eventId` and optional `eventData`. SOURCE: https://upstash.com/docs/workflow/rest/runs/notify.mdx LANGUAGE: javascript CODE: ``` import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }) await client.notify({ eventId: "my-event-id", eventData: "Hello World!" 
}); ``` ---------------------------------------- TITLE: Deploying Hono App to Cloudflare Workers DESCRIPTION: Deploys the Hono application, including the Upstash Workflow endpoint, to Cloudflare Workers using the `wrangler deploy` command. SOURCE: https://upstash.com/docs/workflow/quickstarts/hono.mdx LANGUAGE: bash CODE: ``` wrangler deploy ``` ---------------------------------------- TITLE: Bulk Canceling Workflows using Upstash Workflow SDK (JavaScript) DESCRIPTION: This JavaScript example shows how to use the Upstash Workflow SDK to cancel multiple workflow runs. It demonstrates three methods: canceling by a list of specific IDs, canceling by a URL prefix, and canceling all workflow runs. SOURCE: https://upstash.com/docs/workflow/rest/runs/bulk-cancel.mdx LANGUAGE: javascript CODE: ``` import { Client } from "@upstash/workflow"; // create a client (assumes QSTASH_TOKEN is set in the environment) const client = new Client({ token: process.env.QSTASH_TOKEN }); // cancel a set of workflow runs await client.cancel({ ids: [ "", "", ]}) // cancel workflows starting with a url await client.cancel({ urlStartingWith: "https://your-endpoint.com" }) // cancel all workflows await client.cancel({ all: true }) ``` ---------------------------------------- TITLE: Verifying Stock Availability in Upstash Workflow DESCRIPTION: This snippet demonstrates creating an order ID and verifying item stock availability within an Upstash Workflow. If items are out of stock, the process is halted to prevent further processing of an unfulfillable order. It requires `createOrderId` and `checkStockAvailability` utility functions.
SOURCE: https://upstash.com/docs/workflow/examples/eCommerceOrderFulfillment.mdx LANGUAGE: typescript CODE: ``` const orderId = await context.run("create-order-id", async () => { return await createOrderId(userId); }); const stockAvailable = await context.run("check-stock", async () => { return await checkStockAvailability(items) }) if (!stockAvailable) { console.warn("Some items are out of stock") return; } ``` LANGUAGE: python CODE: ``` async def _create_order_id(): return await create_order_id(user_id) order_id: str = await context.run("create-order-id", _create_order_id) async def _check_stock(): return await check_stock_availability(items) stock_available: bool = await context.run("check-stock", _check_stock) if not stock_available: print("Some items are out of stock") return ``` ---------------------------------------- TITLE: Defining Custom LangChain Structured Tool DESCRIPTION: Defines a custom `DynamicStructuredTool` using LangChain, which generates a random number within a specified range, demonstrating custom tool creation. SOURCE: https://upstash.com/docs/workflow/agents/features.mdx LANGUAGE: ts CODE: ``` import { DynamicStructuredTool } from "@langchain/core/tools"; import { z } from "zod"; const numberGenerator = new DynamicStructuredTool({ name: "random-number-generator", description: "generates a random number between two input numbers", schema: z.object({ low: z.number().describe("The lower bound of the generated number"), high: z.number().describe("The upper bound of the generated number"), }), func: async ({ low, high }) => (Math.random() * (high - low) + low).toString(), // Outputs still must be strings }) ``` ---------------------------------------- TITLE: Processing Payment in Upstash Workflow DESCRIPTION: This snippet handles the payment processing step for a given order ID within the workflow. It ensures that payment is successfully processed before the workflow proceeds to dispatch the order. This step depends on the `processPayment` utility function.
SOURCE: https://upstash.com/docs/workflow/examples/eCommerceOrderFulfillment.mdx LANGUAGE: typescript CODE: ``` await context.run("process-payment", async () => { return await processPayment(orderId) }) ``` LANGUAGE: python CODE: ``` async def _process_payment(): return await process_payment(order_id) await context.run("process-payment", _process_payment) ``` ---------------------------------------- TITLE: Starting Local Upstash QStash Server DESCRIPTION: Command to start a local QStash server using the `@upstash/qstash-cli`. This allows for local testing of workflows without incurring billing charges or affecting production logs. SOURCE: https://upstash.com/docs/workflow/quickstarts/vercel-nextjs.mdx LANGUAGE: bash CODE: ``` npx @upstash/qstash-cli dev ``` ---------------------------------------- TITLE: Starting Local Upstash QStash Server DESCRIPTION: This command initiates a local QStash server using the `@upstash/qstash-cli` tool, providing a local endpoint for testing Upstash Workflow without interacting with the production service. SOURCE: https://upstash.com/docs/workflow/quickstarts/fastapi.mdx LANGUAGE: bash CODE: ``` npx @upstash/qstash-cli dev ``` ---------------------------------------- TITLE: Importing Workflow `serve` Methods (Python) DESCRIPTION: Imports the `serve` and `async_serve` methods from the `upstash_workflow` package. These methods provide the foundational integration points for using Upstash Workflow with custom Python platforms. SOURCE: https://upstash.com/docs/workflow/quickstarts/platforms.mdx LANGUAGE: Python CODE: ``` from upstash_workflow import serve, async_serve ``` ---------------------------------------- TITLE: Starting Local QStash Server DESCRIPTION: This command initiates a local QStash server using the `@upstash/qstash-cli` tool. Running a local server allows for testing Upstash Workflows during development without incurring costs or affecting production environments. 
SOURCE: https://upstash.com/docs/workflow/quickstarts/flask.mdx LANGUAGE: Bash CODE: ``` npx @upstash/qstash-cli dev ``` ---------------------------------------- TITLE: Creating New User in Stripe DESCRIPTION: This step integrates with Stripe to create a new customer record after a user is created via the authentication provider. It ensures that the user's email is registered in Stripe, preparing for subsequent billing and trial management operations. The `context.run` ensures this external API call is managed by the workflow. SOURCE: https://upstash.com/docs/workflow/examples/authWebhook.mdx LANGUAGE: TypeScript CODE: ``` await context.run("create new user in stripe", async () => { await createNewUserInStripe(email); }); ``` LANGUAGE: Python CODE: ``` async def _create_new_user_in_stripe() -> None: await create_new_user_in_stripe(email) await context.run("create new user in stripe", _create_new_user_in_stripe) ``` ---------------------------------------- TITLE: Aggregating Intermediate Results in Upstash Workflow (Python) DESCRIPTION: This Python snippet demonstrates the conditional aggregation of processed data chunks within an Upstash Workflow. It uses `context.run` to perform the aggregation operation every 10 chunks or when all chunks have been processed, ensuring that intermediate results are persisted and the `processed_chunks` list is reset. SOURCE: https://upstash.com/docs/workflow/examples/allInOne.mdx LANGUAGE: python CODE: ``` if i % 10 == 9 or i == len(chunks) - 1: async def _aggregate_results() -> None: await aggregate_results(processed_chunks) processed_chunks.clear() await context.run(f"aggregate-results{i}", _aggregate_results) ``` ---------------------------------------- TITLE: Configuring QStash Receiver in Python Workflow (Explicit Keys) DESCRIPTION: This Python snippet demonstrates how to integrate QStash request verification by passing `current_signing_key` and `next_signing_key` to the `Receiver` within the `serve.post` decorator. 
It allows explicit configuration of signing keys for workflow endpoints, typically retrieved from environment variables. SOURCE: https://upstash.com/docs/workflow/howto/security.mdx LANGUAGE: python CODE: ``` from qstash import Receiver @serve.post( "/api/example", receiver=Receiver( current_signing_key=os.environ["QSTASH_CURRENT_SIGNING_KEY"], next_signing_key=os.environ["QSTASH_NEXT_SIGNING_KEY"], ), ) async def example(context: AsyncWorkflowContext[str]) -> None: ... ``` ---------------------------------------- TITLE: Missing Workflow Step - TypeScript DESCRIPTION: This TypeScript workflow demonstrates an incorrect implementation where no `context.run` call is made. Workflows without at least one step will fail with a 'Failed to authenticate Workflow request.' error, as the authentication mechanism relies on step execution. SOURCE: https://upstash.com/docs/workflow/basics/caveats.mdx LANGUAGE: typescript CODE: ``` export const { POST } = serve(async (context) => { const input = context.requestPayload // 👇 Problem: No context.run call console.log("Processing input:", input) // This workflow will fail with "Failed to authenticate Workflow request." }) ``` ---------------------------------------- TITLE: Missing Workflow Step - Python DESCRIPTION: This Python workflow illustrates an incorrect setup where `context.run` is omitted. Upstash workflows require at least one step execution to authenticate successfully; otherwise, they will result in a 'Failed to authenticate Workflow request.' error. SOURCE: https://upstash.com/docs/workflow/basics/caveats.mdx LANGUAGE: python CODE: ``` @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload # 👇 Problem: No context.run call print("Processing input:", input) # This workflow will fail with "Failed to authenticate Workflow request." 
``` ---------------------------------------- TITLE: Starting Trial in Stripe (TypeScript/Python) DESCRIPTION: This snippet initiates a user's trial period within the Stripe platform. It uses `context.run` to encapsulate the asynchronous operation `startTrialInStripe`, ensuring it's part of the workflow context. The `email` parameter is used to identify the user for trial activation. SOURCE: https://upstash.com/docs/workflow/examples/authWebhook.mdx LANGUAGE: TypeScript CODE: ``` await context.run("start trial in Stripe", async () => { await startTrialInStripe(email); }); ``` LANGUAGE: Python CODE: ``` async def _start_trial_in_stripe() -> None: await start_trial_in_stripe(email) await context.run("start trial in Stripe", _start_trial_in_stripe) ``` ---------------------------------------- TITLE: Customizing Request and Response Types for OpenAI Calls (TypeScript) DESCRIPTION: This example illustrates how to override the default type definitions for the request and response bodies when using `context.api.openai.call`. By defining `ResponseBodyType` and `RequestBodyType`, developers can ensure type safety for custom API interactions beyond the SDK's predefined types. SOURCE: https://upstash.com/docs/workflow/integrations/openai.mdx LANGUAGE: TypeScript CODE: ``` type ResponseBodyType = { ... }; // Define your response body type type RequestBodyType = { ... }; // Define your request body type const { status, body } = await context.api.openai.call< ResponseBodyType, RequestBodyType >( "Call OpenAI", { ... } ); ``` ---------------------------------------- TITLE: Starting Local Upstash QStash Development Server (Bash) DESCRIPTION: This command initiates a local QStash development server using the `@upstash/qstash-cli`. This server allows for local testing of Upstash Workflows without incurring production costs or affecting live logs in the Upstash Console. 
SOURCE: https://upstash.com/docs/workflow/quickstarts/nextjs-fastapi.mdx LANGUAGE: bash CODE: ``` npx @upstash/qstash-cli dev ``` ---------------------------------------- TITLE: Starting Local QStash Server DESCRIPTION: Runs the Upstash QStash CLI in development mode to start a local QStash server, providing local `QSTASH_URL` and `QSTASH_TOKEN` for testing workflows without affecting billing. SOURCE: https://upstash.com/docs/workflow/quickstarts/hono.mdx LANGUAGE: bash CODE: ``` npx @upstash/qstash-cli dev ``` ---------------------------------------- TITLE: Defining Custom AI SDK Tool for Math DESCRIPTION: Defines a custom tool compatible with AI SDK using `ai` and `zod`, allowing agents to evaluate mathematical expressions. SOURCE: https://upstash.com/docs/workflow/agents/features.mdx LANGUAGE: ts CODE: ``` import { z } from 'zod' import { tool } from 'ai' import * as mathjs from 'mathjs' const mathTool = tool({ description: 'A tool for evaluating mathematical expressions. ' + 'Example expressions: ' + "'1.2 * (2 + 4.5)', '12.7 cm to inch', 'sin(45 deg) ^ 2'.", parameters: z.object({ expression: z.string() }), execute: async ({ expression }) => mathjs.evaluate(expression), }) ``` ---------------------------------------- TITLE: Cancelling Workflow Run with Node.js Fetch API DESCRIPTION: Illustrates cancelling a workflow run using the native Fetch API in Node.js. This method directly sends a DELETE request to the QStash API endpoint, including the necessary authorization header for authentication. 
SOURCE: https://upstash.com/docs/workflow/rest/runs/cancel.mdx LANGUAGE: js CODE: ``` const response = await fetch('https://qstash.upstash.io/v2/workflows/runs/wfr_TzazoUCuZmFGp2u9cdy5K', { method: 'DELETE', headers: { 'Authorization': 'Bearer ' } }); ``` ---------------------------------------- TITLE: Generating and Sending Report in Upstash Workflow (TypeScript) DESCRIPTION: This TypeScript snippet outlines the final steps in the Upstash Workflow: generating a comprehensive report based on aggregated data and then sending it to the specified user. Both operations are wrapped in `context.run` to ensure idempotency and reliable execution within the workflow. SOURCE: https://upstash.com/docs/workflow/examples/allInOne.mdx LANGUAGE: typescript CODE: ``` const report = await context.run("generate-report", async () => { return await generateReport(request.datasetId) }) await context.run("send-report", async () => { await sendReport(report, request.userId) }) ``` ---------------------------------------- TITLE: Using Custom Environment Variables with `env` Option in Python DESCRIPTION: This Python snippet demonstrates how to explicitly pass environment variables to an Upstash Workflow endpoint using the `env` option. This allows for custom environment configurations, overriding or supplementing default `os.environ` behavior. The provided dictionary will be accessible within the workflow context. SOURCE: https://upstash.com/docs/workflow/basics/serve.mdx LANGUAGE: Python CODE: ``` @serve.post( "/api/example", env={ "QSTASH_CURRENT_SIGNING_KEY": os.environ["QSTASH_CURRENT_SIGNING_KEY"], "QSTASH_NEXT_SIGNING_KEY": os.environ["QSTASH_NEXT_SIGNING_KEY"], }, ) async def example(context: AsyncWorkflowContext[str]) -> None: ... ``` ---------------------------------------- TITLE: Enabling Workflow Verbose Logging (TypeScript) DESCRIPTION: Enabling `verbose` mode provides detailed insights into the workflow's operations by generating structured log entries. 
This mode helps in debugging and understanding the execution flow by logging various events like endpoint starts, step executions, and third-party call results. SOURCE: https://upstash.com/docs/workflow/basics/serve.mdx LANGUAGE: TypeScript CODE: ``` export const { POST } = serve( async (context) => { ... }, { verbose: true } ); ``` ---------------------------------------- TITLE: Updating Serve Method Imports in TypeScript DESCRIPTION: Demonstrates how to change the import path for the `serve` method from `@upstash/qstash/nextjs` to `@upstash/workflow/nextjs` when migrating to the new SDK. This ensures the correct `serve` function is used for Upstash Workflow. SOURCE: https://upstash.com/docs/workflow/migration.mdx LANGUAGE: typescript CODE: ``` // old import { serve } from "@upstash/qstash/nextjs" // new import { serve } from "@upstash/workflow/nextjs" ``` ---------------------------------------- TITLE: Overriding Anthropic API Call Types in TypeScript DESCRIPTION: This snippet illustrates how to customize the request and response body types for the `context.api.anthropic.call` method. By defining `ResponseBodyType` and `RequestBodyType`, developers can enforce specific data structures for the API interaction, enhancing type safety and code clarity beyond the SDK's predefined types. SOURCE: https://upstash.com/docs/workflow/integrations/anthropic.mdx LANGUAGE: TypeScript CODE: ``` type ResponseBodyType = { ... }; // Define your response body type type RequestBodyType = { ... }; // Define your request body type const { status, body } = await context.api.anthropic.call< ResponseBodyType, RequestBodyType >( "Call Anthropic", { ... } ); ``` ---------------------------------------- TITLE: Cancelling Workflow Runs by URL Prefix (TypeScript) DESCRIPTION: This snippet illustrates how to cancel all workflow runs that were initiated with a URL matching a specified prefix. 
The `urlStartingWith` parameter allows for broad cancellation across multiple workflows under a common endpoint. SOURCE: https://upstash.com/docs/workflow/basics/client.mdx LANGUAGE: typescript CODE: ``` await client.cancel({ urlStartingWith: "https://your-endpoint.com" }); ``` ---------------------------------------- TITLE: Deploying Project to Vercel Production (Bash) DESCRIPTION: This command deploys the current project to Vercel's production environment. It's crucial for making the Upstash Workflow endpoint publicly accessible and functional, as preview deployments might require additional authentication and not work as expected. SOURCE: https://upstash.com/docs/workflow/quickstarts/nextjs-fastapi.mdx LANGUAGE: bash CODE: ``` vercel --prod ``` ---------------------------------------- TITLE: Cancelling Workflow Run with Upstash Workflow SDK (JavaScript) DESCRIPTION: Shows how to cancel a workflow run using the official `@upstash/workflow` SDK in JavaScript. This approach simplifies API interaction by abstracting the underlying HTTP requests, requiring only an Upstash token and the workflow run ID. SOURCE: https://upstash.com/docs/workflow/rest/runs/cancel.mdx LANGUAGE: js CODE: ``` import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }) await client.cancel({ workflowRunId: "" }) ``` ---------------------------------------- TITLE: Setting Workflow Retries DESCRIPTION: This option allows specifying the number of times QStash will retry calling the workflow endpoint in case of errors. The default value for retries is 3, providing a mechanism for handling transient failures. SOURCE: https://upstash.com/docs/workflow/basics/serve.mdx LANGUAGE: TypeScript CODE: ``` export const { POST } = serve( async (context) => { ... }, { retries: 3 } ); ``` LANGUAGE: Python CODE: ``` @serve.post("/api/example", retries=3) async def example(context: AsyncWorkflowContext[str]) -> None: ... 
``` ---------------------------------------- TITLE: Defining Custom WorkflowTool for Math DESCRIPTION: Defines a custom `WorkflowTool` using `@upstash/workflow` and `zod` for schema validation, enabling agents to evaluate mathematical expressions. SOURCE: https://upstash.com/docs/workflow/agents/features.mdx LANGUAGE: ts CODE: ``` import { WorkflowTool } from '@upstash/workflow' import { z } from 'zod' import * as mathjs from 'mathjs' const tool = new WorkflowTool({ description: 'A tool for evaluating mathematical expressions. ' + 'Example expressions: ' + "'1.2 * (2 + 4.5)', '12.7 cm to inch', 'sin(45 deg) ^ 2'.", schema: z.object({ expression: z.string() }), invoke: async ({ expression }) => mathjs.evaluate(expression), }) ``` ---------------------------------------- TITLE: Creating Environment Variable File in Terminal DESCRIPTION: Creates an empty `.env` file in the project root directory. This file will store sensitive environment variables like QStash API keys and URLs. SOURCE: https://upstash.com/docs/workflow/quickstarts/express.mdx LANGUAGE: bash CODE: ``` touch .env ``` ---------------------------------------- TITLE: Creating .env File for Environment Variables DESCRIPTION: This command creates an empty `.env` file in the project root, which is used to store sensitive environment variables like the QStash token for authentication with the Upstash service. SOURCE: https://upstash.com/docs/workflow/quickstarts/astro.mdx LANGUAGE: bash CODE: ``` touch .env ``` ---------------------------------------- TITLE: Creating .env file for environment variables DESCRIPTION: This command creates an empty `.env` file in the project root, which will be used to store sensitive environment variables like the QStash token for authentication.
SOURCE: https://upstash.com/docs/workflow/quickstarts/solidjs.mdx LANGUAGE: bash CODE: ``` touch .env ``` ---------------------------------------- TITLE: Implementing Fixed Delay Before Next API Retry DESCRIPTION: To prevent overwhelming the third-party API, this snippet introduces a fixed 5-second delay before the next retry attempt. It uses `context.sleep` to pause the workflow, ensuring a controlled back-off strategy regardless of specific rate limit headers. SOURCE: https://upstash.com/docs/workflow/examples/customRetry.mdx LANGUAGE: typescript CODE: ``` await context.sleep("pause-to-avoid-spam", 5) ``` LANGUAGE: python CODE: ``` await context.sleep("pause-to-avoid-spam", 5) ``` ---------------------------------------- TITLE: Triggering Upstash Workflow with REST API (cURL) DESCRIPTION: This Bash snippet provides a command-line example using `curl` to trigger an Upstash Workflow endpoint via a direct REST API POST request. It demonstrates how to specify the target URL and include a JSON body in the request. SOURCE: https://upstash.com/docs/workflow/getstarted.mdx LANGUAGE: Bash CODE: ``` curl -X POST https:/// -d '{"hello": "there!"}' ``` ---------------------------------------- TITLE: Customizing Initial Payload Parsing DESCRIPTION: This option allows defining a custom function to process the initial request payload when a workflow run starts, especially if the payload is not a standard empty, string, or JSON type. The parsed result is then available via `context.requestPayload` for use within the workflow logic.
SOURCE: https://upstash.com/docs/workflow/basics/serve.mdx LANGUAGE: TypeScript CODE: ``` type InitialPayload = { foo: string; bar: number; }; // 👇 1: provide initial payload type export const { POST } = serve<InitialPayload>( async (context) => { // 👇 3: parsing result is available as requestPayload const payload: InitialPayload = context.requestPayload; }, { // 👇 2: custom parsing for initial payload initialPayloadParser: (initialPayload) => { const payload: InitialPayload = parsePayload(initialPayload); return payload; }, } ); ``` LANGUAGE: Python CODE: ``` @dataclass class InitialPayload: foo: str bar: int def initial_payload_parser(initial_payload: str) -> InitialPayload: return parse_payload(initial_payload) @serve.post("/api/example", initial_payload_parser=initial_payload_parser) async def example(context: AsyncWorkflowContext[InitialPayload]) -> None: payload: InitialPayload = context.request_payload ``` ---------------------------------------- TITLE: Invoking an Upstash Workflow Endpoint via cURL DESCRIPTION: This `curl` command demonstrates how to invoke the previously defined Upstash Workflow endpoint. It sends a POST request to the specified URL with a JSON payload containing `id` and `question` parameters, simulating a user request to the workflow. SOURCE: https://upstash.com/docs/workflow/howto/monitor.mdx LANGUAGE: bash CODE: ``` curl -XPOST https://qstash-workflow.vercel.app/api/example -d '{"id": "id_123", "question": "what is the meaning of life?"}' ``` ---------------------------------------- TITLE: Defining a Minimal Hono Workflow Endpoint DESCRIPTION: Illustrates how to define a basic Upstash Workflow endpoint in a Hono application, demonstrating the use of `serve` to define sequential steps that log messages.
SOURCE: https://upstash.com/docs/workflow/quickstarts/hono.mdx LANGUAGE: typescript CODE: ``` import { Hono } from "hono" import { serve } from "@upstash/workflow/hono" const app = new Hono() app.post("/workflow", serve(async (context) => { await context.run("initial-step", () => { console.log("initial step ran") }) await context.run("second-step", () => { console.log("second step ran") }) }) ) export default app ``` ---------------------------------------- TITLE: Configuring Local Tunnel Environment Variables DESCRIPTION: These environment variables are used when setting up a local tunnel. `QSTASH_TOKEN` is obtained from the Upstash Console, and `UPSTASH_WORKFLOW_URL` is the public URL provided by the local tunnel, enabling production QStash integration and logging. SOURCE: https://upstash.com/docs/workflow/quickstarts/svelte.mdx LANGUAGE: txt CODE: ``` QSTASH_TOKEN="***" UPSTASH_WORKFLOW_URL= ``` ---------------------------------------- TITLE: Invoking a Workflow Endpoint via cURL (Bash) DESCRIPTION: This snippet provides a cURL command example to trigger a workflow exposed via `serveMany` at a specific route. The route name, such as `workflow-one-route`, is inferred from the key provided when defining workflows within `serveMany`. SOURCE: https://upstash.com/docs/workflow/howto/invoke.mdx LANGUAGE: Bash CODE: ``` curl -X POST https://your-app/serve-many/workflow-one-route ``` ---------------------------------------- TITLE: Providing a Custom QStash Client DESCRIPTION: The `qstashClient` option enables passing an explicitly initialized QStash Client instance to the `serve` method. This is useful for scenarios involving multiple QStash clients within the same project, each potentially configured with different environment variables or tokens, overriding the default client initialization. 
SOURCE: https://upstash.com/docs/workflow/basics/serve.mdx LANGUAGE: TypeScript CODE: ``` import { Client } from "@upstash/qstash"; import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve( async (context) => { ... }, { qstashClient: new Client({ token: process.env.QSTASH_TOKEN }) } ); ``` LANGUAGE: Python CODE: ``` from qstash import AsyncQStash @serve.post("/api/example", qstash_client=AsyncQStash(os.environ["QSTASH_TOKEN"])) async def example(context: AsyncWorkflowContext[str]) -> None: ... ``` LANGUAGE: TypeScript CODE: ``` new Client({ baseUrl: process.env.QSTASH_URL!, token: process.env.QSTASH_TOKEN!, }); ``` LANGUAGE: Python CODE: ``` AsyncQStash(os.environ["QSTASH_TOKEN"]) ``` ---------------------------------------- TITLE: Dispatching Order in Upstash Workflow DESCRIPTION: This snippet initiates the dispatch of the order after payment confirmation within the workflow. It takes the order ID and item details to prepare the order for delivery. This step relies on the `dispatchOrder` utility function to handle the logistics. SOURCE: https://upstash.com/docs/workflow/examples/eCommerceOrderFulfillment.mdx LANGUAGE: typescript CODE: ``` await context.run("dispatch-order", async () => { return await dispatchOrder(orderId, items) }) ``` LANGUAGE: python CODE: ``` async def _dispatch_order(): return await dispatch_order(order_id, items) await context.run("dispatch-order", _dispatch_order) ``` ---------------------------------------- TITLE: Applying Filters to Resized Images in Upstash Workflow (TypeScript) DESCRIPTION: This TypeScript snippet applies various filters (grayscale, sepia, contrast) to each previously resized image. It iterates through resized images and filters, using `context.call` for each filter application. All these operations are collected as promises and executed in parallel using `Promise.all`. 
SOURCE: https://upstash.com/docs/workflow/examples/imageProcessing.mdx LANGUAGE: typescript CODE: ``` const filters = ["grayscale", "sepia", "contrast"] const processedImagePromises: Promise<{ body: ImageResult }>[] = [] for (const resizedImage of resizedImages) { for (const filter of filters) { const processedImagePromise = context.call<ImageResult>( `apply-filter-${filter}`, { // endpoint which returns ImageResult type in response url: "https://image-processing-service.com/filter", method: "POST", body: { imageUrl: resizedImage.body.imageUrl, filter, } } ) processedImagePromises.push(processedImagePromise) } } const processedImages: { body: ImageResult }[] = await Promise.all(processedImagePromises) ``` ---------------------------------------- TITLE: Bulk Canceling Workflows with Node.js Fetch API DESCRIPTION: This Node.js example uses the native Fetch API to send a DELETE request for bulk workflow cancellation. It specifies the authorization token and content type, and includes an array of workflow run IDs in the request body to be canceled. SOURCE: https://upstash.com/docs/workflow/rest/runs/bulk-cancel.mdx LANGUAGE: javascript CODE: ``` const response = await fetch('https://qstash.upstash.io/v2/workflows/runs', { method: 'DELETE', headers: { 'Authorization': 'Bearer ', 'Content-Type': 'application/json', }, body: JSON.stringify({ workflowRunIds: [ "run_id_1", "run_id_2", "run_id_3", ], }), }); ``` ---------------------------------------- TITLE: Installing Upstash Workflow SDK in Next.js DESCRIPTION: Instructions for installing the `@upstash/workflow` SDK using different package managers (npm, pnpm, bun) in a Next.js project. This is a prerequisite for utilizing Upstash Workflow functionalities.
SOURCE: https://upstash.com/docs/workflow/quickstarts/vercel-nextjs.mdx LANGUAGE: bash CODE: ``` npm install @upstash/workflow ``` LANGUAGE: bash CODE: ``` pnpm install @upstash/workflow ``` LANGUAGE: bash CODE: ``` bun add @upstash/workflow ``` ---------------------------------------- TITLE: Notifying Workflows with Node.js Fetch API DESCRIPTION: This Node.js example demonstrates how to send a POST request to the notify endpoint using the native `fetch` API. It sets the `method`, `body`, and `Authorization` header for the request. SOURCE: https://upstash.com/docs/workflow/rest/runs/notify.mdx LANGUAGE: javascript CODE: ``` const response = await fetch('https://qstash.upstash.io/v2/notify/myEvent', { method: 'POST', body: "Hello world!", headers: { 'Authorization': 'Bearer ' } }); ``` ---------------------------------------- TITLE: Retrieving Event Waiters (JavaScript) DESCRIPTION: This snippet shows how to retrieve a list of workflow runs that are currently waiting for a specific event. The `client.getWaiters` method takes an `eventId` and returns a list of `Waiter` objects, providing insight into which workflows are paused awaiting input. SOURCE: https://upstash.com/docs/workflow/basics/client.mdx LANGUAGE: javascript CODE: ``` import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }); const result = await client.getWaiters({ eventId: "my-event-id", }); ``` ---------------------------------------- TITLE: Configuring Environment Variables for Local QStash Server (.env) DESCRIPTION: These lines configure the `.env` file with the `QSTASH_URL` and `QSTASH_TOKEN` obtained from the local QStash server. These variables are crucial for the application to communicate with the local QStash instance for workflow execution during development. 
SOURCE: https://upstash.com/docs/workflow/quickstarts/nextjs-fastapi.mdx LANGUAGE: bash CODE: ``` export QSTASH_URL="http://127.0.0.1:8080" export QSTASH_TOKEN= ``` ---------------------------------------- TITLE: Sending Order Confirmation and Dispatch Notifications DESCRIPTION: This snippet handles sending an order confirmation email to the customer and a separate notification once the order has been dispatched. This improves customer experience by keeping them informed throughout the order fulfillment process. It utilizes `sendOrderConfirmation` and `sendDispatchNotification` utilities. SOURCE: https://upstash.com/docs/workflow/examples/eCommerceOrderFulfillment.mdx LANGUAGE: typescript CODE: ``` await context.run("send-confirmation", async () => { return await sendOrderConfirmation(userId, orderId) }) await context.run("send-dispatch-notification", async () => { return await sendDispatchNotification(userId, orderId) }) ``` LANGUAGE: python CODE: ``` async def _send_confirmation(): return await send_order_confirmation(user_id, order_id) await context.run("send-confirmation", _send_confirmation) async def _send_dispatch_notification(): return await send_dispatch_notification(user_id, order_id) await context.run("send-dispatch-notification", _send_dispatch_notification) ``` ---------------------------------------- TITLE: Setting Up Upstash Workflow with Retry Loop DESCRIPTION: This snippet initializes an Upstash Workflow endpoint and sets up a loop to attempt an external API call multiple times. It defines the entry point for the workflow and establishes the maximum number of retry attempts. 
SOURCE: https://upstash.com/docs/workflow/examples/customRetry.mdx LANGUAGE: typescript CODE: ``` export const { POST } = serve<{ userData: string }>(async (context) => { for (let attempt = 0; attempt < 10; attempt++) { // TODO: call API in here } }) ``` LANGUAGE: python CODE: ``` @serve.post("/custom-retry-logic") async def custom_retry_logic(context: AsyncWorkflowContext[InitialData]) -> None: for attempt in range(10): # TODO: call API in here ``` ---------------------------------------- TITLE: Configuring .env.local for Local Tunnel DESCRIPTION: Illustrates the environment variables needed when using a local tunnel for development, connecting the local endpoint to the production QStash service, which allows viewing workflow logs in the Upstash Console. SOURCE: https://upstash.com/docs/workflow/quickstarts/nuxt.mdx LANGUAGE: text CODE: ``` QSTASH_TOKEN="***" UPSTASH_WORKFLOW_URL= ``` ---------------------------------------- TITLE: Configuring Local Tunnel Environment Variables (.env) DESCRIPTION: These environment variables are used in the '.env' file when setting up a local tunnel. 'QSTASH_TOKEN' is your actual production QStash token, and 'UPSTASH_WORKFLOW_URL' is the public URL provided by your local tunneling service, allowing workflow logs to appear in the Upstash Console. SOURCE: https://upstash.com/docs/workflow/quickstarts/nextjs-flask.mdx LANGUAGE: bash CODE: ``` export QSTASH_TOKEN="***" export UPSTASH_WORKFLOW_URL= ``` ---------------------------------------- TITLE: Configuring .env for Local Tunnel with QStash DESCRIPTION: These environment variables are used when connecting to the production QStash via a local tunnel. `QSTASH_TOKEN` is the actual token from the Upstash Console, and `UPSTASH_WORKFLOW_URL` is the public URL provided by the local tunneling service. 
SOURCE: https://upstash.com/docs/workflow/quickstarts/fastapi.mdx LANGUAGE: bash CODE: ``` export QSTASH_TOKEN="***" export UPSTASH_WORKFLOW_URL= ``` ---------------------------------------- TITLE: Accessing Environment Variables from Context in TypeScript DESCRIPTION: This TypeScript snippet illustrates how environment variables, when passed via the `env` option, become accessible through `context.env` within the workflow function. This method is useful for providing custom environment configurations or when `process.env` is not available. SOURCE: https://upstash.com/docs/workflow/basics/serve.mdx LANGUAGE: TypeScript CODE: ``` import { Receiver } from "@upstash/qstash"; import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve( async (context) => { // the env option will be available in the env field of the context: const env = context.env; }, { receiver: new Receiver({ // 👇 grab these variables from your QStash dashboard currentSigningKey: process.env.QSTASH_CURRENT_SIGNING_KEY!, nextSigningKey: process.env.QSTASH_NEXT_SIGNING_KEY!, }), } ); ``` ---------------------------------------- TITLE: Calling Anthropic API with context.api - Upstash Workflow - TypeScript DESCRIPTION: This snippet illustrates how to use `context.api.anthropic.call` for type-safe integration with the Anthropic API in an Upstash Workflow. It details specifying the API key, the operation (e.g., `messages.create`), and the request body including the model, max tokens, and messages for interacting with Anthropic's models. 
SOURCE: https://upstash.com/docs/workflow/basics/context.mdx LANGUAGE: typescript CODE: ``` const { status, body } = await context.api.anthropic.call( "Call Anthropic", { token: "", operation: "messages.create", body: { model: "claude-3-5-sonnet-20241022", max_tokens: 1024, messages: [ {"role": "user", "content": "Hello, world"} ] }, } ); ``` ---------------------------------------- TITLE: Importing Workflow `serve` Method (TypeScript) DESCRIPTION: Imports the `serve` method from the `@upstash/workflow` package, which is the base method for integrating Upstash Workflow with platforms not natively listed. This method allows for custom platform adjustments. SOURCE: https://upstash.com/docs/workflow/quickstarts/platforms.mdx LANGUAGE: TypeScript CODE: ``` import { serve } from "@upstash/workflow"; ``` ---------------------------------------- TITLE: Conditional Email based on Problem Solved Count (TypeScript/Python) DESCRIPTION: This function, `sendProblemSolvedEmail`, sends one of two emails depending on the user's `totalProblemsSolved` count. If the user has solved no problems, a reminder email is sent; otherwise, an email summarizing their solved problems is sent. Both email sending operations are wrapped in `context.run`. SOURCE: https://upstash.com/docs/workflow/examples/authWebhook.mdx LANGUAGE: TypeScript CODE: ``` async function sendProblemSolvedEmail({ context, email, stats, }: { context: WorkflowContext; email: string; stats: { totalProblemsSolved: number }; }) { if (stats.totalProblemsSolved === 0) { await context.run("send no answers email", async () => { await sendEmail( email, "Hey, you haven't solved any questions in the last 7 days..." ); }); } else { await context.run("send stats email", async () => { await sendEmail( email, `You have solved ${stats.totalProblemsSolved} problems in the last 7 days.
Keep it up!` ); }); } } ``` LANGUAGE: Python CODE: ``` async def send_problem_solved_email( context: AsyncWorkflowContext[UserCreatedPayload], email: str, stats: UserStats ) -> None: if stats["total_problems_solved"] == 0: async def _send_no_answers_email() -> None: await send_email( email, "Hey, you haven't solved any questions in the last 7 days..." ) await context.run("send no answers email", _send_no_answers_email) else: async def _send_stats_email() -> None: await send_email( email, f"You have solved {stats['total_problems_solved']} problems in the last 7 days. Keep it up!", ) await context.run("send stats email", _send_stats_email) ``` ---------------------------------------- TITLE: Installing Upstash Workflow SDK with npm DESCRIPTION: Instructions to install the new `@upstash/workflow` package using npm. This is the first step in migrating from the old SDK, separating Workflow development from the QStash SDK. SOURCE: https://upstash.com/docs/workflow/migration.mdx LANGUAGE: bash CODE: ``` npm install @upstash/workflow ``` ---------------------------------------- TITLE: Resizing Image to Multiple Resolutions in Upstash Workflow (Python) DESCRIPTION: This Python snippet demonstrates resizing an image to multiple resolutions (640, 1280, 1920) by sequentially calling an external image processing service. It uses `context.call` for each resolution, passing the original image URL and target width, and collects the responses for further processing. 
SOURCE: https://upstash.com/docs/workflow/examples/imageProcessing.mdx LANGUAGE: python CODE: ``` resolutions = [640, 1280, 1920] resize_responses = [] for resolution in resolutions: response: CallResponse[ImageResult] = await context.call( f"resize-image-{resolution}", # endpoint which returns ImageResult type in response url="https://image-processing-service.com/resize", method="POST", body={"imageUrl": image_url, "width": resolution}, ) resize_responses.append(response) resized_images = [response.body for response in resize_responses] ``` ---------------------------------------- TITLE: Starting Next.js Development Server DESCRIPTION: Command to start the Next.js development server. This makes the defined workflow endpoints accessible locally for testing and development purposes. SOURCE: https://upstash.com/docs/workflow/quickstarts/vercel-nextjs.mdx LANGUAGE: bash CODE: ``` npm run dev ``` ---------------------------------------- TITLE: Cancelling Specific Workflow Runs (TypeScript) DESCRIPTION: This snippet demonstrates how to cancel one or more specific workflow runs using the `client.cancel` method. It shows examples of canceling a single workflow by its ID or multiple workflows by providing an array of their IDs. SOURCE: https://upstash.com/docs/workflow/basics/client.mdx LANGUAGE: typescript CODE: ``` // cancel a single workflow await client.cancel({ ids: "" }); // cancel a set of workflow runs await client.cancel({ ids: ["", ""] }); ``` ---------------------------------------- TITLE: Sending HTTP POST Request to Upstash Workflow with Requests Library (Python) DESCRIPTION: This snippet shows how to trigger an Upstash workflow by sending a direct HTTP POST request using Python's `requests` library. It includes examples for specifying a JSON body and custom headers. Note that this method will not work for secured workflow endpoints. 
SOURCE: https://upstash.com/docs/workflow/howto/start.mdx LANGUAGE: Python CODE: ``` import requests requests.post( "https:///", json={"foo": "bar"}, headers={"my-header": "foo"} ) ``` ---------------------------------------- TITLE: Requesting Order Processing in Upstash Workflow DESCRIPTION: This TypeScript snippet illustrates the initial step within an Upstash Workflow where an asynchronous operation, such as requesting order processing, is executed using `context.run`. This step simulates sending an email or initiating an external service for order fulfillment. SOURCE: https://upstash.com/docs/workflow/examples/waitForEvent.mdx LANGUAGE: typescript CODE: ``` await context.run("request order processing", async () => { await requestProcessing(orderId) }) ``` ---------------------------------------- TITLE: Suspending User and Sending Email - Python DESCRIPTION: This Python snippet performs the same logic as its TypeScript counterpart: it verifies if a user is suspended, and if not, it suspends the user and dispatches an email notification regarding payment failure. It utilizes `context.run` with asynchronous helper functions to manage the workflow steps. SOURCE: https://upstash.com/docs/workflow/examples/paymentRetry.mdx LANGUAGE: Python CODE: ``` async def _check_suspension() -> bool: return await check_suspension(email) is_suspended = await context.run("check suspension", _check_suspension) if not is_suspended: async def _suspend_user() -> None: await suspend_user(email) await context.run("suspend user", _suspend_user) async def _send_suspended_email() -> None: await send_email( email, "Your account has been suspended due to payment failure. 
Please update your payment method.", ) await context.run("send suspended email", _send_suspended_email) ``` ---------------------------------------- TITLE: Cancelling Workflow Run with cURL DESCRIPTION: Demonstrates how to cancel a workflow run using a cURL command, sending a DELETE request to the QStash API with the workflow run ID and an authorization token. This method is suitable for command-line execution or scripting. SOURCE: https://upstash.com/docs/workflow/rest/runs/cancel.mdx LANGUAGE: sh CODE: ``` curl -XDELETE https://qstash.upstash.io/v2/workflows/runs/wfr_TzazoUCuZmFGp2u9cdy5K \ -H "Authorization: Bearer " ``` ---------------------------------------- TITLE: Fetching Workflow Message Logs with Node.js Fetch API DESCRIPTION: This Node.js snippet uses the `fetch` API to send a GET request to the `/v2/workflows/messageLogs` endpoint. It includes an `Authorization` header with a Bearer token for secure access to the workflow logs. SOURCE: https://upstash.com/docs/workflow/rest/runs/message-logs.mdx LANGUAGE: javascript CODE: ``` const response = await fetch( "https://qstash.upstash.io/v2/workflows/messageLogs", { headers: { Authorization: "Bearer ", }, } ); ``` ---------------------------------------- TITLE: Triggering Upstash Workflow Endpoint via cURL DESCRIPTION: This cURL command sends a POST request to the locally running FastAPI workflow endpoint, initiating a new workflow run. The expected output is a JSON object containing a unique `workflowRunId` for tracking the execution. SOURCE: https://upstash.com/docs/workflow/quickstarts/fastapi.mdx LANGUAGE: bash CODE: ``` curl -X POST http://localhost:8000/api/workflow # result: {"workflowRunId":"wfr_xxxxxx"} ``` ---------------------------------------- TITLE: Triggering Local Workflow with cURL DESCRIPTION: Sends a POST request to the local Express.js workflow endpoint, triggering a new workflow run. The request includes a JSON payload with a 'message' field, and the response returns a unique workflow run ID.
SOURCE: https://upstash.com/docs/workflow/quickstarts/express.mdx LANGUAGE: bash CODE: ``` curl -X POST http://localhost:3000/workflow \ -H "Content-Type: application/json" \ -d '{"message": "Hello from the workflow!"}' ``` ---------------------------------------- TITLE: Overriding Inferred Workflow URL DESCRIPTION: The `url` option allows explicitly setting the full URL where the workflow endpoint is hosted, overriding the URL inferred from `request.url`. This is particularly useful when using proxies or local tunnels during development, ensuring correct callback routing for workflow steps. SOURCE: https://upstash.com/docs/workflow/basics/serve.mdx LANGUAGE: TypeScript CODE: ``` export const { POST } = serve( async (context) => { ... }, { url: "https://.com/api/workflow" } ); ``` LANGUAGE: Python CODE: ``` @serve.post("/api/example", url="https://.com/api/workflow") async def example(context: AsyncWorkflowContext[str]) -> None: ... ``` ---------------------------------------- TITLE: Configuring .dev.vars for Local QStash Server DESCRIPTION: Adds the local QStash URL and token to the `.dev.vars` file, allowing local workflow testing without production billing impact. SOURCE: https://upstash.com/docs/workflow/quickstarts/hono.mdx LANGUAGE: txt CODE: ``` QSTASH_URL="http://127.0.0.1:8080" QSTASH_TOKEN= ``` ---------------------------------------- TITLE: Configuring .env for Local QStash Server DESCRIPTION: These environment variables configure the application to use a local QStash server. `QSTASH_URL` points to the local server address, and `QSTASH_TOKEN` is the token provided by the local server for authentication. 
SOURCE: https://upstash.com/docs/workflow/quickstarts/fastapi.mdx LANGUAGE: bash CODE: ``` export QSTASH_URL="http://127.0.0.1:8080" export QSTASH_TOKEN= ``` ---------------------------------------- TITLE: Starting SolidJS development server DESCRIPTION: This command starts the SolidJS development server, making the application and its defined workflow endpoints accessible locally, typically at `localhost:3000`. SOURCE: https://upstash.com/docs/workflow/quickstarts/solidjs.mdx LANGUAGE: bash CODE: ``` npm run dev ``` ---------------------------------------- TITLE: Configuring .env for local QStash server DESCRIPTION: These environment variables are added to the `.env` file to configure the application to use a locally running QStash server, specifying its URL and token for local workflow execution. SOURCE: https://upstash.com/docs/workflow/quickstarts/solidjs.mdx LANGUAGE: text CODE: ``` QSTASH_URL="http://127.0.0.1:8080" QSTASH_TOKEN= ``` ---------------------------------------- TITLE: Configuring Local QStash Environment Variables DESCRIPTION: These environment variables are added to `.env.local` when using the local QStash server. `QSTASH_URL` points to the local server, and `QSTASH_TOKEN` is provided by the CLI for local authentication. SOURCE: https://upstash.com/docs/workflow/quickstarts/svelte.mdx LANGUAGE: txt CODE: ``` QSTASH_URL="http://127.0.0.1:8080" QSTASH_TOKEN= ``` ---------------------------------------- TITLE: Configuring .env for Local QStash Server DESCRIPTION: This snippet shows the environment variables to add to the `.env` file when using a local QStash server. It sets the local URL and token, enabling local workflow testing without production impact. 
SOURCE: https://upstash.com/docs/workflow/quickstarts/astro.mdx LANGUAGE: txt CODE: ``` QSTASH_URL="http://127.0.0.1:8080" QSTASH_TOKEN= ``` ---------------------------------------- TITLE: Configuring Local QStash Environment Variables (.env) DESCRIPTION: These environment variables are added to the '.env' file when using the local QStash server. 'QSTASH_URL' points to the local server address, and 'QSTASH_TOKEN' is the token provided by the local server, enabling local development and testing of workflows. SOURCE: https://upstash.com/docs/workflow/quickstarts/nextjs-flask.mdx LANGUAGE: bash CODE: ``` export QSTASH_URL="http://127.0.0.1:8080" export QSTASH_TOKEN= ``` ---------------------------------------- TITLE: Retrieving Image URL in Upstash Workflow (TypeScript) DESCRIPTION: This TypeScript snippet demonstrates how to retrieve the URL of an uploaded image within an Upstash Workflow using `context.run`. It executes an asynchronous function `getImageUrl` to fetch the image URL based on the provided `imageId`, marking this step as a distinct operation within the workflow. SOURCE: https://upstash.com/docs/workflow/examples/imageProcessing.mdx LANGUAGE: typescript CODE: ``` const imageUrl = await context.run("get-image-url", async () => { return await getImageUrl(imageId) }) ``` ---------------------------------------- TITLE: Using LangChain Wikipedia Query Tool DESCRIPTION: Utilizes the pre-built `WikipediaQueryRun` tool from LangChain to enable agents to query Wikipedia, configurable with result limits and content length. SOURCE: https://upstash.com/docs/workflow/agents/features.mdx LANGUAGE: ts CODE: ``` import { WikipediaQueryRun } from '@langchain/community/tools/wikipedia_query_run' const wikiTool = new WikipediaQueryRun({ topKResults: 1, maxDocContentLength: 500, }) ``` ---------------------------------------- TITLE: Notifying Workflows with cURL DESCRIPTION: This cURL command demonstrates how to send a POST request to the Upstash QStash notify endpoint. 
It includes the `eventId` in the path, an `Authorization` header with a bearer token, and 'Hello World!' as the request body. SOURCE: https://upstash.com/docs/workflow/rest/runs/notify.mdx LANGUAGE: sh CODE: ``` curl -X POST https://qstash.upstash.io/v2/notify/myEvent \ -H "Authorization: Bearer " \ -d "Hello World!" ``` ---------------------------------------- TITLE: Sourcing Environment Variables from .env File DESCRIPTION: This command loads environment variables defined in the `.env` file into the current shell session. The application needs them to read configuration such as API keys and URLs during local development. SOURCE: https://upstash.com/docs/workflow/quickstarts/fastapi.mdx LANGUAGE: bash CODE: ``` source .env ``` ---------------------------------------- TITLE: Invoking Production Cloudflare Workflow Endpoint with cURL DESCRIPTION: This cURL command sends a POST request to your deployed Cloudflare Worker workflow endpoint in production. Replace the placeholder URL with your actual deployed URL to verify accessibility and trigger a workflow run. SOURCE: https://upstash.com/docs/workflow/quickstarts/cloudflare-workers.mdx LANGUAGE: bash CODE: ``` curl -X POST https:/// -d '{"text": "hello world!"}' ``` ---------------------------------------- TITLE: Initializing Workflow Context with Upstash Workflow (Python) DESCRIPTION: Illustrates how to set up the workflow context in a FastAPI application using `upstash_workflow.fastapi.Serve`. The `AsyncWorkflowContext` type hint ensures proper context handling for asynchronous operations. SOURCE: https://upstash.com/docs/workflow/basics/context.mdx LANGUAGE: python CODE: ``` from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: ...
``` ---------------------------------------- TITLE: Applying Filters to Resized Images in Upstash Workflow (Python) DESCRIPTION: This Python snippet applies various filters (grayscale, sepia, contrast) to each resized image. It iterates through the resized images and available filters, making sequential calls to an external filtering service using `context.call` for each combination, and collects the responses. SOURCE: https://upstash.com/docs/workflow/examples/imageProcessing.mdx LANGUAGE: python CODE: ``` filters = ["grayscale", "sepia", "contrast"] filter_responses = [] for resized_image in resized_images: for filter in filters: response: CallResponse[ImageResult] = await context.call( f"apply-filter-{filter}", # endpoint which returns ImageResult type in response url="https://image-processing-service.com/filter", method="POST", body={"imageUrl": resized_image["imageUrl"], "filter": filter}, ) filter_responses.append(response) processed_images = [response.body for response in filter_responses] ``` ---------------------------------------- TITLE: Handling Successful API Responses in Upstash Workflow DESCRIPTION: Upon a successful API response (status code less than 300), this snippet stores the response body in a database. It utilizes `context.run` to execute the storage operation as a separate, reliable workflow task, ensuring data persistence. 
SOURCE: https://upstash.com/docs/workflow/examples/customRetry.mdx LANGUAGE: typescript CODE: ``` if (response.status < 300) { await context.run("store-response-in-db", () => storeResponse(response.body)) return } ``` LANGUAGE: python CODE: ``` if response.status_code < 300: async def _store_response_in_db() -> None: await store_response(response.body) await context.run("store-response-in-db", _store_response_in_db) return ``` ---------------------------------------- TITLE: Defining OpenAI Model in Workflow Route DESCRIPTION: Defines an OpenAI model (`gpt-3.5-turbo`) within an Upstash Workflow route, serving as the core LLM for agent decisions and interactions. SOURCE: https://upstash.com/docs/workflow/agents/features.mdx LANGUAGE: ts CODE: ``` import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve(async (context) => { const model = context.agents.openai('gpt-3.5-turbo') // ... }) ``` ---------------------------------------- TITLE: Retrieving Workflow Logs using Upstash Workflow JS SDK DESCRIPTION: This JavaScript snippet uses the Upstash Workflow SDK to fetch execution logs. It shows three independent filters (workflow run ID, workflow URL, and state) and requires an API token to initialize the client. Each result is bound to its own variable name so the three calls can coexist in one scope. SOURCE: https://upstash.com/docs/workflow/rest/runs/logs.mdx LANGUAGE: js CODE: ``` import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }); // Filter by workflow run ID const { runs: runsById } = await client.logs({ workflowRunId: ""}); // Filter by workflow server url const { runs: runsByUrl } = await client.logs({ workflowUrl: ""}); // Filter by state const { runs: runsByState } = await client.logs({ state: "RUN_SUCCESS"}); ``` ---------------------------------------- TITLE: Bulk Canceling Workflows with Curl DESCRIPTION: This curl command demonstrates how to send a DELETE request to the Upstash QStash API to bulk cancel workflow runs.
It sets the 'Content-Type' and 'Authorization' headers and includes a JSON body to cancel workflows with URLs starting with 'https://example.com'. SOURCE: https://upstash.com/docs/workflow/rest/runs/bulk-cancel.mdx LANGUAGE: bash CODE: ``` curl -XDELETE https://qstash.upstash.io/v2/workflows/runs \ -H "Content-Type: application/json" \ -H "Authorization: Bearer " \ -d '{"workflowUrl": "https://example.com"}' ```
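The same bulk-cancel request can be sketched in TypeScript with the native `fetch` API. This is a minimal sketch rather than an official SDK call: the helper names `buildBulkCancelRequest` and `bulkCancel` are hypothetical and exist only for illustration, while the endpoint, HTTP method, headers, and `workflowUrl` body mirror the curl command above. The response body is not inspected here because its shape is not shown in this section.

```typescript
// Hypothetical helpers (not part of any Upstash SDK). They rebuild the
// DELETE request issued by the curl command above.
type BulkCancelRequest = {
  url: string;
  init: { method: string; headers: Record<string, string>; body: string };
};

// Builds the request without sending it, so it can be inspected or tested offline.
function buildBulkCancelRequest(token: string, workflowUrl: string): BulkCancelRequest {
  return {
    url: "https://qstash.upstash.io/v2/workflows/runs",
    init: {
      method: "DELETE",
      headers: {
        "Content-Type": "application/json",
        Authorization: `Bearer ${token}`,
      },
      // Same JSON body as the curl example: cancel runs whose URL starts with workflowUrl.
      body: JSON.stringify({ workflowUrl }),
    },
  };
}

// Sends the request and returns only the HTTP status code.
async function bulkCancel(token: string, workflowUrl: string): Promise<number> {
  const { url, init } = buildBulkCancelRequest(token, workflowUrl);
  const response = await fetch(url, init);
  return response.status;
}
```

Keeping request construction separate from the network call makes it possible to check the method, headers, and body without a real token; `bulkCancel` should only be invoked with a valid QStash token.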