### RustStream Quickstart Example Source: https://github.com/powersemmi/ruststream/blob/main/docs/index.md This snippet demonstrates a basic RustStream application setup. It utilizes the `#[ruststream::app]` macro to generate the main function, allowing for easy service scaffolding and AsyncAPI document generation. ```rust --8<-- "examples/quickstart.rs" ``` -------------------------------- ### Install and Scaffold RustStream Project Source: https://github.com/powersemmi/ruststream/blob/main/docs/getting-started/quickstart.md Install the ruststream CLI and scaffold a new project. Then navigate into the project directory. ```bash cargo install ruststream --features cli ruststream new my-service cd my-service ``` -------------------------------- ### Batch Subscriber Example Source: https://github.com/powersemmi/ruststream/blob/main/docs/guides/subscribers.md Demonstrates the setup for a batch subscriber using the `batch(..)` wrapper, which enables whole-batch consumption. ```rust --8<-- "examples/subscribers.rs:batch" ``` -------------------------------- ### Complete AsyncAPI Server Example with Axum Source: https://github.com/powersemmi/ruststream/blob/main/docs/guides/asyncapi.md An example demonstrating how to serve both the AsyncAPI document and the interactive viewer using the Axum web framework. This example requires specific features to be enabled. ```rust --8<-- "examples/asyncapi_http.rs" ``` -------------------------------- ### Run Example and Test Metrics Source: https://github.com/powersemmi/ruststream/blob/main/docs/guides/metrics.md Instructions to run the complete metrics HTTP server example and test its functionality using `curl`. ```bash cargo run --example metrics_http --features macros,memory,metrics curl -X POST http://127.0.0.1:8080/orders -d '{"id":1,"quantity":3}' curl http://127.0.0.1:8080/metrics ``` -------------------------------- ### Start Memory Broker for Testing Source: https://github.com/powersemmi/ruststream/blob/main/docs/brokers/memory.md Use MemoryBroker::start() to get a test client for driving and observing the broker from the outside. ```rust let broker = MemoryBroker::start(); ``` -------------------------------- ### Create and Connect MemoryBroker Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/api-reference/08-memory-broker.md Demonstrates creating a new MemoryBroker instance and connecting it. This example shows basic setup for subscribing and publishing within a single process. ```rust use ruststream::memory::MemoryBroker; #[tokio::main] async fn main() -> Result<(), Box> { let broker = MemoryBroker::new(); broker.connect().await?; // Subscribe and publish let mut sub = broker.subscribe("orders")?; let pub = broker.publisher(); broker.shutdown().await?; Ok(()) } ``` -------------------------------- ### Wire Up Metrics Layers Source: https://github.com/powersemmi/ruststream/blob/main/docs/guides/metrics.md Instantiate a `Metrics` object and install its consume and publish layers to start collecting metrics. This setup is necessary for tracking message handling. ```rust let (metrics, consume_layer, publish_layer) = Metrics::new(); ``` -------------------------------- ### Complete Metrics HTTP Server Example Source: https://github.com/powersemmi/ruststream/blob/main/docs/guides/metrics.md This example demonstrates a full implementation of a metrics server using `axum`. It includes setting up the metrics, defining routes for orders and metrics export, and running the server. ```rust use axum::extract::State; use axum::http::StatusCode; use axum::routing::{get, post}; use axum::Router; use ruststream::metrics::{Metrics, MetricsLayer}; use ruststream::prelude::*; use std::net::SocketAddr; use std::sync::Arc; use tokio::signal; #[derive(Clone, Default)] struct AppState { metrics: Metrics, } #[tokio::main] async fn main() { let metrics = Metrics::new(); let consume_layer = MetricsLayer::new(metrics.clone()); let publish_layer = MetricsLayer::new(metrics.clone()); let app = Router::new() .route("/", get(root)) .route("/metrics", get(metrics_handler).with_state(metrics.clone())) .route("/orders", post(orders_handler).with_state(publish_layer)) .with_state(AppState { metrics }); let addr = SocketAddr::from(([127, 0, 0, 1], 8080)); println!("listening on {}", addr); axum::Server::bind(addr) .serve(app.into_make_service()) .with_graceful_shutdown(shutdown_signal()) .await .unwrap(); } async fn root() -> &'static str { "Hello, World!" } async fn metrics_handler(State(metrics): State) -> String { metrics.export().unwrap() } async fn orders_handler( State(publish_layer): State>, // we get `Bytes` assuming that the request contains // valid json we can echo back axum::body::Bytes(body): axum::body::Bytes, ) -> (StatusCode, Bytes) { // here we just echo the data back, but in a real application // you would process the order and publish it. // publish_layer.publish(order).await.unwrap(); (StatusCode::OK, body) } async fn shutdown_signal() { let ctrl_c = async { signal::} .await .expect("failed to install CTRL+C handler"); #[cfg(unix)] { use tokio::signal::unix::SignalKind; let sigterm = async { signal::unix::signal(SignalKind::terminate)? .recv() .await }; tokio::select! { either sigterm, ctrl_c } else { println!("received SIGTERM or CTRL+C, shutting down gracefully..."); } } #[cfg(windows)] { let _ = ctrl_c; println!("received CTRL+C, shutting down gracefully..."); } } ``` -------------------------------- ### Open Subscription Example Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/api-reference/06-subscriptions-capabilities.md An example function demonstrating how to open a subscription using the SubscriptionSource trait. It takes a source and a broker, then calls the subscribe method. ```rust use ruststream::{Broker, SubscriptionSource}; async fn open(source: S, broker: &B) -> Result where B: Broker, S: SubscriptionSource, { source.subscribe(broker).await } ``` -------------------------------- ### Main Application Setup Source: https://github.com/powersemmi/ruststream/blob/main/docs/getting-started/tutorial.md Set up the main application by defining the module, importing necessary components, and using the RustStream runtime. ```rust mod orders; use ruststream::memory::MemoryBroker; use ruststream::runtime::{AppInfo, RustStream}; use crate::orders::handle; --8<-- "examples/quickstart.rs:app" ``` -------------------------------- ### DescribeServer Example Usage Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/api-reference/06-subscriptions-capabilities.md Demonstrates how to use the `describe_server` method to retrieve server specifications and format connection details. ```rust use ruststream::DescribeServer; fn get_spec(broker: &B) -> String { let spec = broker.describe_server(); format!("{}: {}", spec.protocol, spec.host) } ``` -------------------------------- ### Broker Lifecycle Example Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/api-reference/01-core-traits.md Demonstrates how to use the `connect` and `shutdown` methods of the `Broker` trait. ```rust use ruststream::Broker; async fn lifecycle(broker: &B) -> Result<(), B::Error> { broker.connect().await?; // ... work ... broker.shutdown().await } ``` -------------------------------- ### Install RustStream CLI Source: https://github.com/powersemmi/ruststream/blob/main/docs/getting-started/installation.md Install the RustStream command-line interface tool using cargo install, enabling the 'cli' feature for scaffolding and running subcommands. ```bash cargo install ruststream --features cli ``` -------------------------------- ### Subscribe Capability Example Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/api-reference/06-subscriptions-capabilities.md An example function showing how to use the Subscribe capability to open a subscription to a named topic. It requires the broker to implement the Subscribe trait. ```rust use ruststream::{Broker, Subscribe}; async fn open(broker: &B) -> Result { broker.subscribe("orders").await } ``` -------------------------------- ### HandlerExt with Layer Example Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/api-reference/07-middleware.md Demonstrates how to use the `with` method from `HandlerExt` to apply a `TracingLayer` to a handler. ```rust use ruststream::runtime::{HandlerExt, layers::TracingLayer}; async fn my_handler(msg: &str) {} let traced = my_handler.with(TracingLayer::default()); ``` -------------------------------- ### ServerSpec Constructor and Builder Example Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/api-reference/06-subscriptions-capabilities.md Shows how to create a `ServerSpec` using the `new` constructor and chain the `with_description` builder method. ```rust use ruststream::capability::ServerSpec; let spec = ServerSpec::new("nats.example.com:4222", "nats") .with_description("Production NATS cluster"); ``` -------------------------------- ### DecodeFailure Handling Examples Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/api-reference/09-typed-handlers.md Examples demonstrating how to configure `typed` handlers to either drop or requeue messages upon decoding failure. ```rust use ruststream::runtime::DecodeFailure; // Drop unparseable messages silently let handler1 = typed(codec, handler).on_decode_failure(DecodeFailure::Drop); // Requeue unparseable messages for later retry let handler2 = typed(codec, handler).on_decode_failure(DecodeFailure::Requeue); ``` -------------------------------- ### Publisher Publish Example Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/api-reference/01-core-traits.md Demonstrates publishing a new message using the `publish` method of the `Publisher` trait. ```rust use ruststream::{OutgoingMessage, Publisher}; async fn emit(publisher: &P) -> Result<(), P::Error> { let msg = OutgoingMessage::new("orders.created", b"{}"); publisher.publish(msg).await } ``` -------------------------------- ### Example Handler Implementation Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/api-reference/05-handlers-context.md A basic example of a handler function that processes a String message and returns HandlerResult::Ack. Note the async nature and return type. ```rust use ruststream::runtime::{Context, Handler, HandlerResult}; fn my_handler(msg: &String, ctx: &mut Context) -> impl std::future::Future + Send { async { HandlerResult::Ack } } ``` -------------------------------- ### Subscriber Consumption Example Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/api-reference/01-core-traits.md Shows how to consume messages from a `Subscriber` stream using `StreamExt::next`. ```rust use ruststream::Subscriber; use futures::StreamExt; async fn consume(mut sub: S) -> Result<(), S::Error> { let mut stream = sub.stream(); while let Some(delivery) = stream.next().await { let msg = delivery?; // Process message, then ack/nack let _ = msg.ack().await; } Ok(()) } ``` -------------------------------- ### Name Constructor Example Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/api-reference/06-subscriptions-capabilities.md Demonstrates creating a Name subscription source bound to a specific name. The new constructor takes a name that can be converted into Cow<'static, str>. ```rust use ruststream::{Broker, Subscribe, SubscriptionSource, Name}; async fn open(broker: &B) -> Result { Name::new("orders").subscribe(broker).await } ``` -------------------------------- ### Run the RustStream Service Source: https://github.com/powersemmi/ruststream/blob/main/docs/getting-started/quickstart.md Start the RustStream service using the provided CLI command. The service runs with an in-memory broker, requiring no external dependencies. ```bash ruststream run # or: cargo run -- run ``` -------------------------------- ### Startup Hook with Error Propagation Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/errors.md Example of a startup hook that can fail. If the hook returns an error (e.g., from `open_database().await?`), the application startup is aborted. ```rust let app = RustStream::new(info) .on_startup(|state| async { // If this fails, startup aborts open_database().await?; Ok(state) }); ``` -------------------------------- ### State Usage Example Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/api-reference/04-runtime.md Demonstrates how to insert and retrieve values from the State struct. Values must implement `Any + Send + Sync`. ```rust use ruststream::runtime::State; let mut state = State::default(); state.insert("database_url".to_string()); assert_eq!(state.get::(), Some(&"database_url".to_string())); ``` -------------------------------- ### Raw Handler Example Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/api-reference/09-typed-handlers.md Example of a raw handler that processes incoming messages as raw bytes without any decoding. ```rust use ruststream::runtime::{Context, HandlerResult}; async fn raw_handler(msg: &impl IncomingMessage, ctx: &mut Context) -> HandlerResult { let payload = msg.payload(); println!("Raw bytes: {:?}", payload); HandlerResult::Ack } ``` -------------------------------- ### TracingLayer with Target Example Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/api-reference/07-middleware.md Demonstrates how to configure `TracingLayer` to use a specific target for its logs, overriding the default. ```rust use ruststream::runtime::{RustStream, layers::TracingLayer}; let app = RustStream::new(info) .layer(TracingLayer::default().with_target("my_service")); ``` -------------------------------- ### RustStream Constructor and Basic Usage Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/api-reference/04-runtime.md Demonstrates creating a new RustStream application, inserting state, and configuring a broker with handlers. This is a common starting point for building a RustStream service. ```rust use ruststream::runtime::{AppInfo, RustStream}; use ruststream::memory::MemoryBroker; async fn build_app() -> Result<(), Box> { let app = RustStream::new(AppInfo::new("orders", "0.1.0")) .insert_state("config_value".to_string()) .with_broker(MemoryBroker::new(), |b| { // Register handlers here }); app.run().await?; Ok(()) } ``` -------------------------------- ### Example Context Usage Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/api-reference/05-handlers-context.md Shows how to access channel name, application state (like a database pool), named publishers, and modify message headers within a handler. ```rust use ruststream::runtime::Context; async fn handle_message(msg: &str, ctx: &mut Context) { let channel = ctx.name(); if let Some(pool) = ctx.get::>() { // Use database pool } if let Some(pub) = ctx.publisher("events") { // Publish using the named publisher } // Modify headers ctx.headers_mut().insert("processed-at", "2026-06-05"); } ``` -------------------------------- ### Identity Layer Usage Example Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/api-reference/07-middleware.md Shows how `RustStream` uses the `Identity` layer by default when initialized. ```rust use ruststream::runtime::{Identity, RustStream}; let app = RustStream::new(info); // Uses Identity by default ``` -------------------------------- ### Example Usage of PublishMiddleware with HeaderMiddleware Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/api-reference/07-middleware.md Demonstrates creating a HeaderMiddleware that adds a custom header to outgoing messages before proceeding. The next.run() call is essential to continue the pipeline. ```rust use ruststream::runtime::{PublishMiddleware, Outgoing, PublishNext}; struct HeaderMiddleware; impl PublishMiddleware for HeaderMiddleware { fn on_publish<'a>(&'a self, out: &'a mut Outgoing, next: PublishNext<'a>) { Box::pin(async move { out.headers_mut().insert("X-Custom", "value"); next.run().await }) } } ``` -------------------------------- ### IntoHandlerResult Example Implementations Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/api-reference/05-handlers-context.md Illustrates how different return types (unit, Result, Result with HandlerResult) are converted to HandlerResult, showing automatic Ack/Nack behavior. ```rust use ruststream::runtime::IntoHandlerResult; // All of these return HandlerResult::Ack async fn handler1() {} // unit type async fn handler2() -> Result<(), &'static str> { Ok(()) } // This returns HandlerResult::Nack { requeue: false } async fn handler3() -> Result<(), &'static str> { Err("failed") } ``` -------------------------------- ### Stack Composition via RustStream::layer() Example Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/api-reference/07-middleware.md Illustrates how `Stack` is implicitly built by chaining `.layer()` calls on `RustStream`, composing middleware. ```rust use ruststream::runtime::{RustStream, layers::TracingLayer}; // Stack chains automatically via RustStream::layer() let app = RustStream::new(info) .layer(TracingLayer::default()) .layer(OtherLayer); ``` -------------------------------- ### NATS Application Setup Source: https://github.com/powersemmi/ruststream/blob/main/docs/brokers/nats.md Integrate the NATS broker into your RustStream application using the #[ruststream::app] macro and the with_broker/include configuration. ```rust use ruststream::app; use ruststream_nats::NatsBroker; #[app] struct App; impl App { async fn app(self) -> ruststream::App { ruststream::App::new(self) .with_broker(NatsBroker::new()) .include(vec![handler::handler_fn()]) } } ``` -------------------------------- ### NATS Request-Reply Example Source: https://github.com/powersemmi/ruststream/blob/main/docs/brokers/nats.md Implement request-reply functionality using NatsPublisher's request method, specifying a timeout for the reply. ```rust use std::time::Duration; use ruststream::{IncomingMessage, OutgoingMessage, RequestReply}; use ruststream_nats::NatsPublisher; #[ruststream::after_startup] async fn request(publisher: &NatsPublisher) { let request = OutgoingMessage::new("ping"); match publisher.request(request, Duration::from_secs(1)).await { Ok(reply) => println!("Received reply: {:?}", reply.payload), Err(e) => println!("Request failed: {:?}", e), } } ``` -------------------------------- ### JsonCodec Example Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/api-reference/03-codecs.md Shows how to use the JsonCodec for encoding a struct into JSON bytes. This codec is enabled by default via the 'json' feature. ```rust use ruststream::codec::JsonCodec; use serde::{Serialize, Deserialize}; #[derive(Serialize, Deserialize)] struct Event { id: u32, data: String } let codec = JsonCodec; let event = Event { id: 1, data: "test".to_string() }; let bytes = codec.encode(&event)?; ``` -------------------------------- ### Initialize Default Logging Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/api-reference/10-logging-metrics.md Use the convenience `init` function for a quick setup of the default colored console logger. It respects the `RUST_LOG` environment variable for filter configuration. ```rust ruststream::logging::init()?; tracing::warn!(retries = 3, "broker reconnecting"); ``` -------------------------------- ### Write a RustStream Service Source: https://github.com/powersemmi/ruststream/blob/main/README.md Example of a basic RustStream service. It defines an 'Order' struct and a handler function 'handle' for orders. The 'app' function sets up the service with a MemoryBroker and registers the handler. ```rust use ruststream::memory::MemoryBroker; use ruststream::runtime::{AppInfo, HandlerResult, RustStream}; use ruststream::subscriber; use schemars::JsonSchema; use serde::Deserialize; #[derive(Debug, Deserialize, JsonSchema)] struct Order { id: u64, } #[subscriber("orders")] async fn handle(order: &Order) -> HandlerResult { println!("got order {}", order.id); HandlerResult::Ack } #[ruststream::app] fn app() -> RustStream { RustStream::new(AppInfo::new("orders", "0.1.0")) .with_broker(MemoryBroker::new(), |b| b.include(handle)) } ``` -------------------------------- ### Example Usage of DynStack with LogMiddleware Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/api-reference/07-middleware.md Demonstrates how to create a DynStack with a custom LogMiddleware for processing String inputs. Ensure DynMiddleware trait is implemented for custom middleware. ```rust use ruststream::runtime::{DynStack, DynMiddleware, Next, Context}; use std::sync::Arc; struct LogMiddleware; impl DynMiddleware for LogMiddleware { fn handle<'a>(&'a self, input: &'a String, ctx: &'a mut Context, next: Next<'a, String>) { Box::pin(async move { println!("Processing: {}", input); next.run(input, ctx).await }) } } let middleware: Vec>> = vec![ Arc::new(LogMiddleware), ]; let stack = DynStack::new(middleware); ``` -------------------------------- ### Subscriber by Name Source: https://github.com/powersemmi/ruststream/blob/main/docs/guides/subscribers.md A basic example of subscribing to a topic by its name using the `#[subscriber]` macro. This is the most common way to define a subscriber. ```rust use ruststream::subscriber; #[subscriber("orders")] async fn handle(order: &Order) -> HandlerResult { HandlerResult::Ack } ``` -------------------------------- ### Broker-Specific Descriptor Example Source: https://github.com/powersemmi/ruststream/blob/main/docs/guides/subscribers.md Shows how to use broker-specific subscription options by providing a descriptor type directly in the `#[subscriber]` macro. This allows for advanced configurations like consumer groups or durable names. ```rust use ruststream::subscriber; // Assuming OrdersStream is a stand-in for a broker crate's SubscriptionSource type // and 'orders' and 'workers' are arguments to its constructor. #[subscriber(OrdersStream::new("orders", "workers"))] async fn handle(order: &Order) -> HandlerResult { HandlerResult::Ack } ``` -------------------------------- ### Test Handler Logic with MemoryBroker Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/api-reference/08-memory-broker.md An example demonstrating how to use MemoryBroker for testing handler logic in isolation. It sets up a RustStream application, subscribes to a topic, and publishes a test message. ```rust #[tokio::test] async fn test_handler() -> Result<(), Box> { use ruststream::memory::MemoryBroker; use ruststream::runtime::{AppInfo, RustStream, HandlerResult}; use serde::Deserialize; #[derive(Deserialize)] struct Order { id: u64 } let broker = MemoryBroker::new(); let app = RustStream::new(AppInfo::new("test", "0.1.0")) .with_broker(broker.clone(), |b| { let sub = b.broker().subscribe("orders")?; b.handle( sub, |msg: &Order| async { HandlerResult::Ack }, // ... metadata ); }); // Publish a test message let pub = broker.publisher(); pub.publish(OutgoingMessage::new("orders", b"{\"id\":1}")).await?; Ok(()) } ``` -------------------------------- ### Delayed Redelivery Example Source: https://github.com/powersemmi/ruststream/blob/main/docs/guides/subscribers.md Demonstrates using `retry_after` to implement delayed redelivery for messages. This is useful when a handler depends on external factors that might not be immediately available. ```rust async fn handle(message: IncomingMessage) -> HandlerResult { if !dependency_is_ready().await { return HandlerResult::retry_after(Duration::from_secs(5)); } // ... process message HandlerResult::Ack } ``` -------------------------------- ### Wiring NATS Broker into RustStream App Source: https://github.com/powersemmi/ruststream/blob/main/docs/broker-authors/example-nats.md Demonstrates how to integrate the NATS broker into a RustStream application. This setup includes defining application information and configuring the broker with publishing capabilities. ```rust use ruststream::runtime::{AppInfo, RustStream, TypedPublisher}; let app = RustStream::new(AppInfo::new("orders", "0.1.0")) .with_broker(NatsBroker::new("nats://localhost:4222"), |b| { let replies = TypedPublisher::new(b.broker().publisher()); b.include_publishing(confirm, replies); }); ``` -------------------------------- ### Running Integration Tests Source: https://github.com/powersemmi/ruststream/blob/main/docs/guides/testing.md Instructions on how to run integration tests against a live NATS server. This involves starting a NATS server with JetStream enabled and then executing the integration tests with the NATS_TEST_URL environment variable set. ```bash docker run -d -p 4222:4222 nats:latest -js NATS_TEST_URL=nats://127.0.0.1:4222 cargo test --test integration_nats ``` -------------------------------- ### Create and Configure AppInfo Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/api-reference/04-runtime.md Demonstrates how to create a new AppInfo instance with a title and version, and subsequently add a description. ```rust use ruststream::runtime::AppInfo; let info = AppInfo::new("orders", "0.1.0") .with_description("Order processing service"); ``` -------------------------------- ### Insert State During Startup Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/configuration.md Illustrates how to set up and insert dependencies, such as a client, into the application state during the startup phase, making them available to all handlers. ```rust app.on_startup(|mut state| async { let client = setup_client().await?; state.insert(client); Ok(state) }) ``` -------------------------------- ### Typed Handler Example Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/api-reference/09-typed-handlers.md Example demonstrating how to use the `typed` function with `JsonCodec` and a closure to handle decoded `Order` structs. ```rust use ruststream::codec::JsonCodec; use ruststream::runtime::typed; use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize)] struct Order { id: u32 } let codec = JsonCodec; let handler = |order: &Order| async { println!("Order: {}", order.id); }; let decoder = typed(codec, handler); // decoder is now a Handler ``` -------------------------------- ### Build RustStream API Reference Locally Source: https://github.com/powersemmi/ruststream/blob/main/docs/reference.md Build the RustStream API reference documentation locally with all features enabled and open it in a web browser. ```bash cargo doc --all-features --open ``` -------------------------------- ### Defining Lifecycle Hooks for Resource Management Source: https://github.com/powersemmi/ruststream/blob/main/docs/guides/lifespan.md Implement `on_startup` to initialize asynchronous resources like database connections and `on_shutdown` for cleanup. The `Database` struct demonstrates this pattern. ```rust --8<-- "examples/lifespan.rs:hooks" ``` -------------------------------- ### Initializing the Logger Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/errors.md Demonstrates the basic usage of initializing the logging system and handling potential initialization errors. ```rust use ruststream::logging; match logging::init() { Ok(()) => tracing::info!("logger initialized"), Err(e) => eprintln!("Failed to initialize logger: {}", e), } ``` -------------------------------- ### Building a Router Source: https://github.com/powersemmi/ruststream/blob/main/docs/guides/routing.md Construct a router by chaining registration methods like `handle`, `subscribe`, and `with_codec`. Each call returns a new router instance. ```rust use ruststream::runtime::Router; // Placeholder for router builder code // This is a conceptual example, actual code would define handlers and registrations. ``` -------------------------------- ### Create and Populate Headers Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/api-reference/02-message-types.md Demonstrates creating a new Headers map and inserting key-value pairs. It also shows how to access specific headers like 'content-type' and verify the map's size. ```rust use ruststream::Headers; let mut h = Headers::new(); h.insert("Content-Type", "application/json"); h.insert("X-Tenant-Id", "acme"); assert_eq!(h.content_type(), Some("application/json")); assert_eq!(h.get("x-tenant-id"), Some(b"acme".as_slice())); assert_eq!(h.len(), 2); ``` -------------------------------- ### DefaultCodec Usage with Features Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/api-reference/03-codecs.md Illustrates how the DefaultCodec automatically selects JsonCodec when the 'json' feature is enabled, as shown in this subscriber example. ```rust #[subscriber("orders")] async fn handle(order: &Order) { // Automatically uses JsonCodec } ``` -------------------------------- ### Typed Handler with Decode Failure Override Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/api-reference/09-typed-handlers.md Example showing how to chain the `on_decode_failure` method onto a handler created by `typed`. ```rust use ruststream::runtime::{typed, DecodeFailure}; let typed_handler = typed(codec, handler) .on_decode_failure(DecodeFailure::Requeue); ``` -------------------------------- ### Run RustStream Service Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/configuration.md Starts the RustStream service and runs it until SIGINT or SIGTERM signals are received, ensuring a graceful shutdown. ```rust let app = RustStream::new(info) .with_broker(broker, |b| { /* handlers */ }); app.run().await?; ``` -------------------------------- ### Connect to PostgreSQL using Environment Variable Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/configuration.md Shows how to securely connect to a PostgreSQL database by reading the connection URL from the DATABASE_URL environment variable. ```rust let db_url = std::env::var("DATABASE_URL")?; let pool = PgPool::connect(&db_url).await?; ``` -------------------------------- ### Global Middleware for Header Enrichment Source: https://github.com/powersemmi/ruststream/blob/main/docs/guides/context.md Example of a globally mounted middleware layer that ensures a `x-request-id` header is present in the context for all handlers. ```rust app.layer(middleware::SetHeader::new("x-request-id", "123")); ``` -------------------------------- ### Add RustStream to Dependencies Source: https://github.com/powersemmi/ruststream/blob/main/README.md Add RustStream to your project's Cargo.toml file. This example includes the 'macros', 'memory', and 'json' features. ```toml [dependencies] ruststream = { version = "0.3", features = ["macros", "memory", "json"] } serde = { version = "1", features = ["derive"] } schemars = "1" ``` -------------------------------- ### Handling Message Acknowledgement Errors Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/errors.md Example of processing a message and handling potential errors during the acknowledgement phase using `msg.ack().await`. ```rust use ruststream::IncomingMessage; async fn handle_delivery(msg: M) -> Result<(), Box> { // Process the message process(&msg)?; // Acknowledge match msg.ack().await { Ok(()) => println!("Message acknowledged"), Err(e) => eprintln!("Failed to ack: {}", e), } Ok(()) } ``` -------------------------------- ### Record Service Servers Source: https://github.com/powersemmi/ruststream/blob/main/docs/guides/asyncapi.md Build a `ServerSpec` to include server information in the AsyncAPI document's `servers` section. Broker crates might also provide a `DescribeServer` capability. ```rust --8<-- "examples/asyncapi_http.rs:server" ``` -------------------------------- ### Subscriber Macro Usage with Typed Handler Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/api-reference/09-typed-handlers.md Example of how the `#[subscriber]` macro implicitly uses typed handlers when a concrete message type is provided. ```rust use ruststream::subscriber; use serde::Deserialize; #[derive(Deserialize)] struct Order { id: u32 } #[subscriber("orders")] async fn handle(order: &Order) { // Handler receives &Order (decoded from payload) println!("Processing order {}", order.id); } ``` -------------------------------- ### Cargo.toml Feature Configuration Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/api-reference/03-codecs.md Provides examples of how to configure RustStream codecs in Cargo.toml by enabling specific features like 'json', 'msgpack', or 'cbor'. ```toml # Default (JSON only) ruststream = "0.2" # JSON + MessagePack ruststream = { version = "0.2", features = ["json", "msgpack"] } # CBOR only (drop JSON default) ruststream = { version = "0.2", default-features = false, features = ["cbor"] } ``` -------------------------------- ### Example HandlerResult Usage Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/api-reference/05-handlers-context.md Demonstrates how to use HandlerResult within an async function to acknowledge successful work or negatively acknowledge and requeue on error. ```rust use ruststream::runtime::HandlerResult; async fn process() -> HandlerResult { match do_work().await { Ok(_) => HandlerResult::Ack, Err(_) => HandlerResult::retry(), // Will be requeued } } ``` -------------------------------- ### init Function Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/api-reference/10-logging-metrics.md Convenience function to initialize the default colored console logger. ```APIDOC ### init Function Convenience function that installs the default colored console logger with filter `info` (overridable via `RUST_LOG`), terminal-detected colors, and targets shown. ### Signature `pub fn init() -> Result<(), LoggingInitError>` ### Returns `Result<(), LoggingInitError>` ### Example ```rust ruststream::logging::init()?; tracing::warn!(retries = 3, "broker reconnecting"); ``` ``` -------------------------------- ### JetStream Durable Consumer with Decorator Source: https://github.com/powersemmi/ruststream/blob/main/docs/brokers/nats.md Alternatively, configure JetStream durable consumers directly within the #[subscriber(..)] decorator for a more concise setup. ```rust use ruststream::handler; #[handler] #[subscriber(stream = "my-stream", durable = "my-consumer")] async fn handler(msg: ruststream::IncomingMessage) -> ruststream::HandlerResult { println!("Received message: {:?}", msg.payload); ruststream::HandlerResult::Ack } ``` -------------------------------- ### Run project checks and tests Source: https://github.com/powersemmi/ruststream/blob/main/README.md Use these commands to ensure code quality and run the test suite. 'just check' performs formatting and clippy checks, while 'just test' executes the test suite. ```bash just check # fmt, clippy, and feature checks ``` ```bash just test # the test suite ``` -------------------------------- ### Codec Trait Usage Example Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/api-reference/03-codecs.md Demonstrates how to use a Codec implementation (JsonCodec) to encode a Rust struct into bytes and then decode those bytes back into the struct. ```rust use ruststream::codec::Codec; let codec = JsonCodec; #[derive(serde::Serialize, serde::Deserialize)] struct Order { id: u32 } let bytes = codec.encode(&Order { id: 1 })?; let back: Order = codec.decode(&bytes)?; ``` -------------------------------- ### Enable CLI Feature Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/README.md To enable the `ruststream` binary with clap and anyhow, add `cli` to your Cargo.toml features. ```toml [dependencies.ruststream] features = ["cli"] ``` -------------------------------- ### Run Routing Suite with TestClient Source: https://github.com/powersemmi/ruststream/blob/main/docs/broker-authors/conformance.md Use harness::run_suite to check the routing surface against your in-process TestClient. This requires enabling your crate's 'testing' feature. ```rust use ruststream::conformance::harness; --8<-- "tests/conformance_self.rs:run_suite" ``` -------------------------------- ### Configure Lifespan Hooks for RustStream Service Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/configuration.md Sets up startup and shutdown hooks to execute logic at different stages of the service lifecycle. The `on_startup` hook receives and returns mutable state, while others receive shared state. ```rust let app = RustStream::new(info) .on_startup(|mut state| async { let pool = PgPool::connect("postgres://...").await?; state.insert(pool); Ok(state) }) .after_startup(|state| async { println!("Service started, all brokers connected"); Ok(()) }) .on_shutdown(|state| async { println!("Shutdown initiated"); Ok(()) }); ``` -------------------------------- ### Ruststream Dependency Configuration Source: https://github.com/powersemmi/ruststream/blob/main/docs/broker-authors/index.md Example TOML snippet for adding ruststream as a dependency with default features turned off. This is typically used when building an independent broker crate. ```toml [dependencies] ruststream = { version = "0.3", default-features = false } ``` -------------------------------- ### Handler Result for Message Processing Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/errors.md Illustrates how a handler returns a HandlerResult to indicate success (Ack) or failure (Nack). This example shows a Nack without requeue. ```rust async fn handler(msg: &Message) -> HandlerResult { match process(msg).await { Ok(_) => HandlerResult::Ack, Err(_) => HandlerResult::drop(), // Nack without requeue } } ``` -------------------------------- ### Generate AsyncAPI Documentation Source: https://github.com/powersemmi/ruststream/blob/main/docs/getting-started/quickstart.md Generate AsyncAPI documentation for the service. The output can be printed to stdout or saved to a file in JSON or YAML format. ```bash ruststream asyncapi gen # prints JSON to stdout ``` ```bash ruststream asyncapi gen -o asyncapi.json ``` ```bash ruststream asyncapi gen --yaml ``` -------------------------------- ### Configure Application Metadata with RustStream Builder Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/configuration.md Sets the service title, version, and an optional description using AppInfo. ```rust use ruststream::runtime::{AppInfo, RustStream}; let app = RustStream::new( AppInfo::new("orders", "0.2.0") .with_description("Order processing and fulfillment") ); ``` -------------------------------- ### Configuring Decode Failure Handling Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/errors.md Example of setting the decode failure behavior for a typed handler. This controls whether a failed decode results in dropping or requeuing the message. ```rust use ruststream::runtime::{DecodeFailure, typed}; let handler = typed(codec, inner) .on_decode_failure(DecodeFailure::Requeue); // If decode fails: nack with requeue=true ``` -------------------------------- ### Run Request-Reply Capability Suite Source: https://github.com/powersemmi/ruststream/blob/main/docs/broker-authors/conformance.md Use capabilities::request_reply to verify a broker's Request-Reply implementation. This performs a real connect and requires gating behind an environment variable for live servers. ```rust use ruststream::conformance::capabilities; use ruststream_nats::{NatsBroker, SubscribeOptions}; --8<-- "tests/doc_conformance_nats.rs:request_reply" ``` -------------------------------- ### Create New Rust Project Source: https://github.com/powersemmi/ruststream/blob/main/docs/getting-started/tutorial.md Use cargo to create a new Rust project and navigate into the project directory. ```bash cargo new orders-service cd orders-service ``` -------------------------------- ### NATS JetStream Subscription Decorator Source: https://github.com/powersemmi/ruststream/blob/main/docs/guides/subscribers.md An example of configuring a NATS JetStream subscription using a builder chain within the `#[subscriber]` decorator. This allows for fluent configuration of subscription options. ```rust use ruststream::subscriber; use ruststream_nats::SubscribeOptions; #[subscriber(SubscribeOptions::new("orders").durable("workers").ack_policy(AckPolicy::Explicit))] async fn handle(order: &Order) -> HandlerResult { HandlerResult::Ack } ``` -------------------------------- ### Implement Confirmation Handler and Publisher Source: https://github.com/powersemmi/ruststream/blob/main/docs/getting-started/tutorial.md Implement a confirmation handler that returns a reply value and uses `publish` to name the destination. Mount it with a publisher carrying the reply codec. ```rust use ruststream::runtime::TypedPublisher; // inside with_broker(...), with `confirm` imported from the orders module let replies = TypedPublisher::new(b.broker().publisher()); b.include_publishing(confirm, replies); ``` -------------------------------- ### Export Metrics Data Source: https://github.com/powersemmi/ruststream/blob/main/docs/guides/metrics.md Call the `export()` method on the `Metrics` instance to get the current metric values in Prometheus exposition format. This is typically used to serve metrics over HTTP. ```rust let body = metrics.export()?; ``` -------------------------------- ### Apply Middleware Layers Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/configuration.md Demonstrates the correct ordering of middleware layers, applying TracingLayer before MetricsLayer to ensure outer-to-inner processing. ```rust app.layer(TracingLayer::default()) .layer(MetricsLayer::default()) ``` -------------------------------- ### Run Lifecycle Check with Real Broker Source: https://github.com/powersemmi/ruststream/blob/main/docs/broker-authors/conformance.md Use harness::lifecycle to check the lazy-startup contract end to end against a connected broker. This involves factories for broker creation, subscription, and publishing. ```rust use ruststream::conformance::harness; use ruststream_nats::{NatsBroker, SubscribeOptions}; --8<-- "tests/doc_conformance_nats.rs:lifecycle" ``` -------------------------------- ### Create and Use RawMessage Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/api-reference/02-message-types.md Demonstrates how to construct a new RawMessage with a name and payload, and access its properties. Assumes `Bytes` and `Headers` are available. ```rust use ruststream::RawMessage; let msg = RawMessage::new("orders", b"{}"); assert_eq!(msg.name(), "orders"); assert_eq!(msg.payload(), b"{}"); ``` -------------------------------- ### Testing Handlers with NATS Semantics Source: https://github.com/powersemmi/ruststream/blob/main/docs/guides/testing.md Testing a NATS service against the NATS-flavoured test broker to ensure correct subject semantics. This example uses a wildcard subscriber to audit order events. ```rust use ruststream::testing::{TestClient, TestService}; use ruststream_nats::{NatsBroker, Order}; use std::time::Duration; #[tokio::test] async fn test_order_handler_nats() { let mut service = TestService::new(); service.after_startup(handle_order); let client = service.start(NatsBroker::new("nats://localhost:4222")).await; let order = Order { id: "123".to_string() }; let published = client .publish("orders.*", order) .await .expect("publish failed") .expect_published("orders.*", 1, Duration::from_secs(1)) .await; assert_eq!(published.len(), 1); assert_eq!(published[0].payload, "processed"); service.run_until(client.shutdown()).await; } ``` -------------------------------- ### Run the Application Source: https://github.com/powersemmi/ruststream/blob/main/docs/getting-started/tutorial.md Execute the Rust application using cargo run with the 'run' argument. ```bash cargo run -- run ``` -------------------------------- ### Message Trait Implementation Example Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/types.md Demonstrates how to implement the `Message` trait for a struct, either manually or using the derive macro. This defines the message's name and description for AsyncAPI documentation. ```rust use ruststream::Message; #[derive(Message)] /// An order placed by a customer. struct Order { id: u32, } // Equivalent to: impl Message for Order { const NAME: &'static str = "Order"; const DESCRIPTION: Option<&'static str> = Some("An order placed by a customer."); } ``` -------------------------------- ### Inserting Application State at Build Time Source: https://github.com/powersemmi/ruststream/blob/main/docs/guides/context.md Demonstrates how to insert application-level state using `RustStream::insert_state` for values that do not require asynchronous initialization. ```rust let mut app = RustStream::new(); app.insert_state(Config::new()); ``` -------------------------------- ### Handling RustStream Service Errors Source: https://github.com/powersemmi/ruststream/blob/main/_autodocs/errors.md Example of running a RustStream application and handling potential lifecycle errors returned by `app.run().await`. Exits the process with status code 1 on error. ```rust use ruststream::runtime::RustStream; async fn run_app() { let app = RustStream::new(info) .with_broker(broker, |b| { /* ... */ }); match app.run().await { Ok(()) => println!("Service stopped gracefully"), Err(e) => { eprintln!("Service error: {}", e); std::process::exit(1); } } } ``` -------------------------------- ### Run and Manage RustStream Services Source: https://github.com/powersemmi/ruststream/blob/main/README.md Common commands for running a RustStream service, generating its AsyncAPI document, and scaffolding new projects. ```bash ruststream run # start the service (or: cargo run -- run) ruststream asyncapi gen # print the AsyncAPI document ruststream new my-service # scaffold a new project ```