### Install dependencies

Source: https://taskiq-python.github.io/contrib.html

Install project dependencies using uv.

```bash
uv sync --all-extras
```

--------------------------------

### Install Taskiq from Git

Source: https://taskiq-python.github.io/guide/getting-started.html

Install Taskiq directly from its GitHub repository using pip.

```bash
pip install git+https://github.com/taskiq-python/taskiq.git
```

--------------------------------

### Install taskiq-faststream

Source: https://taskiq-python.github.io/framework_integrations/faststream.html

Install the necessary library to enable integration between Taskiq and FastStream.

```bash
pip install "taskiq-faststream"
```

--------------------------------

### Start the scheduler via CLI

Source: https://taskiq-python.github.io/guide/scheduling-tasks.html

Command to start the scheduler process.

```bash
taskiq scheduler module:scheduler
```

--------------------------------

### Install taskiq-aiohttp

Source: https://taskiq-python.github.io/framework_integrations/taskiq-with-aiohttp.html

Install the taskiq-aiohttp library to integrate Taskiq with your AioHTTP application.

```bash
pip install "taskiq-aiohttp"
```

--------------------------------

### Install Taskiq with ZMQ extra

Source: https://taskiq-python.github.io/available-components/brokers.html

Install Taskiq with the 'zmq' extra to include ZeroMQ support. This simplifies the installation process for using the ZeroMQ broker.

```bash
pip install "taskiq[zmq]"
```

--------------------------------

### Install Prometheus Client

Source: https://taskiq-python.github.io/available-components/middlewares.html

Install the `prometheus_client` package to enable Prometheus metrics collection for Taskiq workers.

```bash
pip install "prometheus_client"
```

--------------------------------

### Install aiohttp-deps

Source: https://taskiq-python.github.io/framework_integrations/taskiq-with-aiohttp.html

Install the aiohttp-deps library to enable dependency injection in AioHTTP.
```bash
pip install "aiohttp-deps"
```

--------------------------------

### Manage documentation

Source: https://taskiq-python.github.io/contrib.html

Commands for installing, developing, and building the documentation site.

```bash
pnpm i
```

```bash
pnpm docs:dev
```

```bash
pnpm docs:build
pnpm docs:serve
```

--------------------------------

### Install taskiq-postgres

Source: https://taskiq-python.github.io/available-components/schedule-sources.html

Install the PostgreSQL schedule source package using pip.

```bash
pip install taskiq-postgres
```

--------------------------------

### Install Taskiq with OpenTelemetry Extras

Source: https://taskiq-python.github.io/available-components/middlewares.html

Install Taskiq with the `opentelemetry` extra to enable OpenTelemetry tracing capabilities.

```bash
pip install "taskiq[opentelemetry]"
```

--------------------------------

### Aiogram Bot Setup with Taskiq Integration

Source: https://taskiq-python.github.io/framework_integrations/taskiq-with-aiogram.html

Set up an Aiogram bot and dispatcher, integrating them with Taskiq. This includes starting and shutting down the broker during the bot's lifecycle.

```python
import asyncio
import logging
import sys

from aiogram import Bot, Dispatcher, types
from aiogram.filters import Command

from tkq import broker, my_task

dp = Dispatcher()
bot = Bot(token="TOKEN")


# Taskiq calls this function when starting the worker.
@dp.startup()
async def setup_taskiq(bot: Bot, *_args, **_kwargs):
    # Only start the broker on the client side.
    # Doing it inside a worker process would
    # create an infinite loop of startup events.
    if not broker.is_worker_process:
        logging.info("Setting up taskiq")
        await broker.startup()


# Taskiq calls this function when shutting down the worker.
@dp.shutdown()
async def shutdown_taskiq(bot: Bot, *_args, **_kwargs):
    if not broker.is_worker_process:
        logging.info("Shutting down taskiq")
        await broker.shutdown()


# Simple command to handle.
@dp.message(Command("task"))
async def message(message: types.Message):
    await my_task.kiq(message.chat.id)


# Main function that starts the bot.
async def main():
    await dp.start_polling(bot)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, stream=sys.stdout)
    asyncio.run(main())
```

--------------------------------

### Define and Run Task with InMemoryBroker

Source: https://taskiq-python.github.io/guide/getting-started.html

This example demonstrates defining a task, initializing the broker, sending a task, and retrieving its result. Always call `broker.startup()` before use and `broker.shutdown()` afterwards.

```python
# broker.py
import asyncio

from taskiq import InMemoryBroker

broker = InMemoryBroker()


@broker.task
async def add_one(value: int) -> int:
    return value + 1


async def main() -> None:
    # Never forget to call startup in the beginning.
    await broker.startup()
    # Send the task to the broker.
    task = await add_one.kiq(1)
    # Wait for the result.
    result = await task.wait_result(timeout=2)
    print(f"Task execution took: {result.execution_time} seconds.")
    if not result.is_err:
        print(f"Returned value: {result.return_value}")
    else:
        print("Error found while executing task.")
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
```

--------------------------------

### Initialize AioPika Broker

Source: https://taskiq-python.github.io/guide/getting-started.html

Basic setup for an AioPika broker and a task definition.

```python
# broker.py
import asyncio

from taskiq_aio_pika import AioPikaBroker

broker = AioPikaBroker("amqp://guest:guest@localhost:5672")


@broker.task
async def add_one(value: int) -> int:
    return value + 1


async def main() -> None:
    await broker.startup()
    # Send the task to the broker.
    task = await add_one.kiq(1)
    # Wait for the result.
    result = await task.wait_result(timeout=2)
    print(f"Task execution took: {result.execution_time} seconds.")
    if not result.is_err:
        print(f"Returned value: {result.return_value}")
    else:
        print("Error found while executing task.")
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
```

--------------------------------

### Install PostgreSQL result backend

Source: https://taskiq-python.github.io/available-components/result-backends.html

Install the third-party PostgreSQL result backend package for Taskiq.

```bash
pip install taskiq-postgresql
```

--------------------------------

### Install Taskiq with Metrics Extras

Source: https://taskiq-python.github.io/available-components/middlewares.html

Install Taskiq with the `metrics` extra to include Prometheus client dependencies.

```bash
pip install "taskiq[metrics]"
```

--------------------------------

### Install S3 result backend

Source: https://taskiq-python.github.io/available-components/result-backends.html

Install the third-party S3 result backend package for Taskiq.

```bash
pip install taskiq-aio-sqs
```

--------------------------------

### Install Taskiq-Aiogram

Source: https://taskiq-python.github.io/framework_integrations/taskiq-with-aiogram.html

Install the taskiq-aiogram library using pip. This is the first step to enable the integration.

```bash
pip install "taskiq-aiogram"
```

--------------------------------

### Install Taskiq with CBOR support

Source: https://taskiq-python.github.io/guide/message-format.html

Install Taskiq with the 'cbor' extra to enable the CBORSerializer, which offers a smaller message size than JSON.

```bash
pip install "taskiq[cbor]"
```

--------------------------------

### Install Redis result backend

Source: https://taskiq-python.github.io/available-components/result-backends.html

Install the official Redis result backend package for Taskiq.
```bash
pip install taskiq-redis
```

--------------------------------

### Install YDB result backend

Source: https://taskiq-python.github.io/available-components/result-backends.html

Install the third-party YDB result backend package for Taskiq.

```bash
pip install taskiq-ydb
```

--------------------------------

### Initialize Taskiq Scheduler and Broker

Source: https://taskiq-python.github.io/guide/scheduling-tasks.html

Set up the broker and scheduler instances required for task execution and scheduling.

```python
from taskiq_redis import ListQueueBroker, ListRedisScheduleSource

from taskiq import TaskiqScheduler

# Here's the broker that is going to execute tasks
broker = ListQueueBroker("redis://localhost:6379/0")

# Here's the source that is used to store scheduled tasks
redis_source = ListRedisScheduleSource("redis://localhost:6379/0")

# And here's the scheduler that is used to query scheduled sources
scheduler = TaskiqScheduler(broker, sources=[redis_source])


@broker.task
async def my_task(arg1: int, arg2: str) -> None:
    """Example task."""
    print("Hello from my_task!", arg1, arg2)
```

--------------------------------

### Install NATS result backend

Source: https://taskiq-python.github.io/available-components/result-backends.html

Install the official NATS result backend package for Taskiq.

```bash
pip install taskiq-nats
```

--------------------------------

### Run RabbitMQ via Docker

Source: https://taskiq-python.github.io/guide/getting-started.html

Commands to start a RabbitMQ instance for the broker. The first block uses POSIX shell line continuations (`\`); the second uses Windows `cmd` line continuations (`^`).
```bash
docker run --rm -d \
    -p "5672:5672" \
    -p "15672:15672" \
    --env "RABBITMQ_DEFAULT_USER=guest" \
    --env "RABBITMQ_DEFAULT_PASS=guest" \
    --env "RABBITMQ_DEFAULT_VHOST=/" \
    rabbitmq:3.8.27-management-alpine
```

```bash
docker run --rm -d ^
    -p "5672:5672" ^
    -p "15672:15672" ^
    --env "RABBITMQ_DEFAULT_USER=guest" ^
    --env "RABBITMQ_DEFAULT_PASS=guest" ^
    --env "RABBITMQ_DEFAULT_VHOST=/" ^
    rabbitmq:3.8.27-management-alpine
```

--------------------------------

### Configure Taskiq with MSGPackSerializer

Source: https://taskiq-python.github.io/guide/message-format.html

Example of initializing an InMemoryBroker and setting the MSGPackSerializer for message serialization.

```python
from taskiq import InMemoryBroker
from taskiq.serializers import MSGPackSerializer

broker = InMemoryBroker().with_serializer(MSGPackSerializer())
```

--------------------------------

### Define a basic task with AioPikaBroker

Source: https://taskiq-python.github.io/guide/scheduling-tasks.html

Initial setup for a task using the AioPikaBroker.

```python
from taskiq_aio_pika import AioPikaBroker

broker = AioPikaBroker("amqp://guest:guest@localhost:5672/")


@broker.task
async def heavy_task(value: int) -> int:
    return value + 1
```

--------------------------------

### Install Taskiq with MsgPack support

Source: https://taskiq-python.github.io/guide/message-format.html

Install Taskiq with the 'msgpack' extra to enable the MSGPackSerializer for potentially smaller message sizes.

```bash
pip install "taskiq[msgpack]"
```

--------------------------------

### Initialize and Access Redis Connection Pool with TaskiqState

Source: https://taskiq-python.github.io/guide/state-and-deps.html

This example demonstrates initializing a Redis connection pool during worker startup and accessing it within tasks using TaskiqState. It also shows how to properly shut down the pool.
```python
import asyncio
from typing import Annotated

from redis.asyncio import ConnectionPool, Redis  # type: ignore
from taskiq_aio_pika import AioPikaBroker
from taskiq_redis import RedisAsyncResultBackend

from taskiq import Context, TaskiqDepends, TaskiqEvents, TaskiqState

# To run this example, please install:
# * taskiq
# * taskiq-redis
# * taskiq-aio-pika

broker = AioPikaBroker(
    "amqp://localhost",
).with_result_backend(RedisAsyncResultBackend("redis://localhost"))


@broker.on_event(TaskiqEvents.WORKER_STARTUP)
async def startup(state: TaskiqState) -> None:
    # Here we store connection pool on startup for later use.
    state.redis = ConnectionPool.from_url("redis://localhost/1")


@broker.on_event(TaskiqEvents.WORKER_SHUTDOWN)
async def shutdown(state: TaskiqState) -> None:
    # Here we close our pool on shutdown event.
    await state.redis.disconnect()


@broker.task
async def get_val(
    key: str,
    context: Annotated[Context, TaskiqDepends()],
) -> str | None:
    # Now we can use our pool.
    redis = Redis(connection_pool=context.state.redis, decode_responses=True)
    return await redis.get(key)


@broker.task
async def set_val(
    key: str,
    value: str,
    context: Annotated[Context, TaskiqDepends()],
) -> None:
    # Now we can use our pool to set value.
    await Redis(connection_pool=context.state.redis).set(key, value)


async def main() -> None:
    await broker.startup()

    set_task = await set_val.kiq("key", "value")
    set_result = await set_task.wait_result(with_logs=True)
    if set_result.is_err:
        print(set_result.log)
        raise ValueError("Cannot set value in redis. See logs.")

    get_task = await get_val.kiq("key")
    get_res = await get_task.wait_result()
    print(f"Got redis value: {get_res.return_value}")

    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
```

```python
import asyncio

from redis.asyncio import ConnectionPool, Redis  # type: ignore
from taskiq_aio_pika import AioPikaBroker
from taskiq_redis import RedisAsyncResultBackend

from taskiq import Context, TaskiqDepends, TaskiqEvents, TaskiqState

# To run this example, please install:
# * taskiq
# * taskiq-redis
# * taskiq-aio-pika

broker = AioPikaBroker(
    "amqp://localhost",
).with_result_backend(RedisAsyncResultBackend("redis://localhost"))


@broker.on_event(TaskiqEvents.WORKER_STARTUP)
async def startup(state: TaskiqState) -> None:
    # Here we store connection pool on startup for later use.
    state.redis = ConnectionPool.from_url("redis://localhost/1")


@broker.on_event(TaskiqEvents.WORKER_SHUTDOWN)
async def shutdown(state: TaskiqState) -> None:
    # Here we close our pool on shutdown event.
    await state.redis.disconnect()


@broker.task
async def get_val(key: str, context: Context = TaskiqDepends()) -> str | None:
    # Now we can use our pool.
    redis = Redis(connection_pool=context.state.redis, decode_responses=True)
    return await redis.get(key)


@broker.task
async def set_val(key: str, value: str, context: Context = TaskiqDepends()) -> None:
    # Now we can use our pool to set value.
    await Redis(connection_pool=context.state.redis).set(key, value)


async def main() -> None:
    await broker.startup()

    set_task = await set_val.kiq("key", "value")
    set_result = await set_task.wait_result(with_logs=True)
    if set_result.is_err:
        print(set_result.log)
        raise ValueError("Cannot set value in redis. See logs.")

    get_task = await get_val.kiq("key")
    get_res = await get_task.wait_result()
    print(f"Got redis value: {get_res.return_value}")

    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
```

--------------------------------

### Install Taskiq

Source: https://taskiq-python.github.io/

Install Taskiq using pip. This is the standard method for adding the library to your Python project.

```bash
pip install taskiq
```

--------------------------------

### Start Taskiq Worker

Source: https://taskiq-python.github.io/guide/getting-started.html

Command to start the worker process using the CLI.

```bash
taskiq worker broker:broker
```

--------------------------------

### Install Taskiq with ORJSON support

Source: https://taskiq-python.github.io/guide/message-format.html

Install Taskiq with the 'orjson' extra to enable the ORJSONSerializer for faster JSON processing.

```bash
pip install "taskiq[orjson]"
```

--------------------------------

### Run Taskiq Scheduler

Source: https://taskiq-python.github.io/guide/cli.html

Use this command to start the Taskiq scheduler. Specify the path to your scheduler object and any modules to import tasks from.

```bash
taskiq scheduler <path to scheduler> [optional module to import]...
```

```bash
taskiq scheduler my_project.broker:scheduler my_project.module1 my_project.module2
```

--------------------------------

### Install PyZMQ for ZeroMQBroker

Source: https://taskiq-python.github.io/available-components/brokers.html

Install the pyzmq library to use the ZeroMQ broker. This is required for communication between worker and client processes using ZMQ.

```bash
pip install pyzmq
```

--------------------------------

### Initialize Redis schedule source

Source: https://taskiq-python.github.io/guide/scheduling-tasks.html

Setup for using Redis as a schedule source for dynamic scheduling.
```python
from taskiq_redis import ListQueueBroker, ListRedisScheduleSource

from taskiq import TaskiqScheduler
```

--------------------------------

### Start the scheduler with skip-first-run flag

Source: https://taskiq-python.github.io/guide/scheduling-tasks.html

Command to start the scheduler while skipping immediate execution of past-due tasks.

```bash
taskiq scheduler module:scheduler --skip-first-run
```

--------------------------------

### Install AioPikaBroker for RabbitMQ

Source: https://taskiq-python.github.io/available-components/brokers.html

Install the taskiq-aio-pika package to use RabbitMQ as a broker. This package provides support for RabbitMQ messaging.

```bash
pip install taskiq-aio-pika
```

--------------------------------

### Sync Taskiq Middleware Example

Source: https://taskiq-python.github.io/extending-taskiq/middleware.html

Implement this middleware for synchronous operations. It includes methods for startup, shutdown, pre-execution, and post-save hooks, each with a simulated delay.

```python
from time import sleep
from typing import Any

from taskiq import TaskiqMessage, TaskiqMiddleware, TaskiqResult


class MyMiddleware(TaskiqMiddleware):
    def startup(self) -> None:
        print("RUN STARTUP")
        sleep(1)

    def shutdown(self) -> None:
        print("RUN SHUTDOWN")
        sleep(1)

    def pre_execute(self, message: "TaskiqMessage") -> TaskiqMessage:
        sleep(1)
        return message

    def post_save(self, message: "TaskiqMessage", result: "TaskiqResult[Any]") -> None:
        sleep(1)
        print("Post save")
```

--------------------------------

### Run Redis via Docker

Source: https://taskiq-python.github.io/guide/getting-started.html

Commands to start a Redis instance for result storage. The first block uses POSIX shell line continuations (`\`); the second uses Windows `cmd` line continuations (`^`).
```bash
docker run --rm -d \
    -p "6379:6379" \
    redis
```

```bash
docker run --rm -d ^
    -p "6379:6379" ^
    redis
```

--------------------------------

### Configure Taskiq with CBORSerializer

Source: https://taskiq-python.github.io/guide/message-format.html

Example of initializing an InMemoryBroker and setting the CBORSerializer for message serialization.

```python
from taskiq import InMemoryBroker
from taskiq.serializers import CBORSerializer

broker = InMemoryBroker().with_serializer(CBORSerializer())
```

--------------------------------

### Run Taskiq Worker

Source: https://taskiq-python.github.io/guide/cli.html

Starts a Taskiq worker process. Specify the import path to the broker object (`module:variable`) and the modules containing your tasks. The worker then connects to the broker and starts processing tasks.

```bash
taskiq worker mybroker:broker_var my_project.module1 my_project.module2
```

--------------------------------

### Install Taskiq with Reload Support

Source: https://taskiq-python.github.io/guide/cli.html

Installs Taskiq with the extras needed for hot-reloading worker processes. This avoids manual restarts when task code changes.

```bash
pip install "taskiq[reload]"
```

```bash
poetry add taskiq -E reload
```

```bash
uv add "taskiq[reload]"
```

--------------------------------

### Configure Taskiq with ORJSONSerializer

Source: https://taskiq-python.github.io/guide/message-format.html

Example of initializing an InMemoryBroker and setting the ORJSONSerializer for message serialization.

```python
from taskiq import InMemoryBroker
from taskiq.serializers import ORJSONSerializer

broker = InMemoryBroker().with_serializer(ORJSONSerializer())
```

--------------------------------

### Define tasks with dependencies

Source: https://taskiq-python.github.io/guide/testing-taskiq.html

Examples of defining tasks using Annotated or default values with TaskiqDepends.
```python
from typing import Annotated
from pathlib import Path

from taskiq import TaskiqDepends

from your_project.tkq import broker


@broker.task
async def modify_path(some_path: Annotated[Path, TaskiqDepends()]):
    return some_path.parent / "taskiq.py"
```

```python
from pathlib import Path

from taskiq import TaskiqDepends

from your_project.tkq import broker


@broker.task
async def modify_path(some_path: Path = TaskiqDepends()):
    return some_path.parent / "taskiq.py"
```

--------------------------------

### Implement a Custom Taskiq Schedule Source

Source: https://taskiq-python.github.io/extending-taskiq/schedule-sources.html

This is a minimal example of a schedule source that extends the `taskiq.abc.schedule_source.ScheduleSource` abstract class. It demonstrates the required `get_schedules` method and optional methods like `startup`, `shutdown`, `add_schedule`, `delete_schedule`, `pre_send`, and `post_send`.

```python
from taskiq import ScheduledTask, ScheduleSource


class MyScheduleSource(ScheduleSource):
    async def startup(self) -> None:
        """Do something when starting broker."""

    async def shutdown(self) -> None:
        """Do something on shutdown."""

    async def get_schedules(self) -> list["ScheduledTask"]:
        # Here you must return the list of scheduled tasks from your source.
        return [
            ScheduledTask(
                task_name="",
                labels={},
                args=[],
                kwargs={},
                cron="* * * * *",
            ),
        ]

    # This method is optional. You don't have to implement it.
    # It's just a helper that lets people interact with your source.
    async def add_schedule(self, schedule: "ScheduledTask") -> None:
        print("New schedule added:", schedule)

    # This method is completely optional, but if you want to support
    # schedule cancellation, you must implement it.
    async def delete_schedule(self, schedule_id: str) -> None:
        print("Deleting schedule:", schedule_id)

    # This method is optional. You don't have to implement it.
    # It's just a helper that lets people interact with your source.
    async def pre_send(self, task: "ScheduledTask") -> None:
        """
        Actions to execute before the task is sent to the broker.

        This method may raise ScheduledTaskCancelledError.
        This cancels the task execution.

        :param task: task that will be sent
        """

    # This method is optional. You don't have to implement it.
    # It's just a helper that lets people interact with your source.
    async def post_send(self, task: "ScheduledTask") -> None:
        """
        Actions to execute after the task was sent to the broker.

        :param task: task that has just been sent
        """
```

--------------------------------

### Run Taskiq Worker with Specific Broker

Source: https://taskiq-python.github.io/guide/architecture-overview.html

Shows the command-line interface command to start a Taskiq worker, specifying the path to the broker instance. Replace `my_project.broker:mybroker` with your actual module and variable name.

```bash
taskiq worker my_project.broker:mybroker
```

--------------------------------

### Async Taskiq Middleware Example

Source: https://taskiq-python.github.io/extending-taskiq/middleware.html

Implement this middleware for asynchronous operations. It mirrors the sync version but uses `async def` and `await` for non-blocking execution of hooks.
```python
from asyncio import sleep
from typing import Any

from taskiq import TaskiqMessage, TaskiqMiddleware, TaskiqResult


class MyMiddleware(TaskiqMiddleware):
    async def startup(self) -> None:
        print("RUN STARTUP")
        await sleep(1)

    async def shutdown(self) -> None:
        print("RUN SHUTDOWN")
        await sleep(1)

    async def pre_execute(self, message: "TaskiqMessage") -> TaskiqMessage:
        await sleep(1)
        return message

    async def post_save(
        self,
        message: "TaskiqMessage",
        result: "TaskiqResult[Any]",
    ) -> None:
        await sleep(1)
        print("Post save")
```

--------------------------------

### Default Taskiq Message Format Example

Source: https://taskiq-python.github.io/guide/message-format.html

This is an example of the default JSON structure used for Taskiq messages, including task name, arguments, and labels.

```json
{
  "task_name": "my_project.module1.task",
  "args": [1, 2, 3],
  "kwargs": {"a": 1, "b": 2, "c": 3},
  "labels": {
    "label1": "value1",
    "label2": "value2"
  }
}
```

--------------------------------

### Implement Custom AsyncResultBackend in Taskiq

Source: https://taskiq-python.github.io/extending-taskiq/result-backend.html

Provides a minimal example of a custom result backend by inheriting from `AsyncResultBackend`. Implement `startup`, `shutdown`, `set_result`, `get_result`, and `is_result_ready` methods.

```python
from typing import TypeVar

from taskiq import TaskiqResult
from taskiq.abc.result_backend import AsyncResultBackend

_ReturnType = TypeVar("_ReturnType")


class MyResultBackend(AsyncResultBackend[_ReturnType]):
    async def startup(self) -> None:
        """Do something when starting broker."""

    async def shutdown(self) -> None:
        """Do something on shutdown."""

    async def set_result(
        self,
        task_id: str,
        result: TaskiqResult[_ReturnType],
    ) -> None:
        """
        Set result in your backend.

        :param task_id: current task id.
        :param result: result of execution.
        """

    async def get_result(
        self,
        task_id: str,
        with_logs: bool = False,
    ) -> TaskiqResult[_ReturnType]:
        """
        Here you must retrieve result by id.

        Logs are part of a result.
        The with_logs parameter controls whether logs are fetched
        along with the result, because logs can be large and
        sometimes only the result itself is needed.

        :param task_id: id of a task.
        :param with_logs: whether to fetch logs.
        :return: result.
        """
        return ...  # type: ignore

    async def is_result_ready(
        self,
        task_id: str,
    ) -> bool:
        """
        Check if result exists.

        This function must check whether result
        is available in your result backend
        without fetching the result.

        :param task_id: id of a task.
        :return: True if result is ready.
        """
        return ...  # type: ignore
```

--------------------------------

### Custom Taskiq Middleware Example

Source: https://taskiq-python.github.io/guide/architecture-overview.html

Implement custom logic by subclassing TaskiqMiddleware and overriding hook methods like pre_send and post_send. Hooks can be sync or async. Exceptions in middlewares are not caught by Taskiq.

```python
import asyncio

from taskiq.abc.middleware import TaskiqMiddleware
from taskiq.message import TaskiqMessage


class MyMiddleware(TaskiqMiddleware):
    async def pre_send(self, message: "TaskiqMessage") -> TaskiqMessage:
        await asyncio.sleep(1)
        message.labels["my_label"] = "my_value"
        return message

    def post_send(self, message: "TaskiqMessage") -> None:
        print(f"Message {message} was sent.")
```

--------------------------------

### Implement message acknowledgement in listen

Source: https://taskiq-python.github.io/extending-taskiq/broker.html

Example of yielding AckableMessage objects within the listen method to support message acknowledgement.

```python
async def listen(self) -> AsyncGenerator[AckableMessage, None]:
    for message in self.my_channel:
        yield AckableMessage(
            data=message.bytes,
            # ack is a callable that takes no parameters,
            # so pass either a bound method of the message
            # or a closure.
            ack=message.ack,
        )
```

--------------------------------

### Declare InMemoryBroker

Source: https://taskiq-python.github.io/guide/getting-started.html

Create a Python module to declare your Taskiq broker. The InMemoryBroker is suitable for local development without network setup.

```python
# broker.py
from taskiq import InMemoryBroker

broker = InMemoryBroker()
```

--------------------------------

### Register Custom Command with Setuptools

Source: https://taskiq-python.github.io/extending-taskiq/cli.html

Add an entry point for your custom command in `setup.py` under the `taskiq_cli` group. This makes your command discoverable by the Taskiq CLI.

```python
from setuptools import setup

setup(
    # ...,
    entry_points={
        'taskiq_cli': [
            'demo = my_project.cmd:MyCommand',
        ]
    }
)
```

--------------------------------

### Initialize Taskiq with FastAPI Application

Source: https://taskiq-python.github.io/framework_integrations/taskiq-with-fastapi.html

Call the `init` function in your main broker file, providing the broker instance and the path to your FastAPI application.

```python
from taskiq import ZeroMQBroker
import taskiq_fastapi

broker = ZeroMQBroker()

taskiq_fastapi.init(broker, "my_package.application:app")
```

--------------------------------

### Startup AioPikaBroker

Source: https://taskiq-python.github.io/guide/getting-started.html

When using distributed brokers like AioPika, it is essential to call the `startup` method before interacting with the broker.

```python
await broker.startup()
```

--------------------------------

### Register Custom Command with PyProject.toml

Source: https://taskiq-python.github.io/extending-taskiq/cli.html

Configure your custom command's entry point in `pyproject.toml` under the `[project.entry-points.taskiq_cli]` section. This is an alternative to using `setup.py` for package metadata.
```toml
[project.entry-points.taskiq_cli]
demo = "my_project.cmd:MyCommand"
```

--------------------------------

### Run linting

Source: https://taskiq-python.github.io/contrib.html

Execute pre-commit hooks manually to lint the codebase.

```bash
pre-commit run -a
```

--------------------------------

### Initialize aiohttp-deps

Source: https://taskiq-python.github.io/framework_integrations/taskiq-with-aiohttp.html

Add a startup event to your AioHTTP application to initialize the dependency injection context.

```python
from aiohttp import web
import aiohttp_deps

app = web.Application()

# This startup event makes all the magic happen.
# It parses current handlers and creates dependency graphs for them.
app.on_startup.append(aiohttp_deps.init)

web.run_app(app)
```

--------------------------------

### Display Help for Custom Command

Source: https://taskiq-python.github.io/extending-taskiq/cli.html

After registering a custom command, you can view its help message using the `taskiq <command> --help` syntax. This shows the arguments defined in your command's `exec` method.

```bash
$ taskiq demo --help
```

--------------------------------

### Test a Taskiq Task Directly

Source: https://taskiq-python.github.io/guide/testing-taskiq.html

This example shows how to test a Taskiq task by importing it and calling it as a regular asynchronous function within a pytest test.

```python
import pytest

from your_project.tasks import parse_int


@pytest.mark.anyio
async def test_task():
    assert await parse_int("11") == 11
```

--------------------------------

### Initialize Taskiq with AioHTTP

Source: https://taskiq-python.github.io/framework_integrations/taskiq-with-aiohttp.html

Initialize the Taskiq integration by passing your broker and the path to your AioHTTP application. This lets tasks share the application's dependencies.

```python
import taskiq_aiohttp

broker = MyBroker()

# The second argument is a path to web.Application variable.
# Also you can provide here a factory function that takes no
# arguments and returns an application. This function can be async.
taskiq_aiohttp.init(broker, "my_project.main:app")
```

--------------------------------

### Implement a custom broker template

Source: https://taskiq-python.github.io/extending-taskiq/broker.html

Use this template to define a new broker by extending AsyncBroker and implementing the required kick and listen methods.

```python
from collections.abc import AsyncGenerator

from taskiq import AckableMessage, AsyncBroker, BrokerMessage


class MyBroker(AsyncBroker):
    def __init__(self) -> None:
        # Please call this super method to set default values to
        # many different fields.
        super().__init__()

    async def startup(self) -> None:
        # Here you can do some startup magic.
        # Like opening a connection.
        return await super().startup()

    async def shutdown(self) -> None:
        # Here you can perform shutdown operations.
        # Like closing connections.
        return await super().shutdown()

    async def kick(self, message: BrokerMessage) -> None:
        # Send message.message (the serialized payload) here.
        pass

    async def listen(self) -> AsyncGenerator[bytes | AckableMessage, None]:
        while True:
            # Get new message.
            new_message: bytes = ...  # type: ignore
            # Yield it!
            yield new_message
```

--------------------------------

### Taskiq Task and Function for Testing

Source: https://taskiq-python.github.io/guide/testing-taskiq.html

Defines a Taskiq task 'parse_int' and a function 'parse_and_add_one' that uses this task. This setup is suitable for testing functions that orchestrate tasks.

```python
from your_project.tkq import broker


@broker.task
async def parse_int(val: str) -> int:
    return int(val)


async def parse_and_add_one(val: str) -> int:
    task = await parse_int.kiq(val)
    result = await task.wait_result()
    return result.return_value + 1
```

--------------------------------

### Class as Dependency with Annotated

Source: https://taskiq-python.github.io/guide/state-and-deps.html

Shows how to use a class as a dependency with Annotated type hints and other dependencies.
```python from typing import Annotated from taskiq import TaskiqDepends async def db_connection() -> str: return "let's pretend as this is a connection" class MyDAO: def __init__(self, db_conn: Annotated[str, TaskiqDepends(db_connection)]) -> None: self.db_conn = db_conn def get_users(self) -> str: return self.db_conn.upper() def my_task(dao: Annotated[MyDAO, TaskiqDepends()]) -> None: print(dao.get_users()) ``` -------------------------------- ### Initialize RedisScheduleSource Source: https://taskiq-python.github.io/available-components/schedule-sources.html Use RedisScheduleSource for dynamic schedule updates at runtime. Requires the 'taskiq-redis' package. ```python from taskiq_redis import RedisScheduleSource from taskiq import TaskiqScheduler redis_source = RedisScheduleSource("redis://localhost:6379/0") scheduler = TaskiqScheduler(broker, sources=[redis_source]) ``` -------------------------------- ### Broker Startup/Shutdown with on_event (Deprecated) Source: https://taskiq-python.github.io/framework_integrations/taskiq-with-fastapi.html This method uses FastAPI's deprecated `on_event` decorator for managing broker startup and shutdown. It's recommended to use the `lifespan` approach instead. ```python from fastapi import FastAPI from your_project.taskiq import broker app = FastAPI() @app.on_event("startup") async def app_startup(): if not broker.is_worker_process: await broker.startup() @app.on_event("shutdown") async def app_shutdown(): if not broker.is_worker_process: await broker.shutdown() ``` -------------------------------- ### Initialize Taskiq with Aiogram Broker Source: https://taskiq-python.github.io/framework_integrations/taskiq-with-aiogram.html Initialize the Taskiq broker with Aiogram integration. This involves calling `taskiq_aiogram.init` with the broker and paths to the dispatcher and bot instances. 
```python import asyncio import taskiq_aiogram from aiogram import Bot from taskiq import TaskiqDepends from taskiq_redis import ListQueueBroker broker = ListQueueBroker("redis://localhost") # This line is going to initialize everything. taskiq_aiogram.init( broker, # This is the path to the dispatcher. "bot:dp", # This is the path to the bot instance. "bot:bot", # You can specify more bots here. ) ``` -------------------------------- ### Dependencies with Default Values Source: https://taskiq-python.github.io/guide/state-and-deps.html Shows how to define dependencies using default values without Annotated. ```python import random from taskiq import TaskiqDepends def common_dep() -> int: # For example it returns 8 return random.randint(1, 10) def dep1(cd: int = TaskiqDepends(common_dep)) -> int: # This function will return 9 return cd + 1 def dep2(cd: int = TaskiqDepends(common_dep)) -> int: # This function will return 10 return cd + 2 def my_task( d1: int = TaskiqDepends(dep1), d2: int = TaskiqDepends(dep2), ) -> int: # This function will return 19 return d1 + d2 ``` -------------------------------- ### Enable OpenTelemetry with Middleware Source: https://taskiq-python.github.io/available-components/middlewares.html Alternatively, enable OpenTelemetry tracing by adding an OpenTelemetryMiddleware instance directly to the broker's middleware list. ```python from taskiq import ZeroMQBroker from taskiq.middlewares.opentelemetry_middleware import OpenTelemetryMiddleware broker = ZeroMQBroker().with_middlewares( OpenTelemetryMiddleware(), ) ``` -------------------------------- ### Execute Broker Script Source: https://taskiq-python.github.io/guide/getting-started.html Running the broker script and observing output. ```bash $ python broker.py Task execution took: 0.0 seconds.
Returned value: None ``` -------------------------------- ### Send Task with Custom Broker and Labels Source: https://taskiq-python.github.io/guide/architecture-overview.html Demonstrates sending a task using a different broker than the default and adding custom labels. Ensure the `InMemoryBroker` is imported. ```python import asyncio from taskiq.brokers.inmemory_broker import InMemoryBroker broker = InMemoryBroker() second_broker = InMemoryBroker() @broker.task async def my_async_task() -> None: """My lovely task.""" await asyncio.sleep(1) print("Hello") async def main(): # This task was initially assigned to broker, # but this time it is going to be sent using # the second broker with additional label `delay=1`. task = await my_async_task.kicker().with_broker(second_broker).with_labels(delay=1).kiq() print(await task.get_result()) asyncio.run(main()) ``` -------------------------------- ### Complete Broker Implementation Source: https://taskiq-python.github.io/guide/getting-started.html Full code including Redis result backend integration. ```python # broker.py import asyncio from taskiq_aio_pika import AioPikaBroker from taskiq_redis import RedisAsyncResultBackend broker = AioPikaBroker( "amqp://guest:guest@localhost:5672", ).with_result_backend(RedisAsyncResultBackend("redis://localhost")) @broker.task async def add_one(value: int) -> int: return value + 1 async def main() -> None: await broker.startup() # Send the task to the broker. task = await add_one.kiq(1) # Wait for the result. 
result = await task.wait_result(timeout=2) print(f"Task execution took: {result.execution_time} seconds.") if not result.is_err: print(f"Returned value: {result.return_value}") else: print("Error found while executing task.") await broker.shutdown() if __name__ == "__main__": asyncio.run(main()) ``` -------------------------------- ### Class as Dependency with Default Values Source: https://taskiq-python.github.io/guide/state-and-deps.html Demonstrates using a class as a dependency with default values. Calling `TaskiqDepends()` with no argument lets the dependency be inferred from the `MyDAO` type hint. ```python from taskiq import TaskiqDepends async def db_connection() -> str: return "let's pretend as this is a connection" class MyDAO: def __init__(self, db_conn: str = TaskiqDepends(db_connection)) -> None: self.db_conn = db_conn def get_users(self) -> str: return self.db_conn.upper() def my_task(dao: MyDAO = TaskiqDepends()) -> None: print(dao.get_users()) ``` -------------------------------- ### Run Taskiq Worker with File System Discovery Source: https://taskiq-python.github.io/guide/architecture-overview.html Pass the `-fsd` flag to recursively discover task modules under the current directory. This simplifies worker startup when tasks are organized in modules named `tasks`. ```bash taskiq worker test_project.broker:broker -fsd ``` -------------------------------- ### Run Taskiq Worker with Manual Task Module Specification Source: https://taskiq-python.github.io/guide/architecture-overview.html Demonstrates how to manually specify all task modules for the worker to import. This is useful for complex project structures. ```bash taskiq worker test_project.broker:broker test_project.submodule.tasks test_project.utils.tasks ``` -------------------------------- ### Configure InMemoryBroker with await_inplace for Testing Source: https://taskiq-python.github.io/guide/testing-taskiq.html Initialize the InMemoryBroker with `await_inplace=True`.
This setting automatically awaits all tasks as soon as they are called, simplifying testing of functions that don't explicitly await task results. ```python from taskiq import InMemoryBroker broker = InMemoryBroker(await_inplace=True) ``` -------------------------------- ### Configure TaskiqScheduler with LabelScheduleSource Source: https://taskiq-python.github.io/guide/scheduling-tasks.html Set up the scheduler to run tasks whose cron schedules are defined in task labels. ```python from taskiq_aio_pika import AioPikaBroker from taskiq import TaskiqScheduler from taskiq.schedule_sources import LabelScheduleSource broker = AioPikaBroker("amqp://guest:guest@localhost:5672/") scheduler = TaskiqScheduler( broker=broker, sources=[LabelScheduleSource(broker)], ) @broker.task(schedule=[{"cron": "*/5 * * * *", "args": [1]}]) async def heavy_task(value: int) -> int: return value + 1 ``` -------------------------------- ### Execute Custom Command with Arguments Source: https://taskiq-python.github.io/extending-taskiq/cli.html Run your custom command with its defined arguments. The output will reflect the parsed arguments and the logic within your command's `exec` method. ```bash $ taskiq demo --test aaa ``` -------------------------------- ### Implement a manual task loop Source: https://taskiq-python.github.io/guide/scheduling-tasks.html A manual approach to periodic task execution: a `while` loop that kicks the task and then sleeps for five minutes with `asyncio.sleep`. ```python import asyncio from datetime import timedelta while True: await heavy_task.kiq(1) # total_seconds is a method, so it must be called. await asyncio.sleep(timedelta(minutes=5).total_seconds()) ``` -------------------------------- ### Integrate Taskiq with FastStream Broker Source: https://taskiq-python.github.io/framework_integrations/faststream.html Configure FastStream to use Taskiq's scheduler by wrapping the FastStream broker with BrokerWrapper. This allows leveraging both libraries' features.
```python from faststream import FastStream from faststream.kafka import KafkaBroker from taskiq_faststream import BrokerWrapper broker = KafkaBroker("localhost:9092") app = FastStream(broker) taskiq_broker = BrokerWrapper(broker) ``` -------------------------------- ### Task Discovery with Patterns Source: https://taskiq-python.github.io/guide/cli.html Enables automatic discovery of task files. Use `--tasks-pattern` to specify glob patterns for task files (e.g., `**/tasks.py`), or `--fs-discover` to recursively search the file system. ```bash taskiq worker --tasks-pattern "**/my_tasks.py" mybroker:broker ``` ```bash taskiq worker -tp "**/tasks.py" -tp "**/other_tasks.py" mybroker:broker ``` ```bash taskiq worker --fs-discover mybroker:broker ``` -------------------------------- ### Enable Prometheus Middleware Source: https://taskiq-python.github.io/available-components/middlewares.html Add PrometheusMiddleware to the broker to expose Taskiq worker metrics. Configure the server address and port where metrics will be available. ```python from taskiq import ZeroMQBroker, PrometheusMiddleware broker = ZeroMQBroker().with_middlewares( PrometheusMiddleware(server_addr="0.0.0.0", server_port=9000), ) ``` -------------------------------- ### Register Custom Command with Poetry Source: https://taskiq-python.github.io/extending-taskiq/cli.html Use Poetry to register your custom command by adding an entry point in `pyproject.toml` under the `[tool.poetry.plugins.taskiq_cli]` section. This method is specific to projects managed by Poetry. ```toml [tool.poetry.plugins.taskiq_cli] demo = "my_project.cmd:MyCommand" ``` -------------------------------- ### Implement TaskiqCMD for Custom Command Source: https://taskiq-python.github.io/extending-taskiq/cli.html Define a class that inherits from `taskiq.abc.cmd.TaskiqCMD` to create a new subcommand. Implement the `exec` method to handle command logic and argument parsing.
You do not need to shift `args` yourself: Taskiq handles argument shifting, so the sequence can be passed straight to your parser. ```python from argparse import ArgumentParser from collections.abc import Sequence from taskiq.abc.cmd import TaskiqCMD class MyCommand(TaskiqCMD): short_help = "Demo command" def exec(self, args: Sequence[str]) -> None: parser = ArgumentParser() parser.add_argument( "--test", dest="test", default="default", help="My test parameter.", ) parsed = parser.parse_args(args) print(parsed) ``` -------------------------------- ### Annotated Dependencies with Default Values Source: https://taskiq-python.github.io/guide/state-and-deps.html Demonstrates declaring dependencies with `Annotated` type hints (Python 3.10+) instead of default values. ```python import random from typing import Annotated from taskiq import TaskiqDepends def common_dep() -> int: # For example it returns 8 return random.randint(1, 10) def dep1(cd: Annotated[int, TaskiqDepends(common_dep)]) -> int: # This function will return 9 return cd + 1 def dep2(cd: Annotated[int, TaskiqDepends(common_dep)]) -> int: # This function will return 10 return cd + 2 def my_task( d1: Annotated[int, TaskiqDepends(dep1)], d2: Annotated[int, TaskiqDepends(dep2)], ) -> int: # This function will return 19 return d1 + d2 ``` -------------------------------- ### Initialize NATSKeyValueScheduleSource Source: https://taskiq-python.github.io/available-components/schedule-sources.html Use NATSKeyValueScheduleSource for dynamic schedule updates at runtime with NATS. Requires the 'taskiq-nats' package. Schedules are added via the 'add_schedule' method. ```python from taskiq_nats import NATSKeyValueScheduleSource from taskiq.scheduler import TaskiqScheduler broker = ... scheduler = TaskiqScheduler( broker=broker, sources=[NATSKeyValueScheduleSource(broker)], ) ``` -------------------------------- ### Define Task with Labels using Kicker Source: https://taskiq-python.github.io/guide/architecture-overview.html Illustrates adding labels to a task using the `kicker().with_labels()` method.
This is equivalent to defining labels in the decorator. ```python @broker.task async def my_async_task() -> None: """My lovely task.""" await asyncio.sleep(1) print("Hello") async def main(): await my_async_task.kicker().with_labels( my_label=1, label2="something", ).kiq() ``` -------------------------------- ### Enable Simple Retry Middleware Source: https://taskiq-python.github.io/available-components/middlewares.html Add SimpleRetryMiddleware to the broker to automatically restart tasks on errors. Configure default retry counts when initializing the middleware. ```python from taskiq import ZeroMQBroker, SimpleRetryMiddleware broker = ZeroMQBroker().with_middlewares( SimpleRetryMiddleware(default_retry_count=3), ) ``` -------------------------------- ### Verify Execution Results Source: https://taskiq-python.github.io/guide/getting-started.html Output after running the updated broker script. ```bash $ python broker.py Task execution took: 1.0013580322265625e-05 seconds. Returned value: 2 ``` -------------------------------- ### Run tests Source: https://taskiq-python.github.io/contrib.html Execute tests using pytest or tox for environment-specific testing. ```bash pytest ``` ```bash pytest -n 2 ``` ```bash tox ``` -------------------------------- ### Task Execution Output Source: https://taskiq-python.github.io/guide/getting-started.html This is the expected output when running the `broker.py` script with the InMemoryBroker. ```bash ❯ python broker.py Task execution took: 7.3909759521484375e-06 seconds. Returned value: 2 ``` -------------------------------- ### Dependencies with Disabled Cache Source: https://taskiq-python.github.io/guide/state-and-deps.html Demonstrates disabling cache for a dependency using `use_cache=False` without Annotated. 
```python import random from taskiq import TaskiqDepends def common_dep() -> int: return random.randint(1, 10) def dep1(cd: int = TaskiqDepends(common_dep)) -> int: return cd + 1 def dep2(cd: int = TaskiqDepends(common_dep, use_cache=False)) -> int: return cd + 2 def my_task( d1: int = TaskiqDepends(dep1), d2: int = TaskiqDepends(dep2), ) -> int: return d1 + d2 ``` -------------------------------- ### Synchronous Generator Dependency (Annotated) Source: https://taskiq-python.github.io/guide/state-and-deps.html Use synchronous generator dependencies for startup and teardown logic before and after task execution. Requires Python 3.10+ for Annotated syntax. ```python from collections.abc import Generator from typing import Annotated from taskiq import TaskiqDepends def dependency() -> Generator[str, None, None]: print("Startup") yield "value" print("Shutdown") async def my_task(dep: Annotated[str, TaskiqDepends(dependency)]) -> None: print(dep.upper()) ``` -------------------------------- ### Configure Redis Result Backend Source: https://taskiq-python.github.io/guide/getting-started.html Update the broker configuration to include a Redis result backend. ```python from taskiq_redis import RedisAsyncResultBackend broker = AioPikaBroker( "amqp://guest:guest@localhost:5672", ).with_result_backend(RedisAsyncResultBackend("redis://localhost")) ``` -------------------------------- ### Schedule Tasks with Time, Cron, and Interval Source: https://taskiq-python.github.io/guide/scheduling-tasks.html Methods for scheduling tasks using specific timestamps, cron expressions, or fixed intervals. ```python await redis_source.startup() await my_task.schedule_by_time( redis_source, # It's better to use UTC time, or add tzinfo to datetime. 
datetime.datetime.now(datetime.UTC) + datetime.timedelta(minutes=1, seconds=5), # You can pass args and kwargs here as usual 11, arg2="arg2", ) ``` ```python await my_task.schedule_by_cron( redis_source, "*/5 * * * *", 11, arg2="arg2", ) ``` ```python await my_task.schedule_by_interval( redis_source, datetime.timedelta(seconds=5), 11, arg2="arg2", ) ``` -------------------------------- ### Enable OpenTelemetry with TaskiqInstrumentor Source: https://taskiq-python.github.io/available-components/middlewares.html Use TaskiqInstrumentor for auto-instrumentation of Taskiq operations with OpenTelemetry. This is the preferred method for enabling tracing. ```python from taskiq import ZeroMQBroker from taskiq.instrumentation import TaskiqInstrumentor TaskiqInstrumentor().instrument() broker = ZeroMQBroker() ``` -------------------------------- ### Broker Lifespan Management with FastAPI Source: https://taskiq-python.github.io/framework_integrations/taskiq-with-fastapi.html Use the `asynccontextmanager` for the `lifespan` parameter in FastAPI to manage broker startup and shutdown, ensuring it only runs when not in a worker process. ```python from contextlib import asynccontextmanager from fastapi import FastAPI from your_project.taskiq import broker @asynccontextmanager async def lifespan(app: FastAPI): # Startup if not broker.is_worker_process: await broker.startup() yield # Shutdown if not broker.is_worker_process: await broker.shutdown() app = FastAPI(lifespan=lifespan) ```
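
The `is_worker_process` guard used in the lifespan example can be exercised without FastAPI or a real broker. The following is a minimal sketch, assuming a hypothetical `StubBroker` class that only records calls and a hypothetical `make_lifespan` helper; neither is part of Taskiq or FastAPI. It illustrates that the broker is started and shut down on the client side and left untouched when `is_worker_process` is true.

```python
import asyncio
from contextlib import asynccontextmanager


class StubBroker:
    """Hypothetical stand-in for a Taskiq broker; it only records calls."""

    def __init__(self, is_worker_process: bool) -> None:
        self.is_worker_process = is_worker_process
        self.calls: list[str] = []

    async def startup(self) -> None:
        self.calls.append("startup")

    async def shutdown(self) -> None:
        self.calls.append("shutdown")


def make_lifespan(broker: StubBroker):
    """Hypothetical helper: build a lifespan with the guard from the example."""

    @asynccontextmanager
    async def lifespan(app: object):
        # Skip broker startup when running inside a worker process.
        if not broker.is_worker_process:
            await broker.startup()
        yield
        # Apply the same guard on shutdown.
        if not broker.is_worker_process:
            await broker.shutdown()

    return lifespan


async def run_once(is_worker_process: bool) -> list[str]:
    broker = StubBroker(is_worker_process)
    async with make_lifespan(broker)(None):
        pass  # a web application would serve requests here
    return broker.calls


client_calls = asyncio.run(run_once(is_worker_process=False))
worker_calls = asyncio.run(run_once(is_worker_process=True))
print(client_calls)  # ['startup', 'shutdown']
print(worker_calls)  # []
```

Swapping `StubBroker` for a real broker should leave the control flow unchanged, though that is not verified here.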