### Example: Starting Workflows and Getting Results in Python

Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/client.md

Shows how to start a workflow without parameters and another with arguments, custom timeouts, and memo data. It also demonstrates how to retrieve the final result from the workflow handle.

```python
from datetime import timedelta

# Start simple workflow with no parameters
handle = await client.start_workflow(
    my_workflow,
    id="workflow-123",
    task_queue="my-queue"
)

# Start workflow with parameters
handle = await client.start_workflow(
    my_workflow_with_args,
    args=("arg1", 42),
    id="workflow-456",
    task_queue="my-queue",
    execution_timeout=timedelta(hours=1),
    memo={"key": "value"}
)

# Get the workflow result later
result = await handle.result()
```

--------------------------------

### Start Activity Example (Python)

Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/client.md

Use this example to start an activity directly from the client and retrieve its result, specifying an activity ID and a start-to-close timeout.

```python
from datetime import timedelta

handle = await client.start_activity(
    my_activity,
    id="activity-123",
    start_to_close_timeout=timedelta(seconds=30)
)
result = await handle.result()
```

--------------------------------

### Execute Workflow Example (Python)

Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/client.md

Use this example to start a workflow and wait for its completion, retrieving the result.

```python
result = await client.execute_workflow(
    my_workflow,
    id="workflow-789",
    task_queue="my-queue"
)
print(f"Workflow returned: {result}")
```

--------------------------------

### Create Schedule Example (Python)

Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/client.md

Use this example to define and create a new schedule that starts a workflow, specifying the workflow, ID, and task queue.
```python
from temporalio.client import Schedule, ScheduleActionStartWorkflow

schedule = Schedule(
    action=ScheduleActionStartWorkflow(
        workflow=my_workflow,
        id="workflow-",
        task_queue="my-queue"
    ),
    spec=...  # ScheduleSpec with calendar/interval
)

handle = await client.create_schedule("my-schedule", schedule)
```

--------------------------------

### Get Workflow Handle Example (Python)

Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/client.md

Use this example to obtain a handle for an existing workflow and then wait for its result.

```python
handle = client.get_workflow_handle("existing-workflow-id")
result = await handle.result()
```

--------------------------------

### Install poethepoet with uv

Source: https://github.com/temporalio/sdk-python/blob/main/README.md

Installs `poethepoet` using `uv`, a prerequisite for building the SDK.

```bash
uv tool install poethepoet
```

--------------------------------

### Install all Python SDK dependencies for building

Source: https://github.com/temporalio/sdk-python/blob/main/README.md

Installs all project dependencies, including optional extras, using `uv` for the build process.

```bash
uv sync --all-extras
```

--------------------------------

### Install Temporal Python SDK wheel file

Source: https://github.com/temporalio/sdk-python/blob/main/README.md

Installs the pre-built Temporal Python SDK wheel file into a virtual environment.

```bash
pip install /path/to/cloned/sdk-python/dist/*.whl
```

--------------------------------

### Client.config()

Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/client.md

Get the configuration used to create this client. This method returns a copy of the client's configuration, which can be either the initial setup or the currently active modified configuration.

```APIDOC
## Client.config()

### Description
Retrieves the configuration settings that were used to initialize this client.
You can request either the initial configuration or the currently active configuration, which may have been modified.

### Method Signature
```python
def config(self, *, active_config: bool = False) -> ClientConfig
```

### Parameters
- **active_config** (bool) - Optional - Default: False - If true, return modified active config; if false, return initial config

### Returns
**ClientConfig** - A shallow copy of the client's configuration.

### Example
```python
config = client.config()
print(f"Using namespace: {config['namespace']}")
```
```

--------------------------------

### WorkflowEnvironment.start()

Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/testing.md

Start an in-memory test environment.

```APIDOC
## WorkflowEnvironment.start()

### Description
Start an in-memory test environment.

### Method Signature
```python
@staticmethod
async def start() -> WorkflowEnvironment
```

### Returns
- **WorkflowEnvironment** - Started test environment

### Example
```python
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

async def test_my_workflow():
    env = await WorkflowEnvironment.start()
    try:
        async with Worker(
            env.client,
            task_queue="test-queue",
            workflows=[MyWorkflow],
            activities=[my_activity],
        ) as worker:
            result = await env.client.execute_workflow(
                MyWorkflow,
                id="test-1",
                task_queue="test-queue"
            )
            assert result == expected_value
    finally:
        await env.shutdown()
```
```

--------------------------------

### Test Workflow with WorkflowEnvironment.start() (Python)

Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/testing.md

Demonstrates how to start a WorkflowEnvironment, create a Worker, execute a workflow, and shut down the environment for testing.
```python
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

async def test_my_workflow():
    env = await WorkflowEnvironment.start()
    try:
        async with Worker(
            env.client,
            task_queue="test-queue",
            workflows=[MyWorkflow],
            activities=[my_activity],
        ) as worker:
            result = await env.client.execute_workflow(
                MyWorkflow,
                id="test-1",
                task_queue="test-queue"
            )
            assert result == expected_value
    finally:
        await env.shutdown()
```

--------------------------------

### Client.__init__()

Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/client.md

Initialize a client directly from a service client. Most users should use `Client.connect()` instead for a more common setup.

```APIDOC
## Client.__init__()

### Description
Initializes a client directly from a service client. This method is typically used for advanced scenarios, as most users should opt for `Client.connect()` for standard client setup.

### Method Signature
```python
def __init__(
    self,
    service_client: temporalio.service.ServiceClient,
    *,
    namespace: str = "default",
    data_converter: temporalio.converter.DataConverter = temporalio.converter.DataConverter.default,
    plugins: Sequence[Plugin] = [],
    interceptors: Sequence[Interceptor] = [],
    default_workflow_query_reject_condition: None | temporalio.common.QueryRejectCondition = None,
    header_codec_behavior: HeaderCodecBehavior = HeaderCodecBehavior.NO_CODEC,
) -> None
```

### Parameters
- **service_client** (ServiceClient) - Required - Raw gRPC service client to Temporal server
- **namespace** (str) - Optional - Default: "default" - Temporal namespace for calls
- **data_converter** (DataConverter) - Optional - Default: DataConverter.default - Converter for serialization
- **plugins** (Sequence[Plugin]) - Optional - Default: [] - Client configuration plugins
- **interceptors** (Sequence[Interceptor]) - Optional - Default: [] - Call interceptors
- **default_workflow_query_reject_condition** (QueryRejectCondition | None)
- Optional - Default: None - Default query rejection condition
- **header_codec_behavior** (HeaderCodecBehavior) - Optional - Default: NO_CODEC - Header encoding behavior
```

--------------------------------

### Complete Schedule Creation and Management Example in Python

Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/schedules.md

This example demonstrates how to create, list, trigger, pause, unpause, and delete a Temporal Schedule using the Python SDK.

```python
import asyncio
from datetime import datetime, timedelta

from temporalio.client import (
    Client, Schedule, ScheduleActionStartWorkflow, ScheduleSpec,
    ScheduleCalendarSpec, ScheduleRange, SchedulePolicy, ScheduleOverlapPolicy,
    ScheduleState
)

async def main():
    client = await Client.connect("localhost:7233")

    # Create schedule for daily reports
    schedule = Schedule(
        action=ScheduleActionStartWorkflow(
            workflow=daily_report_workflow,
            args=(),
            id="report-",  # Suffix with timestamp
            task_queue="reports",
            task_timeout=timedelta(minutes=30),
        ),
        spec=ScheduleSpec(
            calendars=[
                ScheduleCalendarSpec(
                    hour=[ScheduleRange(start=9)],  # 9 AM
                    day_of_week=[ScheduleRange(start=1, end=5)],  # Weekdays
                )
            ],
            # The time zone field on ScheduleSpec is named time_zone_name
            time_zone_name="America/New_York",
            start_at=datetime(2024, 1, 1),
        ),
        policy=SchedulePolicy(
            overlap=ScheduleOverlapPolicy.SKIP,
            pause_on_failure=False,
        ),
        # The note is part of the schedule state
        state=ScheduleState(note="Daily report generation"),
    )

    # Create the schedule
    handle = await client.create_schedule("daily-reports", schedule)

    # List schedules
    async for list_item in await client.list_schedules():
        print(f"Schedule: {list_item.id}")

    # Trigger immediately
    await handle.trigger()

    # Pause temporarily (note is keyword-only)
    await handle.pause(note="Maintenance")

    # Resume
    await handle.unpause()

    # Delete when done
    # await handle.delete()

asyncio.run(main())
```

--------------------------------

### Install Temporal Python SDK with Pydantic Support

Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/converter.md

Install the Temporal Python SDK with the `pydantic` extra to enable
support for Pydantic models and the PydanticPayloadConverter.

```bash
pip install temporalio[pydantic]
```

--------------------------------

### Connect Client and Start Workflow in Python

Source: https://github.com/temporalio/sdk-python/blob/main/README.md

Connects to a Temporal server and starts a workflow. Use `start_workflow` with a single positional argument or the `args` keyword for multiple arguments in non-type-safe calls.

```python
from temporalio.client import Client

async def main():
    # Create client connected to server at the given address and namespace
    client = await Client.connect("localhost:7233", namespace="my-namespace")

    # Start a workflow
    handle = await client.start_workflow(MyWorkflow.run, "some arg", id="my-workflow-id", task_queue="my-task-queue")

    # Wait for result
    result = await handle.result()
    print(f"Result: {result}")
```

--------------------------------

### Example: Accessing Client Configuration in Python

Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/client.md

Demonstrates how to retrieve the client's configuration and access specific details, such as the configured namespace.

```python
config = client.config()
print(f"Using namespace: {config['namespace']}")
```

--------------------------------

### Install Temporal with LangSmith

Source: https://github.com/temporalio/sdk-python/blob/main/temporalio/contrib/langsmith/README.md

Install the Temporal Python SDK with the LangSmith feature enabled using `uv`.

```bash
uv add temporalio[langsmith]
```

--------------------------------

### Client.start_activity()

Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/client.md

Start an activity execution directly from the client.

```APIDOC
## Client.start_activity()

### Description
Start an activity execution directly from the client.

### Method
async def start_activity(
    self,
    activity: CallableType,
    *,
    id: str | None = None,
    activity_id: str | None = None,
    ...
) -> AsyncActivityHandle[ReturnType]

### Parameters
- **activity** (Callable) - Required - Activity function
- **id** (str | None) - Optional - Activity ID (replaces activity_id if set)
- **activity_id** (str | None) - Optional - Activity ID for the execution
- **schedule_to_close_timeout** (timedelta | None) - Optional - Maximum time for activity
- **start_to_close_timeout** (timedelta | None) - Optional - Max time from start to completion
- **heartbeat_timeout** (timedelta | None) - Optional - Max time between heartbeats

### Request Example
```python
handle = await client.start_activity(
    my_activity,
    id="activity-123",
    start_to_close_timeout=timedelta(seconds=30)
)
result = await handle.result()
```

### Response
#### Return Value
`AsyncActivityHandle[ReturnType]`

#### Response Example
`handle` (type `AsyncActivityHandle[ReturnType]`) - Handle to activity execution
```

--------------------------------

### Examples of Defining Activities with @activity.defn

Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/activity.md

These examples demonstrate how to define asynchronous, synchronous, and dynamic activities using the `@activity.defn` decorator, showcasing custom naming and handling of arbitrary arguments.

```python
from temporalio import activity
from datetime import timedelta

@activity.defn
async def send_email(email: str, subject: str) -> bool:
    # Activity logic
    return True
```

```python
@activity.defn(name="custom_name")
def sync_activity(value: int) -> str:
    # Synchronous activity
    return str(value)
```

```python
from typing import Sequence

from temporalio.common import RawValue

@activity.defn(dynamic=True)
def dynamic_activity(args: Sequence[RawValue]) -> RawValue:
    # Dynamic activity receives arbitrary arguments
    return args[0]
```

--------------------------------

### Install wheel package for Python environment

Source: https://github.com/temporalio/sdk-python/blob/main/README.md

Installs the `wheel` package, required for installing `.whl` files.
```bash
pip install wheel
```

--------------------------------

### Install Dependencies for Temporalio LangGraph

Source: https://github.com/temporalio/sdk-python/blob/main/temporalio/contrib/langgraph/README.md

Use `uv sync` to install all necessary dependencies, including extra packages, for the Temporalio LangGraph project.

```sh
uv sync --all-extras
```

--------------------------------

### workflow.start_activity()

Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/workflow.md

Start an activity and return a handle without waiting.

```APIDOC
## workflow.start_activity()

### Description
Start an activity and return a handle without waiting.

### Signature
```python
async def start_activity(
    activity: CallableType,
    *,
    schedule_to_close_timeout: timedelta | None = None,
    ...
) -> ActivityHandle[ReturnType]
```

### Parameters
- **activity** (Callable) - Activity function with optional parameters
- **schedule_to_close_timeout** (timedelta | None) - Max time for activity scheduling and execution
- **start_to_close_timeout** (timedelta | None) - Max time from start to completion
- **heartbeat_timeout** (timedelta | None) - Max time between heartbeats
- **retry_policy** (RetryPolicy | None) - Retry configuration
- **activity_id** (str | None) - Activity execution ID
- **cancellation_type** (ActivityCancellationType) - How to handle cancellation

### Returns
`ActivityHandle[ReturnType]` - Handle to activity (can await result later)

### Example
```python
handle = await workflow.start_activity(
    my_activity,
    args=("arg1",),
    start_to_close_timeout=timedelta(seconds=30)
)

# Do other work...
result = await handle.result()
```
```

--------------------------------

### Example ScheduleIntervalSpec for Hourly Execution in Python

Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/schedules.md

This example shows how to configure a ScheduleIntervalSpec to trigger every hour.
```python
# Every hour
ScheduleIntervalSpec(every=timedelta(hours=1))
```

--------------------------------

### Start an Activity Asynchronously and Get Handle in Python

Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/workflow.md

Call `workflow.start_activity()` to initiate an activity, perform other tasks, and then await the result from the returned handle.

```python
handle = await workflow.start_activity(
    my_activity,
    args=("arg1",),
    start_to_close_timeout=timedelta(seconds=30)
)

# Do other work...
result = await handle.result()
```

--------------------------------

### Define WorkflowEnvironment.start() Method Signature (Python)

Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/testing.md

Defines the static asynchronous method to start an in-memory Temporal test environment.

```python
@staticmethod
async def start() -> WorkflowEnvironment
```

--------------------------------

### Install OpenTelemetry dependencies (pip)

Source: https://github.com/temporalio/sdk-python/blob/main/README.md

Install the necessary `opentelemetry` dependencies for tracing support in the Temporal Python SDK.

```bash
pip install 'temporalio[opentelemetry]'
```

--------------------------------

### Define Asynchronous Activity Start with workflow.start_activity() in Python

Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/workflow.md

This overload defines parameters for starting an activity and immediately returning an `ActivityHandle` without waiting for completion.

```python
async def start_activity(
    activity: CallableType,
    *,
    schedule_to_close_timeout: timedelta | None = None,
    ...
) -> ActivityHandle[ReturnType]
```

--------------------------------

### Run basic Temporal Python workflow example

Source: https://github.com/temporalio/sdk-python/blob/main/README.md

Executes the Python script containing the basic Temporal workflow definition.
```bash
python example.py
```

--------------------------------

### Install LangGraph Plugin for Temporal Python SDK

Source: https://github.com/temporalio/sdk-python/blob/main/temporalio/contrib/langgraph/README.md

Use `uv add` to install the LangGraph plugin along with its dependencies for the Temporal Python SDK.

```sh
uv add temporalio[langgraph]
```

--------------------------------

### Install OTEL Dependencies for OpenAI Agents

Source: https://github.com/temporalio/sdk-python/blob/main/temporalio/contrib/openai_agents/README.md

Install the necessary packages for OpenTelemetry integration with OpenAI agents. Choose specific exporters based on your monitoring needs.

```bash
pip install openinference-instrumentation-openai-agents opentelemetry-sdk
```

```bash
# For OTLP (works with most OTEL collectors and monitoring systems)
pip install opentelemetry-exporter-otlp

# For Console output (development/debugging)
pip install opentelemetry-exporter-console

# Other exporters available for specific systems
pip install opentelemetry-exporter-
```

--------------------------------

### Client.execute_workflow()

Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/client.md

Start a workflow and wait for its completion.

```APIDOC
## Client.execute_workflow()

### Description
Start a workflow and wait for its completion.

### Method
async def execute_workflow(
    self,
    workflow: CallableAsyncNoParam[AwaitableReturnType],
    *,
    id: str,
    task_queue: str,
    ...
) -> AwaitableReturnType

### Parameters
Same as `start_workflow()`, plus polling options.
### Request Example
```python
result = await client.execute_workflow(
    my_workflow,
    id="workflow-789",
    task_queue="my-queue"
)
print(f"Workflow returned: {result}")
```

### Response
#### Return Value
The return value of the workflow

#### Response Example
`AwaitableReturnType`
```

--------------------------------

### Install Temporal Strands Agents Plugin

Source: https://github.com/temporalio/sdk-python/blob/main/temporalio/contrib/strands/README.md

Use `uv` to add the `temporalio[strands-agents]` package, which includes the Strands Agents plugin for Temporal.

```sh
uv add temporalio[strands-agents]
```

--------------------------------

### Start Workflow Execution Method Signature in Python

Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/client.md

Starts a workflow execution asynchronously and returns a handle. This specific overload shows the signature for workflows that do not require any parameters.

```python
@overload
async def start_workflow(
    self,
    workflow: CallableAsyncNoParam[AwaitableReturnType],
    *,
    id: str,
    task_queue: str,
    execution_timeout: timedelta | None = None,
    run_timeout: timedelta | None = None,
    task_timeout: timedelta | None = None,
    ...
) -> WorkflowHandle[AwaitableReturnType]: ...
```

--------------------------------

### Run and Signal a Temporal Workflow from a Client in Python

Source: https://github.com/temporalio/sdk-python/blob/main/README.md

Use this client-side Python code to start a workflow, send signals to update its state, and await its final result. The `id` and `task_queue` parameters are required when starting a workflow.
```python
from temporalio.client import Client
from my_workflow_package import GreetingWorkflow

async def create_greeting(client: Client) -> str:
    # Start the workflow
    handle = await client.start_workflow(GreetingWorkflow.run, "my name", id="my-workflow-id", task_queue="my-task-queue")
    # Change the salutation
    await handle.signal(GreetingWorkflow.update_salutation, "Aloha")
    # Tell it to complete
    await handle.signal(GreetingWorkflow.complete_with_greeting)
    # Wait and return result
    return await handle.result()
```

--------------------------------

### Start a Workflow in Python (Non-blocking)

Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/README.md

Connects to the Temporal server and initiates a workflow execution, returning a handle immediately without waiting for completion.

```python
from temporalio.client import Client

client = await Client.connect("localhost:7233")

# Start workflow (returns immediately)
handle = await client.start_workflow(
    my_workflow,
    args=("arg1",),
    id="wf-1",
    task_queue="my-queue"
)

# Later, get result
result = await handle.result()
```

--------------------------------

### Worker Setup with GoogleAdkPlugin

Source: https://github.com/temporalio/sdk-python/blob/main/temporalio/contrib/google_adk_agents/README.md

Configure the Temporal client and worker on the worker side to use the GoogleAdkPlugin. This enables the worker to handle ADK-related tasks.
```python
from temporalio.client import Client
from temporalio.worker import Worker
from temporalio.contrib.google_adk_agents import GoogleAdkPlugin

client = await Client.connect(
    "localhost:7233",
    plugins=[
        GoogleAdkPlugin(),
    ],
)

worker = Worker(
    client,
    task_queue="my-queue",
)
```

--------------------------------

### Connecting to Temporal Server (Python)

Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/client.md

Examples demonstrating how to connect to a local development server, Temporal Cloud with an API key, and a custom-configured Temporal server using `Client.connect()`.

```python
import asyncio
from temporalio.client import Client

async def main():
    # Connect to local development server
    client = await Client.connect("localhost:7233")

    # Connect to Temporal Cloud with API key
    cloud_client = await Client.connect(
        "my-namespace.a1b2c.tmprl.cloud:7233",
        api_key="your-api-key",
        namespace="my-namespace"
    )

    # Custom configuration
    custom_client = await Client.connect(
        "temporal.example.com:7233",
        namespace="production",
        tls=True,
        identity="my-service-v1"
    )
```

--------------------------------

### Client.get_workflow_handle()

Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/client.md

Get a handle to an existing workflow execution by ID (without starting it).

```APIDOC
## Client.get_workflow_handle()

### Description
Get a handle to an existing workflow execution by ID (without starting it).
### Method
def get_workflow_handle(
    self,
    workflow_id: str,
    *,
    run_id: str | None = None
) -> WorkflowHandle[Any]

### Parameters
- **workflow_id** (str) - Required - Workflow execution ID
- **run_id** (str | None) - Optional - Specific run ID; if not specified, uses current run

### Request Example
```python
handle = client.get_workflow_handle("existing-workflow-id")
result = await handle.result()
```

### Response
#### Return Value
`WorkflowHandle[Any]`

#### Response Example
`handle` (type `WorkflowHandle[Any]`) - Handle to existing workflow
```

--------------------------------

### Example of @workflow.run Decorator in Python

Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/workflow.md

Use this decorator to designate the entry point for your workflow's execution logic within a class. The class itself must also be decorated with `@workflow.defn`.

```python
from temporalio import workflow

@workflow.defn
class MyWorkflow:
    @workflow.run
    async def main(self, name: str) -> str:
        return f"Hello, {name}!"
```

--------------------------------

### Get Workflow Memo Signature (Python)

Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/workflow.md

Defines the signature for `workflow.memo()`, which retrieves the memo dictionary attached to the workflow at start.

```python
def memo() -> Mapping[str, Any]
```

--------------------------------

### Define ActivityEnvironment.default_info() Method Signature (Python)

Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/testing.md

Defines the static method to get default activity info for testing activities.

```python
@staticmethod
def default_info() -> activity.Info
```

--------------------------------

### Define and Use Pytest AsyncIO Fixture for Temporal

Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/testing.md

Use this fixture to manage the lifecycle of a Temporal WorkflowEnvironment for tests.
It ensures the environment starts before tests and shuts down afterward, simplifying test setup.

```python
import pytest

from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

@pytest.fixture
async def temporal_env():
    env = await WorkflowEnvironment.start()
    yield env
    await env.shutdown()

@pytest.mark.asyncio
async def test_with_fixture(temporal_env):
    async with Worker(
        temporal_env.client,
        task_queue="test-queue",
        workflows=[MyWorkflow],
    ) as worker:
        result = await temporal_env.client.execute_workflow(
            MyWorkflow,
            id="test",
            task_queue="test-queue"
        )
```

--------------------------------

### Define a Temporal Python Workflow with decorators

Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/README.md

This example shows how to define a workflow class and its methods using @workflow.defn, @workflow.run, @workflow.signal, and @workflow.query decorators.

```python
from temporalio import workflow

@workflow.defn
class MyWorkflow:
    def __init__(self) -> None:
        # Initialize state so the query works before any signal arrives
        self.state = ""

    @workflow.run
    async def main(self, name: str) -> str:
        return f"Hello, {name}"

    @workflow.signal
    async def my_signal(self, value: str):
        self.state = value

    # Query handlers should be plain (non-async) methods
    @workflow.query
    def my_query(self) -> str:
        return self.state
```

--------------------------------

### Test Workflow with Signal Received (Python)

Source: https://github.com/temporalio/sdk-python/blob/main/README.md

This test demonstrates how to start a workflow and send a signal using the time-skipping environment, verifying the workflow's signal handling.
```python
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

async def test_signal_workflow():
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(env.client, task_queue="tq1", workflows=[SignalWorkflow]):
            # Start workflow, send signal, check result
            handle = await env.client.start_workflow(SignalWorkflow.run, id="wf1", task_queue="tq1")
            await handle.signal(SignalWorkflow.some_signal)
            assert "got signal" == await handle.result()
```

--------------------------------

### Initialize Runtime with Telemetry and Connect Client in Python

Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/service.md

Demonstrates how to initialize a `Runtime` instance with `TelemetryConfig` for logging and then use it when connecting a Temporal client.

```python
from temporalio.client import Client
from temporalio.runtime import LoggingConfig, Runtime, TelemetryConfig, TelemetryFilter

runtime = Runtime(
    telemetry=TelemetryConfig(
        # LoggingConfig takes a filter rather than a stdlib logging level
        logging=LoggingConfig(
            filter=TelemetryFilter(core_level="DEBUG", other_level="ERROR")
        )
    )
)

client = await Client.connect(
    "localhost:7233",
    runtime=runtime
)
```

--------------------------------

### Get Default DataConverter and Use in Client (Python)

Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/converter.md

Demonstrates how to retrieve the default `DataConverter` and configure a Temporal client to use it for serialization and deserialization.

```python
from temporalio.client import Client
from temporalio.converter import DataConverter

# Get default converter (a class attribute, not a method)
converter = DataConverter.default

# Use in client
client = await Client.connect(
    "localhost:7233",
    data_converter=converter
)
```

--------------------------------

### Start Strands Agent Workflow from Client

Source: https://github.com/temporalio/sdk-python/blob/main/temporalio/contrib/strands/README.md

This client code connects to Temporal and executes the `MyWorkflow` defined in `workflow.py`, passing a prompt to the Strands Agent.
```python
import asyncio

from temporalio.client import Client

from workflow import MyWorkflow

async def main() -> None:
    client = await Client.connect("localhost:7233")
    result = await client.execute_workflow(
        MyWorkflow.run,
        "Hello",
        id="strands-quickstart",
        task_queue="strands",
    )
    print(result)

if __name__ == "__main__":
    asyncio.run(main())
```

--------------------------------

### Example ScheduleCalendarSpec for Weekday 9 AM in Python

Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/schedules.md

This example demonstrates how to configure a ScheduleCalendarSpec to trigger every weekday at 9 AM in a specific timezone.

```python
# Every weekday at 9 AM
spec = ScheduleSpec(
    calendars=[
        ScheduleCalendarSpec(
            hour=[ScheduleRange(start=9)],
            day_of_week=[ScheduleRange(start=1, end=5)],  # Mon-Fri
        )
    ],
    # The time zone field on ScheduleSpec is named time_zone_name
    time_zone_name="America/New_York"
)
```

--------------------------------

### Client.start_activity() Signature (Python)

Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/client.md

This method allows direct execution of an activity from the client, returning an asynchronous handle.

```python
async def start_activity(
    self,
    activity: CallableType,
    *,
    id: str | None = None,
    activity_id: str | None = None,
    ...
) -> AsyncActivityHandle[ReturnType]
```

--------------------------------

### Configuring TLS for ServiceClient (Python)

Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/service.md

Illustrates how to instantiate `TLSConfig` with a client certificate, private key, and CA certificate read from files, plus a domain for server verification. `TLSConfig` takes the certificate and key contents as bytes rather than file paths.

```python
from pathlib import Path

from temporalio.service import TLSConfig

tls = TLSConfig(
    client_cert=Path("/certs/client.pem").read_bytes(),
    client_private_key=Path("/certs/client-key.pem").read_bytes(),
    server_root_ca_cert=Path("/certs/ca.pem").read_bytes(),
    domain="temporal.example.com"
)
```

--------------------------------

### Create a Schedule in Python

Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/README.md

Demonstrates how to define a new schedule using `Schedule` and `ScheduleSpec` and then create it with `client.create_schedule()`.

```python
schedule = Schedule(
    action=ScheduleActionStartWorkflow(workflow=my_workflow, ...),
    spec=ScheduleSpec(
        calendars=[ScheduleCalendarSpec(hour=[ScheduleRange(start=9)])],
        time_zone_name="UTC"
    )
)

handle = await client.create_schedule("my-schedule", schedule)
```

--------------------------------

### Create a SimplePlugin Instance (Python)

Source: https://github.com/temporalio/sdk-python/blob/main/README.md

Instantiate a `SimplePlugin` with a name and a data converter for basic plugin functionality.

```python
plugin = SimplePlugin(
    "MyPlugin",
    data_converter=converter,
)
```

--------------------------------

### Start Child Workflow Signature (Python)

Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/workflow.md

Defines the signature for starting a child workflow without waiting for its result, returning a handle to the child workflow.

```python
async def start_child_workflow(
    workflow: CallableAsyncNoParam[AwaitableReturnType],
    *,
    id: str | None = None,
    ...
) -> ChildWorkflowHandle[AwaitableReturnType]
```

--------------------------------

### Execute the Temporal Worker Script

Source: https://github.com/temporalio/sdk-python/blob/main/README.md

This command starts the Python script that runs the Temporal worker. Ensure a Temporal server is running on localhost before execution.
```bash python run_worker.py ``` -------------------------------- ### Customizing JSONPlainPayloadConverter for Custom Types (Python) Source: https://github.com/temporalio/sdk-python/blob/main/README.md This example demonstrates how to extend the default JSON conversion to support custom types such as `IPv4Address` by providing a custom `json.JSONEncoder` and `JSONTypeConverter` to the `JSONPlainPayloadConverter`. ```python class IPv4AddressJSONEncoder(AdvancedJSONEncoder): def default(self, o: Any) -> Any: if isinstance(o, ipaddress.IPv4Address): return str(o) return super().default(o) class IPv4AddressJSONTypeConverter(JSONTypeConverter): def to_typed_value( self, hint: Type, value: Any ) -> Union[Optional[Any], JSONTypeConverterUnhandled]: if issubclass(hint, ipaddress.IPv4Address): return ipaddress.IPv4Address(value) return JSONTypeConverter.Unhandled class IPv4AddressPayloadConverter(CompositePayloadConverter): def __init__(self) -> None: # Replace default JSON plain with our own that has our encoder and type # converter json_converter = JSONPlainPayloadConverter( encoder=IPv4AddressJSONEncoder, custom_type_converters=[IPv4AddressJSONTypeConverter()], ) super().__init__( *[ c if not isinstance(c, JSONPlainPayloadConverter) else json_converter for c in DefaultPayloadConverter.default_encoding_payload_converters ] ) my_data_converter = dataclasses.replace( DataConverter.default, payload_converter_class=IPv4AddressPayloadConverter, ) ``` -------------------------------- ### Configure S3 External Storage with aioboto3 Client Source: https://github.com/temporalio/sdk-python/blob/main/README.md Configures the Temporal client to use `S3StorageDriver` for external storage, offloading large payloads to an S3 bucket. This setup requires `aioboto3` for the S3 client. 
```python import aioboto3 import dataclasses from temporalio.client import Client, ClientConfig from temporalio.contrib.aws.s3driver import S3StorageDriver from temporalio.contrib.aws.s3driver.aioboto3 import new_aioboto3_client from temporalio.converter import DataConverter from temporalio.converter import ExternalStorage client_config = ClientConfig.load_client_connect_config() session = aioboto3.Session() async with session.client("s3") as s3_client: driver = S3StorageDriver( client=new_aioboto3_client(s3_client), bucket="my-bucket", ) client = await Client.connect( **client_config, data_converter=dataclasses.replace( DataConverter.default, external_storage=ExternalStorage(drivers=[driver]), ), ) ``` -------------------------------- ### Call Nexus Operations from a Temporal Workflow Source: https://github.com/temporalio/sdk-python/blob/main/README.md Create a Nexus client within a workflow to execute or start Nexus operations, then await their results. This demonstrates both `execute_operation` and `start_operation`. ```python @workflow.defn class CallerWorkflow: def __init__(self): self.nexus_client = workflow.create_nexus_client( service=MyNexusService, endpoint="my-nexus-endpoint" ) @workflow.run async def run(self, name: str) -> tuple[MyOutput, MyOutput]: # Start the Nexus operation and wait for the result in one go, using execute_operation. 
wf_result = await self.nexus_client.execute_operation( MyNexusService.my_workflow_run_operation, MyInput(name), ) # Or alternatively, obtain the operation handle using start_operation, # and then use it to get the result: sync_operation_handle = await self.nexus_client.start_operation( MyNexusService.my_sync_operation, MyInput(name), ) sync_result = await sync_operation_handle return sync_result, wf_result ``` -------------------------------- ### Basic Agent Setup with TemporalModel Source: https://github.com/temporalio/sdk-python/blob/main/temporalio/contrib/google_adk_agents/README.md Configure your agent workflow to use TemporalModel for intercepting and executing model calls as Temporal activities. Ensure necessary imports for Temporal workflow and ADK Agent. ```python from temporalio.contrib.google_adk_agents import TemporalModel from temporalio.workflow import ActivityConfig from google.adk import Agent ``` -------------------------------- ### Implement Nexus Operation Handlers in Python Source: https://github.com/temporalio/sdk-python/blob/main/README.md Implement a service handler with `@sync_operation` for synchronous responses and `@workflow_run_operation` to start a Temporal workflow and return a handle. 
```python @service_handler(service=MyNexusService) class MyNexusServiceHandler: @sync_operation async def my_sync_operation( self, ctx: StartOperationContext, input: MyInput ) -> MyOutput: return MyOutput(message=f"Hello {input.name} from sync operation!") @workflow_run_operation async def my_workflow_run_operation( self, ctx: WorkflowRunOperationContext, input: MyInput ) -> nexus.WorkflowHandle[MyOutput]: return await ctx.start_workflow( WorkflowStartedByNexusOperation.run, input, id=str(uuid.uuid4()), ) ``` -------------------------------- ### Connecting to Temporal Server with ServiceClient (Python) Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/service.md Demonstrates how to establish a secure connection to a Temporal server using `ServiceClient.connect()` with a `ConnectConfig` and `TLSConfig`. ```python from temporalio.service import ServiceClient, ConnectConfig, TLSConfig config = ConnectConfig( target_host="localhost:7233", tls=TLSConfig(), ) service = await ServiceClient.connect(config) ``` -------------------------------- ### Client.connect() Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/client.md Connect to a Temporal server and return a configured client instance. 
```APIDOC
## Client.connect()

### Description
Connect to a Temporal server and return a configured client instance.

### Signature
```python
@classmethod
async def connect(
    cls,
    target_host: str,
    *,
    namespace: str = "default",
    api_key: str | None = None,
    data_converter: temporalio.converter.DataConverter = temporalio.converter.DataConverter.default,
    plugins: Sequence[Plugin] = [],
    interceptors: Sequence[Interceptor] = [],
    default_workflow_query_reject_condition: None | temporalio.common.QueryRejectCondition = None,
    tls: bool | TLSConfig | None = None,
    retry_config: RetryConfig | None = None,
    keep_alive_config: KeepAliveConfig | None = KeepAliveConfig.default,
    rpc_metadata: Mapping[str, str | bytes] = {},
    identity: str | None = None,
    lazy: bool = False,
    runtime: temporalio.runtime.Runtime | None = None,
    http_connect_proxy_config: HttpConnectProxyConfig | None = None,
    dns_load_balancing_config: DnsLoadBalancingConfig | None = None,
    header_codec_behavior: HeaderCodecBehavior = HeaderCodecBehavior.NO_CODEC,
) -> Self
```

### Parameters
- **target_host** (str) - Required - Host and port for the Temporal server (e.g., "localhost:7233")
- **namespace** (str) - Optional - Default: "default" - Temporal namespace for client calls
- **api_key** (str | None) - Optional - Default: None - API key for authentication; becomes "Authorization: Bearer" header
- **data_converter** (DataConverter) - Optional - Default: DataConverter.default - Converter for serializing/deserializing workflow and activity arguments
- **plugins** (Sequence[Plugin]) - Optional - Default: [] - Plugins chained during client creation; earlier plugins wrap later ones
- **interceptors** (Sequence[Interceptor]) - Optional - Default: [] - Interceptors chained during client calls; earlier ones wrap later ones
- **default_workflow_query_reject_condition** (QueryRejectCondition | None) - Optional - Default: None - Default rejection condition for workflow queries if not set at query time
- **tls** (bool | TLSConfig | None) - Optional - Default: None - TLS configuration; None enables TLS if api_key is set, False disables TLS, True uses system default, or pass TLSConfig
- **retry_config** (RetryConfig | None) - Optional - Default: None - Retry configuration for service calls
- **keep_alive_config** (KeepAliveConfig | None) - Optional - Default: KeepAliveConfig.default - Keep-alive configuration (every 30s check, kill if no response in 15s)
- **rpc_metadata** (Mapping[str, str | bytes]) - Optional - Default: {} - Headers for all server calls
- **identity** (str | None) - Optional - Default: None - Identity for this client; defaults based on SDK version
- **lazy** (bool) - Optional - Default: False - If true, connect only on first call (cannot be used with workers)
- **runtime** (Runtime | None) - Optional - Default: None - Runtime for this client; uses default if unset
- **http_connect_proxy_config** (HttpConnectProxyConfig | None) - Optional - Default: None - HTTP CONNECT proxy configuration
- **dns_load_balancing_config** (DnsLoadBalancingConfig | None) - Optional - Default: None - DNS load balancing configuration (re-resolve every 30s by default)
- **header_codec_behavior** (HeaderCodecBehavior) - Optional - Default: NO_CODEC - Encoding behavior for headers sent by client

### Returns
**Client** - Connected client instance

### Example
```python
import asyncio
from temporalio.client import Client

async def main():
    # Connect to local development server
    client = await Client.connect("localhost:7233")

    # Connect to Temporal Cloud with API key
    cloud_client = await Client.connect(
        "my-namespace.a1b2c.tmprl.cloud:7233",
        api_key="your-api-key",
        namespace="my-namespace"
    )

    # Custom configuration
    custom_client = await Client.connect(
        "temporal.example.com:7233",
        namespace="production",
        tls=True,
        identity="my-service-v1"
    )
```
```
-------------------------------- ###
Access Workflow Info Example (Python) Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/workflow.md Shows how to use `workflow.info()` to access and print details about the current workflow execution, such as workflow ID, run ID, and attempt number. ```python @workflow.run async def main(self): info = workflow.info() print(f"Workflow ID: {info.workflow_id}") print(f"Run ID: {info.run_id}") print(f"Attempt: {info.attempt}") ``` -------------------------------- ### Fail Standard Library Call on Restricted Object Source: https://github.com/temporalio/sdk-python/blob/main/README.md This example shows a low-level CPython validation failure when attempting to use a standard library call on a restricted object within the sandbox. ```python dict.items(os.__dict__) ``` -------------------------------- ### Test Workflow with Signals and Queries (Python) Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/testing.md Shows how to test a workflow that uses signals and queries within the `WorkflowEnvironment`. ```python import asyncio import pytest from temporalio.testing import WorkflowEnvironment from temporalio.worker import Worker from temporalio import workflow @pytest.mark.asyncio async def test_workflow_with_signals(): env = await WorkflowEnvironment.start() try: async with Worker( env.client, task_queue="test-queue", workflows=[SignalWorkflow], ) as worker: # Start workflow handle = await env.client.start_workflow( SignalWorkflow, id="signal-test", task_queue="test-queue" ) # Send signal await handle.signal(my_signal, "value") # Query state state = await handle.query(get_state) # Wait for completion result = await handle.result() assert result == "completed" finally: await env.shutdown() ``` -------------------------------- ### ScheduleHandle.describe() Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/schedules.md Get detailed schedule information.
```APIDOC ## ScheduleHandle.describe() ### Description Get detailed schedule information. ### Signature ```python async def describe(self) -> ScheduleDescription ``` ### Returns ScheduleDescription - Schedule details ### Example ```python handle = client.get_schedule_handle("my-schedule") description = await handle.describe() print(f"Next action: {description.info.next_action_times}") ``` ``` -------------------------------- ### Configure TLS for Temporal Client in Python Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/configuration.md Sets up TLS configuration with certificate paths, CA certificate, and domain, then connects a Temporal client using these settings. ```python from temporalio.service import TLSConfig tls_config = TLSConfig( cert_path="/path/to/cert.pem", key_path="/path/to/key.pem", ca_cert_path="/path/to/ca.pem", domain="temporal.example.com", server_name="temporal", ) client = await Client.connect( "temporal.example.com:7233", tls=tls_config ) ``` -------------------------------- ### Client.get_schedule_handle() Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/schedules.md Get handle to existing schedule by ID. ```APIDOC ## Client.get_schedule_handle() ### Description Get handle to existing schedule by ID. ### Signature ```python def get_schedule_handle(self, id: str) -> ScheduleHandle ``` ### Parameters - **id** (str) - Schedule ID ### Returns ScheduleHandle - Handle to schedule ### Example ```python handle = client.get_schedule_handle("my-schedule") description = await handle.describe() ``` ``` -------------------------------- ### Start Client-Side Trace for Workflow Execution Source: https://github.com/temporalio/sdk-python/blob/main/temporalio/contrib/openai_agents/README.md Use `plugin.tracing_context()` to start traces outside of a worker context. This ensures proper instrumentation and trace propagation into the workflow execution. 
The `trace` and `custom_span` context managers can be used for finer-grained tracing within the client code. ```python with plugin.tracing_context(): with trace("Customer support workflow"): with custom_span("Workflow execution"): # Execute workflow within the trace context result = await client.execute_workflow( CustomerSupportAgent.run, "Help me with my order", id="customer-support-123", task_queue="my-task-queue", ) print(f"Result: {result}") ``` -------------------------------- ### Runtime.__init__ Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/service.md Initializes a Temporal runtime instance, optionally configuring telemetry. ```APIDOC ## Runtime.__init__ ### Description Initializes a Temporal runtime instance. The runtime manages gRPC connections and the asyncio event loop. Telemetry can be configured during initialization. ### Signature `def __init__(self, telemetry: TelemetryConfig | None = None) -> None` ### Parameters - **telemetry** (TelemetryConfig | None) - Optional - Configuration for logging, tracing, and metrics. Defaults to `None`. ### Example ```python import logging from temporalio.runtime import LoggingConfig, Runtime, TelemetryConfig runtime = Runtime( telemetry=TelemetryConfig( logging=LoggingConfig(level=logging.DEBUG) ) ) ``` ``` -------------------------------- ### ActivityEnvironment.default_info() Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/testing.md Get default activity info for testing. Returns activity info with defaults. ```APIDOC ## ActivityEnvironment.default_info() ### Description Get default activity info for testing. Returns activity info with defaults.
### Method Signature ```python @staticmethod def default_info() -> activity.Info ``` ### Returns - **activity.Info** - Activity info with defaults ### Example ```python from dataclasses import replace from temporalio.testing import ActivityEnvironment from temporalio import activity # Customize activity info info = ActivityEnvironment.default_info() custom_info = replace(info, activity_id="custom-123") ``` ``` -------------------------------- ### Create a Schedule with Client.create_schedule() in Python Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/schedules.md Use this snippet to define and create a new schedule that starts a workflow hourly. Ensure `my_workflow` is defined and the client is connected. ```python from temporalio.client import Client, Schedule, ScheduleSpec, ScheduleIntervalSpec, ScheduleActionStartWorkflow from datetime import timedelta client = await Client.connect("localhost:7233") schedule = Schedule( action=ScheduleActionStartWorkflow( workflow=my_workflow, args=(), id="scheduled-workflow-", task_queue="my-queue", ), spec=ScheduleSpec( intervals=[ScheduleIntervalSpec(every=timedelta(hours=1))] ), ) handle = await client.create_schedule("my-schedule", schedule) ``` -------------------------------- ### Implement a Download Activity with Heartbeating and Cancellation in Python Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/activity.md Provides a complete Python activity implementation for downloading a file, demonstrating how to use heartbeats for progress, handle cancellation requests, and resume from a specific byte on retry.
```python from temporalio import activity from datetime import timedelta import logging logger = logging.getLogger(__name__) @activity.defn async def download_file(url: str) -> bytes: """Download file from URL with heartbeating and cancellation support.""" info = activity.info() # Resume from the last heartbeated offset when this attempt is a retry start_byte = info.heartbeat_details[0] if info.heartbeat_details else 0 if start_byte: logger.info(f"Resuming from byte {start_byte}") try: data = b"" async for chunk in fetch_url_chunks(url, start_from=start_byte): # Check for cancellation request if activity.is_cancelled(): logger.info("Download cancelled") raise activity.CancelledError() data += chunk # Send heartbeat with the absolute byte offset so a retry resumes correctly activity.heartbeat(start_byte + len(data)) return data except Exception as e: logger.error(f"Download failed: {e}") raise ``` -------------------------------- ### WorkflowEnvironment.client Source: https://github.com/temporalio/sdk-python/blob/main/_autodocs/api-reference/testing.md Get client for the test environment. Returns a `Client` connected to the test environment. ```APIDOC ## WorkflowEnvironment.client ### Description Get client for the test environment. Returns a `Client` connected to the test environment. ### Property Declaration ```python @property def client(self) -> Client ``` ### Returns - **Client** - Client connected to the test environment ```
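--------------------------------

### Sketch: Resume-from-Heartbeat Logic Without a Server (Python)

A minimal sketch, not taken from the SDK docs, of the resume-from-last-heartbeat idea used by the download activity above. It runs without a Temporal server. `fetch_chunks`, `download`, `run_demo`, the `sink` buffer, and the `fail_at` switch are hypothetical stand-ins, and a plain list plays the role of the recorded heartbeat details.

```python
import asyncio
from typing import AsyncIterator, Callable, List, Optional, Sequence


async def fetch_chunks(
    payload: bytes, start_from: int, chunk_size: int = 4
) -> AsyncIterator[bytes]:
    """Hypothetical stand-in for a chunked HTTP fetch that can start at an offset."""
    for offset in range(start_from, len(payload), chunk_size):
        await asyncio.sleep(0)  # yield control, as a real network read would
        yield payload[offset : offset + chunk_size]


async def download(
    payload: bytes,
    sink: bytearray,
    heartbeat_details: Sequence[int],
    heartbeat: Callable[[int], None],
    fail_at: Optional[int] = None,
) -> int:
    """Append chunks to `sink`, reporting the absolute offset after each one."""
    # Assumption: the previous attempt's last heartbeat holds the resume offset.
    start_byte = heartbeat_details[0] if heartbeat_details else 0
    written = start_byte
    async for chunk in fetch_chunks(payload, start_from=start_byte):
        if fail_at is not None and written >= fail_at:
            raise ConnectionError("simulated network failure")
        sink.extend(chunk)
        written += len(chunk)
        heartbeat(written)  # absolute offset, not the per-attempt byte count
    return written


def run_demo() -> tuple:
    payload = b"0123456789abcdef"
    sink = bytearray()  # stands in for durable storage shared across attempts
    beats: List[int] = []
    try:
        # First attempt fails once 8 bytes have been written.
        asyncio.run(download(payload, sink, [], beats.append, fail_at=8))
    except ConnectionError:
        pass
    # The retry receives only the last recorded heartbeat and resumes from it.
    asyncio.run(download(payload, sink, beats[-1:], beats.append))
    return bytes(sink), beats


if __name__ == "__main__":
    print(run_demo())
```

Running the script prints `(b'0123456789abcdef', [4, 8, 12, 16])`: the second attempt starts at byte 8 and does not re-fetch the first half. Heartbeating the absolute offset is the design choice that makes this work; reporting only the bytes read in the current attempt would make a second retry resume from the wrong position.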