### Minimal Kafka Concurrent Processing Setup

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/quick-start.md

This snippet demonstrates the basic setup for concurrent message processing. It registers the middleware, defines a lifespan that starts and stops concurrent processing, and declares a subscriber with manual acknowledgment.

```python
from contextlib import asynccontextmanager

from faststream import ContextRepo
from faststream.asgi import AsgiFastStream
from faststream.kafka import KafkaBroker
from faststream.middlewares import AckPolicy

from faststream_concurrent_aiokafka import (
    KafkaConcurrentProcessingMiddleware,
    initialize_concurrent_processing,
    stop_concurrent_processing,
)

# Create broker (bootstrap servers as "host:port")
broker = KafkaBroker("localhost:9092")

# Register middleware
broker.add_middleware(KafkaConcurrentProcessingMiddleware)


# Define lifespan
@asynccontextmanager
async def lifespan(app_context: ContextRepo):
    # Initialize concurrent processing
    await initialize_concurrent_processing(
        context=broker.context,
        concurrency_limit=10,
    )
    try:
        yield
    finally:
        # Gracefully shut down
        await stop_concurrent_processing(broker.context)


# Create app
app = AsgiFastStream(broker, lifespan=lifespan)


# Define subscriber
@broker.subscriber(
    "my-topic",
    group_id="my-group",
    ack_policy=AckPolicy.MANUAL,  # ← Required for concurrent processing
)
async def handle_message(msg: str):
    print(f"Processing: {msg}")


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
```

--------------------------------

### Rebalance-Safe Setup Example

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/DOCUMENTATION_MANIFEST.txt

Illustrates how to make a FastStream application rebalance-safe, so that in-flight messages are handled correctly during Kafka consumer group rebalances.
```python
from contextlib import asynccontextmanager

from faststream import ContextRepo, FastStream
from faststream.kafka import KafkaBroker
from faststream.middlewares import AckPolicy

from faststream_concurrent_aiokafka import (
    KafkaConcurrentProcessingMiddleware,
    initialize_concurrent_processing,
    stop_concurrent_processing,
)

broker = KafkaBroker("localhost:9092")
broker.add_middleware(KafkaConcurrentProcessingMiddleware)


@asynccontextmanager
async def lifespan(_context: ContextRepo):
    handler = await initialize_concurrent_processing(context=broker.context)

    # The handler's own listener takes care of in-flight messages when
    # partitions are revoked, so no hand-written listener is needed
    @broker.subscriber(
        "rebalance-safe-topic",
        group_id="my-group",
        ack_policy=AckPolicy.MANUAL,
        listener=handler.create_rebalance_listener(),
    )
    async def handle(msg: str):
        # No msg.ack(): the middleware commits offsets in batches
        print(f"Processing message: {msg}")

    try:
        yield
    finally:
        await stop_concurrent_processing(broker.context)


app = FastStream(broker, lifespan=lifespan)
```

--------------------------------

### Full Kafka Concurrent Processing with Configuration

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/quick-start.md

This example shows a more complete setup with configurable parameters for concurrency, batch size, and timeouts. It also exposes HTTP endpoints for readiness and liveness checks and demonstrates error handling within the subscriber.
```python
from contextlib import asynccontextmanager

import httpx
from faststream import ContextRepo
from faststream.asgi import AsgiFastStream, AsgiResponse, get
from faststream.kafka import KafkaBroker
from faststream.middlewares import AckPolicy

from faststream_concurrent_aiokafka import (
    KafkaConcurrentProcessingMiddleware,
    initialize_concurrent_processing,
    is_kafka_handler_healthy,
    stop_concurrent_processing,
)

# Configuration
KAFKA_BROKERS = "localhost:9092"
CONCURRENCY_LIMIT = 20
COMMIT_BATCH_SIZE = 100
COMMIT_BATCH_TIMEOUT_SEC = 5.0

# Create broker
broker = KafkaBroker(KAFKA_BROKERS)
broker.add_middleware(KafkaConcurrentProcessingMiddleware)


# Lifespan
@asynccontextmanager
async def lifespan(app_context: ContextRepo):
    await initialize_concurrent_processing(
        context=broker.context,
        concurrency_limit=CONCURRENCY_LIMIT,
        commit_batch_size=COMMIT_BATCH_SIZE,
        commit_batch_timeout_sec=COMMIT_BATCH_TIMEOUT_SEC,
        shutdown_timeout_sec=30.0,
    )
    try:
        yield
    finally:
        await stop_concurrent_processing(broker.context)


# FastStream app
app = AsgiFastStream(broker, lifespan=lifespan)


# Subscriber with error handling
@broker.subscriber(
    "orders",
    group_id="order-processor",
    ack_policy=AckPolicy.MANUAL,
)
async def process_order(order_id: int, total: float):
    """Process an order concurrently."""
    try:
        # Simulate an I/O-bound call to another service
        async with httpx.AsyncClient() as client:
            response = await client.post(
                "https://payment-service/charge",
                json={"order_id": order_id, "amount": total},
                timeout=10.0,
            )
            response.raise_for_status()
        print(f"Order {order_id} processed successfully")
    except Exception as e:
        print(f"Order {order_id} failed: {e}")
        # Propagate the exception so it is logged; the offset is not
        # committed, so the message is redelivered on restart (at-least-once)
        raise


# Health check endpoints, mounted as plain ASGI routes
# (FastStream's `get` helper passes the ASGI scope to the handler)
@get
async def readiness(scope):
    if is_kafka_handler_healthy(broker.context):
        return AsgiResponse(b'{"status": "ready"}', status_code=200)
    return AsgiResponse(b'{"status": "not_ready"}', status_code=503)


@get
async def liveness(scope):
    if is_kafka_handler_healthy(broker.context):
        return AsgiResponse(b'{"status": "live"}', status_code=200)
    return AsgiResponse(b'{"status": "dead"}', status_code=503)


app.mount("/health/ready", readiness)
app.mount("/health/live", liveness)


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
```

--------------------------------

### Kafka Partition Rebalance Listener Setup

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/api-reference.md

Example of initializing concurrent processing and creating a rebalance listener for a FastStream Kafka subscriber. Make sure manual acknowledgment is enabled and the listener is passed when the subscriber is created.

```python
from contextlib import asynccontextmanager

from faststream import ContextRepo
from faststream.kafka import KafkaBroker
from faststream.middlewares import AckPolicy

from faststream_concurrent_aiokafka import (
    KafkaConcurrentProcessingMiddleware,
    initialize_concurrent_processing,
    stop_concurrent_processing,
)

broker = KafkaBroker("localhost:9092")
broker.add_middleware(KafkaConcurrentProcessingMiddleware)


@asynccontextmanager
async def lifespan(context: ContextRepo):
    # `context` is app-level; the handler is stored in broker.context
    handler = await initialize_concurrent_processing(
        context=broker.context,
        # ... other tuning parameters
    )
    listener = handler.create_rebalance_listener()

    @broker.subscriber(
        "my-topic",
        group_id="my-group",
        ack_policy=AckPolicy.MANUAL,
        listener=listener,
    )
    async def handle(msg: str): ...

    try:
        yield
    finally:
        await stop_concurrent_processing(broker.context)
```

--------------------------------

### Integration Testing Setup with Testcontainers

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/architecture.md

Outlines the steps for integration testing against real Kafka containers using testcontainers: pre-create topics, start the broker, then assert that messages are processed and committed.
```python
# See tests/test_integration.py
# 1. Use AIOKafkaAdminClient to pre-create topics
# 2. Start the broker with `await broker.start()`
# 3. Sleep 1.5 s so the consumer group rebalance completes
# 4. Publish messages
# 5. Assert they are processed and committed
```

--------------------------------

### Minimal Example

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/DOCUMENTATION_MANIFEST.txt

A minimal FastStream application that consumes Kafka messages and processes them one at a time, without the concurrent-processing middleware.

```python
from faststream import FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.subscriber("test-topic")
async def handle(msg: str):
    print(f"Received message: {msg}")
```

--------------------------------

### Install Dependencies

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/CLAUDE.md

Installs and synchronizes all project dependencies. Run this after pulling changes or modifying `pyproject.toml`.

```bash
just install
```

--------------------------------

### Install faststream-concurrent-aiokafka

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/README.md

Install the library with pip. This command installs the package and its dependencies.

```bash
pip install faststream-concurrent-aiokafka
```

--------------------------------

### Full Example

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/DOCUMENTATION_MANIFEST.txt

A more complete FastStream application showing concurrent processing and batch committing for Kafka consumers.
```python
import asyncio
from contextlib import asynccontextmanager

from faststream import ContextRepo, FastStream
from faststream.kafka import KafkaBroker
from faststream.middlewares import AckPolicy

from faststream_concurrent_aiokafka import (
    KafkaConcurrentProcessingMiddleware,
    initialize_concurrent_processing,
    stop_concurrent_processing,
)

broker = KafkaBroker("localhost:9092")
broker.add_middleware(KafkaConcurrentProcessingMiddleware)


@asynccontextmanager
async def lifespan(_context: ContextRepo):
    # Concurrency and batching are configured here, not on the middleware
    await initialize_concurrent_processing(
        context=broker.context,
        concurrency_limit=20,
        commit_batch_size=10,
        commit_batch_timeout_sec=5.0,
        shutdown_timeout_sec=30.0,
    )
    try:
        yield
    finally:
        await stop_concurrent_processing(broker.context)


app = FastStream(broker, lifespan=lifespan)


@broker.subscriber("data-topic", group_id="data-group", ack_policy=AckPolicy.MANUAL)
async def process_data(msg: str):
    print(f"Processing: {msg}")
    await asyncio.sleep(0.1)  # Simulate some work
    # No msg.ack(): the middleware commits offsets in batches
```

--------------------------------

### Integration Testing Example

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/DOCUMENTATION_MANIFEST.txt

Demonstrates integration testing for a FastStream Kafka application against a running Kafka, testing the end-to-end message flow.

```python
import pytest
from faststream.kafka import KafkaBroker, TestKafkaBroker

# Assume `broker` and `handle` are defined in your app module:
# from your_app import broker, handle
broker = KafkaBroker("localhost:9092")


@broker.subscriber("integration-topic")
async def handle(msg: str): ...


@pytest.mark.asyncio
async def test_integration_flow():
    # with_real=True connects to the real Kafka instead of patching it
    async with TestKafkaBroker(broker, with_real=True) as br:
        await br.publish("Test message for integration", topic="integration-topic")

        # Wait (up to 10 s) until the handler has been called
        await handle.wait_call(timeout=10)
        handle.mock.assert_called_once_with("Test message for integration")
        # Assertions about side effects would go here, for example
        # checking that a database was updated or another service called.
```

--------------------------------

### Unit Testing Example

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/DOCUMENTATION_MANIFEST.txt

Shows how to unit test FastStream Kafka handlers with the in-memory test broker, so message processing logic can be tested in isolation.

```python
import pytest
from faststream.kafka import KafkaBroker, TestKafkaBroker

# Assume `broker` and `handle` are defined in your app module:
# from your_app import broker, handle
broker = KafkaBroker()


@broker.subscriber("test-topic")
async def handle(msg: str): ...


@pytest.mark.asyncio
async def test_handle_message():
    # TestKafkaBroker patches the broker in memory; no Kafka is required
    async with TestKafkaBroker(broker) as br:
        await br.publish("Hello, FastStream!", topic="test-topic")
        handle.mock.assert_called_once_with("Hello, FastStream!")
```

--------------------------------

### start

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/api-reference.md

Starts the handler and its batch committer's background task. This operation is idempotent.

```APIDOC
## start()

### Description
Start the handler and its batch committer's background task. Idempotent: calling it again on an already-running handler has no effect.

### Method
`async def start(self) -> None`

### Effects
- Sets `_is_running = True`
- Calls `committer.spawn()` to start the committer's streaming loop
- Logs info messages
```

--------------------------------

### Basic Kafka Broker Setup with Concurrent Processing

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/api-reference.md

Sets up a Kafka broker with the concurrent processing middleware.
Includes subscribers for both manual and automatic ack policies.

```python
from faststream.kafka import KafkaBroker
from faststream.middlewares import AckPolicy

from faststream_concurrent_aiokafka import KafkaConcurrentProcessingMiddleware

broker = KafkaBroker("localhost:9092")
broker.add_middleware(KafkaConcurrentProcessingMiddleware)
# initialize_concurrent_processing() must also run in the app lifespan (not shown)


@broker.subscriber("my-topic", group_id="my-group", ack_policy=AckPolicy.MANUAL)
async def handle_message(msg: str):
    # This runs concurrently as an asyncio task, limited by concurrency_limit
    print(f"Processing: {msg}")
    # No manual commit call needed: the middleware handles it


@broker.subscriber("other-topic", group_id="other-group")
async def handle_other(msg: str):
    # Non-MANUAL ack policy; processed sequentially
    # FastStream commits the offset automatically
    print(f"Processing other: {msg}")
```

--------------------------------

### KafkaConcurrentHandler Example

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/DOCUMENTATION_MANIFEST.txt

Shows how a `KafkaConcurrentHandler` is obtained in a FastStream application: it is created and started by `initialize_concurrent_processing()` rather than constructed directly.

```python
from contextlib import asynccontextmanager

from faststream import ContextRepo
from faststream.kafka import KafkaBroker

from faststream_concurrent_aiokafka import (
    KafkaConcurrentProcessingMiddleware,
    initialize_concurrent_processing,
    stop_concurrent_processing,
)

broker = KafkaBroker("localhost:9092")
broker.add_middleware(KafkaConcurrentProcessingMiddleware)


@asynccontextmanager
async def lifespan(_context: ContextRepo):
    # Returns the running KafkaConcurrentHandler and stores it in broker.context
    handler = await initialize_concurrent_processing(context=broker.context)
    try:
        yield
    finally:
        await stop_concurrent_processing(broker.context)
```

--------------------------------

### DI Integration Example

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/DOCUMENTATION_MANIFEST.txt

Demonstrates integrating dependency injection (DI) with FastStream handlers to manage dependencies such as database connections or external services, here using FastStream's built-in `Depends`.

```python
from faststream import Depends, FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


def get_db_connection() -> str:
    # Stand-in for a real connection or connection pool
    return "mock_db_connection"


@broker.subscriber("di-topic")
async def handle_with_di(msg: str, db: str = Depends(get_db_connection)):
    # The dependency is resolved per message, not once at import time
    print(f"Processing with DB: {db}. Message: {msg}")
```

--------------------------------

### Debugging Example

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/DOCUMENTATION_MANIFEST.txt

Provides guidance on debugging FastStream Kafka applications, including enabling debug logging and using print statements effectively.

```python
import logging

from faststream import FastStream
from faststream.kafka import KafkaBroker

# Enable debug logging
logging.basicConfig(level=logging.DEBUG)

broker = KafkaBroker("localhost:9092", log_level=logging.DEBUG)
app = FastStream(broker)


@broker.subscriber("debug-topic")
async def handle_debug(msg: str):
    print(f"Received message: {msg}")  # Use print for immediate feedback
    # Add more print statements or logging within the handler for detailed debugging


# To run with debug logging:
# faststream run your_app:app --reload --log-level debug
```

--------------------------------

### At-Least-Once Guarantees Example

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/architecture.md

Shows how at-least-once delivery holds across shutdown and restart: the offsets of cancelled tasks are not committed, so those messages are reprocessed.

```text
Consumer processes offsets [0, 1, 2, 3, 4]
├─ Offsets 0, 1 complete and committed
├─ Offsets 2, 3 are in-flight tasks
│
Stop is called
├─ Cancel tasks for offsets 2, 3
├─ Committer flushes commit(partition: 2) (next offset to fetch is 2)
│
Restart
├─ Consumer.poll() returns offsets [2, 3, 4, ...]
├─ Offsets 2, 3 are reprocessed (at-least-once)
```

--------------------------------

### Sequential Router Example

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/quick-start.md

Demonstrates a Kafka router that processes messages sequentially, included in the broker alongside a concurrent router. This is useful for critical topics where order and guaranteed processing are paramount.

```python
from faststream.kafka import KafkaRouter

# Assumes `broker`, `concurrent_router`, and `process_critical` are defined elsewhere
sequential_router = KafkaRouter()


@sequential_router.subscriber(
    "critical-topic",
    group_id="critical-processor",
)
async def handle_critical(msg: str):
    # Processed sequentially by FastStream (auto-ack)
    await process_critical(msg)


# Include both routers
broker.include_router(concurrent_router)
broker.include_router(sequential_router)
```

--------------------------------

### Monitoring and Health Checks Example

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/DOCUMENTATION_MANIFEST.txt

Demonstrates how to integrate monitoring and health check endpoints into a FastStream application for observing its operational status.
```python
import prometheus_client
from faststream import FastStream
from faststream.kafka import KafkaBroker

from faststream_concurrent_aiokafka import is_kafka_handler_healthy

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

# Example metric
processed_counter = prometheus_client.Counter(
    "faststream_messages_processed_total",
    "Total number of messages processed",
)


@broker.subscriber("metrics-topic")
async def handle_metrics(msg: str):
    processed_counter.inc()
    print(f"Processed message for metrics: {msg}")


# Health check (example); in a real app, expose this through an HTTP
# endpoint or integrate it with a proper monitoring system
def health_check() -> tuple[str, int]:
    if is_kafka_handler_healthy(broker.context):
        return "OK", 200
    return "Error", 500
```

--------------------------------

### Multiple Concurrency Limits Example

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/DOCUMENTATION_MANIFEST.txt

Shows how subscribers in the same FastStream application can be processed differently. The documented API sets a single `concurrency_limit` through `initialize_concurrent_processing()`, with no per-subscriber limit; MANUAL-ack subscribers run concurrently under that limit, while other subscribers are processed sequentially.

```python
from contextlib import asynccontextmanager

from faststream import ContextRepo, FastStream
from faststream.kafka import KafkaBroker
from faststream.middlewares import AckPolicy

from faststream_concurrent_aiokafka import (
    KafkaConcurrentProcessingMiddleware,
    initialize_concurrent_processing,
    stop_concurrent_processing,
)

broker = KafkaBroker("localhost:9092")
broker.add_middleware(KafkaConcurrentProcessingMiddleware)


@asynccontextmanager
async def lifespan(_context: ContextRepo):
    # One limit for the concurrent handler stored in broker.context
    await initialize_concurrent_processing(context=broker.context, concurrency_limit=10)
    try:
        yield
    finally:
        await stop_concurrent_processing(broker.context)


app = FastStream(broker, lifespan=lifespan)


@broker.subscriber("high-priority-topic", group_id="high", ack_policy=AckPolicy.MANUAL)
async def handle_high_priority(msg: str):
    print("Handling high priority message")  # concurrent, up to concurrency_limit


@broker.subscriber("low-priority-topic", group_id="low")
async def handle_low_priority(msg: str):
    print("Handling low priority message")  # sequential (non-MANUAL ack policy)
```

--------------------------------

### Subscriber Handler Example

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/architecture.md

Example of a subscriber function decorated to process messages from a Kafka topic with manual acknowledgment.
```python
from faststream.kafka import KafkaBroker
from faststream.middlewares import AckPolicy

broker = KafkaBroker()


# Offset commits require a consumer group, hence group_id
@broker.subscriber("my-topic", group_id="my-group", ack_policy=AckPolicy.MANUAL)
async def handle(msg: str):
    # This is a subscriber handler
    print(f"Processing: {msg}")
```

--------------------------------

### Initialize Concurrent Processing with Broker Context

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/configuration.md

Example demonstrating how to initialize concurrent processing using the broker's context in an ASGI FastStream application. It highlights the importance of using `broker.context` for handler storage.

```python
from contextlib import asynccontextmanager

from faststream import ContextRepo
from faststream.asgi import AsgiFastStream
from faststream.kafka import KafkaBroker

from faststream_concurrent_aiokafka import (
    initialize_concurrent_processing,
    stop_concurrent_processing,
)

broker = KafkaBroker("localhost:9092")


@asynccontextmanager
async def lifespan(app_context: ContextRepo):
    # app_context is app-level; broker.context is broker-level
    handler = await initialize_concurrent_processing(
        context=broker.context,  # ← Use broker.context
        concurrency_limit=20,
    )
    try:
        yield
    finally:
        await stop_concurrent_processing(broker.context)


app = AsgiFastStream(broker, lifespan=lifespan)
```

--------------------------------

### ConsumerRebalanceListener Example

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/DOCUMENTATION_MANIFEST.txt

Shows how to implement a custom ConsumerRebalanceListener to handle partition rebalances in Kafka. This allows for custom logic during partition assignment and revocation.
```python
from aiokafka import ConsumerRebalanceListener
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")


class MyRebalanceListener(ConsumerRebalanceListener):
    async def on_partitions_assigned(self, assigned):
        print(f"Partitions assigned: {assigned}")
        # Custom logic for assigned partitions

    async def on_partitions_revoked(self, revoked):
        print(f"Partitions revoked: {revoked}")
        # Custom logic for revoked partitions


# Usage: pass the listener when declaring a subscriber in a consumer group
@broker.subscriber("my-topic", group_id="my-group", listener=MyRebalanceListener())
async def handle(msg: str): ...
```

--------------------------------

### Scoped Processing Example

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/DOCUMENTATION_MANIFEST.txt

Demonstrates how to achieve scoped processing within FastStream, allowing for context-specific message handling and resource management.

```python
from contextvars import ContextVar

from faststream import Depends, FastStream
from faststream.kafka import KafkaBroker
from faststream.kafka.annotations import KafkaMessage

request_id_var: ContextVar[str] = ContextVar("request_id")

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


def get_request_id(message: KafkaMessage) -> str:
    # In a real app, this might come from headers or be generated
    return message.correlation_id


@broker.subscriber("scoped-topic")
async def handle_scoped(msg: str, request_id: str = Depends(get_request_id)):
    token = request_id_var.set(request_id)
    try:
        print(f"Processing message with ID {request_id}: {msg}")
    finally:
        request_id_var.reset(token)  # Restore the previous value after this message
```

--------------------------------

### Concurrent Processing with Dependency Injection

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/quick-start.md

Demonstrates integrating `KafkaConcurrentProcessingMiddleware` with `modern-di-faststream` for dependency injection. It's critical to register the middleware *before* the DI setup.
```python
from contextlib import asynccontextmanager

from dependency_injector import containers, providers
from faststream import ContextRepo
from faststream.asgi import AsgiFastStream
from faststream.kafka import KafkaBroker
from faststream.middlewares import AckPolicy
from modern_di_faststream import setup_di

from faststream_concurrent_aiokafka import (
    KafkaConcurrentProcessingMiddleware,
    initialize_concurrent_processing,
    stop_concurrent_processing,
)


# Dummy OrderRepository for example
class OrderRepository:
    def __init__(self, db):
        self.db = db


# DI container
class Container(containers.DeclarativeContainer):
    database = providers.Singleton(lambda: "dummy_db")
    repository = providers.Factory(OrderRepository, db=database)


# Broker
broker = KafkaBroker("localhost:9092")

# Register KCM FIRST (outer position)
broker.add_middleware(KafkaConcurrentProcessingMiddleware)


# Lifespan
@asynccontextmanager
async def lifespan(app_context: ContextRepo):
    container = Container()
    # Register DI AFTER KCM
    setup_di(app, container=container)

    # Initialize concurrent processing
    await initialize_concurrent_processing(
        context=broker.context,
        concurrency_limit=10,
    )
    try:
        yield
    finally:
        await stop_concurrent_processing(broker.context)


# App: lifespan() only runs at startup, so the module-level `app` already
# exists when setup_di(app, ...) is called inside it
app = AsgiFastStream(broker, lifespan=lifespan)
```

--------------------------------

### Start Concurrent Handler

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/api-reference.md

Starts the handler and its batch committer's background task. This method is idempotent.

```python
async def start(self) -> None: ...
```

--------------------------------

### Initialize Concurrent Processing Configuration

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/README.md

Example of configuring concurrent message processing with parameters such as the concurrency limit, commit batch size, and timeouts.

```python
await initialize_concurrent_processing(
    context=broker.context,  # Required: FastStream ContextRepo
    concurrency_limit=10,  # Max concurrent tasks (1–100+)
    commit_batch_size=10,  # Messages per commit batch
    commit_batch_timeout_sec=10.0,  # Seconds before auto-flush
    shutdown_timeout_sec=20.0,  # Max seconds for shutdown
)
```

--------------------------------

### Run All Tests

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/CLAUDE.md

Runs all project tests within Docker: starts Redpanda, runs pytest, and tears down the containers.

```bash
just test
```

--------------------------------

### initialize_concurrent_processing()

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/INDEX.md

Creates and starts the concurrent message handler. This is the primary function for starting the processing pipeline.

```APIDOC
## initialize_concurrent_processing()

### Description
Creates and starts the handler for concurrent message processing.

### Method
async function

### Parameters
`context` (required): the FastStream `ContextRepo`; pass `broker.context`. Optional tuning parameters: `concurrency_limit`, `commit_batch_size`, `commit_batch_timeout_sec`, `shutdown_timeout_sec`, `max_uncommitted_tasks`.
### Request Example
```python
# Example usage within a FastStream application
from faststream_concurrent_aiokafka import initialize_concurrent_processing


# Assuming `broker` is your KafkaBroker instance
async def start_processing():
    return await initialize_concurrent_processing(
        context=broker.context,
        concurrency_limit=10,
    )

# This would typically be called during application startup (in the lifespan)
```

### Response
Returns the `KafkaConcurrentHandler` instance, which is also stored in the context; processing then runs in the background.
```

--------------------------------

### Concurrent Subscriber with Dependency Injection

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/quick-start.md

Example of a subscriber that injects dependencies and runs concurrently. Make sure DI is registered after KCM for proper scope management.

```python
from faststream.kafka import KafkaBroker
from faststream.middlewares import AckPolicy

broker = KafkaBroker()


# Assume OrderRepository is registered with the DI container
class OrderRepository:
    async def save_order(self, msg: str):
        pass


@broker.subscriber("orders", group_id="order-processor", ack_policy=AckPolicy.MANUAL)
async def handle_order(msg: str, repo: OrderRepository):
    # repo is injected by the DI container
    # Runs concurrently with proper DI scope
    await repo.save_order(msg)
```

--------------------------------

### initialize_concurrent_processing

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/README.md

Initializes and starts the concurrent processing handler and stores it in the FastStream context. This function sets up the processing engine with the specified concurrency and commit configuration.

```APIDOC
## initialize_concurrent_processing(context, ...)

### Description
Create and start the concurrent processing handler; store it in FastStream's context.
### Parameters

| Parameter | Default | Description |
|---|---|---|
| `context` | required | FastStream `ContextRepo` instance |
| `concurrency_limit` | `10` | Max concurrent asyncio tasks (minimum: 1) |
| `commit_batch_size` | `10` | Max messages per commit batch |
| `commit_batch_timeout_sec` | `10.0` | Max seconds before flushing a batch |
| `shutdown_timeout_sec` | `20.0` | Max seconds the batch committer waits for its background task to drain before forcing cancellation |
| `max_uncommitted_tasks` | `10000` | Max tasks accepted but not yet committed before the consume path blocks (backpressure). `None` disables the bound. |

### Returns
The `KafkaConcurrentHandler` instance.

> **Tuning `max_uncommitted_tasks`:** each uncommitted entry holds only commit metadata (a task reference, its `TopicPartition`, offset, and consumer reference), not the message payload, so the default of `10000` is on the order of a few MB. Lower it to tighten the memory bound during a commit or broker outage, at the cost of stalling consumption sooner. Keep it `>= commit_batch_size` so size-based batching can still trigger (below that, commits fall back to the timeout/flush path); set it to `None` to disable the bound and restore unbounded buffering.
```

--------------------------------

### Initialize and Manage Concurrent Processing with ContextRepo

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/types.md

Example of using `ContextRepo` within a lifespan function to initialize and manage concurrent processing handlers. It highlights the importance of using `broker.context` for handler storage and retrieval, especially when dealing with app-level contexts.
```python
from contextlib import asynccontextmanager

from faststream import ContextRepo
from faststream.kafka import KafkaBroker

from faststream_concurrent_aiokafka import (
    initialize_concurrent_processing,
    stop_concurrent_processing,
)

broker = KafkaBroker("localhost:9092")


@asynccontextmanager
async def lifespan(context: ContextRepo):
    # `context` here is app-level; use broker.context for handler storage
    handler = await initialize_concurrent_processing(
        context=broker.context,  # Not `context`
        concurrency_limit=20,
    )
    try:
        yield
    finally:
        await stop_concurrent_processing(broker.context)
```

--------------------------------

### Configure Concurrent Message Processing in FastStream

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/README.md

This example demonstrates how to integrate KafkaConcurrentProcessingMiddleware into a FastStream application. It shows the required manual ack policy and the initialization and shutdown of concurrent processing within the lifespan context manager.

```python
from contextlib import asynccontextmanager

from faststream import ContextRepo
from faststream.asgi import AsgiFastStream
from faststream.kafka import KafkaBroker
from faststream.middlewares import AckPolicy

from faststream_concurrent_aiokafka import (
    KafkaConcurrentProcessingMiddleware,
    initialize_concurrent_processing,
    stop_concurrent_processing,
)

broker = KafkaBroker(...)
# Register KCM on the broker before any other middleware (see DI note below)
broker.add_middleware(KafkaConcurrentProcessingMiddleware)


@asynccontextmanager
async def lifespan(_context: ContextRepo):
    await initialize_concurrent_processing(
        context=broker.context,
        concurrency_limit=20,  # max concurrent tasks (minimum: 1)
        commit_batch_size=100,  # commit after this many completed tasks
        commit_batch_timeout_sec=5.0,  # or after this many seconds
    )
    try:
        yield
    finally:
        await stop_concurrent_processing(broker.context)


app = AsgiFastStream(broker, lifespan=lifespan)


@broker.subscriber("my-topic", group_id="my-group", ack_policy=AckPolicy.MANUAL)
async def handle(msg: str) -> None:
    ...
    # Process message concurrently


# Subscribers without AckPolicy.MANUAL are passed through unchanged
@broker.subscriber("other-topic", group_id="other-group")
async def handle_other(msg: str) -> None:
    ...
    # Process message sequentially
```

--------------------------------

### Error Handling and Idempotence Example

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/DOCUMENTATION_MANIFEST.txt

Shows how to implement robust error handling and make message processing idempotent, so that redelivered messages are not processed twice.

```python
import asyncio

from faststream import FastStream
from faststream.kafka import KafkaBroker
from faststream.kafka.annotations import KafkaMessage

# In-memory for illustration; a durable store is needed across restarts
processed_ids: set[str] = set()

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.subscriber("idempotent-topic")
async def handle_idempotent(body: str, message: KafkaMessage):
    # Header values arrive decoded; producers are expected to set "message_id"
    message_id = message.headers.get("message_id")
    if message_id and message_id in processed_ids:
        print(f"Skipping already processed message ID: {message_id}")
        return  # Returning normally acknowledges the message, avoiding reprocessing

    try:
        print(f"Processing message: {body}")
        await asyncio.sleep(0.1)  # Simulate processing that might fail
        # If processing succeeds, remember the ID
        if message_id:
            processed_ids.add(message_id)
    except Exception as e:
        print(f"Error processing message: {e}")
        # Optionally call `await message.nack()` or log for manual intervention
        raise  # Propagate so FastStream logs the failure
```

--------------------------------

### Rebalance-Safe Setup with Listener

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/quick-start.md

Ensures no duplicate processing during rebalances by integrating a rebalance listener created from the concurrent processing handler. The listener is passed to the subscriber's `listener` argument.
```python
from contextlib import asynccontextmanager

from faststream import ContextRepo
from faststream.asgi import AsgiFastStream
from faststream.kafka import KafkaBroker
from faststream.middlewares import AckPolicy
from faststream_concurrent_aiokafka import (
    KafkaConcurrentProcessingMiddleware,
    initialize_concurrent_processing,
    stop_concurrent_processing,
)

broker = KafkaBroker("localhost:9092")
broker.add_middleware(KafkaConcurrentProcessingMiddleware)


@asynccontextmanager
async def lifespan(context: ContextRepo):
    handler = await initialize_concurrent_processing(
        context=broker.context,
        concurrency_limit=10,
    )

    # Create rebalance listener
    listener = handler.create_rebalance_listener()

    @broker.subscriber(
        "my-topic",
        group_id="my-group",
        ack_policy=AckPolicy.MANUAL,
        listener=listener,  # ← Pass listener
    )
    async def handle(msg: str):
        ...

    try:
        yield
    finally:
        await stop_concurrent_processing(broker.context)


app = AsgiFastStream(broker, lifespan=lifespan)
```

--------------------------------

### Testing KafkaBatchCommitter Loop

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/batch-committer-internals.md

Shows how to test the internal loop of `KafkaBatchCommitter`: spawn the committer, queue a task, give the loop time to process it, and then assert that the consumer's `commit` method was called.
```python
import asyncio
from unittest.mock import AsyncMock, MagicMock

from aiokafka import TopicPartition

# KafkaBatchCommitter and KafkaCommitTask are internal to
# faststream_concurrent_aiokafka; their import path is not shown here.


async def test_committer_commits() -> None:
    mock_consumer = MagicMock()
    mock_consumer.commit = AsyncMock()  # aiokafka's consumer.commit() is a coroutine

    committer = KafkaBatchCommitter()
    committer.spawn()

    # Queue a task
    await committer.send_task(
        KafkaCommitTask(
            asyncio_task=asyncio.create_task(asyncio.sleep(0)),
            topic_partition=TopicPartition("topic", 0),
            offset=0,
            consumer=mock_consumer,
        )
    )

    # Let the loop process it
    await asyncio.sleep(0.5)

    # Verify commit was called
    assert mock_consumer.commit.called
```

--------------------------------

### Acquiring and Releasing Semaphore in Task Handling

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/architecture.md

Shows how a semaphore slot is acquired before a task starts and released when the task completes. `acquire()` blocks while the concurrency limit is reached, so no more than `concurrency_limit` tasks run at once.

```python
async def handle_task(self, coroutine, record, kafka_message):
    await self._limiter.acquire()  # ← Blocks if limit reached
    task = asyncio.ensure_future(coroutine)
    task.add_done_callback(self._finish_task)
    ...

def _finish_task(self, task):
    self._limiter.release()  # ← Frees a slot
    ...
```

--------------------------------

### Initialize Concurrent Kafka Processing

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/api-reference.md

Initializes and starts concurrent message processing for a Kafka broker. Subscribers must use the MANUAL ack policy, and `KafkaConcurrentProcessingMiddleware` must be registered before any DI middleware.
```python
from contextlib import asynccontextmanager

from faststream import ContextRepo
from faststream.asgi import AsgiFastStream
from faststream.kafka import KafkaBroker
from faststream.middlewares import AckPolicy
from faststream_concurrent_aiokafka import (
    KafkaConcurrentProcessingMiddleware,
    initialize_concurrent_processing,
    stop_concurrent_processing,
)

broker = KafkaBroker("localhost:9092")
broker.add_middleware(KafkaConcurrentProcessingMiddleware)


@asynccontextmanager
async def lifespan(context: ContextRepo):
    handler = await initialize_concurrent_processing(
        context=broker.context,
        concurrency_limit=20,
        commit_batch_size=100,
        commit_batch_timeout_sec=5.0,
        shutdown_timeout_sec=30.0,
    )
    try:
        yield
    finally:
        await stop_concurrent_processing(broker.context)


app = AsgiFastStream(broker, lifespan=lifespan)
```

--------------------------------

### Initialize Concurrent Processing with Environment Variables

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/configuration.md

Shows how to read configuration from environment variables and pass the values as keyword arguments to `initialize_concurrent_processing`. Set the variables before application startup; unset ones fall back to the defaults shown, which match the function's own defaults.
```python
import os

from faststream.kafka import KafkaBroker
from faststream_concurrent_aiokafka import initialize_concurrent_processing

concurrency_limit = int(os.getenv("CONCURRENCY_LIMIT", "10"))
commit_batch_size = int(os.getenv("COMMIT_BATCH_SIZE", "10"))
commit_batch_timeout_sec = float(os.getenv("COMMIT_BATCH_TIMEOUT_SEC", "10.0"))
shutdown_timeout_sec = float(os.getenv("SHUTDOWN_TIMEOUT_SEC", "20.0"))


async def init_processing(broker: KafkaBroker):
    # Hypothetical helper: call it from the app lifespan, before `yield`
    return await initialize_concurrent_processing(
        context=broker.context,
        concurrency_limit=concurrency_limit,
        commit_batch_size=commit_batch_size,
        commit_batch_timeout_sec=commit_batch_timeout_sec,
        shutdown_timeout_sec=shutdown_timeout_sec,
    )
```

--------------------------------

### Check Handler Running Status

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/api-reference.md

Returns `True` if the handler is currently running (after `start()` and before `stop()`), `False` otherwise.

```python
@property
def is_running(self) -> bool: ...
```

--------------------------------

### Build Docker Image

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/CLAUDE.md

Builds the application's Docker image.

```bash
just build
```

--------------------------------

### Per-Subscriber Configuration with Multiple Handlers

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/configuration.md

Illustrates how to configure different settings for distinct subscribers by creating separate handlers and `KafkaRouter`s. Each router can be associated with a specific context and handler, so concurrency and batching can be tuned per router.
```python
from faststream.kafka import KafkaRouter

# Handler for critical topics
critical_handler = await initialize_concurrent_processing(
    context=critical_context,
    concurrency_limit=5,
    commit_batch_size=1,
    commit_batch_timeout_sec=1.0,
)

# Handler for high-throughput topics
fast_handler = await initialize_concurrent_processing(
    context=fast_context,
    concurrency_limit=50,
    commit_batch_size=1000,
    commit_batch_timeout_sec=10.0,
)

# Route critical topics
critical_router = KafkaRouter(
    middlewares=[KafkaConcurrentProcessingMiddleware],
)


@critical_router.subscriber("critical-topic", ack_policy=AckPolicy.MANUAL)
async def handle_critical(msg: str):
    ...


# Route fast topics
fast_router = KafkaRouter(
    middlewares=[KafkaConcurrentProcessingMiddleware],
)


@fast_router.subscriber("high-volume-topic", ack_policy=AckPolicy.MANUAL)
async def handle_fast(msg: str):
    ...


broker.include_router(critical_router)
broker.include_router(fast_router)
```

--------------------------------

### initialize_concurrent_processing

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/api-reference.md

Initializes and starts concurrent message processing for a Kafka broker. It sets up a handler for managing concurrent asyncio tasks and offset commits.

```APIDOC
## initialize_concurrent_processing()

### Description

Initialize and start concurrent message processing for a Kafka broker. This function sets up a handler that manages concurrent asyncio tasks for processing messages and handles offset commits.
### Method

async def initialize_concurrent_processing(
    context: faststream.ContextRepo,
    concurrency_limit: int = 10,
    commit_batch_size: int = 10,
    commit_batch_timeout_sec: float = 10.0,
    shutdown_timeout_sec: float = 20.0,
) -> KafkaConcurrentHandler

### Parameters

- `context` (`ContextRepo`) - Required - FastStream `ContextRepo` instance where the handler is stored under the key "concurrent_processing".
- `concurrency_limit` (`int`) - Optional - Default: `10` - Maximum number of concurrent asyncio tasks processing messages simultaneously. Must be ≥ 1. Enforced via `asyncio.Semaphore`.
- `commit_batch_size` (`int`) - Optional - Default: `10` - Number of completed tasks before the committer flushes offsets to Kafka. Batch commits are also triggered by timeout or flush events.
- `commit_batch_timeout_sec` (`float`) - Optional - Default: `10.0` - Seconds before the committer automatically flushes pending offsets even if `commit_batch_size` is not reached. Minimum interval is per-partition.
- `shutdown_timeout_sec` (`float`) - Optional - Default: `20.0` - Maximum seconds to wait for the batch committer's background task to drain pending offsets during shutdown. If exceeded, the task is forcibly cancelled.

### Return Value

**`KafkaConcurrentHandler`** - The handler instance, stored in `context` under the key "concurrent_processing".
Use this to:

- Call `create_rebalance_listener()` for rebalance-safe offset committing
- Read the `is_running` and `is_healthy` properties for monitoring
- Call `stop()` directly if needed (though `stop_concurrent_processing()` is preferred)

### Raises

- `RuntimeError` - if `context` is invalid or the handler cannot be stored

### Request Example

~~~python
from contextlib import asynccontextmanager

from faststream import ContextRepo
from faststream.asgi import AsgiFastStream
from faststream.kafka import KafkaBroker
from faststream.middlewares import AckPolicy
from faststream_concurrent_aiokafka import (
    KafkaConcurrentProcessingMiddleware,
    initialize_concurrent_processing,
    stop_concurrent_processing,
)

broker = KafkaBroker("localhost:9092")
broker.add_middleware(KafkaConcurrentProcessingMiddleware)


@asynccontextmanager
async def lifespan(context: ContextRepo):
    handler = await initialize_concurrent_processing(
        context=broker.context,
        concurrency_limit=20,
        commit_batch_size=100,
        commit_batch_timeout_sec=5.0,
        shutdown_timeout_sec=30.0,
    )
    try:
        yield
    finally:
        await stop_concurrent_processing(broker.context)


app = AsgiFastStream(broker, lifespan=lifespan)
~~~

### Notes

- **Idempotent**: Calling it multiple times returns the existing handler if it is still running, logs a warning, and does not create a new one
- **Context key**: The handler is stored globally in `context` under the constant "concurrent_processing" (see `faststream_concurrent_aiokafka.consts.PROCESSING_CONTEXT_KEY`)
- **MANUAL ack policy required**: All subscribers that receive concurrent processing must use `ack_policy=AckPolicy.MANUAL`.
  Non-MANUAL subscribers are passed through sequentially and are not affected
- **Order of middleware registration**: `KafkaConcurrentProcessingMiddleware` must be registered **before** any DI framework middleware (e.g., `modern-di-faststream.setup_di()`) so the DI scope lifetime aligns with the background task's lifetime
```

--------------------------------

### Publish to PyPI

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/CLAUDE.md

Bumps the version, builds the application, and publishes it to PyPI. Requires the `GITHUB_REF_NAME` environment variable.

```bash
just publish
```

--------------------------------

### Production Configuration for Moderate Throughput

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/configuration.md

Recommended settings for production environments handling moderate throughput (around 1K messages/sec), balancing concurrency and commit batching.

```python
await initialize_concurrent_processing(
    context=broker.context,
    concurrency_limit=10,
    commit_batch_size=100,
    commit_batch_timeout_sec=5.0,
    shutdown_timeout_sec=20.0,
)
```

--------------------------------

### Initialize Concurrent Processing with Pydantic Settings

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/configuration.md

Shows how to use `BaseSettings` from `pydantic-settings` to load configuration from environment variables prefixed with `KAFKA_`, keeping configuration out of the main application code.
```python
# Example with Pydantic
from faststream.kafka import KafkaBroker
from pydantic_settings import BaseSettings, SettingsConfigDict
from faststream_concurrent_aiokafka import initialize_concurrent_processing


class Settings(BaseSettings):
    # Reads KAFKA_CONCURRENCY_LIMIT, KAFKA_COMMIT_BATCH_SIZE, and so on
    model_config = SettingsConfigDict(env_prefix="KAFKA_")

    concurrency_limit: int = 10
    commit_batch_size: int = 10
    commit_batch_timeout_sec: float = 10.0
    shutdown_timeout_sec: float = 20.0


settings = Settings()  # Loads from env vars


async def init_processing(broker: KafkaBroker):
    # Hypothetical helper: call it from the app lifespan, before `yield`
    return await initialize_concurrent_processing(
        context=broker.context,
        concurrency_limit=settings.concurrency_limit,
        commit_batch_size=settings.commit_batch_size,
        commit_batch_timeout_sec=settings.commit_batch_timeout_sec,
        shutdown_timeout_sec=settings.shutdown_timeout_sec,
    )
```

--------------------------------

### Different Concurrency Limits per Topic

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/quick-start.md

Illustrates how to configure different concurrency limits for different topics by using separate brokers, registering `KafkaConcurrentProcessingMiddleware` on each, and passing topic-specific `concurrency_limit` and `commit_batch_size` values to `initialize_concurrent_processing`.
```python
from contextlib import asynccontextmanager

from faststream import ContextRepo
from faststream.asgi import AsgiFastStream
from faststream.kafka import KafkaBroker
from faststream.middlewares import AckPolicy
from faststream_concurrent_aiokafka import (
    KafkaConcurrentProcessingMiddleware,
    initialize_concurrent_processing,
    stop_concurrent_processing,
)

# Broker for fast topics
fast_broker = KafkaBroker("localhost:9092")
fast_broker.add_middleware(KafkaConcurrentProcessingMiddleware)

# Broker for critical topics
critical_broker = KafkaBroker("localhost:9092")
critical_broker.add_middleware(KafkaConcurrentProcessingMiddleware)


@asynccontextmanager
async def lifespan(context: ContextRepo):
    # Fast handler: high concurrency
    await initialize_concurrent_processing(
        context=fast_broker.context,
        concurrency_limit=50,
        commit_batch_size=1000,
    )
    # Critical handler: low concurrency, high frequency commits
    await initialize_concurrent_processing(
        context=critical_broker.context,
        concurrency_limit=2,
        commit_batch_size=1,
    )
    # The app manages fast_broker; critical_broker is connected, started, and closed here
    async with critical_broker:
        await critical_broker.start()
        try:
            yield
        finally:
            await stop_concurrent_processing(fast_broker.context)
            await stop_concurrent_processing(critical_broker.context)


app = AsgiFastStream(fast_broker, lifespan=lifespan)


@fast_broker.subscriber("events", ack_policy=AckPolicy.MANUAL)
async def handle_event(msg: str):
    # High concurrency
    pass


@critical_broker.subscriber("alerts", ack_policy=AckPolicy.MANUAL)
async def handle_alert(msg: str):
    # Low concurrency, frequent commits
    pass
```

--------------------------------

### Watermark Importance in Rebalance

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/architecture.md

Explains why cancellation watermarks must be cleared when a partition is revoked: otherwise a stale watermark left over from the previous owner keeps the new owner from committing past the cancelled offset.
```text
Consumer A (partition 0)
├─ Process offsets [0, 1, 2]
├─ Cancel task at offset 2 (watermark set to 2)
├─ Commit only offsets [0, 1]
│
Partition 0 is revoked from Consumer A
├─ But watermark (id(Consumer A), partition 0) → 2 is still in memory
│
Consumer B (new partition 0 assignment)
├─ Pulls tasks at offsets [2, 3, 4, ...]
├─ Tries to commit offsets [2, 3, 4, ...]
├─ Watermark blocks at 2 → can't advance
├─ Stuck!
│
Solution: clear_cancellation_watermarks on revoke
└─ Delete watermark before Consumer B starts
```

--------------------------------

### Development Configuration for Concurrent Processing

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/configuration.md

Recommended settings for local development environments, which prioritize fast shutdown because network latency is low.

```python
await initialize_concurrent_processing(
    context=broker.context,
    concurrency_limit=4,
    commit_batch_size=10,
    commit_batch_timeout_sec=1.0,
    shutdown_timeout_sec=5.0,
)
```

--------------------------------

### Idempotent Order Processing Subscriber

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/quick-start.md

An idempotent subscriber handles message redelivery safely. This example checks whether an order has already been processed before saving it and sending an email.
```python
from faststream.kafka import KafkaBroker
from faststream.middlewares import AckPolicy

broker = KafkaBroker()

# Assume db and send_email are defined elsewhere
# async def send_email(user_id: int):
#     pass


@broker.subscriber("orders", ack_policy=AckPolicy.MANUAL)
async def process_order(order_id: int, user_id: int):
    try:
        # Check if already processed (idempotence)
        if await db.orders.find_one(order_id):
            print(f"Order {order_id} already processed, skipping")
            return  # Safe to skip; offset will commit

        # Process the order
        await db.orders.insert_one({"id": order_id, "user_id": user_id})

        # Call external service (may fail, causing redelivery)
        await send_email(user_id)
    except Exception as e:
        # Log but don't handle; offset stays uncommitted
        # On restart, the message is redelivered
        print(f"Order {order_id} failed: {e}")
        raise
```

--------------------------------

### Full Signature of initialize_concurrent_processing

Source: https://github.com/modern-python/faststream-concurrent-aiokafka/blob/main/_autodocs/configuration.md

The complete signature of the function that initializes concurrent processing, including all parameters and their default values.

```python
async def initialize_concurrent_processing(
    context: faststream.ContextRepo,
    concurrency_limit: int = 10,
    commit_batch_size: int = 10,
    commit_batch_timeout_sec: float = 10.0,
    shutdown_timeout_sec: float = 20.0,
) -> KafkaConcurrentHandler: ...
```
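The following self-contained asyncio model illustrates how `concurrency_limit` and `commit_batch_size` might interact. It is a minimal sketch under stated assumptions, not the library's implementation. `BoundedBatchProcessor` is a hypothetical class that models a single partition and leaves out the `commit_batch_timeout_sec` timer. It also assumes that only a contiguous run of successfully finished offsets may be committed. Under that assumption a failed offset holds back the commit position, much like the cancellation watermark described above. The concurrency bound uses an `asyncio.Semaphore`, which matches how the parameter reference says the limit is enforced.

```python
import asyncio
from collections.abc import Awaitable, Callable


class BoundedBatchProcessor:
    """Hypothetical toy model of concurrency_limit + commit_batch_size (one partition)."""

    def __init__(self, concurrency_limit: int, commit_batch_size: int) -> None:
        if concurrency_limit < 1:
            raise ValueError("concurrency_limit must be >= 1")
        self._limiter = asyncio.Semaphore(concurrency_limit)
        self._batch_size = commit_batch_size
        self._succeeded: set[int] = set()  # finished offsets not yet committed
        self._position = 0  # next offset to consume, which is what a Kafka commit stores
        self._since_flush = 0
        self._in_flight = 0
        self.max_in_flight = 0
        self.commits: list[int] = []  # every committed position, in order

    async def submit(
        self, offset: int, work: Callable[[int], Awaitable[None]]
    ) -> "asyncio.Task[None]":
        await self._limiter.acquire()  # waits while the limit is reached
        self._in_flight += 1
        self.max_in_flight = max(self.max_in_flight, self._in_flight)
        task = asyncio.ensure_future(work(offset))
        task.add_done_callback(lambda t: self._finish(offset, t))
        return task

    def _finish(self, offset: int, task: "asyncio.Task[None]") -> None:
        self._limiter.release()  # frees a slot
        self._in_flight -= 1
        # A failed or cancelled offset is never marked done, so it blocks commits
        if not task.cancelled() and task.exception() is None:
            self._succeeded.add(offset)
        self._since_flush += 1
        if self._since_flush >= self._batch_size:
            self.flush()

    def flush(self) -> None:
        # Advance only across a contiguous run of successful offsets
        start = self._position
        while self._position in self._succeeded:
            self._succeeded.remove(self._position)
            self._position += 1
        self._since_flush = 0
        if self._position > start:
            self.commits.append(self._position)


async def main() -> BoundedBatchProcessor:
    proc = BoundedBatchProcessor(concurrency_limit=3, commit_batch_size=4)

    async def work(offset: int) -> None:
        await asyncio.sleep(0.01 * (10 - offset))  # later offsets sleep less
        if offset == 5:
            raise RuntimeError("simulated failure")

    tasks = [await proc.submit(offset, work) for offset in range(10)]
    await asyncio.gather(*tasks, return_exceptions=True)
    proc.flush()  # final flush, like draining on shutdown
    return proc


proc = asyncio.run(main())
print(proc.max_in_flight, proc.commits[-1])
```

Running it prints `3 5`. At most three tasks were ever in flight. The commit position stops at 5 because offset 5 failed, even though offsets 6 through 9 finished. This mirrors the "offset stays uncommitted, message is redelivered" behavior that the idempotent subscriber example relies on.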