### Install dependencies

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/contrib.md

Install all project dependencies using uv.

```bash
uv sync --all-extras
```

--------------------------------

### Install custom brokers

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/available-components/brokers.md

Install packages for Taskiq-maintained brokers.

```bash
pip install taskiq-aio-pika
```

```bash
pip install taskiq-redis
```

```bash
pip install taskiq-nats
```

--------------------------------

### Install third-party brokers

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/available-components/brokers.md

Install packages for community-maintained brokers.

```bash
pip install taskiq-postgresql
```

```bash
pip install taskiq-aio-sqs
```

```bash
pip install taskiq-ydb
```

--------------------------------

### Install taskiq-aiohttp

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/framework_integrations/taskiq-with-aiohttp.md

Install the taskiq-aiohttp library using pip.

```bash
pip install "taskiq-aiohttp"
```

--------------------------------

### Install taskiq-faststream library

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/framework_integrations/faststream.md

Install the required package to enable FastStream integration with Taskiq.

```bash
pip install "taskiq-faststream"
```

--------------------------------

### Install ZeroMQ dependencies

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/available-components/brokers.md

Install the necessary packages to use the ZeroMQ broker.

```bash
pip install pyzmq
```

```bash
pip install "taskiq[zmq]"
```

--------------------------------

### Install RabbitMQ Broker

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/getting-started.md

Install the required library for RabbitMQ integration.

```bash
pip install taskiq-aio-pika
```

--------------------------------

### Install Taskiq

Source: https://github.com/taskiq-python/taskiq/blob/master/README.md

Install the package using pip or directly from the git repository.

```bash
pip install taskiq
```

```bash
pip install git+https://github.com/taskiq-python/taskiq
```

--------------------------------

### Install Taskiq-Aiogram

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/framework_integrations/taskiq-with-aiogram.md

Install the required library to enable Taskiq integration with Aiogram.

```bash
pip install "taskiq-aiogram"
```

--------------------------------

### Manage documentation

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/contrib.md

Commands for installing, developing, and building the VuePress documentation.

```bash
pnpm i
```

```bash
pnpm docs:dev
```

```bash
pnpm docs:build
pnpm docs:serve
```

--------------------------------

### Install aiohttp-deps

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/framework_integrations/taskiq-with-aiohttp.md

Install the aiohttp-deps library using pip.

```bash
pip install "aiohttp-deps"
```

--------------------------------

### Install Taskiq with MsgPack support

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/message-format.md

Install Taskiq with the 'msgpack' extra to enable the MSGPackSerializer for reduced network data transfer.

```bash
pip install "taskiq[msgpack]"
```

--------------------------------

### Install Taskiq from Git

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/getting-started.md

Install the Taskiq library directly from its GitHub repository using pip. This is useful for development or when you need the latest unreleased features.

```bash
pip install git+https://github.com/taskiq-python/taskiq.git
```

--------------------------------

### Install Taskiq with CBOR support

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/message-format.md

Install Taskiq with the 'cbor' extra to enable the CBORSerializer, which offers a smaller message size than JSON.

```bash
pip install "taskiq[cbor]"
```

--------------------------------

### Configure Taskiq with MSGPackSerializer

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/message-format.md

Example of initializing an InMemoryBroker and setting it to use the MSGPackSerializer for efficient data transfer.

```python
from taskiq import InMemoryBroker
from taskiq.serializers import MSGPackSerializer

broker = InMemoryBroker().with_serializer(MSGPackSerializer())
```

--------------------------------

### Install OpenTelemetry dependencies

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/available-components/middlewares.md

Install the necessary package to enable OpenTelemetry tracing support.

```bash
pip install "taskiq[opentelemetry]"
```

--------------------------------

### Install PostgreSQL schedule source

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/available-components/schedule-sources.md

Command to install the third-party PostgreSQL schedule source package.

```bash
pip install taskiq-postgres
```

--------------------------------

### Start scheduler command

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/scheduling-tasks.md

Command to initiate the scheduler instance for a specific module.

```bash
taskiq scheduler module:scheduler
```

--------------------------------

### Install Prometheus dependencies

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/available-components/middlewares.md

Install the required packages to enable Prometheus metrics collection.

```bash
pip install "prometheus_client"
```

```bash
pip install "taskiq[metrics]"
```

--------------------------------

### Install Taskiq with ORJSON support

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/message-format.md

Install Taskiq with the 'orjson' extra to enable the ORJSONSerializer for faster JSON processing.

```bash
pip install "taskiq[orjson]"
```

--------------------------------

### Install PostgreSQL result backend

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/available-components/result-backends.md

Install the third-party PostgreSQL result backend package via pip.

```bash
pip install taskiq-postgresql
```

--------------------------------

### Install YDB result backend

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/available-components/result-backends.md

Install the third-party YDB result backend package via pip.

```bash
pip install taskiq-ydb
```

--------------------------------

### Install Taskiq via pip

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/README.md

Use this command to install the taskiq package in your Python environment.

```bash
pip install taskiq
```

--------------------------------

### Install S3 result backend

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/available-components/result-backends.md

Install the third-party S3 result backend package via pip.

```bash
pip install taskiq-aio-sqs
```

--------------------------------

### Start scheduler with skip first run

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/scheduling-tasks.md

Command to start the scheduler while preventing immediate execution of tasks upon startup.

```bash
taskiq scheduler module:scheduler --skip-first-run
```

--------------------------------

### Configure Taskiq with ORJSONSerializer

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/message-format.md

Example of initializing an InMemoryBroker and setting it to use the ORJSONSerializer.

```python
from taskiq import InMemoryBroker
from taskiq.serializers import ORJSONSerializer

broker = InMemoryBroker().with_serializer(ORJSONSerializer())
```

--------------------------------

### Install Redis result backend

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/available-components/result-backends.md

Install the official Redis result backend package via pip.

```bash
pip install taskiq-redis
```

--------------------------------

### Implement an Async Middleware in Taskiq

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/extending-taskiq/middleware.md

This example creates an asynchronous middleware. It inherits from `taskiq.abc.middleware.TaskiqMiddleware` and defines async hooks that run before and after task execution.

```python
from taskiq import TaskiqMessage, TaskiqResult
from taskiq.abc.middleware import TaskiqMiddleware


class MyAsyncMiddleware(TaskiqMiddleware):
    # Hooks are looked up by name, so they must use the names
    # defined on TaskiqMiddleware (pre_execute / post_execute).
    async def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage:
        print("Executing async middleware before task")
        return message

    async def post_execute(self, message: TaskiqMessage, result: TaskiqResult) -> None:
        print("Executing async middleware after task")
```

--------------------------------

### Run Taskiq Scheduler

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/cli.md

Use this command to start the Taskiq scheduler. Specify the path to your scheduler object and any modules to import for task discovery.

```bash
taskiq scheduler <path to scheduler> [optional module to import]...
```

```bash
taskiq scheduler my_project.broker:scheduler my_project.module1 my_project.module2
```

--------------------------------

### Install NATS result backend

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/available-components/result-backends.md

Install the official NATS result backend package via pip.

```bash
pip install taskiq-nats
```

--------------------------------

### Configure Taskiq with CBORSerializer

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/message-format.md

Example of initializing an InMemoryBroker and setting it to use the CBORSerializer for a compact message format.

```python
from taskiq import InMemoryBroker
from taskiq.serializers import CBORSerializer

broker = InMemoryBroker().with_serializer(CBORSerializer())
```

--------------------------------

### Run RabbitMQ via Docker

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/getting-started.md

Start a RabbitMQ instance using Docker for local development.

```bash
docker run --rm -d \
    -p "5672:5672" \
    -p "15672:15672" \
    --env "RABBITMQ_DEFAULT_USER=guest" \
    --env "RABBITMQ_DEFAULT_PASS=guest" \
    --env "RABBITMQ_DEFAULT_VHOST=/" \
    rabbitmq:3.8.27-management-alpine
```

```powershell
docker run --rm -d `
    -p "5672:5672" `
    -p "15672:15672" `
    --env "RABBITMQ_DEFAULT_USER=guest" `
    --env "RABBITMQ_DEFAULT_PASS=guest" `
    --env "RABBITMQ_DEFAULT_VHOST=/" `
    rabbitmq:3.8.27-management-alpine
```

--------------------------------

### Run a Task with InMemoryBroker

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/getting-started.md

Example of running a task using Taskiq with an InMemoryBroker. Ensure the startup method is called before executing tasks. This snippet demonstrates task execution and result retrieval.

```python
import asyncio

from taskiq import InMemoryBroker

broker = InMemoryBroker()


@broker.task
def add(a: int, b: int) -> int:
    return a + b


async def main() -> None:
    await broker.startup()
    # kiq() is a coroutine: awaiting it sends the task and returns a task handle.
    task = await add.kiq(2, 2)
    # Wait for the result; the function's return value is in `return_value`.
    result = await task.wait_result()
    print(f"Returned value: {result.return_value}")
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
```

--------------------------------

### Run Redis via Docker

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/getting-started.md

Start a Redis instance using Docker for result storage.

```bash
docker run --rm -d \
    -p "6379:6379" \
    redis
```

```powershell
docker run --rm -d `
    -p "6379:6379" `
    redis
```

--------------------------------

### Start Taskiq Worker

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/getting-started.md

Launch the worker process to begin consuming tasks from the broker.

```bash
taskiq worker broker:broker
```

--------------------------------

### Install Taskiq with Reload Support

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/cli.md

Installs Taskiq with the extras needed for the hot-reload feature, which automatically restarts the worker when code changes.

```bash
pip install "taskiq[reload]"
```

```bash
poetry add taskiq -E reload
```

```bash
uv add "taskiq[reload]"
```

--------------------------------

### Default Taskiq Message Format Example

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/message-format.md

This is an example of the default JSON message format used by Taskiq for tasks.

```json
{
  "task_name": "my_project.module1.task",
  "args": [1, 2, 3],
  "kwargs": {"a": 1, "b": 2, "c": 3},
  "labels": {
    "label1": "value1",
    "label2": "value2"
  }
}
```

--------------------------------

### Task Execution Output

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/getting-started.md

This is the expected output when running the example code that executes a Taskiq task locally.

```bash
❯ python broker.py
Task execution took: 7.3909759521484375e-06 seconds.
Returned value: 2
```

--------------------------------

### Run Taskiq Worker

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/architecture-overview.md

Commands to start a taskiq worker for a specific broker, with explicit task modules or with file system discovery.

```bash
taskiq worker my_project.broker:mybroker
```

```bash
taskiq worker test_project.broker:broker test_project.submodule.tasks test_project.utils.tasks
```

```bash
taskiq worker test_project.broker:broker -fsd
```

--------------------------------

### Initialize Redis Pool on Worker Startup

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/state-and-deps.md

This snippet demonstrates how to initialize a Redis connection pool when a Taskiq worker starts, storing it in the global state for later use.

```python
from redis.asyncio import ConnectionPool
from taskiq.events import TaskiqEvents
from taskiq.state import TaskiqState

# `broker` is assumed to be an existing broker instance.


@broker.on_event(TaskiqEvents.WORKER_STARTUP)
async def startup(state: TaskiqState) -> None:
    # Here we store connection pool on startup for later use.
    state.redis = ConnectionPool.from_url("redis://localhost/1")
```

--------------------------------

### Run Taskiq Worker

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/cli.md

Starts a Taskiq worker process. Specify the import path of the broker object (`module:variable`) and the modules containing your tasks.

```bash
taskiq worker mybroker:broker_var my_project.module1 my_project.module2
```

--------------------------------

### Define a task for testing

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/testing-taskiq.md

Example of a simple task definition that can be imported and tested as a normal function.

```python
from your_project.tkq import broker


@broker.task
async def parse_int(val: str) -> int:
    return int(val)
```

--------------------------------

### Run Taskiq Workers

Source: https://github.com/taskiq-python/taskiq/blob/master/README.md

Start worker processes to execute tasks, optionally using file discovery.

```bash
taskiq worker path.to.the.module:broker
```

```bash
taskiq worker path.to.the.module:broker --fs-discover
```

--------------------------------

### Define tasks with dependencies

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/testing-taskiq.md

Use TaskiqDepends to inject dependencies into task functions. These examples show both Annotated and default value syntax.

```python
from typing import Annotated
from pathlib import Path

from taskiq import TaskiqDepends

from your_project.tkq import broker


@broker.task
async def modify_path(some_path: Annotated[Path, TaskiqDepends()]):
    return some_path.parent / "taskiq.py"
```

```python
from pathlib import Path

from taskiq import TaskiqDepends

from your_project.tkq import broker


@broker.task
async def modify_path(some_path: Path = TaskiqDepends()):
    return some_path.parent / "taskiq.py"
```

--------------------------------

### Accessing State in Task (Default Values)

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/state-and-deps.md

This example shows how to access the global state, specifically the Redis connection pool, within a Taskiq task using default values for context.

```python
from redis.asyncio import Redis
from taskiq import Context, TaskiqDepends

# `broker` is assumed to be an existing broker instance.


@broker.task
async def my_task(context: Context = TaskiqDepends()) -> None:
    async with Redis(connection_pool=context.state.redis, decode_responses=True) as redis:
        await redis.set('key', 'value')
```

--------------------------------

### Accessing State in Task (Annotated)

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/state-and-deps.md

This example shows how to access the global state, specifically the Redis connection pool, within a Taskiq task using Annotated type hints for context.

```python
from typing import Annotated

from redis.asyncio import Redis
from taskiq import Context, TaskiqDepends

# `broker` is assumed to be an existing broker instance.


@broker.task
async def my_task(context: Annotated[Context, TaskiqDepends()]) -> None:
    async with Redis(connection_pool=context.state.redis, decode_responses=True) as redis:
        await redis.set('key', 'value')
```

--------------------------------

### Local Development Commands

Source: https://github.com/taskiq-python/taskiq/blob/master/README.md

Commands for setting up linting, testing, and documentation servers.

```bash
pre-commit install
```

```bash
pytest
```

```bash
yarn install
```

```bash
yarn docs:dev
```

--------------------------------

### Create Dynamic Worker Task in Event Loop

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/dynamic-brokers.md

This example demonstrates creating a dynamic worker task within the current event loop to listen for and execute messages. The worker task is automatically cancelled upon exiting the scope.

```python
import asyncio

from taskiq import InMemoryBroker

broker = InMemoryBroker()


async def main():
    # Start the broker before registering and sending tasks.
    await broker.startup()
    task = broker.register_task(lambda x: x + 1)
    async with broker.receiver():
        await task.kiq(5)
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
```

--------------------------------

### Initialize a Broker

Source: https://github.com/taskiq-python/taskiq/blob/master/README.md

Create a broker instance to communicate with the task queue backend.

```python
from taskiq_nats import JetStreamBroker

broker = JetStreamBroker("nats://localhost:4222", queue="my_queue")
```

--------------------------------

### Initialize FastStream broker with Taskiq

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/framework_integrations/faststream.md

Configure a Kafka broker using FastStream and wrap it with Taskiq's BrokerWrapper.

```python
from faststream import FastStream
from faststream.kafka import KafkaBroker
from taskiq_faststream import BrokerWrapper

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

taskiq_broker = BrokerWrapper(broker)
```

--------------------------------

### Configure AioPika Broker

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/getting-started.md

Initialize the AioPika broker and ensure the startup method is called before task execution.

```python
from taskiq_aio_pika import AioPikaBroker

broker = AioPikaBroker('amqp://guest:guest@localhost:5672')
```

```python
await broker.startup()
```

--------------------------------

### Configure Kicker and Broker

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/architecture-overview.md

Demonstrates using a kicker to override the default broker and add labels to a task execution.

```python
import asyncio

from taskiq.brokers.inmemory_broker import InMemoryBroker

broker = InMemoryBroker()
second_broker = InMemoryBroker()


@broker.task
async def my_async_task() -> None:
    """My lovely task."""
    await asyncio.sleep(1)
    print("Hello")


async def main():
    # This task was initially assigned to broker,
    # but this time it is going to be sent using
    # the second broker with additional label `delay=1`.
    task = await my_async_task.kicker().with_broker(second_broker).with_labels(delay=1).kiq()
    print(await task.get_result())


asyncio.run(main())
```

--------------------------------

### Initialize aiohttp-deps

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/framework_integrations/taskiq-with-aiohttp.md

Add a startup event to your AioHTTP application to initialize the dependency injection context.

```python
from aiohttp import web
import aiohttp_deps

app = web.Application()

# This startup event makes all the magic happen.
# It parses current handlers and creates dependency graphs for them.
app.on_startup.append(aiohttp_deps.init)

web.run_app(app)
```

--------------------------------

### Implement a custom broker template

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/extending-taskiq/broker.md

Use this template to implement the required kick and listen methods along with optional startup and shutdown lifecycle hooks.

```python
from typing import Any, AsyncGenerator

from taskiq.abc.broker import AsyncBroker
from taskiq.message import BrokerMessage


class MyBroker(AsyncBroker):
    async def startup(self) -> None:
        """This method is called when the broker starts."""
        await super().startup()

    async def shutdown(self) -> None:
        """This method is called when the broker stops."""
        await super().shutdown()

    async def kick(self, message: BrokerMessage) -> None:
        """This method is called when a task is sent."""
        # Your implementation here.
        pass

    async def listen(self) -> AsyncGenerator[bytes, None]:
        """This method is called when the broker starts listening."""
        # Your implementation here.
        yield b""
```

--------------------------------

### Run linting

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/contrib.md

Execute all configured linters manually.

```bash
pre-commit run -a
```

--------------------------------

### Define unawaitable task example

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/testing-taskiq.md

A task and a function that triggers it without awaiting the result.

```python
from your_project.tkq import broker


@broker.task
async def parse_int(val: str) -> int:
    return int(val)


async def parse_int_later(val: str) -> int:
    await parse_int.kiq(val)
    return 1
```

--------------------------------

### Register CLI Entry Points

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/extending-taskiq/cli.md

Configure entry points in your project file to expose the custom command to the taskiq CLI.

```python
from setuptools import setup

setup(
    # ...,
    entry_points={
        'taskiq_cli': [
            'demo = my_project.cmd:MyCommand',
        ]
    }
)
```

```toml
[project.entry-points.taskiq_cli]
demo = "my_project.cmd:MyCommand"
```

```toml
[tool.poetry.plugins.taskiq_cli]
demo = "my_project.cmd:MyCommand"
```

--------------------------------

### Initialize taskiq-aiohttp integration

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/framework_integrations/taskiq-with-aiohttp.md

Initialize taskiq-aiohttp by providing the broker instance and a path to your web.Application variable or a factory function.

```python
import taskiq_aiohttp

broker = MyBroker()

# The second argument is a path to web.Application variable.
# Also you can provide here a factory function that takes no
# arguments and returns an application. This function can be async.
taskiq_aiohttp.init(broker, "my_project.main:app")
```

--------------------------------

### Minimal AsyncResultBackend Implementation

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/extending-taskiq/result-backend.md

Implement this class to create a custom result backend. It's recommended to only fetch logs when `with_logs=True` is explicitly requested.

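To make the `with_logs` recommendation concrete, here is a dictionary-backed sketch in plain Python. It deliberately does not subclass `AsyncResultBackend`: `StoredResult` and `InMemoryResults` are hypothetical names introduced for illustration, and only the `with_logs` flag is taken from the description above.

```python
import asyncio
from dataclasses import dataclass, replace
from typing import Dict, Optional


@dataclass(frozen=True)
class StoredResult:
    """Hypothetical result record: a value plus optional captured logs."""

    return_value: object
    log: Optional[str] = None


class InMemoryResults:
    """Hypothetical stand-in for a result backend, backed by a dict."""

    def __init__(self) -> None:
        self._results: Dict[str, StoredResult] = {}

    async def set_result(self, task_id: str, result: StoredResult) -> None:
        self._results[task_id] = result

    async def is_result_ready(self, task_id: str) -> bool:
        return task_id in self._results

    async def get_result(self, task_id: str, with_logs: bool = False) -> StoredResult:
        result = self._results[task_id]
        # Hand back the logs only when the caller asked for them explicitly.
        return result if with_logs else replace(result, log=None)
```

A backend that talks to real storage would presumably skip the extra read for logs instead of masking a field it has already loaded. The template from the source follows.
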
```python
from taskiq.abc.result_backend import AsyncResultBackend


class MyResultBackend(AsyncResultBackend):
    async def get_result(self, task_id: str, with_logs: bool = False) -> str:
        # your implementation here
        pass

    async def set_result(self, task_id: str, result: str) -> None:
        # your implementation here
        pass

    async def is_result_ready(self, task_id: str) -> bool:
        # your implementation here
        pass

    async def get_logs(self, task_id: str) -> str:
        # your implementation here
        pass

    async def set_logs(self, task_id: str, logs: str) -> None:
        # your implementation here
        pass
```

--------------------------------

### Initialize Taskiq Broker with Aiogram

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/framework_integrations/taskiq-with-aiogram.md

Configure the Taskiq broker and define a task that utilizes the injected Aiogram Bot dependency.

```python
import asyncio

import taskiq_aiogram
from aiogram import Bot
from taskiq import TaskiqDepends
from taskiq_redis import ListQueueBroker

broker = ListQueueBroker("redis://localhost")

# This line is going to initialize everything.
taskiq_aiogram.init(
    broker,
    # This is path to the dispatcher.
    "bot:dp",
    # This is path to the bot instance.
    "bot:bot",
    # You can specify more bots here.
)


@broker.task(task_name="my_task")
async def my_task(chat_id: int, bot: Bot = TaskiqDepends()) -> None:
    print("I'm a task")
    await asyncio.sleep(4)
    await bot.send_message(chat_id, "task completed")
```

--------------------------------

### Initialize Taskiq with FastAPI Broker

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/framework_integrations/taskiq-with-fastapi.md

Initialize the taskiq-fastapi integration by providing the broker instance and the path to your FastAPI application. This should be called in your main broker file.

```python
from taskiq import ZeroMQBroker
import taskiq_fastapi

broker = ZeroMQBroker()

taskiq_fastapi.init(broker, "my_package.application:app")
```

--------------------------------

### Schedule tasks dynamically with Redis

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/scheduling-tasks.md

Demonstrates adding tasks to a Redis schedule source using time, cron, and interval-based scheduling.

```python
import datetime

await redis_source.startup()

await my_task.schedule_by_time(
    redis_source,
    # It's better to use UTC time, or add tzinfo to datetime.
    datetime.datetime.now(datetime.UTC) + datetime.timedelta(minutes=1, seconds=5),
    # You can pass args and kwargs here as usual
    11,
    arg2="arg2",
)
```

```python
await my_task.schedule_by_cron(
    redis_source,
    "*/5 * * * *",
    11,
    arg2="arg2",
)
```

```python
await my_task.schedule_by_interval(
    redis_source,
    datetime.timedelta(seconds=5),
    11,
    arg2="arg2",
)
```

--------------------------------

### Enable Taskiq Worker Hot Reload

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/cli.md

Starts a Taskiq worker with the hot-reload feature enabled, automatically restarting the worker process when source files are modified. Use --reload-dir to specify directories to watch.

```bash
taskiq worker --reload my_module:broker
```

--------------------------------

### Configure broker for automatic awaiting

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/testing-taskiq.md

Initialize the InMemoryBroker with await_inplace=True to automatically await tasks.

```python
broker = InMemoryBroker(await_inplace=True)
```

--------------------------------

### Implement Custom Middleware

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/architecture-overview.md

Create a custom middleware by subclassing TaskiqMiddleware and implementing lifecycle hooks.

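As a rough picture of how lifecycle hooks can be either plain functions or coroutines, the sketch below calls each hook and awaits the result only if it is awaitable. This is a generic pattern in plain Python, not taskiq's implementation: `Hooks`, `AddLabel`, `Recorder`, `maybe_await`, and `send` are hypothetical names, and the message is a plain dict rather than a `TaskiqMessage`.

```python
import asyncio
import inspect
from typing import Any, List


class Hooks:
    """Hypothetical base class; every hook is a no-op by default."""

    def pre_send(self, message: dict) -> Any:
        return message

    def post_send(self, message: dict) -> Any:
        return None


class AddLabel(Hooks):
    # An async hook: it may await I/O before returning the message.
    async def pre_send(self, message: dict) -> dict:
        await asyncio.sleep(0)
        message["labels"]["my_label"] = "my_value"
        return message


class Recorder(Hooks):
    def __init__(self) -> None:
        self.sent: List[str] = []

    # A sync hook: it is called directly and returns nothing.
    def post_send(self, message: dict) -> None:
        self.sent.append(message["task_name"])


async def maybe_await(value: Any) -> Any:
    """Await `value` if a hook returned an awaitable, else pass it through."""
    if inspect.isawaitable(value):
        return await value
    return value


async def send(message: dict, middlewares: List[Hooks]) -> dict:
    for mw in middlewares:
        message = await maybe_await(mw.pre_send(message))
    # A real broker would publish the message at this point.
    for mw in middlewares:
        await maybe_await(mw.post_send(message))
    return message
```

Calling `asyncio.run(send({"task_name": "demo", "labels": {}}, [AddLabel(), Recorder()]))` runs both kinds of hook through the same code path. The taskiq middleware from the source follows.
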
```python
import asyncio

from taskiq.abc.middleware import TaskiqMiddleware
from taskiq.message import TaskiqMessage


class MyMiddleware(TaskiqMiddleware):
    async def pre_send(self, message: "TaskiqMessage") -> TaskiqMessage:
        await asyncio.sleep(1)
        message.labels["my_label"] = "my_value"
        return message

    def post_send(self, message: "TaskiqMessage") -> None:
        print(f"Message {message} was sent.")
```

--------------------------------

### Implement a custom ScheduleSource

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/extending-taskiq/schedule-sources.md

Create a new schedule source by inheriting from taskiq.abc.schedule_source.ScheduleSource.

```python
import typing

from taskiq.abc.schedule_source import ScheduleSource
from taskiq.models import ScheduledTask


class MyScheduleSource(ScheduleSource):
    async def get_schedules(self) -> typing.List[ScheduledTask]:
        return [
            ScheduledTask(
                task_name="my_task",
                labels={},
                args=[],
                kwargs={},
                cron="* * * * *",
            ),
        ]
```

--------------------------------

### FastAPI Lifespan for Broker Startup/Shutdown

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/framework_integrations/taskiq-with-fastapi.md

Manage broker startup and shutdown using FastAPI's lifespan context manager. This is the recommended approach for handling asynchronous events during application startup and shutdown.

```python
from contextlib import asynccontextmanager

from fastapi import FastAPI

from your_project.taskiq import broker


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    if not broker.is_worker_process:
        await broker.startup()
    yield
    # Shutdown
    if not broker.is_worker_process:
        await broker.shutdown()


app = FastAPI(lifespan=lifespan)
```

--------------------------------

### Configure OpenTelemetry instrumentation

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/available-components/middlewares.md

Enable tracing using either the TaskiqInstrumentor or the OpenTelemetryMiddleware.

```python
from taskiq import ZeroMQBroker
from taskiq.instrumentation import TaskiqInstrumentor

TaskiqInstrumentor().instrument()
broker = ZeroMQBroker()
```

```python
from taskiq import ZeroMQBroker
from taskiq.middlewares.opentelemetry_middleware import OpenTelemetryMiddleware

broker = ZeroMQBroker().with_middlewares(
    OpenTelemetryMiddleware,
)
```

--------------------------------

### Configure anyio backend fixture

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/testing-taskiq.md

Define the anyio backend fixture in your conftest.py to support async testing.

```python
import pytest


@pytest.fixture
def anyio_backend():
    return 'asyncio'
```

--------------------------------

### Execute Custom CLI Command

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/extending-taskiq/cli.md

Run the newly registered command using the taskiq CLI.

```bash
$ taskiq demo --help
usage: demo [-h] [--test TEST]

optional arguments:
  -h, --help   show this help message and exit
  --test TEST  My test parameter.
```

```bash
$ taskiq demo --test aaa
Namespace(test='aaa')
```

--------------------------------

### Implement a Sync Middleware in Taskiq

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/extending-taskiq/middleware.md

Use this pattern to create a synchronous middleware. Ensure it inherits from `taskiq.abc.middleware.TaskiqMiddleware` and overrides the hooks it needs as plain (non-async) methods.

```python
from taskiq import TaskiqMessage, TaskiqResult
from taskiq.abc.middleware import TaskiqMiddleware


class MySyncMiddleware(TaskiqMiddleware):
    # Plain `def` hooks: a sync middleware must not declare them `async`.
    # Hook names must match those defined on TaskiqMiddleware.
    def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage:
        print("Executing sync middleware before task")
        return message

    def post_execute(self, message: TaskiqMessage, result: TaskiqResult) -> None:
        print("Executing sync middleware after task")
```

--------------------------------

### Define and Execute Tasks

Source: https://github.com/taskiq-python/taskiq/blob/master/README.md

Use the broker decorator to define tasks and the kiq method to trigger them.

```python
import asyncio

from taskiq_nats import JetStreamBroker

broker = JetStreamBroker("nats://localhost:4222", queue="my_queue2")


@broker.task
async def my_task(a: int, b: int) -> None:
    print("AB", a + b)


async def main():
    await broker.startup()
    await my_task.kiq(1, 2)
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
```

--------------------------------

### Implement TaskiqCMD

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/extending-taskiq/cli.md

Create a class inheriting from TaskiqCMD to define custom CLI behavior.

```python
from taskiq.abc.cmd import TaskiqCMD
import argparse


class MyCommand(TaskiqCMD):
    def add_arguments(self, parser: argparse.ArgumentParser) -> None:
        parser.add_argument("--test", help="My test parameter.")

    def exec(self, args: argparse.Namespace) -> None:
        print(args)
```

--------------------------------

### Run tests

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/contrib.md

Execute the test suite using pytest or tox.

```bash
pytest
```

```bash
pytest -n 2
```

```bash
tox
```

--------------------------------

### Manual task loop implementation

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/scheduling-tasks.md

A basic approach to periodic task execution using an infinite loop and asyncio sleep.

```python
while True:
    await heavy_task.kiq(1)
    # total_seconds is a method and must be called to get the float value.
    await asyncio.sleep(timedelta(minutes=5).total_seconds())
```

--------------------------------

### Configure SmartRetryMiddleware

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/available-components/middlewares.md

Register the SmartRetryMiddleware to support advanced retry strategies like jitter and exponential backoff.

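For readers unfamiliar with the terms, the sketch below shows what exponential backoff with jitter means in general. It is not the formula `SmartRetryMiddleware` uses (the source does not state it): `backoff_delay` and all of its parameters are hypothetical, with defaults chosen only to echo the numbers in the configuration.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    base_delay: float = 10.0,
    max_delay: float = 120.0,
    jitter: float = 2.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Return the wait in seconds before retry number `attempt` (1-based)."""
    rng = rng or random.Random()
    # Double the delay on every attempt, but never exceed the cap.
    delay = min(base_delay * 2 ** (attempt - 1), max_delay)
    # Add a small random offset so that many failing tasks do not retry in lockstep.
    return delay + rng.uniform(0.0, jitter)
```

With jitter disabled, attempts 1 to 5 wait 10, 20, 40, 80, and 120 seconds under these assumed defaults; the cap takes effect on the fifth attempt. The middleware configuration from the source follows.
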
```python
from taskiq import ZeroMQBroker
from taskiq.middlewares import SmartRetryMiddleware

broker = ZeroMQBroker().with_middlewares(
    SmartRetryMiddleware(
        default_retry_count=5,
        default_delay=10,
        use_jitter=True,
        use_delay_exponent=True,
        max_delay_exponent=120,
    ),
)
```

--------------------------------

### Configure SimpleRetryMiddleware

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/available-components/middlewares.md

Register the SimpleRetryMiddleware with a broker to enable basic error-based task retries.

```python
from taskiq import ZeroMQBroker, SimpleRetryMiddleware

broker = ZeroMQBroker().with_middlewares(
    SimpleRetryMiddleware(default_retry_count=3),
)
```

--------------------------------

### Configure Redis Result Backend

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/getting-started.md

Attach the Redis result backend to the AioPika broker instance.

```python
from taskiq_aio_pika import AioPikaBroker
from taskiq_redis import RedisAsyncResultBackend

broker = AioPikaBroker(
    "amqp://guest:guest@localhost:5672",
).with_result_backend(RedisAsyncResultBackend("redis://localhost"))
```

--------------------------------

### Configure RedisScheduleSource

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/available-components/schedule-sources.md

Requires the taskiq-redis package. Used for dynamic scheduling with Redis as the storage backend.

```python
from taskiq_redis import RedisScheduleSource
from taskiq import TaskiqScheduler

redis_source = RedisScheduleSource("redis://localhost:6379/0")
scheduler = TaskiqScheduler(broker, sources=[redis_source])
```

--------------------------------

### Class Dependency Default Values

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/state-and-deps.md

Demonstrates using a class as a dependency with default parameter values.
```python
from taskiq import TaskiqDepends


def get_value() -> int:
    # Hypothetical value provider, shown only so the example is complete.
    return 1


class MyDependency:
    def __init__(self, value: int) -> None:
        self.value = value


def get_dependency(value=TaskiqDepends(get_value)) -> MyDependency:
    return MyDependency(value)


async def my_task(dep: MyDependency = TaskiqDepends(get_dependency)):
    return dep.value
```

--------------------------------

### Test task execution with dependencies

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/testing-taskiq.md

Verify task results when dependencies are resolved via the broker's context.

```python
import pytest
from your_project.tkq import modify_path


@pytest.mark.anyio
async def test_modify_path():
    task = await modify_path.kiq()
    result = await task.wait_result()
    assert str(result.return_value).endswith("taskiq.py")
```

--------------------------------

### FastAPI on_event for Broker Startup/Shutdown (Deprecated)

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/framework_integrations/taskiq-with-fastapi.md

Handle broker startup and shutdown using FastAPI's on_event decorators. While functional, the lifespan context manager is the preferred method for managing application events.

```python
from fastapi import FastAPI
from your_project.taskiq import broker

app = FastAPI()


@app.on_event("startup")
async def app_startup():
    # Only the client side starts the broker here.
    if not broker.is_worker_process:
        await broker.startup()


@app.on_event("shutdown")
async def app_shutdown():
    if not broker.is_worker_process:
        await broker.shutdown()
```

--------------------------------

### Configure NATSKeyValueScheduleSource

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/available-components/schedule-sources.md

Requires the taskiq-nats package. Uses NATS Key-Value store for dynamic schedule management.

```python
from taskiq_nats import NATSKeyValueScheduleSource
from taskiq.scheduler import TaskiqScheduler

broker = ...
scheduler = TaskiqScheduler(
    broker=broker,
    sources=[NATSKeyValueScheduleSource(broker)],
)
```

--------------------------------

### Set environment variables for testing

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/testing-taskiq.md

Configure the environment variable for testing across different operating systems.

```bash
export ENVIRONMENT="pytest"
pytest -vv
```

```powershell
$env:ENVIRONMENT = 'pytest'
pytest -vv
```

--------------------------------

### Define Task Labels

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/architecture-overview.md

Shows two equivalent ways to define task labels: via the decorator or via the kicker instance.

```python
@broker.task(my_label=1, label2="something")
async def my_async_task() -> None:
    """My lovely task."""
    await asyncio.sleep(1)
    print("Hello")


async def main():
    await my_async_task.kiq()
```

```python
@broker.task
async def my_async_task() -> None:
    """My lovely task."""
    await asyncio.sleep(1)
    print("Hello")


async def main():
    await my_async_task.kicker().with_labels(
        my_label=1,
        label2="something",
    ).kiq()
```

--------------------------------

### Execute Task Script

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/getting-started.md

Run the Python script to trigger task execution.

```bash
$ python broker.py
Task execution took: 0.0 seconds.
Returned value: None
```

--------------------------------

### Implement Taskiq in Aiogram Bot

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/framework_integrations/taskiq-with-aiogram.md

Manage broker lifecycle events within the Aiogram startup and shutdown hooks to prevent infinite loops.

```python
import asyncio
import logging
import sys

from aiogram import Bot, Dispatcher, types
from aiogram.filters import Command

from tkq import broker, my_task

dp = Dispatcher()
bot = Bot(token="TOKEN")


# Taskiq calls this function when starting the worker.
@dp.startup()
async def setup_taskiq(bot: Bot, *_args, **_kwargs):
    # Only start the broker on the client side;
    # otherwise the worker would trigger
    # an infinite loop of startup events.
    if not broker.is_worker_process:
        logging.info("Setting up taskiq")
        await broker.startup()


# Taskiq calls this function when shutting down the worker.
@dp.shutdown()
async def shutdown_taskiq(bot: Bot, *_args, **_kwargs):
    if not broker.is_worker_process:
        logging.info("Shutting down taskiq")
        await broker.shutdown()


# Simple command to handle.
@dp.message(Command("task"))
async def message(message: types.Message):
    await my_task.kiq(message.chat.id)


# Main function that starts the bot.
async def main():
    await dp.start_polling(bot)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, stream=sys.stdout)
    asyncio.run(main())
```

--------------------------------

### Annotated Dependencies Tree

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/state-and-deps.md

Demonstrates dependency declaration using type annotations for Python 3.10+.

```python
from typing import Annotated

from taskiq import TaskiqDepends


def common_dep() -> int:
    return 1


def dep1(cd: Annotated[int, TaskiqDepends(common_dep)]) -> int:
    return cd + 1


def dep2(cd: Annotated[int, TaskiqDepends(common_dep)]) -> int:
    return cd + 2


async def my_task(
    d1: Annotated[int, TaskiqDepends(dep1)],
    d2: Annotated[int, TaskiqDepends(dep2)],
) -> int:
    return d1 + d2
```

--------------------------------

### Async Generator Dependency Annotated

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/state-and-deps.md

Demonstrates using an asynchronous generator for dependencies with type annotations (Python 3.10+).
```python
from typing import Annotated, AsyncGenerator

from taskiq import TaskiqDepends


async def async_setup_resource() -> AsyncGenerator[str, None]:
    # Async setup logic here
    resource = "my_async_resource"
    yield resource
    # Async teardown logic here
    print("Async resource cleaned up")


async def my_task(
    resource: Annotated[str, TaskiqDepends(async_setup_resource)],
):
    print(f"Using async resource: {resource}")
```

--------------------------------

### RedisScheduleSource

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/available-components/schedule-sources.md

Official schedule source using Redis for dynamic schedule management. Requires the `taskiq-redis` package.

````APIDOC
## RedisScheduleSource

### Description
This source can add new schedules at runtime. It uses Redis as storage for schedules. To use this source, install the `taskiq-redis` package.

### Usage
```python
from taskiq_redis import RedisScheduleSource
from taskiq import TaskiqScheduler

redis_source = RedisScheduleSource("redis://localhost:6379/0")
scheduler = TaskiqScheduler(broker, sources=[redis_source])
```

For more information on how to use dynamic schedule sources, read the [Dynamic scheduling section](../guide/scheduling-tasks.md#dynamic-scheduling).
````

--------------------------------

### No Cache Default Value Dependencies

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/state-and-deps.md

Demonstrates dependency declaration without caching using default values.
```python
from taskiq import TaskiqDepends


def common_dep() -> int:
    return 1


def dep1(cd: int = TaskiqDepends(common_dep)) -> int:
    return cd + 1


# use_cache=False makes this dependency re-evaluate common_dep
# instead of reusing the value already computed for dep1.
def dep2(cd: int = TaskiqDepends(common_dep, use_cache=False)) -> int:
    return cd + 2


async def my_task(
    d1: int = TaskiqDepends(dep1),
    d2: int = TaskiqDepends(dep2),
) -> int:
    return d1 + d2
```

--------------------------------

### Schedule tasks with custom labels

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/scheduling-tasks.md

Use the Kicker instance to attach labels to a dynamically scheduled task.

```python
schedule = (
    await my_task.kicker()
    .with_labels(label1="value")
    .schedule_by_time(
        redis_source,
        datetime.datetime.now(datetime.UTC) + datetime.timedelta(seconds=10),
        11,
        arg2="arg2",
    )
)
```

--------------------------------

### Test client fixture with Taskiq context

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/framework_integrations/taskiq-with-aiohttp.md

A pytest fixture for creating a test client with AioHTTP and Taskiq, ensuring the dependency injection context is populated for InMemoryBroker.

```python
import asyncio
from typing import AsyncGenerator

import pytest
import taskiq_aiohttp
from aiohttp import web
from aiohttp.test_utils import TestClient, TestServer


@pytest.fixture
async def test_client(
    app: web.Application,
) -> AsyncGenerator[TestClient, None]:
    """
    Create a test client.

    This function creates a TestServer
    and a test client for the application.

    Also this fixture populates context
    with needed variables.

    :param app: current application.
    :yield: ready to use client.
    """
    loop = asyncio.get_running_loop()
    server = TestServer(app)
    client = TestClient(server, loop=loop)

    await client.start_server()

    # This is the important part.
    # Since InMemoryBroker doesn't
    # run as a worker process, we have to populate
    # the broker's context by hand.
    taskiq_aiohttp.populate_context(
        broker=broker,
        server=server.runner.server,
        app=app,
        loop=loop,
    )

    yield client

    # Reset the injected context and close the client on teardown.
    broker.custom_dependency_context = {}
    await client.close()
```

--------------------------------

### No Cache Annotated Dependencies

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/state-and-deps.md

Illustrates dependency declaration without caching for Python 3.10+.

```python
from typing import Annotated

from taskiq import TaskiqDepends


def common_dep() -> int:
    return 1


def dep1(cd: Annotated[int, TaskiqDepends(common_dep)]) -> int:
    return cd + 1


def dep2(cd: Annotated[int, TaskiqDepends(common_dep, use_cache=False)]) -> int:
    return cd + 2


async def my_task(
    d1: Annotated[int, TaskiqDepends(dep1)],
    d2: Annotated[int, TaskiqDepends(dep2)],
) -> int:
    return d1 + d2
```

--------------------------------

### Register Task Dynamically with Broker

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/dynamic-brokers.md

Use the `register_task` method to define and assign tasks to a broker instance. Tasks created this way are not discoverable by the `taskiq worker` command.

```python
import asyncio

from taskiq import InMemoryBroker

broker = InMemoryBroker()


@broker.task
def simple_task():
    return 1


async def main():
    # Start the broker before sending any tasks.
    await broker.startup()

    # Register a task at runtime instead of using the decorator.
    task = broker.register_task(lambda x: x + 1)
    await task.kiq(5)
    await simple_task.kiq()

    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
```

--------------------------------

### Configure PrometheusMiddleware

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/available-components/middlewares.md

Register the PrometheusMiddleware to expose worker metrics on a specified address and port.
```python
from taskiq import ZeroMQBroker, PrometheusMiddleware

broker = ZeroMQBroker().with_middlewares(
    PrometheusMiddleware(server_addr="0.0.0.0", server_port=9000),
)
```

--------------------------------

### Verify Task Execution Results

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/getting-started.md

Run the script again to verify that results are correctly stored and retrieved.

```bash
$ python broker.py
Task execution took: 1.0013580322265625e-05 seconds.
Returned value: 2
```

--------------------------------

### Dependency Injection for Redis (Default Values)

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/state-and-deps.md

This snippet defines a dependency function that builds a Redis client from the connection pool stored in the state, then injects it into a Taskiq task using default values. Because the parameter is typed as `Redis`, editors can offer autocompletion.

```python
from redis.asyncio import Redis
from taskiq import Context, TaskiqDepends


def redis_dep(context: Context = TaskiqDepends()) -> Redis:
    return Redis(connection_pool=context.state.redis, decode_responses=True)


@broker.task
async def my_task(redis: Redis = TaskiqDepends(redis_dep)) -> None:
    await redis.set('key', 'value')
```

--------------------------------

### Declare InMemoryBroker

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/getting-started.md

Declare a broker instance using InMemoryBroker. This broker is suitable for local development and testing as it does not involve network communication.

```python
# broker.py
from taskiq import InMemoryBroker

broker = InMemoryBroker()
```

--------------------------------

### Enable LabelScheduleSource

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/available-components/schedule-sources.md

Registers the label-based source with the scheduler to parse task labels for scheduling information.

```python
from taskiq.scheduler import TaskiqScheduler
from taskiq.schedule_sources import LabelScheduleSource

broker = ...
scheduler = TaskiqScheduler(
    broker=broker,
    sources=[LabelScheduleSource(broker)],
)
```

--------------------------------

### NATS Schedule Source

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/available-components/schedule-sources.md

Official schedule source using NATS for dynamic schedule management. Requires the `taskiq-nats` package.

````APIDOC
## NATS Schedule Source

### Description
This source can add new schedules at runtime. It uses NATS as storage for schedules. To use this source, install the `taskiq-nats` package.

### Usage
```python
from taskiq_nats import NATSKeyValueScheduleSource
from taskiq.scheduler import TaskiqScheduler

broker = ...

scheduler = TaskiqScheduler(
    broker=broker,
    sources=[NATSKeyValueScheduleSource(broker)],
)
```

This schedule source doesn't use the `schedule` label on tasks. To add new schedules, call the `add_schedule` method on the source.
````

--------------------------------

### Assign Custom Task Names

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/architecture-overview.md

Demonstrates assigning a unique task name using the task decorator.

```python
@broker.task(task_name="my_tasks.add_one", label1=1)
async def my_async_task() -> None:
    """My lovely task."""
    await asyncio.sleep(1)
    print("Hello")
```

--------------------------------

### Define task schedule labels

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/available-components/schedule-sources.md

Configures task scheduling directly via decorator labels. Supports cron, interval, or specific time scheduling.

```python
@broker.task(
    schedule=[
        {
            "cron": "* * * * *",  # type: str, either cron, interval or time should be specified.
            "cron_offset": None,  # type: str | timedelta | None, can be omitted.
            "interval": None,  # type: int | timedelta, either cron, interval or time should be specified.
            "time": None,  # type: datetime | None, either cron, interval or time should be specified.
            "args": [],  # type: List[Any] | None, can be omitted.
            "kwargs": {},  # type: Dict[str, Any] | None, can be omitted.
            "labels": {},  # type: Dict[str, Any] | None, can be omitted.
            "schedule_id": "every_minute",  # type: str | None, can be omitted.
        }
    ]
)
async def my_task():
    ...
```

--------------------------------

### Class Dependency Annotated

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/state-and-deps.md

Shows how to use a class as a dependency with type annotations (Python 3.10+).

```python
from typing import Annotated

from taskiq import TaskiqDepends


def get_value() -> int:
    # Hypothetical value provider, shown only so the example is complete.
    return 1


class MyDependency:
    def __init__(self, value: int) -> None:
        self.value = value


def get_dependency(
    value: Annotated[int, TaskiqDepends(get_value)],
) -> MyDependency:
    return MyDependency(value)


async def my_task(dep: Annotated[MyDependency, TaskiqDepends(get_dependency)]):
    return dep.value
```

--------------------------------

### Dependency Injection for Redis (Annotated)

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/state-and-deps.md

This snippet defines a dependency function that builds a Redis client from the connection pool stored in the state, then injects it into a Taskiq task using Annotated type hints. Because the parameter is typed as `Redis`, editors can offer autocompletion.

```python
from typing import Annotated

from redis.asyncio import Redis
from taskiq import Context, TaskiqDepends


def redis_dep(context: Annotated[Context, TaskiqDepends()]) -> Redis:
    return Redis(connection_pool=context.state.redis, decode_responses=True)


@broker.task
async def my_task(redis: Annotated[Redis, TaskiqDepends(redis_dep)]) -> None:
    await redis.set('key', 'value')
```

--------------------------------

### Configure shared task broker

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/available-components/brokers.md

Set the default broker for shared tasks or specify one during execution.
```python
from taskiq.brokers.shared_broker import async_shared_broker

async_shared_broker.default_broker(broker)
```

```python
await my_task.kicker().with_broker(broker).kiq()
```

--------------------------------

### Enable SmartRetry on a task

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/available-components/middlewares.md

Configure specific retry parameters directly on the task decorator.

```python
@broker.task(retry_on_error=True, max_retries=10, delay=15)
async def my_task():
    raise Exception("Error, retrying!")
```

--------------------------------

### Async Generator Dependency Default Values

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/state-and-deps.md

Shows using an asynchronous generator for dependencies with default parameter values.

```python
from typing import AsyncGenerator

from taskiq import TaskiqDepends


async def async_setup_resource() -> AsyncGenerator[str, None]:
    # Async setup logic here
    resource = "my_async_resource"
    yield resource
    # Async teardown logic here
    print("Async resource cleaned up")


async def my_task(resource=TaskiqDepends(async_setup_resource)):
    print(f"Using async resource: {resource}")
```

--------------------------------

### Test tasks manually

Source: https://github.com/taskiq-python/taskiq/blob/master/docs/guide/testing-taskiq.md

Test tasks by passing dependencies directly as arguments to the function.

```python
from pathlib import Path

import pytest
from your_project.tkq import modify_path


@pytest.mark.anyio
async def test_modify_path():
    # Call the task like a plain coroutine and supply the dependency yourself.
    modified = await modify_path(Path.cwd())
    assert str(modified).endswith("taskiq.py")
```
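
The manual approach above can be tried without a broker or pytest. The following is a minimal, stdlib-only sketch: the body of `modify_path` is an assumption (this section never shows it), and it is written as a plain coroutine rather than a Taskiq task so that it runs on its own. The helper name `check_modify_path` is likewise hypothetical.

```python
import asyncio
from pathlib import Path


# Hypothetical stand-in for the task under test; the real `modify_path`
# is not shown here, so this body is an assumption.
async def modify_path(some_path: Path) -> Path:
    return some_path.parent / "taskiq.py"


async def check_modify_path() -> Path:
    # Pass the dependency (a Path) explicitly instead of relying on
    # dependency injection to resolve it.
    return await modify_path(Path.cwd())


modified = asyncio.run(check_modify_path())
assert str(modified).endswith("taskiq.py")
```

Because the dependency is an ordinary argument, the same call shape should work for any value you want to substitute in a test, such as a temporary directory.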