### Create Worker Startup Handler with Taskiq-FastAPI

Source: https://context7.com/taskiq-python/taskiq-fastapi/llms.txt

This example demonstrates how to create a custom worker startup handler using `startup_event_generator`. This is useful for advanced scenarios where manual control over broker event wiring is needed.

```python
from taskiq import TaskiqEvents, ZeroMQBroker
from taskiq_fastapi.initializator import (
    shutdown_event_generator,
    startup_event_generator,
)

broker = ZeroMQBroker()

# Register handlers manually for full control.
broker.add_event_handler(
    TaskiqEvents.WORKER_STARTUP,
    startup_event_generator(broker, "myapp:app"),
)
broker.add_event_handler(
    TaskiqEvents.WORKER_SHUTDOWN,
    shutdown_event_generator(broker),
)

# Equivalent short form:
# taskiq_fastapi.init(broker, "myapp:app")
```

--------------------------------

### Initialize Taskiq with FastAPI App

Source: https://github.com/taskiq-python/taskiq-fastapi/blob/master/README.md

Initialize Taskiq with your FastAPI application. Pass the app as a string so it is resolved lazily, which prevents circular imports. This setup is what lets tasks share dependencies with FastAPI.

```python
from fastapi import Depends as FastAPIDepends
from fastapi import FastAPI, Request
from pydantic import BaseModel
from redis.asyncio import ConnectionPool, Redis
from taskiq import TaskiqDepends, ZeroMQBroker

import taskiq_fastapi

broker = ZeroMQBroker()

app = FastAPI()


@app.on_event("startup")
async def app_startup():
    #####################
    #  IMPORTANT NOTE   #
    #####################
    # If you don't check that this is not a worker process,
    # you'll create an infinite recursion, because FastAPI
    # startup is also called in worker processes.
    if not broker.is_worker_process:
        print("Starting broker")
        await broker.startup()
    print("Creating redis pool")
    app.state.redis_pool = ConnectionPool.from_url("redis://localhost")


@app.on_event("shutdown")
async def app_shutdown():
    #####################
    #  IMPORTANT NOTE   #
    #####################
    # If you don't check that this is not a worker process,
    # you'll create an infinite recursion, because FastAPI
    # shutdown is also called in worker processes.
    if not broker.is_worker_process:
        print("Shutting down broker")
        await broker.shutdown()
    print("Stopping redis pool")
    await app.state.redis_pool.disconnect()


# Here we call our magic function.
taskiq_fastapi.init(broker, "test_script:app")


# We use TaskiqDepends here, because if we use FastAPIDepends,
# FastAPI initialization will fail.
def get_redis_pool(request: Request = TaskiqDepends()) -> ConnectionPool:
    return request.app.state.redis_pool


@broker.task
async def my_redis_task(
    key: str,
    val: str,
    # Here we depend using TaskiqDepends.
    # Use TaskiqDepends in all tasks so that dependencies are resolved
    # correctly; otherwise they won't be injected.
    pool: ConnectionPool = TaskiqDepends(get_redis_pool),
):
    async with Redis(connection_pool=pool) as redis:
        await redis.set(key, val)
        print("Value set.")


class MyVal(BaseModel):
    key: str
    val: str


@app.post("/val")
async def setval_endpoint(val: MyVal) -> None:
    await my_redis_task.kiq(
        key=val.key,
        val=val.val,
    )
    print("Task sent")


@app.get("/val")
async def getval_endpoint(
    key: str,
    pool: ConnectionPool = FastAPIDepends(get_redis_pool),
) -> str:
    async with Redis(connection_pool=pool, decode_responses=True) as redis:
        return await redis.get(key)
```

--------------------------------

### init: Register FastAPI lifecycle hooks with a Taskiq broker

Source: https://context7.com/taskiq-python/taskiq-fastapi/llms.txt

Registers WORKER_STARTUP and WORKER_SHUTDOWN event handlers on the given broker.
This ensures that the FastAPI application is automatically started up and torn down when Taskiq workers begin or end their processes. The `app_or_path` argument can be a dotted import path string, a FastAPI instance, or a callable that returns a FastAPI instance.

````APIDOC
## init

### Description
Registers `WORKER_STARTUP` and `WORKER_SHUTDOWN` event handlers on the given broker so that, in worker processes, the FastAPI application is started up and torn down automatically. The `app_or_path` argument accepts a dotted import path string (recommended to avoid circular imports), a `FastAPI` instance directly, or a zero-argument callable that returns a `FastAPI` instance.

### Parameters
- **broker**: The Taskiq broker instance.
- **app_or_path**: A string representing the dotted import path to the FastAPI application, a `FastAPI` instance, or a callable that returns a `FastAPI` instance.

### Example Usage
```python
from fastapi import FastAPI
from taskiq import ZeroMQBroker

import taskiq_fastapi

broker = ZeroMQBroker()
app = FastAPI()

# Initialize taskiq-fastapi with the broker and the path to the FastAPI app.
taskiq_fastapi.init(broker, "myapp:app")
```
````

--------------------------------

### Create Worker Shutdown Handler with Taskiq-FastAPI

Source: https://context7.com/taskiq-python/taskiq-fastapi/llms.txt

This snippet shows how to create a worker shutdown handler using `shutdown_event_generator`. This handler ensures that FastAPI's shutdown sequence is triggered, for example, to close connection pools.

```python
from taskiq import TaskiqEvents, ZeroMQBroker
from taskiq_fastapi.initializator import shutdown_event_generator

broker = ZeroMQBroker()

# Attach only the shutdown handler (e.g., when startup is handled elsewhere).
broker.add_event_handler(
    TaskiqEvents.WORKER_SHUTDOWN,
    shutdown_event_generator(broker),
)
```

--------------------------------

### Initialize Taskiq with FastAPI and Broker

Source: https://context7.com/taskiq-python/taskiq-fastapi/llms.txt

This snippet shows how to initialize Taskiq with a FastAPI app instance and an `InMemoryBroker` in a pytest suite, so that tasks under test can resolve FastAPI dependencies.

```python
import pytest
from fastapi import FastAPI, Request
from redis.asyncio import ConnectionPool, Redis
from taskiq import InMemoryBroker, TaskiqDepends

import taskiq_fastapi

broker = InMemoryBroker()
app = FastAPI()


@app.on_event("startup")
async def startup():
    app.state.redis_pool = ConnectionPool.from_url("redis://localhost")


taskiq_fastapi.init(broker, app)  # pass the instance directly in tests


def get_redis_pool(request: Request = TaskiqDepends()) -> ConnectionPool:
    return request.app.state.redis_pool


@broker.task
async def ping_redis(pool: ConnectionPool = TaskiqDepends(get_redis_pool)) -> bool:
    async with Redis(connection_pool=pool) as redis:
        # redis-py converts the PONG reply to True.
        return await redis.ping()


@pytest.fixture
def anyio_backend():
    # Run anyio-marked tests and async fixtures on asyncio.
    return "asyncio"


@pytest.fixture(autouse=True)
async def setup_app(anyio_backend):
    # Manually trigger FastAPI startup so app.state is populated.
    async with app.router.lifespan_context(app) as asgi_state:
        # Inject the live app state into the broker's dependency resolver.
        taskiq_fastapi.populate_dependency_context(broker, app, asgi_state)
        yield
    # Cleanup is handled when the lifespan context manager exits.


@pytest.mark.anyio
async def test_ping_redis():
    task = await ping_redis.kiq()
    # InMemoryBroker runs the task in-process. wait_result returns a
    # TaskiqResult whose return_value holds the task's return value.
    result = await task.wait_result(timeout=2)
    assert result.return_value is True
```

--------------------------------

### Initialize Taskiq Broker with FastAPI Lifecycle Hooks

Source: https://context7.com/taskiq-python/taskiq-fastapi/llms.txt

Register FastAPI's startup and shutdown events with a Taskiq broker.
Use a dotted import path for the FastAPI app to prevent circular imports. This ensures FastAPI resources are available in worker processes.

```python
from fastapi import Depends as FastAPIDepends
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel
from redis.asyncio import ConnectionPool, Redis
from taskiq import TaskiqDepends, ZeroMQBroker

import taskiq_fastapi

broker = ZeroMQBroker()
app = FastAPI()


@app.on_event("startup")
async def app_startup():
    # Guard: skip broker startup inside the worker itself to avoid
    # infinite recursion (the worker has already started the broker).
    if not broker.is_worker_process:
        await broker.startup()
    app.state.redis_pool = ConnectionPool.from_url("redis://localhost")


@app.on_event("shutdown")
async def app_shutdown():
    if not broker.is_worker_process:
        await broker.shutdown()
    await app.state.redis_pool.disconnect()


# Wire FastAPI into the broker using a dotted import path.
# The string form avoids circular imports because the module is not
# imported until the worker process actually starts up.
taskiq_fastapi.init(broker, "myapp:app")


# Shared dependency: works in both FastAPI routes and taskiq tasks.
def get_redis_pool(request: Request = TaskiqDepends()) -> ConnectionPool:
    return request.app.state.redis_pool


# Background task that reuses the FastAPI dependency.
@broker.task
async def store_value(
    key: str,
    val: str,
    pool: ConnectionPool = TaskiqDepends(get_redis_pool),
):
    async with Redis(connection_pool=pool) as redis:
        await redis.set(key, val)


class Payload(BaseModel):
    key: str
    val: str


# FastAPI route that dispatches the background task.
@app.post("/values", status_code=202)
async def create_value(body: Payload) -> dict:
    task = await store_value.kiq(key=body.key, val=body.val)
    return {"task_id": task.task_id}


# FastAPI route that reads back the value directly (uses FastAPIDepends).
@app.get("/values/{key}")
async def read_value(
    key: str,
    pool: ConnectionPool = FastAPIDepends(get_redis_pool),
) -> dict:
    async with Redis(connection_pool=pool, decode_responses=True) as redis:
        value = await redis.get(key)
    if value is None:
        raise HTTPException(status_code=404, detail="Key not found")
    return {"key": key, "value": value}
```

--------------------------------

### populate_dependency_context: Manually inject FastAPI state into broker dependencies

Source: https://context7.com/taskiq-python/taskiq-fastapi/llms.txt

Directly populates the broker's `dependency_overrides` with mock `Request` and `HTTPConnection` objects. This is useful for tests using `InMemoryBroker` where the normal worker startup flow does not occur.

````APIDOC
## populate_dependency_context

### Description
Directly populates the broker's `dependency_overrides` with mock `Request` and `HTTPConnection` objects backed by the provided `FastAPI` app instance and optional ASGI state mapping. This is essential when using `InMemoryBroker` (e.g., in tests) because the broker never goes through the normal worker startup flow that `init` relies on.

### Parameters
- **broker**: The Taskiq broker instance.
- **app**: The FastAPI application instance.
- **asgi_state**: An optional dictionary holding the ASGI state to expose through the mock objects.

### Example Usage
```python
from fastapi import FastAPI
from taskiq import InMemoryBroker

import taskiq_fastapi

broker = InMemoryBroker()
app = FastAPI()

# Manually populate the dependency context for testing.
taskiq_fastapi.populate_dependency_context(broker, app)
```
````

--------------------------------

### Manually Update Dependency Context with InMemoryBroker

Source: https://github.com/taskiq-python/taskiq-fastapi/blob/master/README.md

When using `InMemoryBroker`, update the dependency context manually, because the worker startup event that normally populates it never runs. This is also useful for setting up tests.
```python
from fastapi import FastAPI
from taskiq import InMemoryBroker

import taskiq_fastapi

broker = InMemoryBroker()

app = FastAPI()

taskiq_fastapi.init(broker, "test_script:app")
taskiq_fastapi.populate_dependency_context(broker, app)
```
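
--------------------------------

### Sketch: resolving `app_or_path` lazily (illustrative)

The `init` description above says `app_or_path` may be a dotted import path string, a `FastAPI` instance, or a zero-argument callable. As a rough illustration of why the string form sidesteps circular imports, the stdlib-only sketch below defers the import until the value is actually needed. The helper names `resolve_object` and `resolve_app` and the `app_type` parameter are hypothetical and are not part of taskiq-fastapi; the library's own resolution logic may differ.

```python
import importlib
from typing import Any, Callable, Union


def resolve_object(path: str) -> Any:
    """Import a ``module:attribute`` path and return the attribute.

    Hypothetical helper: the import happens only when this function is
    called, not when the path string is written down.
    """
    module_name, sep, attr = path.partition(":")
    if not sep or not module_name or not attr:
        raise ValueError(f"expected 'module:attribute', got {path!r}")
    module = importlib.import_module(module_name)
    return getattr(module, attr)


def resolve_app(
    app_or_path: Union[str, Any, Callable[[], Any]],
    app_type: type,
) -> Any:
    """Return an ``app_type`` instance from a path, an instance, or a factory.

    ``app_type`` stands in for ``FastAPI`` so the sketch runs without
    third-party packages (an assumption made for illustration only).
    """
    if isinstance(app_or_path, str):
        candidate = resolve_object(app_or_path)
    else:
        candidate = app_or_path

    # Check for an instance first: application objects may themselves
    # be callable, so "callable" alone does not mean "factory".
    if isinstance(candidate, app_type):
        return candidate
    if callable(candidate):
        candidate = candidate()
    if not isinstance(candidate, app_type):
        raise TypeError(f"{app_or_path!r} did not resolve to {app_type.__name__}")
    return candidate
```

With this shape, a module can mention `"myapp:app"` at import time without importing `myapp`; the import only runs when `resolve_app("myapp:app", FastAPI)` is called, for example from a worker startup handler.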