TITLE: Implementing Evaluator-Optimizer Pattern with Upstash Workflow and OpenAI DESCRIPTION: This code demonstrates how to implement an evaluator-optimizer pattern using Upstash Workflow with OpenAI models. It creates two agents - a generator that produces content based on a prompt, and an evaluator that assesses the quality of the content. The system runs through multiple iterations of generation and evaluation until the output meets the evaluation criteria or reaches the maximum number of attempts. SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/patterns/evaluator-optimizer.mdx#2025-04-08_snippet_0 LANGUAGE: typescript CODE: import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve(async (context) => { const model = context.agents.openai('gpt-3.5-turbo'); // Generator agent that generates content const generator = context.agents.agent({ model, name: 'generator', maxSteps: 1, background: 'You are an agent that generates text based on a prompt.', tools: {} }); // Evaluator agent that evaluates the text and gives corrections const evaluator = context.agents.agent({ model, name: 'evaluator', maxSteps: 1, background: 'You are an agent that evaluates the generated text and provides corrections if needed.', tools: {} }); let generatedText = ''; let evaluationResult = ''; const prompt = "Generate a short explanation of quantum mechanics."; let nextPrompt = prompt; for (let i = 0; i < 3; i++) { // Construct prompt for generator: // - If there's no evaluation, use the original prompt // - If there's an evaluation, provide the prompt, the last generated text, and the evaluator's feedback if (evaluationResult && evaluationResult !== "PASS") { nextPrompt = `Please revise the answer to the question "${prompt}". Previous answer was: "${generatedText}", which received this feedback: "${evaluationResult}".`; } // Generate content const generatedResponse = await context.agents.task({ agent: generator, prompt: nextPrompt }).run(); generatedText = generatedResponse.text // Evaluate the generated content const evaluationResponse = await context.agents.task({ agent: evaluator, prompt: `Evaluate and provide feedback for the following text: ${generatedText}` }).run(); evaluationResult = evaluationResponse.text // If the evaluator accepts the content (i.e., "PASS"), stop if (evaluationResult.includes("PASS")) { break; } } console.log(generatedText); }); ---------------------------------------- TITLE: Implementing Customer Onboarding Workflow with Next.js DESCRIPTION: This TypeScript code demonstrates how to implement a customer onboarding workflow using Upstash Workflow in a Next.js application. The workflow includes sending a welcome email, waiting for 3 days, generating a personalized follow-up message using OpenAI, and sending the follow-up email. SOURCE: https://github.com/upstash/docs/blob/main/workflow/getstarted.mdx#2025-04-08_snippet_0 LANGUAGE: typescript CODE: import { serve } from "@upstash/workflow/nextjs"; import { sendEmail } from "./emailUtils"; // Type-safety for starting our workflow interface InitialData { userId: string email: string name: string } export const { POST } = serve(async (context) => { const { userId, email, name } = context.requestPayload; // Step 1: Send welcome email await context.run("send-welcome-email", async () => { await sendEmail(email, "Welcome to our service!"); }); // Step 2: Wait for 3 days (in seconds) await context.sleep("sleep-until-follow-up", 60 * 60 * 24 * 3); // Step 3: AI-generate personalized follow-up message const { body: aiResponse } = await context.api.openai.call( "generate-personalized-message", { token: "", operation: "chat.completions.create", body: { model: "gpt-3.5-turbo", messages: [ { role: "system", content: "You are an assistant creating personalized follow-up messages." }, { role: "user", content: `Create a short, friendly follow-up message for ${name} who joined our service 3 days ago.` } ] }, } ); const personalizedMessage = aiResponse.choices[0].message.content; // Step 4: Send personalized follow-up email await context.run("send-follow-up-email", async () => { await sendEmail(email, personalizedMessage); }); }); ---------------------------------------- TITLE: Implementing Payment Retry Workflow using TypeScript and Next.js DESCRIPTION: A complete implementation of a payment retry workflow using Upstash Workflow with Next.js. The workflow attempts to charge a customer up to 3 times with a 24-hour delay between retries, handles suspension status, and sends appropriate emails based on the payment outcome. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/paymentRetry.mdx#2025-04-08_snippet_0 LANGUAGE: typescript CODE: import { serve } from "@upstash/workflow/nextjs"; type ChargeUserPayload = { email: string; }; export const { POST } = serve(async (context) => { const { email } = context.requestPayload; for (let i = 0; i < 3; i++) { // attempt to charge the user const result = await context.run("charge customer", async () => { try { return await chargeCustomer(i + 1), } catch (e) { console.error(e); return } }); if (!result) { // Wait for a day await context.sleep("wait for retry", 24 * 60 * 60); } else { // Unsuspend User const isSuspended = await context.run("check suspension", async () => { return await checkSuspension(email); }); if (isSuspended) { await context.run("unsuspend user", async () => { await unsuspendUser(email); }); } // send invoice email await context.run("send invoice email", async () => { await sendEmail( email, `Payment successfull. Incoice: ${result.invoiceId}, Total cost: $${result.totalCost}` ); }); // by retuning, we end the workflow run return; } } // suspend user if the user isn't suspended const isSuspended = await context.run("check suspension", async () => { return await checkSuspension(email); }); if (!isSuspended) { await context.run("suspend user", async () => { await suspendUser(email); }); await context.run("send suspended email", async () => { await sendEmail( email, "Your account has been suspended due to payment failure. Please update your payment method." ); }); } }); async function sendEmail(email: string, content: string) { // Implement the logic to send an email console.log("Sending email to", email, "with content:", content); } async function checkSuspension(email: string) { // Implement the logic to check if the user is suspended console.log("Checking suspension status for", email); return true; } async function suspendUser(email: string) { // Implement the logic to suspend the user console.log("Suspending the user", email); } async function unsuspendUser(email: string) { // Implement the logic to unsuspend the user console.log("Unsuspending the user", email); } async function chargeCustomer(attempt: number) { // Implement the logic to charge the customer console.log("Charging the customer"); if (attempt <= 2) { throw new Error("Payment failed"); } return { invoiceId: "INV123", totalCost: 100, } as const; } ---------------------------------------- TITLE: Implementing AI Data Processing Workflow in TypeScript DESCRIPTION: This code snippet demonstrates how to implement an AI-powered data processing workflow using Upstash Workflow and OpenAI's GPT-4 model in TypeScript. It includes steps for downloading a dataset, processing it in chunks, aggregating results, and generating a report. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/allInOne.mdx#2025-04-08_snippet_0 LANGUAGE: typescript CODE: import { serve } from "@upstash/workflow/nextjs" import { downloadData, aggregateResults, generateReport, sendReport, getDatasetUrl, splitIntoChunks, } from "./utils" type OpenAiResponse = { choices: { message: { role: string, content: string } }[] } export const { POST } = serve<{ datasetId: string; userId: string }>( async (context) => { const request = context.requestPayload // Step 1: Download the dataset const datasetUrl = await context.run("get-dataset-url", async () => { return await getDatasetUrl(request.datasetId) }) // HTTP request with much longer timeout (2hrs) const { body: dataset } = await context.call("download-dataset", { url: datasetUrl, method: "GET" }) // Step 2: Process data in chunks using OpenAI const chunkSize = 1000 const chunks = splitIntoChunks(dataset, chunkSize) const processedChunks: string[] = [] for (let i = 0; i < chunks.length; i++) { const { body: processedChunk } = await context.api.openai.call( `process-chunk-${i}`, { token: process.env.OPENAI_API_KEY, operation: "chat.completions.create", body: { model: "gpt-4", messages: [ { role: "system", content: "You are an AI assistant tasked with analyzing data chunks. Provide a brief summary and key insights for the given data.", }, { role: "user", content: `Analyze this data chunk: ${JSON.stringify(chunks[i])}`, }, ], max_completion_tokens: 150, }, } ) processedChunks.push(processedChunk.choices[0].message.content!) // Every 10 chunks, we'll aggregate intermediate results if (i % 10 === 9 || i === chunks.length - 1) { await context.run(`aggregate-results${i}`, async () => { await aggregateResults(processedChunks) processedChunks.length = 0 }) } } // Step 3: Generate and send data report const report = await context.run("generate-report", async () => { return await generateReport(request.datasetId) }) await context.run("send-report", async () => { await sendReport(report, request.userId) }) } ) ---------------------------------------- TITLE: Implementing Auth Provider Webhook Handler in TypeScript DESCRIPTION: Main TypeScript implementation of the authentication webhook handler using Upstash Workflow. Handles user creation, Stripe integration, trial management, and automated email notifications through a series of coordinated steps. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/authWebhook.mdx#2025-04-08_snippet_0 LANGUAGE: typescript CODE: import { serve } from "@upstash/workflow/nextjs"; import { WorkflowContext } from '@upstash/qstash/workflow' type UserCreatedPayload = { name: string; email: string; }; export const { POST } = serve(async (context) => { const { name, email } = context.requestPayload; const { userid } = await context.run("sync user", async () => { return await createUserInDatabase({ name, email }); }); await context.run("create new user in stripe", async () => { await createNewUserInStripe(email); }); await context.run("start trial in Stripe", async () => { await startTrialInStripe(email); }); await context.run("send welcome email", async () => { await sendEmail( email, "Welcome to our platform!, You have 14 days of free trial." ); }); await context.sleep("wait", 7 * 24 * 60 * 60); const stats = await context.run("get user stats", async () => { return await getUserStats(userid); }); await sendProblemSolvedEmail({context, email, stats}); await context.sleep("wait for trial warning", 5 * 24 * 60 * 60); const isUpgraded = await context.run("check upgraded plan", async () => { return await checkUpgradedPlan(email); }); if (isUpgraded) return; await context.run("send trial warning email", async () => { await sendEmail( email, "Your trial is about to end in 2 days. Please upgrade your plan to keep using our platform." ); }); await context.sleep("wait for trial end", 2 * 24 * 60 * 60); await context.run("send trial end email", async () => { await sendEmail( email, "Your trial has ended. Please upgrade your plan to keep using our platform." ); }); }); ---------------------------------------- TITLE: Handling WorkflowAbort in Try/Catch Blocks DESCRIPTION: Demonstrates how to properly handle WorkflowAbort errors when running workflow steps inside try/catch blocks. Shows the correct pattern of re-throwing WorkflowAbort while handling other errors separately. SOURCE: https://github.com/upstash/docs/blob/main/workflow/troubleshooting/general.mdx#2025-04-08_snippet_0 LANGUAGE: typescript CODE: import { WorkflowAbort } from '@upstash/workflow'; try { await context.run( ... ); } catch (error) { if (error instanceof WorkflowAbort) { throw error; } else { // handle other errors } } LANGUAGE: python CODE: from upstash_workflow import WorkflowAbort try: await context.run( ... ) except Exception as e: if isinstance(e, WorkflowAbort): raise e else: # handle other errors ---------------------------------------- TITLE: Implementing Onboarding Workflow with Type-Safety in Python DESCRIPTION: This snippet demonstrates how to create a typed onboarding workflow using Upstash Workflow. It includes steps for sending a welcome email, waiting for 3 days, generating a personalized follow-up message using AI, and sending a follow-up email. SOURCE: https://github.com/upstash/docs/blob/main/workflow/getstarted.mdx#2025-04-08_snippet_2 LANGUAGE: python CODE: # Type-safety for starting our workflow class InitialData(TypedDict): user_id: str email: str name: str @serve.post("/api/onboarding") async def onboarding_workflow(context: AsyncWorkflowContext[InitialData]) -> None: data = context.request_payload user_id = data["user_id"] email = data["email"] name = data["name"] # Step 1: Send welcome email async def _send_welcome_email() -> None: await send_email(email, "Welcome to our service!") await context.run("send-welcome-email", _send_welcome_email) # Step 2: Wait for 3 days (in seconds) await context.sleep("sleep-until-follow-up", 60 * 60 * 24 * 3) # Step 3: AI-generate personalized follow-up message ai_response: CallResponse[Dict[str, str]] = await context.call( "generate-personalized-message", url="https://api.openai.com/v1/chat/completions", method="POST", headers={...}, body={ "model": "gpt-3.5-turbo", "messages": [ { "role": "system", "content": "You are an assistant creating personalized follow-up messages.", }, { "role": "user", "content": f"Create a short, friendly follow-up message for {name} who joined our service 3 days ago.", }, ], }, ) personalized_message = ai_response.body["choices"][0]["message"]["content"] # Step 4: Send personalized follow-up email async def _send_follow_up_email() -> None: await send_email(email, personalized_message) await context.run("send-follow-up-email", _send_follow_up_email) ---------------------------------------- TITLE: Implementing Payment Retry Workflow using Python and FastAPI DESCRIPTION: A complete implementation of a payment retry workflow using Upstash Workflow with FastAPI. The workflow attempts to charge a customer up to 3 times with a 24-hour delay between retries, handles suspension status, and sends appropriate emails based on the payment outcome. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/paymentRetry.mdx#2025-04-08_snippet_1 LANGUAGE: python CODE: from fastapi import FastAPI from typing import TypedDict, Optional from dataclasses import dataclass from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) @dataclass class ChargeResult: invoice_id: str total_cost: float class ChargeUserPayload(TypedDict): email: str async def send_email(email: str, content: str) -> None: # Implement the logic to send an email print("Sending email to", email, "with content:", content) async def check_suspension(email: str) -> bool: # Implement the logic to check if the user is suspended print("Checking suspension status for", email) return True async def suspend_user(email: str) -> None: # Implement the logic to suspend the user print("Suspending the user", email) async def unsuspend_user(email: str) -> None: # Implement the logic to unsuspend the user print("Unsuspending the user", email) async def charge_customer(attempt: int) -> Optional[ChargeResult]: # Implement the logic to charge the customer print("Charging the customer") if attempt <= 2: raise Exception("Payment failed") return ChargeResult(invoice_id="INV123", total_cost=100) @serve.post("/payment-retries") async def payment_retries(context: AsyncWorkflowContext[ChargeUserPayload]) -> None: email = context.request_payload["email"] async def _check_suspension() -> bool: return await check_suspension(email) for i in range(3): # attempt to charge the user async def _charge_customer() -> Optional[ChargeResult]: try: return await charge_customer(i + 1) except Exception as e: print(f"Error: {e}") return None result = await context.run("charge customer", _charge_customer) if not result: # Wait for a day await context.sleep("wait for retry", 24 * 60 * 60) else: # Unsuspend User is_suspended = await context.run("check suspension", _check_suspension) if is_suspended: async def _unsuspend_user() -> None: await unsuspend_user(email) await context.run("unsuspend user", _unsuspend_user) # send invoice email async def _send_invoice_email() -> None: await send_email( email, f"Payment successful. Invoice: {result.invoice_id}, Total cost: ${result.total_cost}", ) await context.run("send invoice email", _send_invoice_email) # by returning, we end the workflow run return # suspend user if the user isn't suspended is_suspended = await context.run("check suspension", _check_suspension) if not is_suspended: async def _suspend_user() -> None: await suspend_user(email) await context.run("suspend user", _suspend_user) async def _send_suspended_email() -> None: await send_email( email, "Your account has been suspended due to payment failure. Please update your payment method.", ) await context.run("send suspended email", _send_suspended_email) ---------------------------------------- TITLE: Implementing Agent Chain for Physics Information Using Upstash Workflow DESCRIPTION: This code demonstrates the implementation of three chained agents using Upstash Workflow and OpenAI. The first agent lists physicists, the second agent describes their work using Wikipedia data, and the third agent summarizes the information. The workflow uses the WikipediaQueryRun tool from LangChain for retrieving information. SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/patterns/prompt-chaining.mdx#2025-04-08_snippet_0 LANGUAGE: typescript CODE: import { serve } from "@upstash/workflow/nextjs"; import { WikipediaQueryRun } from "@langchain/community/tools/wikipedia_query_run"; export const { POST } = serve(async (context) => { const model = context.agents.openai('gpt-3.5-turbo'); const agent1 = context.agents.agent({ model, name: 'firstAgent', maxSteps: 1, background: 'You are an agent that lists famous physicists.', tools: {} }); const agent2 = context.agents.agent({ model, name: 'secondAgent', // set to 2 as this agent will first request tools // and then summarize them: maxSteps: 2, background: 'You are an agent that describes the work of' + ' the physicists listed in the previous prompt.', tools: { wikiTool: new WikipediaQueryRun({ topKResults: 1, maxDocContentLength: 500, }) } }); const agent3 = context.agents.agent({ model, name: 'thirdAgent', maxSteps: 1, background: 'You are an agent that summarizes the ' + 'works of the physicists mentioned previously.', tools: {} }); // Chaining agents const firstOutput = await context.agents.task({ agent: agent1, prompt: "List 3 famous physicists." }).run(); const secondOutput = await context.agents.task({ agent: agent2, prompt: `Describe the work of: ${firstOutput.text}` }).run(); const { text } = await context.agents.task({ agent: agent3, prompt: `Summarize: ${secondOutput.text}` }).run(); console.log(text); }); ---------------------------------------- TITLE: Implementing Multi-Agent Task with Upstash Workflow in TypeScript DESCRIPTION: This code sets up a multi-agent system using Upstash Workflow. It creates a researcher agent with Wikipedia access and a mathematician agent with calculation capabilities. The agents collaborate to provide information about Japanese cities and calculate their total population. SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/features.mdx#2025-04-08_snippet_12 LANGUAGE: typescript CODE: import { serve } from "@upstash/workflow/nextjs"; import { WikipediaQueryRun } from "@langchain/community/tools/wikipedia_query_run"; import * as mathjs from 'mathjs' import { tool } from "ai"; import { z } from "zod"; export const { POST } = serve(async (context) => { const model = context.agents.openai('gpt-4o'); const researcherAgent = context.agents.agent({ model, name: 'academic', maxSteps: 2, tools: { wikiTool: new WikipediaQueryRun({ topKResults: 1, maxDocContentLength: 500, }) }, background: 'You are researcher agent with access to Wikipedia. ' + 'Utilize Wikipedia as much as possible for correct information', }); const mathAgent = context.agents.agent({ model, name: "mathematician", maxSteps: 2, tools: { calculate: tool({ description: 'A tool for evaluating mathematical expressions. ' + 'Example expressions: ' + "'1.2 * (2 + 4.5)', '12.7 cm to inch', 'sin(45 deg) ^ 2'." + "only call this tool if you need to calculate a mathematical expression." + "when writing an expression, don't use words like 'thousand' or 'million'", parameters: z.object({ expression: z.string() }), execute: async ({ expression }) => mathjs.evaluate(expression), }), }, background: "You are a mathematician agent which can utilize" + "a calculator to compute expressions" }) const task = context.agents.task({ model, maxSteps: 3, agents: [researcherAgent, mathAgent], prompt: "Tell me about 3 cities in Japan and calculate the sum of their populations", }); const { text } = await task.run(); console.log("result:", text) }) ---------------------------------------- TITLE: Implementing Customer Onboarding Workflow in Python with FastAPI DESCRIPTION: This code snippet showcases a customer onboarding workflow using Upstash Workflow in Python for a FastAPI application. It handles user registration, sends welcome emails, and periodically checks user activity to send appropriate follow-up emails. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/customerOnboarding.mdx#2025-04-08_snippet_1 LANGUAGE: python CODE: from fastapi import FastAPI from typing import Literal, TypedDict from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) UserState = Literal["non-active", "active"] class InitialData(TypedDict): email: str async def send_email(message: str, email: str) -> None: # Implement email sending logic here print(f"Sending {message} email to {email}") async def get_user_state() -> UserState: # Implement user state logic here return "non-active" @serve.post("/customer-onboarding") async def customer_onboarding(context: AsyncWorkflowContext[InitialData]) -> None: email = context.request_payload["email"] async def _new_signup() -> None: await send_email("Welcome to the platform", email) await context.run("new-signup", _new_signup) await context.sleep("wait-for-3-days", 60 * 60 * 24 * 3) while True: async def _check_user_state() -> UserState: return await get_user_state() state: UserState = await context.run("check-user-state", _check_user_state) if state == "non-active": async def _send_email_non_active() -> None: await send_email("Email to non-active users", email) await context.run("send-email-non-active", _send_email_non_active) else: async def _send_email_active() -> None: await send_email("Send newsletter to active users", email) await context.run("send-email-active", _send_email_active) await context.sleep("wait-for-1-month", 60 * 60 * 24 * 30) ---------------------------------------- TITLE: Complete Order Fulfillment Workflow Implementation in TypeScript DESCRIPTION: Full implementation of the order fulfillment workflow using Upstash Workflow with Next.js. Handles order processing from creation to notification in a step-by-step workflow. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/eCommerceOrderFulfillment.mdx#2025-04-08_snippet_0 LANGUAGE: typescript CODE: import { serve } from "@upstash/workflow/nextjs" import { createOrderId, checkStockAvailability, processPayment, dispatchOrder, sendOrderConfirmation, sendDispatchNotification, } from "./utils" type OrderPayload = { userId: string items: { productId: string, quantity: number }[] } export const { POST } = serve(async (context) => { const { userId, items } = context.requestPayload; // Step 1: Create Order Id const orderId = await context.run("create-order-id", async () => { return await createOrderId(userId); }); // Step 2: Verify stock availability const stockAvailable = await context.run("check-stock", async () => { return await checkStockAvailability(items); }); if (!stockAvailable) { console.warn("Some items are out of stock"); return; }; // Step 3: Process payment await context.run("process-payment", async () => { return await processPayment(orderId) }) // Step 4: Dispatch the order await context.run("dispatch-order", async () => { return await dispatchOrder(orderId, items) }) // Step 5: Send order confirmation email await context.run("send-confirmation", async () => { return await sendOrderConfirmation(userId, orderId) }) // Step 6: Send dispatch notification await context.run("send-dispatch-notification", async () => { return await sendDispatchNotification(userId, orderId) }) }) ---------------------------------------- TITLE: Configuring Flow Control in Workflow Serve Method DESCRIPTION: This snippet demonstrates how to set up flow control in the 'serve' method of an Upstash Workflow. It limits the parallelism to 3 and the rate to 10 calls per second for all steps in the workflow. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/flow-control.mdx#2025-04-08_snippet_0 LANGUAGE: javascript CODE: export const { POST } = serve( async (context) => { await context.run("step-1", async () => { return someWork(); }); }, { flowControl: { key: "app1", parallelism: 3, ratePerSecond: 10 } } ); ---------------------------------------- TITLE: Implementing Image Processing Workflow in TypeScript DESCRIPTION: This code snippet demonstrates how to implement an image processing workflow using Upstash Workflow in TypeScript. It includes steps for retrieving an image, resizing it to multiple resolutions, applying filters, and storing the processed images. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/imageProcessing.mdx#2025-04-08_snippet_0 LANGUAGE: typescript CODE: import { serve } from "@upstash/workflow/nextjs" import { resizeImage, applyFilters, storeImage, getImageUrl, } from "./utils" type ImageResult = { imageUrl: string } export const { POST } = serve<{ imageId: string; userId: string }>( async (context) => { const { imageId, userId } = context.requestPayload // Step 1: Retrieve the uploaded image const imageUrl = await context.run("get-image-url", async () => { return await getImageUrl(imageId) }) // Step 2: Resize the image to multiple resolutions const resolutions = [640, 1280, 1920] const resizedImages: { body: ImageResult }[] = await Promise.all(resolutions.map( resolution => context.call( `resize-image-${resolution}`, { // endpoint which returns ImageResult type in response url: "https://image-processing-service.com/resize", method: "POST", body: { imageUrl, width: resolution, } } ) )) // Step 3: Apply filters to each resized image const filters = ["grayscale", "sepia", "contrast"] const processedImagePromises: Promise[] = [] for (const resizedImage of resizedImages) { for (const filter of filters) { const processedImagePromise = context.call( `apply-filter-${filter}`, { // endpoint which returns ImageResult type in response url: "https://image-processing-service.com/filter", method: "POST", body: { imageUrl: resizedImage.body.imageUrl, filter, } } ) processedImagePromises.push(processedImagePromise) } } const processedImages: { body: ImageResult }[] = await Promise.all(processedImagePromises) // Step 4: Store processed images in cloud storage const storedImageUrls: string[] = await Promise.all( processedImages.map( processedImage => context.run(`store-image`, async () => { return await storeImage(processedImage.body.imageUrl) }) ) ) } ) ---------------------------------------- TITLE: Implementing Parallel Agent Execution with TypeScript and Upstash Workflow DESCRIPTION: Demonstrates how to create multiple worker agents that process tasks in parallel and aggregate their results using an aggregator agent. The implementation uses OpenAI's GPT-3.5-turbo model and handles quantum physics, relativity, and string theory explanations. SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/patterns/parallelization.mdx#2025-04-08_snippet_0 LANGUAGE: typescript CODE: import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve(async (context) => { const model = context.agents.openai('gpt-3.5-turbo'); // Define worker agents const worker1 = context.agents.agent({ model, name: 'worker1', maxSteps: 1, background: 'You are an agent that explains quantum physics.', tools: {} }); const worker2 = context.agents.agent({ model, name: 'worker2', maxSteps: 1, background: 'You are an agent that explains relativity.', tools: {} }); const worker3 = context.agents.agent({ model, name: 'worker3', maxSteps: 1, background: 'You are an agent that explains string theory.', tools: {} }); // Await results const [result1, result2, result3] = await Promise.all([ context.agents.task({ agent: worker1, prompt: "Explain quantum physics." }).run(), context.agents.task({ agent: worker2, prompt: "Explain relativity." }).run(), context.agents.task({ agent: worker3, prompt: "Explain string theory." }).run(), ]); // Aggregating results const aggregator = context.agents.agent({ model, name: 'aggregator', maxSteps: 1, background: 'You are an agent that summarizes multiple answers.', tools: {} }); const task = await context.agents.task({ agent: aggregator, prompt: `Summarize these three explanations: ${result1.text}, ${result2.text}, ${result3.text}` }) const finalSummary = await task.run(); console.log(finalSummary.text); }); ---------------------------------------- TITLE: Creating Event-Driven Order Processing Workflow in TypeScript DESCRIPTION: Main workflow implementation that handles order processing by waiting for external events. It includes steps for requesting processing, waiting for completion events, handling timeouts, and sending confirmation emails. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/waitForEvent.mdx#2025-04-08_snippet_0 LANGUAGE: typescript CODE: import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve(async (context) => { const { orderId, userEmail } = context.requestPayload; // Step 1: request order processing await context.run("request order processing", async () => { await requestProcessing(orderId) }) // Step 2: Wait for the order to be processed const { eventData, timeout } = await context.waitForEvent( "wait for order processing", `order-${orderId}`, { timeout: "10m" // 10 minutes timeout } ); if (timeout) { // end workflow in case of timeout return; } const processedData = eventData; // Step 3: Log the processed order await context.run("process-order", async () => { console.log(`Order ${orderId} processed:`, processedData); }); // Step 4: Send a confirmation email await context.run("send-confirmation-email", async () => { await sendEmail( userEmail, "Your order has been processed!", processedData ); }); }); ---------------------------------------- TITLE: Creating Custom OpenAI Client for Upstash Workflow DESCRIPTION: Implementation of a custom OpenAI client that uses the Upstash Workflow's context.call method to make HTTP requests, ensuring durability for LLM API calls. This redirects the OpenAI SDK's network requests through Workflow's context. SOURCE: https://github.com/upstash/docs/blob/main/workflow/integrations/aisdk.mdx#2025-04-08_snippet_1 LANGUAGE: typescript CODE: import { createOpenAI } from '@ai-sdk/openai'; import { HTTPMethods } from '@upstash/qstash'; import { WorkflowAbort, WorkflowContext } from '@upstash/workflow'; export const createWorkflowOpenAI = (context: WorkflowContext) => { return createOpenAI({ compatibility: "strict", fetch: async (input, init) => { try { // Prepare headers from init.headers const headers = init?.headers ? Object.fromEntries(new Headers(init.headers).entries()) : {}; // Prepare body from init.body const body = init?.body ? JSON.parse(init.body as string) : undefined; // Make network call const responseInfo = await context.call("openai-call-step", { url: input.toString(), method: init?.method as HTTPMethods, headers, body, }); // Construct headers for the response const responseHeaders = new Headers( Object.entries(responseInfo.header).reduce((acc, [key, values]) => { acc[key] = values.join(", "); return acc; }, {} as Record) ); // Return the constructed response return new Response(JSON.stringify(responseInfo.body), { status: responseInfo.status, headers: responseHeaders, }); } catch (error) { if (error instanceof WorkflowAbort) { throw error } else { console.error("Error in fetch implementation:", error); throw error; // Rethrow error for further handling } } }, }); }; ---------------------------------------- TITLE: Implementing Orchestrator-Workers Pattern with Upstash Workflow DESCRIPTION: Sets up an orchestrator pattern with three specialized worker agents for different physics topics. Uses Wikipedia as a knowledge source and OpenAI's GPT-4 for processing. The workers collaborate to create a comprehensive Q&A about advanced physics topics. SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/patterns/orchestrator-workers.mdx#2025-04-08_snippet_0 LANGUAGE: typescript CODE: import { serve } from "@upstash/workflow/nextjs"; import { WikipediaQueryRun } from "@langchain/community/tools/wikipedia_query_run"; const wikiTool = new WikipediaQueryRun({ topKResults: 1, maxDocContentLength: 500, }) export const { POST } = serve(async (context) => { const model = context.agents.openai('gpt-4o'); // Worker agents const worker1 = context.agents.agent({ model, name: 'worker1', tools: { wikiTool }, maxSteps: 3, background: 'You are a worker agent that answers general questions about advanced physics.' }); const worker2 = context.agents.agent({ model, name: 'worker2', tools: { wikiTool }, maxSteps: 3, background: 'You are a worker agent that answers questions about quantum mechanics.' }); const worker3 = context.agents.agent({ model, name: 'worker3', tools: { wikiTool }, maxSteps: 3, background: 'You are a worker agent that answers questions about relativity.' }); // Synthesizing results const task = context.agents.task({ model, prompt: `Create a Q&A for advanced topics in physics`, agents: [worker1, worker2, worker3], maxSteps: 3, }) const { text } = await task.run(); console.log(text); }); ---------------------------------------- TITLE: Implementing Customer Onboarding Workflow in TypeScript with Next.js DESCRIPTION: This code snippet demonstrates a customer onboarding workflow using Upstash Workflow in TypeScript for a Next.js application. It handles user registration, sends welcome emails, and periodically checks user activity to send appropriate follow-up emails. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/customerOnboarding.mdx#2025-04-08_snippet_0 LANGUAGE: typescript CODE: import { serve } from "@upstash/workflow/nextjs" type InitialData = { email: string } export const { POST } = serve(async (context) => { const { email } = context.requestPayload await context.run("new-signup", async () => { await sendEmail("Welcome to the platform", email) }) await context.sleep("wait-for-3-days", 60 * 60 * 24 * 3) while (true) { const state = await context.run("check-user-state", async () => { return await getUserState() }) if (state === "non-active") { await context.run("send-email-non-active", async () => { await sendEmail("Email to non-active users", email) }) } else if (state === "active") { await context.run("send-email-active", async () => { await sendEmail("Send newsletter to active users", email) }) } await context.sleep("wait-for-1-month", 60 * 60 * 24 * 30) } }) async function sendEmail(message: string, email: string) { // Implement email sending logic here console.log(`Sending ${message} email to ${email}`) } type UserState = "non-active" | "active" const getUserState = async (): Promise => { // Implement user state logic here return "non-active" } ---------------------------------------- TITLE: Implementing Authentication in FastAPI Workflow DESCRIPTION: Example of implementing authentication in a FastAPI workflow using Upstash Workflow. SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/fastapi.mdx#2025-04-08_snippet_8 LANGUAGE: python CODE: from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) def some_work(input: str) -> str: return f"processed '{input}'" @serve.post("/auth") async def auth(context: AsyncWorkflowContext[str]) -> None: if context.headers.get("authentication") != "Bearer secret_password": print("Authentication failed.") return async def _step1() -> str: return "output 1" await context.run("step1", _step1) async def _step2() -> str: return "output 2" await context.run("step2", _step2) ---------------------------------------- TITLE: Implementing Custom Retry Logic for OpenAI API in TypeScript DESCRIPTION: This code snippet demonstrates a TypeScript implementation of custom retry logic for OpenAI API calls using Upstash Workflow. It includes dynamic request delays, response handling, and asynchronous storage of successful responses. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/customRetry.mdx#2025-04-08_snippet_0 LANGUAGE: typescript CODE: import { serve } from "@upstash/workflow/nextjs" import { storeResponse } from "@/lib/utils" const BASE_DELAY = 10; const createSystemMessage = () => ({ role: "system", content: "You are an AI assistant providing a brief summary and key insights for any given data.", }) const createUserMessage = (data: string) => ({ role: "user", content: `Analyze this data chunk: ${data}`, }) export const { POST } = serve<{ userData: string }>(async (context) => { // 👇 initial data sent along when triggering the workflow const { userData } = context.requestPayload for (let attempt = 0; attempt < 10; attempt++) { const response = await context.api.openai.call(`call-openai`, { token: process.env.OPENAI_API_KEY!, operation: "chat.completions.create", body: { model: "gpt-3.5-turbo", messages: [createSystemMessage(), createUserMessage(userData)], max_completion_tokens: 150, }, }) // Success case if (response.status < 300) { await context.run("store-response-in-db", () => storeResponse(response.body)) return } // Rate limit case - wait and retry if (response.status === 429) { const resetTime = response.header["x-ratelimit-reset-tokens"]?.[0] || response.header["x-ratelimit-reset-requests"]?.[0] || BASE_DELAY // assuming `resetTime` is in seconds await context.sleep("sleep-until-retry", Number(resetTime)) continue } // Any other scenario - pause for 5 seconds to avoid overloading OpenAI API await context.sleep("pause-to-avoid-spam", 5) } }) ---------------------------------------- TITLE: Complete Coffee Brewing Workflow with Parallel Steps in TypeScript DESCRIPTION: This code example illustrates a complete workflow for brewing coffee using Upstash Workflow. It includes parallel inventory checks, conditional brewing based on ingredient availability, and receipt printing. The workflow is designed to be served as a Next.js API route. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/parallel-runs.mdx#2025-04-08_snippet_1 LANGUAGE: typescript CODE: import { serve } from "@upstash/workflow/nextjs"; import { checkInventory, brewCoffee, printReceipt } from "@/utils"; export const { POST } = serve(async (ctx) => { const [coffeeBeansAvailable, cupsAvailable, milkAvailable] = await Promise.all([ ctx.run("check-coffee-beans", () => checkInventory("coffee-beans")), ctx.run("check-cups", () => checkInventory("cups")), ctx.run("check-milk", () => checkInventory("milk")), ]); // If all ingedients available, brew coffee if (coffeeBeansAvailable && cupsAvailable && milkAvailable) { const price = await ctx.run("brew-coffee", async () => { return await brewCoffee({ style: "cappuccino" }); }); await printReceipt(price); } }); ---------------------------------------- TITLE: Implementing Image Processing Workflow in Python DESCRIPTION: This code snippet shows how to implement an image processing workflow using Upstash Workflow in Python. It covers steps for retrieving an image, resizing it to multiple resolutions, applying filters, and storing the processed images. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/imageProcessing.mdx#2025-04-08_snippet_1 LANGUAGE: python CODE: from fastapi import FastAPI from typing import List, TypedDict from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext, CallResponse from utils import store_image, get_image_url app = FastAPI() serve = Serve(app) class ImageResult(TypedDict): image_url: str class ImageProcessingPayload(TypedDict): image_id: str user_id: str @serve.post("/process-image") async def process_image(context: AsyncWorkflowContext[ImageProcessingPayload]) -> None: payload = context.request_payload image_id = payload["image_id"] user_id = payload["user_id"] # Step 1: Retrieve the uploaded image async def _get_image_url() -> str: return await get_image_url(image_id) image_url: str = await context.run("get-image-url", _get_image_url) # Step 2: Resize the image to multiple resolutions resolutions = [640, 1280, 1920] resize_responses = [] for resolution in resolutions: response: CallResponse[ImageResult] = await context.call( f"resize-image-{resolution}", # endpoint which returns ImageResult type in response url="https://image-processing-service.com/resize", method="POST", body={"imageUrl": image_url, "width": resolution}, ) resize_responses.append(response) resized_images = [response.body for response in resize_responses] # Step 3: Apply filters to each resized image filters = ["grayscale", "sepia", "contrast"] filter_responses = [] for resized_image in resized_images: for filter in filters: response: CallResponse[ImageResult] = await context.call( f"apply-filter-{filter}", # endpoint which returns ImageResult type in response url="https://image-processing-service.com/filter", method="POST", body={"imageUrl": resized_image["imageUrl"], "filter": filter}, ) filter_responses.append(response) processed_images = [response.body for response in filter_responses] # Step 4: Store processed images in cloud storage async def _store_image() -> str: return await store_image(processed_image["imageUrl"]) stored_image_urls: List[str] = [] for processed_image in processed_images: stored_image_url = await context.run("store-image", _store_image) stored_image_urls.append(stored_image_url) ---------------------------------------- TITLE: Complete Order Fulfillment Workflow Implementation in Python DESCRIPTION: FastAPI implementation of the order fulfillment workflow using Upstash Workflow. Provides the same functionality as the TypeScript version with Python-specific typing and async handling. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/eCommerceOrderFulfillment.mdx#2025-04-08_snippet_1 LANGUAGE: python CODE: from fastapi import FastAPI from typing import List, TypedDict from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext from utils import ( create_order_id, check_stock_availability, process_payment, dispatch_order, send_order_confirmation, send_dispatch_notification, ) app = FastAPI() serve = Serve(app) class OrderItem(TypedDict): product_id: str quantity: int class OrderPayload(TypedDict): user_id: str items: List[OrderItem] @serve.post("/order-fulfillment") async def order_fulfillment(context: AsyncWorkflowContext[OrderPayload]) -> None: # Get the order payload from the request payload = context.request_payload user_id = payload["user_id"] items = payload["items"] # Step 1: Create Order Id async def _create_order_id(): return await create_order_id(user_id) order_id: str = await context.run("create-order-id", _create_order_id) # Step 2: Verify stock availability async def _check_stock(): return await check_stock_availability(items) stock_available: bool = await context.run("check-stock", _check_stock) if not stock_available: print("Some items are out of stock") return # Step 3: Process payment async def _process_payment(): return await process_payment(order_id) await context.run("process-payment", _process_payment) # Step 4: Dispatch the order async def _dispatch_order(): return await dispatch_order(order_id, items) await context.run("dispatch-order", _dispatch_order) # Step 5: Send order confirmation email async def _send_confirmation(): return await send_order_confirmation(user_id, order_id) await context.run("send-confirmation", _send_confirmation) # Step 6: Send dispatch notification async def _send_dispatch_notification(): return await send_dispatch_notification(user_id, order_id) await context.run("send-dispatch-notification", _send_dispatch_notification) ---------------------------------------- TITLE: Creating a Daily Backup Workflow in TypeScript (Next.js) DESCRIPTION: This snippet defines a workflow endpoint that creates and uploads a backup daily. It uses the Upstash Workflow API for Next.js to handle the backup process in two steps. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/schedule.mdx#2025-04-08_snippet_0 LANGUAGE: typescript CODE: import { serve } from "@upstash/workflow/nextjs"; import { createBackup, uploadBackup } from "./utils"; export const { POST } = serve( async (ctx) => { const backup = await ctx.run("create-backup", async () => { return await createBackup(); }); await ctx.run("upload-backup", async () => { await uploadBackup(backup); }); }, { failureFunction({ context, failStatus, failResponse, failHeader }) { // immediately get notified for failed backups // i.e. send an email, log to Sentry }, } ); ---------------------------------------- TITLE: Implementing AI Data Processing Workflow in Python DESCRIPTION: This code snippet shows how to implement an AI-powered data processing workflow using Upstash Workflow and OpenAI's GPT-4 model in Python. It covers steps for downloading a dataset, processing it in chunks, aggregating results, and generating a report. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/allInOne.mdx#2025-04-08_snippet_1 LANGUAGE: python CODE: from fastapi import FastAPI import json import os from typing import Dict, List, Any, TypedDict from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext, CallResponse from utils import ( aggregate_results, generate_report, send_report, get_dataset_url, split_into_chunks, ) app = FastAPI() serve = Serve(app) class RequestPayload(TypedDict): dataset_id: str user_id: str @serve.post("/ai-generation") async def ai_generation(context: AsyncWorkflowContext[RequestPayload]) -> None: request = context.request_payload dataset_id = request["dataset_id"] user_id = request["user_id"] # Step 1: Download the dataset async def _get_dataset_url() -> str: return await get_dataset_url(dataset_id) dataset_url = await context.run("get-dataset-url", _get_dataset_url) # HTTP request with much longer timeout (2hrs) response: CallResponse[Any] = await context.call( "download-dataset", url=dataset_url, method="GET" ) dataset = response.body # Step 2: Process data in chunks using OpenAI chunk_size = 1000 chunks = split_into_chunks(dataset, chunk_size) processed_chunks: List[str] = [] for i, chunk in enumerate(chunks): openai_response: CallResponse[Dict[str, str]] = await context.call( f"process-chunk-{i}", url="https://api.openai.com/v1/chat/completions", method="POST", headers={ "authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}", }, body={ "model": "gpt-4", "messages": [ { "role": "system", "content": "You are an AI assistant tasked with analyzing data chunks. Provide a brief summary and key insights for the given data.", }, { "role": "user", "content": f"Analyze this data chunk: {json.dumps(chunk)}", }, ], "max_tokens": 150, }, ) processed_chunks.append( openai_response.body["choices"][0]["message"]["content"] ) # Every 10 chunks, we'll aggregate intermediate results if i % 10 == 9 or i == len(chunks) - 1: async def _aggregate_results() -> None: await aggregate_results(processed_chunks) processed_chunks.clear() await context.run(f"aggregate-results{i}", _aggregate_results) # Step 3: Generate and send data report async def _generate_report() -> Any: return await generate_report(dataset_id) report = await context.run("generate-report", _generate_report) async def _send_report() -> None: await send_report(report, user_id) await context.run("send-report", _send_report) ---------------------------------------- TITLE: Implementing Workflow Route with Next.js SDK DESCRIPTION: Example implementation of a workflow route using the Upstash Next.js SDK that demonstrates parallel execution of email retrieval and LLM queries with sleep functionality. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/monitor.mdx#2025-04-08_snippet_0 LANGUAGE: javascript CODE: import { serve } from "@upstash/workflow/nextjs"; import { retrieveEmail, fetchFromLLm, UserRequest} from "../../../lib/util"; export const { POST } = serve( async (context) => { const input = context.requestPayload; await context.sleep("sleep", 10); const p1 = context.run("retrieveEmail", async () => { return retrieveEmail(input.id); }); const p2 = context.run("askllm", async () => { return fetchFromLLm(input.question); }); await Promise.all([p1, p2]) }, ); ---------------------------------------- TITLE: Initializing Workflow Context in Next.js (TypeScript) DESCRIPTION: Demonstrates how to use the serve function to create a workflow endpoint in Next.js, highlighting the workflow context object. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/context.mdx#2025-04-08_snippet_0 LANGUAGE: typescript CODE: import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve( // 👇 the workflow context async (context) => { // ... } ); ---------------------------------------- TITLE: Implementing Auth Provider Webhook Handler in Python DESCRIPTION: Python implementation of the authentication webhook handler using FastAPI and Upstash Workflow. Provides the same functionality as the TypeScript version with user creation, Stripe integration, and email notifications. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/authWebhook.mdx#2025-04-08_snippet_1 LANGUAGE: python CODE: from fastapi import FastAPI from typing import Dict, TypedDict from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) class UserCreatedPayload(TypedDict): name: str email: str class UserStats(TypedDict): total_problems_solved: int most_interested_topic: str @serve.post("/auth-provider-webhook") async def auth_provider_webhook( context: AsyncWorkflowContext[UserCreatedPayload], ) -> None: payload = context.request_payload name = payload["name"] email = payload["email"] async def _sync_user() -> str: return await create_user_in_database(name, email) result = await context.run("sync user", _sync_user) userid = result["userid"] async def _create_new_user_in_stripe() -> None: await create_new_user_in_stripe(email) await context.run("create new user in stripe", _create_new_user_in_stripe) async def _start_trial_in_stripe() -> None: await start_trial_in_stripe(email) await context.run("start trial in Stripe", _start_trial_in_stripe) async def _send_welcome_email() -> None: await send_email( email, "Welcome to our platform!, You have 14 days of free trial." ) await context.run("send welcome email", _send_welcome_email) await context.sleep("wait", 7 * 24 * 60 * 60) async def _get_user_stats() -> UserStats: return await get_user_stats(userid) stats: UserStats = await context.run("get user stats", _get_user_stats) await send_problem_solved_email(context, email, stats) await context.sleep("wait for trial warning", 5 * 24 * 60 * 60) async def _check_upgraded_plan() -> bool: return await check_upgraded_plan(email) is_upgraded = await context.run("check upgraded plan", _check_upgraded_plan) if is_upgraded: return async def _send_trial_warning_email() -> None: await send_email( email, "Your trial is about to end in 2 days. Please upgrade your plan to keep using our platform.", ) await context.run("send trial warning email", _send_trial_warning_email) await context.sleep("wait for trial end", 2 * 24 * 60 * 60) async def _send_trial_end_email() -> None: await send_email( email, "Your trial has ended. Please upgrade your plan to keep using our platform.", ) await context.run("send trial end email", _send_trial_end_email) ---------------------------------------- TITLE: Notifying Workflows with client.notify in TypeScript DESCRIPTION: The client.notify method is used to trigger events that resume paused workflows. It requires a QStash token for client initialization and takes an event ID and event data as parameters. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/events.mdx#2025-04-08_snippet_1 LANGUAGE: typescript CODE: import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }); await client.notify({ eventId: "event-id", eventData: { my: "data" }, }); ---------------------------------------- TITLE: Triggering Workflow with Client DESCRIPTION: Demonstrates how to initialize the Workflow client and trigger a new workflow run with optional parameters like body, headers, run ID and flow control settings. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/client.mdx#2025-04-08_snippet_0 LANGUAGE: typescript CODE: import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }) const { workflowRunId } = await client.trigger({ url: "https:///", body: "hello there!", // optional body headers: { ... }, // optional headers workflowRunId: "my-workflow", // optional workflow run id retries: 3 // optional retries in the initial request flowControl: { // optional flow control key: "USER_GIVEN_KEY", ratePerSecond: 10, parallelism: 5 } }) console.log(workflowRunId) // prints wfr_my-workflow ---------------------------------------- TITLE: Implementing Custom Retry Logic for OpenAI API in Python DESCRIPTION: This code snippet shows a Python implementation of custom retry logic for OpenAI API calls using Upstash Workflow. It includes handling of rate limits, dynamic sleep times, and asynchronous storage of successful responses. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/customRetry.mdx#2025-04-08_snippet_1 LANGUAGE: python CODE: from fastapi import FastAPI from typing import Dict, Any, TypedDict import os from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext, CallResponse from utils import store_response app = FastAPI() serve = Serve(app) class InitialData(TypedDict): user_data: str def create_system_message() -> Dict[str, str]: return { "role": "system", "content": "You are an AI assistant providing a brief summary and key insights for any given data.", } def create_user_message(data: str) -> Dict[str, str]: return {"role": "user", "content": f"Analyze this data chunk: {data}"} @serve.post("/custom-retry-logic") async def custom_retry_logic(context: AsyncWorkflowContext[InitialData]) -> None: # 👇 initial data sent along when triggering the workflow user_data = context.request_payload["user_data"] for attempt in range(10): response: CallResponse[Dict[str, Any]] = await context.call( "call-openai", url="https://api.openai.com/v1/chat/completions", method="POST", headers={ "authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}", }, body={ "model": "gpt-4", "messages": [create_system_message(), create_user_message(user_data)], "max_tokens": 150, }, ) # Success case if response.status_code < 300: async def _store_response_in_db() -> None: await store_response(response.body) await context.run("store-response-in-db", _store_response_in_db) return # Rate limit case - wait and retry if response.status_code == 429: ratelimit_tokens_header = response.header.get("x-ratelimit-reset-tokens") ratelimit_requests_header = response.header.get( "x-ratelimit-reset-requests" ) reset_time = ( (ratelimit_tokens_header[0] if ratelimit_tokens_header else None) or (ratelimit_requests_header[0] if ratelimit_requests_header else None) or 10 ) # assuming `reset_time` is in seconds await context.sleep("sleep-until-retry", float(reset_time)) continue # Any other scenario - pause for 5 seconds to avoid overloading OpenAI API await context.sleep("pause-to-avoid-spam", 5) ---------------------------------------- TITLE: Invoking Workflows with context.invoke in TypeScript DESCRIPTION: This snippet demonstrates how to invoke another workflow from within a workflow using context.invoke. It shows the structure of the invoke method and the response object returned, containing the body, isFailed, and isCanceled properties. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/invoke.mdx#2025-04-08_snippet_0 LANGUAGE: typescript CODE: const { body, // response from the invoked workflow isFailed, // whether the invoked workflow was canceled isCanceled // whether the invoked workflow failed } = await context.invoke( "analyze-content", { workflow: analyzeContent, body: "test", header: {...}, // headers to pass to anotherWorkflow (optional) retries, // number of retries (optional, default: 3) flowControl, // flow control settings (optional) workflowRunId // workflowRunId to set (optional) } ) ---------------------------------------- TITLE: Charging Customer with Error Handling in Python DESCRIPTION: Code snippet showing how to attempt to charge a customer with proper error handling in Python. The function defines a nested async function with try-except block to handle errors during payment processing. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/paymentRetry.mdx#2025-04-08_snippet_3 LANGUAGE: python CODE: async def _charge_customer() -> Optional[ChargeResult]: try: return await charge_customer(i + 1) except Exception as e: print(f"Error: {e}") return None result = await context.run("charge customer", _charge_customer) ---------------------------------------- TITLE: Executing Parallel Steps in Upstash Workflow with TypeScript DESCRIPTION: This snippet demonstrates how to run multiple workflow steps in parallel using Promise.all syntax in Upstash Workflow. It shows the basic structure for executing three parallel steps. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/parallel-runs.mdx#2025-04-08_snippet_0 LANGUAGE: typescript CODE: const [result1, result2, result3] = await Promise.all([ ctx.run("parallel-step-1", async () => { ... }), ctx.run("parallel-step-2", async () => { ... }), ctx.run("parallel-step-3", async () => { ... }), ]) ---------------------------------------- TITLE: Creating a Workflow Endpoint in TypeScript (Next.js) DESCRIPTION: Demonstrates how to use the 'serve' method to create a workflow endpoint with multiple steps in a Next.js application using TypeScript. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/serve.mdx#2025-04-08_snippet_0 LANGUAGE: typescript CODE: import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve(async (context) => { const result = await context.run("step-1", async () => { // define a piece of business logic as step 1 }); await context.run("step-2", async () => { // define another piece of business logic as step 2 }); }); ---------------------------------------- TITLE: Payment Retry Logic in Python DESCRIPTION: Code snippet implementing a retry loop for payment processing in Python. The workflow attempts the payment up to 3 times with a 24-hour delay between retries, taking different actions based on success or failure. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/paymentRetry.mdx#2025-04-08_snippet_5 LANGUAGE: python CODE: for i in range(3): # attempt to charge the customer if not result: # Wait for a day await context.sleep("wait for retry", 24 * 60 * 60) else: # Payment succeeded # Unsuspend user, send invoice email # end the workflow: return ---------------------------------------- TITLE: Implementing serveMany for Multiple Workflow Endpoints in Next.js DESCRIPTION: This snippet demonstrates how to use serveMany function to expose multiple workflows as endpoints in a Next.js application. It shows the setup of a catch-all route and how to define workflows that can invoke each other. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/invoke.mdx#2025-04-08_snippet_2 LANGUAGE: typescript CODE: import { WorkflowContext } from "@upstash/workflow"; import { createWorkflow, serveMany } from "@upstash/workflow/nextjs"; const workflowOne = createWorkflow(async (context) => { await context.run("say hi", () => { console.log("workflow one says hi!") }) const { body, isCanceled, isFailed } = await context.invoke("invoking other", { workflow: workflowTwo, body: "hello from workflow one", }) console.log(`received response from workflowTwo: ${body}`) }) const workflowTwo = createWorkflow(async (context: WorkflowContext) => { await context.run("say hi", () => { console.log("workflowTwo says hi!") console.log(`received: '${context.requestPayload}' in workflowTwo`) }) return "Workflow two finished!" }) export const { POST } = serveMany( { "workflow-one-route": workflowOne, "workflow-two-route": workflowTwo, } ) ---------------------------------------- TITLE: Implementing Weekly Summary Workflow in TypeScript (Next.js) DESCRIPTION: This snippet defines a workflow endpoint for generating and sending weekly user summaries using the Upstash Workflow API in Next.js. It fetches user data, generates a summary, and sends an email. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/schedule.mdx#2025-04-08_snippet_4 LANGUAGE: typescript CODE: import { serve } from "@upstash/workflow/nextjs"; import { getUserData, generateSummary } from "@/utils/user-utils"; import { sendEmail } from "@/utils/email-utils"; // Type-safety for starting our workflow interface WeeklySummaryData { userId: string; } export const { POST } = serve(async (context) => { const { userId } = context.requestPayload; // Step 1: Fetch user data const user = await context.run("fetch-user-data", async () => { return await getUserData(userId); }); // Step 2: Generate weekly summary const summary = await context.run("generate-summary", async () => { return await generateSummary(userId); }); // Step 3: Send email with weekly summary await context.run("send-summary-email", async () => { await sendEmail(user.email, "Your Weekly Summary", summary); }); }); ---------------------------------------- TITLE: Payment Retry Logic in TypeScript DESCRIPTION: Code snippet implementing a retry loop for payment processing in TypeScript. The workflow attempts the payment up to 3 times with a 24-hour delay between retries, taking different actions based on success or failure. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/paymentRetry.mdx#2025-04-08_snippet_4 LANGUAGE: typescript CODE: for (let i = 0; i < 3; i++) { // attempt to charge the user if (!result) { // Wait for a day await context.sleep("wait for retry", 24 * 60 * 60); } else { // Payment succeeded // Unsuspend user, send invoice email // end the workflow: return; } } ---------------------------------------- TITLE: Scheduling User-Specific Weekly Summaries in TypeScript (Next.js) DESCRIPTION: This snippet demonstrates how to create a user-specific schedule for sending weekly summaries using QStash in a Next.js API route. It calculates the first summary date and sets up a recurring schedule. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/schedule.mdx#2025-04-08_snippet_2 LANGUAGE: typescript CODE: import { signUp } from "@/utils/auth-utils"; import { Client } from "@upstash/qstash"; const client = new Client({ token: process.env.QSTASH_TOKEN! }); export async function POST(request: Request) { const userData: UserData = await request.json(); // Simulate user registration const user = await signUp(userData); // Calculate the date for the first summary (7 days from now) const firstSummaryDate = new Date(Date.now() + 7 * 24 * 60 * 60 * 1000); // Create cron expression for weekly summaries starting 7 days from signup const cron = `${firstSummaryDate.getMinutes()} ${firstSummaryDate.getHours()} * * ${firstSummaryDate.getDay()}`; // Schedule weekly account summary await client.schedules.create({ scheduleId: `user-summary-${user.email}`, destination: "https:///api/send-weekly-summary", body: { userId: user.id }, cron: cron, }); return NextResponse.json( { success: true, message: "User registered and summary scheduled" }, { status: 201 } ); } ---------------------------------------- TITLE: Basic Webhook Endpoint Setup in TypeScript and Python DESCRIPTION: Demonstrates how to create a basic webhook endpoint using the serve function from @upstash/workflow. Shows implementation for both TypeScript (Next.js) and Python (FastAPI) with payload parsing configuration. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/use-webhooks.mdx#2025-04-08_snippet_0 LANGUAGE: typescript CODE: import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve( async (context) => { // Your webhook handling logic here }, { initialPayloadParser: (payload) => { return payload; }, } ); LANGUAGE: python CODE: from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) def initial_payload_parser(payload): return payload @serve.post("/api/example", initial_payload_parser=initial_payload_parser) async def example(context: AsyncWorkflowContext[str]) -> None: # Your webhook handling logic here ---------------------------------------- TITLE: Sending Single Email with Resend API in Upstash Workflow (TypeScript) DESCRIPTION: This snippet demonstrates how to send a single email using the Resend API within an Upstash Workflow. It uses the context.api.resend.call method, which provides type safety and simplifies the API call process. SOURCE: https://github.com/upstash/docs/blob/main/workflow/integrations/resend.mdx#2025-04-08_snippet_0 LANGUAGE: typescript CODE: const { status, body } = await context.api.resend.call( "Call Resend", { token: "", body: { from: "Acme ", to: ["delivered@resend.dev"], subject: "Hello World", html: "

