### Full Quickstart Example Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/usage/basic.md A complete example demonstrating the setup of the outbox table, broker, app, a subscriber, and publishing a message upon application startup. This serves as a comprehensive starting point for using FastStream Outbox. ```python from sqlalchemy import MetaData from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine from faststream import FastStream from faststream_outbox import OutboxBroker, make_outbox_table metadata = MetaData() outbox_table = make_outbox_table(metadata, table_name="outbox") engine = create_async_engine("postgresql+asyncpg://outbox:outbox@localhost:5432/outbox") broker = OutboxBroker(engine, outbox_table=outbox_table) app = FastStream(broker) @broker.subscriber("orders", max_workers=4) async def handle(order_id: int) -> None: print(f"order {order_id}") session_factory = async_sessionmaker(engine, expire_on_commit=False) @app.after_startup async def publish_one() -> None: async with session_factory() as session, session.begin(): await broker.publish(1, queue="orders", session=session) ``` -------------------------------- ### FastAPI Quickstart with OutboxRouter Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/usage/fastapi.md Example demonstrating how to set up FastAPI, SQLAlchemy, and FastStream Outbox for transactional message publishing. ```python from collections.abc import AsyncIterator from fastapi import Depends, FastAPI from sqlalchemy import MetaData from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine from faststream_outbox import make_outbox_table from faststream_outbox.fastapi import OutboxBroker, OutboxRouter metadata = MetaData() outbox_table = make_outbox_table(metadata, table_name="outbox") engine = create_async_engine("postgresql+asyncpg://localhost/app") session_factory = async_sessionmaker(engine, expire_on_commit=False) async def get_session() -> AsyncIterator[AsyncSession]: async with session_factory() as s, s.begin(): yield s router = OutboxRouter(engine, outbox_table=outbox_table) @router.subscriber("orders") async def handle( body: dict, session: AsyncSession = Depends(get_session), ) -> None: ... # domain writes on `session` commit with any chained outbox publishes @router.post("/orders") async def create_order( order: OrderIn, broker: OutboxBroker, session: AsyncSession = Depends(get_session), ) -> dict: session.add(Order(...)) await broker.publish({"order_id": ...}, queue="orders", session=session) return {"ok": True} app = FastAPI() app.include_router(router) ``` -------------------------------- ### Install faststream-outbox with combined extras Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/introduction/installation.md Install faststream-outbox with multiple optional extras, such as 'asyncpg', 'fastapi', and 'prometheus', by separating them with commas. ```bash pip install 'faststream-outbox[asyncpg,fastapi,prometheus]' ``` -------------------------------- ### Install faststream-outbox with opentelemetry extra Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/introduction/installation.md Install faststream-outbox with the 'opentelemetry' extra to enable the OpenTelemetryRecorder metrics adapter and OutboxTelemetryMiddleware for observability. ```bash pip install 'faststream-outbox[opentelemetry]' ``` -------------------------------- ### Install faststream-outbox with prometheus extra Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/introduction/installation.md Install faststream-outbox with the 'prometheus' extra to enable the PrometheusRecorder metrics adapter and OutboxPrometheusMiddleware for observability. ```bash pip install 'faststream-outbox[prometheus]' ``` -------------------------------- ### Prometheus Adapter Setup Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/usage/observability.md Integrate the Prometheus recorder with FastStream's ASGI application. This setup exposes Prometheus metrics at the /metrics endpoint and a health check at /healthz. Ensure you install the necessary dependencies. ```bash pip install 'faststream-outbox[prometheus]' uvicorn ``` ```python # app.py — run with `uvicorn app:app --host 0.0.0.0 --port 8000` from faststream.asgi import AsgiFastStream, make_ping_asgi from prometheus_client import REGISTRY, make_asgi_app from sqlalchemy import MetaData from sqlalchemy.ext.asyncio import create_async_engine from faststream_outbox import OutboxBroker, make_outbox_table from faststream_outbox.metrics.prometheus import PrometheusRecorder metadata = MetaData() outbox_table = make_outbox_table(metadata, table_name="outbox") engine = create_async_engine("postgresql+asyncpg://outbox:outbox@localhost:5432/outbox") broker = OutboxBroker( engine, outbox_table=outbox_table, metrics_recorder=PrometheusRecorder(app_name="checkout", registry=REGISTRY), ) @broker.subscriber("orders", max_workers=4) async def handle_order(body: dict) -> None: ... app = AsgiFastStream( broker, asgi_routes=[ ("/metrics", make_asgi_app(registry=REGISTRY)), ("/healthz", make_ping_asgi(broker, timeout=2.0)), ], ) ``` -------------------------------- ### FastStream Outbox Setup and Usage Source: https://github.com/modern-python/faststream-outbox/blob/main/README.md Demonstrates the basic setup of OutboxBroker with SQLAlchemy, including table creation, engine configuration, and defining a subscriber. It also shows how a producer can publish messages within an existing SQLAlchemy transaction. ```python from sqlalchemy import MetaData from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine from faststream import FastStream from faststream_outbox import OutboxBroker, make_outbox_table metadata = MetaData() outbox_table = make_outbox_table(metadata, table_name="outbox") engine = create_async_engine("postgresql+asyncpg://localhost/app") broker = OutboxBroker(engine, outbox_table=outbox_table) app = FastStream(broker) @broker.subscriber("orders", max_workers=4) async def handle(order_id: int) -> None: print(f"order {order_id}") # Producer side — share the caller's open transaction: session_factory = async_sessionmaker(engine, expire_on_commit=False) async with session_factory() as session, session.begin(): session.add(Order(id=1)) await broker.publish(1, queue="orders", session=session) ``` -------------------------------- ### Install FastStream Outbox with FastAPI Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/usage/fastapi.md Install the necessary package for FastAPI integration using pip. ```bash pip install 'faststream-outbox[fastapi]' ``` -------------------------------- ### Install faststream-outbox with asyncpg extra Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/introduction/installation.md Install faststream-outbox with the 'asyncpg' extra to enable SQLAlchemy driver for LISTEN/NOTIFY wakeups, reducing idle latency in the subscriber's fetch loop. ```bash pip install 'faststream-outbox[asyncpg]' ``` -------------------------------- ### Start PostgreSQL instance with Docker Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/introduction/installation.md This command starts a PostgreSQL 17 instance in a Docker container, configured with the specified user, password, and database name. It's useful for setting up a local development environment. ```bash docker run -d -p 5432:5432 \ -e POSTGRES_USER=outbox \ -e POSTGRES_PASSWORD=outbox \ -e POSTGRES_DB=outbox \ postgres:17 ``` -------------------------------- ### Install FastStream Outbox with OpenTelemetry and Prometheus for Full Tracing Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/usage/observability.md Install packages for FastStream Outbox with OpenTelemetry and Prometheus, including OTLP trace exporter, for comprehensive monitoring. ```bash pip install 'faststream-outbox[opentelemetry,prometheus]' \ opentelemetry-exporter-otlp opentelemetry-exporter-prometheus uvicorn ``` -------------------------------- ### Install FastStream Outbox with OpenTelemetry and Prometheus Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/usage/observability.md Install the necessary packages for FastStream Outbox with OpenTelemetry and Prometheus support, along with uvicorn for running the ASGI application. ```bash pip install 'faststream-outbox[opentelemetry,prometheus]' \ opentelemetry-exporter-prometheus uvicorn ``` -------------------------------- ### Install faststream-outbox with uv Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/introduction/installation.md Use this command to add faststream-outbox to your project when using uv as your package manager. ```bash uv add faststream-outbox ``` -------------------------------- ### Install FastStream Outbox with Validation Extra Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/usage/schema-validation.md Install the package with the 'validate' extra to enable schema validation functionality. Calling validate_schema() without this extra will result in an ImportError. ```bash pip install 'faststream-outbox[validate]' ``` -------------------------------- ### Prometheus Query Examples Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/usage/observability.md Examples of PromQL queries for monitoring handler throughput, error rates, latency, and lease status, as well as publish throughput and latency per queue. These queries leverage the 'broker="outbox"' label. ```promql # Handler throughput (acked / sec) rate(faststream_received_processed_messages_total{broker="outbox",status="acked"}[1m]) ``` ```promql # Handler error rate rate(faststream_received_processed_messages_total{broker="outbox",status!="acked"}[5m]) / rate(faststream_received_processed_messages_total{broker="outbox"}[5m]) ``` ```promql # P99 handler latency histogram_quantile(0.99, rate(faststream_received_processed_messages_duration_seconds_bucket{broker="outbox"}[5m])) ``` ```promql # In-flight gauge faststream_received_messages_in_process{broker="outbox"} ``` ```promql # Operator playbook: lease_ttl_seconds is too low for this handler's P99 rate(faststream_outbox_lease_lost_total[5m]) > 0 ``` ```promql # Publish throughput per queue rate(faststream_published_messages_total{broker="outbox",status="success"}[1m]) ``` ```promql # P99 publish (INSERT) latency per queue histogram_quantile(0.99, rate(faststream_published_messages_duration_seconds_bucket{broker="outbox"}[5m])) ``` -------------------------------- ### Install faststream-outbox with poetry Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/introduction/installation.md Use this command to add faststream-outbox to your project if you are managing dependencies with poetry. ```bash poetry add faststream-outbox ``` -------------------------------- ### Opting Out of Retries with NoRetry Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/usage/subscriber.md This example shows how to explicitly opt out of all retry mechanisms for a subscriber by using the NoRetry strategy. ```python @broker.subscriber("audit", retry_strategy=NoRetry()) # opt out of retries async def handle_audit(payload: dict) -> None: ... ``` -------------------------------- ### OpenTelemetry Metrics Adapter Source: https://github.com/modern-python/faststream-outbox/blob/main/CLAUDE.md Use the OpenTelemetry adapter for metrics. It requires `faststream-outbox[opentelemetry]` to be installed. This adapter is meter-only. ```python from faststream_outbox.metrics.opentelemetry import OpenTelemetryRecorder from faststream_outbox import OutboxBroker, OutboxBrokerConfig # OpenTelemetryRecorder requires OpenTelemetry libraries # Ensure you have installed faststream-outbox[opentelemetry] recorder = OpenTelemetryRecorder() config = OutboxBrokerConfig() broker = OutboxBroker( config=config, metrics_recorder=recorder ) ``` -------------------------------- ### Prometheus Metrics Adapter Source: https://github.com/modern-python/faststream-outbox/blob/main/CLAUDE.md Use the Prometheus adapter for metrics. It requires `faststream-outbox[prometheus]` to be installed. Note the different label sets for consume vs. publish. ```python from faststream_outbox.metrics.prometheus import PrometheusRecorder from faststream_outbox import OutboxBroker, OutboxBrokerConfig # PrometheusRecorder requires Prometheus client library # Ensure you have installed faststream-outbox[prometheus] recorder = PrometheusRecorder() config = OutboxBrokerConfig() broker = OutboxBroker( config=config, metrics_recorder=recorder ) ``` -------------------------------- ### FastStream Outbox with Native Middleware for Spans and Bus Parity Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/usage/observability.md Configure FastStream Outbox with native middleware for OpenTelemetry tracing (spans) and metrics, ensuring parity with FastStream's core bus instrumentation. This setup pushes traces to an OTLP endpoint. ```python # app.py — run with `OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 \ # uvicorn app:app --host 0.0.0.0 --port 8000` from faststream.asgi import AsgiFastStream from opentelemetry import metrics, trace from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter from opentelemetry.exporter.prometheus import PrometheusMetricReader from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor from prometheus_client import REGISTRY, make_asgi_app from sqlalchemy import MetaData from sqlalchemy.ext.asyncio import create_async_engine from faststream_outbox import OutboxBroker, make_outbox_table from faststream_outbox.metrics.prometheus import PrometheusRecorder from faststream_outbox.opentelemetry import OutboxTelemetryMiddleware from faststream_outbox.prometheus import OutboxPrometheusMiddleware # ----- OTel SDK ----- resource = Resource.create({"service.name": "my-outbox-service"}) tracer_provider = TracerProvider(resource=resource) tracer_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter())) trace.set_tracer_provider(tracer_provider) meter_provider = MeterProvider(resource=resource, metric_readers=[PrometheusMetricReader()]) metrics.set_meter_provider(meter_provider) ``` -------------------------------- ### Create Broker and App Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/usage/basic.md Initialize the `OutboxBroker` with an asynchronous SQLAlchemy engine and the defined outbox table. Then, create a `FastStream` application instance using this broker. ```python from sqlalchemy.ext.asyncio import create_async_engine from faststream import FastStream from faststream_outbox import OutboxBroker engine = create_async_engine("postgresql+asyncpg://outbox:outbox@localhost:5432/outbox") broker = OutboxBroker(engine, outbox_table=outbox_table) app = FastStream(broker) ``` -------------------------------- ### Configure Outbox Broker with Observability Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/usage/observability.md Set up the OutboxBroker with OutboxTelemetryMiddleware and OutboxPrometheusMiddleware for tracing and metrics. Also configures a PrometheusRecorder for internal outbox events. Requires an async engine and metadata. ```python from faststream.observability.telemetry import TracerProvider, MeterProvider from faststream.prometheus import PrometheusRecorder, REGISTRY from faststream.outbox import OutboxBroker, OutboxTelemetryMiddleware, OutboxPrometheusMiddleware from faststream.db import MetaData, create_async_engine from faststream.asgi import AsgiFastStream, make_asgi_app metadata = MetaData() outbox_table = make_outbox_table(metadata, table_name="outbox") engine = create_async_engine("postgresql+asyncpg://outbox:outbox@localhost:5432/outbox") tracer_provider = TracerProvider() meter_provider = MeterProvider() broker = OutboxBroker( engine, outbox_table=outbox_table, middlewares=[ # Bus-scope spans + meters around consume_scope / publish_scope. OutboxTelemetryMiddleware(tracer_provider=tracer_provider, meter_provider=meter_provider), OutboxPrometheusMiddleware(registry=REGISTRY, app_name="my-outbox-service"), ], # Outbox-internal events (fetched, lease_lost, terminal reasons) that have # no message context and can't reach the middleware bus. metrics_recorder=PrometheusRecorder(registry=REGISTRY, app_name="my-outbox-service"), ) @broker.subscriber("orders", max_workers=4) async def handle_order(body: dict) -> None: ... app = AsgiFastStream(broker, asgi_routes=[("/metrics", make_asgi_app(registry=REGISTRY))]) ``` -------------------------------- ### Basic Test with TestOutboxBroker Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/usage/testing.md Demonstrates a basic unit test using TestOutboxBroker. It sets up an OutboxBroker, defines a subscriber, and publishes a message within the test context. The handler runs synchronously. ```python import pytest from faststream_outbox import OutboxBroker, make_outbox_table from faststream_outbox.testing import TestOutboxBroker from sqlalchemy import MetaData @pytest.mark.asyncio async def test_handler() -> None: metadata = MetaData() outbox_table = make_outbox_table(metadata, table_name="outbox") broker = OutboxBroker(None, outbox_table=outbox_table) # engine not needed received: list[int] = [] @broker.subscriber("orders") async def handle(order_id: int) -> None: received.append(order_id) async with TestOutboxBroker(broker): await broker.publish(1, queue="orders") # Handler has already run. assert received == [1] ``` -------------------------------- ### Configure Outbox Broker with DLQ Table Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/usage/dlq.md This snippet demonstrates how to set up the `OutboxBroker` with both an outbox table and a dead-letter queue table. Ensure the `MetaData` object is shared between both table definitions. The broker will automatically use the `dlq_table` for archiving failed messages. ```python from sqlalchemy import MetaData from sqlalchemy.ext.asyncio import create_async_engine from faststream_outbox import OutboxBroker, make_dlq_table, make_outbox_table metadata = MetaData() outbox_table = make_outbox_table(metadata, table_name="outbox") dlq_table = make_dlq_table(metadata, table_name="outbox_dlq") engine = create_async_engine("postgresql+asyncpg://outbox:outbox@localhost:5432/outbox") broker = OutboxBroker(engine, outbox_table=outbox_table, dlq_table=dlq_table) ``` -------------------------------- ### Create and use a typed publisher with broker.publisher Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/usage/publisher.md Instantiate a queue-scoped `OutboxPublisher` using `broker.publisher` to encapsulate per-queue configuration like static headers. This wrapper simplifies publishing to a specific queue and ensures transactional integrity when used within a handler. ```python orders_pub = broker.publisher("orders", headers={"source": "checkout"}) async def checkout(order: Order, session: AsyncSession) -> None: session.add(order) # domain write await orders_pub.publish({"order_id": order.id}, session=session) await session.commit() # row + domain commit together ``` -------------------------------- ### Full signature for broker.publish_batch Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/usage/publisher.md This illustrates the parameters for `broker.publish_batch`, highlighting that it returns `None` and supports batch scheduling via `activate_in` or `activate_at`. ```python await broker.publish_batch( *bodies, queue: str, session: AsyncSession, headers: dict[str, str] | None = None, activate_in: timedelta | None = None, activate_at: datetime | None = None, ) -> None ``` -------------------------------- ### Configure OutboxBroker with DLQ Table Source: https://github.com/modern-python/faststream-outbox/blob/main/CLAUDE.md Pass the created DLQ table to the OutboxBroker constructor to archive terminal failures. The default broker behavior remains unchanged if dlq_table is None. ```python OutboxBroker(..., dlq_table=...) ``` -------------------------------- ### Testing Publishers with TestOutboxBroker Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/usage/testing.md Shows how to test message publishers using TestOutboxBroker. It sets up a broker and a publisher, then publishes a message within the test context, verifying that the subscriber receives it. ```python async def test_publisher() -> None: metadata = MetaData() outbox_table = make_outbox_table(metadata, table_name="outbox") broker = OutboxBroker(None, outbox_table=outbox_table) received: list[dict] = [] @broker.subscriber("orders") async def handle(body: dict) -> None: received.append(body) pub = broker.publisher("orders") async with TestOutboxBroker(broker): await pub.publish({"order_id": 1}) assert received == [{"order_id": 1}] ``` -------------------------------- ### Create DLQ Table Source: https://github.com/modern-python/faststream-outbox/blob/main/CLAUDE.md Returns a sibling audit table for archiving terminal failures. Pass this to OutboxBroker to enable DLQ functionality. ```python make_dlq_table(metadata, table_name="outbox_dlq") ``` -------------------------------- ### Full signature for broker.publish Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/usage/publisher.md This shows the complete set of parameters available for the `broker.publish` method, including optional headers, scheduling options, and timer IDs. ```python await broker.publish( body, *, queue: str, session: AsyncSession, headers: dict[str, str] | None = None, correlation_id: str | None = None, activate_in: timedelta | None = None, activate_at: datetime | None = None, timer_id: str | None = None, ) -> int | None ``` -------------------------------- ### Create an OutboxRouter and Define a Subscriber Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/usage/router.md Instantiate an OutboxRouter and use its subscriber decorator to define message handlers for specific queues. ```python from faststream_outbox import OutboxRouter router = OutboxRouter() @router.subscriber("orders") async def handle_order(order_id: int) -> None: print(f"order {order_id}") ``` -------------------------------- ### Testing DLQ with TestOutboxBroker Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/usage/dlq.md Use `TestOutboxBroker` to simulate message processing and verify DLQ behavior. It accumulates audit rows in `broker.fake_client.dlq_rows` and removes the source row from `fake_client.rows`. ```python from faststream_outbox import NoRetry, OutboxBroker, TestOutboxBroker, make_dlq_table, make_outbox_table metadata = MetaData() outbox_table = make_outbox_table(metadata, table_name="outbox") dlq_table = make_dlq_table(metadata, table_name="outbox_dlq") broker = OutboxBroker(outbox_table=outbox_table, dlq_table=dlq_table) @broker.subscriber("orders", retry_strategy=NoRetry()) async def handle(body: dict) -> None: raise RuntimeError("boom") test_broker = TestOutboxBroker(broker) async with test_broker: await broker.publish({"order_id": 1}, queue="orders") assert test_broker.fake_client.rows == [] assert len(test_broker.fake_client.dlq_rows) == 1 assert test_broker.fake_client.dlq_rows[0]["failure_reason"] == "retry_terminal" ``` -------------------------------- ### FastAPI with Annotated Context Shortcuts Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/usage/fastapi.md Using annotated context shortcuts like `OutboxMessage` and `OutboxBroker` within FastAPI subscriber handlers. ```python from faststream_outbox.fastapi import OutboxBroker, OutboxMessage @router.subscriber("orders") async def handle( msg: OutboxMessage, broker: OutboxBroker, session: AsyncSession = Depends(get_session), ) -> None: ... ``` -------------------------------- ### Custom Metrics Recorder Source: https://github.com/modern-python/faststream-outbox/blob/main/CLAUDE.md Implement a custom metrics recorder by providing a callable to `OutboxBroker`. The recorder must not block the event loop. ```python from typing import Callable, Mapping, Any from faststream_outbox import OutboxBroker, OutboxBrokerConfig def my_metrics_recorder(name: str, labels: Mapping[str, Any]) -> None: print(f"Metric: {name}, Labels: {labels}") # Add your custom metric recording logic here # Ensure this function does not block the event loop config = OutboxBrokerConfig() broker = OutboxBroker( config=config, metrics_recorder=my_metrics_recorder ) ``` -------------------------------- ### Annotated Handler Parameters with Context Shortcuts Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/usage/subscriber.md Utilize `faststream_outbox.annotations` for concise handler signatures, providing access to `OutboxMessage` and `OutboxBroker` via `Context`. ```python from faststream_outbox.annotations import OutboxBroker, OutboxMessage @broker.subscriber("orders") async def handle(msg: OutboxMessage, broker: OutboxBroker) -> None: ... ``` -------------------------------- ### Configuring ExponentialRetry for Subscriber Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/usage/subscriber.md This snippet demonstrates how to configure an ExponentialRetry strategy for a subscriber. It specifies initial delay, maximum delay, number of attempts, and jitter factor. ```python from faststream_outbox import ExponentialRetry, ConstantRetry, LinearRetry, NoRetry @broker.subscriber( "orders", retry_strategy=ExponentialRetry( initial_delay_seconds=1.0, max_delay_seconds=300.0, max_attempts=5, jitter_factor=0.5, ), ) async def handle(order_id: int) -> None: ... ``` -------------------------------- ### Include an OutboxRouter in the Broker Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/usage/router.md Attach a defined OutboxRouter to an OutboxBroker instance using the include_router method. Ensure the broker is initialized with the necessary engine and outbox_table. ```python from faststream import FastStream from faststream_outbox import OutboxBroker from myapp.routers import router broker = OutboxBroker(engine, outbox_table=outbox_table) broker.include_router(router) app = FastStream(broker) ``` -------------------------------- ### Configuring TestOutboxBroker for Loop-Driven Mode Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/usage/testing.md Illustrates how to enable loop-driven mode in TestOutboxBroker for testing polling semantics, retry rescheduling, and lease expiry. This mode allows testing real polling behavior. ```python async with TestOutboxBroker(broker, run_loops=True): ... # use feed() / poll until handler observes the row ``` -------------------------------- ### FastStream Outbox with OpenTelemetry Metrics Only Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/usage/observability.md Configure FastStream Outbox to expose OpenTelemetry metrics that are scraped by Prometheus. The metrics are available at the /metrics endpoint. ```python # app.py — run with `uvicorn app:app --host 0.0.0.0 --port 8000` from faststream.asgi import AsgiFastStream from opentelemetry import metrics from opentelemetry.exporter.prometheus import PrometheusMetricReader from opentelemetry.sdk.metrics import MeterProvider from prometheus_client import REGISTRY, make_asgi_app from sqlalchemy import MetaData from sqlalchemy.ext.asyncio import create_async_engine from faststream_outbox import OutboxBroker, make_outbox_table from faststream_outbox.metrics.opentelemetry import OpenTelemetryRecorder # OTel meters → Prometheus reader (scraped at /metrics below) prometheus_reader = PrometheusMetricReader() meter_provider = MeterProvider(metric_readers=[prometheus_reader]) metrics.set_meter_provider(meter_provider) metadata = MetaData() outbox_table = make_outbox_table(metadata, table_name="outbox") engine = create_async_engine("postgresql+asyncpg://outbox:outbox@localhost:5432/outbox") broker = OutboxBroker( engine, outbox_table=outbox_table, metrics_recorder=OpenTelemetryRecorder(meter_provider=meter_provider), ) @broker.subscriber("orders", max_workers=4) async def handle_order(body: dict) -> None: ... app = AsgiFastStream(broker, asgi_routes=[("/metrics", make_asgi_app(registry=REGISTRY))]) ``` -------------------------------- ### Schedule Timer with Absolute Activation Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/usage/timers.md Schedule an outbox row to fire at a specific, timezone-aware UTC instant. This ensures precise timing for future events. ```python # Fire at a specific UTC instant: await broker.publish( {"x": 1}, queue="orders", session=session, activate_at=dt.datetime(2026, 6, 1, 9, tzinfo=dt.UTC), ) ``` -------------------------------- ### Pytest Configuration for Asyncio Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/usage/testing.md Provides the necessary configuration for pytest to handle asyncio tests automatically. This is typically added to the pyproject.toml file. ```toml [tool.pytest.ini_options] asyncio_mode = "auto" asyncio_default_fixture_loop_scope = "function" ``` -------------------------------- ### Publishing from a FastStream Handler Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/usage/publisher.md Use `OutboxMessage` for message typing and `OutboxBroker` to publish messages within a subscriber handler. Ensure you have a session factory available for transactional publishing. ```python from faststream_outbox.annotations import OutboxBroker, OutboxMessage @broker.subscriber("orders") async def handle(msg: OutboxMessage, broker: OutboxBroker) -> None: async with session_factory() as session, session.begin(): await broker.publish({"chained": True}, queue="downstream", session=session) ``` -------------------------------- ### Schedule Timer with Relative Activation Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/usage/timers.md Schedule an outbox row to fire a specified duration from the current time. Use `timer_id` for deduplication. ```python import datetime as dt # Fire 30 seconds from now, deduplicated by timer_id: await broker.publish( {"order_id": 1}, queue="orders", session=session, activate_in=dt.timedelta(seconds=30), timer_id=f"order-confirm-{order.id}", ) ``` -------------------------------- ### Publishing a Message with Transactional Outbox Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/introduction/how-it-works.md Demonstrates how to publish a message within an existing SQLAlchemy transaction. The outbox row is inserted into the same session, ensuring atomicity with the domain write. The broker does not flush or commit, relying on the caller's session to commit. ```python async with session_factory() as session, session.begin(): session.add(order) # domain write await broker.publish(order.id, queue="orders", session=session) # session.begin() commits both atomically on exit ``` -------------------------------- ### Publishing multiple messages with FastStream Outbox Source: https://github.com/modern-python/faststream-outbox/blob/main/CLAUDE.md Use `broker.publish_batch` for inserting multiple messages into the outbox table. It requires an `AsyncSession` and operates within the caller's transaction. This method does not return any value. ```python await broker.publish_batch(*bodies, queue=queue, session=session) ``` -------------------------------- ### Publish multiple messages with broker.publish_batch Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/usage/publisher.md Utilize `broker.publish_batch` to efficiently insert multiple outbox rows in a single database operation. This method requires an `AsyncSession` and does not accept a `timer_id`. ```python async with session_factory() as session, session.begin(): await broker.publish_batch( {"order_id": 1}, {"order_id": 2}, {"order_id": 3}, queue="orders", session=session, ) ``` -------------------------------- ### No-Operation Metrics Recorder Source: https://github.com/modern-python/faststream-outbox/blob/main/CLAUDE.md The default `_noop_recorder` allows instrumentation sites to call unconditionally without any side effects. ```python from faststream_outbox import OutboxBroker, OutboxBrokerConfig # No need to explicitly set metrics_recorder if you want the default no-op behavior broker = OutboxBroker(config=OutboxBrokerConfig()) ``` -------------------------------- ### Publishing a single message with FastStream Outbox Source: https://github.com/modern-python/faststream-outbox/blob/main/CLAUDE.md Use `broker.publish` to insert a message into the outbox table. This method requires an `AsyncSession` and commits with the caller's domain writes. It returns the ID of the inserted row. ```python await broker.publish(body, queue=queue, session=session) ``` -------------------------------- ### Declare Outbox Table Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/usage/basic.md Define the outbox table schema using `make_outbox_table`. This table is used by the broker to store messages before they are published. The package does not handle schema creation or migration; this is typically done with tools like Alembic. ```python from sqlalchemy import MetaData from faststream_outbox import make_outbox_table metadata = MetaData() outbox_table = make_outbox_table(metadata, table_name="outbox") ``` -------------------------------- ### Subscriber Options Configuration Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/usage/subscriber.md Configure advanced subscriber options like `max_workers`, `fetch_batch_size`, and `lease_ttl_seconds` directly in the `@broker.subscriber` decorator for fine-grained control. ```python @broker.subscriber( "high-priority", max_workers=8, fetch_batch_size=50, min_fetch_interval=0.1, max_fetch_interval=1.0, lease_ttl_seconds=120.0, ) async def handle_urgent(body: dict) -> None: ... ``` -------------------------------- ### Test Broker Fake Publish Source: https://github.com/modern-python/faststream-outbox/blob/main/CLAUDE.md The test broker mirrors producer-side emissions for testing purposes. Synthetic events use `duration_seconds=0.0`. ```python from faststream_outbox.testing import FakeOutboxProducer producer = FakeOutboxProducer() # Example of publishing a single message published_event = producer.publish("test message") # Assertions can be made on published_event, e.g., published_event.duration_seconds == 0.0 # Example of publishing a batch of messages published_batch_events = producer.publish_batch(["msg1", "msg2"]) ``` -------------------------------- ### Basic Subscriber Registration Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/usage/subscriber.md Register a handler function to process messages from a specific queue using the `@broker.subscriber` decorator. Ensure the broker is initialized. ```python from faststream import FastStream from faststream_outbox import OutboxBroker broker: OutboxBroker = ... app = FastStream(broker) @broker.subscriber("orders") async def handle(order_id: int) -> None: print(f"order {order_id}") ``` -------------------------------- ### Define Routes Up-front with OutboxRoute Source: https://github.com/modern-python/faststream-outbox/blob/main/docs/usage/router.md Declare handlers and their corresponding queues using OutboxRoute, which is useful for code generation or plugin patterns. Options like max_workers can be configured. ```python from faststream_outbox import OutboxRoute, OutboxRouter async def handle_order(order_id: int) -> None: print(f"order {order_id}") router = OutboxRouter( handlers=[ OutboxRoute(handle_order, "orders", max_workers=4), ], ) ``` -------------------------------- ### Publishing a message using an OutboxPublisher instance Source: https://github.com/modern-python/faststream-outbox/blob/main/CLAUDE.md Call the `publish` method on an `OutboxPublisher` instance to send a message. This method adheres to the transactional contract, requiring a session and committing with the caller's domain writes. ```python await pub.publish(body, session=session, headers=None, correlation_id=None, activate_in=None, activate_at=None, timer_id=None) ```