### Cache Configuration JSON

Source: https://docs.powertools.aws.dev/lambda/python/latest/utilities/idempotency

Example JSON structure for cache configuration, including host, port, and password.

```json
{
  "CACHE_HOST": "127.0.0.1",
  "CACHE_PORT": "6379",
  "CACHE_PASSWORD": "cache-secret"
}
```

--------------------------------

### Makefile for Redis Docker Setup

Source: https://docs.powertools.aws.dev/lambda/python/latest/utilities/idempotency

Makefile snippet to set up a temporary Redis Docker container for testing and automatically remove it after tests complete.

```makefile
test-idempotency-redis:
	docker run --name test-idempotency-redis -d -p 63005:6379 redis
	pytest test_with_real_redis.py;docker stop test-idempotency-redis;docker rm test-idempotency-redis
```

--------------------------------

### Test Idempotency with Real Redis

Source: https://docs.powertools.aws.dev/lambda/python/latest/utilities/idempotency

Example of setting up and using the `idempotent` decorator with a real Redis client for integration testing.
```python
from dataclasses import dataclass

import pytest
import redis

from aws_lambda_powertools.utilities.idempotency import (
    idempotent,
)
from aws_lambda_powertools.utilities.idempotency.persistence.redis import (
    RedisCachePersistenceLayer,
)
from aws_lambda_powertools.utilities.typing import LambdaContext


@pytest.fixture
def lambda_context():
    @dataclass
    class LambdaContext:
        function_name: str = "test"
        memory_limit_in_mb: int = 128
        invoked_function_arn: str = "arn:aws:lambda:eu-west-1:809313241:function:test"
        aws_request_id: str = "52fdfc07-2182-154f-163f-5f0f9a621d72"

        def get_remaining_time_in_millis(self) -> int:
            return 1000

    return LambdaContext()


@pytest.fixture
def persistence_store_standalone_redis():
    # Init a real Redis client and connect to the port published in the Makefile
    redis_client = redis.Redis(
        host="localhost",
        port=63005,
        decode_responses=True,
    )

    # Return a persistence layer backed by the real Redis instance
    return RedisCachePersistenceLayer(client=redis_client)


def test_idempotent_lambda(lambda_context, persistence_store_standalone_redis):
    # Establish persistence layer using the real Redis client
    persistence_layer = persistence_store_standalone_redis

    # Set up idempotent with the Redis persistence layer
    @idempotent(persistence_store=persistence_layer)
    def lambda_handler(event: dict, context: LambdaContext):
        print("expensive operation")
        return {
            "payment_id": 12345,
            "message": "success",
            "statusCode": 200,
        }

    # Invoke the simulated Lambda handler
    result = lambda_handler({"testkey": "testvalue"}, lambda_context)
    assert result["payment_id"] == 12345
```

--------------------------------

### Initialize CachePersistenceLayer with Existing Redis Client

Source: https://docs.powertools.aws.dev/lambda/python/latest/utilities/idempotency

Use an existing Redis client to initialize the CachePersistenceLayer. This allows integration with your existing Redis setup.
```python
import os
from dataclasses import dataclass, field
from uuid import uuid4

from redis import Redis

from aws_lambda_powertools.utilities.idempotency import (
    idempotent,
)
from aws_lambda_powertools.utilities.idempotency.persistence.cache import (
    CachePersistenceLayer,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

cache_endpoint = os.getenv("CACHE_CLUSTER_ENDPOINT", "localhost")
client = Redis(
    host=cache_endpoint,
    port=6379,
    socket_connect_timeout=5,
    socket_timeout=5,
    max_connections=1000,
)

persistence_layer = CachePersistenceLayer(client=client)


@dataclass
class Payment:
    user_id: str
    product_id: str
    payment_id: str = field(default_factory=lambda: f"{uuid4()}")


class PaymentError(Exception): ...


@idempotent(persistence_store=persistence_layer)
def lambda_handler(event: dict, context: LambdaContext):
    try:
        # create_subscription_payment is defined in the "Create Subscription Payment" snippet
        payment: Payment = create_subscription_payment(event)
        return {
            "payment_id": payment.payment_id,
            "message": "success",
            "statusCode": 200,
        }
    except Exception as exc:
        raise PaymentError(f"Error creating payment {str(exc)}") from exc
```

--------------------------------

### Idempotency Payload Example

Source: https://docs.powertools.aws.dev/lambda/python/latest/utilities/idempotency

A sample JSON payload demonstrating the structure expected for idempotency checks. This payload includes user and product identifiers.

```json
{
  "user_id": "xyz",
  "product_id": "123456789"
}
```

--------------------------------

### Idempotency Utility with DynamoDB Persistence

Source: https://docs.powertools.aws.dev/lambda/python/latest/utilities/idempotency

Example of configuring the idempotency utility with a DynamoDBPersistenceLayer. This snippet shows the basic setup for a Lambda handler decorated with the @idempotent decorator.
```python
from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    idempotent,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")


@idempotent(persistence_store=persistence_layer)
def lambda_handler(event: dict, context: LambdaContext):
    print("expensive operation")
    return {
        "payment_id": 12345,
        "message": "success",
        "statusCode": 200,
    }
```

--------------------------------

### Create Subscription Payment

Source: https://docs.powertools.aws.dev/lambda/python/latest/utilities/idempotency

Example of creating a payment object from an event dictionary. This function is a basic building block for processing payments and assumes the event structure matches the Payment class.

```python
def create_subscription_payment(event: dict) -> Payment:
    return Payment(**event)
```

--------------------------------

### Sample API Gateway HTTP Event

Source: https://docs.powertools.aws.dev/lambda/python/latest/utilities/idempotency

An example of an API Gateway HTTP event payload used with the validator and idempotency decorators.
```json
{
  "version": "2.0",
  "routeKey": "$default",
  "rawPath": "/my/path",
  "rawQueryString": "parameter1=value1&parameter1=value2&parameter2=value",
  "cookies": [
    "cookie1",
    "cookie2"
  ],
  "headers": {
    "Header1": "value1",
    "Header2": "value1,value2"
  },
  "queryStringParameters": {
    "parameter1": "value1,value2",
    "parameter2": "value"
  },
  "requestContext": {
    "accountId": "123456789012",
    "apiId": "api-id",
    "authentication": {
      "clientCert": {
        "clientCertPem": "CERT_CONTENT",
        "subjectDN": "www.example.com",
        "issuerDN": "Example issuer",
        "serialNumber": "a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1",
        "validity": {
          "notBefore": "May 28 12:30:02 2019 GMT",
          "notAfter": "Aug 5 09:36:04 2021 GMT"
        }
      }
    },
    "authorizer": {
      "jwt": {
        "claims": {
          "claim1": "value1",
          "claim2": "value2"
        },
        "scopes": [
          "scope1",
          "scope2"
        ]
      }
    },
    "domainName": "id.execute-api.us-east-1.amazonaws.com",
    "domainPrefix": "id",
    "http": {
      "method": "POST",
      "path": "/my/path",
      "protocol": "HTTP/1.1",
      "sourceIp": "192.168.0.1/32",
      "userAgent": "agent"
    },
    "requestId": "id",
    "routeKey": "$default",
    "stage": "$default",
    "time": "12/Mar/2020:19:03:58 +0000",
    "timeEpoch": 1583348638390
  },
  "body": "{\"message\": \"hello world\", \"username\": \"tom\"}",
  "pathParameters": {
    "parameter1": "value1"
  },
  "isBase64Encoded": false,
  "stageVariables": {
    "stageVariable1": "value1",
    "stageVariable2": "value2"
  }
}
```

--------------------------------

### Implement Custom Persistence Layer

Source: https://docs.powertools.aws.dev/lambda/python/latest/utilities/idempotency

Inherit from `BasePersistenceLayer` and implement the required methods to create your own persistence store. This example shows how to integrate with AWS DynamoDB.
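Before the DynamoDB version, it may help to see the shape of the contract in miniature. The sketch below uses only the standard library and does not subclass `BasePersistenceLayer`; `InMemoryStore`, `Record`, and the two exception classes are hypothetical stand-ins chosen for illustration. The one behaviour it tries to mirror is the conditional write: a put succeeds only when the key is absent or its stored expiry is in the past.

```python
import time
from dataclasses import dataclass
from typing import Dict, Optional


class ItemAlreadyExistsError(Exception):
    """Hypothetical stand-in for IdempotencyItemAlreadyExistsError."""


class ItemNotFoundError(Exception):
    """Hypothetical stand-in for IdempotencyItemNotFoundError."""


@dataclass
class Record:
    """Hypothetical, simplified stand-in for DataRecord."""

    idempotency_key: str
    status: str
    expiry_timestamp: int
    response_data: str = ""


class InMemoryStore:
    """Toy store exposing the four operations a persistence layer provides."""

    def __init__(self) -> None:
        self._items: Dict[str, Record] = {}

    def get_record(self, key: str) -> Record:
        try:
            return self._items[key]
        except KeyError:
            raise ItemNotFoundError(key) from None

    def put_record(self, record: Record, now: Optional[float] = None) -> None:
        now = time.time() if now is None else now
        existing = self._items.get(record.idempotency_key)
        # Conditional write: reject when a non-expired record already exists
        if existing is not None and existing.expiry_timestamp >= now:
            raise ItemAlreadyExistsError(record.idempotency_key)
        self._items[record.idempotency_key] = record

    def update_record(self, record: Record) -> None:
        self._items[record.idempotency_key] = record

    def delete_record(self, key: str) -> None:
        self._items.pop(key, None)
```

The DynamoDB-backed implementation of the same idea follows.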
```python
import datetime
import logging
from typing import Any, Dict, Optional

import boto3
from botocore.config import Config

from aws_lambda_powertools.utilities.idempotency import BasePersistenceLayer
from aws_lambda_powertools.utilities.idempotency.exceptions import (
    IdempotencyItemAlreadyExistsError,
    IdempotencyItemNotFoundError,
)
from aws_lambda_powertools.utilities.idempotency.persistence.base import DataRecord

logger = logging.getLogger(__name__)


class MyOwnPersistenceLayer(BasePersistenceLayer):
    def __init__(
        self,
        table_name: str,
        key_attr: str = "id",
        expiry_attr: str = "expiration",
        status_attr: str = "status",
        data_attr: str = "data",
        validation_key_attr: str = "validation",
        boto_config: Optional[Config] = None,
        boto3_session: Optional[boto3.session.Session] = None,
    ):
        boto3_session = boto3_session or boto3.session.Session()
        self._ddb_resource = boto3_session.resource("dynamodb", config=boto_config)
        self.table_name = table_name
        self.table = self._ddb_resource.Table(self.table_name)
        self.key_attr = key_attr
        self.expiry_attr = expiry_attr
        self.status_attr = status_attr
        self.data_attr = data_attr
        self.validation_key_attr = validation_key_attr
        super().__init__()

    def _item_to_data_record(self, item: Dict[str, Any]) -> DataRecord:
        """
        Translate raw item records from DynamoDB to DataRecord

        Parameters
        ----------
        item: Dict[str, Union[str, int]]
            Item format from dynamodb response

        Returns
        -------
        DataRecord
            representation of item

        """
        return DataRecord(
            idempotency_key=item[self.key_attr],
            status=item[self.status_attr],
            expiry_timestamp=item[self.expiry_attr],
            response_data=item.get(self.data_attr, ""),
            payload_hash=item.get(self.validation_key_attr, ""),
        )

    def _get_record(self, idempotency_key) -> DataRecord:
        response = self.table.get_item(Key={self.key_attr: idempotency_key}, ConsistentRead=True)

        try:
            item = response["Item"]
        except KeyError:
            raise IdempotencyItemNotFoundError
        return self._item_to_data_record(item)

    def _put_record(self, data_record: DataRecord) -> None:
        item = {
            self.key_attr: data_record.idempotency_key,
            self.expiry_attr: data_record.expiry_timestamp,
            self.status_attr: data_record.status,
        }

        if self.payload_validation_enabled:
            item[self.validation_key_attr] = data_record.payload_hash

        now = datetime.datetime.now()
        try:
            logger.debug(f"Putting record for idempotency key: {data_record.idempotency_key}")
            # Write only if the key is absent or the stored record has already expired
            self.table.put_item(
                Item=item,
                ConditionExpression=f"attribute_not_exists({self.key_attr}) OR {self.expiry_attr} < :now",
                ExpressionAttributeValues={":now": int(now.timestamp())},
            )
        except self._ddb_resource.meta.client.exceptions.ConditionalCheckFailedException:
            logger.debug(f"Failed to put record for already existing idempotency key: {data_record.idempotency_key}")
            raise IdempotencyItemAlreadyExistsError

    def _update_record(self, data_record: DataRecord):
        logger.debug(f"Updating record for idempotency key: {data_record.idempotency_key}")
```

--------------------------------

### Sample SQS Event Payload for Batch Processing

Source: https://docs.powertools.aws.dev/lambda/python/latest/utilities/idempotency

An example SQS event payload demonstrating the structure expected by the Batch Processor. It includes a 'Records' array, with each record containing essential information like 'messageId', 'receiptHandle', and 'body'.
```json
{
  "Records": [
    {
      "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
      "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
      "body": "Test message.",
      "attributes": {
        "ApproximateReceiveCount": "1",
        "SentTimestamp": "1545082649183",
        "SenderId": "replace-to-pass-gitleak",
        "ApproximateFirstReceiveTimestamp": "1545082649185"
      },
      "messageAttributes": {
        "testAttr": {
          "stringValue": "100",
          "binaryValue": "base64Str",
          "dataType": "Number"
        }
      },
      "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
      "eventSource": "aws:sqs",
      "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
      "awsRegion": "us-east-2"
    }
  ]
}
```

--------------------------------

### Idempotent Function for Batch Processing with SQS

Source: https://docs.powertools.aws.dev/lambda/python/latest/utilities/idempotency

Integrate idempotency with the Batch processing utility for SQS events. This example uses DynamoDB for persistence and extracts the idempotency key from the 'messageId' of each SQS record.
```python
import os
from typing import Any, Dict

from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent_function,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.SQS)

table = os.getenv("IDEMPOTENCY_TABLE", "")
dynamodb = DynamoDBPersistenceLayer(table_name=table)
config = IdempotencyConfig(event_key_jmespath="messageId")


@idempotent_function(data_keyword_argument="record", config=config, persistence_store=dynamodb)
def record_handler(record: SQSRecord):
    return {"message": record.body}


def lambda_handler(event: Dict[str, Any], context: LambdaContext):
    config.register_lambda_context(context)  # see Lambda timeouts section

    return process_partial_response(
        event=event,
        context=context,
        processor=processor,
        record_handler=record_handler,
    )
```

--------------------------------

### Cache SSL Connections with Valkey Client

Source: https://docs.powertools.aws.dev/lambda/python/latest/utilities/idempotency

Demonstrates how to configure a Valkey client for TLS-encrypted cache connections, fetching the connection settings from AWS Secrets Manager.
```python
from __future__ import annotations

from typing import Any

from glide import BackoffStrategy, GlideClient, GlideClientConfiguration, NodeAddress, ServerCredentials

from aws_lambda_powertools.utilities import parameters
from aws_lambda_powertools.utilities.idempotency import IdempotencyConfig, idempotent
from aws_lambda_powertools.utilities.idempotency.persistence.cache import (
    CachePersistenceLayer,
)

cache_values: dict[str, Any] = parameters.get_secret("cache_info", transform="json")

client_config = GlideClientConfiguration(
    addresses=[
        NodeAddress(
            host=cache_values.get("CACHE_HOST", "localhost"),
            port=cache_values.get("CACHE_PORT", 6379),
        ),
    ],
    credentials=ServerCredentials(
        password=cache_values.get("CACHE_PASSWORD", ""),
    ),
    request_timeout=10,
    use_tls=True,
    reconnect_strategy=BackoffStrategy(num_of_retries=10, factor=2, exponent_base=1),
)
valkey_client = GlideClient.create(config=client_config)

persistence_layer = CachePersistenceLayer(client=valkey_client)  # type: ignore[arg-type]

config = IdempotencyConfig(
    expires_after_seconds=2 * 60,  # 2 minutes
)


@idempotent(config=config, persistence_store=persistence_layer)
def lambda_handler(event, context):
    return {"message": "Hello"}
```

--------------------------------

### Initialize CachePersistenceLayer with Existing Valkey Glide Client

Source: https://docs.powertools.aws.dev/lambda/python/latest/utilities/idempotency

Use an existing Valkey Glide client to initialize the CachePersistenceLayer. This is useful when you have a pre-configured Glide client.
```python
import os
from dataclasses import dataclass, field
from uuid import uuid4

from glide import GlideClient, GlideClientConfiguration, NodeAddress

from aws_lambda_powertools.utilities.idempotency import (
    idempotent,
)
from aws_lambda_powertools.utilities.idempotency.persistence.cache import (
    CachePersistenceLayer,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

cache_endpoint = os.getenv("CACHE_CLUSTER_ENDPOINT", "localhost")
client_config = GlideClientConfiguration(
    addresses=[
        NodeAddress(
            host=cache_endpoint,  # use the configured endpoint rather than a hard-coded host
            port=6379,
        ),
    ],
)
client = GlideClient.create(config=client_config)

persistence_layer = CachePersistenceLayer(client=client)  # type: ignore[arg-type]


@dataclass
class Payment:
    user_id: str
    product_id: str
    payment_id: str = field(default_factory=lambda: f"{uuid4()}")


class PaymentError(Exception): ...


@idempotent(persistence_store=persistence_layer)
def lambda_handler(event: dict, context: LambdaContext):
    try:
        payment: Payment = create_subscription_payment(event)
        return {
            "payment_id": payment.payment_id,
            "message": "success",
            "statusCode": 200,
        }
    except Exception as exc:
        raise PaymentError(f"Error creating payment {str(exc)}") from exc


def create_subscription_payment(event: dict) -> Payment:
    return Payment(**event)
```

--------------------------------

### Initialize CachePersistenceLayer with Endpoint and Port

Source: https://docs.powertools.aws.dev/lambda/python/latest/utilities/idempotency

Initialize CachePersistenceLayer with your cache endpoint and port. SSL connections are enforced by default; set `ssl=False` to disable.
```python
import os
from dataclasses import dataclass, field
from uuid import uuid4

from aws_lambda_powertools.utilities.idempotency import (
    idempotent,
)
from aws_lambda_powertools.utilities.idempotency.persistence.cache import (
    CachePersistenceLayer,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

redis_endpoint = os.getenv("CACHE_CLUSTER_ENDPOINT", "localhost")
persistence_layer = CachePersistenceLayer(host=redis_endpoint, port=6379)


@dataclass
class Payment:
    user_id: str
    product_id: str
    payment_id: str = field(default_factory=lambda: f"{uuid4()}")


class PaymentError(Exception): ...


@idempotent(persistence_store=persistence_layer)
def lambda_handler(event: dict, context: LambdaContext):
    try:
        payment: Payment = create_subscription_payment(event)
        return {
            "payment_id": payment.payment_id,
            "message": "success",
            "statusCode": 200,
        }
    except Exception as exc:
        raise PaymentError(f"Error creating payment {str(exc)}") from exc


def create_subscription_payment(event: dict) -> Payment:
    return Payment(**event)
```

--------------------------------

### Testing Idempotency with Mock Redis

Source: https://docs.powertools.aws.dev/lambda/python/latest/utilities/idempotency

Illustrates how to test the idempotency utility with a mocked Redis client using `mock-redis-py`. This allows for local testing without a running Redis instance.
```python
from dataclasses import dataclass

import pytest
from mock_redis import MockRedis

from aws_lambda_powertools.utilities.idempotency import (
    idempotent,
)
from aws_lambda_powertools.utilities.idempotency.persistence.redis import (
    RedisCachePersistenceLayer,
)
from aws_lambda_powertools.utilities.typing import LambdaContext


@pytest.fixture
def lambda_context():
    @dataclass
    class LambdaContext:
        function_name: str = "test"
        memory_limit_in_mb: int = 128
        invoked_function_arn: str = "arn:aws:lambda:eu-west-1:809313241:function:test"
        aws_request_id: str = "52fdfc07-2182-154f-163f-5f0f9a621d72"

        def get_remaining_time_in_millis(self) -> int:
            return 1000

    return LambdaContext()


def test_idempotent_lambda(lambda_context):
    # Init the mock Redis client
    redis_client = MockRedis(decode_responses=True)
    # Establish persistence layer using the mock Redis client
    persistence_layer = RedisCachePersistenceLayer(client=redis_client)

    # Set up idempotent with the Redis persistence layer
    @idempotent(persistence_store=persistence_layer)
    def lambda_handler(event: dict, context: LambdaContext):
        print("expensive operation")
        return {
            "payment_id": 12345,
            "message": "success",
            "statusCode": 200,
        }

    # Invoke the simulated Lambda handler
    result = lambda_handler({"testkey": "testvalue"}, lambda_context)
    assert result["payment_id"] == 12345
```

--------------------------------

### Sample Event Body for Custom Expiration

Source: https://docs.powertools.aws.dev/lambda/python/latest/utilities/idempotency

This is a sample event body used with the custom expiration configuration. It contains user and product IDs.

```json
{
  "body": "{\"user_id\":\"xyz\",\"product_id\":\"123456789\"}"
}
```

--------------------------------

### Test Idempotency with DynamoDB Local Client

Source: https://docs.powertools.aws.dev/lambda/python/latest/utilities/idempotency

Demonstrates how to configure tests to use DynamoDB Local by replacing the default boto3 client with a client pointing to the local endpoint.
This is useful for integration testing the idempotency layer.

```python
from dataclasses import dataclass

import app_test_dynamodb_local
import boto3
import pytest


@dataclass
class LambdaContext:
    function_name: str = "test"
    memory_limit_in_mb: int = 128
    invoked_function_arn: str = "arn:aws:lambda:eu-west-1:809313241:function:test"
    aws_request_id: str = "52fdfc07-2182-154f-163f-5f0f9a621d72"

    def get_remaining_time_in_millis(self) -> int:
        return 5


@pytest.fixture
def lambda_context() -> LambdaContext:
    return LambdaContext()


def test_idempotent_lambda(lambda_context):
    # Configure boto3 to use the endpoint of the DynamoDB Local instance
    dynamodb_local_client = boto3.client("dynamodb", endpoint_url="http://localhost:8000")
    app_test_dynamodb_local.persistence_layer.client = dynamodb_local_client

    # If desired, you can use a different DynamoDB Local table name than what your code already uses
    # app_test_dynamodb_local.persistence_layer.table_name = "another table name" # noqa: ERA001

    result = app_test_dynamodb_local.handler({"testkey": "testvalue"}, lambda_context)
    assert result["payment_id"] == 12345
```

--------------------------------

### Sample Event for Idempotent Decorator

Source: https://docs.powertools.aws.dev/lambda/python/latest/utilities/idempotency

A sample JSON payload to be used with the idempotent Lambda handler.

```json
{
  "user_id": "xyz",
  "product_id": "123456789"
}
```

--------------------------------

### Sample API Gateway Event for Idempotency

Source: https://docs.powertools.aws.dev/lambda/python/latest/utilities/idempotency

This is a sample API Gateway event payload used to test idempotency. It includes a JSON body with user and product IDs.
```json
{
  "version": "2.0",
  "routeKey": "ANY /createpayment",
  "rawPath": "/createpayment",
  "rawQueryString": "",
  "headers": {
    "Header1": "value1",
    "Header2": "value2"
  },
  "requestContext": {
    "accountId": "123456789012",
    "apiId": "api-id",
    "domainName": "id.execute-api.us-east-1.amazonaws.com",
    "domainPrefix": "id",
    "http": {
      "method": "POST",
      "path": "/createpayment",
      "protocol": "HTTP/1.1",
      "sourceIp": "ip",
      "userAgent": "agent"
    },
    "requestId": "id",
    "routeKey": "ANY /createpayment",
    "stage": "$default",
    "time": "10/Feb/2021:13:40:43 +0000",
    "timeEpoch": 1612964443723
  },
  "body": "{\"user_id\":\"xyz\",\"product_id\":\"123456789\"}",
  "isBase64Encoded": false
}
```

--------------------------------

### Mocking DynamoDB I/O Operations

Source: https://docs.powertools.aws.dev/lambda/python/latest/utilities/idempotency

Demonstrates how to mock DynamoDB client operations when testing the idempotency utility. This is useful for isolating your function logic from actual database interactions.

```python
from dataclasses import dataclass
from unittest.mock import MagicMock

import app_test_io_operations
import pytest


@dataclass
class LambdaContext:
    function_name: str = "test"
    memory_limit_in_mb: int = 128
    invoked_function_arn: str = "arn:aws:lambda:eu-west-1:809313241:function:test"
    aws_request_id: str = "52fdfc07-2182-154f-163f-5f0f9a621d72"

    def get_remaining_time_in_millis(self) -> int:
        return 5


@pytest.fixture
def lambda_context() -> LambdaContext:
    return LambdaContext()


def test_idempotent_lambda(lambda_context):
    mock_client = MagicMock()
    app_test_io_operations.persistence_layer.client = mock_client
    result = app_test_io_operations.handler({"testkey": "testvalue"}, lambda_context)
    mock_client.put_item.assert_called()
    assert result
```

--------------------------------

### Mock Redis Class for Testing

Source: https://docs.powertools.aws.dev/lambda/python/latest/utilities/idempotency

A mock Redis class simulating common operations for testing idempotency logic without a real
Redis instance.

```python
import time as t
from typing import Dict, Optional


# Mock redis class that includes all operations we used in Idempotency
class MockRedis:
    # `cache` is optional so the mock can be created as MockRedis(decode_responses=True)
    def __init__(self, decode_responses, cache: Optional[Dict] = None, **kwargs):
        self.cache = cache or {}
        self.expire_dict: Dict = {}
        self.decode_responses = decode_responses
        self.acl: Dict = {}
        self.username = ""

    def hset(self, name, mapping):
        self.expire_dict.pop(name, {})
        self.cache[name] = mapping

    def from_url(self, url: str):
        pass

    def expire(self, name, time):
        self.expire_dict[name] = t.time() + time

    # return {} if no match
    def hgetall(self, name):
        if self.expire_dict.get(name, t.time() + 1) < t.time():
            self.cache.pop(name, {})
        return self.cache.get(name, {})

    def get_connection_kwargs(self):
        return {"decode_responses": self.decode_responses}

    def auth(self, username, **kwargs):
        self.username = username

    def delete(self, name):
        self.cache.pop(name, {})
```

--------------------------------

### Sample Event for Idempotent Operation

Source: https://docs.powertools.aws.dev/lambda/python/latest/utilities/idempotency

This is a sample JSON event payload representing an order, used as input for an idempotent Lambda function.

```json
{
  "order": {
    "user_id": "xyz",
    "product_id": "123456789",
    "quantity": 2,
    "value": 30
  }
}
```

--------------------------------

### Configure Redis Cache Persistence Layer

Source: https://docs.powertools.aws.dev/lambda/python/latest/utilities/idempotency

Customize attribute names for Redis cache persistence. Use this when initializing the RedisCachePersistenceLayer to define how in-progress expiry, status, data, and validation keys are stored.
```python
import os

from aws_lambda_powertools.utilities.idempotency import (
    idempotent,
)
from aws_lambda_powertools.utilities.idempotency.persistence.redis import (
    RedisCachePersistenceLayer,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

redis_endpoint = os.getenv("REDIS_CLUSTER_ENDPOINT", "localhost")
persistence_layer = RedisCachePersistenceLayer(
    host=redis_endpoint,
    port=6379,
    in_progress_expiry_attr="in_progress_expiration",
    status_attr="status",
    data_attr="data",
    validation_key_attr="validation",
)


@idempotent(persistence_store=persistence_layer)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    return event
```

--------------------------------

### Cache SSL Connections with Redis Client and Local Certificates

Source: https://docs.powertools.aws.dev/lambda/python/latest/utilities/idempotency

Shows how to configure a Redis client for SSL-encrypted cache connections using local certificate files. Connection settings are fetched from AWS Secrets Manager.

```python
from __future__ import annotations

from typing import Any

from redis import Redis

from aws_lambda_powertools.shared.functions import abs_lambda_path
from aws_lambda_powertools.utilities import parameters
from aws_lambda_powertools.utilities.idempotency import IdempotencyConfig, idempotent
from aws_lambda_powertools.utilities.idempotency.persistence.cache import (
    CachePersistenceLayer,
)

cache_values: dict[str, Any] = parameters.get_secret("cache_info", transform="json")

redis_client = Redis(
    host=cache_values.get("REDIS_HOST", "localhost"),
    port=cache_values.get("REDIS_PORT", 6379),
    password=cache_values.get("REDIS_PASSWORD"),
    decode_responses=True,
    socket_timeout=10.0,
    ssl=True,
    retry_on_timeout=True,
    ssl_certfile=f"{abs_lambda_path()}/certs/cache_user.crt",
    ssl_keyfile=f"{abs_lambda_path()}/certs/cache_user_private.key",
    ssl_ca_certs=f"{abs_lambda_path()}/certs/cache_ca.pem",
)

persistence_layer = CachePersistenceLayer(client=redis_client)
config = IdempotencyConfig(
    expires_after_seconds=2 * 60,  # 2 minutes
)


@idempotent(config=config, persistence_store=persistence_layer)
def lambda_handler(event, context):
    return {"message": "Hello"}
```

--------------------------------

### Handling Exceptions with Idempotency

Source: https://docs.powertools.aws.dev/lambda/python/latest/utilities/idempotency

Demonstrates that an exception raised inside the decorated function causes its idempotency record to be deleted, while an exception raised outside the decorated function leaves the record untouched. Use this to understand failure modes.

```python
import os

import requests

from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent_function,
)
from aws_lambda_powertools.utilities.idempotency.exceptions import IdempotencyPersistenceLayerError
from aws_lambda_powertools.utilities.typing import LambdaContext

table = os.getenv("IDEMPOTENCY_TABLE", "")
persistence_layer = DynamoDBPersistenceLayer(table_name=table)

config = IdempotencyConfig()


@idempotent_function(data_keyword_argument="data", config=config, persistence_store=persistence_layer)
def call_external_service(data: dict):
    # Any exception raised here causes the idempotency record to be deleted
    result: requests.Response = requests.post(
        "https://jsonplaceholder.typicode.com/comments/",
        json=data,
    )
    return result.json()


def lambda_handler(event: dict, context: LambdaContext):
    try:
        call_external_service(data=event)
    except IdempotencyPersistenceLayerError as e:
        # No idempotency, but you can decide to error differently.
        raise RuntimeError(f"Oops, can't talk to persistence layer. Permissions? error: {e}")

    # This exception will not impact the idempotency of 'call_external_service'
    # because it happens in isolation, or outside its scope.
    raise SyntaxError("Oops, this shouldn't be here.")
```

--------------------------------

### Handle Lambda Timeouts with Idempotency

Source: https://docs.powertools.aws.dev/lambda/python/latest/utilities/idempotency

Use `register_lambda_context` from your idempotency config to include remaining invocation time in the idempotency record. This prevents extended failures if a Lambda times out before completing an invocation.

```python
import os

from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent_function,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

table = os.getenv("IDEMPOTENCY_TABLE", "")
persistence_layer = DynamoDBPersistenceLayer(table_name=table)

config = IdempotencyConfig()


@idempotent_function(data_keyword_argument="record", persistence_store=persistence_layer, config=config)
def record_handler(record: SQSRecord):
    return {"message": record["body"]}


def lambda_handler(event: dict, context: LambdaContext):
    config.register_lambda_context(context)

    # `record` must be passed as a keyword argument for the decorator to find it
    return record_handler(record=event)
```

--------------------------------

### Use Custom Boto3 Session for Idempotency

Source: https://docs.powertools.aws.dev/lambda/python/latest/utilities/idempotency

Pass a custom `boto3.session.Session` object to `DynamoDBPersistenceLayer` to configure the underlying boto3 client. This allows for custom AWS configurations or credentials.
```python
import os

import boto3

from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

# See: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#module-boto3.session
boto3_session = boto3.session.Session()

table = os.getenv("IDEMPOTENCY_TABLE", "")

persistence_layer = DynamoDBPersistenceLayer(table_name=table, boto3_session=boto3_session)

config = IdempotencyConfig(event_key_jmespath="body")


@idempotent(persistence_store=persistence_layer, config=config)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    return event
```

--------------------------------

### Idempotent Lambda Handler with JMESPath

Source: https://docs.powertools.aws.dev/lambda/python/latest/utilities/idempotency

This snippet demonstrates an idempotent Lambda function whose idempotency key is built from the `user_id` and `product_id` fields of the JSON event body, selected with JMESPath. It requires a configured persistence store and IdempotencyConfig.

```python
import json
from dataclasses import dataclass, field
from uuid import uuid4

from aws_lambda_powertools.utilities.idempotency import IdempotencyConfig, idempotent
from aws_lambda_powertools.utilities.typing import LambdaContext

# Assume persistence_layer is configured elsewhere
# persistence_layer = ...

config = IdempotencyConfig(event_key_jmespath='powertools_json(body).["user_id", "product_id"]')


@dataclass
class Payment:
    user_id: str
    product_id: str
    payment_id: str = field(default_factory=lambda: f"{uuid4()}")


class PaymentError(Exception): ...


@idempotent(config=config, persistence_store=persistence_layer)
def lambda_handler(event: dict, context: LambdaContext):
    try:
        payment_info: str = event.get("body", "")
        payment: Payment = create_subscription_payment(json.loads(payment_info))
        return {
            "payment_id": payment.payment_id,
            "message": "success",
            "statusCode": 200,
        }
    except Exception as exc:
        raise PaymentError(f"Error creating payment {str(exc)}") from exc


def create_subscription_payment(event: dict) -> Payment:
    return Payment(**event)
```

--------------------------------

### AWS SAM Template for Idempotency

Source: https://docs.powertools.aws.dev/lambda/python/latest/utilities/idempotency

Defines a DynamoDB table with TTL and a Lambda function that uses it. The Lambda function is granted permissions to interact with the DynamoDB table.

```yaml
Transform: AWS::Serverless-2016-10-31
Resources:
  IdempotencyTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        - AttributeName: id
          AttributeType: S
      KeySchema:
        - AttributeName: id
          KeyType: HASH
      TimeToLiveSpecification:
        AttributeName: expiration
        Enabled: true
      BillingMode: PAY_PER_REQUEST

  HelloWorldFunction:
    Type: AWS::Serverless::Function
    Properties:
      Runtime: python3.12
      Handler: app.lambda_handler
      Policies:
        - Statement:
            - Sid: AllowDynamodbReadWrite
              Effect: Allow
              Action:
                - dynamodb:PutItem
                - dynamodb:GetItem
                - dynamodb:UpdateItem
                - dynamodb:DeleteItem
              Resource: !GetAtt IdempotencyTable.Arn
      Environment:
        Variables:
          IDEMPOTENCY_TABLE: !Ref IdempotencyTable
```

--------------------------------

### Enable In-Memory Cache for Idempotency

Source: https://docs.powertools.aws.dev/lambda/python/latest/utilities/idempotency

Configure idempotency to use local in-memory caching. This cache is local to each Lambda execution environment and can be adjusted for capacity.
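As a mental model for what a bounded, least-recently-used cache does, here is a minimal sketch built on the standard library's `OrderedDict`. It is not the cache Powertools ships; the `LRUCache` class and its `max_items` parameter are hypothetical names used only to illustrate eviction order.

```python
from collections import OrderedDict
from typing import Any, Optional


class LRUCache:
    """Toy least-recently-used cache with a fixed capacity (illustrative only)."""

    def __init__(self, max_items: int = 256) -> None:
        self.max_items = max_items
        self._items: "OrderedDict[str, Any]" = OrderedDict()

    def get(self, key: str) -> Optional[Any]:
        if key not in self._items:
            return None
        # A hit marks the entry as most recently used
        self._items.move_to_end(key)
        return self._items[key]

    def put(self, key: str, value: Any) -> None:
        self._items[key] = value
        self._items.move_to_end(key)
        if len(self._items) > self.max_items:
            # Evict the entry that has gone unused the longest
            self._items.popitem(last=False)
```

Because the cache lives in process memory, a repeated request only benefits from it when it lands on the same warm execution environment; otherwise the persistence store is still consulted. The Powertools configuration that enables the built-in cache follows.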
```python
import os

from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

table = os.getenv("IDEMPOTENCY_TABLE", "")
persistence_layer = DynamoDBPersistenceLayer(table_name=table)
config = IdempotencyConfig(
    event_key_jmespath="powertools_json(body)",
    # by default, it holds 256 items in a Least-Recently-Used (LRU) manner
    use_local_cache=True,
)


@idempotent(config=config, persistence_store=persistence_layer)
def lambda_handler(event, context: LambdaContext):
    return event
```

--------------------------------

### Configuring Idempotency with Custom Expiration

Source: https://docs.powertools.aws.dev/lambda/python/latest/utilities/idempotency

This snippet shows how to configure the Idempotency utility with a custom expiration window of 24 hours using `expires_after_seconds`. It also sets the `event_key_jmespath` to 'body'.

```python
import os

from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

table = os.getenv("IDEMPOTENCY_TABLE", "")
persistence_layer = DynamoDBPersistenceLayer(table_name=table)
config = IdempotencyConfig(
    event_key_jmespath="body",
    expires_after_seconds=24 * 60 * 60,  # 24 hours
)


@idempotent(config=config, persistence_store=persistence_layer)
def lambda_handler(event, context: LambdaContext):
    return event
```

--------------------------------

### Customize Idempotency Key Prefix in Standalone Functions

Source: https://docs.powertools.aws.dev/lambda/python/latest/utilities/idempotency

Use the `key_prefix` parameter in the `@idempotent_function` decorator to define a custom prefix for your Idempotency Key. This is useful when applying idempotency to standalone functions.

```python
import os
from dataclasses import dataclass

from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent_function,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

table = os.getenv("IDEMPOTENCY_TABLE", "")
dynamodb = DynamoDBPersistenceLayer(table_name=table)
config = IdempotencyConfig(event_key_jmespath="order_id")  # see Choosing a payload subset section


@dataclass
class OrderItem:
    sku: str
    description: str


@dataclass
class Order:
    item: OrderItem
    order_id: int


@idempotent_function(
    data_keyword_argument="order",
    config=config,
    persistence_store=dynamodb,
    key_prefix="my_custom_prefix",
)
def process_order(order: Order):
    return f"processed order {order.order_id}"


def lambda_handler(event: dict, context: LambdaContext):
    # see Lambda timeouts section
    config.register_lambda_context(context)

    order_item = OrderItem(sku="fake", description="sample")
    order = Order(item=order_item, order_id=1)

    # `order` parameter must be called as a keyword argument to work
    process_order(order=order)
```

--------------------------------

### Idempotent Function with Pydantic Serialization

Source: https://docs.powertools.aws.dev/lambda/python/latest/utilities/idempotency

Demonstrates how to use the `@idempotent_function` decorator with Pydantic models for input and output serialization. This is useful when your Lambda function processes structured data defined by Pydantic models.

```python
import os

from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent_function,
)
from aws_lambda_powertools.utilities.idempotency.serialization.pydantic import PydanticSerializer
from aws_lambda_powertools.utilities.parser import BaseModel
from aws_lambda_powertools.utilities.typing import LambdaContext

table = os.getenv("IDEMPOTENCY_TABLE", "")
dynamodb = DynamoDBPersistenceLayer(table_name=table)
config = IdempotencyConfig(event_key_jmespath="order_id")  # see Choosing a payload subset section


class OrderItem(BaseModel):
    sku: str
    description: str


class Order(BaseModel):
    item: OrderItem
    order_id: int


class OrderOutput(BaseModel):
    order_id: int


@idempotent_function(
    data_keyword_argument="order",
    config=config,
    persistence_store=dynamodb,
    output_serializer=PydanticSerializer(model=OrderOutput),
)
def process_order(order: Order):
    return OrderOutput(order_id=order.order_id)


def lambda_handler(event: dict, context: LambdaContext):
    config.register_lambda_context(context)  # see Lambda timeouts section

    order_item = OrderItem(sku="fake", description="sample")
    order = Order(item=order_item, order_id=1)

    # `order` parameter must be called as a keyword argument to work
    process_order(order=order)
```

--------------------------------

### Use Custom Botocore Config for Idempotency

Source: https://docs.powertools.aws.dev/lambda/python/latest/utilities/idempotency

Pass a custom `botocore.config.Config` object to `DynamoDBPersistenceLayer` to configure the underlying botocore client. This is useful for setting region, retries, or other low-level client configurations.

```python
import os

from botocore.config import Config

from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent,
)
from aws_lambda_powertools.utilities.typing import LambdaContext

# See: https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html#botocore-config
boto_config = Config()

table = os.getenv("IDEMPOTENCY_TABLE", "")
persistence_layer = DynamoDBPersistenceLayer(table_name=table, boto_config=boto_config)

config = IdempotencyConfig(event_key_jmespath="body")


@idempotent(persistence_store=persistence_layer, config=config)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    return event
```

--------------------------------

### AWS CDK Construct for Idempotency Table

Source: https://docs.powertools.aws.dev/lambda/python/latest/utilities/idempotency

A Python construct for defining an Idempotency DynamoDB table and granting necessary permissions to a Lambda role. It enables Time To Live (TTL) and point-in-time recovery.

```python
from aws_cdk import RemovalPolicy
from aws_cdk import aws_dynamodb as dynamodb
from aws_cdk import aws_iam as iam
from constructs import Construct


class IdempotencyConstruct(Construct):
    def __init__(self, scope: Construct, name: str, lambda_role: iam.Role) -> None:
        super().__init__(scope, name)
        self.idempotency_table = dynamodb.Table(
            self,
            "IdempotencyTable",
            partition_key=dynamodb.Attribute(name="id", type=dynamodb.AttributeType.STRING),
            billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
            removal_policy=RemovalPolicy.DESTROY,
            time_to_live_attribute="expiration",
            point_in_time_recovery=True,
        )
        self.idempotency_table.grant(
            lambda_role,
            "dynamodb:PutItem",
            "dynamodb:GetItem",
            "dynamodb:UpdateItem",
            "dynamodb:DeleteItem",
        )
```

--------------------------------

### Idempotency with DynamoDB Persistence and Payload Validation

Source: https://docs.powertools.aws.dev/lambda/python/latest/utilities/idempotency

Demonstrates how to configure idempotency with a DynamoDB persistence layer and payload validation using JMESPath. Use this when you need to ensure that a specific part of the event payload is validated against previous invocations to prevent misleading results due to changes in non-critical fields.

```python
import os
from dataclasses import dataclass, field
from uuid import uuid4

from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.idempotency import (
    DynamoDBPersistenceLayer,
    IdempotencyConfig,
    idempotent,
)
from aws_lambda_powertools.utilities.idempotency.exceptions import IdempotencyValidationError
from aws_lambda_powertools.utilities.typing import LambdaContext

logger = Logger()

table = os.getenv("IDEMPOTENCY_TABLE", "")
persistence_layer = DynamoDBPersistenceLayer(table_name=table)
config = IdempotencyConfig(
    event_key_jmespath='["user_id", "product_id"]',
    payload_validation_jmespath="amount",
)


@dataclass
class Payment:
    user_id: str
    product_id: str
    charge_type: str
    amount: int
    payment_id: str = field(default_factory=lambda: f"{uuid4()}")


class PaymentError(Exception): ...


@idempotent(config=config, persistence_store=persistence_layer)
def lambda_handler(event: dict, context: LambdaContext):
    try:
        payment: Payment = create_subscription_payment(event)
        return {
            "payment_id": payment.payment_id,
            "message": "success",
            "statusCode": 200,
        }
    except IdempotencyValidationError:
        logger.exception("Payload tampering detected", payment=payment, failure_type="validation")
        return {
            "message": "Unable to process payment at this time. Try again later.",
            "statusCode": 500,
        }
    except Exception as exc:
        raise PaymentError(f"Error creating payment {str(exc)}")


def create_subscription_payment(event: dict) -> Payment:
    return Payment(**event)
```
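
To make the distinction between the idempotency key and the validated payload concrete, the following is a minimal standard-library sketch of the idea, not the Powertools implementation. It assumes a plain dict as the store, reads fields by key instead of evaluating JMESPath, and uses hypothetical names (`PayloadMismatchError`, `_digest`, `check_idempotency`) that do not exist in the library.

```python
import hashlib
import json


class PayloadMismatchError(Exception):
    """Hypothetical stand-in for IdempotencyValidationError."""


def _digest(data) -> str:
    # Stable hash of any JSON-serializable value; sort_keys keeps it deterministic
    return hashlib.sha256(json.dumps(data, sort_keys=True).encode()).hexdigest()


def check_idempotency(event: dict, store: dict) -> str:
    # The key is built only from the fields that identify the request
    key = _digest([event["user_id"], event["product_id"]])
    # The validation hash covers the field that must not change between retries
    validation = _digest(event["amount"])

    saved = store.get(key)
    if saved is None:
        store[key] = validation  # first time this key is seen
        return "first_request"
    if saved != validation:
        raise PayloadMismatchError("amount changed for an already-seen request")
    return "replayed"
```

In this sketch, changing `charge_type` on a retry still counts as a replay because that field is part of neither hash, while changing `amount` for the same `user_id` and `product_id` raises the mismatch error.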