It works!

", }, headers: { "content-type": "application/json", }, } ); ---------------------------------------- TITLE: Performing HTTP Calls with context.call in TypeScript DESCRIPTION: Demonstrates how to use context.call to perform long-running HTTP calls (up to 15 minutes or 2 hours) as a workflow step. This example shows calling OpenAI API to generate a long essay. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/context.mdx#2025-04-08_snippet_9 LANGUAGE: typescript CODE: import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve<{ topic: string }>(async (context) => { const request = context.requestPayload; const { status, // response status headers, // response headers body, // response body } = await context.call( "generate-long-essay", // Step name { url: "https://api.openai.com/v1/chat/completions", // Endpoint URL method: "POST", body: { // Request body model: "gpt-4o", messages: [ { role: "system", content: "You are a helpful assistant writing really long essays that would cause a normal serverless function to timeout.", }, { role: "user", content: request.topic }, ], }, headers: { // request headers authorization: `Bearer ${process.env.OPENAI_API_KEY}`, }, } ); }); ---------------------------------------- TITLE: Calling OpenAI API using context.api.openai.call in TypeScript DESCRIPTION: This snippet demonstrates how to use the type-safe method context.api.openai.call to interact with OpenAI's chat completions API. It includes setting up the API key, specifying the operation, and structuring the request body with messages. SOURCE: https://github.com/upstash/docs/blob/main/workflow/integrations/openai.mdx#2025-04-08_snippet_0 LANGUAGE: typescript CODE: const { status, body } = await context.api.openai.call( "Call OpenAI", { token: "", operation: "chat.completions.create", body: { model: "gpt-4o", messages: [ { role: "system", content: "Assistant says 'hello!'", }, { role: "user", content: "User shouts back 'hi!'" } ], }, } ); // get text: console.log(body.content[0].text) ---------------------------------------- TITLE: Sending Batch Emails with Resend API in Upstash Workflow (TypeScript) DESCRIPTION: This code snippet shows how to send multiple emails in a batch using the Resend API within an Upstash Workflow. It uses the context.api.resend.call method with the batch option set to true, allowing for efficient sending of multiple emails in one API call. SOURCE: https://github.com/upstash/docs/blob/main/workflow/integrations/resend.mdx#2025-04-08_snippet_1 LANGUAGE: typescript CODE: const { status, body } = await context.api.resend.call( "Call Resend", { batch: true, token: "", body: [ { from: "Acme ", to: ["delivered@resend.dev"], subject: "Hello World", html: "

