### Install Dependencies

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/large_payload/README.md

Installs the Python dependencies for the example, including Azure Blob Payloads support and the Azure Managed client.

```bash
pip install -e ".[azure-blob-payloads]" -e ./durabletask-azuremanaged
```

--------------------------------

### Navigate to Example Directory

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/in_memory_backend_example/README.md

Change the current directory to the in-memory backend example folder.

```bash
cd in_memory_backend_example
```

--------------------------------

### Start DTS Emulator

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/large_payload/README.md

Starts the Durable Task Scheduler (DTS) emulator using Docker. This is required for running the example locally.

```bash
docker run --name dtsemulator -d -p 8080:8080 mcr.microsoft.com/dts/dts-emulator:latest
```

--------------------------------

### Install Dependencies and Generate Protobufs

Source: https://github.com/microsoft/durabletask-python/blob/main/docs/development.md

Installs development dependencies and compiles protobuf files. Ensure 'make' is installed.

```shell
pip3 install -r dev-requirements.txt
make gen-proto
```

--------------------------------

### Install Dependencies

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/in_memory_backend_example/README.md

Installs project dependencies from the requirements.txt file.

```bash
pip install -r requirements.txt
```

--------------------------------

### Execute a Durable Task Example

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/README.md

Executes a Python script that demonstrates a durable task orchestration. Ensure all prerequisites and setup steps are completed.

```bash
python activity_sequence.py
```

--------------------------------

### Run the Example

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/large_payload/README.md

Executes the Python application that demonstrates the large payload externalization feature.

```bash
python app.py
```

--------------------------------

### Install Durable Task SDK with Large Payload Support

Source: https://github.com/microsoft/durabletask-python/blob/main/README.md

Install the SDK with the `azure-blob-payloads` extra to enable automatic offloading of oversized orchestration payloads to Azure Blob Storage. Refer to feature documentation and examples for usage details.

```bash
pip install durabletask[azure-blob-payloads]
```

--------------------------------

### Start Azurite Blob Service with Docker

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/large_payload/README.md

Starts the Azurite storage emulator for the blob service using a Docker image. This is an alternative to running Azurite directly.

```bash
docker run -d -p 10000:10000 \
  mcr.microsoft.com/azure-storage/azurite \
  azurite-blob --blobHost 0.0.0.0
```

--------------------------------

### Start Azurite Local Storage Emulator

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/history_export/README.md

Start the Azurite local Azure Storage emulator. This command ensures the emulator is running and listening on the specified blob port, which is required for the history export sample to function without a real Azure Storage account.

```bash
azurite --silent --blobPort 10000
```

--------------------------------

### Install Local Packages (Bash)

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/sandboxes/README.md

Install the Durable Task Scheduler Python packages locally from the repository root using pip.

```bash
pip install -e . -e ./durabletask-azuremanaged
```

--------------------------------

### Start Jaeger (Docker)

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/distributed-tracing/README.md

Starts the Jaeger all-in-one Docker image, which serves the UI on port 16686 and accepts OTLP over gRPC on port 4317.

```bash
docker run --name jaeger -d \
  -p 4317:4317 \
  -p 16686:16686 \
  jaegertracing/all-in-one:latest
```

--------------------------------

### Start Jaeger (PowerShell)

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/distributed-tracing/README.md

Starts the Jaeger all-in-one Docker image using PowerShell. This is an alternative to the bash command for Windows users.

```powershell
docker run --name jaeger -d `
  -p 4317:4317 `
  -p 16686:16686 `
  jaegertracing/all-in-one:latest
```

--------------------------------

### Install Local Packages (PowerShell)

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/sandboxes/README.md

Install the Durable Task Scheduler Python packages locally from the repository root using pip.

```powershell
pip install -e . -e .\durabletask-azuremanaged
```

--------------------------------

### Install OpenTelemetry Extras

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/distributed-tracing/README.md

Installs the OpenTelemetry optional dependency for the Durable Task Python SDK. This is required for emitting trace spans.

```bash
pip install "durabletask[opentelemetry]"
```

--------------------------------

### Run App Against Emulator

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/in_memory_backend_example/README.md

Starts the Durable Task Scheduler application using the emulator.

```bash
python -m src.app
```

--------------------------------

### Run the History Export Sample Script

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/history_export/README.md

Execute the main Python script for the history export sample. This script orchestrates the setup, execution, and monitoring of the history export process.

```bash
python app.py
```

--------------------------------

### Run Declarer App (Bash)

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/sandboxes/README.md

Execute the main application script to register sandbox activity metadata, start the orchestrator, and run the remote worker.

```bash
python examples/sandboxes/main_app.py
```

--------------------------------

### Start Azurite Blob Service

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/large_payload/README.md

Starts the Azurite storage emulator for the blob service. This is used to store large payloads when not using a live Azure Storage account.

```bash
azurite-blob --location /tmp/azurite --blobPort 10000
```

--------------------------------

### Install Local Python Packages in Editable Mode

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/README.md

Installs local Python packages in editable mode, useful when working with a local clone of the repository. Run from the repository root.

```bash
pip install -e . -e ./durabletask-azuremanaged
```

--------------------------------

### Run Tests

Source: https://github.com/microsoft/durabletask-python/blob/main/docs/development.md

Executes all project tests from the root directory. Requires 'make' to be installed.

```shell
make test
```

--------------------------------

### Configure InMemoryOrchestrationBackend

Source: https://github.com/microsoft/durabletask-python/blob/main/durabletask/testing/README.md

Configures the in-memory backend with custom port and history size. The backend is started automatically when using `create_test_backend`.

```python
backend = InMemoryOrchestrationBackend(
    port=50051,
    max_history_size=100000  # Support larger orchestrations
)
backend.start()
```

```python
backend = create_test_backend(port=50051, max_history_size=10000)
```

--------------------------------

### Install DurableTask with History Export Extension

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/history_export/README.md

Install the durabletask library with the necessary extension for Azure history export. This command ensures all dependencies for the history export feature are included.

```bash
pip install durabletask[history-export-azure]
```

--------------------------------

### Install Azure Blob Payloads Dependency

Source: https://github.com/microsoft/durabletask-python/blob/main/docs/features.md

Install the optional dependency for Azure Blob Storage integration with Durable Task Python.

```bash
pip install durabletask[azure-blob-payloads]
```

--------------------------------

### Configure TracerProvider

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/distributed-tracing/README.md

Configures the OpenTelemetry TracerProvider with a BatchSpanProcessor and an OTLPSpanExporter. This setup must occur before any spans are created.

```python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import Resource
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

resource = Resource.create({"service.name": "my-app"})
provider = TracerProvider(resource=resource)
provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317", insecure=True))
)
trace.set_tracer_provider(provider)
```

--------------------------------

### Install Local Packages in Editable Mode

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/distributed-tracing/README.md

Installs local Python packages in editable mode, including OpenTelemetry extras and the durabletask-azuremanaged package. This is useful when working with a local clone of the repository.

```bash
pip install -e ".[opentelemetry]" -e ./durabletask-azuremanaged
```

--------------------------------

### Run Declarer App (PowerShell)

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/sandboxes/README.md

Execute the main application script to register sandbox activity metadata, start the orchestrator, and run the remote worker.

```powershell
python examples\sandboxes\main_app.py
```

--------------------------------

### Set Storage Connection String (Bash)

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/large_payload/README.md

Sets the STORAGE_CONNECTION_STRING environment variable using Bash. This is used to configure the example to use a live Azure Storage account instead of Azurite.

```bash
export STORAGE_CONNECTION_STRING="DefaultEndpointsProtocol=https;..."
```

--------------------------------

### Implement Custom File System History Writer

Source: https://github.com/microsoft/durabletask-python/blob/main/docs/features.md

Implement the HistoryWriter protocol to send exports to custom destinations like the local file system. This example shows how to write history payloads to a specified directory.

```python
import os

from durabletask.extensions.history_export import ExportHistoryClient, HistoryWriter

class LocalFileSystemHistoryWriter:
    def __init__(self, root_dir: str) -> None:
        self._root = root_dir

    def write(
        self,
        *,
        instance_id: str,
        container: str,
        blob_name: str,
        payload: bytes,
        content_type: str,
        content_encoding: str | None,
    ) -> None:
        # ``container`` is the destination's logical container name
        # (an ExportDestination.container). Per-job routing writers
        # combine it with ``blob_name``; writers that pin to a fixed
        # location at construction time may ignore it.
        path = os.path.join(self._root, container, blob_name)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "wb") as fp:
            fp.write(payload)

# dt_client is a previously created Durable Task client
export_client = ExportHistoryClient(
    dt_client, LocalFileSystemHistoryWriter("/var/exports")
)
```

--------------------------------

### Start Durable Task Worker

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/sub-orchestrations-with-fan-out-fan-in/README.md

Starts the Durable Task worker process. Ensure the TASKHUB and ENDPOINT environment variables are correctly set in your shell before execution.

```bash
python3 ./worker.py
```

--------------------------------

### Pull Emulator Docker Image

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/sub-orchestrations-with-fan-out-fan-in/README.md

Pulls the Docker image for the Durable Task Scheduler emulator. Ensure Docker is installed before running.

```bash
docker pull mcr.microsoft.com/dts/dts-emulator:v0.0.6
```

--------------------------------

### Set Storage Connection String (PowerShell)

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/large_payload/README.md

Sets the STORAGE_CONNECTION_STRING environment variable using PowerShell. This is used to configure the example to use a live Azure Storage account instead of Azurite.

```powershell
$env:STORAGE_CONNECTION_STRING = "DefaultEndpointsProtocol=https;..."
```

--------------------------------

### Start Durable Task Orchestrator

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/sub-orchestrations-with-fan-out-fan-in/README.md

Initiates the orchestrator process for Durable Task. This command should be run in a separate shell after the worker is active, with environment variables configured.

```bash
python3 ./orchestrator.py
```

--------------------------------

### Test External Event Handling in Orchestration

Source: https://github.com/microsoft/durabletask-python/blob/main/durabletask/testing/README.md

Tests an orchestrator that waits for an external event. It schedules the orchestration, waits for it to start, raises an event, and then waits for completion, verifying the output.

```python
def test_external_events(backend):
    client = TaskHubGrpcClient(host_address="localhost:50051")
    worker = TaskHubGrpcWorker(host_address="localhost:50051")

    def wait_for_event_orchestrator(ctx, _):
        event_data = yield ctx.wait_for_external_event("approval")
        return event_data

    worker.add_orchestrator(wait_for_event_orchestrator)
    worker.start()

    try:
        instance_id = client.schedule_new_orchestration(wait_for_event_orchestrator)

        # Wait for orchestration to start
        client.wait_for_orchestration_start(instance_id, timeout=5)

        # Raise an event
        client.raise_orchestration_event(instance_id, "approval", data="approved")

        # Wait for completion
        state = client.wait_for_orchestration_completion(instance_id, timeout=10)

        assert state.runtime_status == OrchestrationStatus.COMPLETED
        assert state.serialized_output == '"approved"'
    finally:
        worker.stop()
```

--------------------------------

### Configure Logging for Durable Task Scheduler Worker

Source: https://github.com/microsoft/durabletask-python/blob/main/docs/features.md

Customize the logging output for Durable Task workers by providing a custom log handler and formatter. This example configures logs to be written to 'durable.log' at DEBUG level.

```python
import logging

log_handler = logging.FileHandler('durable.log', encoding='utf-8')
log_handler.setLevel(logging.DEBUG)

with DurableTaskSchedulerWorker(
    host_address=endpoint,
    secure_channel=secure_channel,
    taskhub=taskhub_name,
    token_credential=credential,
    log_handler=log_handler,
) as w:
    ...  # register orchestrators and activities here
```

--------------------------------

### Set Environment Variables for Azure Deployment

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/sub-orchestrations-with-fan-out-fan-in/README.md

Configures the TASKHUB and ENDPOINT environment variables required for connecting to a deployed Durable Task Scheduler. Adjust values based on your Azure setup.

```bash
export TASKHUB=
export ENDPOINT=
```

```powershell
$env:TASKHUB = ""
$env:ENDPOINT = ""
```

--------------------------------

### Enable Auto-Generated Work Item Filters

Source: https://github.com/microsoft/durabletask-python/blob/main/docs/features.md

Opt in to work item filtering by calling use_work_item_filters() before starting the worker. This automatically generates filters based on the worker's registered orchestrators and activities.

```python
with DurableTaskSchedulerWorker(...) as w:
    w.add_orchestrator(my_orchestrator)
    w.add_activity(my_activity)
    w.use_work_item_filters()  # auto-generate from registry
    w.start()
```

--------------------------------

### Configure BlobPayloadStore with Connection String

Source: https://github.com/microsoft/durabletask-python/blob/main/docs/features.md

Instantiate BlobPayloadStore using a connection string and configure options like container name and size thresholds. This store is then passed to the worker and client.

```python
from durabletask.extensions.azure_blob_payloads import BlobPayloadStore, BlobPayloadStoreOptions

store = BlobPayloadStore(BlobPayloadStoreOptions(
    connection_string="DefaultEndpointsProtocol=https;...",
    container_name="durabletask-payloads",  # default
    threshold_bytes=900_000,                # default (900 KB)
    max_stored_payload_bytes=10_485_760,    # default (10 MB)
    enable_compression=True,                # default
))

with DurableTaskSchedulerWorker(
    host_address=endpoint,
    secure_channel=secure_channel,
    taskhub=taskhub_name,
    token_credential=credential,
    payload_store=store,
) as w:
    # ... register orchestrators and activities ...
    w.start()

    c = DurableTaskSchedulerClient(
        host_address=endpoint,
        secure_channel=secure_channel,
        taskhub=taskhub_name,
        token_credential=credential,
        payload_store=store,
    )
```

--------------------------------

### Create and Activate Virtual Environment (Bash)

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/in_memory_backend_example/README.md

Creates and activates a Python virtual environment using Bash.

```bash
python -m venv .venv
source .venv/bin/activate
```

--------------------------------

### Create and Use In-Memory Backend for Testing

Source: https://github.com/microsoft/durabletask-python/blob/main/durabletask/testing/README.md

Sets up an in-memory backend for testing Durable Task orchestrations. Requires pytest for fixtures. The backend is created, used, and then stopped and reset.

```python
import pytest
from durabletask.testing import create_test_backend
from durabletask.client import TaskHubGrpcClient, OrchestrationStatus
from durabletask.worker import TaskHubGrpcWorker

@pytest.fixture
def backend():
    """Create an in-memory backend for testing."""
    backend = create_test_backend(port=50051)
    yield backend
    backend.stop()
    backend.reset()

def test_simple_orchestration(backend):
    # Create client and worker
    client = TaskHubGrpcClient(host_address="localhost:50051")
    worker = TaskHubGrpcWorker(host_address="localhost:50051")

    # Define orchestrator and activity
    def hello_orchestrator(ctx, _):
        result = yield ctx.call_activity(say_hello, input="World")
        return result

    def say_hello(ctx, name: str):
        return f"Hello, {name}!"

    # Register orchestrator and activity with the worker
    worker.add_orchestrator(hello_orchestrator)
    worker.add_activity(say_hello)

    # Start worker
    worker.start()

    try:
        # Schedule orchestration
        instance_id = client.schedule_new_orchestration(hello_orchestrator)

        # Wait for completion
        state = client.wait_for_orchestration_completion(instance_id, timeout=10)

        # Verify results
        assert state.runtime_status == OrchestrationStatus.COMPLETED
        assert state.serialized_output == '"Hello, World!"'
    finally:
        worker.stop()
```

--------------------------------

### Create and Activate Virtual Environment (PowerShell)

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/in_memory_backend_example/README.md

Creates and activates a Python virtual environment using PowerShell.

```powershell
python -m venv .venv
.\.venv\Scripts\Activate.ps1
```

--------------------------------

### Folder Structure

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/in_memory_backend_example/README.md

Illustrates the recommended folder structure for a Durable Task Python project using the in-memory backend.

```text
in_memory_backend_example/
├── README.md
├── requirements.txt
├── src/
│   ├── __init__.py
│   ├── workflows.py       # Orchestrators & activities (pure logic, no infra)
│   └── app.py             # Runs workflows against a real DTS backend
└── test/
    ├── __init__.py
    └── test_workflows.py  # Unit tests using the in-memory backend
```

--------------------------------

### Configure BlobPayloadStore with Account URL and Credential

Source: https://github.com/microsoft/durabletask-python/blob/main/docs/features.md

Alternatively, configure BlobPayloadStore using an account URL and an Azure identity TokenCredential for authentication.

```python
from azure.identity import DefaultAzureCredential

store = BlobPayloadStore(BlobPayloadStoreOptions(
    account_url="https://.blob.core.windows.net",
    credential=DefaultAzureCredential(),
))
```

--------------------------------

### Configure Blob Payload Store

Source: https://github.com/microsoft/durabletask-python/blob/main/docs/supported-patterns.md

Sets up the BlobPayloadStore for handling large payloads by configuring connection strings and options.

```python
from durabletask.extensions.azure_blob_payloads import BlobPayloadStore, BlobPayloadStoreOptions
from durabletask.azuremanaged.client import DurableTaskSchedulerClient
from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker

# Configure the blob payload store
store = BlobPayloadStore(BlobPayloadStoreOptions(
    connection_string="DefaultEndpointsProtocol=https;...",
))
```

--------------------------------

### Activate Python Virtual Environment (Bash)

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/README.md

Activates a Python virtual environment using the 'source' command. This is a recommended step for managing project dependencies.

```bash
source .venv/bin/activate
```

--------------------------------

### Test Orchestration with Random Ports

Source: https://github.com/microsoft/durabletask-python/blob/main/durabletask/testing/README.md

Demonstrates how to use a random port for the in-memory backend to avoid conflicts, useful in test environments. The backend and port are yielded by the fixture.

```python
import random
import pytest
from durabletask.testing import create_test_backend
from durabletask.client import TaskHubGrpcClient
from durabletask.worker import TaskHubGrpcWorker

@pytest.fixture
def backend():
    # Use a random port to avoid conflicts
    port = random.randint(50000, 60000)
    backend = create_test_backend(port=port)
    yield backend, port
    backend.stop()
    backend.reset()

def test_orchestration(backend):
    backend_instance, port = backend
    client = TaskHubGrpcClient(host_address=f"localhost:{port}")
    worker = TaskHubGrpcWorker(host_address=f"localhost:{port}")
    # ...
```

--------------------------------

### Activate Python Virtual Environment (PowerShell)

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/README.md

Activates a Python virtual environment using PowerShell. This is a recommended step for managing project dependencies.

```powershell
.\.venv\Scripts\Activate.ps1
```

--------------------------------

### Initialize Worker and Client with Payload Store

Source: https://github.com/microsoft/durabletask-python/blob/main/docs/supported-patterns.md

Demonstrates initializing the DurableTaskSchedulerWorker and DurableTaskSchedulerClient with a payload store for handling large inputs. Payloads exceeding the threshold are automatically externalized to blob storage.

```python
with DurableTaskSchedulerWorker(
    host_address=endpoint,
    secure_channel=secure_channel,
    taskhub=taskhub_name,
    token_credential=credential,
    payload_store=store,
) as w:
    w.add_orchestrator(my_orchestrator)
    w.add_activity(process_large_data)
    w.start()

    c = DurableTaskSchedulerClient(
        host_address=endpoint,
        secure_channel=secure_channel,
        taskhub=taskhub_name,
        token_credential=credential,
        payload_store=store,
    )

    # This large input is automatically externalized to blob storage
    large_input = "x" * 1_000_000  # 1 MB string
    instance_id = c.schedule_new_orchestration(my_orchestrator, input=large_input)
    state = c.wait_for_orchestration_completion(instance_id, timeout=60)
```

--------------------------------

### Create Azure Durable Task Scheduler

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/sub-orchestrations-with-fan-out-fan-in/README.md

Creates an Azure Durable Task Scheduler resource. Replace the placeholders with your specific values.

```bash
az durabletask scheduler create --resource-group \
    --name \
    --location --ip-allowlist "[0.0.0.0/0]" --sku-capacity 1 \
    --sku-name "Dedicated" --tags "{'myattribute':'myvalue'}"
```

--------------------------------

### Create Virtual Environment

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/distributed-tracing/README.md

Creates a Python virtual environment named '.venv'. This is a standard practice for managing project dependencies.

```bash
python -m venv .venv
```

--------------------------------

### Configure and Run an Orchestration History Export Job

Source: https://github.com/microsoft/durabletask-python/blob/main/docs/features.md

Configure an Azure Blob writer and an export client, register the worker, create a batch export job for a time window, and wait for its completion. This snippet demonstrates setting up the writer, client, and initiating a job.

```python
from datetime import datetime, timedelta, timezone

from durabletask import client, worker
from durabletask.extensions.history_export import (
    ExportDestination,
    ExportFormat,
    ExportFormatKind,
    ExportHistoryClient,
    ExportJobCreationOptions,
    ExportMode,
)
from durabletask.extensions.history_export.azure_blob import (
    AzureBlobHistoryExportWriter,
    AzureBlobHistoryExportWriterOptions,
)

writer = AzureBlobHistoryExportWriter(
    AzureBlobHistoryExportWriterOptions(
        container_name="orchestration-history",
        connection_string="DefaultEndpointsProtocol=https;...",
    )
)
dt_client = client.TaskHubGrpcClient(host_address="localhost:4001")
export_client = ExportHistoryClient(dt_client, writer)

with worker.TaskHubGrpcWorker(host_address="localhost:4001") as w:
    export_client.register_worker(w)
    w.start()

    now = datetime.now(timezone.utc)
    desc = export_client.create_job(ExportJobCreationOptions(
        mode=ExportMode.BATCH,
        completed_time_from=now - timedelta(days=1),
        completed_time_to=now,
        destination=ExportDestination(container="orchestration-history", prefix="2026-05"),
        format=ExportFormat(kind=ExportFormatKind.JSONL_GZIP),
        max_instances_per_batch=100,
    ))
    final = export_client.wait_for_job(desc.job_id, timeout=600)
    print(final.status, final.exported_instances, final.failed_instances)
```

--------------------------------

### Set Environment Variables (Bash)

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/sandboxes/README.md

Configure necessary environment variables for the Durable Task Scheduler connection string and sandbox container image details before running the declarer app.

```bash
export DURABLE_TASK_SCHEDULER_CONNECTION_STRING="Endpoint=https://;TaskHub=;Authentication=DefaultAzure"
export DTS_SANDBOX_CONTAINER_IMAGE=""
export DTS_SANDBOX_IMAGE_PULL_UMI_CLIENT_ID=""
export DTS_SANDBOX_SCHEDULER_UMI_CLIENT_ID=""
```

--------------------------------

### Create Azure Durable Task Scheduler (Bash)

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/README.md

Creates an Azure Durable Task Scheduler resource using the Azure CLI. Replace placeholders with your specific values.

```bash
az durabletask scheduler create \
  --resource-group \
  --name \
  --location \
  --ip-allowlist "[0.0.0.0/0]" \
  --sku-capacity 1 \
  --sku-name "Dedicated" \
  --tags "{'myattribute':'myvalue'}"
```

--------------------------------

### Build Remote Worker Image (Bash)

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/sandboxes/README.md

Build the Docker image for the remote worker using the provided Containerfile and push it to a container registry. Ensure the image pull identity has access.

```bash
docker build \
  -f examples/sandboxes/Containerfile \
  -t \
  .
docker push
```

--------------------------------

### Create Azure Durable Taskhub (Bash)

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/README.md

Creates a taskhub within an existing Azure Durable Task Scheduler using the Azure CLI. Ensure the scheduler is already created.

```bash
az durabletask taskhub create \
  --resource-group \
  --scheduler-name \
  --name
```

--------------------------------

### Version-Aware Orchestrator

Source: https://github.com/microsoft/durabletask-python/blob/main/docs/supported-patterns.md

A version-aware orchestrator capable of processing both old and new orchestration versions by checking the context version.

```python
def my_orchestrator(ctx: task.OrchestrationContext, order: Order):
    """Version-aware dummy orchestrator capable of processing both old and new orchestrations"""
    yield ctx.call_activity(activity_one)
    if ctx.version > '1.0.0':
        yield ctx.call_activity(activity_three)
    yield ctx.call_activity(activity_two)
```

--------------------------------

### Run Tests

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/in_memory_backend_example/README.md

Executes all tests in the 'test/' directory using pytest.

```bash
pytest test/
```

--------------------------------

### Run Durable Task Emulator

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/sub-orchestrations-with-fan-out-fan-in/README.md

Runs the Durable Task Scheduler emulator in detached mode, exposing port 8080. Wait for the container to be ready before proceeding.

```bash
docker run --name dtsemulator -d -p 8080:8080 mcr.microsoft.com/dts/dts-emulator:v0.0.6
```

--------------------------------

### Create Azure Durable Task Hub

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/sub-orchestrations-with-fan-out-fan-in/README.md

Creates a task hub within an existing Azure Durable Task Scheduler. Ensure the scheduler and resource group are already provisioned.

```bash
az durabletask taskhub create --resource-group \
    --scheduler-name --name
```

--------------------------------

### Build Remote Worker Image (PowerShell)

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/sandboxes/README.md

Build the Docker image for the remote worker using the provided Containerfile and push it to a container registry. Ensure the image pull identity has access.

```powershell
docker build `
  -f examples\sandboxes\Containerfile `
  -t `
  .
docker push
```

--------------------------------

### Implement Custom Payload Store

Source: https://github.com/microsoft/durabletask-python/blob/main/docs/features.md

Subclass the abstract PayloadStore class to create a custom payload storage solution. Implement upload, download, and token validation methods.

```python
from typing import Optional

from durabletask.payload import PayloadStore, LargePayloadStorageOptions

class MyPayloadStore(PayloadStore):
    def __init__(self, options: LargePayloadStorageOptions):
        self._options = options

    @property
    def options(self) -> LargePayloadStorageOptions:
        return self._options

    def upload(self, data: bytes, *, instance_id: Optional[str] = None) -> str:
        # Store data and return a unique token string
        ...

    async def upload_async(
        self, data: bytes, *, instance_id: Optional[str] = None,
    ) -> str:
        ...

    def download(self, token: str) -> bytes:
        # Retrieve data by token
        ...

    async def download_async(self, token: str) -> bytes:
        ...

    def is_known_token(self, value: str) -> bool:
        # Return True if the value looks like a token from this store
        ...
```

--------------------------------

### Create Replay-Safe Logger in Orchestrator

Source: https://github.com/microsoft/durabletask-python/blob/main/docs/features.md

Use `create_replay_safe_logger` to obtain a logger that suppresses duplicate output during orchestrator replays. Pass an existing `logging.Logger` instance to this method.

```python
import logging

from durabletask import task

logger = logging.getLogger("my_orchestrator")

def my_orchestrator(ctx: task.OrchestrationContext, payload):
    replay_logger = ctx.create_replay_safe_logger(logger)
    replay_logger.info("Starting orchestration %s", ctx.instance_id)
    result = yield ctx.call_activity(my_activity, input=payload)
    replay_logger.info("Activity returned: %s", result)
    return result
```

--------------------------------

### Set Environment Variables (PowerShell)

Source: https://github.com/microsoft/durabletask-python/blob/main/examples/sandboxes/README.md

Configure necessary environment variables for the Durable Task Scheduler connection string and sandbox container image details before running the declarer app.

```powershell
$env:DURABLE_TASK_SCHEDULER_CONNECTION_STRING = "Endpoint=https://;TaskHub=;Authentication=DefaultAzure"
$env:DTS_SANDBOX_CONTAINER_IMAGE = ""
$env:DTS_SANDBOX_IMAGE_PULL_UMI_CLIENT_ID = ""
$env:DTS_SANDBOX_SCHEDULER_UMI_CLIENT_ID = ""
```

--------------------------------

### Old Orchestrator Logic

Source: https://github.com/microsoft/durabletask-python/blob/main/docs/supported-patterns.md

Illustrates the logic of an older version of an orchestrator function.

```python
def my_orchestrator(ctx: task.OrchestrationContext, order: Order):
    """Dummy orchestrator function illustrating old logic"""
    yield ctx.call_activity(activity_one)
    yield ctx.call_activity(activity_two)
    return "Success"
```

--------------------------------

### Configure Explicit Work Item Filters

Source: https://github.com/microsoft/durabletask-python/blob/main/docs/features.md

Fine-tune work item dispatching by providing explicit filters for orchestrations, activities, and entities. This allows specifying names and versions for precise control.

```python from durabletask.worker import ( WorkItemFilters, OrchestrationWorkItemFilter, ActivityWorkItemFilter, EntityWorkItemFilter, ) w.use_work_item_filters(WorkItemFilters( orchestrations=[ OrchestrationWorkItemFilter(name="my_orch", versions=["2.0.0"]), ], activities=[ ActivityWorkItemFilter(name="my_activity"), ], entities=[ EntityWorkItemFilter(name="my_entity"), ], )) ``` -------------------------------- ### Create Azure Durable Task Scheduler (PowerShell) Source: https://github.com/microsoft/durabletask-python/blob/main/examples/README.md Creates an Azure Durable Task Scheduler resource using PowerShell. Replace placeholders with your specific values. ```powershell az durabletask scheduler create ` --resource-group ` --name ` --location ` --ip-allowlist "[0.0.0.0/0]" ` --sku-capacity 1 ` --sku-name "Dedicated" ` --tags "{'myattribute':'myvalue'}" ``` -------------------------------- ### Create Azure Durable Taskhub (PowerShell) Source: https://github.com/microsoft/durabletask-python/blob/main/examples/README.md Creates a taskhub within an existing Azure Durable Task Scheduler using PowerShell. Ensure the scheduler is already created. ```powershell az durabletask taskhub create ` --resource-group ` --scheduler-name ` --name ``` -------------------------------- ### Define Class-Based Entity Source: https://github.com/microsoft/durabletask-python/blob/main/docs/features.md Use this syntax to define a class-based durable entity. Leverage get_state and set_state for state management. 
```python # Class-based entity class Counter(entities.DurableEntity): def __init__(self): self.set_state(0) def add(self, amount: int): self.set_state(self.get_state(int, 0) + amount) def get(self): return self.get_state(int, 0) ``` -------------------------------- ### Orchestrator with Complete Logic Change Source: https://github.com/microsoft/durabletask-python/blob/main/docs/supported-patterns.md Handles orchestrations where the logic has completely changed between versions, using version checks to execute different paths. ```python def my_orchestrator(ctx: task.OrchestrationContext, order: Order): if ctx.version == '1.0.0': yield ctx.call_activity(activity_one) yield ctx.call_activity(activity_two) return "Success" yield ctx.call_activity(activity_one) yield ctx.call_activity(activity_three) yield ctx.call_activity(activity_two) return "Success" ``` -------------------------------- ### Pull DTS Emulator Docker Image Source: https://github.com/microsoft/durabletask-python/blob/main/examples/README.md Pulls the latest Docker image for the Durable Task Scheduler emulator. This is used for local development and testing. ```bash docker pull mcr.microsoft.com/dts/dts-emulator:latest ``` -------------------------------- ### Human Interaction and Durable Timers Source: https://github.com/microsoft/durabletask-python/blob/main/docs/supported-patterns.md Orchestrator function demonstrating waiting for external events and creating timers for timeouts. Supports custom business objects. ```python def purchase_order_workflow(ctx: task.OrchestrationContext, order: Order): """Orchestrator function that represents a purchase order workflow""" # Orders under $1000 are auto-approved if order.Cost < 1000: return "Auto-approved" # Orders of $1000 or more require manager approval yield ctx.call_activity(send_approval_request, input=order) # Approvals must be received within 24 hours or they will be cancelled.
approval_event = ctx.wait_for_external_event("approval_received") timeout_event = ctx.create_timer(timedelta(hours=24)) winner = yield task.when_any([approval_event, timeout_event]) if winner == timeout_event: return "Cancelled" # The order was approved yield ctx.call_activity(place_order, input=order) approval_details = approval_event.get_result() return f"Approved by '{approval_details.approver}'" ``` -------------------------------- ### Test Durable Timers with Durable Task Python Source: https://github.com/microsoft/durabletask-python/blob/main/durabletask/testing/README.md Tests the creation and firing of a durable timer within an orchestrator. Verifies that the orchestrator resumes after the specified delay. ```python def test_durable_timers(backend): import time from datetime import timedelta client = TaskHubGrpcClient(host_address="localhost:50051") worker = TaskHubGrpcWorker(host_address="localhost:50051") def timer_orchestrator(ctx, _): fire_at = ctx.current_utc_datetime + timedelta(seconds=1) yield ctx.create_timer(fire_at) return "timer_fired" worker.add_orchestrator(timer_orchestrator) worker.start() try: start_time = time.time() instance_id = client.schedule_new_orchestration(timer_orchestrator) state = client.wait_for_orchestration_completion(instance_id, timeout=10) elapsed = time.time() - start_time assert state.runtime_status == OrchestrationStatus.COMPLETED assert elapsed >= 1.0 # Timer should have waited at least 1 second finally: worker.stop() ``` -------------------------------- ### Manage a Specific Export Job Source: https://github.com/microsoft/durabletask-python/blob/main/docs/features.md Obtain a client for a specific export job using its ID to describe its status, wait for its completion, and delete it. This provides fine-grained control over individual export tasks. 
```python job_client = export_client.get_job_client(desc.job_id) final = job_client.wait(timeout=600) print(final.status.value, final.exported_instances) job_client.delete() ``` -------------------------------- ### Set Azure Taskhub Environment Variables (Bash) Source: https://github.com/microsoft/durabletask-python/blob/main/examples/README.md Sets environment variables for the taskhub name and endpoint, required for connecting to a deployed scheduler. Use the endpoint obtained from the Azure portal. ```bash export TASKHUB= export ENDPOINT= ``` -------------------------------- ### Fan-out/Fan-in Orchestration Source: https://github.com/microsoft/durabletask-python/blob/main/docs/supported-patterns.md Orchestrator function that fans out dynamic work items in parallel and fans in the results. Uses `task.when_all` to wait for parallel tasks. ```python # activity function for getting the list of work items def get_work_items(ctx: task.ActivityContext, _) -> List[str]: # ... # activity function for processing a single work item def process_work_item(ctx: task.ActivityContext, item: str) -> int: # ... # orchestrator function that fans-out the work items and then fans-in the results def orchestrator(ctx: task.OrchestrationContext, _): # the number of work-items is unknown in advance work_items = yield ctx.call_activity(get_work_items) # fan-out: schedule the work items in parallel and wait for all of them to complete tasks = [ctx.call_activity(process_work_item, input=item) for item in work_items] results = yield task.when_all(tasks) # fan-in: summarize and return the results return {'work_items': work_items, 'results': results, 'total': sum(results)} ``` -------------------------------- ### Signal Entity from Client Source: https://github.com/microsoft/durabletask-python/blob/main/docs/features.md Invoke an entity operation using signal_entity from a Durable Task Scheduler client. This is a fire-and-forget operation. 
```python c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True, taskhub=taskhub_name, token_credential=None) entity_id = entities.EntityInstanceId("my_entity_function", "myEntityId") c.signal_entity(entity_id, "do_nothing") ``` -------------------------------- ### Signal Entity from Orchestrator Source: https://github.com/microsoft/durabletask-python/blob/main/docs/features.md Send a signal to an entity from within an orchestrator. This operation is fire-and-forget. ```python # Signal an entity (fire-and-forget) entity_id = entities.EntityInstanceId("my_entity_function", "myEntityId") ctx.signal_entity(entity_id, operation_name="add", input=5) ``` -------------------------------- ### Function Chaining Orchestration Source: https://github.com/microsoft/durabletask-python/blob/main/docs/supported-patterns.md Orchestrator function that sequences activity calls. Requires activity functions to be defined separately. ```python # simple activity function that returns a greeting def hello(ctx: task.ActivityContext, name: str) -> str: return f'Hello {name}!' # orchestrator function that sequences the activity calls def sequence(ctx: task.OrchestrationContext, _): result1 = yield ctx.call_activity(hello, input='Tokyo') result2 = yield ctx.call_activity(hello, input='Seattle') result3 = yield ctx.call_activity(hello, input='London') return [result1, result2, result3] ``` -------------------------------- ### Define Function-Based Entity Source: https://github.com/microsoft/durabletask-python/blob/main/docs/features.md Use this syntax to define a function-based durable entity. Ensure operations like 'delete' are implemented manually. 
```python # Function-based entity def counter(ctx: entities.EntityContext, input: int): state = ctx.get_state(int, 0) if ctx.operation == "add": state += input ctx.set_state(state) elif ctx.operation == "get": return state ``` -------------------------------- ### Flush Spans Before Exiting Source: https://github.com/microsoft/durabletask-python/blob/main/examples/distributed-tracing/README.md Forces the TracerProvider to flush any buffered spans before the application exits. This prevents potential loss of trace data. ```python provider.force_flush() ``` -------------------------------- ### Test Sub-Orchestrations with Durable Task Python Source: https://github.com/microsoft/durabletask-python/blob/main/durabletask/testing/README.md Tests a parent orchestrator that calls two child orchestrators. Ensures the parent correctly aggregates results from sub-orchestrations. ```python def test_sub_orchestrations(backend): client = TaskHubGrpcClient(host_address="localhost:50051") worker = TaskHubGrpcWorker(host_address="localhost:50051") def parent_orchestrator(ctx, _): result1 = yield ctx.call_sub_orchestrator(child_orchestrator, input=1) result2 = yield ctx.call_sub_orchestrator(child_orchestrator, input=2) return result1 + result2 def child_orchestrator(ctx, input: int): return input * 2 worker.add_orchestrator(parent_orchestrator) worker.add_orchestrator(child_orchestrator) worker.start() try: instance_id = client.schedule_new_orchestration(parent_orchestrator) state = client.wait_for_orchestration_completion(instance_id, timeout=10) assert state.runtime_status == OrchestrationStatus.COMPLETED assert state.serialized_output == "6" # (1*2) + (2*2) finally: worker.stop() ``` -------------------------------- ### List Failed Export Jobs Source: https://github.com/microsoft/durabletask-python/blob/main/docs/features.md Enumerate existing export jobs, filtering specifically for those that have failed.
This is useful for identifying and debugging issues with past export operations. ```python from durabletask.extensions.history_export import ExportJobQuery, ExportJobStatus for desc in export_client.list_jobs( ExportJobQuery(status=[ExportJobStatus.FAILED]) ): print(desc.job_id, desc.last_error) ``` -------------------------------- ### Set Azure Taskhub Environment Variables (PowerShell) Source: https://github.com/microsoft/durabletask-python/blob/main/examples/README.md Sets environment variables for the taskhub name and endpoint in PowerShell, required for connecting to a deployed scheduler. Use the endpoint obtained from the Azure portal. ```powershell $env:TASKHUB = "" $env:ENDPOINT = "" ``` -------------------------------- ### Call Entity from Orchestrator Source: https://github.com/microsoft/durabletask-python/blob/main/docs/features.md Invoke an entity operation from within an orchestrator and wait for the result. Unlike a fire-and-forget signal, the orchestrator does not continue until the entity returns a result. ```python # Call an entity (wait for result) entity_id = entities.EntityInstanceId("my_entity_function", "myEntityId") result = yield ctx.call_entity(entity_id, operation_name="get") ``` -------------------------------- ### Test Orchestration Termination with Durable Task Python Source: https://github.com/microsoft/durabletask-python/blob/main/durabletask/testing/README.md Tests the termination of a long-running orchestrator. Verifies that the orchestration's status changes to TERMINATED.
```python def test_orchestration_termination(backend): client = TaskHubGrpcClient(host_address="localhost:50051") worker = TaskHubGrpcWorker(host_address="localhost:50051") def long_running_orchestrator(ctx, _): yield ctx.wait_for_external_event("never_happens") return "completed" worker.add_orchestrator(long_running_orchestrator) worker.start() try: instance_id = client.schedule_new_orchestration(long_running_orchestrator) # Wait for it to start client.wait_for_orchestration_start(instance_id, timeout=5) # Terminate it client.terminate_orchestration(instance_id, output="terminated_by_test") # Verify termination state = client.wait_for_orchestration_completion(instance_id, timeout=10) assert state.runtime_status == OrchestrationStatus.TERMINATED finally: worker.stop() ``` -------------------------------- ### Explicit Work Item Filters Source: https://github.com/microsoft/durabletask-python/blob/main/docs/supported-patterns.md Configures a Durable Task worker with explicit work item filters, allowing for specific version constraints on orchestrations and activities. ```python from durabletask.worker import ( WorkItemFilters, OrchestrationWorkItemFilter, ActivityWorkItemFilter, ) w.use_work_item_filters(WorkItemFilters( orchestrations=[ OrchestrationWorkItemFilter( name="greeting_orchestrator", versions=["2.0.0"], ), ], activities=[ ActivityWorkItemFilter(name="greet"), ], )) ``` -------------------------------- ### Auto-Generated Work Item Filters Source: https://github.com/microsoft/durabletask-python/blob/main/docs/supported-patterns.md Configures a Durable Task worker to automatically generate work item filters based on the registered orchestrators and activities. ```python with DurableTaskSchedulerWorker(...) 
as w: w.add_orchestrator(greeting_orchestrator) w.add_activity(greet) w.use_work_item_filters() # auto-generate from registry w.start() ``` -------------------------------- ### Clear Work Item Filters Source: https://github.com/microsoft/durabletask-python/blob/main/docs/features.md Reset work item filtering to the default behavior, where the worker receives all work items, by passing None to use_work_item_filters(). ```python w.use_work_item_filters(None) ``` -------------------------------- ### Lock Entities for Exclusive Access Source: https://github.com/microsoft/durabletask-python/blob/main/docs/features.md Acquire a lock on multiple entities to ensure exclusive access during critical operations within an orchestrator. ```python with (yield ctx.lock_entities([entity_id_1, entity_id_2])): # Perform entity call operations that require exclusive access ... ```
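--------------------------------

### Sketch: Control Flow of `with (yield ...)`

The `with (yield ctx.lock_entities(...))` form can look unusual, so the following is a rough, plain-Python sketch of the control flow only. It does not use or reproduce the durabletask runtime: `ToyContext`, `_held_lock`, `transfer`, and `run` are hypothetical names invented for this illustration, and the real library's internals may differ. The idea it shows is that the orchestrator generator yields a lock request, the driver sends back a context manager, and the `with` statement releases the lock when the block exits.

```python
from contextlib import contextmanager


class ToyContext:
    """Hypothetical stand-in for an orchestration context (not the durabletask API)."""

    def __init__(self):
        self.held = set()  # entity ids currently locked
        self.log = []      # ordered record of what happened

    def lock_entities(self, entity_ids):
        # Return a plain request; the toy driver turns it into a context manager.
        return tuple(entity_ids)


@contextmanager
def _held_lock(ctx, entity_ids):
    # Acquire on entry, always release on exit (even if the body raises).
    ctx.held.update(entity_ids)
    ctx.log.append(("acquired", entity_ids))
    try:
        yield
    finally:
        ctx.held.difference_update(entity_ids)
        ctx.log.append(("released", entity_ids))


def transfer(ctx):
    # Same shape as the snippet above: yield the request, enter what is sent back.
    with (yield ctx.lock_entities(["account_a", "account_b"])):
        ctx.log.append(("critical_section", tuple(sorted(ctx.held))))
    return "done"


def run(orchestrator, ctx):
    """Toy driver: answers every yielded lock request with a context manager."""
    gen = orchestrator(ctx)
    try:
        request = next(gen)
        while True:
            request = gen.send(_held_lock(ctx, request))
    except StopIteration as stop:
        return stop.value
```

Calling `run(transfer, ToyContext())` walks through acquire, critical section, and release in that order. The parentheses around `yield` matter: they make the value sent back into the generator the object that `with` enters.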