TITLE: Verify Upstash QStash Webhook Signature in AWS Lambda (TypeScript) DESCRIPTION: This TypeScript code demonstrates how to verify incoming Upstash QStash webhook signatures within an AWS Lambda function. It utilizes the `@upstash/qstash` SDK's `Receiver` class, checking for the `upstash-signature` header, constructing the lambda function's URL, and using `receiver.verify` to validate the request body and signature. The function returns appropriate HTTP status codes (401 for missing/invalid signature, 200 for success) and handles potential errors during verification. SOURCE: https://upstash.com/docs/qstash/quickstarts/aws-lambda/nodejs.mdx LANGUAGE: typescript CODE: ``` import { Receiver } from "@upstash/qstash" import type { APIGatewayProxyEvent, APIGatewayProxyResult } from "aws-lambda" const receiver = new Receiver({ currentSigningKey: process.env.QSTASH_CURRENT_SIGNING_KEY ?? "", nextSigningKey: process.env.QSTASH_NEXT_SIGNING_KEY ?? "", }) export const handler = async ( event: APIGatewayProxyEvent ): Promise => { const signature = event.headers["upstash-signature"] const lambdaFunctionUrl = `https://${event.requestContext.domainName}` if (!signature) { return { statusCode: 401, body: JSON.stringify({ message: "Missing signature" }), } } try { await receiver.verify({ signature: signature, body: event.body ?? "", url: lambdaFunctionUrl, }) } catch (err) { return { statusCode: 401, body: JSON.stringify({ message: "Invalid signature" }), } } // Request is valid, perform business logic return { statusCode: 200, body: JSON.stringify({ message: "Request processed successfully" }), } } ``` ---------------------------------------- TITLE: Verify QStash Request Signature using SDK DESCRIPTION: This snippet demonstrates how to verify incoming QStash request signatures using the official QStash SDKs. It shows the initialization of the `Receiver` with `currentSigningKey` and `nextSigningKey`, and then how to use the `verify` method. The method requires the raw request body, the signature from the `Upstash-Signature` header, and optionally the request URL. It's crucial to use the raw body string to avoid verification failures. SOURCE: https://upstash.com/docs/qstash/howto/signature.mdx LANGUAGE: typescript CODE: ``` import { Receiver } from "@upstash/qstash"; const receiver = new Receiver({ currentSigningKey: "YOUR_CURRENT_SIGNING_KEY", nextSigningKey: "YOUR_NEXT_SIGNING_KEY", }); // ... in your request handler const signature = req.headers["Upstash-Signature"]; const body = req.body; const isValid = receiver.verify({ body, signature, url: "YOUR-SITE-URL", }); ``` LANGUAGE: python CODE: ``` from qstash import Receiver receiver = Receiver( current_signing_key="YOUR_CURRENT_SIGNING_KEY", next_signing_key="YOUR_NEXT_SIGNING_KEY", ) # ... in your request handler signature, body = req.headers["Upstash-Signature"], req.body receiver.verify( body=body, signature=signature, url="YOUR-SITE-URL", ) ``` LANGUAGE: go CODE: ``` import "github.com/qstash/qstash-go" import "io" receiver := qstash.NewReceiver("", "NEXT_SIGNING_KEY") // ... in your request handler signature := req.Header.Get("Upstash-Signature") body, err := io.ReadAll(req.Body) // handle err err = receiver.Verify(qstash.VerifyOptions{ Signature: signature, Body: string(body), Url: "YOUR-SITE-URL", // optional }) // handle err ``` ---------------------------------------- TITLE: Publish a Message to QStash using cURL DESCRIPTION: This code snippet demonstrates how to publish a JSON message to QStash using cURL. It requires an Authorization Bearer token and sets the Content-Type header to application/json. The message body is a simple JSON object, and the target URL is the QStash publish endpoint followed by the recipient API URL. Two examples are provided: one with a generic placeholder URL and another with a specific requestcatcher.com URL for testing. SOURCE: https://upstash.com/docs/qstash/overall/getstarted.mdx LANGUAGE: bash CODE: ``` curl -XPOST \ -H 'Authorization: Bearer ' \ -H "Content-type: application/json" \ -d '{ "hello": "world" }' \ 'https://qstash.upstash.io/v2/publish/https://' ``` LANGUAGE: bash CODE: ``` curl -XPOST \ -H 'Authorization: Bearer ' \ -H "Content-type: application/json" \ -d '{ "hello": "world" }' \ 'https://qstash.upstash.io/v2/publish/https://firstqstashmessage.requestcatcher.com/test' ``` ---------------------------------------- TITLE: Publish a message to a specific endpoint DESCRIPTION: This example demonstrates how to publish a simple message to a designated HTTP endpoint. It shows the basic structure for sending a JSON payload to a URL via QStash. SOURCE: https://upstash.com/docs/qstash/overall/apiexamples.mdx LANGUAGE: shell CODE: ``` curl -XPOST \ -H 'Authorization: Bearer XXX' \ -H "Content-type: application/json" \ -d '{ "hello": "world" }' \ 'https://qstash.upstash.io/v2/publish/https://example.com' ``` LANGUAGE: typescript CODE: ``` const client = new Client({ token: "" }); await client.publishJSON({ url: "https://example.com", body: { hello: "world", }, }); ``` LANGUAGE: python CODE: ``` from qstash import QStash client = QStash("") client.message.publish_json( url="https://example.com", body={ "hello": "world", }, ) # Async version is also available ``` ---------------------------------------- TITLE: Send QStash Publish Message Request Examples DESCRIPTION: Demonstrates how to send a POST request to the QStash publish endpoint using various programming languages, including setting authorization, content type, and custom Upstash headers for message control and forwarding. SOURCE: https://upstash.com/docs/qstash/api/publish.mdx LANGUAGE: curl CODE: ``` curl -X POST "https://qstash.upstash.io/v2/publish/https://www.example.com" \ -H "Authorization: Bearer " \ -H "Content-Type: application/json" \ -H "Upstash-Method: POST" \ -H "Upstash-Delay: 10s" \ -H "Upstash-Retries: 3" \ -H "Upstash-Forward-Custom-Header: custom-value" \ -d '{"message":"Hello, World!"}' ``` LANGUAGE: Node CODE: ``` const response = await fetch( "https://qstash.upstash.io/v2/publish/https://www.example.com", { method: "POST", headers: { Authorization: "Bearer ", "Content-Type": "application/json", "Upstash-Method": "POST", "Upstash-Delay": "10s", "Upstash-Retries": "3", "Upstash-Forward-Custom-Header": "custom-value" }, body: JSON.stringify({ message: "Hello, World!" }) } ); ``` LANGUAGE: Python CODE: ``` import requests headers = { 'Authorization': 'Bearer ', 'Content-Type': 'application/json', 'Upstash-Method': 'POST', 'Upstash-Delay': '10s', 'Upstash-Retries': '3', 'Upstash-Forward-Custom-Header': 'custom-value' } json_data = { 'message': 'Hello, World!', } response = requests.post( 'https://qstash.upstash.io/v2/publish/https://www.example.com', headers=headers, json=json_data ) ``` LANGUAGE: Go CODE: ``` var data = strings.NewReader(`{"message":"Hello, World!"}`) req, err := http.NewRequest("POST", "https://qstash.upstash.io/v2/publish/https://www.example.com", data) if err != nil { log.Fatal(err) } req.Header.Set("Authorization", "Bearer ") req.Header.Set("Content-Type", "application/json") req.Header.Set("Upstash-Method", "POST") req.Header.Set("Upstash-Delay", "10s") req.Header.Set("Upstash-Retries", "3") req.Header.Set("Upstash-Forward-Custom-Header", "custom-value") resp, err := http.DefaultClient.Do(req) if err != nil { log.Fatal(err) } defer resp.Body.Close() ``` ---------------------------------------- TITLE: Batch LLM Chat Completion Requests with QStash and Anthropic DESCRIPTION: Illustrates how to send multiple LLM chat completion requests to Anthropic's API in a single operation using QStash's `batchJSON` method. Each request within the batch specifies the Anthropic provider and includes its own callback URL for individual response handling. SOURCE: https://upstash.com/docs/qstash/integrations/anthropic.mdx LANGUAGE: typescript CODE: ``` import { anthropic, Client } from "@upstash/qstash"; const client = new Client({ token: "" }); const result = await client.batchJSON([ { api: { name: "llm", provider: anthropic({ token: "" }) }, body: { model: "claude-3-5-sonnet-20241022", messages: [ { role: "user", content: "Describe the latest in AI research.", }, ], }, callback: "https://example.com/callback1", }, { api: { name: "llm", provider: anthropic({ token: "" }) }, body: { model: "claude-3-5-sonnet-20241022", messages: [ { role: "user", content: "Outline the future of remote work.", }, ], }, callback: "https://example.com/callback2", } ]); console.log(result); ``` ---------------------------------------- TITLE: Deno Deploy QStash Webhook Receiver in TypeScript DESCRIPTION: This TypeScript code defines an asynchronous HTTP server for Deno Deploy that acts as a QStash webhook receiver. It initializes a `Receiver` with signing keys from environment variables and verifies the incoming `Upstash-Signature` header against the request body. If the signature is invalid, it returns a 401 status; otherwise, it logs success and returns 200 OK. SOURCE: https://upstash.com/docs/qstash/quickstarts/deno-deploy.mdx LANGUAGE: ts CODE: ``` import { serve } from "https://deno.land/std@0.142.0/http/server.ts"; import { Receiver } from "https://deno.land/x/upstash_qstash@v0.1.4/mod.ts"; serve(async (req: Request) => { const r = new Receiver({ currentSigningKey: Deno.env.get("QSTASH_CURRENT_SIGNING_KEY")!, nextSigningKey: Deno.env.get("QSTASH_NEXT_SIGNING_KEY")!, }); const isValid = await r .verify({ signature: req.headers.get("Upstash-Signature")!, body: await req.text(), }) .catch((err: Error) => { console.error(err); return false; }); if (!isValid) { return new Response("Invalid signature", { status: 401 }); } console.log("The signature was valid"); // do work return new Response("OK", { status: 200 }); }); ``` ---------------------------------------- TITLE: Python Function to Verify QStash Webhook Signature (JWT) DESCRIPTION: This `verify` function handles the actual validation of the QStash webhook signature, which is a JWT. It decodes the JWT, verifies its structure, checks the signature using HMAC-SHA256, validates claims like issuer, subject (URL), expiration, and 'not before' time, and finally compares the body hash if a body is present. It raises exceptions for any verification failures. SOURCE: https://upstash.com/docs/qstash/quickstarts/aws-lambda/python.mdx LANGUAGE: python CODE: ``` # @param jwt_token - The content of the `upstash-signature` header # @param signing_key - The signing key to use to verify the signature (Get it from Upstash Console) # @param body - The raw body of the request # @param url - The public URL of the lambda function def verify(jwt_token, signing_key, body, url): split = jwt_token.split(".") if len(split) != 3: raise Exception("Invalid JWT.") header, payload, signature = split message = header + '.' + payload generated_signature = base64.urlsafe_b64encode(hmac.new(bytes(signing_key, 'utf-8'), bytes(message, 'utf-8'), digestmod=hashlib.sha256).digest()).decode() if generated_signature != signature and signature + "=" != generated_signature : raise Exception("Invalid JWT signature.") decoded = jwt.decode(jwt_token, options={"verify_signature": False}) sub = decoded['sub'] iss = decoded['iss'] exp = decoded['exp'] nbf = decoded['nbf'] decoded_body = decoded['body'] if iss != "Upstash": raise Exception("Invalid issuer: {}".format(iss)) if sub.rstrip("/") != url.rstrip("/"): raise Exception("Invalid subject: {}".format(sub)) now = time.time() if now > exp: raise Exception("Token has expired.") if now < nbf: raise Exception("Token is not yet valid.") if body != None: while decoded_body[-1] == "=": decoded_body = decoded_body[:-1] m = hashlib.sha256() m.update(bytes(body, 'utf-8')) m = m.digest() generated_hash = base64.urlsafe_b64encode(m).decode() if generated_hash != decoded_body and generated_hash != decoded_body + "=" : raise Exception("Body hash doesn't match.") ``` ---------------------------------------- TITLE: Initialize Upstash QStash Client DESCRIPTION: This TypeScript snippet demonstrates how to import the `Client` class from the `@upstash/qstash` library and initialize a new client instance. A valid QStash token is required for authentication and must be provided during client instantiation. SOURCE: https://upstash.com/docs/qstash/sdks/ts/gettingstarted.mdx LANGUAGE: typescript CODE: ``` import { Client } from "@upstash/qstash"; const client = new Client({ token: "", }); ``` ---------------------------------------- TITLE: Queue Image Processing Task in Next.js with QStash DESCRIPTION: This code snippet demonstrates how to upload an image and then asynchronously queue an image processing task using QStash. It shows publishing a JSON message with a target URL and body containing the image ID, returning a response with the QStash message ID. The URL needs to be publicly available for QStash to call. SOURCE: https://upstash.com/docs/qstash/quickstarts/vercel-nextjs.mdx LANGUAGE: tsx CODE: ``` import { Client } from "@upstash/qstash" import { NextResponse } from "next/server" const client = new Client({ token: process.env.QSTASH_TOKEN! }) export const POST = async (req: Request) => { // Image uploading logic // 👇 Once uploading is done, queue an image processing task const result = await client.publishJSON({ url: "https://your-api-endpoint.com/process-image", body: { imageId: "123" } }) return NextResponse.json({ message: "Image queued for processing!", qstashMessageId: result.messageId }) } ``` ---------------------------------------- TITLE: Enqueue LLM Chat Completion Request with QStash and Anthropic DESCRIPTION: Shows how to enqueue an LLM chat completion request for asynchronous processing using QStash's `enqueueJSON` method. This example leverages a named queue and configures the request with the Anthropic provider for LLM interactions. SOURCE: https://upstash.com/docs/qstash/integrations/anthropic.mdx LANGUAGE: typescript CODE: ``` import { anthropic, Client } from "@upstash/qstash"; const client = new Client({ token: "" }); const result = await client.queue({ queueName: "your-queue-name" }).enqueueJSON({ api: { name: "llm", provider: anthropic({ token: "" }) }, body: { model: "claude-3-5-sonnet-20241022", messages: [ { role: "user", content: "Generate ideas for a marketing campaign.", }, ], }, callback: "https://example.com/callback", }); console.log(result); ``` ---------------------------------------- TITLE: Configure Webhook with QStash Publish URL DESCRIPTION: This example shows how to redirect a webhook call through QStash by constructing a special publish URL. The original webhook endpoint is embedded within the QStash URL, along with a QStash token for authentication. This allows QStash to receive the request and forward it with additional configurations. SOURCE: https://upstash.com/docs/qstash/howto/webhook.mdx LANGUAGE: HTTP CODE: ``` https://qstash.upstash.io/v2/publish/https://example.com/api/webhook?qstash_token= ``` ---------------------------------------- TITLE: Verify QStash Request Signature in Next.js API Route DESCRIPTION: This snippet demonstrates how to create a Next.js API route (`/api/long-task`) that simulates a long-running background task. It integrates `@upstash/qstash/nextjs` to verify the incoming request's signature, ensuring the request originates from QStash. The route processes JSON data and performs a series of `fetch` calls with a delay to simulate work. SOURCE: https://upstash.com/docs/qstash/quickstarts/vercel-nextjs.mdx LANGUAGE: ts CODE: ``` import { verifySignatureAppRouter } from "@upstash/qstash/nextjs" async function handler(request: Request) { const data = await request.json() for (let i = 0; i < 10; i++) { await fetch("https://firstqstashmessage.requestcatcher.com/test", { method: "POST", body: JSON.stringify(data), headers: { "Content-Type": "application/json" } }) await new Promise((resolve) => setTimeout(resolve, 500)) } return Response.json({ success: true }) } export const POST = verifySignatureAppRouter(handler) ``` ---------------------------------------- TITLE: Set Custom Retry Limit for QStash Messages DESCRIPTION: Demonstrates how to specify a custom maximum number of retries for a message using the 'Upstash-Retries' header in cURL, and the 'retries' parameter in the TypeScript and Python SDKs. This allows overriding the default retry behavior. SOURCE: https://upstash.com/docs/qstash/features/retry.mdx LANGUAGE: cURL CODE: ``` curl -XPOST \ -H 'Authorization: Bearer XXX' \ -H "Content-type: application/json" \ -H "Upstash-Retries: 2" \ -d '{ "hello": "world" }' \ 'https://qstash.upstash.io/v2/publish/https://my-api...' ``` LANGUAGE: TypeScript CODE: ``` import { Client } from "@upstash/qstash"; const client = new Client({ token: "" }); const res = await client.publishJSON({ url: "https://my-api...", body: { hello: "world" }, retries: 2, }); ``` LANGUAGE: Python CODE: ``` from qstash import QStash client = QStash("") client.message.publish_json( url="https://my-api...", body={ "hello": "world", }, retries=2, ) ``` ---------------------------------------- TITLE: Complete Cloudflare Worker for QStash Webhook Reception DESCRIPTION: The full TypeScript code for a Cloudflare Worker that integrates QStash webhook reception. It includes importing the `Receiver`, defining environment variables, initializing the receiver, and verifying incoming webhook signatures, with error handling for invalid signatures. SOURCE: https://upstash.com/docs/qstash/quickstarts/cloudflare-workers.mdx LANGUAGE: ts CODE: ``` import { Receiver } from "@upstash/qstash"; export interface Env { QSTASH_CURRENT_SIGNING_KEY: string; QSTASH_NEXT_SIGNING_KEY: string; } export default { async fetch( request: Request, env: Env, ctx: ExecutionContext ): Promise { const c = new Receiver({ currentSigningKey: env.QSTASH_CURRENT_SIGNING_KEY, nextSigningKey: env.QSTASH_NEXT_SIGNING_KEY, }); const body = await request.text(); const isValid = await c .verify({ signature: request.headers.get("Upstash-Signature")!, body, }) .catch((err) => { console.error(err); return false; }); if (!isValid) { return new Response("Invalid signature", { status: 401 }); } console.log("The signature was valid"); // do work here return new Response("Hello World!"); } }; ``` ---------------------------------------- TITLE: Enqueueing a Single LLM Chat Completion Request to a QStash Queue DESCRIPTION: Illustrates how to enqueue a single OpenAI-compatible LLM chat completion request to a named QStash queue. This approach uses `enqueueJSON` and is suitable for scenarios where messages need to be processed from a queue, leveraging QStash's queueing capabilities. SOURCE: https://upstash.com/docs/qstash/integrations/llm.mdx LANGUAGE: JavaScript CODE: ``` import { Client, upstash } from "@upstash/qstash"; const client = new Client({ token: "", }); const result = await client.queue({ queueName: "queue-name" }).enqueueJSON({ api: { name: "llm", provider: openai({ token: "_OPEN_AI_TOKEN_"}) }, body: { "model": "gpt-3.5-turbo", messages: [ { role: "user", content: "Write a hello world program in Rust.", }, ], }, callback: "https://abc.requestcatcher.com", }); console.log(result); ``` LANGUAGE: Python CODE: ``` from qstash import QStash from qstash.chat import upstash q = QStash("") result = q.message.enqueue_json( queue="queue-name", api={"name": "llm", "provider": openai("")}, body={ "model": "gpt-3.5-turbo", "messages": [ { "role": "user", "content": "Write a hello world program in Rust.", } ], }, callback="https://abc.requestcatcher.com", ) print(result) ``` ---------------------------------------- TITLE: Receive and Verify QStash Message in Next.js DESCRIPTION: This snippet shows how to create an endpoint to receive and process messages from QStash. It includes verifying the message signature using `verifySignatureAppRouter` to ensure the message originates from QStash, then parsing the request body to extract the image ID for processing. SOURCE: https://upstash.com/docs/qstash/quickstarts/vercel-nextjs.mdx LANGUAGE: tsx CODE: ``` import { verifySignatureAppRouter } from "@upstash/qstash/nextjs" // 👇 Verify that this messages comes from QStash export const POST = verifySignatureAppRouter(async (req: Request) => { const body = await req.json() const { imageId } = body as { imageId: string } // Image processing logic, i.e. using sharp return new Response(`Image with id "${imageId}" processed successfully.`) }) ``` ---------------------------------------- TITLE: Initialize QStash Receiver with Signing Keys DESCRIPTION: Instantiates the `Receiver` class, providing it with the `currentSigningKey` and `nextSigningKey` retrieved from the Cloudflare Worker's environment variables. This setup prepares the receiver for signature validation. SOURCE: https://upstash.com/docs/qstash/quickstarts/cloudflare-workers.mdx LANGUAGE: ts CODE: ``` const receiver = new Receiver({ currentSigningKey: env.QSTASH_CURRENT_SIGNING_KEY, nextSigningKey: env.QSTASH_NEXT_SIGNING_KEY, }); ``` ---------------------------------------- TITLE: Next.js API Route for Bitcoin Price Scraping DESCRIPTION: Implements a serverless Next.js API route (`/pages/api/cron.ts`) that periodically fetches the current Bitcoin price from `blockchain.info`, stores it in an Upstash Redis database with a timestamp, and is secured using `verifySignature` from `@upstash/qstash/nextjs` to ensure requests originate from Upstash. The `bodyParser` is disabled to allow raw request body access for signature verification. SOURCE: https://upstash.com/docs/qstash/recipes/periodic-data-updates.mdx LANGUAGE: typescript CODE: ``` import { NextApiRequest, NextApiResponse } from "next"; import { Redis } from "@upstash/redis"; import { verifySignature } from "@upstash/qstash/nextjs"; /** * You can use any database you want, in this case we use Redis */ const redis = Redis.fromEnv(); /** * Load the current bitcoin price in USD and store it in our database at the * current timestamp */ async function handler(_req: NextApiRequest, res: NextApiResponse) { try { /** * The API returns something like this: * ```json * { * "USD": { * "last": 123 * }, * ... * } * ``` */ const raw = await fetch("https://blockchain.info/ticker"); const prices = await raw.json(); const bitcoinPrice = prices["USD"]["last"] as number; /** * After we have loaded the current bitcoin price, we can store it in the * database together with the current time */ await redis.zadd("bitcoin-prices", { score: Date.now(), member: bitcoinPrice, }); res.send("OK"); } catch (err) { res.status(500).send(err); } finally { res.end(); } } /** * Wrap your handler with `verifySignature` to automatically reject all * requests that are not coming from Upstash. */ export default verifySignature(handler); /** * To verify the authenticity of the incoming request in the `verifySignature` * function, we need access to the raw request body. */ export const config = { api: { bodyParser: false, }, }; ``` ---------------------------------------- TITLE: Handle Burst Rate Limit Error in TypeScript DESCRIPTION: This TypeScript example illustrates how to catch and handle `QstashRatelimitError` when the burst API rate limit is exceeded. It logs the reset time for the burst limit, suggesting the implementation of exponential backoff or delays before retrying requests to avoid continuous rate limiting. SOURCE: https://upstash.com/docs/qstash/api/api-ratelimiting.mdx LANGUAGE: typescript CODE: ``` import { QstashRatelimitError } from "@upstash/qstash"; try { // Example of a request that could hit the burst rate limit const result = await client.publishJSON({ url: "https://my-api...", // or urlGroup: "the name or id of a url group" body: { hello: "world", }, }); } catch (error) { if (error instanceof QstashRatelimitError) { console.log("Burst rate limit exceeded. Retry after:", error.reset); // Implement exponential backoff or delay before retrying } else { console.error("An unexpected error occurred:", error); } } ``` ---------------------------------------- TITLE: Initialize and Use Synchronous QStash Client DESCRIPTION: This snippet demonstrates how to initialize the synchronous QStash client by providing your QStash token. It then shows a basic example of publishing a JSON message. This client blocks execution until the message publishing operation completes. SOURCE: https://upstash.com/docs/qstash/sdks/py/gettingstarted.mdx LANGUAGE: python CODE: ``` from qstash import QStash client = QStash("") client.message.publish_json(...) ``` ---------------------------------------- TITLE: Implement QStash Webhook Receiver in Go DESCRIPTION: Complete Go code for setting up an HTTP server, handling incoming QStash webhooks, importing necessary packages, and verifying the QStash signature using JWT and body hash validation. This includes the main function for the server and the helper function for signature verification. SOURCE: https://upstash.com/docs/qstash/quickstarts/fly-io/go.mdx LANGUAGE: go CODE: ``` package main import ( "crypto/sha256" "encoding/base64" "fmt" "github.com/golang-jwt/jwt/v4" "io" "net/http" "os" "time" ) ``` LANGUAGE: go CODE: ``` func main() { port := os.Getenv("PORT") if port == "" { port = "8080" } http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() currentSigningKey := os.Getenv("QSTASH_CURRENT_SIGNING_KEY") nextSigningKey := os.Getenv("QSTASH_NEXT_SIGNING_KEY") tokenString := r.Header.Get("Upstash-Signature") body, err := io.ReadAll(r.Body) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } err = verify(body, tokenString, currentSigningKey) if err != nil { fmt.Printf("Unable to verify signature with current signing key: %v", err) err = verify(body, tokenString, nextSigningKey) } if err != nil { http.Error(w, err.Error(), http.StatusUnauthorized) return } // handle your business logic here w.WriteHeader(http.StatusOK) }) fmt.Println("listening on", port) err := http.ListenAndServe(":"+port, nil) if err != nil { panic(err) } } ``` LANGUAGE: go CODE: ``` func verify(body []byte, tokenString, signingKey string) error { token, err := jwt.Parse( tokenString, func(token *jwt.Token) (interface{}, error) { if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok { return nil, fmt.Errorf("Unexpected signing method: %v", token.Header["alg"]) } return []byte(signingKey), nil }) if err != nil { return err } claims, ok := token.Claims.(jwt.MapClaims) if !ok || !token.Valid { return fmt.Errorf("Invalid token") } if !claims.VerifyIssuer("Upstash", true) { return fmt.Errorf("invalid issuer") } if !claims.VerifyExpiresAt(time.Now().Unix(), true) { return fmt.Errorf("token has expired") } if !claims.VerifyNotBefore(time.Now().Unix(), true) { return fmt.Errorf("token is not valid yet") } bodyHash := sha256.Sum256(body) if claims["body"] != base64.URLEncoding.EncodeToString(bodyHash[:]) { return fmt.Errorf("body hash does not match") } return nil } ``` ---------------------------------------- TITLE: Implement QStash signature verification function DESCRIPTION: This TypeScript function (`verify`) manually validates a QStash signature (JWT) against a provided signing key, request body, and URL. It checks the JWT structure, signature, issuer, subject (URL), expiration, and 'not before' times. It also verifies the body hash if a body is present, ensuring data integrity. This function is crucial for secure processing of QStash webhooks. SOURCE: https://upstash.com/docs/qstash/quickstarts/aws-lambda/nodejs.mdx LANGUAGE: ts CODE: ``` /** * @param jwt - The content of the `upstash-signature` header (JWT) * @param signingKey - The signing key to use to verify the signature (Get it from Upstash Console) * @param body - The raw body of the request * @param url - The public URL of the lambda function */ async function verify( jwt: string, signingKey: string, body: string | null, url: string ): Promise { const split = jwt.split(".") if (split.length != 3) { throw new Error("Invalid JWT") } const [header, payload, signature] = split if ( signature != createHmac("sha256", signingKey) .update(`${header}.${payload}`) .digest("base64url") ) { throw new Error("Invalid JWT signature") } // JWT is verified, start looking at payload claims const p: { sub: string iss: string exp: number nbf: number body: string } = JSON.parse(Buffer.from(payload, "base64url").toString()) if (p.iss !== "Upstash") { throw new Error(`invalid issuer: ${p.iss}, expected "Upstash"`) } if (p.sub !== url) { throw new Error(`invalid subject: ${p.sub}, expected "${url}"`) } const now = Math.floor(Date.now() / 1000) if (now > p.exp) { throw new Error("token has expired") } if (now < p.nbf) { throw new Error("token is not yet valid") } if (body != null) { if ( p.body.replace(/=+$/, "") != createHash("sha256").update(body).digest("base64url") ) { throw new Error("body hash does not match") } } } ``` ---------------------------------------- TITLE: Publish Message with Failure Callback DESCRIPTION: Demonstrates how to publish a message to QStash with a specified failure callback URL. The callback URL will be invoked if the message delivery fails after all retries. Examples are provided for cURL, TypeScript, and Python. SOURCE: https://upstash.com/docs/qstash/features/callbacks.mdx LANGUAGE: bash CODE: ``` curl -X POST \ https://qstash.upstash.io/v2/publish/ \ -H 'Content-Type: application/json' \ -H 'Authorization: Bearer ' \ -H 'Upstash-Failure-Callback: ' \ -d '{ "hello": "world" }' ``` LANGUAGE: typescript CODE: ``` import { Client } from "@upstash/qstash"; const client = new Client({ token: "" }); const res = await client.publishJSON({ url: "https://my-api...", body: { hello: "world" }, failureCallback: "https://my-callback...", }); ``` LANGUAGE: python CODE: ``` from qstash import QStash client = QStash("") client.message.publish_json( url="https://my-api...", body={ "hello": "world", }, failure_callback="https://my-callback...", ) ``` ---------------------------------------- TITLE: Publishing QStash Messages with Callback URL DESCRIPTION: Demonstrates how to publish a message to QStash and specify a callback URL using the `Upstash-Callback` header (cURL) or the `callback` parameter (TypeScript/Python SDKs). The callback URL will receive the response once the original message processing is complete. SOURCE: https://upstash.com/docs/qstash/features/callbacks.mdx LANGUAGE: bash CODE: ``` curl -X POST \ https://qstash.upstash.io/v2/publish/https://my-api... \ -H 'Content-Type: application/json' \ -H 'Authorization: Bearer ' \ -H 'Upstash-Callback: ' \ -d '{ "hello": "world" }' ``` LANGUAGE: typescript CODE: ``` import { Client } from "@upstash/qstash"; const client = new Client({ token: "" }); const res = await client.publishJSON({ url: "https://my-api...", body: { hello: "world" }, callback: "https://my-callback...", }); ``` LANGUAGE: python CODE: ``` from qstash import QStash client = QStash("") client.message.publish_json( url="https://my-api...", body={ "hello": "world", }, callback="https://my-callback...", ) ``` ---------------------------------------- TITLE: Verify QStash Message Signature with Typescript SDK DESCRIPTION: This code snippet demonstrates how to initialize the `Receiver` class from the `@upstash/qstash` SDK and use its `verify` method to validate an incoming QStash message's signature. It requires providing your current and next signing keys, the message body, the signature extracted from the `Upstash-Signature` header, and your site's URL. SOURCE: https://upstash.com/docs/qstash/sdks/ts/examples/receiver.mdx LANGUAGE: typescript CODE: ``` import { Receiver } from "@upstash/qstash"; const receiver = new Receiver({ currentSigningKey: "YOUR_CURRENT_SIGNing_KEY", nextSigningKey: "YOUR_NEXT_SIGNING_KEY", }); // ... in your request handler const signature = req.headers["Upstash-Signature"]; const body = req.body; const isValid = receiver.verify({ body, signature, url: "YOUR-SITE-URL", }); ``` ---------------------------------------- TITLE: Publish with Callback URL and Custom Method (TypeScript) DESCRIPTION: [Callbacks](/qstash/features/callbacks) are useful for long running functions. Here, QStash will return the response of the publish request to the callback URL. We also change the `method` to `GET` in this use case so QStash will make a `GET` request to the `url`. The default is `POST`. SOURCE: https://upstash.com/docs/qstash/sdks/ts/examples/publish.mdx LANGUAGE: typescript CODE: ``` import { Client } from "@upstash/qstash"; const client = new Client({ token: "" }); const res = await client.publishJSON({ url: "https://my-api...", body: { hello: "world" }, callback: "https://my-callback...", failureCallback: "https://my-failure-callback...", method: "GET", }); ``` ---------------------------------------- TITLE: Pause and Resume a queue using QStash Python SDK DESCRIPTION: This snippet demonstrates how to control the operational state of a QStash queue by pausing and resuming it. It first ensures the queue exists (or creates it) with a specified parallelism. Then, it uses `client.queue.pause()` to halt message processing, verifies the paused state, and finally uses `client.queue.resume()` to restart message processing for the queue. SOURCE: https://upstash.com/docs/qstash/sdks/py/examples/queues.mdx LANGUAGE: python CODE: ``` from qstash import QStash client = QStash("") queue_name = "upstash-queue" client.queue.upsert(queue_name, parallelism=1) client.queue.pause(queue_name) queue = client.queue.get(queue_name) print(queue.paused) # prints True client.queue.resume(queue_name) ``` ---------------------------------------- TITLE: Create a queue with parallelism using QStash Python SDK DESCRIPTION: This snippet demonstrates how to create or update a queue using the QStash Python SDK, specifically setting its parallelism. It initializes the QStash client, defines a queue name, and then uses `client.queue.upsert()` to create or modify the queue with a specified parallelism level. Finally, it retrieves and prints the queue's details. SOURCE: https://upstash.com/docs/qstash/sdks/py/examples/queues.mdx LANGUAGE: python CODE: ``` from qstash import QStash client = QStash("") queue_name = "upstash-queue" client.queue.upsert(queue_name, parallelism=2) print(client.queue.get(queue_name)) ``` ---------------------------------------- TITLE: Implement AWS Lambda handler for QStash verification DESCRIPTION: This TypeScript code defines an AWS Lambda handler function (`handler`) that processes incoming API Gateway events. It attempts to verify the `upstash-signature` header using both current and next signing keys to allow for key rotation. If verification fails, it returns a 400 error; otherwise, it proceeds with business logic and returns a 200 success. SOURCE: https://upstash.com/docs/qstash/quickstarts/aws-lambda/nodejs.mdx LANGUAGE: ts CODE: ``` import type { APIGatewayEvent, APIGatewayProxyResult } from "aws-lambda" import { createHash, createHmac } from "node:crypto" export const handler = async ( event: APIGatewayEvent, ): Promise => { const signature = event.headers["upstash-signature"] ?? "" const currentSigningKey = process.env.QSTASH_CURRENT_SIGNING_KEY ?? "" const nextSigningKey = process.env.QSTASH_NEXT_SIGNING_KEY ?? "" const url = `https://${event.requestContext.domainName}` try { // Try to verify the signature with the current signing key and if that fails, try the next signing key // This allows you to roll your signing keys once without downtime await verify(signature, currentSigningKey, event.body, url).catch((err) => { console.error( `Failed to verify signature with current signing key: ${err}` ) return verify(signature, nextSigningKey, event.body, url) }) } catch (err) { const message = err instanceof Error ? err.toString() : err return { statusCode: 400, body: JSON.stringify({ error: message }), } } // Add your business logic here return { statusCode: 200, body: JSON.stringify({ message: "Request processed successfully" }), } } ``` ---------------------------------------- TITLE: Publish LLM Request with QStash and Anthropic DESCRIPTION: Demonstrates how to publish a single LLM chat completion request to Anthropic's API using QStash's `publishJSON` method. It shows how to specify the LLM API with the Anthropic provider and include an asynchronous callback URL for handling responses. SOURCE: https://upstash.com/docs/qstash/integrations/anthropic.mdx LANGUAGE: typescript CODE: ``` import { anthropic, Client } from "@upstash/qstash"; const client = new Client({ token: "" }); await client.publishJSON({ api: { name: "llm", provider: anthropic({ token: "" }) }, body: { model: "claude-3-5-sonnet-20241022", messages: [{ role: "user", content: "Summarize recent tech trends." }], }, callback: "https://example.com/callback", }); ``` ---------------------------------------- TITLE: Publishing a QStash Message with a Failure Callback DESCRIPTION: This snippet demonstrates how to publish a message to QStash while specifying a failure callback URL. The provided callback URL will be invoked by QStash if the message fails to be delivered, allowing for custom error handling such as logging or alerting. It requires a QStash token, a destination URL, and the callback URL. SOURCE: https://upstash.com/docs/qstash/howto/handling-failures.mdx LANGUAGE: bash CODE: ``` curl -X POST \ https://qstash.upstash.io/v2/publish/ \ -H 'Content-Type: application/json' \ -H 'Authorization: Bearer ' \ -H 'Upstash-Failure-Callback: ' \ -d '{ "hello": "world" }' ``` LANGUAGE: typescript CODE: ``` import { Client } from "@upstash/qstash"; const client = new Client({ token: "" }); const res = await client.publishJSON({ url: "https://my-api...", body: { hello: "world" }, failureCallback: "https://my-callback..." }); ``` LANGUAGE: python CODE: ``` from qstash import QStash client = QStash("") client.message.publish_json( url="https://my-api...", body={ "hello": "world", }, failure_callback="https://my-callback...", ) ``` ---------------------------------------- TITLE: Implement AWS Lambda Handler for QStash Webhook Verification DESCRIPTION: This Python `lambda_handler` function processes incoming AWS Lambda events. It parses QStash signature and signing keys from environment variables and headers, then attempts to verify the webhook signature using both current and next signing keys. If verification fails, it returns a 400 error; otherwise, it proceeds with custom logic. SOURCE: https://upstash.com/docs/qstash/quickstarts/aws-lambda/python.mdx LANGUAGE: python CODE: ``` def lambda_handler(event, context): # parse the inputs current_signing_key = os.environ['QSTASH_CURRENT_SIGNING_KEY'] next_signing_key = os.environ['QSTASH_NEXT_SIGNING_KEY'] headers = event['headers'] signature = headers['upstash-signature'] url = "https://{}{}".format(event["requestContext"]["domainName"], event["rawPath"]) body = None if 'body' in event: body = event['body'] # check verification now try: verify(signature, current_signing_key, body, url) except Exception as e: print("Failed to verify signature with current signing key:", e) try: verify(signature, next_signing_key, body, url) except Exception as e2: return { "statusCode": 400, "body": json.dumps({ "error": str(e2), }), } # Your logic here... return { "statusCode": 200, "body": json.dumps({ "message": "ok", }), } ``` ---------------------------------------- TITLE: Publish to a URL with a 3 second delay and headers/body DESCRIPTION: This snippet shows how to publish a JSON message to a specified URL using the QStash client. It includes examples of adding a delay, custom headers, and a JSON body. The message ID of the published message is then printed. SOURCE: https://upstash.com/docs/qstash/sdks/py/examples/publish.mdx LANGUAGE: python CODE: ``` from qstash import QStash client = QStash("") res = client.message.publish_json( url="https://my-api...", body={ "hello": "world", }, headers={ "test-header": "test-value", }, delay="3s", ) print(res.message_id) ``` ---------------------------------------- TITLE: Next.js Server Action: Publish QStash Message DESCRIPTION: This Next.js server action is responsible for publishing a message to QStash. It initializes the QStash client with an environment variable token and uses `publishJSON` to send a payload to a specified URL, returning the `messageId` or `null` on error. SOURCE: https://upstash.com/docs/qstash/quickstarts/vercel-nextjs.mdx LANGUAGE: ts CODE: ``` "use server" import { Client } from "@upstash/qstash"; const qstashClient = new Client({ token: process.env.QSTASH_TOKEN!, }); export async function startBackgroundJob() { try { const response = await qstashClient.publishJSON({ "url": "https://qstash-bg-job.vercel.app/api/long-task", body: { "hello": "world" } }); return response.messageId; } catch (error) { console.error(error); return null; } } ``` ---------------------------------------- TITLE: Update Next.js UI to Invoke Background Job Server Action DESCRIPTION: Modifies the `src/app/page.tsx` component to integrate the `startBackgroundJob` server action. The `handleClick` function is now defined to asynchronously call this action when the button is clicked, initiating the QStash message publication. SOURCE: https://upstash.com/docs/qstash/quickstarts/vercel-nextjs.mdx LANGUAGE: tsx CODE: ``` "use client" import { startBackgroundJob } from "@/app/actions" export default function Home() { async function handleClick() { await startBackgroundJob() } return (
) } ``` ---------------------------------------- TITLE: Create QStash Schedule with cURL, Node.js, Python, and Go DESCRIPTION: Practical code examples demonstrating how to create a scheduled message using the QStash API. These examples show how to make a POST request to the schedules endpoint, including the Authorization header with a Bearer token and the Upstash-Cron header for defining the schedule. SOURCE: https://upstash.com/docs/qstash/api/schedules/create.mdx LANGUAGE: curl CODE: ``` curl -XPOST https://qstash.upstash.io/v2/schedules/https://www.example.com/endpoint \ -H "Authorization: Bearer " \ -H "Upstash-Cron: */5 * * * *" ``` LANGUAGE: js CODE: ``` const response = await fetch('https://qstash.upstash.io/v2/schedules/https://www.example.com/endpoint', { method: 'POST', headers: { 'Authorization': 'Bearer ', 'Upstash-Cron': '*/5 * * * *' } }); ``` LANGUAGE: python CODE: ``` import requests headers = { 'Authorization': 'Bearer ', 'Upstash-Cron': '*/5 * * * *' } response = requests.post( 'https://qstash.upstash.io/v2/schedules/https://www.example.com/endpoint', headers=headers ) ``` LANGUAGE: go CODE: ``` req, err := http.NewRequest("POST", "https://qstash.upstash.io/v2/schedules/https://www.example.com/endpoint", nil) if err != nil { log.Fatal(err) } req.Header.Set("Authorization", "Bearer ") req.Header.Set("Upstash-Cron", "*/5 * * * *") resp, err := http.DefaultClient.Do(req) if err != nil { log.Fatal(err) } defer resp.Body.Close() ``` ---------------------------------------- TITLE: Authenticate QStash API with Bearer Token using cURL DESCRIPTION: This snippet demonstrates how to authenticate requests to the QStash API by including the QSTASH_TOKEN as a Bearer token in the 'Authorization' HTTP header when using cURL. The token is obtained from the Upstash console. SOURCE: https://upstash.com/docs/qstash/api/authentication.mdx LANGUAGE: bash CODE: ``` curl https://qstash.upstash.io/v2/publish/... \ -H "Authorization: Bearer " ``` ---------------------------------------- TITLE: Enqueue Message to QStash Queue for Ordered Delivery DESCRIPTION: This snippet demonstrates how to enqueue a message into a QStash queue, ensuring ordered delivery (FIFO). Messages are sent to a specified URL via the queue. It shows examples using cURL, TypeScript, and Python. SOURCE: https://upstash.com/docs/qstash/features/queues.mdx LANGUAGE: bash CODE: ``` curl -XPOST -H 'Authorization: Bearer XXX' \ -H "Content-type: application/json" \ 'https://qstash.upstash.io/v2/enqueue/my-queue/https://example.com' -d '{"message":"Hello, World!"}' ``` LANGUAGE: typescript CODE: ``` const client = new Client({ token: "" }); const queue = client.queue({ queueName: "my-queue" }) await queue.enqueueJSON({ url: "https://example.com", body: { "Hello": "World" } }) ``` LANGUAGE: python CODE: ``` from qstash import QStash client = QStash("") client.message.enqueue_json( queue="my-queue", url="https://example.com", body={ "Hello": "World", }, ) ```