It works!

", }, { from: "Acme ", to: ["delivered@resend.dev"], subject: "Hello World", html: "

It works!

", }, ], headers: { "content-type": "application/json", }, } ); ---------------------------------------- TITLE: Executing Serial Workflow Steps in Next.js (TypeScript) DESCRIPTION: Illustrates how to define and execute workflow steps serially using the context.run method in TypeScript. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/context.mdx#2025-04-08_snippet_2 LANGUAGE: typescript CODE: import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve(async (context) => { const input = context.requestPayload; const result1 = await context.run("step-1", async () => { return someWork(input); }); await context.run("step-2", async () => { someOtherWork(result1); }); }); ---------------------------------------- TITLE: Scheduling User-Specific Weekly Summaries in Python (FastAPI) DESCRIPTION: This snippet shows how to create a user-specific schedule for sending weekly summaries using QStash in a FastAPI route. It calculates the first summary date and sets up a recurring schedule. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/schedule.mdx#2025-04-08_snippet_3 LANGUAGE: python CODE: from fastapi import FastAPI, Request from fastapi.responses import JSONResponse from qstash import AsyncQStash from datetime import datetime, timedelta app = FastAPI() client = AsyncQStash("") @app.post("/api/sign-up") async def sign_up(request: Request): user_data = await request.json() # Simulate user registration user = await sign_up(user_data) # Calculate the date for the first summary (7 days from now) first_summary_date = datetime.now() + timedelta(days=7) # Create cron expression for weekly summaries starting 7 days from signup cron = f"{first_summary_date.minute} {first_summary_date.hour} * * {first_summary_date.day}" # Schedule weekly account summary await client.schedule.create_json( schedule_id=f"user-summary-{user.email}", destination="https:///api/send-weekly-summary", body={"userId": user.id}, cron=cron, ) return JSONResponse( content={"success": True, "message": "User registered and summary scheduled"}, status_code=201, ) ---------------------------------------- TITLE: Implementing QStash Verification in TypeScript DESCRIPTION: TypeScript implementation of QStash request verification using explicit signing keys in the serve function. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/security.mdx#2025-04-08_snippet_2 LANGUAGE: typescript CODE: import { Receiver } from "@upstash/qstash"; import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve( async (context) => { // Your workflow steps... }, { receiver: new Receiver({ currentSigningKey: process.env.QSTASH_CURRENT_SIGNING_KEY, nextSigningKey: process.env.QSTASH_NEXT_SIGNING_KEY, }), } ); ---------------------------------------- TITLE: Executing Serial Workflow Steps in FastAPI (Python) DESCRIPTION: Demonstrates how to define and execute workflow steps serially using the context.run method in Python with FastAPI. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/context.mdx#2025-04-08_snippet_4 LANGUAGE: python CODE: from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload async def _step1(): return some_work(input) result1 = await context.run("step-1", _step1) async def _step2(): return some_other_work(result1) await context.run("step-2", _step2) ---------------------------------------- TITLE: Calling Anthropic API for Text Generation in Upstash Workflow DESCRIPTION: This snippet demonstrates how to use the 'context.api.anthropic.call' method to make a request to Anthropic's text generation API. It includes setting up the API key, specifying the operation, and defining the request body with model and message parameters. SOURCE: https://github.com/upstash/docs/blob/main/workflow/integrations/anthropic.mdx#2025-04-08_snippet_0 LANGUAGE: typescript CODE: const { status, body } = await context.api.anthropic.call( "Call Anthropic", { token: "", operation: "messages.create", body: { model: "claude-3-5-sonnet-20241022", max_tokens: 1024, messages: [ {"role": "user", "content": "Hello, world"} ] }, } ); // get text: console.log(body.content[0].text) ---------------------------------------- TITLE: Invoking Another Workflow with context.invoke DESCRIPTION: Shows how to use context.invoke to trigger another workflow and wait for its completion, enabling workflow composition and reuse of workflow logic. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/context.mdx#2025-04-08_snippet_17 LANGUAGE: typescript CODE: const { body, isFailed, isCanceled } = await context.invoke( "invoke another workflow", { workflow: anotherWorkflow, body: "test", header: {...}, // headers to pass to anotherWorkflow (optional) retries, // number of retries (optional, default: 3) flowControl, // flow control settings (optional) workflowRunId // workflowRunId to set (optional) } ); ---------------------------------------- TITLE: Limiting External API Calls with Context.call DESCRIPTION: This example demonstrates how to use flow control to limit calls to an external API (in this case, OpenAI) within a workflow step. It uses the 'context.call' method to apply rate and parallelism limits. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/flow-control.mdx#2025-04-08_snippet_2 LANGUAGE: javascript CODE: import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve<{ topic: string }>(async (context) => { const request = context.requestPayload; const response = await context.call( "generate-long-essay", { url: "https://api.openai.com/v1/chat/completions", method: "POST", body: {/*****/}, flowControl: { key: "opani-call", parallelism: 3, ratePerSecond: 10 } } ); }); ---------------------------------------- TITLE: Implementing Failure Function for Workflow Error Handling in TypeScript DESCRIPTION: This code snippet demonstrates how to use the 'failureFunction' parameter in the 'serve' function to handle workflow errors gracefully. It allows for custom error handling logic, such as logging to Sentry. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/failures.mdx#2025-04-08_snippet_0 LANGUAGE: typescript CODE: export const { POST } = serve( async (context) => { // Your workflow logic... }, { failureFunction: async ({ context, failStatus, failResponse, failHeaders, }) => { // Handle error, i.e. log to Sentry }, } ); ---------------------------------------- TITLE: Setting Retry Count in TypeScript and Python DESCRIPTION: Demonstrates how to set the number of retries for a workflow in both TypeScript and Python. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/serve.mdx#2025-04-08_snippet_4 LANGUAGE: typescript CODE: export const { POST } = serve( async (context) => { ... }, { retries: 3 } ); LANGUAGE: python CODE: @serve.post("/api/example", retries=3) async def example(context: AsyncWorkflowContext[str]) -> None: ... ---------------------------------------- TITLE: Notifying Waiting Workflows with context.notify DESCRIPTION: Demonstrates how to use context.notify to trigger waiting workflows that have been paused with waitForEvent, allowing for coordination between multiple workflow runs. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/context.mdx#2025-04-08_snippet_16 LANGUAGE: javascript CODE: import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve<{ topic: string }>(async (context) => { const payload = context.requestPayload; const { notifyResponse, // result of notify, which is a list of notified waiters } = await context.notify("notify step", "my-event-Id", payload); }); ---------------------------------------- TITLE: Configuring Flow Control in Workflow Trigger Method DESCRIPTION: This code shows how to apply flow control when triggering a workflow using the Upstash Workflow Client. It sets the same parallelism and rate limits as the 'serve' method example. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/flow-control.mdx#2025-04-08_snippet_1 LANGUAGE: javascript CODE: import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }); const { workflowRunId } = await client.trigger({ url: "https://workflow-endpoint.com", body: "hello there!", flowControl: { key: "app1", parallelism: 3, ratePerSecond: 10 } }); ---------------------------------------- TITLE: Canceling a Workflow with context.cancel DESCRIPTION: Demonstrates how to cancel the current workflow execution using context.cancel, allowing for conditional termination of workflows based on runtime conditions. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/context.mdx#2025-04-08_snippet_18 LANGUAGE: typescript CODE: export const { POST } = serve<{ topic: string }>(async (context) => { const payload = context.requestPayload const result = await context.run("check if canceled", () => { ... }); if (result.cancel) { await context.cancel() // cancel the workflow run } }) ---------------------------------------- TITLE: Pausing Workflow Execution with context.waitForEvent DESCRIPTION: Shows how to use context.waitForEvent to pause a workflow until an external notification is received or a timeout occurs, enabling event-driven workflow execution. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/context.mdx#2025-04-08_snippet_15 LANGUAGE: javascript CODE: import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve<{ topic: string }>(async (context) => { const request = context.requestPayload; const { eventData, // data passed in notify timeout, // boolean denoting whether the step was notified or timed out } = await context.waitForEvent("wait for some event", "my-event-id", { timeout: "1000s", // 1000 second timeout }); }); ---------------------------------------- TITLE: Correcting Non-idempotent Code with context.run - TypeScript DESCRIPTION: Example showing how to properly handle non-idempotent code by placing it inside context.run to ensure consistent behavior when the workflow endpoint is called multiple times. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/caveats.mdx#2025-04-08_snippet_11 LANGUAGE: typescript CODE: const result = await context.run(async () => { await getResultFromDb(entryId) }); if (result.return) { return; } ---------------------------------------- TITLE: Charging Customer with Error Handling in TypeScript DESCRIPTION: Code snippet showing how to attempt to charge a customer with proper error handling in TypeScript. The function uses context.run to create a workflow step and catches exceptions to implement custom logic on payment failure. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/paymentRetry.mdx#2025-04-08_snippet_2 LANGUAGE: typescript CODE: const result = await context.run("charge customer", async () => { try { return await chargeCustomer(i + 1), } catch (e) { console.error(e); return } }); ---------------------------------------- TITLE: Configuring Failure Function in TypeScript DESCRIPTION: Shows how to define a failure function for a workflow using the 'failureFunction' option in TypeScript. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/serve.mdx#2025-04-08_snippet_3 LANGUAGE: typescript CODE: export const { POST } = serve( async (context) => { ... }, { failureFunction: async ({ context, // context during failure failStatus, // failure status failResponse, // failure message failHeaders // failure headers }) => { // handle the failure } } ); ---------------------------------------- TITLE: Using Resend Integration with context.api DESCRIPTION: Demonstrates using the Resend email service integration via context.api namespace to send emails as part of a workflow. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/context.mdx#2025-04-08_snippet_14 LANGUAGE: typescript CODE: const { status, body } = await context.api.resend.call("Call Resend", { token: "", body: { from: "Acme ", to: ["delivered@resend.dev"], subject: "Hello World", html: "

