### Install Build Tools and Dependencies Source: https://github.com/restatedev/sdk-python/blob/main/README.md Installs the required build tools and project dependencies using pip. This command reads from the requirements.txt file to set up the development environment. ```shell pip install -r requirements.txt ``` -------------------------------- ### Setup Restate ASGI Application in Python Source: https://context7.com/restatedev/sdk-python/llms.txt Provides a basic structure for setting up a Restate ASGI application in Python. This allows you to deploy your Restate services using standard ASGI servers. The example shows how to import necessary components and define different types of Restate services like `Service`, `VirtualObject`, and `Workflow`. Requires the 'restate-sdk' package. ```python import restate from restate import Service, VirtualObject, Workflow # Define your services greeter = Service("greeter") counter = VirtualObject("counter") payment = Workflow("payment") # ... define handlers ... ``` -------------------------------- ### Install Restate Python SDK Source: https://github.com/restatedev/sdk-python/blob/main/README.md Installs the Restate SDK for Python projects. Ensure you have Python version 3.10 or higher. This command adds the necessary package to your project's dependencies. ```shell pip install restate_sdk ``` -------------------------------- ### Set up Python Virtual Environment Source: https://github.com/restatedev/sdk-python/blob/main/README.md Creates and activates a Python virtual environment for local development. This isolates project dependencies. It requires Python 3 to be installed. ```shell python3 -m venv .venv source .venv/bin/activate ``` -------------------------------- ### Integrate Restate Services with Python Test Harness Source: https://context7.com/restatedev/sdk-python/llms.txt Shows how to use the Restate Python SDK's test harness for integration testing. This feature spins up a Restate server in Docker, allowing you to test your services against a real environment. It requires `pytest` and `restate-sdk[harness]`. The example demonstrates testing a simple echo service. ```python import pytest from restate import Service, Context, create_test_harness, app # Define test service test_service = Service("test_service") @test_service.handler() async def echo(ctx: Context, message: str) -> str: return f"Echo: {message}" @test_service.handler() async def increment_counter(ctx: Context, value: int) -> int: current = await ctx.run("get", lambda: 0) return current + value # Create the app test_app = app(services=[test_service]) # Pytest integration test @pytest.mark.asyncio async def test_echo_service(): """Test service with real Restate server""" async with create_test_harness( test_app, follow_logs=False, # Set True to see Restate logs always_replay=True, # Force replay to catch non-determinism disable_retries=False ) as env: # env.client is a ready-to-use Restate client result = await env.client.service_call(echo, arg="Hello") assert result == "Echo: Hello" # Access URLs if needed print(f"Ingress URL: {env.ingress_url}") print(f"Admin API: {env.admin_api_url}") # Run with: pytest test_file.py -v # Requires: pip install restate-sdk[harness] ``` -------------------------------- ### Build Docker Image for Test Services Source: https://github.com/restatedev/sdk-python/blob/main/test-services/README.md Builds a Docker image for the Restate test services. This command requires Docker to be installed and accessible. It uses the Dockerfile located in the 'test-services' directory and tags the image as 'restatedev/test-services'. ```shell docker build . -f test-services/Dockerfile -t restatedev/test-services ``` -------------------------------- ### Manage External Events with Restate Awakeables (Python) Source: https://context7.com/restatedev/sdk-python/llms.txt Demonstrates how to use Restate's awakeables to integrate with external systems. This allows durable promises to be completed by callbacks, webhooks, or external workflows. It covers starting a process, waiting for completion, and resolving or rejecting the awakeable. ```python from restate import Service, Context service = Service("awakeables") @service.handler() async def start_external_process(ctx: Context, task: str) -> str: """Start a process that will be completed by an external system""" # Create an awakeable - returns (id, future) tuple awakeable_id, future = ctx.awakeable(type_hint=str) # Send the awakeable ID to external system await ctx.run("notify_external", lambda: send_to_webhook(awakeable_id, task)) # Wait for external system to complete the awakeable result = await future return f"External process completed: {result}" @service.handler() async def complete_awakeable(ctx: Context, data: dict) -> str: """Complete an awakeable from within Restate (or use HTTP API)""" awakeable_id = data["awakeable_id"] result = data["result"] # Resolve the awakeable with a value ctx.resolve_awakeable(awakeable_id, result) return "Awakeable resolved" @service.handler() async def fail_awakeable(ctx: Context, data: dict) -> str: """Reject an awakeable with an error""" awakeable_id = data["awakeable_id"] ctx.reject_awakeable( awakeable_id, failure_message="External process failed", failure_code=500 ) return "Awakeable rejected" def send_to_webhook(awakeable_id: str, task: str): # Send to external system that will call back to complete the awakeable # External system can resolve via: POST /restate/awakeables/{id}/resolve print(f"Webhook called with awakeable_id={awakeable_id}, task={task}") ``` -------------------------------- ### Create and Run ASGI App with Restate Services Source: https://context7.com/restatedev/sdk-python/llms.txt This snippet demonstrates how to create an ASGI application using the Restate SDK, registering multiple services, and configuring the protocol and identity keys. It also shows how to run the application using Hypercorn, both as a command-line instruction and programmatically. ```python import restate import asyncio import hypercorn.asyncio from hypercorn import Config # Assume greeter, counter, and payment are defined Restate services # from your_services import greeter, counter, payment # Placeholder services for demonstration greeter = restate.Service("greeter") counter = restate.Service("counter") payment = restate.Service("payment") # Create ASGI app with all services app = restate.app( services=[greeter, counter, payment], # Optional: force protocol type protocol="bidi", # or "request_response" # Optional: identity keys for request verification identity_keys=["your-signing-key"] ) # Run with Hypercorn (recommended for HTTP/2) # hypercorn app:app --bind 0.0.0.0:9080 # Or programmatically: if __name__ == "__main__": config = Config() config.bind = ["0.0.0.0:9080"] asyncio.run(hypercorn.asyncio.serve(app, config)) # Register with Restate server: # curl http://localhost:9070/deployments --json '{"uri": "http://localhost:9080"}' ``` -------------------------------- ### Create and Push Git Tag for Release Source: https://github.com/restatedev/sdk-python/blob/main/README.md Creates a Git tag for a new release and pushes it to the origin. This command is used to mark a specific version of the SDK for release, following the format 'vX.Y.Z'. ```shell git tag -m "Release v0.1.0" v0.1.0 git push origin v0.1.0 ``` -------------------------------- ### Verify Code with Linting and Tests Source: https://github.com/restatedev/sdk-python/blob/main/README.md Runs linting and tests for the Restate Python SDK using the 'just' build tool. This command ensures code quality and correctness. ```shell just verify ``` -------------------------------- ### Deploy Restate Services to AWS Lambda Source: https://context7.com/restatedev/sdk-python/llms.txt This snippet shows how to deploy Restate services to AWS Lambda using the built-in adapter. It defines a simple service, creates an ASGI app, and then wraps it to be compatible with the AWS Lambda environment. It also includes logic for local development using an ASGI server. ```python from restate import Service, Context, app from restate.aws_lambda import wrap_asgi_as_lambda_handler, is_running_on_lambda import asyncio import hypercorn.asyncio from hypercorn import Config greeter = Service("greeter") @greeter.handler() async def greet(ctx: Context, name: str) -> str: return f"Hello {name} from Lambda!" # Create ASGI app asgi_app = app(services=[greeter]) # Wrap as Lambda handler handler = wrap_asgi_as_lambda_handler(asgi_app) # For local development, run with ASGI server if __name__ == "__main__" and not is_running_on_lambda(): config = Config() config.bind = ["0.0.0.0:9080"] asyncio.run(hypercorn.asyncio.serve(asgi_app, config)) # Lambda handler is automatically used when deployed to AWS Lambda # Configure Lambda with handler: your_module.handler ``` -------------------------------- ### Build Rust Module for Development Source: https://github.com/restatedev/sdk-python/blob/main/README.md Builds the Rust module for the Restate SDK and includes optional development dependencies for linting and testing. This command uses 'maturin' and might need to be re-run after pulling changes. ```shell maturin dev -E test,lint ``` -------------------------------- ### Define a Restate Service with Stateless Handlers (Python) Source: https://context7.com/restatedev/sdk-python/llms.txt Demonstrates how to define a Restate Service with stateless handlers using the Python SDK. Services are suitable for request/response patterns without persistent state. It includes a basic greet handler and a handler demonstrating durable side effects using `ctx.run()`. ```python from restate import Service, Context import restate # Create a logger that hides logs during replay logger = restate.getLogger() greeter = Service("greeter") @greeter.handler() async def greet(ctx: Context, name: str) -> str: """Simple greeting handler""" logger.info("Received greeting request: %s", name) return f"Hello {name}!" @greeter.handler() async def greet_with_side_effect(ctx: Context, name: str) -> str: """Handler demonstrating durable side effects with ctx.run()""" # The run() method ensures this side effect is executed exactly once # Even if the handler retries, the result is replayed from the journal greeting = await ctx.run("fetch_greeting", lambda: fetch_from_database(name)) return greeting def fetch_from_database(name: str) -> str: # Simulate external call return f"Hello {name} from the database!" # Create ASGI app and run with Hypercorn app = restate.app(services=[greeter]) # Run the service # hypercorn example:app --bind 0.0.0.0:9080 ``` -------------------------------- ### Prepare for Package Release Source: https://github.com/restatedev/sdk-python/blob/main/README.md Updates the module version in Cargo.toml and performs a local build to update Cargo.lock. This is a prerequisite step before creating a Git tag for a new release. ```shell git checkout main && git pull # Update module version in Cargo.toml and run a local build to update the Cargo.lock too, commit it. ``` -------------------------------- ### Use Deterministic Utilities in Restate (Python) Source: https://context7.com/restatedev/sdk-python/llms.txt Demonstrates the use of deterministic utilities for random number generation, UUID creation, and time retrieval within Restate. These utilities ensure consistent results across retries and replays without requiring explicit journaling, enhancing idempotency. ```python from datetime import datetime, timedelta from restate import Service, Context service = Service("deterministic") @service.handler() async def demo_deterministic(ctx: Context, name: str) -> dict: """Demonstrate deterministic utilities""" # ctx.random() returns a seeded Python Random instance # Values are deterministic across retries without journal entries random = ctx.random() random_int = random.randint(0, 100) random_float = random.random() random_choice = random.choice(["A", "B", "C"]) random_bytes = random.randbytes(16) # ctx.uuid() returns a deterministic UUID v4 unique_id = ctx.uuid() # ctx.time() returns current timestamp (journaled for durability) timestamp = await ctx.time() current_time = datetime.fromtimestamp(timestamp) return { "random_int": random_int, "random_float": random_float, "random_choice": random_choice, "random_bytes": random_bytes.hex(), "uuid": str(unique_id), "timestamp": timestamp, "datetime": current_time.isoformat() } @service.handler() async def measure_duration(ctx: Context) -> float: """Measure operation duration durably""" start = await ctx.time() # Perform some operation await ctx.sleep(timedelta(seconds=1)) end = await ctx.time() duration_seconds = end - start return duration_seconds ``` -------------------------------- ### Python Context.run() for Durable Side Effects Source: https://context7.com/restatedev/sdk-python/llms.txt Demonstrates the use of `ctx.run()` in Restate SDK for Python to execute side effects durably, ensuring exactly-once execution. It supports basic execution, retries, and typed calls with options. ```python from datetime import timedelta from restate import Service, Context, RunOptions service = Service("side_effects") @service.handler() async def process_order(ctx: Context, order_id: str) -> dict: """Process order with multiple durable side effects""" # Simple side effect - result is journaled inventory = await ctx.run("check_inventory", lambda: check_inventory_api(order_id)) # Side effect with retry configuration payment = await ctx.run( "process_payment", lambda: charge_payment_api(order_id), max_attempts=3, max_retry_duration=timedelta(seconds=30) ) # Using run_typed for better type hints and passing arguments shipping = await ctx.run_typed( "create_shipment", create_shipment_api, RunOptions(max_attempts=5), order_id, # positional arguments passed to the function ) return { "order_id": order_id, "inventory": inventory, "payment": payment, "shipping": shipping } async def check_inventory_api(order_id: str) -> bool: # External API call return True async def charge_payment_api(order_id: str) -> str: # External API call return "payment_123" async def create_shipment_api(order_id: str) -> str: # External API call return "shipment_456" ``` -------------------------------- ### Service-to-Service Communication with Restate SDK (Python) Source: https://context7.com/restatedev/sdk-python/llms.txt Demonstrates synchronous and asynchronous calls to services and virtual objects using the Restate SDK. Supports delayed sends and idempotency keys for reliable communication. ```python from datetime import timedelta from restate import Service, VirtualObject, Context, ObjectContext notification_service = Service("notifications") user_object = VirtualObject("user") @notification_service.handler() async def send_email(ctx: Context, message: str) -> str: return f"Email sent: {message}" @user_object.handler() async def get_profile(ctx: ObjectContext) -> dict: return {"name": await ctx.get("name"), "email": await ctx.get("email")} orchestrator = Service("orchestrator") @orchestrator.handler() async def orchestrate(ctx: Context, user_id: str) -> dict: """Demonstrate various call patterns""" # Synchronous call to a service handler - waits for response email_result = await ctx.service_call(send_email, arg="Welcome!") # Synchronous call to a virtual object - requires key profile = await ctx.object_call(get_profile, key=user_id, arg=None) # Asynchronous send - fire and forget, returns immediately handle = ctx.service_send(send_email, arg="Async notification") invocation_id = await handle.invocation_id() # Get ID for tracking # Delayed send - execute after specified duration ctx.service_send( send_email, arg="Reminder email", send_delay=timedelta(hours=24) ) # With idempotency key - prevents duplicate executions ctx.service_send( send_email, arg="Important notification", idempotency_key=f"notify-{user_id}-welcome" ) # Cancel a pending invocation # await ctx.cancel_invocation(invocation_id) return {"email": email_result, "profile": profile} ``` -------------------------------- ### Invoke Restate Services Externally with Python Client Source: https://context7.com/restatedev/sdk-python/llms.txt Demonstrates how to use the Restate Python SDK's HTTP client to invoke virtual objects from external applications. It covers synchronous calls, asynchronous sends with delays, and calls with idempotency keys. Requires the 'restate-sdk' package. ```python import asyncio from datetime import timedelta import restate from restate import VirtualObject, ObjectContext # Define the service (in your service code) counter = VirtualObject("counter") @counter.handler() async def increment(ctx: ObjectContext, value: int) -> int: n = await ctx.get("counter", type_hint=int) or 0 n += value ctx.set("counter", n) return n @counter.handler() async def get_count(ctx: ObjectContext) -> int: return await ctx.get("counter", type_hint=int) or 0 # Client code (in your application) async def main(): # Create client pointing to Restate ingress async with restate.create_client("http://localhost:8080") as client: # Synchronous call to virtual object result = await client.object_call( increment, key="my-counter", arg=5 ) print(f"New count: {result}") # Async send with delay handle = await client.object_send( increment, key="my-counter", arg=10, send_delay=timedelta(minutes=5) ) print(f"Scheduled invocation: {handle.invocation_id}") # Call with idempotency key result = await client.object_call( increment, key="my-counter", arg=1, idempotency_key="unique-operation-123" ) if __name__ == "__main__": asyncio.run(main()) ``` -------------------------------- ### Run Restate SDK Test Suite Source: https://github.com/restatedev/sdk-python/blob/main/test-services/README.md Executes the Restate SDK test suite locally. This command requires a Java Development Kit (JDK) version 17 or higher. It runs the 'restate-sdk-test-suite.jar' with specified exclusions and the Docker image containing the test services. ```shell java -jar restate-sdk-test-suite.jar run --exclusions-file test-services/exclusions.yaml restatedev/test-services ``` -------------------------------- ### Use Pydantic Models for Type-Safe Serialization in Restate (Python) Source: https://context7.com/restatedev/sdk-python/llms.txt Illustrates how to leverage Pydantic models for automatic serialization and deserialization of handler inputs and outputs in the Restate Python SDK. This ensures type safety and simplifies data handling for complex request and response structures. ```python from pydantic import BaseModel from typing import List, Optional from restate import Service, Context class OrderRequest(BaseModel): customer_id: str items: List[str] priority: Optional[str] = "normal" class OrderResponse(BaseModel): order_id: str status: str total_items: int class ShippingInfo(BaseModel): address: str carrier: str order_service = Service("orders") @order_service.handler() async def create_order(ctx: Context, request: OrderRequest) -> OrderResponse: """Handler with Pydantic input/output - automatic serialization""" order_id = str(ctx.uuid()) # Request is automatically deserialized from JSON await ctx.run("log_order", lambda: print(f"Order for {request.customer_id}")) # Response is automatically serialized to JSON return OrderResponse( order_id=order_id, status="created", total_items=len(request.items) ) @order_service.handler() async def process_order(ctx: Context, order_id: str) -> ShippingInfo: """Return complex Pydantic models""" return ShippingInfo( address="123 Main St", carrier="FastShip" ) # Invoke with JSON: # curl http://localhost:8080/orders/create_order \ # --json '{"customer_id": "cust-123", "items": ["item1", "item2"]}' ``` -------------------------------- ### Concurrent Operations with Restate SDK (Python) Source: https://context7.com/restatedev/sdk-python/llms.txt Illustrates how to manage multiple concurrent operations using Restate SDK utilities like gather, select, as_completed, and wait_completed. Ensures durable execution of parallel tasks. ```python from datetime import timedelta from restate import Service, Context, gather, select, as_completed, wait_completed service = Service("concurrent") @service.handler() async def fetch_all(ctx: Context) -> list: """Wait for all operations to complete""" f1 = ctx.service_call(slow_operation, arg="task1") f2 = ctx.service_call(slow_operation, arg="task2") f3 = ctx.service_call(slow_operation, arg="task3") # gather() waits for all futures to complete await gather(f1, f2, f3) return [await f1, await f2, await f3] @service.handler() async def fetch_first(ctx: Context) -> list: """Race multiple operations, return first to complete""" f1 = ctx.service_call(slow_operation, arg="fast") f2 = ctx.service_call(slow_operation, arg="slow") timeout = ctx.sleep(timedelta(seconds=5)) # select() returns [name, result] of first completed future match await select(fast=f1, slow=f2, timeout=timeout): case ["fast", result]: return ["fast completed", result] case ["slow", result]: return ["slow completed", result] case ["timeout", _]: return ["timed out", None] @service.handler() async def process_as_available(ctx: Context) -> list: """Process results as they become available""" futures = [ ctx.service_call(slow_operation, arg=f"task{i}") for i in range(5) ] results = [] async for completed in as_completed(*futures): result = await completed results.append(result) # Process each result immediately as it completes return results @service.handler() async def partial_completion(ctx: Context) -> dict: """Get completed and pending futures separately""" f1 = ctx.service_call(slow_operation, arg="task1") f2 = ctx.service_call(slow_operation, arg="task2") # wait_completed returns (completed, pending) tuple completed, pending = await wait_completed(f1, f2) results = [await f for f in completed] # Optionally cancel pending for f in pending: await f.cancel_invocation() return {"completed": results, "cancelled": len(pending)} @service.handler() async def slow_operation(ctx: Context, name: str) -> str: await ctx.sleep(timedelta(seconds=1)) return f"Completed: {name}" ``` -------------------------------- ### Define a Restate VirtualObject with Stateful Handlers (Python) Source: https://context7.com/restatedev/sdk-python/llms.txt Illustrates defining a Restate VirtualObject for stateful entities using the Python SDK. It shows exclusive handlers (`increment`, `reset`) for single-writer access and a shared handler (`count`) for concurrent read access. State is automatically persisted and restored. ```python from restate import VirtualObject, ObjectContext, ObjectSharedContext counter = VirtualObject("counter") @counter.handler() # Default is exclusive (single-writer) async def increment(ctx: ObjectContext, value: int) -> int: """Increment counter by value - exclusive access ensures consistency""" # Get current state (returns None if not set) n = await ctx.get("counter", type_hint=int) or 0 n += value # Set new state - automatically persisted ctx.set("counter", n) return n @counter.handler() async def reset(ctx: ObjectContext) -> None: """Reset the counter to zero""" ctx.clear("counter") @counter.handler(kind="shared") # Concurrent read access async def count(ctx: ObjectSharedContext) -> int: """Read current count - shared handler allows concurrent reads""" return await ctx.get("counter", type_hint=int) or 0 @counter.handler() async def get_all_keys(ctx: ObjectContext) -> list: """List all state keys stored for this object""" return await ctx.state_keys() # Access object by key via HTTP: # curl http://localhost:8080/counter/my-counter-id/increment --json '5' # curl http://localhost:8080/counter/my-counter-id/count ``` -------------------------------- ### Python Workflow for Payment Processing with Durable Promises Source: https://context7.com/restatedev/sdk-python/llms.txt Defines a Restate Workflow for processing payments, including human approval steps and timeouts. It uses durable promises to signal completion or failure and allows interaction through handlers. ```python from datetime import timedelta from restate import Workflow, WorkflowContext, WorkflowSharedContext from restate import select, TerminalError TIMEOUT = timedelta(seconds=60) payment = Workflow("payment") @payment.main() # Entry point - runs once per workflow instance async def pay(ctx: WorkflowContext, amount: int): """Process payment with human approval step""" workflow_key = ctx.key() ctx.set("status", "verifying payment") # Execute side effect durably def notify_payment_gateway(): print(f"Payment of ${amount} requires approval") print(f"Approve: curl http://localhost:8080/payment/{workflow_key}/approve --json '"approved"'"") print(f"Decline: curl http://localhost:8080/payment/{workflow_key}/approve --json '"declined"'"") await ctx.run_typed("notify", notify_payment_gateway) ctx.set("status", "waiting for approval") # Wait for promise resolution or timeout using select() match await select( result=ctx.promise("payment.approval").value(), timeout=ctx.sleep(TIMEOUT) ): case ["result", "approved"]: ctx.set("status", "payment approved") return {"success": True, "amount": amount} case ["result", "declined"]: ctx.set("status", "payment declined") raise TerminalError(message="Payment declined", status_code=401) case ["timeout", _]: ctx.set("status", "payment timed out") raise TerminalError(message="Payment verification timed out", status_code=410) @payment.handler() # Shared handler to interact with running workflow async def approve(ctx: WorkflowSharedContext, result: str): """Resolve the payment approval promise""" promise = ctx.promise("payment.approval", type_hint=str) await promise.resolve(result) @payment.handler() async def status(ctx: WorkflowSharedContext) -> str: """Query current workflow status""" return await ctx.get("status", type_hint=str) or "unknown" # Start workflow: # curl http://localhost:8080/payment/order-123/pay --json '100' # Check status: # curl http://localhost:8080/payment/order-123/status ``` -------------------------------- ### Handle Non-Retryable Failures with TerminalError in Python Source: https://context7.com/restatedev/sdk-python/llms.txt Illustrates how to use `TerminalError` in the Restate Python SDK to signal non-retryable failures. This prevents Restate from retrying invocations that encounter these specific errors. It also shows how to differentiate between `TerminalError` and regular exceptions for proper handling. Requires the 'restate-sdk' package. ```python from restate import Service, Context, TerminalError from restate.exceptions import is_internal_exception service = Service("error_handling") @service.handler() async def process_with_validation(ctx: Context, data: dict) -> str: """Demonstrate error handling patterns""" # Terminal errors are not retried if "required_field" not in data: raise TerminalError( message="Missing required_field", status_code=400 # HTTP status code ) # Regular exceptions are retried automatically result = await ctx.run("external_call", lambda: call_external_api(data)) return result @service.handler() async def safe_operation(ctx: Context, task: str) -> str: """Proper exception handling in Restate handlers""" try: result = await ctx.run("risky_operation", lambda: risky_api_call(task)) return result except TerminalError: # Re-raise terminal errors - they should not be retried raise except Exception as e: # Check if it's an internal SDK exception (suspension, etc.) if is_internal_exception(e): raise # Always re-raise internal exceptions # Handle application errors return f"Operation failed: {str(e)}" def call_external_api(data: dict) -> str: return "success" def risky_api_call(task: str) -> str: return f"Completed: {task}" ``` -------------------------------- ### Debug Single Test in Restate SDK Test Suite Source: https://github.com/restatedev/sdk-python/blob/main/test-services/README.md Debugs a specific test within the Restate SDK test suite. This command allows developers to run a single test suite and test case in debug mode, specifying the default service port. Ensure the Python service is running and accessible. ```shell java -jar restate-sdk-test-suite.jar debug --test-suite=lazyState --test-name=dev.restate.sdktesting.tests.State default-service=9080 ``` === COMPLETE CONTENT === This response contains all available snippets from this library. No additional content exists. Do not make further requests.