TITLE: Implementing Image Processing Workflow with Upstash Workflow in TypeScript
DESCRIPTION: This TypeScript example defines an Upstash Workflow that orchestrates image processing. It retrieves an image, resizes it to multiple resolutions, applies several filters to each resized image, and stores the processed images in cloud storage. It uses `context.run` for internal operations and `context.call` for requests to external image processing services, and runs those requests in parallel with `Promise.all`.
SOURCE: https://upstash.com/docs/workflow/examples/imageProcessing.mdx

LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs"
import {
  resizeImage,
  applyFilters,
  storeImage,
  getImageUrl,
} from "./utils"

type ImageResult = {
  imageUrl: string
}

export const { POST } = serve<{ imageId: string; userId: string }>(
  async (context) => {
    const { imageId, userId } = context.requestPayload

    // Step 1: Retrieve the uploaded image
    const imageUrl = await context.run("get-image-url", async () => {
      return await getImageUrl(imageId)
    })

    // Step 2: Resize the image to multiple resolutions
    const resolutions = [640, 1280, 1920]

    const resizedImages: { body: ImageResult }[] = await Promise.all(resolutions.map(
      resolution => context.call<ImageResult>(
        `resize-image-${resolution}`,
        {
          // endpoint which returns ImageResult type in response
          url: "https://image-processing-service.com/resize",
          method: "POST",
          body: {
            imageUrl,
            width: resolution,
          }
        }
      )
    ))

    // Step 3: Apply filters to each resized image
    const filters = ["grayscale", "sepia", "contrast"]
    // Each pending call resolves to a response whose body is an ImageResult
    const processedImagePromises: Promise<{ body: ImageResult }>[] = []

    for (const resizedImage of resizedImages) {
      for (const filter of filters) {
        const processedImagePromise = context.call<ImageResult>(
          `apply-filter-${filter}`,
          {
            // endpoint which returns ImageResult type in response
            url: "https://image-processing-service.com/filter",
            method: "POST",
            body: {
              imageUrl: resizedImage.body.imageUrl,
              filter,
            }
          }
        )
        processedImagePromises.push(processedImagePromise)
      }
    }
    const processedImages: { body: ImageResult }[] = await Promise.all(processedImagePromises)

    // Step 4: Store processed images in cloud storage
    const storedImageUrls: string[] = await Promise.all(
      processedImages.map(
        processedImage => context.run(`store-image`, async () => {
          return await storeImage(processedImage.body.imageUrl)
        })
      )
    )
  }
)
```

----------------------------------------

TITLE: Upstash Workflow for Payment Retries and Account Management
DESCRIPTION: This Upstash Workflow orchestrates a payment retry process. It makes up to three attempts to charge a user, waiting 24 hours after each failed attempt, and manages account suspension and invoice emails based on the payment outcome. It uses Upstash Workflow's `serve` and `context` functionality for durable execution and state management.
SOURCE: https://upstash.com/docs/workflow/examples/paymentRetry.mdx

LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs";

type ChargeUserPayload = {
  email: string;
};

export const { POST } = serve<ChargeUserPayload>(async (context) => {
  const { email } = context.requestPayload;

  for (let i = 0; i < 3; i++) {
    // attempt to charge the user
    const result = await context.run("charge customer", async () => {
      try {
        return await chargeCustomer(i + 1);
      } catch (e) {
        console.error(e);
        return;
      }
    });

    if (!result) {
      // Wait for a day
      await context.sleep("wait for retry", 24 * 60 * 60);
    } else {
      // Unsuspend User
      const isSuspended = await context.run("check suspension", async () => {
        return await checkSuspension(email);
      });
      if (isSuspended) {
        await context.run("unsuspend user", async () => {
          await unsuspendUser(email);
        });
      }

      // send invoice email
      await context.run("send invoice email", async () => {
        await sendEmail(
          email,
          `Payment successful. Invoice: ${result.invoiceId}, Total cost: $${result.totalCost}`
        );
      });

      // by returning, we end the workflow run
      return;
    }
  }

  // suspend user if the user isn't suspended
  const isSuspended = await context.run("check suspension", async () => {
    return await checkSuspension(email);
  });

  if (!isSuspended) {
    await context.run("suspend user", async () => {
      await suspendUser(email);
    });

    await context.run("send suspended email", async () => {
      await sendEmail(
        email,
        "Your account has been suspended due to payment failure. Please update your payment method."
      );
    });
  }
});

async function sendEmail(email: string, content: string) {
  // Implement the logic to send an email
  console.log("Sending email to", email, "with content:", content);
}

async function checkSuspension(email: string) {
  // Implement the logic to check if the user is suspended
  console.log("Checking suspension status for", email);
  return true;
}

async function suspendUser(email: string) {
  // Implement the logic to suspend the user
  console.log("Suspending the user", email);
}

async function unsuspendUser(email: string) {
  // Implement the logic to unsuspend the user
  console.log("Unsuspending the user", email);
}

async function chargeCustomer(attempt: number) {
  // Implement the logic to charge the customer
  console.log("Charging the customer");

  // Simulate a failure on the first two attempts
  if (attempt <= 2) {
    throw new Error("Payment failed");
  }

  return {
    invoiceId: "INV123",
    totalCost: 100,
  } as const;
}
```

LANGUAGE: python
CODE:
```
from fastapi import FastAPI
from typing import TypedDict, Optional
from dataclasses import dataclass
from upstash_workflow.fastapi import Serve
from upstash_workflow import AsyncWorkflowContext

app = FastAPI()
serve = Serve(app)


@dataclass
class ChargeResult:
    invoice_id: str
    total_cost: float


class ChargeUserPayload(TypedDict):
    email: str


async def send_email(email: str, content: str) -> None:
    # Implement the logic to send an email
    print("Sending email to", email, "with content:", content)


async def check_suspension(email: str) -> bool:
    # Implement the logic to check if the user is suspended
    print("Checking suspension status for", email)
    return True


async def suspend_user(email: str) -> None:
    # Implement the logic to suspend the user
    print("Suspending the user", email)


async def unsuspend_user(email: str) -> None:
    # Implement the logic to unsuspend the user
    print("Unsuspending the user", email)


async def charge_customer(attempt: int) -> Optional[ChargeResult]:
    # Implement the logic to charge the customer
    print("Charging the customer")
    if attempt <= 2:
        raise Exception("Payment failed")
    return ChargeResult(invoice_id="INV123", total_cost=100)


@serve.post("/payment-retries")
async def payment_retries(context: AsyncWorkflowContext[ChargeUserPayload]) -> None:
    email = context.request_payload["email"]

    async def _check_suspension() -> bool:
        return await check_suspension(email)

    for i in range(3):
        # attempt to charge the user
        async def _charge_customer() -> Optional[ChargeResult]:
            try:
                return await charge_customer(i + 1)
            except Exception as e:
                print(f"Error: {e}")
                return None

        result = await context.run("charge customer", _charge_customer)

        if not result:
            # Wait for a day
            await context.sleep("wait for retry", 24 * 60 * 60)
        else:
            # Unsuspend User
            is_suspended = await context.run("check suspension", _check_suspension)

            if is_suspended:

                async def _unsuspend_user() -> None:
                    await unsuspend_user(email)

                await context.run("unsuspend user", _unsuspend_user)

            # send invoice email
            async def _send_invoice_email() -> None:
                await send_email(
                    email,
                    f"Payment successful. Invoice: {result.invoice_id}, Total cost: ${result.total_cost}",
                )

            await context.run("send invoice email", _send_invoice_email)

            # by returning, we end the workflow run
            return

    # suspend user if the user isn't suspended
    is_suspended = await context.run("check suspension", _check_suspension)

    if not is_suspended:

        async def _suspend_user() -> None:
            await suspend_user(email)

        await context.run("suspend user", _suspend_user)

        async def _send_suspended_email() -> None:
            await send_email(
                email,
                "Your account has been suspended due to payment failure. Please update your payment method.",
            )

        await context.run("send suspended email", _send_suspended_email)
```

----------------------------------------

TITLE: Complete AI Data Processing Workflow with Upstash (Python)
DESCRIPTION: This Python snippet implements an Upstash Workflow using FastAPI to manage an AI data processing pipeline. It includes steps for downloading datasets, processing data in chunks with OpenAI's GPT-4, aggregating results periodically, and generating and sending a final report. It uses `context.run` for idempotent operations and `context.call` for making long-timeout HTTP requests.
SOURCE: https://upstash.com/docs/workflow/examples/allInOne.mdx

LANGUAGE: python
CODE:
```
from fastapi import FastAPI
import json
import os
from typing import Dict, List, Any, TypedDict
from upstash_workflow.fastapi import Serve
from upstash_workflow import AsyncWorkflowContext, CallResponse
from utils import (
    aggregate_results,
    generate_report,
    send_report,
    get_dataset_url,
    split_into_chunks,
)

app = FastAPI()
serve = Serve(app)


class RequestPayload(TypedDict):
    dataset_id: str
    user_id: str


@serve.post("/ai-generation")
async def ai_generation(context: AsyncWorkflowContext[RequestPayload]) -> None:
    request = context.request_payload
    dataset_id = request["dataset_id"]
    user_id = request["user_id"]

    # Step 1: Download the dataset
    async def _get_dataset_url() -> str:
        return await get_dataset_url(dataset_id)

    dataset_url = await context.run("get-dataset-url", _get_dataset_url)

    # HTTP request with much longer timeout (2hrs)
    response: CallResponse[Any] = await context.call(
        "download-dataset", url=dataset_url, method="GET"
    )

    dataset = response.body

    # Step 2: Process data in chunks using OpenAI
    chunk_size = 1000
    chunks = split_into_chunks(dataset, chunk_size)
    processed_chunks: List[str] = []

    for i, chunk in enumerate(chunks):
        # The response body is nested JSON, so its values are typed as Any
        openai_response: CallResponse[Dict[str, Any]] = await context.call(
            f"process-chunk-{i}",
            url="https://api.openai.com/v1/chat/completions",
            method="POST",
            headers={
                "authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}",
            },
            body={
                "model": "gpt-4",
                "messages": [
                    {
                        "role": "system",
                        "content": "You are an AI assistant tasked with analyzing data chunks. Provide a brief summary and key insights for the given data.",
                    },
                    {
                        "role": "user",
                        "content": f"Analyze this data chunk: {json.dumps(chunk)}",
                    },
                ],
                "max_tokens": 150,
            },
        )

        processed_chunks.append(
            openai_response.body["choices"][0]["message"]["content"]
        )

        # Every 10 chunks, we'll aggregate intermediate results
        if i % 10 == 9 or i == len(chunks) - 1:

            async def _aggregate_results() -> None:
                await aggregate_results(processed_chunks)
                processed_chunks.clear()

            await context.run(f"aggregate-results{i}", _aggregate_results)

    # Step 3: Generate and send data report
    async def _generate_report() -> Any:
        return await generate_report(dataset_id)

    report = await context.run("generate-report", _generate_report)

    async def _send_report() -> None:
        await send_report(report, user_id)

    await context.run("send-report", _send_report)
```

----------------------------------------

TITLE: Executing Business Logic in context.run - TypeScript
DESCRIPTION: Demonstrates placing business logic inside `context.run` for each step in an Upstash Workflow. Code outside `context.run` runs multiple times, while code inside runs once per step. Shows how to pass results between steps.
SOURCE: https://upstash.com/docs/workflow/basics/caveats.mdx

LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs"

export const { POST } = serve(async (context) => {
  const input = context.requestPayload

  const result = await context.run("step-1", () => {
    return { success: true }
  })

  console.log("This log will appear multiple times")

  await context.run("step-2", () => {
    console.log("This log will appear just once")
    console.log("Step 1 status is:", result.success)
  })
})
```

----------------------------------------

TITLE: Implementing Auth Provider Webhook Workflow with Upstash
DESCRIPTION: This workflow processes webhook events from an authentication provider to manage user accounts.
It orchestrates user creation in a database and Stripe, starts a 14-day trial, and sends a series of automated emails: a welcome message, an activity summary after 7 days, a warning two days before the trial ends, and a trial-end notification. The workflow uses `context.run` for atomic operations and `context.sleep` for timed delays.
SOURCE: https://upstash.com/docs/workflow/examples/authWebhook.mdx

LANGUAGE: TypeScript
CODE:
```
import { serve } from "@upstash/workflow/nextjs";
import { WorkflowContext } from "@upstash/workflow";

/**
 * This can be the payload of the user created webhook event coming from your
 * auth provider (e.g. Firebase, Auth0, Clerk etc.)
 */
type UserCreatedPayload = {
  name: string;
  email: string;
};

export const { POST } = serve<UserCreatedPayload>(async (context) => {
  const { name, email } = context.requestPayload;

  const { userid } = await context.run("sync user", async () => {
    return await createUserInDatabase({ name, email });
  });

  await context.run("create new user in stripe", async () => {
    await createNewUserInStripe(email);
  });

  await context.run("start trial in Stripe", async () => {
    await startTrialInStripe(email);
  });

  await context.run("send welcome email", async () => {
    await sendEmail(
      email,
      "Welcome to our platform! You have 14 days of free trial."
    );
  });

  await context.sleep("wait", 7 * 24 * 60 * 60);

  // get user stats and send email with them
  const stats = await context.run("get user stats", async () => {
    return await getUserStats(userid);
  });
  await sendProblemSolvedEmail({ context, email, stats });

  // wait until there are two days to the end of trial period
  // and check upgrade status
  await context.sleep("wait for trial warning", 5 * 24 * 60 * 60);

  const isUpgraded = await context.run("check upgraded plan", async () => {
    return await checkUpgradedPlan(email);
  });

  // end the workflow if upgraded
  if (isUpgraded) return;

  await context.run("send trial warning email", async () => {
    await sendEmail(
      email,
      "Your trial is about to end in 2 days. Please upgrade your plan to keep using our platform."
    );
  });

  await context.sleep("wait for trial end", 2 * 24 * 60 * 60);

  await context.run("send trial end email", async () => {
    await sendEmail(
      email,
      "Your trial has ended. Please upgrade your plan to keep using our platform."
    );
  });
});

// Destructure the arguments first, then annotate their types
async function sendProblemSolvedEmail({
  context,
  email,
  stats,
}: {
  context: WorkflowContext<UserCreatedPayload>;
  email: string;
  stats: { totalProblemsSolved: number };
}) {
  if (stats.totalProblemsSolved === 0) {
    await context.run("send no answers email", async () => {
      await sendEmail(
        email,
        "Hey, you haven't solved any questions in the last 7 days..."
      );
    });
  } else {
    await context.run("send stats email", async () => {
      await sendEmail(
        email,
        `You have solved ${stats.totalProblemsSolved} problems in the last 7 days. Keep it up!`
      );
    });
  }
}

async function createUserInDatabase({
  name,
  email,
}: {
  name: string;
  email: string;
}) {
  console.log("Creating a user in the database:", name, email);
  return { userid: "12345" };
}

async function createNewUserInStripe(email: string) {
  // Implement logic to create a new user in Stripe
  console.log("Creating a user in Stripe for", email);
}

async function startTrialInStripe(email: string) {
  // Implement logic to start a trial in Stripe
  console.log("Starting a trial of 14 days in Stripe for", email);
}

async function getUserStats(userid: string) {
  // Implement logic to get user stats
  console.log("Getting user stats for", userid);
  return {
    totalProblemsSolved: 10_000,
    mostInterestedTopic: "JavaScript",
  };
}

async function checkUpgradedPlan(email: string) {
  // Implement logic to check if the user has upgraded the plan
  console.log("Checking if the user has upgraded the plan", email);
  return false;
}

async function sendEmail(email: string, content: string) {
  // Implement logic to send an email
  console.log("Sending email to", email, content);
}
```

LANGUAGE: Python
CODE:
```
from fastapi import FastAPI
from typing import Dict, TypedDict
from upstash_workflow.fastapi import Serve
from upstash_workflow import AsyncWorkflowContext

app = FastAPI()
serve = Serve(app)


class UserCreatedPayload(TypedDict):
    name: str
    email: str


class UserStats(TypedDict):
    total_problems_solved: int
    most_interested_topic: str


async def create_user_in_database(name: str, email: str) -> Dict[str, str]:
    print("Creating a user in the database:", name, email)
    return {"userid": "12345"}


async def create_new_user_in_stripe(email: str) -> None:
    # Implement logic to create a new user in Stripe
    print("Creating a user in Stripe for", email)


async def start_trial_in_stripe(email: str) -> None:
    # Implement logic to start a trial in Stripe
    print("Starting a trial of 14 days in Stripe for", email)


async def get_user_stats(userid: str) -> UserStats:
    # Implement logic to get user stats
    print("Getting user stats for", userid)
    return {"total_problems_solved": 10000, "most_interested_topic": "Python"}


async def check_upgraded_plan(email: str) -> bool:
    # Implement logic to check if the user has upgraded the plan
    print("Checking if the user has upgraded the plan", email)
    return False


async def send_email(email: str, content: str) -> None:
    # Implement logic to send an email
    print("Sending email to", email, content)


async def send_problem_solved_email(
    context: AsyncWorkflowContext[UserCreatedPayload], email: str, stats: UserStats
) -> None:
    if stats["total_problems_solved"] == 0:

        async def _send_no_answers_email() -> None:
            await send_email(
                email, "Hey, you haven't solved any questions in the last 7 days..."
            )

        await context.run("send no answers email", _send_no_answers_email)
    else:

        async def _send_stats_email() -> None:
            await send_email(
                email,
                f"You have solved {stats['total_problems_solved']} problems in the last 7 days. Keep it up!",
            )

        await context.run("send stats email", _send_stats_email)


@serve.post("/auth-provider-webhook")
async def auth_provider_webhook(
    context: AsyncWorkflowContext[UserCreatedPayload],
) -> None:
    payload = context.request_payload
    name = payload["name"]
    email = payload["email"]

    # Returns the created user record, e.g. {"userid": "12345"}
    async def _sync_user() -> Dict[str, str]:
        return await create_user_in_database(name, email)

    result = await context.run("sync user", _sync_user)
    userid = result["userid"]

    async def _create_new_user_in_stripe() -> None:
        await create_new_user_in_stripe(email)

    await context.run("create new user in stripe", _create_new_user_in_stripe)

    async def _start_trial_in_stripe() -> None:
        await start_trial_in_stripe(email)

    await context.run("start trial in Stripe", _start_trial_in_stripe)

    async def _send_welcome_email() -> None:
        await send_email(
            email, "Welcome to our platform! You have 14 days of free trial."
        )

    await context.run("send welcome email", _send_welcome_email)

    await context.sleep("wait", 7 * 24 * 60 * 60)

    # get user stats and send email with them
    async def _get_user_stats() -> UserStats:
        return await get_user_stats(userid)

    stats: UserStats = await context.run("get user stats", _get_user_stats)

    await send_problem_solved_email(context, email, stats)

    # wait until there are two days to the end of trial period and check upgrade status
    await context.sleep("wait for trial warning", 5 * 24 * 60 * 60)

    async def _check_upgraded_plan() -> bool:
        return await check_upgraded_plan(email)

    is_upgraded = await context.run("check upgraded plan", _check_upgraded_plan)

    # end the workflow if upgraded
    if is_upgraded:
        return

    async def _send_trial_warning_email() -> None:
        await send_email(
            email,
            "Your trial is about to end in 2 days. Please upgrade your plan to keep using our platform.",
        )

    await context.run("send trial warning email", _send_trial_warning_email)

    await context.sleep("wait for trial end", 2 * 24 * 60 * 60)

    async def _send_trial_end_email() -> None:
        await send_email(
            email,
            "Your trial has ended. Please upgrade your plan to keep using our platform.",
        )

    await context.run("send trial end email", _send_trial_end_email)
```

----------------------------------------

TITLE: Scheduling Weekly User Summaries on Sign-Up (TypeScript)
DESCRIPTION: This TypeScript snippet defines a Next.js API route for user sign-up. It registers a user, calculates a date 7 days after sign-up, builds a cron expression from that date's minute, hour, and weekday, and schedules a weekly account summary with the `@upstash/qstash` client, so each user receives their first report 7 days after signing up.
SOURCE: https://upstash.com/docs/workflow/howto/schedule.mdx

LANGUAGE: typescript
CODE:
```
import { NextResponse } from "next/server";
import { signUp } from "@/utils/auth-utils";
import { Client } from "@upstash/qstash";

// Note: `UserData` is not defined in this snippet; it is assumed to be
// declared elsewhere in the application.

const client = new Client({ token: process.env.QSTASH_TOKEN! });

export async function POST(request: Request) {
  const userData: UserData = await request.json();

  // Simulate user registration
  const user = await signUp(userData);

  // Calculate the date for the first summary (7 days from now)
  const firstSummaryDate = new Date(Date.now() + 7 * 24 * 60 * 60 * 1000);

  // Create cron expression for weekly summaries starting 7 days from signup
  const cron = `${firstSummaryDate.getMinutes()} ${firstSummaryDate.getHours()} * * ${firstSummaryDate.getDay()}`;

  // Schedule weekly account summary
  await client.schedules.create({
    scheduleId: `user-summary-${user.email}`,
    destination: "https:///api/send-weekly-summary",
    body: { userId: user.id },
    cron: cron,
  });

  return NextResponse.json(
    { success: true, message: "User registered and summary scheduled" },
    { status: 201 }
  );
}
```

----------------------------------------

TITLE: Chaining LLM Agents for Sequential Tasks in Upstash Workflow (TypeScript)
DESCRIPTION: This TypeScript code demonstrates how to chain multiple Large Language Model (LLM) agents within an Upstash Workflow to perform a sequence of tasks.
It initializes three distinct OpenAI `gpt-3.5-turbo` agents, each with a specific role: listing physicists, describing their work using a Wikipedia tool, and summarizing the descriptions. The output of one agent's task execution is used as the input for the subsequent agent, illustrating a prompt chaining pattern.
SOURCE: https://upstash.com/docs/workflow/agents/patterns/prompt-chaining.mdx

LANGUAGE: TypeScript
CODE:
```
import { serve } from "@upstash/workflow/nextjs";
import { WikipediaQueryRun } from "@langchain/community/tools/wikipedia_query_run";

export const { POST } = serve(async (context) => {
  const model = context.agents.openai('gpt-3.5-turbo');

  const agent1 = context.agents.agent({
    model,
    name: 'firstAgent',
    maxSteps: 1,
    background: 'You are an agent that lists famous physicists.',
    tools: {}
  });

  const agent2 = context.agents.agent({
    model,
    name: 'secondAgent',
    // set to 2 as this agent will first request tools
    // and then summarize them:
    maxSteps: 2,
    background:
      'You are an agent that describes the work of' +
      ' the physicists listed in the previous prompt.',
    tools: {
      wikiTool: new WikipediaQueryRun({
        topKResults: 1,
        maxDocContentLength: 500,
      })
    }
  });

  const agent3 = context.agents.agent({
    model,
    name: 'thirdAgent',
    maxSteps: 1,
    background:
      'You are an agent that summarizes the ' +
      'works of the physicists mentioned previously.',
    tools: {}
  });

  // Chaining agents
  const firstOutput = await context.agents.task({
    agent: agent1,
    prompt: "List 3 famous physicists."
  }).run();

  const secondOutput = await context.agents.task({
    agent: agent2,
    prompt: `Describe the work of: ${firstOutput.text}`
  }).run();

  const { text } = await context.agents.task({
    agent: agent3,
    prompt: `Summarize: ${secondOutput.text}`
  }).run();

  console.log(text);
});
```

----------------------------------------

TITLE: Customer Onboarding Workflow Initialization - TypeScript/Python
DESCRIPTION: This snippet defines the entire customer onboarding workflow.
It starts by sending a welcome email to the newly signed-up user, then pauses for three days. It then enters an infinite loop that checks the user's activity state (active or non-active), sends the matching follow-up email, and pauses for 30 days before the next check. It uses `context.run` for atomic operations and `context.sleep` for non-blocking delays.
SOURCE: https://upstash.com/docs/workflow/examples/customerOnboarding.mdx

LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs"

type InitialData = {
  email: string
}

export const { POST } = serve<InitialData>(async (context) => {
  const { email } = context.requestPayload

  await context.run("new-signup", async () => {
    await sendEmail("Welcome to the platform", email)
  })

  await context.sleep("wait-for-3-days", 60 * 60 * 24 * 3)

  while (true) {
    const state = await context.run("check-user-state", async () => {
      return await getUserState()
    })

    if (state === "non-active") {
      await context.run("send-email-non-active", async () => {
        await sendEmail("Email to non-active users", email)
      })
    } else if (state === "active") {
      await context.run("send-email-active", async () => {
        await sendEmail("Send newsletter to active users", email)
      })
    }

    await context.sleep("wait-for-1-month", 60 * 60 * 24 * 30)
  }
})

async function sendEmail(message: string, email: string) {
  // Implement email sending logic here
  console.log(`Sending ${message} email to ${email}`)
}

type UserState = "non-active" | "active"

const getUserState = async (): Promise<UserState> => {
  // Implement user state logic here
  return "non-active"
}
```

LANGUAGE: python
CODE:
```
from fastapi import FastAPI
from typing import Literal, TypedDict
from upstash_workflow.fastapi import Serve
from upstash_workflow import AsyncWorkflowContext

app = FastAPI()
serve = Serve(app)

UserState = Literal["non-active", "active"]


class InitialData(TypedDict):
    email: str


async def send_email(message: str, email: str) -> None:
    # Implement email sending logic here
    print(f"Sending {message} email to {email}")


async def get_user_state() -> UserState:
    # Implement user state logic here
    return "non-active"


@serve.post("/customer-onboarding")
async def customer_onboarding(context: AsyncWorkflowContext[InitialData]) -> None:
    email = context.request_payload["email"]

    async def _new_signup() -> None:
        await send_email("Welcome to the platform", email)

    await context.run("new-signup", _new_signup)

    await context.sleep("wait-for-3-days", 60 * 60 * 24 * 3)

    while True:

        async def _check_user_state() -> UserState:
            return await get_user_state()

        state: UserState = await context.run("check-user-state", _check_user_state)

        if state == "non-active":

            async def _send_email_non_active() -> None:
                await send_email("Email to non-active users", email)

            await context.run("send-email-non-active", _send_email_non_active)
        else:

            async def _send_email_active() -> None:
                await send_email("Send newsletter to active users", email)

            await context.run("send-email-active", _send_email_active)

        await context.sleep("wait-for-1-month", 60 * 60 * 24 * 30)
```

----------------------------------------

TITLE: Performing HTTP Calls with context.call - Upstash Workflow - TypeScript
DESCRIPTION: This snippet demonstrates how to use `context.call` within an Upstash Workflow to make an HTTP POST request to the OpenAI API. It highlights the ability to handle long response times (up to 15 minutes or 2 hours depending on plan) and shows how to pass request body, headers, and retrieve response status, headers, and body. This method is suitable for operations that might cause a normal serverless function to timeout.
SOURCE: https://upstash.com/docs/workflow/basics/context.mdx

LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs";

export const { POST } = serve<{ topic: string }>(async (context) => {
  const request = context.requestPayload;

  const {
    status, // response status
    headers, // response headers
    body, // response body
  } = await context.call(
    "generate-long-essay", // Step name
    {
      url: "https://api.openai.com/v1/chat/completions", // Endpoint URL
      method: "POST",
      body: {
        // Request body
        model: "gpt-4o",
        messages: [
          {
            role: "system",
            content:
              "You are a helpful assistant writing really long essays that would cause a normal serverless function to timeout.",
          },
          { role: "user", content: request.topic },
        ],
      },
      headers: {
        // request headers
        authorization: `Bearer ${process.env.OPENAI_API_KEY}`,
      },
    }
  );
});
```

----------------------------------------

TITLE: Triggering Workflow Runs with Upstash Workflow Client (TypeScript)
DESCRIPTION: This snippet demonstrates how to start a new workflow run using the `client.trigger` method. It shows how to specify the workflow URL, an optional request body, headers, a custom workflow run ID, retry attempts for the initial request, a delay, and flow control parameters for rate limiting and parallelism. The method returns the `workflowRunId` of the newly started run.
SOURCE: https://upstash.com/docs/workflow/basics/client.mdx

LANGUAGE: typescript
CODE:
```
import { Client } from "@upstash/workflow";

const client = new Client({ token: "" })

const { workflowRunId } = await client.trigger({
  url: "https:///",
  body: "hello there!",         // optional body
  headers: { ... },             // optional headers
  workflowRunId: "my-workflow", // optional workflow run id
  retries: 3,                   // optional retries in the initial request
  delay: "10s",                 // optional delay value
  flowControl: {                // optional flow control
    key: "USER_GIVEN_KEY",
    rate: 10,
    parallelism: 5,
    period: "10m"
  },
})

console.log(workflowRunId)
// prints wfr_my-workflow
```

----------------------------------------

TITLE: Defining an Upstash Workflow Route with Next.js SDK
DESCRIPTION: This snippet demonstrates how to define an Upstash Workflow route using the `@upstash/workflow/nextjs` SDK. It uses `context.sleep` for a 10-second delay, then starts two `context.run` steps (`retrieveEmail` and `fetchFromLLm`) and awaits them together with `Promise.all` so they run in parallel. It processes an input payload of type `UserRequest`.
SOURCE: https://upstash.com/docs/workflow/howto/monitor.mdx

LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs";
import { retrieveEmail, fetchFromLLm, UserRequest } from "../../../lib/util";

export const { POST } = serve<UserRequest>(
  async (context) => {
    const input = context.requestPayload;

    await context.sleep("sleep", 10);

    // Start both steps without awaiting so they can run in parallel
    const p1 = context.run("retrieveEmail", async () => {
      return retrieveEmail(input.id);
    });

    const p2 = context.run("askllm", async () => {
      return fetchFromLLm(input.question);
    });

    await Promise.all([p1, p2])
  },
);
```

----------------------------------------

TITLE: Implementing E-commerce Order Fulfillment Workflow
DESCRIPTION: This code example demonstrates a full e-commerce order fulfillment workflow using Upstash Workflow. It orchestrates order ID creation, stock verification, payment processing, order dispatch, and customer notifications. It relies on various utility functions for each step.
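
The utility functions this workflow relies on are imported from `./utils` and are not shown in the source. As a rough sketch only, and not part of the Upstash documentation, a `utils.ts` could look like the following. The in-memory `inventory` map, the order ID format, and every return shape here are hypothetical placeholders chosen for illustration.

```typescript
// utils.ts: hypothetical stand-ins for the helpers the workflow imports.
// Nothing here comes from the Upstash docs; replace each body with real logic.

export type OrderItem = { productId: string; quantity: number }

// Assumed in-memory stock levels, keyed by product ID.
const inventory = new Map<string, number>([
  ["sku-1", 5],
  ["sku-2", 0],
])

export async function createOrderId(userId: string): Promise<string> {
  // Assumed ID format: user ID plus a timestamp.
  return `order-${userId}-${Date.now()}`
}

export async function checkStockAvailability(items: OrderItem[]): Promise<boolean> {
  // Every requested item must have enough units in stock.
  return items.every((item) => (inventory.get(item.productId) ?? 0) >= item.quantity)
}

export async function processPayment(orderId: string): Promise<{ orderId: string; paid: boolean }> {
  console.log("Processing payment for", orderId)
  return { orderId, paid: true }
}

export async function dispatchOrder(orderId: string, items: OrderItem[]): Promise<void> {
  console.log("Dispatching", items.length, "item(s) for", orderId)
}

export async function sendOrderConfirmation(userId: string, orderId: string): Promise<void> {
  console.log("Order confirmation for", orderId, "sent to", userId)
}

export async function sendDispatchNotification(userId: string, orderId: string): Promise<void> {
  console.log("Dispatch notification for", orderId, "sent to", userId)
}
```

Keeping each helper small and free of workflow concerns means the side effects stay inside the `context.run` callbacks that call them, which is where the workflow expects business logic to live.
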
SOURCE: https://upstash.com/docs/workflow/examples/eCommerceOrderFulfillment.mdx

LANGUAGE: typescript
CODE:
```
import { serve } from "@upstash/workflow/nextjs"
import {
  createOrderId,
  checkStockAvailability,
  processPayment,
  dispatchOrder,
  sendOrderConfirmation,
  sendDispatchNotification,
} from "./utils"

type OrderPayload = {
  userId: string
  items: { productId: string, quantity: number }[]
}

export const { POST } = serve<OrderPayload>(async (context) => {
  const { userId, items } = context.requestPayload;

  // Step 1: Create Order Id
  const orderId = await context.run("create-order-id", async () => {
    return await createOrderId(userId);
  });

  // Step 2: Verify stock availability
  const stockAvailable = await context.run("check-stock", async () => {
    return await checkStockAvailability(items);
  });

  // End the workflow early if any item is unavailable
  if (!stockAvailable) {
    console.warn("Some items are out of stock");
    return;
  }

  // Step 3: Process payment
  await context.run("process-payment", async () => {
    return await processPayment(orderId)
  })

  // Step 4: Dispatch the order
  await context.run("dispatch-order", async () => {
    return await dispatchOrder(orderId, items)
  })

  // Step 5: Send order confirmation email
  await context.run("send-confirmation", async () => {
    return await sendOrderConfirmation(userId, orderId)
  })

  // Step 6: Send dispatch notification
  await context.run("send-dispatch-notification", async () => {
    return await sendDispatchNotification(userId, orderId)
  })
})
```

LANGUAGE: python
CODE:
```
from fastapi import FastAPI
from typing import List, TypedDict
from upstash_workflow.fastapi import Serve
from upstash_workflow import AsyncWorkflowContext
from utils import (
    create_order_id,
    check_stock_availability,
    process_payment,
    dispatch_order,
    send_order_confirmation,
    send_dispatch_notification,
)

app = FastAPI()
serve = Serve(app)


class OrderItem(TypedDict):
    product_id: str
    quantity: int


class OrderPayload(TypedDict):
    user_id: str
    items: List[OrderItem]


@serve.post("/order-fulfillment")
async def order_fulfillment(context: AsyncWorkflowContext[OrderPayload]) -> None:
    # Get the order payload from the request
    payload = context.request_payload
    user_id = payload["user_id"]
    items = payload["items"]

    # Step 1: Create Order Id
    async def _create_order_id():
        return await create_order_id(user_id)

    order_id: str = await context.run("create-order-id", _create_order_id)

    # Step 2: Verify stock availability
    async def _check_stock():
        return await check_stock_availability(items)

    stock_available: bool = await context.run("check-stock", _check_stock)

    if not stock_available:
        print("Some items are out of stock")
        return

    # Step 3: Process payment
    async def _process_payment():
        return await process_payment(order_id)

    await context.run("process-payment", _process_payment)

    # Step 4: Dispatch the order
    async def _dispatch_order():
        return await dispatch_order(order_id, items)

    await context.run("dispatch-order", _dispatch_order)

    # Step 5: Send order confirmation email
    async def _send_confirmation():
        return await send_order_confirmation(user_id, order_id)

    await context.run("send-confirmation", _send_confirmation)

    # Step 6: Send dispatch notification
    async def _send_dispatch_notification():
        return await send_dispatch_notification(user_id, order_id)

    await context.run("send-dispatch-notification", _send_dispatch_notification)
```

----------------------------------------

TITLE: Complete AI Data Processing Workflow with Upstash (TypeScript)
DESCRIPTION: This TypeScript snippet defines an Upstash Workflow that orchestrates a multi-step AI data processing pipeline. It downloads a large dataset, processes it in chunks using OpenAI's GPT-4, aggregates intermediate results, and finally generates and sends a report. It uses `context.run` for idempotent operations and `context.call` for long-running HTTP requests.
SOURCE: https://upstash.com/docs/workflow/examples/allInOne.mdx LANGUAGE: typescript CODE: ``` import { serve } from "@upstash/workflow/nextjs" import { downloadData, aggregateResults, generateReport, sendReport, getDatasetUrl, splitIntoChunks, } from "./utils" type OpenAiResponse = { choices: { message: { role: string, content: string } }[] } export const { POST } = serve<{ datasetId: string; userId: string }>( async (context) => { const request = context.requestPayload // Step 1: Download the dataset const datasetUrl = await context.run("get-dataset-url", async () => { return await getDatasetUrl(request.datasetId) }) // HTTP request with much longer timeout (2hrs) const { body: dataset } = await context.call("download-dataset", { url: datasetUrl, method: "GET" }) // Step 2: Process data in chunks using OpenAI const chunkSize = 1000 const chunks = splitIntoChunks(dataset, chunkSize) const processedChunks: string[] = [] for (let i = 0; i < chunks.length; i++) { const { body: processedChunk } = await context.api.openai.call( `process-chunk-${i}`, { token: process.env.OPENAI_API_KEY, operation: "chat.completions.create", body: { model: "gpt-4", messages: [ { role: "system", content: "You are an AI assistant tasked with analyzing data chunks. Provide a brief summary and key insights for the given data.", }, { role: "user", content: `Analyze this data chunk: ${JSON.stringify(chunks[i])}`, }, ], max_completion_tokens: 150, }, } ) processedChunks.push(processedChunk.choices[0].message.content!) 
// Every 10 chunks, we'll aggregate intermediate results if (i % 10 === 9 || i === chunks.length - 1) { await context.run(`aggregate-results${i}`, async () => { await aggregateResults(processedChunks) processedChunks.length = 0 }) } } // Step 3: Generate and send data report const report = await context.run("generate-report", async () => { return await generateReport(request.datasetId) }) await context.run("send-report", async () => { await sendReport(report, request.userId) }) } ) ``` ---------------------------------------- TITLE: Processing Data Chunks with OpenAI in Upstash Workflow (TypeScript) DESCRIPTION: This TypeScript snippet illustrates how to iterate through data chunks and process each one using OpenAI's GPT-4 model within an Upstash Workflow. It utilizes `context.api.openai.call` to interact with the OpenAI API, sending a system and user message for analysis and specifying `max_completion_tokens`. SOURCE: https://upstash.com/docs/workflow/examples/allInOne.mdx LANGUAGE: typescript CODE: ``` for (let i = 0; i < chunks.length; i++) { const { body: processedChunk } = await context.api.openai.call( `process-chunk-${i}`, { token: process.env.OPENAI_API_KEY!, operation: "chat.completions.create", body: { model: "gpt-4", messages: [ { role: "system", content: "You are an AI assistant tasked with analyzing data chunks. Provide a brief summary and key insights for the given data.", }, { role: "user", content: `Analyze this data chunk: ${JSON.stringify(chunks[i])}`, }, ], max_completion_tokens: 150, }, } ) } ``` ---------------------------------------- TITLE: Implementing Customer Onboarding Workflow with Upstash Workflow in TypeScript DESCRIPTION: This TypeScript snippet demonstrates a multi-step customer onboarding workflow using Upstash Workflow. 
It includes sending an initial welcome email, pausing execution for three days using `context.sleep`, generating a personalized follow-up message via OpenAI's API, and finally sending that message as a follow-up email. The `context.run` function ensures individual steps are retried on failure, and `context.api.openai.call` integrates with external services. SOURCE: https://upstash.com/docs/workflow/getstarted.mdx LANGUAGE: TypeScript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; import { sendEmail } from "./emailUtils"; // Type-safety for starting our workflow interface InitialData { userId: string email: string name: string } export const { POST } = serve<InitialData>(async (context) => { const { userId, email, name } = context.requestPayload; // Step 1: Send welcome email await context.run("send-welcome-email", async () => { await sendEmail(email, "Welcome to our service!"); }); // Step 2: Wait for 3 days (in seconds) await context.sleep("sleep-until-follow-up", 60 * 60 * 24 * 3); // Step 3: AI-generate personalized follow-up message const { body: aiResponse } = await context.api.openai.call( "generate-personalized-message", { token: "", operation: "chat.completions.create", body: { model: "gpt-3.5-turbo", messages: [ { role: "system", content: "You are an assistant creating personalized follow-up messages." }, { role: "user", content: `Create a short, friendly follow-up message for ${name} who joined our service 3 days ago.` } ] } } ); const personalizedMessage = aiResponse.choices[0].message.content; // Step 4: Send personalized follow-up email await context.run("send-follow-up-email", async () => { await sendEmail(email, personalizedMessage); }); }); ``` ---------------------------------------- TITLE: Implementing a Single Agent with Wikipedia Tool in TypeScript DESCRIPTION: This snippet demonstrates how to set up a single AI agent using Upstash Workflow. 
The `researcherAgent` is configured with an OpenAI model and a `WikipediaQueryRun` tool, enabling it to query Wikipedia for information. The agent is then assigned a task to research advanced physics topics, and its output is logged to the console. SOURCE: https://upstash.com/docs/workflow/agents/features.mdx LANGUAGE: TypeScript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; import { WikipediaQueryRun } from "@langchain/community/tools/wikipedia_query_run"; export const { POST } = serve(async (context) => { const model = context.agents.openai('gpt-3.5-turbo'); const researcherAgent = context.agents.agent({ model, name: 'academic', maxSteps: 2, tools: { wikiTool: new WikipediaQueryRun({ topKResults: 1, maxDocContentLength: 500, }) }, background: 'You are a researcher agent with access to Wikipedia. ' + 'Utilize Wikipedia as much as possible for correct information', }); const task = context.agents.task({ agent: researcherAgent, prompt: "Tell me about 5 topics in advanced physics.", }); const { text } = await task.run(); console.log("result:", text) }) ``` ---------------------------------------- TITLE: Ensuring Idempotency in context.run - TypeScript DESCRIPTION: This snippet emphasizes that business logic within `context.run` must be idempotent: executing the `someWork` function multiple times with the same input should produce the same result as executing it once. This is crucial for reliability in distributed systems where steps may be retried. SOURCE: https://upstash.com/docs/workflow/basics/caveats.mdx LANGUAGE: typescript CODE: ``` export const { POST } = serve(async (context) => { const input = context.requestPayload await context.run("step-1", async () => { return someWork(input) }) }) ``` ---------------------------------------- TITLE: Triggering Upstash Workflow with TypeScript SDK DESCRIPTION: This TypeScript snippet demonstrates how to programmatically trigger an Upstash Workflow endpoint using the `@upstash/workflow` SDK. 
It shows how to initialize the client with a QStash token and then use the `client.trigger` method to send a request, specifying the workflow URL, optional body, headers, a custom workflow run ID, and retry attempts for the initial request. SOURCE: https://upstash.com/docs/workflow/getstarted.mdx LANGUAGE: TypeScript CODE: ``` import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }); const { workflowRunId } = await client.trigger({ url: "https:///", body: "hello there!", // Optional body headers: { ... }, // Optional headers workflowRunId: "my-workflow", // Optional workflow run ID retries: 3 // Optional retries for the initial request }); ``` ---------------------------------------- TITLE: Creating Custom OpenAI Client with Upstash Workflow in TypeScript DESCRIPTION: This function `createWorkflowOpenAI` customizes the OpenAI client's fetch implementation to use `context.call` from Upstash Workflow. This ensures that HTTP requests are handled by the workflow, providing durability and extended timeouts for LLM interactions. It can be generalized for other LLM SDKs. SOURCE: https://upstash.com/docs/workflow/integrations/aisdk.mdx LANGUAGE: typescript CODE: ``` import { createOpenAI } from '@ai-sdk/openai'; import { HTTPMethods } from '@upstash/qstash'; import { WorkflowAbort, WorkflowContext } from '@upstash/workflow'; export const createWorkflowOpenAI = (context: WorkflowContext) => { return createOpenAI({ compatibility: "strict", fetch: async (input, init) => { try { // Prepare headers from init.headers const headers = init?.headers ? Object.fromEntries(new Headers(init.headers).entries()) : {}; // Prepare body from init.body const body = init?.body ? 
JSON.parse(init.body as string) : undefined; // Make network call const responseInfo = await context.call("openai-call-step", { url: input.toString(), method: init?.method as HTTPMethods, headers, body, }); // Construct headers for the response const responseHeaders = new Headers( Object.entries(responseInfo.header).reduce((acc, [key, values]) => { acc[key] = values.join(", "); return acc; }, {} as Record<string, string>) ); // Return the constructed response return new Response(JSON.stringify(responseInfo.body), { status: responseInfo.status, headers: responseHeaders, }); } catch (error) { if (error instanceof WorkflowAbort) { throw error } else { console.error("Error in fetch implementation:", error); throw error; // Rethrow error for further handling } } }, }); }; ``` ---------------------------------------- TITLE: Notifying Waiting Workflows with context.notify - Upstash Workflow - JavaScript DESCRIPTION: This snippet demonstrates how `context.notify` is used to send a payload to workflows that are currently waiting for a specific event ID. It shows how to define a step name, the event ID, and the payload to be sent, returning a list of `NotifyResponse` objects indicating the result of the notification for each waiting workflow. SOURCE: https://upstash.com/docs/workflow/basics/context.mdx LANGUAGE: javascript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve<{ topic: string }>(async (context) => { const payload = context.requestPayload; const { notifyResponse, // result of notify, which is a list of notified waiters } = await context.notify("notify step", "my-event-Id", payload); }); ``` ---------------------------------------- TITLE: Performing Subsequent Operations After Webhook Event Processing DESCRIPTION: Following initial webhook event processing, this example demonstrates how to perform additional operations, such as creating a customer record in Stripe, using `context.run`. 
This ensures that each external API call or significant step is trackable and retryable within the Upstash Workflow, leveraging previously extracted user data. SOURCE: https://upstash.com/docs/workflow/howto/use-webhooks.mdx LANGUAGE: TypeScript CODE: ``` export const { POST } = serve(async (context) => { // ... Previous validation and user data extraction if (!user) { return; } const customer = await context.run("create-stripe-customer", async () => { return await stripe.customers.create({ email: user.email, name: `${user.firstName} ${user.lastName}`, metadata: { userId: user.userId, }, }); }); /// ... Additional steps }); ``` LANGUAGE: Python CODE: ``` @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: # ... Previous validation and user data extraction if not user: return async def _create_stripe_customer(): return await stripe.customers.create( email=user["email"], name=f"{user['first_name']} {user['last_name']}", metadata={"user_id": user["user_id"]}, ) customer = await context.run("create-stripe-customer", _create_stripe_customer) # ... Additional steps ``` ---------------------------------------- TITLE: Orchestrating Worker Agents with Upstash Workflow in TypeScript DESCRIPTION: This snippet demonstrates how to set up an Upstash Workflow orchestrator to manage multiple worker agents. It initializes a Wikipedia tool, defines three specialized worker agents for different physics topics (advanced physics, quantum mechanics, relativity), and then uses a task agent to synthesize their responses into a Q&A format. It showcases the use of @upstash/workflow/nextjs for serving the workflow and @langchain/community/tools/wikipedia_query_run for tool integration. 
SOURCE: https://upstash.com/docs/workflow/agents/patterns/orchestrator-workers.mdx LANGUAGE: TypeScript CODE: ``` import { serve } from "@upstash/workflow/nextjs"; import { WikipediaQueryRun } from "@langchain/community/tools/wikipedia_query_run"; const wikiTool = new WikipediaQueryRun({ topKResults: 1, maxDocContentLength: 500, }) export const { POST } = serve(async (context) => { const model = context.agents.openai('gpt-4o'); // Worker agents const worker1 = context.agents.agent({ model, name: 'worker1', tools: { wikiTool }, maxSteps: 3, background: 'You are a worker agent that answers general questions about advanced physics.' }); const worker2 = context.agents.agent({ model, name: 'worker2', tools: { wikiTool }, maxSteps: 3, background: 'You are a worker agent that answers questions about quantum mechanics.' }); const worker3 = context.agents.agent({ model, name: 'worker3', tools: { wikiTool }, maxSteps: 3, background: 'You are a worker agent that answers questions about relativity.' }); // Synthesizing results const task = context.agents.task({ model, prompt: `Create a Q&A for advanced topics in physics`, agents: [worker1, worker2, worker3], maxSteps: 3, }) const { text } = await task.run(); console.log(text); }); ``` ---------------------------------------- TITLE: Setting Up Python Virtual Environment DESCRIPTION: These commands create a new Python virtual environment named 'venv' and then activate it. This practice isolates project dependencies, preventing conflicts with other Python projects on your system. SOURCE: https://upstash.com/docs/workflow/quickstarts/flask.mdx LANGUAGE: Bash CODE: ``` python -m venv venv source venv/bin/activate ```
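----------------------------------------

TITLE: Sketch of an Idempotent Step Body for context.run

DESCRIPTION: The idempotency requirement described under "Ensuring Idempotency in context.run" can be illustrated with a small, SDK-free sketch. This is one possible approach under stated assumptions, not part of the Upstash Workflow API: `chargeOnce`, `chargeCustomer`, `receipts`, and `chargeLog` are hypothetical names, and an in-memory `Map` stands in for whatever durable store a real system would use. The wrapper records its result under an idempotency key, so a retried step returns the stored receipt instead of repeating the side effect.

LANGUAGE: typescript

CODE:
```typescript
// Hypothetical in-memory stand-in for a durable store (for example a database table).
const receipts = new Map<string, string>();

// Records every time the side effect really runs, so repeats are observable.
const chargeLog: string[] = [];

// Hypothetical side effect that must not happen twice for the same order.
function chargeCustomer(orderId: string): string {
  chargeLog.push(orderId);
  return `receipt-${orderId}`;
}

// Idempotent wrapper: calling it again with the same orderId returns the
// stored receipt and skips the charge.
function chargeOnce(orderId: string): string {
  const key = `charge:${orderId}`; // idempotency key (assumed format)
  const existing = receipts.get(key);
  if (existing !== undefined) {
    return existing; // already processed, so do not charge again
  }
  const receipt = chargeCustomer(orderId);
  receipts.set(key, receipt);
  return receipt;
}
```

Inside a workflow, such a function would be called from a step in the same way as `someWork`, for example `await context.run("charge", async () => chargeOnce(orderId))`. A real store would need an atomic check-and-set to stay safe under concurrent retries; this in-memory sketch does not attempt that.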