### Full Basic Example

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/docs/usage/basic.md

A complete example demonstrating the setup of FastStream with TimersBroker, a subscriber, and timer scheduling.

```python
from datetime import timedelta

from faststream import FastStream
from faststream_redis_timers import TimersBroker
from redis.asyncio import Redis

client = Redis.from_url("redis://localhost:6379")
broker = TimersBroker(client)
app = FastStream(broker)


@broker.subscriber("reminders")
async def handle_reminder(message: str) -> None:
    print(f"Reminder fired: {message}")


@app.after_startup
async def schedule_reminder() -> None:
    await broker.publish(
        "Call dentist",
        topic="reminders",
        activate_in=timedelta(hours=24),
    )
```

--------------------------------

### Start Redis with Docker

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/docs/introduction/installation.md

If you do not have a Redis instance running, you can start one using this Docker command. Ensure Redis version 5.0 or higher is used.

```bash
docker run -d -p 6379:6379 redis:latest
```

--------------------------------

### Install faststream-redis-timers with uv

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/docs/introduction/installation.md

Use this command to add the library if you are using uv as your package manager.

```bash
uv add faststream-redis-timers
```

--------------------------------

### Install faststream-redis-timers with poetry

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/docs/introduction/installation.md

Use this command to add the library if you are using poetry as your package manager.

```bash
poetry add faststream-redis-timers
```

--------------------------------

### Install faststream-redis-timers with pip

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/docs/introduction/installation.md

Use this command to add the library if you are using pip as your package manager.

```bash
pip install faststream-redis-timers
```

--------------------------------

### Publishing from a Handler

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/docs/usage/publisher.md

Schedule a timer from within a FastStream subscriber handler. The example shows scheduling a reminder 7 days after an order is received.

```python
from datetime import timedelta

from faststream_redis_timers import TimersBroker
from redis.asyncio import Redis

client = Redis.from_url("redis://localhost:6379")
broker = TimersBroker(client)
reminder_publisher = broker.publisher("reminders")


@broker.subscriber("orders")
async def handle_order(order_id: str) -> None:
    # Schedule a follow-up reminder 7 days after the order
    await reminder_publisher.publish(
        f"Follow up on order {order_id}",
        activate_in=timedelta(days=7),
    )
```

--------------------------------

### Register a Basic Subscriber

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/docs/usage/subscriber.md

Use the `@broker.subscriber` decorator to register a handler for a specific topic. This example shows a simple handler for the 'reminders' topic.

```python
from faststream import FastStream
from faststream_redis_timers import TimersBroker
from redis.asyncio import Redis

client = Redis.from_url("redis://localhost:6379")
broker = TimersBroker(client)
app = FastStream(broker)


@broker.subscriber("reminders")
async def handle_reminder(body: str) -> None:
    print(f"Reminder fired: {body}")
```

--------------------------------

### Initialize Broker and App

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/docs/usage/basic.md

Set up the FastStream application with a TimersBroker instance connected to a Redis client. The Redis client lifecycle is managed externally.

```python
from faststream import FastStream
from faststream_redis_timers import TimersBroker
from redis.asyncio import Redis

client = Redis.from_url("redis://localhost:6379")
broker = TimersBroker(client)
app = FastStream(broker)
```

--------------------------------

### Basic Timer Scheduling and Handling

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/README.md

Demonstrates setting up a FastStream app with TimersBroker, defining a subscriber, and scheduling a message to be delivered in 30 days.

```python
from datetime import timedelta

from faststream import FastStream
from faststream_redis_timers import TimersBroker
from redis.asyncio import Redis

client = Redis.from_url("redis://localhost:6379")
broker = TimersBroker(client)
app = FastStream(broker)


@broker.subscriber("invoices")
async def handle_invoice(invoice_id: str) -> None:
    print(f"Invoice {invoice_id} is due!")


@app.after_startup
async def schedule() -> None:
    await broker.publish(
        "INV-001",
        topic="invoices",
        activate_in=timedelta(days=30),
    )
```

--------------------------------

### Create a TimersRouter

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/docs/usage/router.md

Instantiate TimersRouter with an optional prefix to prepend to all topics. Define subscribers using the router's decorator.

```python
from faststream_redis_timers import TimersRouter

router = TimersRouter(prefix="my-service:")


@router.subscriber("invoices")
async def handle_invoice(invoice_id: str) -> None:
    print(f"Invoice due: {invoice_id}")
```

--------------------------------

### Publisher Options: Tracing and Headers

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/docs/usage/publisher.md

Demonstrates how to use correlation_id and headers when publishing messages. This is crucial for tracing and passing metadata to handlers.

```python
from faststream import Context
from faststream.message import StreamMessage

pub = broker.publisher("orders")


@broker.subscriber("orders")
async def handle(
    body: dict,
    correlation_id: str = Context("message.correlation_id"),
    tenant: str = Context("message.headers.x-tenant"),
) -> None:
    ...  # handler logic


await pub.publish(
    {"order_id": 42},
    correlation_id="trace-abc-123",
    headers={"x-tenant": "acme"},
)
```

--------------------------------

### Schedule Timer at Absolute Time

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/README.md

Shows how to schedule a timer for a specific absolute date and time using `activate_at`. The `publish()` method returns the `timer_id` for potential cancellation.

```python
from datetime import UTC, datetime

timer_id = await broker.publish(
    "INV-001",
    topic="invoices",
    activate_at=datetime(2026, 6, 1, 9, tzinfo=UTC),
)
```

--------------------------------

### Creating a Publisher Object

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/docs/usage/publisher.md

Instantiate a TimersBroker and obtain a reusable publisher object for a specific topic. This is useful for dependency injection or use within handlers.

```python
from faststream_redis_timers import TimersBroker
from redis.asyncio import Redis

client = Redis.from_url("redis://localhost:6379")
broker = TimersBroker(client)
reminder_publisher = broker.publisher("reminders")
```

--------------------------------

### Inspecting Pending Timers

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/docs/usage/publisher.md

Demonstrates how to fetch and assert the presence of pending timers using `fetch_redis_timers`. This is useful for testing timer scheduling without waiting for them to fire.

```python
from datetime import UTC, datetime, timedelta

pub = broker.publisher("invoices")
await pub.publish("INV-001", timer_id="invoice-INV-001-due", activate_in=timedelta(days=30))

# Assert the timer is scheduled
pending = await pub.fetch_redis_timers(datetime.now(tz=UTC) + timedelta(days=31))
assert ("invoices", "invoice-INV-001-due") in pending

# Timers not yet due are excluded
pending_now = await pub.fetch_redis_timers(datetime.now(tz=UTC))
assert pending_now == []
```

--------------------------------

### Inspecting Scheduled Timers

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/docs/usage/testing.md

Demonstrates how to inspect scheduled timers using TestTimersBroker.scheduled_timers to verify that a specific timer was scheduled with the correct parameters without actual execution.

```python
from datetime import UTC, datetime, timedelta

from faststream_redis_timers import TestTimersBroker, TimersBroker


async def test_order_schedules_reminder() -> None:
    broker = TimersBroker()

    @broker.subscriber("reminders")
    async def reminder_handler(body: str) -> None:
        ...

    test_broker = TestTimersBroker(broker)
    async with test_broker:
        await broker.publish(
            "follow up on order 42",
            topic="reminders",
            timer_id="order-42-followup",
            activate_at=datetime.now(tz=UTC) + timedelta(days=7),
        )

        assert len(test_broker.scheduled_timers) == 1
        record = test_broker.scheduled_timers[0]
        assert record.topic == "reminders"
        assert record.timer_id == "order-42-followup"
        assert record.body == "follow up on order 42"
```

--------------------------------

### Managing connection ownership with Redis client

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/README.md

Demonstrates how to manage the lifecycle of a Redis client when using TimersBroker. The caller owns the client lifecycle and should manage it using async with or try/finally.

```python
async with Redis.from_url("redis://localhost:6379") as client:
    broker = TimersBroker(client)
    app = FastStream(broker)
    await app.run()
```

--------------------------------

### Schedule Timers with Relative and Absolute Times

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/docs/usage/basic.md

Publish messages to a topic with scheduled delivery. Use 'activate_in' for relative delays (e.g., timedelta) or 'activate_at' for specific future datetimes. Ensure 'activate_at' datetimes are timezone-aware.

```python
from datetime import UTC, datetime, timedelta


@app.after_startup
async def schedule_reminder() -> None:
    # Relative: fire 24 hours from now
    await broker.publish(
        "Call dentist",
        topic="reminders",
        activate_in=timedelta(hours=24),
    )

    # Absolute: fire at a specific moment
    await broker.publish(
        "Quarterly review",
        topic="reminders",
        activate_at=datetime(2026, 6, 1, 9, tzinfo=UTC),
    )
```

--------------------------------

### Define Routes with TimersRoute

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/docs/usage/router.md

Declare handlers and their topics upfront using TimersRoute, which can be useful for code generation or plugin architectures.

```python
from faststream_redis_timers import TimersRoute, TimersRouter


async def handle_invoice(invoice_id: str) -> None:
    print(f"Invoice due: {invoice_id}")


router = TimersRouter(
    handlers=[
        TimersRoute(handle_invoice, topic="invoices"),
    ],
)
```

--------------------------------

### Configure Subscriber Polling Options

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/docs/usage/router.md

Customize subscriber behavior per router by setting options like polling interval, maximum polling interval, concurrency, and lease TTL.

```python
@router.subscriber(
    "high-priority",
    polling_interval=0.01,     # poll every 10ms when the queue has work
    max_polling_interval=0.5,  # cap idle-backoff at 500ms (default 5s)
    max_concurrent=20,         # up to 20 handlers may run in parallel
    lease_ttl=60,              # hold lease for up to 60 seconds
)
async def handle_urgent(message: str) -> None:
    ...
```

--------------------------------

### List All Pending Timers on a Topic

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/docs/usage/basic.md

Retrieve a list of all timer IDs currently scheduled on a given topic.

```python
pending = await broker.get_pending_timers("invoices")
```

--------------------------------

### Publish to a Prefixed Router

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/docs/usage/router.md

When publishing to a topic managed by a TimersRouter with a prefix, use the full prefixed topic name with `broker.publish` or use a router-scoped publisher.

```python
await broker.publish("INV-001", topic="my-service:invoices", activate_in=...)
```

```python
publisher = router.publisher("invoices")
await publisher.publish("INV-001", activate_in=...)
```

--------------------------------

### Configuring TimersBroker for high availability

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/README.md

Instantiate TimersBroker with custom timeline and payload keys for high availability. Ensure all topic-related keys reside on the same Redis node.

```python
broker = TimersBroker(
    Redis.from_url("redis://..."),
    timeline_key="my_timeline",
    payloads_key="my_payloads",
)
```

--------------------------------

### Basic Test of Reminder Handler

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/docs/usage/testing.md

Tests a basic reminder handler by publishing a message and asserting its reception. Uses TestTimersBroker for synchronous message delivery.

```python
import pytest

from faststream_redis_timers import TestTimersBroker, TimersBroker


@pytest.mark.asyncio
async def test_reminder_handler() -> None:
    broker = TimersBroker()
    received: list[str] = []

    @broker.subscriber("reminders")
    async def handler(body: str) -> None:
        received.append(body)

    async with TestTimersBroker(broker) as tb:
        await tb.publish("hello", topic="reminders")
        assert received == ["hello"]
```

--------------------------------

### Pytest Configuration for Asyncio

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/docs/usage/testing.md

Configuration for pytest to handle asyncio tests automatically and set the default fixture loop scope to function.

```toml
[tool.pytest.ini_options]
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"
```

--------------------------------

### Include TimersRouter in Broker

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/docs/usage/router.md

Integrate a TimersRouter into a FastStream application by including it in the TimersBroker.

```python
from faststream import FastStream
from faststream_redis_timers import TimersBroker
from redis.asyncio import Redis

from myapp.routers import router

client = Redis.from_url("redis://localhost:6379")
broker = TimersBroker(client)
broker.include_router(router)
app = FastStream(broker)
```

--------------------------------

### Configure Subscriber Polling Behavior

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/docs/usage/subscriber.md

Customize polling behavior for individual subscribers using parameters like `polling_interval`, `max_polling_interval`, `max_concurrent`, and `lease_ttl`. These settings control how frequently the broker polls for new timers and how many handlers can run concurrently.

```python
@broker.subscriber(
    "high-priority",
    polling_interval=0.01,     # poll every 10ms when busy
    max_polling_interval=0.5,  # never sleep longer than 500ms when idle
    max_concurrent=20,         # up to 20 handlers may run in parallel
    lease_ttl=60,              # hold lease for up to 60 seconds
)
async def handle_urgent(body: str) -> None:
    ...
```

--------------------------------

### Publishing messages with tracing and headers

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/README.md

Publish messages with correlation IDs and custom headers. These are passed to the handler via the standard FastStream message.

```python
await broker.publish(
    {"order_id": 42},
    topic="orders",
    correlation_id="trace-abc-123",
    headers={"x-tenant": "acme", "x-priority": "high"},
)
```

--------------------------------

### Redis Keys for Timer Storage

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/docs/introduction/how-it-works.md

Two Redis keys are used per topic: a sorted set for activation timestamps and a hash for message payloads. This structure allows for efficient retrieval and storage of scheduled timers.

```redis
ZADD timers_timeline:{topic}
HSET timers_payloads:{topic}
```

--------------------------------

### List Pending Timers Due Before a Specific Time

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/docs/usage/basic.md

Filter pending timers to include only those scheduled to run before a specified timezone-aware datetime.

```python
from datetime import UTC, datetime, timedelta

soon = await broker.get_pending_timers(
    "invoices",
    before=datetime.now(tz=UTC) + timedelta(hours=1),
)
```

--------------------------------

### Testing Publishers

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/docs/usage/testing.md

Tests the functionality of publishing messages using a broker publisher within a test context. Asserts that the message is correctly received by a subscriber.

```python
import pytest

from faststream_redis_timers import TestTimersBroker, TimersBroker


@pytest.mark.asyncio
async def test_publisher() -> None:
    broker = TimersBroker()
    received: list[str] = []

    @broker.subscriber("reminders")
    async def handler(body: str) -> None:
        received.append(body)

    pub = broker.publisher("reminders")

    async with TestTimersBroker(broker):
        await pub.publish("world")
        assert received == ["world"]
```

--------------------------------

### Publish Timer with Custom Timer ID

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/docs/usage/basic.md

Assign a unique, custom timer ID during publishing for idempotency. If a timer with the same ID already exists, it will be overwritten. Namespace IDs to prevent collisions.

```python
await broker.publish(
    "INV-001",
    topic="invoices",
    timer_id="invoice-INV-001-due",
    activate_in=timedelta(days=30),
)
```

--------------------------------

### Cancel a Scheduled Timer

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/README.md

Illustrates how to cancel a previously scheduled timer using its topic and the returned `timer_id`.

```python
timer_id = await broker.publish("INV-001", topic="invoices", activate_in=timedelta(days=30))
await broker.cancel_timer("invoices", timer_id)
```

--------------------------------

### Cancelling a Pending Timer

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/docs/usage/publisher.md

Shows how to cancel a previously scheduled timer using its timer_id. The cancel operation is a no-op if the timer has already fired or does not exist.

```python
pub = broker.publisher("invoices")
timer_id = await pub.publish("INV-001", activate_in=timedelta(days=30))

# Later: cancel the timer via the publisher
await pub.cancel(timer_id)
```

--------------------------------

### Accessing tracing and headers in handler

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/README.md

Retrieve correlation ID and headers from the incoming message within the handler using FastStream's Context.

```python
from faststream import Context


@broker.subscriber("orders")
async def handle(
    body: dict,
    correlation_id: str = Context("message.correlation_id"),
    tenant: str = Context("message.headers.x-tenant"),
) -> None:
    ...
```

--------------------------------

### Define a Timer Subscriber

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/docs/usage/basic.md

Create a message handler for a specific topic using the @broker.subscriber decorator. This function will be called when a message is published to the 'reminders' topic.

```python
@broker.subscriber("reminders")
async def handle_reminder(message: str) -> None:
    print(f"Reminder fired: {message}")
```

--------------------------------

### Check if a Timer is Pending

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/docs/usage/basic.md

Use `has_pending` to quickly check if a specific timer is still scheduled on a topic.

```python
if await broker.has_pending("invoices", "invoice-INV-001-due"):
    ...
```

--------------------------------

### Manual Acknowledgement with Message Object

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/docs/usage/subscriber.md

Manually control message acknowledgement (ack, nack, reject) by injecting the `StreamMessage` object. This is useful for implementing custom retry logic or permanently discarding messages based on specific error conditions.

```python
from faststream.message import StreamMessage
from faststream_redis_timers.message import TimerMessage


@broker.subscriber("invoices")
async def handle_invoice(
    body: str,
    msg: StreamMessage[TimerMessage],
) -> None:
    try:
        process(body)
        await msg.ack()
    except TransientError:
        await msg.nack()  # retry later
    except PermanentError:
        await msg.reject()  # discard permanently
```

--------------------------------

### Cancel All Timers on a Topic

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/docs/usage/basic.md

Use `cancel_all` to remove all scheduled timers for a topic. This is useful for queue resets during maintenance. It returns the count of timers removed.

```python
removed = await broker.cancel_all("invoices")
```

--------------------------------

### Handle Different Body Types

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/docs/usage/subscriber.md

FastStream automatically deserializes message bodies into annotated types. Any JSON-serializable type, like dataclasses, can be used.

```python
from dataclasses import dataclass


@dataclass
class Order:
    order_id: str
    amount: float


@broker.subscriber("orders")
async def handle_order(body: Order) -> None:
    print(f"Order {body.order_id} for {body.amount} is due")
```

--------------------------------

### Access Timer ID via Context

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/docs/usage/subscriber.md

The timer ID, which is the message's `message_id`, can be injected into handler arguments using `Context`. This allows you to reference the specific timer that triggered the handler.

```python
from faststream import Context
from faststream_redis_timers import TimersBroker
from redis.asyncio import Redis

client = Redis.from_url("redis://localhost:6379")
broker = TimersBroker(client)


@broker.subscriber("invoices")
async def handle_invoice(
    body: str,
    timer_id: str = Context("message.message_id"),
) -> None:
    print(f"Timer {timer_id} fired: {body}")
```

--------------------------------

### Cancel a Pending Timer

Source: https://github.com/modern-python/faststream-redis-timers/blob/main/docs/usage/basic.md

Remove a scheduled timer before it fires using its topic and timer ID. This operation is a no-op if the timer has already fired or does not exist.

```python
timer_id = await broker.publish(
    "INV-001",
    topic="invoices",
    activate_in=timedelta(days=30),
)

# Later: invoice was paid early, cancel the reminder
await broker.cancel_timer("invoices", timer_id)
```
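
--------------------------------

### Sketch: Timer Semantics in Memory

The snippets above describe three behaviours: publishing with an existing `timer_id` overwrites the earlier timer, cancelling a missing or already-fired timer is a no-op, and pending timers can be filtered by a cutoff datetime. The sketch below models those behaviours with plain dictionaries, loosely mirroring the per-topic layout of a timeline keyed by activation time plus a payload map. It is an illustration only and not the library's implementation: the class `InMemoryTimers`, its method names, and the boolean returned by `cancel` are all assumptions made for this example.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


class InMemoryTimers:
    """Hypothetical stand-in for one topic's timer storage (not library code)."""

    def __init__(self) -> None:
        # Plays the role of the sorted set: timer_id -> activation timestamp.
        self.timeline: dict[str, float] = {}
        # Plays the role of the hash: timer_id -> payload.
        self.payloads: dict[str, str] = {}

    def schedule(self, timer_id: str, payload: str, activate_at: datetime) -> None:
        # Re-using a timer_id replaces both the activation time and the payload.
        self.timeline[timer_id] = activate_at.timestamp()
        self.payloads[timer_id] = payload

    def cancel(self, timer_id: str) -> bool:
        # Removing an unknown timer changes nothing; the bool is this sketch's choice.
        existed = self.timeline.pop(timer_id, None) is not None
        self.payloads.pop(timer_id, None)
        return existed

    def due(self, before: datetime) -> list[str]:
        # Timer IDs activating at or before `before`, earliest first.
        cutoff = before.timestamp()
        ready = [(ts, tid) for tid, ts in self.timeline.items() if ts <= cutoff]
        return [tid for _, tid in sorted(ready)]


now = datetime(2026, 1, 1, tzinfo=timezone.utc)
timers = InMemoryTimers()

timers.schedule("invoice-INV-001-due", "INV-001", now + timedelta(days=30))
# Same timer_id again: the earlier entry is overwritten, not duplicated.
timers.schedule("invoice-INV-001-due", "INV-001 (rescheduled)", now + timedelta(days=10))
timers.schedule("invoice-INV-002-due", "INV-002", now + timedelta(days=5))

due_now = timers.due(now)
due_later = timers.due(now + timedelta(days=11))
first_cancel = timers.cancel("invoice-INV-002-due")
second_cancel = timers.cancel("invoice-INV-002-due")  # already gone: no-op
```

Keeping the timeline and the payloads in separate structures means a cutoff query only has to look at timestamps, which is the property the sorted-set-plus-hash layout is meant to provide. The fixed `now` value keeps the example deterministic.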