It works!

", }, headers: { "content-type": "application/json", }, }); ---------------------------------------- TITLE: Correcting Non-idempotent Code with context.run - Python DESCRIPTION: Python example showing how to properly handle non-idempotent code by placing it inside context.run to ensure consistent behavior when the workflow endpoint is called multiple times. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/caveats.mdx#2025-04-08_snippet_12 LANGUAGE: python CODE: async def _get_result_from_db(): return await get_result_from_db(entry_id) result = await context.run("get-result-from-db", _get_result_from_db) if result.should_return: return ---------------------------------------- TITLE: Implementing Weekly Summary Workflow in Python (FastAPI) DESCRIPTION: This snippet defines a workflow endpoint for generating and sending weekly user summaries using the Upstash Workflow API in FastAPI. It fetches user data, generates a summary, and sends an email. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/schedule.mdx#2025-04-08_snippet_5 LANGUAGE: python CODE: from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext from utils import get_user_data, generate_summary, send_email app = FastAPI() serve = Serve(app) @dataclass class WeeklySummaryData: user_id: str @serve.post("/api/send-weekly-summary") async def send_weekly_summary(context: AsyncWorkflowContext[WeeklySummaryData]) -> None: user_id = context.request_payload.user_id # Step 1: Fetch user data async def _step1(): return await get_user_data(user_id) user = await context.run("fetch_user_data", _step1) # Step 2: Generate weekly summary async def _step2(): return await generate_summary(user_id) summary = await context.run("generate_summary", _step2) # Step 3: Send email with weekly summary async def _step3(): await send_email(user.email, "Your Weekly Summary", summary) await context.run("send_summary_email", _step3) ---------------------------------------- TITLE: Implementing Event Wait with Timeout DESCRIPTION: Code showing how to implement waiting for an event with a timeout using context.waitForEvent method. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/waitForEvent.mdx#2025-04-08_snippet_3 LANGUAGE: typescript CODE: const { eventData, timeout } = await context.waitForEvent( "wait for order processing", `order-${orderId}`, { timeout: "10m" // 10 minutes timeout } ); if (timeout) { // end workflow in case of timeout return; } ---------------------------------------- TITLE: Ensuring Idempotency in context.run - TypeScript DESCRIPTION: Example showing how business logic should be placed inside context.run to ensure idempotent behavior, as workflow steps may be retried in distributed systems. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/caveats.mdx#2025-04-08_snippet_13 LANGUAGE: typescript CODE: export const { POST } = serve(async (context) => { const input = context.requestPayload await context.run("step-1", async () => { return someWork(input) }) }) ---------------------------------------- TITLE: Creating Custom AI SDK Tool for Mathematical Expressions DESCRIPTION: Example of creating a custom tool using the AI SDK for evaluating mathematical expressions. This demonstrates compatibility with AI SDK tools. SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/features.mdx#2025-04-08_snippet_5 LANGUAGE: typescript CODE: import { z } from 'zod' import { tool } from 'ai' import * as mathjs from 'mathjs' const mathTool = tool({ description: 'A tool for evaluating mathematical expressions. ' + 'Example expressions: ' + "'1.2 * (2 + 4.5)', '12.7 cm to inch', 'sin(45 deg) ^ 2'.", parameters: z.object({ expression: z.string() }), execute: async ({ expression }) => mathjs.evaluate(expression), }) ---------------------------------------- TITLE: Implementing Call Functionality in FastAPI Workflow DESCRIPTION: Example of implementing call functionality in a FastAPI workflow using Upstash Workflow. SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/fastapi.mdx#2025-04-08_snippet_7 LANGUAGE: python CODE: from fastapi import FastAPI from typing import Dict from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext, CallResponse app = FastAPI() serve = Serve(app) def some_work(input: str) -> str: return f"processed '{input}'" @app.post("/get-data") async def get_data() -> Dict[str, str]: return {"message": "get data response"} @serve.post("/call") async def call(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload async def _step1() -> str: output = some_work(input) print("step 1 input", input, "output", output) return output result1: str = await context.run("step1", _step1) response: CallResponse[Dict[str, str]] = await context.call( "get-data", url=f"{context.env.get('UPSTASH_WORKFLOW_URL', 'http://localhost:8000')}/get-data", method="POST", body={"message": result1}, ) async def _step2() -> str: output = some_work(response.body["message"]) print("step 2 input", response, "output", output) return output await context.run("step2", _step2) ---------------------------------------- TITLE: Creating Workflow Endpoint with Next.js App Router DESCRIPTION: TypeScript implementation of workflow endpoints using Next.js App Router, showing both minimal and request object examples SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/vercel-nextjs.mdx#2025-04-08_snippet_4 LANGUAGE: typescript CODE: import { serve } from "@upstash/workflow/nextjs" export const { POST } = serve( async (context) => { await context.run("initial-step", () => { console.log("initial step ran") }) await context.run("second-step", () => { console.log("second step ran") }) } ) LANGUAGE: typescript CODE: import { serve } from "@upstash/workflow/nextjs"; import { NextRequest } from "next/server"; export const POST = async (request: NextRequest) => { // do something with the native request object const { POST: handler } = serve(async (context) => { // Your workflow steps }); return await handler(request); } ---------------------------------------- TITLE: Defining Custom WorkflowTool for Mathematical Expressions DESCRIPTION: Implementation of a custom WorkflowTool for evaluating mathematical expressions. This tool uses the mathjs library to perform calculations. SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/features.mdx#2025-04-08_snippet_4 LANGUAGE: typescript CODE: import { WorkflowTool } from '@upstash/workflow' import { z } from 'zod' import * as mathjs from 'mathjs' const tool = new WorkflowTool({ description: 'A tool for evaluating mathematical expressions. ' + 'Example expressions: ' + "'1.2 * (2 + 4.5)', '12.7 cm to inch', 'sin(45 deg) ^ 2'.", schema: z.object({ expression: z.string() }), invoke: async ({ expression }) => mathjs.evaluate(expression), }) ---------------------------------------- TITLE: Starting Local QStash Server DESCRIPTION: Command to start a local QStash server for development SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/nextjs-fastapi.mdx#2025-04-08_snippet_4 LANGUAGE: bash CODE: npx @upstash/qstash-cli dev ---------------------------------------- TITLE: Suspending User and Sending Notification in TypeScript DESCRIPTION: This TypeScript code checks if a user is suspended, suspends them if not, and sends a notification email. It uses a context object to run tasks asynchronously. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/paymentRetry.mdx#2025-04-08_snippet_10 LANGUAGE: typescript CODE: const isSuspended = await context.run("check suspension", async () => { return await checkSuspension(email); }); if (!isSuspended) { await context.run("suspend user", async () => { await suspendUser(email); }); await context.run("send suspended email", async () => { await sendEmail( context.requestPayload.email, "Your account has been suspended due to payment failure. Please update your payment method." ); }); } ---------------------------------------- TITLE: Implementing Workflow Endpoint in Express.js DESCRIPTION: Setting up an Express.js workflow endpoint with Upstash Workflow. This example defines two steps where the first step processes a message and the second step logs the result. SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/express.mdx#2025-04-08_snippet_5 LANGUAGE: typescript CODE: import { serve } from "@upstash/workflow/express"; import express from 'express'; import { config } from 'dotenv'; config(); const app = express(); app.use( express.json() ); app.post( '/workflow', serve<{ message: string }>( async (context) => { const res1 = await context.run("step1", async () => { const message = context.requestPayload.message; return message; }) await context.run("step2", async () => { console.log(res1); }) } ) ); app.listen(3000, () => { console.log('Server running on port 3000'); }); ---------------------------------------- TITLE: Creating a Daily Backup Workflow in Python (FastAPI) DESCRIPTION: This snippet defines a workflow endpoint for creating and uploading a backup daily using FastAPI and the Upstash Workflow API for Python. It demonstrates how to structure the workflow steps. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/schedule.mdx#2025-04-08_snippet_1 LANGUAGE: python CODE: from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext from utils import create_backup, upload_backup app = FastAPI() serve = Serve(app) @serve.post("/api/workflow") async def workflow(context: AsyncWorkflowContext[str]) -> None: async def _step1(): return await create_backup() backup = await context.run("create_backup", _step1) async def _step2(): await upload_backup(backup) await context.run("upload_backup", _step2) ---------------------------------------- TITLE: Implementing SleepUntil in Workflow Steps (Python) DESCRIPTION: Shows how to use the context.sleep_until method to pause workflow execution until a specific timestamp in Python with FastAPI. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/context.mdx#2025-04-08_snippet_8 LANGUAGE: python CODE: from fastapi import FastAPI from datetime import datetime, timedelta from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext from onboarding_utils import sign_in, send_email app = FastAPI() serve = Serve(app) @serve.post("/api/onboarding") async def onboarding(context: AsyncWorkflowContext[User]) -> None: user_data = context.request_payload async def _sign_in(): return await sign_in(user_data) user = await context.run("sign-in", _sign_in) # 👇 Calculate the date for one week from now one_week_from_now = datetime.now() + timedelta(days=7) # 👇 Wait until the calculated date await context.sleep_until("wait-for-one-week", one_week_from_now) async def _send_email(): return await send_email(user.name, user.email) await context.run("send-welcome-email", _send_email) ---------------------------------------- TITLE: Sending Welcome Email in TypeScript DESCRIPTION: This code snippet demonstrates how to send a welcome email to a newly signed-up user using Upstash Workflow in TypeScript. It uses the context.run method to execute the email sending task. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/customerOnboarding.mdx#2025-04-08_snippet_2 LANGUAGE: typescript CODE: await context.run("new-signup", async () => { await sendEmail("Welcome to the platform", email) }) ---------------------------------------- TITLE: Implementing Basic Workflow Endpoint in FastAPI DESCRIPTION: Example of a basic workflow endpoint implementation using FastAPI and Upstash Workflow. SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/fastapi.mdx#2025-04-08_snippet_5 LANGUAGE: python CODE: from fastapi import FastAPI from upstash_workflow.fastapi import Serve app = FastAPI() serve = Serve(app) @serve.post("/api/workflow") async def workflow(context) -> None: async def _step1() -> None: print("initial step ran") await context.run("initial-step", _step1) async def _step2() -> None: print("second step ran") await context.run("second-step", _step2) ---------------------------------------- TITLE: Getting Workflow Logs DESCRIPTION: Shows how to retrieve workflow run logs using the client's log method with optional filtering parameters like run ID, count, state, and URL. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/client.mdx#2025-04-08_snippet_1 LANGUAGE: typescript CODE: import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }) const { runs, cursor } = await client.logs({ // Id of the workflow run to get workflowRunId, // Number of workflows to get count, // Workflow state to filter for. // One of "RUN_STARTED", "RUN_SUCCESS", "RUN_FAILED", "RUN_CANCELED" state, // Workflow url to search for. should be an exact match workflowUrl, // Unix timestamp when the run was created workflowCreatedAt, // Cursor from a previous request to continue the search cursor }) ---------------------------------------- TITLE: Sending Welcome Email using TypeScript and Python DESCRIPTION: This snippet shows how to send a welcome email to a new user. It uses a context object to run the email sending operation asynchronously. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/authWebhook.mdx#2025-04-08_snippet_3 LANGUAGE: typescript CODE: await context.run("send welcome email", async () => { await sendEmail( email, "Welcome to our platform!, You have 14 days of free trial." ); }); LANGUAGE: python CODE: async def _send_welcome_email() -> None: await send_email( email, "Welcome to our platform!, You have 14 days of free trial." ) await context.run("send welcome email", _send_welcome_email) ---------------------------------------- TITLE: Implementing Call Functionality in Flask Workflow DESCRIPTION: Example of a Flask workflow that demonstrates the use of call functionality. It processes input, makes an HTTP call, and processes the response. SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/flask.mdx#2025-04-08_snippet_4 LANGUAGE: python CODE: from flask import Flask from typing import Dict from upstash_workflow.flask import Serve from upstash_workflow import WorkflowContext, CallResponse app = Flask(__name__) serve = Serve(app) def some_work(input: str) -> str: return f"processed '{input}'" @app.route("/get-data", methods=["POST"]) def get_data() -> Dict[str, str]: return {"message": "get data response"} @serve.route("/call") def call(context: WorkflowContext[str]) -> None: input = context.request_payload def _step1() -> str: output = some_work(input) print("step 1 input", input, "output", output) return output result1: str = context.run("step1", _step1) response: CallResponse[Dict[str, str]] = context.call( "get-data", url=f"{context.env.get('UPSTASH_WORKFLOW_URL', 'http://localhost:8000')}/get-data", method="POST", body={"message": result1}, ) def _step2() -> str: output = some_work(response.body["message"]) print("step 2 input", response, "output", output) return output context.run("step2", _step2) ---------------------------------------- TITLE: Inter-Workflow Notification with context.notify in TypeScript DESCRIPTION: The context.notify method allows one workflow to notify another workflow, triggering its resume. It takes a step name, event ID, and event data parameters. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/events.mdx#2025-04-08_snippet_2 LANGUAGE: typescript CODE: const { notifyResponse } = await context.notify( "notify step", // notify step name "event-Id", // event id { my: "data" } // event data ); ---------------------------------------- TITLE: Notifying Waiting Workflows DESCRIPTION: Shows how to notify a workflow that is waiting for an event using the notify method with event ID and data. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/client.mdx#2025-04-08_snippet_3 LANGUAGE: javascript CODE: import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }); await client.notify({ eventId: "my-event-id", eventData: "my-data", // data passed to the workflow run }); ---------------------------------------- TITLE: Setting Failure URL for Workflow Error Handling in TypeScript DESCRIPTION: This code snippet shows how to use the 'failureUrl' parameter in the 'serve' function to handle cases where the service hosting the workflow URL is unavailable. It sends a workflow failure notification to another reachable endpoint. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/failures.mdx#2025-04-08_snippet_1 LANGUAGE: typescript CODE: export const { POST } = serve( async (context) => { // Your workflow logic... }, { failureUrl: "https:///workflow-failure", } ); ---------------------------------------- TITLE: Stock Verification Implementation DESCRIPTION: Code snippets showing the stock verification step in both TypeScript and Python. Creates an order ID and checks stock availability before proceeding. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/eCommerceOrderFulfillment.mdx#2025-04-08_snippet_2 LANGUAGE: typescript CODE: const orderId = await context.run("create-order-id", async () => { return await createOrderId(userId); }); const stockAvailable = await context.run("check-stock", async () => { return await checkStockAvailability(items) }) if (!stockAvailable) { console.warn("Some items are out of stock") return; } LANGUAGE: python CODE: async def _create_order_id(): return await create_order_id(user_id) order_id: str = await context.run("create-order-id", _create_order_id) async def _check_stock(): return await check_stock_availability(items) stock_available: bool = await context.run("check-stock", _check_stock) if not stock_available: print("Some items are out of stock") return ---------------------------------------- TITLE: Initializing Workflow Context in FastAPI (Python) DESCRIPTION: Shows how to set up a workflow endpoint using FastAPI and the Upstash Workflow library in Python. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/context.mdx#2025-04-08_snippet_1 LANGUAGE: python CODE: from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: ... ---------------------------------------- TITLE: Creating Workflow Endpoint in SvelteKit DESCRIPTION: TypeScript code for defining a workflow endpoint in a SvelteKit project, including two example steps. SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/svelte.mdx#2025-04-08_snippet_5 LANGUAGE: typescript CODE: import { serve } from "@upstash/workflow/svelte" import { env } from "$env/dynamic/private" export const { POST } = serve( async (context) => { await context.run("initial-step", () => { console.log("initial step ran") }) await context.run("second-step", () => { console.log("second step ran") }) }, { env } ) ---------------------------------------- TITLE: Payment Processing Implementation DESCRIPTION: Code snippets demonstrating the payment processing step in both TypeScript and Python implementations. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/eCommerceOrderFulfillment.mdx#2025-04-08_snippet_3 LANGUAGE: typescript CODE: await context.run("process-payment", async () => { return await processPayment(orderId) }) LANGUAGE: python CODE: async def _process_payment(): return await process_payment(order_id) await context.run("process-payment", _process_payment) ---------------------------------------- TITLE: Performing HTTP Calls with context.call in Python DESCRIPTION: Shows how to use the Python equivalent of context.call in Upstash Workflow with FastAPI to make long-running HTTP requests to external services. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/context.mdx#2025-04-08_snippet_10 LANGUAGE: python CODE: from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) @dataclass class Request: topic: str @serve.post("/api/example") async def example(context: AsyncWorkflowContext[Request]) -> None: request: Request = context.request_payload result = await context.call( "generate-long-essay", url="https://api.openai.com/v1/chat/completions", method="POST", body={ "model": "gpt-4o", "messages": [ { "role": "system", "content": "You are a helpful assistant writing really long essays that would cause a normal serverless function to timeout.", }, {"role": "user", "content": request["topic"]}, ], }, headers={ "authorization": f"Bearer {os.environ['OPENAI_API_KEY']}", }, ) status, headers, body = result.status, result.headers, result.body ---------------------------------------- TITLE: Creating a Workflow Endpoint with Hono Context DESCRIPTION: Advanced implementation of a workflow endpoint with typed bindings, environment variables access, and payload type definition for enhanced type safety. SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/hono.mdx#2025-04-08_snippet_6 LANGUAGE: typescript CODE: import { Hono } from "hono" import { serve, WorkflowBindings } from "@upstash/workflow/hono" import { env } from "hono/adapter" interface Bindings extends WorkflowBindings { ENVIRONMENT: "development" | "production" } const app = new Hono<{ Bindings: Bindings }>() app.post("/workflow", (c) => { // 👇 access Honos native context, i.e. getting an env variable const { ENVIRONMENT } = env(c) // 👇 `unknown` represents your initial payload data type const handler = serve( async (context) => { ... } ) return await handler(c) }) export default app ---------------------------------------- TITLE: Implementing Workflow Notification Client DESCRIPTION: Example showing how to notify a waiting workflow using the Upstash Client. This code demonstrates sending event data to resume a paused workflow. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/waitForEvent.mdx#2025-04-08_snippet_1 LANGUAGE: typescript CODE: import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }) const orderId = "1324" await client.notify({ eventId: `order-${orderId}`, eventData: { deliveryTime: "2 days" } }); ---------------------------------------- TITLE: Customer Notification Implementation DESCRIPTION: Code snippets demonstrating the customer notification steps for order confirmation and dispatch notification in both TypeScript and Python. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/eCommerceOrderFulfillment.mdx#2025-04-08_snippet_5 LANGUAGE: typescript CODE: await context.run("send-confirmation", async () => { return await sendOrderConfirmation(userId, orderId) }) await context.run("send-dispatch-notification", async () => { return await sendDispatchNotification(userId, orderId) }) LANGUAGE: python CODE: async def _send_confirmation(): return await send_order_confirmation(user_id, order_id) await context.run("send-confirmation", _send_confirmation) async def _send_dispatch_notification(): return await send_dispatch_notification(user_id, order_id) await context.run("send-dispatch-notification", _send_dispatch_notification) ---------------------------------------- TITLE: Sending Trial Ended Email using TypeScript and Python DESCRIPTION: This snippet shows how to send a trial ended email after the trial period has expired. It waits for 2 days after the trial warning before sending the final email. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/authWebhook.mdx#2025-04-08_snippet_7 LANGUAGE: typescript CODE: await context.sleep("wait for trial end", 2 * 24 * 60 * 60); await context.run("send trial end email", async () => { await sendEmail( email, "Your trial has ended. Please upgrade your plan to keep using our platform." ); }); LANGUAGE: python CODE: await context.sleep("wait for trial end", 2 * 24 * 60 * 60) async def _send_trial_end_email() -> None: await send_email( email, "Your trial has ended. Please upgrade your plan to keep using our platform.", ) await context.run("send trial end email", _send_trial_end_email) ---------------------------------------- TITLE: Custom Authorization in TypeScript DESCRIPTION: Implementation of custom bearer token authorization in TypeScript workflow endpoints. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/security.mdx#2025-04-08_snippet_4 LANGUAGE: typescript CODE: import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve( async (context) => { const authHeader = context.headers.get("authorization"); const bearerToken = authHeader?.split(" ")[1]; if (!isValid(bearerToken)) { console.error("Authentication failed."); return; } // Your workflow steps.. }, { failureFunction: async () => { const authHeader = context.headers.get("authorization"); const bearerToken = authHeader?.split(" ")[1]; if (!isValid(bearerToken)) { // ... } }, } ); ---------------------------------------- TITLE: Creating Type-Safe Workflows with createWorkflow in TypeScript DESCRIPTION: This snippet shows how to use createWorkflow to create type-safe workflow objects that can be invoked by other workflows. It demonstrates defining a workflow with typed request and response, and invoking it from another workflow with type safety. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/invoke.mdx#2025-04-08_snippet_1 LANGUAGE: typescript CODE: import { WorkflowContext } from "@upstash/workflow"; import { createWorkflow } from "@upstash/workflow/nextjs"; const anotherWorkflow = createWorkflow( // Define the workflow logic, specifying the type of the initial request body. // In this case, the body is a string: async (context: WorkflowContext) => { await context.sleep("wait 1 second", 1) // Return a response from the workflow. The type of this // response will be available when `context.invoke` is // called with `anotherWorkflow`. return { message: "This is the data returned by the workflow" }; } ); const someWorkflow = createWorkflow(async (context) => { // Invoke anotherWorkflow with a string body and get the response // The types of the body parameter and the response are // typesafe and inferred from anotherWorkflow const { body } = await context.invoke( "invoke anotherWorkflow", { workflow: anotherWorkflow, body: "user-1" } ), }); ---------------------------------------- TITLE: Configuring QStash Receiver for Request Verification DESCRIPTION: Demonstrates how to set up a QStash Receiver to verify incoming requests. The receiver ensures that requests are legitimate and originate from QStash by validating signing keys. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/serve.mdx#2025-04-08_snippet_11 LANGUAGE: typescript CODE: import { Receiver } from "@upstash/qstash"; import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve( async (context) => { ... }, { receiver: new Receiver({ // 👇 grab these variables from your QStash dashboard currentSigningKey: process.env.QSTASH_CURRENT_SIGNING_KEY!, nextSigningKey: process.env.QSTASH_NEXT_SIGNING_KEY!, }) } ); LANGUAGE: python CODE: from qstash import Receiver @serve.post( "/api/example", receiver=Receiver( current_signing_key=os.environ["QSTASH_CURRENT_SIGNING_KEY"], next_signing_key=os.environ["QSTASH_NEXT_SIGNING_KEY"], ), ) async def example(context: AsyncWorkflowContext[str]) -> None: ... ---------------------------------------- TITLE: Creating Workflow Endpoint with Next.js Pages Router DESCRIPTION: TypeScript implementation of workflow endpoints using Next.js Pages Router, showing both minimal and request object examples SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/vercel-nextjs.mdx#2025-04-08_snippet_5 LANGUAGE: typescript CODE: import { servePagesRouter } from "@upstash/workflow/nextjs"; const { handler } = servePagesRouter( async (context) => { await context.run("initial-step", () => { console.log("initial step ran") }) await context.run("second-step", () => { console.log("second step ran") }) } ) export default handler; LANGUAGE: typescript CODE: import type { NextApiRequest, NextApiResponse } from "next"; import { servePagesRouter } from "@upstash/workflow/nextjs"; export default async function handler( req: NextApiRequest, res: NextApiResponse ) { // do something with the native request object const { handler } = servePagesRouter( async (context) => { // Your workflow steps } ) await handler(req, res) } ---------------------------------------- TITLE: Implementing QStash Verification in Python DESCRIPTION: Python implementation of QStash request verification using explicit signing keys with the serve decorator. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/security.mdx#2025-04-08_snippet_3 LANGUAGE: python CODE: from qstash import Receiver @serve.post( "/api/example", receiver=Receiver( current_signing_key=os.environ["QSTASH_CURRENT_SIGNING_KEY"], next_signing_key=os.environ["QSTASH_NEXT_SIGNING_KEY"], ), ) async def example(context: AsyncWorkflowContext[str]) -> None: ... ---------------------------------------- TITLE: Using OpenAI Integration with context.api DESCRIPTION: Demonstrates the OpenAI integration via context.api namespace, allowing typesafe API calls to OpenAI services within a workflow. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/context.mdx#2025-04-08_snippet_12 LANGUAGE: typescript CODE: const { status, body } = await context.api.openai.call("Call OpenAI", { token: "", operation: "chat.completions.create", body: { model: "gpt-4o", messages: [ { role: "system", content: "Assistant says 'hello!'", }, { role: "user", content: "User shouts back 'hi!'" }, ], }, }); ---------------------------------------- TITLE: Executing Business Logic in context.run in TypeScript and Python DESCRIPTION: Example showing how to properly place business logic inside context.run functions. The code outside context.run executes multiple times during a workflow run, while code inside only runs once per step. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/caveats.mdx#2025-04-08_snippet_0 LANGUAGE: typescript CODE: export const { POST } = serve(async (context) => { const input = context.requestPayload const result = await context.run("step-1", () => { return { success: true } }) console.log("This log will appear multiple times") await context.run("step-2", () => { console.log("This log will appear just once") console.log("Step 1 status is:", result.success) }) }) LANGUAGE: python CODE: @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload async def _step_1() -> Dict: return {"success": True} result = await context.run("step-1", _step_1) print("This log will appear multiple times") async def _step_2() -> None: print("This log will appear just once") print("Step 1 status is:", result["success"]) await context.run("step-2", _step_2) ---------------------------------------- TITLE: Requesting Order Processing Step DESCRIPTION: Code snippet showing the implementation of the first step in the workflow where order processing is requested. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/waitForEvent.mdx#2025-04-08_snippet_2 LANGUAGE: typescript CODE: await context.run("request order processing", async () => { await requestProcessing(orderId) }) ---------------------------------------- TITLE: Implementing Sleep in Workflow Steps (Python) DESCRIPTION: Demonstrates how to use the context.sleep method to pause workflow execution for a specified duration in Python with FastAPI. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/context.mdx#2025-04-08_snippet_6 LANGUAGE: python CODE: from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext from onboarding_utils import sign_in, send_email app = FastAPI() serve = Serve(app) @serve.post("/api/onboarding") async def onboarding(context: AsyncWorkflowContext[User]) -> None: user_data = context.request_payload async def _sign_in(): return await sign_in(user_data) user = await context.run("sign-in", _sign_in) # 👇 Wait for one day (in seconds) await context.sleep("wait-until-welcome-email", "1d") async def _send_email(): return await send_email(user.name, user.email) await context.run("send-welcome-email", _send_email) ---------------------------------------- TITLE: Using LangChain Wikipedia Query Tool DESCRIPTION: Demonstration of using a pre-built LangChain tool for Wikipedia queries. This tool allows agents to fetch information from Wikipedia. SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/features.mdx#2025-04-08_snippet_8 LANGUAGE: typescript CODE: import { WikipediaQueryRun } from '@langchain/community/tools/wikipedia_query_run' const wikiTool = new WikipediaQueryRun({ topKResults: 1, maxDocContentLength: 500, }) ---------------------------------------- TITLE: Fetching Workflow Runs - HTTP GET Endpoint DESCRIPTION: HTTP GET endpoint for retrieving workflow run details from QStash. Supports filtering by various parameters including run ID, workflow URL, state, and date range. Includes pagination support via cursor parameter. SOURCE: https://github.com/upstash/docs/blob/main/workflow/rest/runs/logs.mdx#2025-04-08_snippet_0 LANGUAGE: http CODE: GET https://qstash.upstash.io/v2/workflows/logs ---------------------------------------- TITLE: Order Dispatch Implementation DESCRIPTION: Code snippets showing the order dispatch step in both TypeScript and Python implementations. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/eCommerceOrderFulfillment.mdx#2025-04-08_snippet_4 LANGUAGE: typescript CODE: await context.run("dispatch-order", async () => { return await dispatchOrder(orderId, items) }) LANGUAGE: python CODE: async def _dispatch_order(): return await dispatch_order(order_id, items) await context.run("dispatch-order", _dispatch_order) ---------------------------------------- TITLE: Customizing Response and Request Body Types for OpenAI API Call in TypeScript DESCRIPTION: This snippet shows how to override the predefined types for the request and response body when calling the OpenAI API. It allows for more flexibility in handling custom data structures. SOURCE: https://github.com/upstash/docs/blob/main/workflow/integrations/openai.mdx#2025-04-08_snippet_1 LANGUAGE: typescript CODE: type ResponseBodyType = { ... }; // Define your response body type type RequestBodyType = { ... }; // Define your request body type const { status, body } = await context.api.openai.call< ResponseBodyType, RequestBodyType >( "Call OpenAI", { ... } ); ---------------------------------------- TITLE: Sending Reminder Email after 7 Days using TypeScript and Python DESCRIPTION: This snippet demonstrates how to send a reminder email after 7 days. It checks user stats and calls a function to send an appropriate email based on the user's activity. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/authWebhook.mdx#2025-04-08_snippet_4 LANGUAGE: typescript CODE: await context.sleep("wait", 7 * 24 * 60 * 60); const stats = await context.run("get user stats", async () => { return await getUserStats(userid); }); await sendProblemSolvedEmail({context, email, stats}); LANGUAGE: python CODE: await context.sleep("wait", 7 * 24 * 60 * 60) async def _get_user_stats() -> UserStats: return await get_user_stats(userid) stats: UserStats = await context.run("get user stats", _get_user_stats) await send_problem_solved_email(context, email, stats) ---------------------------------------- TITLE: Correct Variable Handling in Workflow Steps - TypeScript DESCRIPTION: Example showing the correct approach to pass data between workflow steps by returning values from context.run and using them in subsequent steps. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/caveats.mdx#2025-04-08_snippet_2 LANGUAGE: typescript CODE: export const { POST } = serve(async (context) => { const input = context.requestPayload const result = await context.run("step-1", async () => { return await someWork(input) }) await context.run("step-2", async () => { someOtherWork(result) }) }) ---------------------------------------- TITLE: Cancelling an Upstash Workflow Run Programmatically with JavaScript DESCRIPTION: This code demonstrates how to cancel a running Upstash Workflow by its ID using the JavaScript client. It requires an Upstash QStash token for authentication and the workflow run ID that you want to cancel. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/cancel.mdx#2025-04-08_snippet_0 LANGUAGE: javascript CODE: import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }); await client.cancel({ ids: "" }); ---------------------------------------- TITLE: Webhook Event Processing Implementation DESCRIPTION: Demonstrates how to process webhook events using context.run method with discrete, trackable steps. Includes user creation event handling and subsequent operations like Stripe customer creation. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/use-webhooks.mdx#2025-04-08_snippet_2 LANGUAGE: typescript CODE: export const { POST } = serve(async (context) => { const user = await context.run( "handle-webhook-event", async () => { if (event.type === "user.created") { const { id: clerkUserId, email_addresses, first_name } = event.data; const primaryEmail = email_addresses.find( (email) => (email.id = event.data.primary_email_address_id) ); if (!primaryEmail) { return false; } return { event: event.type, userId: clerkUserId, email: primaryEmail.email_address, firstName: first_name, } as UserPayload; } return false; } ); }); LANGUAGE: python CODE: @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: async def _handle_webhook_event(): if event.type == "user.created": clerk_user_id = event.data["id"] email_addresses = event.data["email_addresses"] first_name = event.data["first_name"] primary_email = next( ( email for email in email_addresses if email.id == event.data["primary_email_address_id"] ), None, ) if not primary_email: return False return { "event": event.type, "user_id": clerk_user_id, "email": primary_email["email_address"], "first_name": first_name, } return False user = await context.run("handle-webhook-event", _handle_webhook_event) LANGUAGE: typescript CODE: export const { POST } = serve(async (context) => { if (!user) { return; } const customer = await context.run("create-stripe-customer", async () => { return await stripe.customers.create({ email: user.email, name: `${user.firstName} ${user.lastName}`, metadata: { userId: user.userId, }, }); }); /// ... Additional steps }); LANGUAGE: python CODE: @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: if not user: return async def _create_stripe_customer(): return await stripe.customers.create( email=user["email"], name=f"{user['first_name']} {user['last_name']}", metadata={"user_id": user["user_id"]}, ) customer = await context.run("create-stripe-customer", _create_stripe_customer) # ... Additional steps ---------------------------------------- TITLE: Fetching Workflow Message Logs API Endpoint DESCRIPTION: HTTP GET endpoint for retrieving detailed message logs from QStash workflows. The endpoint requires bearer authentication and supports various query parameters for filtering results. SOURCE: https://github.com/upstash/docs/blob/main/workflow/rest/runs/message-logs.mdx#2025-04-08_snippet_0 LANGUAGE: http CODE: GET https://qstash.upstash.io/v2/workflows/messageLogs ---------------------------------------- TITLE: User Suspension Check and Management in Python DESCRIPTION: Code snippet showing how to check if a user is suspended and unsuspend them if needed in Python. Uses nested functions and workflow steps for both the suspension check and the unsuspend action. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/paymentRetry.mdx#2025-04-08_snippet_7 LANGUAGE: python CODE: async def _check_suspension() -> bool: return await check_suspension(email) is_suspended = await context.run("check suspension", _check_suspension) if is_suspended: async def _unsuspend_user() -> None: await unsuspend_user(email) await context.run("unsuspend user", _unsuspend_user) ---------------------------------------- TITLE: Starting Trial in Stripe using TypeScript and Python DESCRIPTION: This snippet demonstrates how to start a trial in Stripe for a user. It uses a context object to run the operation asynchronously. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/authWebhook.mdx#2025-04-08_snippet_2 LANGUAGE: typescript CODE: await context.run("start trial in Stripe", async () => { await startTrialInStripe(email); }); LANGUAGE: python CODE: async def _start_trial_in_stripe() -> None: await start_trial_in_stripe(email) await context.run("start trial in Stripe", _start_trial_in_stripe) ---------------------------------------- TITLE: User Suspension Check and Management in TypeScript DESCRIPTION: Code snippet showing how to check if a user is suspended and unsuspend them if needed in TypeScript. Uses workflow steps for both the suspension check and the unsuspend action. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/paymentRetry.mdx#2025-04-08_snippet_6 LANGUAGE: typescript CODE: const isSuspended = await context.run("check suspension", async () => { return await checkSuspension(email); }); if (isSuspended) { await context.run("unsuspend user", async () => { await unsuspendUser(email); }); } ---------------------------------------- TITLE: Sending Invoice Email in TypeScript DESCRIPTION: Code snippet showing how to send an invoice email to the user after successful payment in TypeScript. Uses the payment result data to include invoice ID and total cost in the email. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/paymentRetry.mdx#2025-04-08_snippet_8 LANGUAGE: typescript CODE: await context.run("send invoice email", async () => { await sendEmail( email, `Payment successfull. Incoice: ${result.invoiceId}, Total cost: $${result.totalCost}` ); }); ---------------------------------------- TITLE: Defining Custom Tool with Context Steps DESCRIPTION: Example of creating a custom tool that uses context.call within its invoke method. This demonstrates how to integrate Workflow-specific functionality into tools. SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/features.mdx#2025-04-08_snippet_9 LANGUAGE: typescript CODE: import { WorkflowTool } from '@upstash/workflow' import { serve } from '@upstash/workflow/nextjs' export const { POST } = serve(async (context) => { const tool = new WorkflowTool({ description: ..., schema: ..., invoke: ( ... ) => { // make HTTP call inside the tool with context.call: await context.call( ... ) }, executeAsStep: false }) // pass the tool to agent }) ---------------------------------------- TITLE: Notify Workflow Using Upstash SDK DESCRIPTION: Example of using the Upstash Workflow SDK to notify a workflow event. SOURCE: https://github.com/upstash/docs/blob/main/workflow/rest/runs/notify.mdx#2025-04-08_snippet_2 LANGUAGE: javascript CODE: import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }) await client.notify({ eventId: "my-event-id", eventData: "Hello World!" }); ---------------------------------------- TITLE: Configuring Environment Variables for Upstash Workflow with Flask DESCRIPTION: Steps to create and populate a .env file with necessary configuration for Upstash Workflow, including QStash token and URL. SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/flask.mdx#2025-04-08_snippet_1 LANGUAGE: bash CODE: touch .env LANGUAGE: bash CODE: export QSTASH_URL="http://127.0.0.1:8080" export QSTASH_TOKEN= LANGUAGE: bash CODE: export QSTASH_TOKEN="***" export UPSTASH_WORKFLOW_URL= ---------------------------------------- TITLE: Suspending User and Sending Notification in Python DESCRIPTION: This Python code performs the same tasks as the TypeScript version. It checks for suspension, suspends the user if necessary, and sends a notification email, all using asynchronous functions and a context object. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/paymentRetry.mdx#2025-04-08_snippet_11 LANGUAGE: python CODE: async def _check_suspension() -> bool: return await check_suspension(email) is_suspended = await context.run("check suspension", _check_suspension) if not is_suspended: async def _suspend_user() -> None: await suspend_user(email) await context.run("suspend user", _suspend_user) async def _send_suspended_email() -> None: await send_email( email, "Your account has been suspended due to payment failure. Please update your payment method.", ) await context.run("send suspended email", _send_suspended_email) ---------------------------------------- TITLE: Defining OpenAI Model in Workflow Route DESCRIPTION: TypeScript code snippet demonstrating how to define an OpenAI model within a Workflow route. This sets up the base model for the agent to use. SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/features.mdx#2025-04-08_snippet_2 LANGUAGE: typescript CODE: import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve(async (context) => { const model = context.agents.openai('gpt-3.5-turbo') // ... }) ---------------------------------------- TITLE: External Workflow Notification Implementation DESCRIPTION: Example of how to implement external notification to resume a waiting workflow using the Upstash Client. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/waitForEvent.mdx#2025-04-08_snippet_6 LANGUAGE: typescript CODE: import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }); const orderId = "1324"; await client.notify({ eventId: `order-${orderId}`, eventData: { deliveryTime: "2 days" } }); ---------------------------------------- TITLE: Triggering Workflow using Python Requests DESCRIPTION: Shows how to trigger a workflow using the Python requests library with JSON payload and custom headers. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/start.mdx#2025-04-08_snippet_5 LANGUAGE: python CODE: import requests requests.post( "https:///", json={"foo": "bar"}, headers={"my-header": "foo"} ) ---------------------------------------- TITLE: Integrating Agentic Weather Tools DESCRIPTION: Snippet showing how to integrate Agentic weather tools into the Workflow. This demonstrates using pre-built tools from external libraries. SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/features.mdx#2025-04-08_snippet_6 LANGUAGE: typescript CODE: import { createAISDKTools } from '@agentic/ai-sdk' import { WeatherClient } from '@agentic/weather' const weather = new WeatherClient() const tools = createAISDKTools(weather) ---------------------------------------- TITLE: Implementing Sleep in Workflow Steps (TypeScript) DESCRIPTION: Shows how to use the context.sleep method to pause workflow execution for a specified duration in TypeScript. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/context.mdx#2025-04-08_snippet_5 LANGUAGE: typescript CODE: import { serve } from "@upstash/workflow/nextjs"; import { signIn, sendEmail } from "@/utils/onboarding-utils"; export const { POST } = serve(async (context) => { const userData = context.requestPayload; const user = await context.run("sign-in", async () => { const signedInUser = await signIn(userData); return signedInUser; }); // 👇 Wait for one day (in seconds) await context.sleep("wait-until-welcome-email", "1d"); await context.run("send-welcome-email", async () => { return sendEmail(user.name, user.email); }); }); ---------------------------------------- TITLE: Setting up Virtual Environment and Installing Dependencies for Flask and Upstash Workflow DESCRIPTION: Instructions for creating a virtual environment and installing the necessary packages including Flask and the Upstash Workflow SDK. SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/flask.mdx#2025-04-08_snippet_0 LANGUAGE: bash CODE: python -m venv venv source venv/bin/activate LANGUAGE: bash CODE: pip install fastapi uvicorn upstash-workflow ---------------------------------------- TITLE: Publishing Workflow Message with QStash in Python DESCRIPTION: Demonstrates publishing a workflow message using the QStash client in Python with async functionality. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/start.mdx#2025-04-08_snippet_2 LANGUAGE: python CODE: from qstash import AsyncQStash client = AsyncQStash("") res = await client.message.publish_json( url="https:///", body={"hello": "there!"}, headers={...}, retries=3, ) message_id = res.message_id ---------------------------------------- TITLE: Sending Trial Warning Email using TypeScript and Python DESCRIPTION: This snippet demonstrates how to send a trial warning email 2 days before the trial ends. It checks if the user has upgraded their plan and sends an email if they haven't. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/authWebhook.mdx#2025-04-08_snippet_6 LANGUAGE: typescript CODE: await context.sleep("wait for trial warning", 5 * 24 * 60 * 60); const isUpgraded = await context.run("check upgraded plan", async () => { return await checkUpgradedPlan(email); }); if (isUpgraded) return; await context.run("send trial warning email", async () => { await sendEmail( email, "Your trial is about to end in 2 days. Please upgrade your plan to keep using our platform." ); }); LANGUAGE: python CODE: await context.sleep("wait for trial warning", 5 * 24 * 60 * 60) async def _check_upgraded_plan() -> bool: return await check_upgraded_plan(email) is_upgraded = await context.run("check upgraded plan", _check_upgraded_plan) # end the workflow if upgraded if is_upgraded: return async def _send_trial_warning_email() -> None: await send_email( email, "Your trial is about to end in 2 days. Please upgrade your plan to keep using our platform.", ) await context.run("send trial warning email", _send_trial_warning_email) ---------------------------------------- TITLE: Assigning Task to Single Agent DESCRIPTION: Demonstration of assigning a task to a single agent and running it. This example shows how to use a researcher agent to provide information on advanced physics topics. SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/features.mdx#2025-04-08_snippet_11 LANGUAGE: typescript CODE: import { serve } from "@upstash/workflow/nextjs"; import { WikipediaQueryRun } from "@langchain/community/tools/wikipedia_query_run"; export const { POST } = serve(async (context) => { const model = context.agents.openai('gpt-3.5-turbo'); const researcherAgent = context.agents.agent({ model, name: 'academic', maxSteps: 2, tools: { wikiTool: new WikipediaQueryRun({ topKResults: 1, maxDocContentLength: 500, }) }, background: 'You are researcher agent with access to Wikipedia. ' + 'Utilize Wikipedia as much as possible for correct information', }); const task = context.agents.task({ agent: researcherAgent, prompt: "Tell me about 5 topics in advanced physics.", }); const { text } = await task.run(); console.log("result:", text) }) ---------------------------------------- TITLE: Getting Event Waiters DESCRIPTION: Demonstrates how to retrieve a list of workflow runs waiting for a specific event using the getWaiters method. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/client.mdx#2025-04-08_snippet_4 LANGUAGE: javascript CODE: import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }); const result = await client.getWaiters({ eventId: "my-event-id", }); ---------------------------------------- TITLE: Implementing Sleep in Workflow with TypeScript DESCRIPTION: This snippet demonstrates how to pause the workflow for a specific duration using context.sleep in TypeScript. It's used to wait for 3 days before proceeding with the next steps in the onboarding process. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/customerOnboarding.mdx#2025-04-08_snippet_4 LANGUAGE: typescript CODE: await context.sleep("wait-for-3-days", 60 * 60 * 24 * 3) ---------------------------------------- TITLE: Basic Text Generation with Upstash Workflow and Vercel AI SDK DESCRIPTION: Implementation of a workflow endpoint that uses the custom OpenAI client to generate text from a prompt. This demonstrates the basic pattern of using generateText with proper error handling for Workflow integration. SOURCE: https://github.com/upstash/docs/blob/main/workflow/integrations/aisdk.mdx#2025-04-08_snippet_2 LANGUAGE: typescript CODE: import { serve } from "@upstash/workflow/nextjs"; import { WorkflowAbort } from '@upstash/workflow'; import { generateText, ToolExecutionError } from 'ai'; import { createWorkflowOpenAI } from './model'; export const { POST } = serve<{ prompt: string }>(async (context) => { const openai = createWorkflowOpenAI(context); // Important: Must have a step before generateText const prompt = await context.run("get prompt", async () => { return context.requestPayload.prompt; }); try { const result = await generateText({ model: openai('gpt-3.5-turbo'), maxTokens: 2048, prompt, }); await context.run("text", () => { console.log(`TEXT: ${result.text}`); return result.text; }); } catch (error) { if (error instanceof ToolExecutionError && error.cause instanceof WorkflowAbort) { throw error.cause; } else { throw error; } } }); ---------------------------------------- TITLE: Sending Invoice Email in Python DESCRIPTION: Code snippet showing how to send an invoice email to the user after successful payment in Python. Uses the payment result data to include invoice ID and total cost in the email. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/paymentRetry.mdx#2025-04-08_snippet_9 LANGUAGE: python CODE: async def _send_invoice_email() -> None: await send_email( email, f"Payment successful. Invoice: {result.invoice_id}, Total cost: ${result.total_cost}", ) await context.run("send invoice email", _send_invoice_email) ---------------------------------------- TITLE: Canceling Workflow Runs DESCRIPTION: Examples of different methods to cancel workflow runs - by IDs, URL pattern, or canceling all runs. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/client.mdx#2025-04-08_snippet_2 LANGUAGE: typescript CODE: // cancel a single workflow await client.cancel({ ids: "" }); // cancel a set of workflow runs await client.cancel({ ids: ["", ""] }); // cancel by URL pattern await client.cancel({ urlStartingWith: "https://your-endpoint.com" }); // cancel all workflows await client.cancel({ all: true }); ---------------------------------------- TITLE: Using Non-Blocking HTTP Calls in Workflow Context DESCRIPTION: Demonstrates the usage of context.call for making non-blocking API requests to optimize serverless execution time and costs. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/allInOne.mdx#2025-04-08_snippet_2 LANGUAGE: javascript CODE: context.call ---------------------------------------- TITLE: Implementing SleepUntil in Workflow Steps (TypeScript) DESCRIPTION: Illustrates how to use the context.sleepUntil method to pause workflow execution until a specific timestamp in TypeScript. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/context.mdx#2025-04-08_snippet_7 LANGUAGE: typescript CODE: import { serve } from "@upstash/workflow/nextjs"; import { signIn, sendEmail } from "@/utils/onboarding-utils"; export const { POST } = serve(async (context) => { const userData = context.requestPayload; const user = await context.run("sign-in", async () => { return signIn(userData); }); // 👇 Calculate the date for one week from now const oneWeekFromNow = new Date(); oneWeekFromNow.setDate(oneWeekFromNow.getDate() + 7); // 👇 Wait until the calculated date await context.sleepUntil("wait-for-one-week", oneWeekFromNow); await context.run("send-welcome-email", async () => { return sendEmail(user.name, user.email); }); }); ---------------------------------------- TITLE: Example Output from Evaluator-Optimizer Pattern DESCRIPTION: This snippet shows the final output generated by the evaluator-optimizer pattern for a prompt about quantum mechanics. It demonstrates the quality of content that can be produced after multiple iterations of generation and evaluation. SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/patterns/evaluator-optimizer.mdx#2025-04-08_snippet_1 LANGUAGE: text CODE: Quantum mechanics is a branch of physics that describes the behavior of particles at the smallest scales, such as atoms and subatomic particles. It introduces the concept of quantized energy levels, wave-particle duality, and probabilistic nature of particles. In quantum mechanics, particles can exist in multiple states simultaneously until measured, and their behavior is governed by mathematical equations known as wave functions. This theory has revolutionized our understanding of the fundamental building blocks of the universe and has led to the development of technologies like quantum computing and quantum cryptography. ---------------------------------------- TITLE: Avoiding Time-dependent Code Outside context.run - TypeScript DESCRIPTION: Example showing problematic time-dependent code that makes different decisions based on the current time, which will lead to inconsistent workflow execution when the endpoint is called multiple times. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/caveats.mdx#2025-04-08_snippet_6 LANGUAGE: typescript CODE: export const { POST } = serve(async (context) => { const input = context.requestPayload // 👇 Problem: time-dependent code if (Date.now() % 5 == 2) { await context.run("step-1", () => { // ... }) } else { await context.run("step-2", () => { // ... }) } }) ---------------------------------------- TITLE: Creating a Workflow Endpoint in Python (FastAPI) DESCRIPTION: Shows how to use the 'serve' decorator to create a workflow endpoint with multiple steps in a FastAPI application using Python. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/serve.mdx#2025-04-08_snippet_1 LANGUAGE: python CODE: from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: async def _step1() -> str: # define a piece of business logic as step 1 return "step 1 result" result = await context.run("step-1", _step1) async def _step2() -> None: # define another piece of business logic as step 2 pass await context.run("step-2", _step2) ---------------------------------------- TITLE: Notify Workflow Using Python DESCRIPTION: Example of notifying a workflow event using Python requests library. SOURCE: https://github.com/upstash/docs/blob/main/workflow/rest/runs/notify.mdx#2025-04-08_snippet_4 LANGUAGE: python CODE: import requests headers = { 'Authorization': 'Bearer ', } response = requests.post( 'https://qstash.upstash.io/v2/notify/myEvent', headers=headers ) ---------------------------------------- TITLE: Generated Physics Q&A Output DESCRIPTION: Sample output showing the structured Q&A content generated by the worker agents, covering quantum mechanics and relativity topics. Demonstrates the comprehensive nature of the multi-agent collaboration. SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/patterns/orchestrator-workers.mdx#2025-04-08_snippet_1 LANGUAGE: text CODE: ### Quantum Mechanics **Q: What is quantum mechanics?** A: Quantum mechanics is a fundamental theory in physics that describes the behavior of nature at the atomic and subatomic levels. It serves as the foundation for all quantum physics, including quantum chemistry, quantum field theory, quantum technology, and quantum information science. **Q: What are some key principles of quantum mechanics?** A: 1. **Wave-Particle Duality**: Particles exhibit both wave-like and particle-like properties. 2. **Uncertainty Principle**: Certain pairs of physical properties, like position and momentum, cannot be simultaneously measured with arbitrary precision. 3. **Quantum Superposition**: A quantum system can exist in multiple states at once until it is measured. 4. **Quantum Entanglement**: Particles become interconnected such that the state of one influences the state of another, regardless of distance. 5. **Quantization**: Energy levels in quantum systems are discrete. 6. **Probability and Wave Functions**: Quantum systems are described by wave functions, which provide probabilities of finding a system in a particular state. 7. **Observer Effect**: Measurement affects the system being observed. ### Relativity **Q: What is the theory of relativity?** A: Developed by Albert Einstein, the theory of relativity encompasses two interrelated theories: special relativity and general relativity. **Q: What is special relativity?** A: Proposed by Einstein in 1905, special relativity addresses the relationship between space and time in the absence of gravity. It is based on two key postulates: the invariance of physical laws in all inertial frames and the constancy of the speed of light in a vacuum. **Q: What is general relativity?** A: Published by Einstein in 1915, general relativity is a geometric theory of gravitation. It describes gravity as a geometric property of space and time, or four-dimensional spacetime, and explains how massive objects cause a distortion in spacetime. These topics challenge classical intuitions and have led to significant advancements in our understanding of the universe and the development of new technologies. ---------------------------------------- TITLE: Avoiding Time-dependent Code Outside context.run - Python DESCRIPTION: Python example showing problematic time-dependent code that makes different decisions based on the current time, leading to inconsistent workflow execution when the endpoint is called multiple times. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/caveats.mdx#2025-04-08_snippet_9 LANGUAGE: python CODE: @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload # 👇 Problem: time-dependent code if time.time() % 5 == 2: await context.run("step-1", lambda: ...) else: await context.run("step-2", lambda: ...) ---------------------------------------- TITLE: Processing Order Data DESCRIPTION: Implementation of order processing step that logs the processed order data. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/waitForEvent.mdx#2025-04-08_snippet_4 LANGUAGE: typescript CODE: await context.run("process-order", async () => { console.log(`Order ${orderId} processed:`, processedData); }); ---------------------------------------- TITLE: Declaring Result Type for context.call in TypeScript DESCRIPTION: Example showing how to declare the expected result type for the context.call method in TypeScript, allowing for type safety in the response handling. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/context.mdx#2025-04-08_snippet_11 LANGUAGE: typescript CODE: type ResultType = { field1: string, field2: number }; const result = await context.call( ... ); ---------------------------------------- TITLE: Avoiding Random Code Outside context.run - TypeScript DESCRIPTION: Example showing problematic random-based conditional code that can lead to inconsistent workflow execution when the endpoint is called multiple times, as different branches may be followed in each call. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/caveats.mdx#2025-04-08_snippet_7 LANGUAGE: typescript CODE: export const { POST } = serve(async (context) => { const input = context.requestPayload // 👇 Problem: random code if (Math.floor(Math.random() * 10) % 5 == 2) { await context.run("step-1", () => { // ... }) } else { await context.run("step-2", () => { // ... }) } }) ---------------------------------------- TITLE: Defining Workflow Endpoint in Nuxt.js DESCRIPTION: TypeScript code for creating a workflow endpoint in a Nuxt.js project, defining steps and handling the workflow execution. SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/nuxt.mdx#2025-04-08_snippet_2 LANGUAGE: typescript CODE: import { serve } from "@upstash/workflow/h3" const { handler } = serve( async (context) => { await context.run("initial-step", () => { console.log("initial step ran") }) await context.run("second-step", () => { console.log("second step ran") }) }, ) export default handler; ---------------------------------------- TITLE: Avoiding Nested Context Methods - Python DESCRIPTION: Python example showing the incorrect pattern of nesting context methods (sleep, run, call) within another context.run call, which should be avoided for proper workflow execution. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/caveats.mdx#2025-04-08_snippet_16 LANGUAGE: python CODE: @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload async def _step_1() -> None: await context.sleep(...) # ❌ INCORRECT await context.run(...) # ❌ INCORRECT await context.call(...) # ❌ INCORRECT await context.run("step-1", _step_1) ---------------------------------------- TITLE: Using Anthropic Integration with context.api DESCRIPTION: Shows how to use the Anthropic integration via context.api namespace for typesafe interactions with Anthropic's Claude models. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/context.mdx#2025-04-08_snippet_13 LANGUAGE: typescript CODE: const { status, body } = await context.api.anthropic.call( "Call Anthropic", { token: "", operation: "messages.create", body: { model: "claude-3-5-sonnet-20241022", max_tokens: 1024, messages: [ {"role": "user", "content": "Hello, world"} ] }, } ); ---------------------------------------- TITLE: Avoiding Nested Context Methods - TypeScript DESCRIPTION: Example showing the incorrect pattern of nesting context methods (sleep, run, call) within another context.run call, which should be avoided for proper workflow execution. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/caveats.mdx#2025-04-08_snippet_15 LANGUAGE: typescript CODE: import { serve } from "@upstash/workflow/nextjs" export const { POST } = serve(async (context) => { const input = context.requestPayload await context.run("step-1", async () => { await context.sleep(...) // ❌ INCORRECT await context.run(...) // ❌ INCORRECT await context.call(...) // ❌ INCORRECT }) }) ---------------------------------------- TITLE: Ensuring Idempotency in context.run - Python DESCRIPTION: Python example showing how business logic should be placed inside context.run to ensure idempotent behavior, as workflow steps may be retried in distributed systems. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/caveats.mdx#2025-04-08_snippet_14 LANGUAGE: python CODE: @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload async def _step_1() -> None: return await some_work(input) await context.run("step-1", _step_1) ---------------------------------------- TITLE: Executing Parallel Workflow Steps in Next.js (TypeScript) DESCRIPTION: Shows how to run workflow steps in parallel using Promise.all with the context.run method in TypeScript. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/context.mdx#2025-04-08_snippet_3 LANGUAGE: typescript CODE: import { serve } from "@upstash/workflow/nextjs" export const { POST } = serve( async (context) => { const input = context.requestPayload; const promise1 = context.run("step-1", async () => { return someWork(input); }); const promise2 = context.run("step-2", async () => { return someOtherWork(input); }); await Promise.all([promise1, promise2]); }, ); ---------------------------------------- TITLE: Pausing Workflows with context.waitForEvent in TypeScript DESCRIPTION: The waitForEvent method pauses workflow execution until an external event occurs. It takes an event description, event ID, and timeout parameters, returning the event data when resumed. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/events.mdx#2025-04-08_snippet_0 LANGUAGE: typescript CODE: const { eventData, timeout } = await context.waitForEvent( "description of event", "event-id", { timeout: timeoutInSeconds, } ); ---------------------------------------- TITLE: Upstash Workflow SDK Implementation DESCRIPTION: Examples using the official Upstash Workflow SDK to cancel workflows using different filtering methods including specific IDs, URL prefix, or all workflows. SOURCE: https://github.com/upstash/docs/blob/main/workflow/rest/runs/bulk-cancel.mdx#2025-04-08_snippet_2 LANGUAGE: javascript CODE: import { Client } from "@upstash/workflow"; // cancel a set of workflow runs await client.cancel({ ids: [ "", "", ]}) // cancel workflows starting with a url await client.cancel({ urlStartingWith: "https://your-endpoint.com" }) // cancel all workflows await client.cancel({ all: true }) ---------------------------------------- TITLE: Example Workflow Message Logs Response DESCRIPTION: This JSON snippet illustrates the structure of a successful response when fetching workflow message logs. It includes a cursor for pagination and an array of events containing detailed information about workflow execution steps and their states. SOURCE: https://github.com/upstash/docs/blob/main/workflow/rest/runs/message-logs.mdx#2025-04-08_snippet_5 LANGUAGE: json CODE: { "cursor": "", "events": [ { "time": 1738788333107, "state": "CREATED", "workflowRunId": "wfr_6MXE3GfddpBMWJM7s5WSRPqwcFm8", "workflowUrl": "http://my-workflow-server.com/workflowEndpoint", "workflowCreatedAt": 1738788333105, "stepInfo": { "stepName": "init", "stepType": "Initial", "callType": "step", "messageId": "msg_2KxeAKGVEjwDjNK1TVPormoRf7shRyNBpPThVbpvkuZNqri4cXp5nwSajNzAs6UWakvbco3qEPvtjQU3qxqjWarm2kisK", "concurrent": 1, "createdAt": 1738788333106 }, "nextDeliveryTime": 1738788333106 }, { "time": 1738788333107, "state": "RUN_STARTED", ---------------------------------------- TITLE: Sending Confirmation Email DESCRIPTION: Code showing how to send a confirmation email after order processing is complete. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/waitForEvent.mdx#2025-04-08_snippet_5 LANGUAGE: typescript CODE: await context.run("send-confirmation-email", async () => { await sendEmail( userEmail, "Your order has been processed!", processedData ); }); ---------------------------------------- TITLE: Setting Up Workflow with FastAPI in Python DESCRIPTION: This Python code snippet shows the beginning of a FastAPI implementation for Upstash Workflow. It demonstrates how to initialize the FastAPI app and set up the Workflow serve instance for handling workflow requests. SOURCE: https://github.com/upstash/docs/blob/main/workflow/getstarted.mdx#2025-04-08_snippet_1 LANGUAGE: python CODE: from fastapi import FastAPI from typing import Dict, TypedDict from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext, CallResponse from email_utils import send_email app = FastAPI() serve = Serve(app) ---------------------------------------- TITLE: Implementing Sleep Functionality in Flask Workflow DESCRIPTION: Example of a Flask workflow that demonstrates the use of sleep functionality. It processes input, sleeps for specified durations, and continues processing. SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/flask.mdx#2025-04-08_snippet_3 LANGUAGE: python CODE: from flask import Flask import time from upstash_workflow.flask import Serve from upstash_workflow import WorkflowContext app = Flask(__name__) serve = Serve(app) def some_work(input: str) -> str: return f"processed '{input}'" @serve.route("/sleep") def sleep(context: WorkflowContext[str]) -> None: input = context.request_payload def _step1() -> str: output = some_work(input) print("step 1 input", input, "output", output) return output result1: str = context.run("step1", _step1) context.sleep_until("sleep1", time.time() + 3) def _step2() -> str: output = some_work(result1) print("step 2 input", result1, "output", output) return output result2: str = context.run("step2", _step2) context.sleep("sleep2", 2) def _step3() -> None: output = some_work(result2) print("step 3 input", result2, "output", output) context.run("step3", _step3) ---------------------------------------- TITLE: Implementing Problem Solved Email Function in TypeScript and Python DESCRIPTION: This snippet shows the implementation of the sendProblemSolvedEmail function. It sends different emails based on whether the user has solved any problems or not. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/authWebhook.mdx#2025-04-08_snippet_5 LANGUAGE: typescript CODE: async function sendProblemSolvedEmail({ context: WorkflowContext email: string, stats: { totalProblemsSolved: number } }) { if (stats.totalProblemsSolved === 0) { await context.run("send no answers email", async () => { await sendEmail( email, "Hey, you haven't solved any questions in the last 7 days..." ); }); } else { await context.run("send stats email", async () => { await sendEmail( email, `You have solved ${stats.totalProblemsSolved} problems in the last 7 days. Keep it up!` ); }); } } LANGUAGE: python CODE: async def send_problem_solved_email( context: AsyncWorkflowContext[UserCreatedPayload], email: str, stats: UserStats ) -> None: if stats["total_problems_solved"] == 0: async def _send_no_answers_email() -> None: await send_email( email, "Hey, you haven't solved any questions in the last 7 days..." ) await context.run("send no answers email", _send_no_answers_email) else: async def _send_stats_email() -> None: await send_email( email, f"You have solved {stats['total_problems_solved']} problems in the last 7 days. Keep it up!", ) await context.run("send stats email", _send_stats_email) ---------------------------------------- TITLE: Triggering Workflow via Fetch API DESCRIPTION: Demonstrates triggering a workflow using the Fetch API in TypeScript/JavaScript with custom headers and JSON body. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/start.mdx#2025-04-08_snippet_4 LANGUAGE: javascript CODE: await fetch("https:///", { method: "POST", body: JSON.stringify({ "foo": "bar" }), headers: { "my-header": "foo" } }); ---------------------------------------- TITLE: Calling Workflow Endpoint Using Client DESCRIPTION: TypeScript code demonstrating how to call the workflow endpoint using the Upstash Workflow Client. SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/getting-started.mdx#2025-04-08_snippet_6 LANGUAGE: ts CODE: import { Client } from "@upstash/workflow"; const client = new Client({ baseUrl: process.env.QSTASH_URL, token: process.env.QSTASH_TOKEN!, }) const workflowRunId = await client.trigger({ url: "http://127.0.0.1:3000/workflow", body: { prompt: "Explain the future of space exploration" }, }) console.log(workflowRunId); ---------------------------------------- TITLE: Fixed Request Payload Handling DESCRIPTION: Demonstrates the correct approach to handle request payload by wrapping it in a workflow step to ensure it remains accessible throughout the workflow execution. SOURCE: https://github.com/upstash/docs/blob/main/workflow/troubleshooting/general.mdx#2025-04-08_snippet_2 LANGUAGE: typescript CODE: export const { POST } = serve(async (context) => { // Payload will never be undefined const payload = await context.run("get payload", () => context.requestPayload) // ... steps or any other code context.call( ... ) }) LANGUAGE: python CODE: @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: async def _get_payload() -> str: return context.request_payload # Payload will never be None payload = await context.run("get payload", _get_payload) # ... steps or any other code context.call( ... ) ---------------------------------------- TITLE: Implementing Sleep in Workflow with Python DESCRIPTION: This snippet shows how to pause the workflow for a specific duration using context.sleep in Python. It's used to wait for 3 days before proceeding with the next steps in the onboarding process. SOURCE: https://github.com/upstash/docs/blob/main/workflow/examples/customerOnboarding.mdx#2025-04-08_snippet_5 LANGUAGE: python CODE: await context.sleep("wait-for-3-days", 60 * 60 * 24 * 3) ---------------------------------------- TITLE: Creating Workflow Endpoint in Cloudflare Workers DESCRIPTION: TypeScript code example for defining a workflow endpoint using Upstash Workflow in a Cloudflare Workers project. SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/cloudflare-workers.mdx#2025-04-08_snippet_3 LANGUAGE: typescript CODE: import { serve } from "@upstash/workflow/cloudflare" interface Env { ENVIRONMENT: "development" | "production" } export default serve<{ text: string }>( async (context) => { const initialPayload = context.requestPayload.text const result = await context.run("initial-step", async () => { console.log(`Step 1 running with payload: ${initialPayload}`) return { text: "initial step ran" } } ) await context.run("second-step", async () => { console.log(`Step 2 running with result from step 1: ${result.text}`) }) } ) ---------------------------------------- TITLE: Implementing Authentication in Flask Workflow DESCRIPTION: Example of a Flask workflow that demonstrates basic authentication. It checks for a specific authentication header before proceeding with the workflow steps. SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/flask.mdx#2025-04-08_snippet_5 LANGUAGE: python CODE: from flask import Flask from upstash_workflow.flask import Serve from upstash_workflow import WorkflowContext app = Flask(__name__) serve = Serve(app) def some_work(input: str) -> str: return f"processed '{input}'" @serve.route("/auth") def auth(context: WorkflowContext[str]) -> None: if context.headers.get("Authentication") != "Bearer secret_password": print("Authentication failed.") return def _step1() -> str: return "output 1" context.run("step1", _step1) def _step2() -> str: return "output 2" context.run("step2", _step2) ---------------------------------------- TITLE: Updating context.call usage in JavaScript DESCRIPTION: Demonstrates the changes in how context.call is used and what it returns in the new SDK. SOURCE: https://github.com/upstash/docs/blob/main/workflow/migration.mdx#2025-04-08_snippet_5 LANGUAGE: javascript CODE: // old const result = await context.call("call step", "", "POST", ...) // new const { status, // response status headers, // response headers body // response body } = await context.call("call step", { url: "", method: "POST", ... }) ---------------------------------------- TITLE: Cancelling Workflow Run Using Upstash Workflow SDK DESCRIPTION: This example shows how to cancel a workflow run using the official Upstash Workflow JavaScript SDK. It requires importing the Client from the @upstash/workflow package and providing a valid QStash token. SOURCE: https://github.com/upstash/docs/blob/main/workflow/rest/runs/cancel.mdx#2025-04-08_snippet_1 LANGUAGE: js CODE: import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }) await client.cancel({ workflowRunId: "" }) ---------------------------------------- TITLE: Creating a Minimal Workflow Endpoint with Hono DESCRIPTION: Basic implementation of a workflow endpoint using Hono, defining two sequential steps that will be executed and retried automatically. SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/hono.mdx#2025-04-08_snippet_5 LANGUAGE: typescript CODE: import { Hono } from "hono" import { serve } from "@upstash/workflow/hono" const app = new Hono() app.post("/workflow", serve(async (context) => { await context.run("initial-step", () => { console.log("initial step ran") }) await context.run("second-step", () => { console.log("second step ran") }) }) ) export default app ---------------------------------------- TITLE: Cancelling Workflow Run Using Go HTTP Client DESCRIPTION: This example demonstrates how to cancel a workflow run using Go's standard HTTP client. It creates a DELETE request with the appropriate authorization header and executes it. SOURCE: https://github.com/upstash/docs/blob/main/workflow/rest/runs/cancel.mdx#2025-04-08_snippet_4 LANGUAGE: go CODE: req, err := http.NewRequest("DELETE", "https://qstash.upstash.io/v2/workflows/runs/wfr_TzazoUCuZmFGp2u9cdy5K", nil) if err != nil { log.Fatal(err) } req.Header.Set("Authorization", "Bearer ") resp, err := http.DefaultClient.Do(req) if err != nil { log.Fatal(err) } defer resp.Body.Close() ---------------------------------------- TITLE: Updating imports for serve method in TypeScript DESCRIPTION: Demonstrates how to update the import statement for the serve method from @upstash/qstash to @upstash/workflow in TypeScript. SOURCE: https://github.com/upstash/docs/blob/main/workflow/migration.mdx#2025-04-08_snippet_3 LANGUAGE: typescript CODE: // old import { serve } from "@upstash/qstash/nextjs" // new import { serve } from "@upstash/workflow/nextjs" ---------------------------------------- TITLE: Defining Notify Response Type in TypeScript DESCRIPTION: TypeScript type definition for the API response when notifying workflows of an event. SOURCE: https://github.com/upstash/docs/blob/main/workflow/rest/runs/notify.mdx#2025-04-08_snippet_0 LANGUAGE: typescript CODE: type NotifyResponse = { waiter: Waiter, messageId: string }[] ---------------------------------------- TITLE: Running SvelteKit Development Server DESCRIPTION: Command to start the SvelteKit development server for testing the workflow endpoint. SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/svelte.mdx#2025-04-08_snippet_6 LANGUAGE: bash CODE: npm run dev ---------------------------------------- TITLE: Webhook Request Validation Implementation DESCRIPTION: Shows how to validate incoming webhook requests using Svix for Clerk webhooks. Includes validation function implementation and error handling in both TypeScript and Python. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/use-webhooks.mdx#2025-04-08_snippet_1 LANGUAGE: typescript CODE: export const { POST } = serve(async (context) => { const payloadString = context.requestPayload; const headerPayload = context.headers; let event: WebhookEvent; try { event = await validateRequest(payloadString, headerPayload); } catch { return } // Next steps based on the event }) LANGUAGE: typescript CODE: import { Webhook } from "svix"; import { WebhookEvent } from "@clerk/nextjs/server"; const webhookSecret = "YOUR_WEBHOOK_SECRET"; async function validateRequest(payloadString: string, headerPayload: Headers) { const svixHeaders = { "svix-id": headerPayload.get("svix-id") as string, "svix-timestamp": headerPayload.get("svix-timestamp") as string, "svix-signature": headerPayload.get("svix-signature") as string, }; const wh = new Webhook(webhookSecret); return wh.verify(payloadString, svixHeaders) as WebhookEvent; } LANGUAGE: python CODE: async def validate_request(payload_string: str, header_payload: dict): # Validate the request pass @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: payload_string = context.request_payload header_payload = context.headers try: event = await validate_request(payload_string, header_payload) except: return # Next steps based on the event ---------------------------------------- TITLE: Querying Workflow Logs with Upstash SDK DESCRIPTION: Uses the official Upstash Workflow JS SDK to fetch logs with different filters including workflow run ID, workflow URL and execution state SOURCE: https://github.com/upstash/docs/blob/main/workflow/rest/runs/logs.mdx#2025-04-08_snippet_6 LANGUAGE: javascript CODE: import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }); // Filter by workflow run ID const { runs } = await client.logs({ workflowRunId: ""}); // Filter by workflow server url const { runs } = await client.logs({ workflowUrl: ""}); // Filter by state const { runs } = await client.logs({ state: "RUN_SUCCESS"}); ---------------------------------------- TITLE: Creating a Workflow Endpoint in SolidJS DESCRIPTION: Code example showing how to define a workflow endpoint in SolidJS with sequential steps that are automatically retried on failure. SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/solidjs.mdx#2025-04-08_snippet_5 LANGUAGE: typescript CODE: import { serve } from "@upstash/workflow/solidjs" export const { POST } = serve(async (context) => { await context.run("initial-step", () => { console.log("initial step ran") }) await context.run("second-step", () => { console.log("second step ran") }) }) ---------------------------------------- TITLE: Go HTTP Client Implementation DESCRIPTION: Example showing how to cancel workflows using Go's HTTP client with specific workflow run IDs. SOURCE: https://github.com/upstash/docs/blob/main/workflow/rest/runs/bulk-cancel.mdx#2025-04-08_snippet_5 LANGUAGE: go CODE: var data = strings.NewReader(`{ "workflowRunIds": [ "run_id_1", "run_id_2", "run_id_3" ] }`) req, err := http.NewRequest("DELETE", "https://qstash.upstash.io/v2/workflows/runs", data) if err != nil { log.Fatal(err) } req.Header.Set("Authorization", "Bearer ") req.Header.Set("Content-Type", "application/json") resp, err := http.DefaultClient.Do(req) if err != nil { log.Fatal(err) } defer resp.Body.Close() ---------------------------------------- TITLE: Custom Authorization in Python DESCRIPTION: Implementation of custom bearer token authorization in Python FastAPI workflow endpoints. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/security.mdx#2025-04-08_snippet_5 LANGUAGE: python CODE: from fastapi import FastAPI from upstash_workflow.fastapi import Serve from upstash_workflow import AsyncWorkflowContext app = FastAPI() serve = Serve(app) @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: auth_header = context.headers.get("authorization") bearer_token = auth_header.split(" ")[1] if auth_header else None if not is_valid(bearer_token): print("Authentication failed.") return # Your workflow steps... ---------------------------------------- TITLE: Triggering Workflow with REST API DESCRIPTION: This snippet shows how to trigger a workflow using a simple REST API call with curl. It demonstrates sending a POST request to the workflow endpoint with a JSON body. SOURCE: https://github.com/upstash/docs/blob/main/workflow/getstarted.mdx#2025-04-08_snippet_5 LANGUAGE: bash CODE: curl -X POST https:/// -b '{"hello": "there!"}' ---------------------------------------- TITLE: Configuring Custom QStash Client in TypeScript and Python DESCRIPTION: Demonstrates how to use a custom QStash client for a workflow in both TypeScript and Python. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/serve.mdx#2025-04-08_snippet_10 LANGUAGE: typescript CODE: import { Client } from "@upstash/qstash"; import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve( async (context) => { ... }, { qstashClient: new Client({ token: process.env.QSTASH_TOKEN }) } ); LANGUAGE: python CODE: from qstash import AsyncQStash @serve.post("/api/example", qstash_client=AsyncQStash(os.environ["QSTASH_TOKEN"])) async def example(context: AsyncWorkflowContext[str]) -> None: ... ---------------------------------------- TITLE: Displaying Workflow Incompatibility Error in Bash DESCRIPTION: This snippet shows the error message that occurs when an in-progress workflow attempts to continue from a step that no longer exists due to code modifications. It demonstrates the HTTP status 400 error for incompatible step names. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/changes.mdx#2025-04-08_snippet_0 LANGUAGE: bash CODE: HTTP status 400. Incompatible step name. Expected , got ---------------------------------------- TITLE: Creating a Basic Workflow Endpoint in Flask DESCRIPTION: Example of defining a simple workflow endpoint in Flask using the Upstash Workflow SDK. This workflow contains two steps that print messages. SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/flask.mdx#2025-04-08_snippet_2 LANGUAGE: python CODE: from flask import Flask from upstash_workflow.flask import Serve app = Flask(__name__) serve = Serve(app) @serve.route("/api/workflow") def workflow(context) -> None: def _step1() -> None: print("initial step ran") context.run("initial-step", _step1) def _step2() -> None: print("second step ran") context.run("second-step", _step2) ---------------------------------------- TITLE: Triggering Workflow Using Client in TypeScript DESCRIPTION: Demonstrates how to trigger a workflow using the recommended @upstash/workflow Client method, which returns a workflow run ID and reduces QStash publishes. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/start.mdx#2025-04-08_snippet_0 LANGUAGE: typescript CODE: import { Client } from "@upstash/workflow"; const client = new Client({ token: "" }); const { workflowRunId } = await client.trigger({ url: "https:///", body: "hello there!", // Optional body headers: { ... }, // Optional headers workflowRunId: "my-workflow", // Optional workflow run ID retries: 3 // Optional retries for the initial request }); ---------------------------------------- TITLE: Triggering Workflow via CURL DESCRIPTION: Shows how to trigger a workflow using a direct CURL HTTP request with custom headers and body. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/start.mdx#2025-04-08_snippet_3 LANGUAGE: bash CODE: curl -X POST https:/// \ -H "my-header: foo" \ -b '{"foo": "bar"}' ---------------------------------------- TITLE: Correct Variable Handling in Workflow Steps - Python DESCRIPTION: Python example showing the correct approach to pass data between workflow steps by returning values from context.run functions and using them in subsequent steps. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/caveats.mdx#2025-04-08_snippet_4 LANGUAGE: python CODE: @serve.post("/api/example") async def example(context: AsyncWorkflowContext[str]) -> None: input = context.request_payload async def _step_1() -> Dict: return await some_work(input) result = await context.run("step-1", _step_1) async def _step_2() -> None: await some_other_work(result) await context.run("step-2", _step_2) ---------------------------------------- TITLE: Using Custom OpenAI-Compatible Provider DESCRIPTION: Example of using an OpenAI-compatible provider by specifying a custom baseURL and API key. This allows for flexibility in choosing LLM providers. SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/features.mdx#2025-04-08_snippet_3 LANGUAGE: typescript CODE: const model = context.agents.openai('gpt-3.5-turbo', { baseURL: "https://api.deepseek.com", apiKey: process.env.DEEPSEEK_API_KEY }) ---------------------------------------- TITLE: Fetching Workflow Message Logs using Node.js DESCRIPTION: This JavaScript code snippet shows how to fetch workflow message logs using the Fetch API in Node.js. It sends a GET request to the Upstash QStash API endpoint, including the necessary authorization header. SOURCE: https://github.com/upstash/docs/blob/main/workflow/rest/runs/message-logs.mdx#2025-04-08_snippet_2 LANGUAGE: javascript CODE: const response = await fetch( "https://qstash.upstash.io/v2/workflows/messageLogs", { headers: { Authorization: "Bearer ", }, } ); ---------------------------------------- TITLE: Incorrect Variable Handling in Workflow Steps DESCRIPTION: Example showing incorrect variable handling between workflow steps where a variable is modified inside a step but not returned, causing it to be uninitialized when the endpoint is called again. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/caveats.mdx#2025-04-08_snippet_1 LANGUAGE: typescript CODE: export const { POST } = serve(async (context) => { const input = context.requestPayload let result await context.run("step-1", async () => { result = await someWork(input) }) await context.run("step-2", async () => { await someOtherWork(result) }) }) ---------------------------------------- TITLE: Tool Implementation Pattern for Vercel AI SDK with Workflow DESCRIPTION: Example pattern for implementing tool execution functions that are wrapped in Workflow context.run calls for proper durability and tracing. SOURCE: https://github.com/upstash/docs/blob/main/workflow/integrations/aisdk.mdx#2025-04-08_snippet_6 LANGUAGE: typescript CODE: execute: ({ location }) => context.run("weather tool", () => { // Mock data, replace with actual weather API call return { location, temperature: 72 + Math.floor(Math.random() * 21) - 10, }; }) ---------------------------------------- TITLE: Publishing Workflow Message with QStash in TypeScript DESCRIPTION: Shows how to publish a workflow message using the QStash client in TypeScript, including JSON body and custom headers. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/start.mdx#2025-04-08_snippet_1 LANGUAGE: typescript CODE: import { Client } from "@upstash/qstash"; const client = new Client({ token: "" }); const { messageId } = await client.publishJSON({ url: "https:///", body: { hello: "there!" }, headers: { ... }, retries: 3 }); ---------------------------------------- TITLE: Example Output from Chained Physics Agents DESCRIPTION: Sample output generated by the three chained agents, showing the summarized information about three famous physicists - Albert Einstein, Isaac Newton, and Marie Curie - including their major contributions to physics. SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/patterns/prompt-chaining.mdx#2025-04-08_snippet_1 LANGUAGE: plaintext CODE: Albert Einstein was a German physicist known for his theory of relativity and the famous equation E=mc^2. He made significant contributions to quantum mechanics and was awarded the Nobel Prize in Physics in 1921. Isaac Newton, an English polymath, was a key figure in the Scientific Revolution and the Enlightenment. He is famous for his laws of motion and universal gravitation, as outlined in his book "Philosophiæ Naturalis Principia Mathematica." Marie Curie, a Polish-French physicist and chemist, conducted pioneering research on radioactivity and was the first woman to win a Nobel Prize. She is the only person to win Nobel Prizes in two scientific fields and her work has had a lasting impact on physics and chemistry. ---------------------------------------- TITLE: Invoking Workflows via HTTP in Bash DESCRIPTION: This snippet shows how to invoke a workflow exposed by serveMany using a curl command to send a POST request to the specified endpoint. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/invoke.mdx#2025-04-08_snippet_3 LANGUAGE: bash CODE: curl -X POST https://your-app/serve-many/workflow-one-route ---------------------------------------- TITLE: Defining Workflow Endpoint in Next.js DESCRIPTION: TypeScript code for defining a workflow endpoint in Next.js, including an agent with a communication tool and task execution. SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/getting-started.mdx#2025-04-08_snippet_4 LANGUAGE: ts CODE: import { z } from "zod"; import { tool } from "ai"; import { serve } from "@upstash/workflow/nextjs"; export const { POST } = serve<{ prompt: string }>(async (context) => { const prompt = context.requestPayload.prompt const model = context.agents.openai('gpt-3.5-turbo') const communicatorAgent = context.agents.agent({ model, name: 'communicatorAgent', maxSteps: 2, tools: { communicationTool: tool({ description: 'A tool for informing the caller about your inner thoughts', parameters: z.object({ message: z.string() }), execute: async ({ message }) => { console.log("Inner thought:", message) return "success" } }) }, background: 'Answer questions directed towards you.' + ' You have access to a tool to share your inner thoughts' + ' with the caller. Utilize this tool at least once before' + ' answering the prompt. In your inner thougts, briefly' + ' explain what you will talk about and why. Keep your' + ' answers brief.', }) const task = context.agents.task({ agent: communicatorAgent, prompt }) const { text } = await task.run() console.log("Final response:", text); }) ---------------------------------------- TITLE: Implementing Workflow Endpoint in Astro DESCRIPTION: TypeScript code implementing a minimal workflow endpoint in Astro using the Upstash Workflow SDK SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/astro.mdx#2025-04-08_snippet_5 LANGUAGE: typescript CODE: import { serve } from "@upstash/workflow/astro"; export const { POST } = serve(async (context) => { const result1 = await context.run("initial-step", () => { console.log("initial step ran") return "hello world!" }) await context.run("second-step", () => { console.log(`second step ran with value ${result1}`) }) }, { env: { ...process.env, ...import.meta.env } }) ---------------------------------------- TITLE: Triggering Workflow with Python SDK DESCRIPTION: This snippet demonstrates how to trigger a workflow using the Upstash Workflow Python SDK. It shows the setup of an asynchronous client and publishing a JSON message to trigger the workflow with optional parameters. SOURCE: https://github.com/upstash/docs/blob/main/workflow/getstarted.mdx#2025-04-08_snippet_4 LANGUAGE: python CODE: from qstash import AsyncQStash client = AsyncQStash("") res = await client.message.publish_json( url="https:///", body={"hello": "there!"}, headers={...}, retries=3, ) ---------------------------------------- TITLE: Configuring Workflow Options in TypeScript DESCRIPTION: This snippet demonstrates how to pass configuration options to both createWorkflow and serveMany functions, showing how options can be customized for individual workflows and the overall service. SOURCE: https://github.com/upstash/docs/blob/main/workflow/howto/invoke.mdx#2025-04-08_snippet_4 LANGUAGE: typescript CODE: const workflowOne = createWorkflow( async (context) => { // ... }, { retries: 0 } ) export const { POST } = serveMany( { workflowOne }, { failureUrl: "https://some-url" } ) ---------------------------------------- TITLE: Configuring Initial Payload Parser in TypeScript and Python DESCRIPTION: Shows how to set up a custom initial payload parser for a workflow in both TypeScript and Python. SOURCE: https://github.com/upstash/docs/blob/main/workflow/basics/serve.mdx#2025-04-08_snippet_7 LANGUAGE: typescript CODE: type InitialPayload = { foo: string; bar: number; }; export const { POST } = serve( async (context) => { const payload: InitialPayload = context.requestPayload; }, { initialPayloadParser: (initialPayload) => { const payload: InitialPayload = parsePayload(initialPayload); return payload; }, } ); LANGUAGE: python CODE: @dataclass class InitialPayload: foo: str bar: int def initial_payload_parser(initial_payload: str) -> InitialPayload: return parse_payload(initial_payload) @serve.post("/api/example", initial_payload_parser=initial_payload_parser) async def example(context: AsyncWorkflowContext[InitialPayload]) -> None: payload: InitialPayload = context.request_payload ---------------------------------------- TITLE: Triggering Workflow Endpoint via cURL DESCRIPTION: Command to trigger the workflow endpoint using cURL. SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/fastapi.mdx#2025-04-08_snippet_10 LANGUAGE: bash CODE: curl -X POST https://localhost:8000/api/workflow ---------------------------------------- TITLE: Notify Workflow Using Node.js Fetch DESCRIPTION: Example of notifying a workflow event using Node.js fetch API with authentication. SOURCE: https://github.com/upstash/docs/blob/main/workflow/rest/runs/notify.mdx#2025-04-08_snippet_3 LANGUAGE: javascript CODE: const response = await fetch('https://qstash.upstash.io/v2/notify/myEvent', { method: 'POST', body: "Hello world!", headers: { 'Authorization': 'Bearer ' } }); ---------------------------------------- TITLE: Customizing Types for Resend API Call in Upstash Workflow (TypeScript) DESCRIPTION: This snippet illustrates how to customize the types for the Resend API call in an Upstash Workflow. It allows for overriding the default types for the request body, response body, and specifying whether it's a batch operation, providing more flexibility in API usage. SOURCE: https://github.com/upstash/docs/blob/main/workflow/integrations/resend.mdx#2025-04-08_snippet_2 LANGUAGE: typescript CODE: type IsBatch = true; // Set to either true or false type ResponseBodyType = { ... }; // Define your response body type type RequestBodyType = { ... }; // Define your request body type const { status, body } = await context.api.resend.call< IsBatch, ResponseBodyType, RequestBodyType >( "Call Resend", { ... } ); ---------------------------------------- TITLE: Fetching Workflow Message Logs using Python DESCRIPTION: This Python code snippet demonstrates how to retrieve workflow message logs using the requests library. It sends a GET request to the Upstash QStash API endpoint with the required authorization header. SOURCE: https://github.com/upstash/docs/blob/main/workflow/rest/runs/message-logs.mdx#2025-04-08_snippet_3 LANGUAGE: python CODE: import requests headers = { 'Authorization': 'Bearer ', } response = requests.get( 'https://qstash.upstash.io/v2/workflows/messageLogs', headers=headers ) ---------------------------------------- TITLE: Retrieving Flow-Control Information with curl in Shell DESCRIPTION: A curl command that demonstrates how to make a GET request to retrieve flow-control information from the QStash API. The request requires Bearer token authentication and specifies the flow-control key in the URL path. SOURCE: https://github.com/upstash/docs/blob/main/workflow/rest/flow-control/get.mdx#2025-04-08_snippet_0 LANGUAGE: shell CODE: curl -X GET https://qstash.upstash.io/v2/flowControl/YOUR_FLOW_CONTROL_KEY -H "Authorization: Bearer " ---------------------------------------- TITLE: Agent Response Output DESCRIPTION: The combined response from the parallel execution of agents, summarizing explanations of quantum physics, relativity, and string theory. SOURCE: https://github.com/upstash/docs/blob/main/workflow/agents/patterns/parallelization.mdx#2025-04-08_snippet_1 LANGUAGE: plaintext CODE: Quantum physics explores the behavior of very small particles, such as atoms and subatomic particles, in the strange world of quantum mechanics, where particles can exist in multiple states simultaneously. Key principles include superposition and entanglement, leading to technological advancements like quantum computers. Relativity, developed by Albert Einstein, consists of special relativity and general relativity, explaining the behavior of objects moving at high speeds and the warping of spacetime by massive objects. String theory proposes that the universe's fundamental building blocks are tiny vibrating strings, aiming to unify the four fundamental forces of nature and suggest extra dimensions beyond the familiar ones. ---------------------------------------- TITLE: Verifying Production Workflow Endpoint DESCRIPTION: cURL command to verify that the workflow endpoint is accessible after deployment to production. SOURCE: https://github.com/upstash/docs/blob/main/workflow/quickstarts/express.mdx#2025-04-08_snippet_8 LANGUAGE: bash CODE: curl -X POST /workflow \ -H "Content-Type: application/json" \ -d '{"message": "Hello from the workflow!"}' ---------------------------------------- TITLE: Step Execution Types Table DESCRIPTION: Lists the different types of step executions and their corresponding context functions SOURCE: https://github.com/upstash/docs/blob/main/workflow/rest/runs/logs.mdx#2025-04-08_snippet_3 LANGUAGE: markdown CODE: | Value | Function | | ------------ | -------------------------------------------- | | `Initial` | The default step which created automatically | | `Run` | context.run() | | `Call` | context.call() | | `SleepFor` | context.sleepFor() | | `SleepUntil` | context.sleepUntil() | | `Wait` | context.waitForEvent() | | `Notify` | context.notify() | | `Invoke` | context.invoke() | ---------------------------------------- TITLE: Updating serve method usage in Next.js DESCRIPTION: Shows the changes required in how the serve method is used in Next.js applications. SOURCE: https://github.com/upstash/docs/blob/main/workflow/migration.mdx#2025-04-08_snippet_4 LANGUAGE: javascript CODE: // old export const POST = serve(...); // new export const { POST } = serve(...);