### Install TaskIQ Core Library Source: https://github.com/taskiq-python/taskiq-redis/blob/main/README.md Before installing taskiq-redis, ensure the core taskiq library is installed. This command installs the latest version. ```bash pip install taskiq ``` -------------------------------- ### Install TaskIQ-Redis Plugin Source: https://github.com/taskiq-python/taskiq-redis/blob/main/README.md Install the taskiq-redis plugin using pip. This command installs the Redis-specific components for TaskIQ. ```bash pip install taskiq-redis ``` -------------------------------- ### startup Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/api-reference/brokers.md Initializes the consumer group and prepares streams for reading when the broker starts. ```APIDOC ## startup ### Description Initialize consumer group and prepare streams for reading. ### Method `startup()` ### Parameters None ### Returns None ### Example ```python broker = RedisStreamBroker(url="redis://localhost:6379") await broker.startup() ``` ``` -------------------------------- ### TaskIQ Redis Production Setup Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/USAGE_EXAMPLES.md Configure Taskiq for production using RedisStreamBroker with reliability and throughput settings, RedisAsyncResultBackend with result expiration, and ListRedisScheduleSource. This example demonstrates wiring these components together and starting the broker and schedule source. ```python import asyncio from taskiq_redis import ( RedisStreamBroker, RedisAsyncResultBackend, ListRedisScheduleSource, ) from taskiq import TaskiqScheduler async def setup_production(): # 1. Use RedisStreamBroker for reliability broker = RedisStreamBroker( url="redis://prod-redis:6379/0", queue_name="production", consumer_group_name="prod-workers", # 2. Configure for throughput xread_block=5000, xread_count=500, # 3. Auto-reclaim crashed task handling idle_timeout=1800000, # 30 minutes # 4. Limit stream size maxlen=100000, approximate=True, # 5. Connection pooling max_connection_pool_size=51, # 50 workers + 1 ) # 6. Set result expiration result_backend = RedisAsyncResultBackend( redis_url="redis://prod-redis:6379/1", result_ex_time=86400, # 24 hours max_connection_pool_size=51, ) # 7. Optimize schedule source schedule_source = ListRedisScheduleSource( url="redis://prod-redis:6379/2", prefix="prod_schedules", buffer_size=100, ) # 8. Wire everything together broker = broker.with_result_backend(result_backend) # 9. Create scheduler scheduler = TaskiqScheduler(broker, [schedule_source]) # 10. Startup await broker.startup() await schedule_source.startup() return broker, result_backend, schedule_source, scheduler asyncio.run(setup_production()) ``` -------------------------------- ### Redis Cluster Setup Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/USAGE_EXAMPLES.md Shows how to configure Taskiq with Redis cluster using RedisStreamClusterBroker and RedisAsyncClusterResultBackend. This setup is for high availability and scalability. ```python from taskiq_redis import ( RedisStreamClusterBroker, RedisAsyncClusterResultBackend, ) # Cluster nodes cluster_url = "redis-cluster://node1:7000,node2:7000,node3:7000" result_backend = RedisAsyncClusterResultBackend( redis_url=cluster_url, result_ex_time=3600, ) broker = RedisStreamClusterBroker( url=cluster_url, ).with_result_backend(result_backend) @broker.task async def process_data(data: dict) -> dict: """Process data on cluster.""" return {"processed": data} ``` -------------------------------- ### Development Setup for Taskiq Redis Broker Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/PACKAGE_OVERVIEW.md Configure the Taskiq Redis broker and result backend for local development. This setup uses default Redis URLs and connects them. ```python from taskiq_redis import ( RedisStreamBroker, RedisAsyncResultBackend, ) broker = RedisStreamBroker(url="redis://localhost:6379/0") result_backend = RedisAsyncResultBackend( redis_url="redis://localhost:6379/1", ) broker = broker.with_result_backend(result_backend) ``` -------------------------------- ### RedisSentinelScheduleSource Constructor Example Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/api-reference/schedule-sources.md Instantiate RedisSentinelScheduleSource with Sentinel connection details and master name. Customize prefix and buffer size as needed. ```python from taskiq_redis import RedisSentinelScheduleSource source = RedisSentinelScheduleSource( sentinels=[ ("sentinel-1", 26379), ("sentinel-2", 26379), ], master_name="mymaster", prefix="schedules", ) ``` -------------------------------- ### Single-Node Redis Setup and Task Execution Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/USAGE_EXAMPLES.md Demonstrates initializing a RedisStreamBroker and RedisAsyncResultBackend for a single Redis instance. Shows how to define tasks and send them asynchronously, then wait for their results. ```python import asyncio from taskiq_redis import RedisStreamBroker, RedisAsyncResultBackend # Initialize broker and result backend result_backend = RedisAsyncResultBackend( redis_url="redis://localhost:6379", result_ex_time=3600, # Expire results after 1 hour ) broker = RedisStreamBroker( url="redis://localhost:6379/0", ).with_result_backend(result_backend) # Define tasks @broker.task async def add_numbers(a: int, b: int) -> int: """Add two numbers.""" return a + b @broker.task async def slow_task() -> str: """Simulate a slow task.""" await asyncio.sleep(5) return "Done!" # Usage async def main(): # Send tasks result = await add_numbers.kiq(2, 3) print(f"Task ID: {result.task_id}") # Wait for result final_result = await result.wait_result() print(f"Result: {final_result.return_value}") # 5 asyncio.run(main()) ``` -------------------------------- ### Initialize RedisStreamBroker Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/api-reference/brokers.md Instantiate the RedisStreamBroker with a Redis connection URL. Ensure the broker is started before use. ```python broker = RedisStreamBroker(url="redis://localhost:6379") await broker.startup() ``` -------------------------------- ### Import Sentinel from redis.asyncio Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/types.md Import the `Sentinel` client from the `redis.asyncio` module for managing Redis Sentinel high-availability setups. Provides a helper method `master_for` to get a master connection context manager. ```python from redis.asyncio import Sentinel ``` -------------------------------- ### Set Up a Task Queue with Redis Stream Broker Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/README.md Configure a Redis Stream broker with a Redis async result backend. This is a basic setup for a task queue. ```python from taskiq_redis import RedisStreamBroker, RedisAsyncResultBackend result_backend = RedisAsyncResultBackend( redis_url="redis://localhost:6379", result_ex_time=3600, ) broker = RedisStreamBroker( url="redis://localhost:6379/0", ).with_result_backend(result_backend) @broker.task async def my_task(arg): return arg * 2 ``` -------------------------------- ### RedisStreamBroker Configuration Example Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/configuration.md Configure a RedisStreamBroker with various stream-specific parameters. This example demonstrates setting custom consumer group names, consumer names, block timeouts, message counts, idle timeouts, and stream length limits. ```python from taskiq_redis import RedisStreamBroker broker = RedisStreamBroker( url="redis://localhost:6379", consumer_group_name="my_workers", consumer_name="worker-1", xread_block=5000, # Block 5 seconds xread_count=50, # Fetch 50 at a time idle_timeout=300000, # Auto-reclaim after 5 minutes maxlen=10000, # Keep max 10000 messages ) ``` -------------------------------- ### Using CBORSerializer with RedisAsyncResultBackend Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/types.md Example of configuring RedisAsyncResultBackend with a custom CBORSerializer for task data. Ensure the CBORSerializer is installed. ```python from taskiq.serializers import CBORSerializer backend = RedisAsyncResultBackend( redis_url="redis://localhost:6379", serializer=CBORSerializer(), ) ``` -------------------------------- ### Production Setup for Taskiq Redis Broker (Single-Node) Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/PACKAGE_OVERVIEW.md Configure the Taskiq Redis broker for a production single-node setup. This includes specifying queue names, consumer group names, and read/idle timeouts for efficient worker management. ```python broker = RedisStreamBroker( url="redis://myredis.example.com:6379/0", queue_name="prod-queue", consumer_group_name="prod-workers", xread_block=5000, xread_count=500, idle_timeout=1800000, # 30 min max_connection_pool_size=51, # 50 workers + 1 ) ``` -------------------------------- ### Running Taskiq Workers Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/USAGE_EXAMPLES.md Command to start Taskiq worker instances that will consume tasks from the configured broker. ```bash # Start worker(s) taskiq worker broker:broker ``` -------------------------------- ### Configure Redis Broker and Async Result Backend Source: https://github.com/taskiq-python/taskiq-redis/blob/main/README.md Example of setting up a RedisStreamBroker with a RedisAsyncResultBackend. This configuration uses Redis for both task queuing and result storage. Ensure your Redis server is accessible at the specified URL. ```python # broker.py import asyncio from taskiq_redis import RedisAsyncResultBackend, RedisStreamBroker result_backend = RedisAsyncResultBackend( redis_url="redis://localhost:6379", ) # Or you can use PubSubBroker if you need broadcasting # Or ListQueueBroker if you don't want acknowledges broker = RedisStreamBroker( url="redis://localhost:6379", ).with_result_backend(result_backend) @broker.task async def best_task_ever() -> None: """Solve all problems in the world.""" await asyncio.sleep(5.5) print("All problems are solved!") async def main(): task = await best_task_ever.kiq() print(await task.wait_result()) if __name__ == "__main__": asyncio.run(main()) ``` -------------------------------- ### Production Setup for Taskiq Redis Result Backend (Single-Node) Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/PACKAGE_OVERVIEW.md Configure the Taskiq Redis result backend for a production single-node setup. This specifies the Redis URL and the expiration time for task results. ```python result_backend = RedisAsyncResultBackend( redis_url="redis://myredis.example.com:6379/1", result_ex_time=86400, # 1 day max_connection_pool_size=51, ) ``` -------------------------------- ### Taskiq AckableMessage Usage Example Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/types.md Demonstrates how to process and acknowledge messages using AckableMessage, ensuring proper handling of exceptions for retries. ```python async for ackable_msg in broker.listen(): try: process(ackable_msg.data) await ackable_msg.ack() # Acknowledge after processing except Exception: # Don't acknowledge; message will be retried pass ``` -------------------------------- ### High-Availability Setup for Taskiq Redis Broker (Sentinel) Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/PACKAGE_OVERVIEW.md Configure the Taskiq Redis broker for a high-availability setup using Redis Sentinel. This involves providing a list of sentinel addresses and the master name. ```python from taskiq_redis import ( RedisStreamSentinelBroker, RedisAsyncSentinelResultBackend, ) broker = RedisStreamSentinelBroker( sentinels=[ ("sentinel-1", 26379), ("sentinel-2", 26379), ("sentinel-3", 26379), ], master_name="mymaster", queue_name="prod-queue", ) ``` -------------------------------- ### Migrate Schedules from RedisScheduleSource to ListRedisScheduleSource Source: https://github.com/taskiq-python/taskiq-redis/blob/main/README.md This example demonstrates how to configure a `ListRedisScheduleSource` to migrate schedules from an existing `RedisScheduleSource`. Ensure different prefixes are used to avoid collisions. Schedules can optionally be deleted from the old source after migration. ```python # broker.py import asyncio import datetime from taskiq import TaskiqScheduler from taskiq_redis import ListRedisScheduleSource, RedisStreamBroker from taskiq_redis.schedule_source import RedisScheduleSource broker = RedisStreamBroker(url="redis://localhost:6379") old_source = RedisScheduleSource("redis://localhost/1", prefix="prefix1") array_source = ListRedisScheduleSource( "redis://localhost/1", prefix="prefix2", # To migrate schedules from an old source. ).with_migrate_from( old_source, # To delete schedules from an old source. delete_schedules=True, ) scheduler = TaskiqScheduler(broker, [array_source]) ``` -------------------------------- ### Configure Redis Cluster Result Backend Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/configuration.md Use RedisAsyncClusterResultBackend for Redis Cluster setups. Specify the cluster URL and result expiration. ```python from taskiq_redis import RedisAsyncClusterResultBackend backend = RedisAsyncClusterResultBackend( redis_url="redis-cluster://node1:7000", result_ex_time=3600, ) ``` -------------------------------- ### High-Availability Setup for Taskiq Redis Result Backend (Sentinel) Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/PACKAGE_OVERVIEW.md Configure the Taskiq Redis result backend for a high-availability setup using Redis Sentinel. This requires specifying the sentinel addresses, master name, and result expiration time. ```python result_backend = RedisAsyncSentinelResultBackend( sentinels=[ ("sentinel-1", 26379), ("sentinel-2", 26379), ("sentinel-3", 26379), ], master_name="mymaster", result_ex_time=86400, ) ``` -------------------------------- ### Task Scheduling with Cron Schedules Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/USAGE_EXAMPLES.md Set up a Taskiq scheduler to run tasks based on cron expressions. This example schedules a daily report to run at 9 AM UTC. ```python import asyncio from taskiq import TaskiqScheduler from taskiq_redis import RedisStreamBroker, ListRedisScheduleSource broker = RedisStreamBroker(url="redis://localhost:6379") schedule_source = ListRedisScheduleSource(url="redis://localhost:6379/1") @broker.task async def daily_report(): """Run daily at 9 AM.""" print("Generating daily report...") # Create scheduler scheduler = TaskiqScheduler(broker, [schedule_source]) async def main(): # Add a cron schedule (9 AM every day) from taskiq.scheduler.scheduled_task import ScheduledTask schedule = ScheduledTask( schedule_id="daily-report", task_name="daily_report", cron="0 9 * * *", # 9 AM UTC every day ) await schedule_source.add_schedule(schedule) # Run scheduler await scheduler.startup() await scheduler.run() asyncio.run(main()) ``` -------------------------------- ### Launch TaskIQ Worker Source: https://github.com/taskiq-python/taskiq-redis/blob/main/README.md Command to launch a TaskIQ worker that uses the broker configured in the specified Python file. This command starts the worker process to consume and execute tasks. ```bash taskiq worker broker:broker ``` -------------------------------- ### Configure Redis Sentinel Result Backend Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/configuration.md Use RedisAsyncSentinelResultBackend for Redis Sentinel high-availability setups. Provide sentinel addresses, master name, and result expiration. ```python from taskiq_redis import RedisAsyncSentinelResultBackend backend = RedisAsyncSentinelResultBackend( sentinels=[ ("sentinel-1", 26379), ("sentinel-2", 26379), ], master_name="mymaster", result_ex_time=3600, ) ``` -------------------------------- ### Configure Taskiq Redis Sentinel Broker Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/configuration.md Use ListQueueSentinelBroker for high-availability Redis setups. Specify sentinel addresses, master name, and optionally a queue name. ```python from taskiq_redis import ListQueueSentinelBroker broker = ListQueueSentinelBroker( sentinels=[ ("sentinel-1", 26379), ("sentinel-2", 26379), ("sentinel-3", 26379), ], master_name="mymaster", queue_name="taskiq", ) ``` -------------------------------- ### Get Schedules for Execution Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/api-reference/schedule-sources.md Retrieves all schedules that are ready for execution at the current time. Fetches cron, interval, and current minute's timed schedules, potentially including past timed schedules on the first run. Uses batching for efficiency. ```python schedules = await schedule_source.get_schedules() for schedule in schedules: await task_broker.send_schedule(schedule) ``` -------------------------------- ### startup Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/api-reference/schedule-sources.md Initializes the schedule source. If configured with `with_migrate_from()`, it performs migration from a previous source. ```APIDOC ## startup ### Description Initialize the schedule source. If configured with `with_migrate_from()`, performs migration from the previous source. ### Returns None ### Example ```python await schedule_source.startup() # If migration configured, this will: # 1. Connect to previous source # 2. Fetch all schedules from it # 3. Add them to this source # 4. Optionally delete from previous source ``` ``` -------------------------------- ### Initialize Schedule Source Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/api-reference/schedule-sources.md Initializes the schedule source. If configured with `with_migrate_from()`, it performs migration from a previous source during startup. ```python await schedule_source.startup() # If migration configured, this will: # 1. Connect to previous source # 2. Fetch all schedules from it # 3. Add them to this source # 4. Optionally delete from previous source ``` -------------------------------- ### Initialize RedisAsyncClusterResultBackend Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/api-reference/result-backends.md Use this backend for Redis Cluster deployments. Configure with the Redis cluster URL and optional result expiration time. ```python from taskiq_redis import RedisAsyncClusterResultBackend backend = RedisAsyncClusterResultBackend( redis_url="redis-cluster://localhost:7000", result_ex_time=3600, ) ``` -------------------------------- ### with_migrate_from Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/api-reference/schedule-sources.md Configures migration from a previous schedule source. During startup, schedules will be migrated from the old source to the new one, with an option to delete them from the old source. ```APIDOC ## with_migrate_from ### Description Configure migration from a previous schedule source. ### Parameters #### Request Body - **source** (ScheduleSource) - Required - Previous schedule source to migrate from - **delete_schedules** (bool) - Optional - If True, delete schedules from old source after migration ### Returns Self — This schedule source (for chaining) ### Behavior During `startup()`, the source will: 1. Connect to the previous source 2. Retrieve all schedules from it 3. Add them to this source 4. Optionally delete them from the previous source ### Example ```python from taskiq_redis import ListRedisScheduleSource from taskiq_redis.schedule_source import RedisScheduleSource old_source = RedisScheduleSource("redis://localhost:6379", prefix="old") new_source = ListRedisScheduleSource( "redis://localhost:6379", prefix="new", ).with_migrate_from(old_source, delete_schedules=True) # During startup, schedules migrate from old to new await new_source.startup() ``` ``` -------------------------------- ### Instantiate PubSubBroker Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/api-reference/brokers.md Instantiate a PubSubBroker with a Redis connection URL. This broker broadcasts messages to all subscribed workers. ```python from taskiq_redis import PubSubBroker from taskiq import BrokerMessage broker = PubSubBroker(url="redis://localhost:6379") message = BrokerMessage(message=b"task_data", task_id="123") await broker.kick(message) ``` -------------------------------- ### Initialize ListRedisScheduleSource Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/api-reference/schedule-sources.md Instantiate ListRedisScheduleSource with a Redis connection URL and an optional key prefix. This is the recommended schedule source for TaskIQ. ```python from taskiq_redis import ListRedisScheduleSource schedule_source = ListRedisScheduleSource( url="redis://localhost:6379", prefix="myapp:schedules", ) ``` -------------------------------- ### Taskiq Redis Result Expiration Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/configuration.md Set expiration times for task results to prevent memory bloat, with examples for short-lived and long-running tasks. ```python # Short-lived tasks backend = RedisAsyncResultBackend( redis_url="redis://localhost:6379", result_ex_time=3600, # 1 hour ) # Long-running tasks backend = RedisAsyncResultBackend( redis_url="redis://localhost:6379", result_ex_time=86400, # 1 day ) ``` -------------------------------- ### Initialize RedisAsyncSentinelResultBackend Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/api-reference/result-backends.md Use this backend for Redis Sentinel high-availability deployments. Provide a list of sentinel addresses and the master name. Optional parameters include result expiration time and sentinel connection details. ```python from taskiq_redis import RedisAsyncSentinelResultBackend backend = RedisAsyncSentinelResultBackend( sentinels=[ ("sentinel-1", 26379), ("sentinel-2", 26379), ("sentinel-3", 26379), ], master_name="mymaster", result_ex_time=3600, ) ``` -------------------------------- ### PubSubBroker Constructor Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/api-reference/brokers.md Initializes a PubSubBroker for broadcasting tasks using Redis PUBSUB. Supports single-node, cluster, and sentinel architectures. ```APIDOC ## PubSubBroker Constructor ### Description Initializes a PubSubBroker for broadcasting tasks using Redis PUBSUB. All workers receive all messages. ### Parameters #### Path Parameters None #### Query Parameters None #### Request Body None ### Constructor Signature ```python def __init__( self, url: str, task_id_generator: Callable[[], str] | None = None, result_backend: AsyncResultBackend[_T] | None = None, queue_name: str = "taskiq", max_connection_pool_size: int | None = None, **connection_kwargs: Any, ) -> None ``` ### Parameters - **url** (str) - Required - Redis connection URL (e.g., `redis://localhost:6379`) - **task_id_generator** (Callable[[], str] | None) - Optional - Custom task ID generator function - **result_backend** (AsyncResultBackend[_T] | None) - Optional - Result backend for storing task results - **queue_name** (str) - Optional - PUBSUB channel name (Default: "taskiq") - **max_connection_pool_size** (int | None) - Optional - Maximum concurrent connections - **connection_kwargs** (Any) - Optional - Additional kwargs passed to `BlockingConnectionPool.from_url()` ``` -------------------------------- ### RedisAsyncClusterResultBackend Methods Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/api-reference/result-backends.md Provides methods for managing task results, including setting, getting, and checking the status of results, as well as progress tracking and backend shutdown. ```APIDOC ## RedisAsyncClusterResultBackend Methods ### Description Provides methods for managing task results, including setting, getting, and checking the status of results, as well as progress tracking and backend shutdown. ### Methods - `set_result(task_id, result) -> None`: Sets the result for a given task ID. - `get_result(task_id, with_logs) -> TaskiqResult[_ReturnType]`: Retrieves the result for a given task ID, optionally including logs. - `is_result_ready(task_id) -> bool`: Checks if the result for a given task ID is ready. - `set_progress(task_id, progress) -> None`: Sets the progress for a given task ID. - `get_progress(task_id) -> TaskProgress[_ReturnType] | None`: Retrieves the progress for a given task ID. - `shutdown() -> None`: Shuts down the result backend connection. ``` -------------------------------- ### Configure Schedule Migration Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/api-reference/schedule-sources.md Configures migration from a previous schedule source. During `startup()`, schedules are migrated to the new source, with an option to delete them from the old source. ```python from taskiq_redis import ListRedisScheduleSource from taskiq_redis.schedule_source import RedisScheduleSource old_source = RedisScheduleSource("redis://localhost:6379", prefix="old") new_source = ListRedisScheduleSource( "redis://localhost:6379", prefix="new", ).with_migrate_from(old_source, delete_schedules=True) # During startup, schedules migrate from old to new await new_source.startup() ``` -------------------------------- ### Configure ListRedisScheduleSource Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/configuration.md Instantiate ListRedisScheduleSource with custom Redis URL, key prefix, and schedule skipping behavior. ```python from taskiq_redis import ListRedisScheduleSource source = ListRedisScheduleSource( url="redis://localhost:6379/1", prefix="myapp:schedules", skip_past_schedules=False, ) ``` -------------------------------- ### Basic Redis Broker Configuration Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/configuration.md Configure a ListQueueBroker with common connection parameters like timeout and keepalive settings. ```python broker = ListQueueBroker( url="redis://localhost:6379", timeout=10.0, socket_keepalive=True, health_check_interval=30, ) ``` -------------------------------- ### Initialize ListQueueBroker Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/api-reference/brokers.md Instantiate ListQueueBroker with a Redis connection URL. Optional parameters include a custom task ID generator, result backend, queue name, and maximum connection pool size. ```python broker = ListQueueBroker(url="redis://localhost:6379") ``` -------------------------------- ### ListQueueBroker Constructor Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/api-reference/brokers.md Initializes the ListQueueBroker with Redis connection details and optional configurations. ```APIDOC ## ListQueueBroker Constructor Initializes the ListQueueBroker with Redis connection details and optional configurations. ### Parameters #### Path Parameters - **url** (str) - Required - Redis connection URL - **task_id_generator** (Callable[[], str] | None) - Optional - Custom task ID generator - **result_backend** (AsyncResultBackend[_T] | None) - Optional - Result backend - **queue_name** (str) - Optional - Redis list key name (default: "taskiq") - **max_connection_pool_size** (int | None) - Optional - Maximum concurrent connections - **connection_kwargs** (Any) - Optional - Additional connection kwargs ``` -------------------------------- ### Migrating Schedules from RedisScheduleSource to ListRedisScheduleSource Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/api-reference/schedule-sources.md Demonstrates setting up a Taskiq scheduler with a new ListRedisScheduleSource that automatically migrates schedules from an old RedisScheduleSource. Schedules are deleted from the old source after migration. ```python import asyncio from taskiq import TaskiqScheduler from taskiq_redis import ListRedisScheduleSource, RedisStreamBroker from taskiq_redis.schedule_source import RedisScheduleSource async def main(): # Set up broker broker = RedisStreamBroker(url="redis://localhost:6379") # Old schedule source old_source = RedisScheduleSource( "redis://localhost:6379", prefix="old_prefix", ) # New schedule source with migration new_source = ListRedisScheduleSource( "redis://localhost:6379", prefix="new_prefix", ).with_migrate_from(old_source, delete_schedules=True) # Create scheduler scheduler = TaskiqScheduler(broker, [new_source]) # During startup, migration happens automatically await scheduler.startup() # Schedules are now in the new source await scheduler.run() asyncio.run(main()) ``` -------------------------------- ### PubSubSentinelBroker Constructor Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/api-reference/brokers.md Initializes a PubSubSentinelBroker for Redis Sentinel high-availability deployments. ```APIDOC ## PubSubSentinelBroker Constructor ### Description Initializes a PubSubSentinelBroker for Redis Sentinel high-availability deployments. It uses Redis Sentinel for managing master/replica connections. ### Parameters - **sentinels** (list[tuple[str, int]]) - Required - A list of (host, port) tuples for the Sentinel addresses. - **master_name** (str) - Required - The name of the Redis master managed by Sentinel. - **result_backend** (AsyncResultBackend[_T] | None) - Optional - The result backend to use. Defaults to None. - **task_id_generator** (Callable[[], str] | None) - Optional - A function to generate task IDs. Defaults to None. - **queue_name** (str) - Optional - The name of the queue/channel. Defaults to "taskiq". - **min_other_sentinels** (int) - Optional - The minimum number of other sentinels to wait for. Defaults to 0. - **sentinel_kwargs** (Any | None) - Optional - Additional keyword arguments for Sentinel initialization. Defaults to None. - **connection_kwargs** (Any) - Optional - Additional keyword arguments for the Redis connection. ``` -------------------------------- ### RedisAsyncResultBackend Constructor Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/api-reference/result-backends.md Initializes the RedisAsyncResultBackend with connection details and configuration for result storage and expiration. ```APIDOC ## RedisAsyncResultBackend Constructor ### Description Initializes the RedisAsyncResultBackend with connection details and configuration for result storage and expiration. ### Parameters #### Path Parameters None #### Query Parameters None #### Request Body None ### Constructor Signature ```python def __init__( self, redis_url: str, keep_results: bool = True, result_ex_time: int | None = None, result_px_time: int | None = None, max_connection_pool_size: int | None = None, serializer: TaskiqSerializer | None = None, prefix_str: str | None = None, **connection_kwargs: Any, ) ``` ### Parameters #### Path Parameters None #### Query Parameters None #### Request Body None #### Constructor Arguments - **redis_url** (str) - Required - Redis connection URL (e.g., `redis://localhost:6379`) - **keep_results** (bool) - Optional - True, results remain in Redis after retrieval; if False, deleted after read - **result_ex_time** (int | None) - Optional - Expiration time in seconds; cannot be used with result_px_time - **result_px_time** (int | None) - Optional - Expiration time in milliseconds; cannot be used with result_ex_time - **max_connection_pool_size** (int | None) - Optional - Maximum concurrent connections in pool - **serializer** (TaskiqSerializer | None) - Optional - Custom serializer (PickleSerializer used if None) - **prefix_str** (str | None) - Optional - Key prefix for result keys (e.g., "myapp:results") - **connection_kwargs** (Any) - Optional - Additional kwargs for `BlockingConnectionPool.from_url()` (e.g., `timeout`) ### Raises: - `DuplicateExpireTimeSelectedError` — if both result_ex_time and result_px_time are set - `ExpireTimeMustBeMoreThanZeroError` — if expiration time is <= 0 ### Recommended Usage: ```python from taskiq_redis import RedisAsyncResultBackend # With expiration backend = RedisAsyncResultBackend( redis_url="redis://localhost:6379", result_ex_time=3600, # 1 hour ) ``` ``` -------------------------------- ### RedisSentinelScheduleSource Constructor Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/api-reference/schedule-sources.md Initializes the RedisSentinelScheduleSource for high-availability Redis scheduling using Sentinel. ```APIDOC ## RedisSentinelScheduleSource Constructor ### Description Initializes the RedisSentinelScheduleSource for high-availability Redis scheduling using Sentinel. ### Parameters #### Path Parameters None #### Query Parameters None #### Request Body None ### Constructor Signature ```python def __init__( self, sentinels: list[tuple[str, int]], master_name: str, prefix: str = "schedule", buffer_size: int = 50, serializer: TaskiqSerializer | None = None, min_other_sentinels: int = 0, sentinel_kwargs: Any | None = None, **connection_kwargs: Any, ) -> None ``` ### Parameters #### Path Parameters None #### Query Parameters None #### Request Body None ### Parameters Table | Parameter | Type | Required | Default | Description | |-----------|------|----------|---------|-------------| | sentinels | list[tuple[str, int]] | ✓ | — | Sentinel (host, port) pairs | | master_name | str | ✓ | — | Sentinel master name | | prefix | str | ✗ | "schedule" | Key prefix | | buffer_size | int | ✗ | 50 | MGET batch size | | serializer | TaskiqSerializer \| None | ✗ | None | Custom serializer | | min_other_sentinels | int | ✗ | 0 | Minimum additional sentinels to connect | | sentinel_kwargs | Any \| None | ✗ | None | Sentinel initialization kwargs | | **connection_kwargs | Any | ✗ | — | Additional connection kwargs | ### Example ```python from taskiq_redis import RedisSentinelScheduleSource source = RedisSentinelScheduleSource( sentinels=[ ("sentinel-1", 26379), ("sentinel-2", 26379), ], master_name="mymaster", prefix="schedules", ) ``` ``` -------------------------------- ### Track Task Progress with Taskiq Redis Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/USAGE_EXAMPLES.md Demonstrates how to implement and monitor the progress of a long-running task using Taskiq's progress tracking capabilities. The client side shows how to initiate the task and retrieve its progress. ```python from taskiq.depends.progress_tracker import TaskProgress @broker.task async def long_task(): """Task that reports progress.""" from taskiq import CurrentTaskInfo from taskiq.depends import Depends async def get_progress() -> TaskProgress: """Progress tracker dependency.""" # This would be injected by TaskIQ pass for i in range(100): await asyncio.sleep(0.1) # Progress is tracked automatically by TaskIQ return "Complete!" # From client side task_result = await long_task.kiq() # Check progress progress = await broker.result_backend.get_progress(task_result.task_id) if progress: print(f"Progress: {progress.current}/{progress.total}") ``` -------------------------------- ### RedisAsyncSentinelResultBackend Constructor Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/api-reference/result-backends.md Initializes the RedisAsyncSentinelResultBackend for Redis Sentinel deployments, ensuring high availability. It requires sentinel addresses and master name. ```APIDOC ## RedisAsyncSentinelResultBackend Constructor ### Description Initializes the RedisAsyncSentinelResultBackend for Redis Sentinel deployments, ensuring high availability. It requires sentinel addresses and master name. ### Parameters #### Path Parameters None #### Query Parameters None #### Request Body None ### Constructor Signature ```python def __init__( self, sentinels: list[tuple[str, int]], master_name: str, keep_results: bool = True, result_ex_time: int | None = None, result_px_time: int | None = None, min_other_sentinels: int = 0, sentinel_kwargs: Any | None = None, serializer: TaskiqSerializer | None = None, prefix_str: str | None = None, **connection_kwargs: Any, ) -> None ``` ### Parameters Table | Parameter | Type | Required | Default | Description | |-----------|------|----------|---------|-------------| | sentinels | list[tuple[str, int]] | ✓ | — | List of (host, port) sentinel addresses | | master_name | str | ✓ | — | Sentinel master name for the Redis instance | | keep_results | bool | ✗ | True | If True, keep results; if False, delete after read | | result_ex_time | int \| None | ✗ | None | Expiration time in seconds | | result_px_time | int \| None | ✗ | None | Expiration time in milliseconds | | min_other_sentinels | int | ✗ | 0 | Minimum additional sentinels to connect to | | sentinel_kwargs | Any \| None | ✗ | None | Kwargs for Sentinel initialization | | serializer | TaskiqSerializer \| None | ✗ | None | Custom serializer | | prefix_str | str \| None | ✗ | None | Key prefix for results | | **connection_kwargs | Any | ✗ | — | Additional connection kwargs | ### Raises - `DuplicateExpireTimeSelectedError` - `ExpireTimeMustBeMoreThanZeroError` ### Example ```python from taskiq_redis import RedisAsyncSentinelResultBackend backend = RedisAsyncSentinelResultBackend( sentinels=[ ("sentinel-1", 26379), ("sentinel-2", 26379), ("sentinel-3", 26379), ], master_name="mymaster", result_ex_time=3600, ) ``` ``` -------------------------------- ### ListQueueClusterBroker Constructor Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/api-reference/brokers.md Initializes a ListQueueClusterBroker for Redis cluster deployments. ```APIDOC ## ListQueueClusterBroker Constructor ### Description Initializes a ListQueueClusterBroker for Redis cluster deployments. It works identically to `ListQueueBroker` but uses `RedisCluster` for cluster mode. ### Parameters - **url** (str) - Required - The connection URL for the Redis cluster. - **queue_name** (str) - Optional - The name of the queue. Defaults to "taskiq". - **max_connection_pool_size** (int) - Optional - Maximum size of the connection pool. Defaults to 2**31. - **connection_kwargs** (Any) - Optional - Additional keyword arguments for the Redis connection. ``` -------------------------------- ### Run Main Application Code Source: https://github.com/taskiq-python/taskiq-redis/blob/main/README.md Command to execute the Python script that defines and potentially sends tasks. This script will interact with the TaskIQ worker. ```bash python3 broker.py ``` -------------------------------- ### Dynamic Queue Naming for Tasks Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/PACKAGE_OVERVIEW.md Shows how to assign a specific queue name to a task at decoration time using the 'queue_name' argument. ```python @broker.task(queue_name="high_priority") async def urgent_task(): pass ``` -------------------------------- ### Import Redis from redis.asyncio Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/types.md Import the `Redis` client from the `redis.asyncio` module for executing asynchronous Redis commands. Can be used as an async context manager. ```python from redis.asyncio import Redis ``` -------------------------------- ### Import BlockingConnectionPool from redis.asyncio Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/types.md Import the `BlockingConnectionPool` class from the `redis.asyncio` module. This is used for TaskIQ brokers requiring blocking connection behavior. Requires redis package version >= 8.0.0 and < 9. ```python from redis.asyncio import BlockingConnectionPool ``` -------------------------------- ### Configure Multiple Redis Databases for Broker, Result Backend, and Schedule Source Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/USAGE_EXAMPLES.md Assigns different Redis databases to the broker, result backend, and schedule source. This allows for logical separation of data within Redis. ```python broker = RedisStreamBroker(url="redis://localhost:6379/0") result_backend = RedisAsyncResultBackend( redis_url="redis://localhost:6379/1", ) schedule_source = ListRedisScheduleSource( url="redis://localhost:6379/2", ) ``` -------------------------------- ### RedisAsyncClusterResultBackend Constructor Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/api-reference/result-backends.md Initializes the RedisAsyncClusterResultBackend for Redis Cluster deployments. It accepts connection details and result storage options. ```APIDOC ## RedisAsyncClusterResultBackend Constructor ### Description Initializes the RedisAsyncClusterResultBackend for Redis Cluster deployments. It accepts connection details and result storage options. ### Parameters #### Path Parameters None #### Query Parameters None #### Request Body None ### Constructor Signature ```python def __init__( self, redis_url: str, keep_results: bool = True, result_ex_time: int | None = None, result_px_time: int | None = None, serializer: TaskiqSerializer | None = None, prefix_str: str | None = None, **connection_kwargs: Any, ) -> None ``` ### Raises - `DuplicateExpireTimeSelectedError` - `ExpireTimeMustBeMoreThanZeroError` ### Example ```python from taskiq_redis import RedisAsyncClusterResultBackend backend = RedisAsyncClusterResultBackend( redis_url="redis-cluster://localhost:7000", result_ex_time=3600, ) ``` ``` -------------------------------- ### RedisScheduleSource Constructor Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/api-reference/schedule-sources.md Initializes the RedisScheduleSource with connection details and configuration. This class is deprecated. ```APIDOC ## RedisScheduleSource Constructor ### Description Initializes the RedisScheduleSource with connection details and configuration. This class is deprecated. ### Parameters #### Path Parameters None #### Query Parameters None #### Request Body None ### Constructor Signature ```python def __init__( self, url: str, prefix: str = "schedule", buffer_size: int = 50, max_connection_pool_size: int | None = None, serializer: TaskiqSerializer | None = None, **connection_kwargs: Any, ) -> None ``` ### Parameters Table | Parameter | Type | Required | Default | Description | |-----------|------|----------|---------|-------------| | url | str | ✓ | — | Redis connection URL | | prefix | str | ✗ | "schedule" | Prefix for schedule keys | | buffer_size | int | ✗ | 50 | Batch size for MGET operations | | max_connection_pool_size | int \| None | ✗ | None | Maximum concurrent connections | | serializer | TaskiqSerializer \| None | ✗ | None | Custom serializer | | **connection_kwargs | Any | ✗ | — | Additional connection kwargs | ``` -------------------------------- ### RedisClusterScheduleSource Initialization Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/api-reference/schedule-sources.md Instantiate RedisClusterScheduleSource for Redis Cluster environments. Specify the Redis cluster URL and an optional prefix for schedule keys. ```python from taskiq_redis import RedisClusterScheduleSource source = RedisClusterScheduleSource( url="redis-cluster://localhost:7000", prefix="schedules", ) ``` -------------------------------- ### ListRedisScheduleSource Constructor Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/api-reference/schedule-sources.md Initializes the ListRedisScheduleSource, an optimized schedule source using Redis lists for efficient storage and retrieval of schedules by type and time. ```APIDOC ## ListRedisScheduleSource Constructor ### Description Initializes the ListRedisScheduleSource, which uses Redis lists to store schedules categorized by type (cron, timed, interval) for optimized retrieval. ### Constructor Signature ```python def __init__( self, url: str, prefix: str = "schedule", max_connection_pool_size: int | None = None, serializer: TaskiqSerializer | None = None, buffer_size: int = 50, skip_past_schedules: bool = False, **connection_kwargs: Any, ) -> None ``` ### Parameters #### Path Parameters None #### Query Parameters None #### Request Body None ### Parameters Table | Parameter | Type | Required | Default | Description | |-----------|------|----------|---------|-------------| | url | str | ✓ | — | Redis connection URL (e.g., `redis://localhost:6379`) | | prefix | str | ✗ | "schedule" | Prefix for all Redis keys (prevents key collisions) | | max_connection_pool_size | int | ✗ | None | Maximum concurrent connections in pool | | serializer | TaskiqSerializer | ✗ | None | Custom serializer (PickleSerializer used if None) | | buffer_size | int | ✗ | 50 | Batch size for retrieving schedules via MGET | | skip_past_schedules | bool | ✗ | False | If True, don't fetch timed schedules from before startup | | connection_kwargs | Any | ✗ | — | Additional kwargs for `BlockingConnectionPool.from_url()` | ### Example ```python from taskiq_redis import ListRedisScheduleSource schedule_source = ListRedisScheduleSource( url="redis://localhost:6379", prefix="myapp:schedules", ) ``` ``` -------------------------------- ### Migrating Schedules Between Redis Sources Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/PACKAGE_OVERVIEW.md Shows how to migrate schedules from an old source to a new one using ListRedisScheduleSource. The 'with_migrate_from' method can optionally delete schedules from the old source after migration. ```python new_source = ListRedisScheduleSource( url="redis://localhost:6379", prefix="new", ).with_migrate_from(old_source, delete_schedules=True) ``` -------------------------------- ### PubSubBroker.listen Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/api-reference/brokers.md Listens for incoming messages on the PUBSUB channel, yielding raw message data. ```APIDOC ## PubSubBroker.listen ### Description Listens for incoming messages on the PUBSUB channel. All subscribed workers will receive it. ### Method `listen` ### Parameters #### Path Parameters None #### Query Parameters None #### Request Body None ### Method Signature ```python async def listen(self) -> AsyncGenerator[bytes, None] ``` ### Parameters None ### Response #### Success Response - **Yields:** bytes — Raw message data ### Example ```python async for message_bytes in broker.listen(): print(f"Received: {message_bytes}") ``` ``` -------------------------------- ### Configure Redis Result Backend with Key Prefix Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/api-reference/result-backends.md Set a custom string prefix for all keys used by the Redis result backend. This helps in organizing and isolating Taskiq keys within Redis. ```python # Without prefix key = "task-123" key_progress = "task-123__progress" # With prefix backend = RedisAsyncResultBackend( redis_url="redis://localhost:6379", prefix_str="myapp" ) # Result key: "myapp:task-123" # Progress key: "myapp:task-123__progress" ``` -------------------------------- ### Initialize RedisAsyncResultBackend with expiration Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/api-reference/result-backends.md Instantiate RedisAsyncResultBackend with a Redis URL and set an expiration time for results in seconds. Ensure that result_ex_time and result_px_time are not used together. ```python from taskiq_redis import RedisAsyncResultBackend # With expiration backend = RedisAsyncResultBackend( redis_url="redis://localhost:6379", result_ex_time=3600, # 1 hour ) ``` -------------------------------- ### RedisStreamBroker Constructor Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/api-reference/brokers.md Initializes the RedisStreamStreamBroker with connection details and configuration for Redis streams. ```APIDOC ## RedisStreamBroker Constructor ### Description Initializes the RedisStreamBroker with connection details and configuration for Redis streams. ### Parameters #### Path Parameters None #### Query Parameters None #### Request Body None ### Parameters Table | Parameter | Type | Required | Default | Description | |-----------|------|----------|---------|-------------| | url | str | ✓ | — | Redis connection URL | | queue_name | str | ✗ | "taskiq" | Redis stream key name | | max_connection_pool_size | int | ✗ | None | Maximum concurrent connections | | consumer_group_name | str | ✗ | "taskiq" | Consumer group name for stream tracking | | consumer_name | str | ✗ | None | Unique consumer name (UUID if not provided) | | consumer_id | str | ✗ | "$" | Stream ID to start reading from ($ = newest) | | mkstream | bool | ✗ | True | Auto-create stream if missing | | xread_block | int | ✗ | 2000 | Block timeout in milliseconds for XREADGROUP | | maxlen | int | ✗ | None | Maximum stream length before trimming | | approximate | bool | ✗ | True | Use approximate trimming for performance | | idle_timeout | int | ✗ | 600000 | Auto-reclaim timeout in ms (10 minutes) | | unacknowledged_batch_size | int | ✗ | 100 | Messages to fetch per auto-claim | | unacknowledged_lock_timeout | float | ✗ | None | Lock timeout for auto-claim (None = indefinite) | | xread_count | int | ✗ | 100 | Messages to fetch per XREADGROUP call | | additional_streams | dict[str, str | int] | ✗ | None | Additional streams to read from | | connection_kwargs | Any | ✗ | — | Additional connection kwargs ``` -------------------------------- ### Import RedisCluster from redis.asyncio Source: https://github.com/taskiq-python/taskiq-redis/blob/main/_autodocs/types.md Import the `RedisCluster` client from the `redis.asyncio` module for interacting with Redis Cluster deployments. ```python from redis.asyncio import RedisCluster ```