### Spring Cloud Stream Application Startup Output

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/preface.adoc

Example console output when a Spring Cloud Stream application starts, showing binder provisioning, connection attempts, and application startup confirmation.

```log
--- [ main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg, bound to: input
--- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
--- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#2a3a299:0/SimpleConnection@66c83fc8. . .
. . .
--- [ main] o.s.i.a.i.AmqpInboundChannelAdapter : started inbound.input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg
. . .
--- [ main] c.e.l.LoggingConsumerApplication : Started LoggingConsumerApplication in 2.531 seconds (JVM running for 2.897)
```

--------------------------------

### Start Kafka Cluster and Control Center UI

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/tools/kafka/docker-compose/README.adoc

Starts both the Kafka cluster (using kafka-cluster.yml) and the Confluent Control Center UI (using control-center-ui.yml). The UI will be accessible at http://localhost:9021.

```shell
docker-compose -f ./kafka-cluster.yml -f ./control-center-ui.yml up
```

--------------------------------

### HTTP POST Request Example

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/spring-cloud-stream/producing-and-consuming-messages.adoc

This is an example of an HTTP POST request to send a plain text message to a local endpoint, typically used to trigger the `delegateToSupplier` method.
```shell
curl -H "Content-Type: text/plain" -X POST -d "hello from the other side" http://localhost:8080/
```

--------------------------------

### Start Kafka Cluster, Control Center UI, and Schema Registry

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/tools/kafka/docker-compose/README.adoc

Starts all components: Kafka cluster, Control Center UI, and Schema Registry. This command chains multiple docker-compose files together.

```shell
docker-compose -f kafka-cluster.yml -f ./control-center-ui.yml -f ./schema-registry.yml up
```

--------------------------------

### Start Binding Actuator Endpoint

Source: https://github.com/spring-cloud/spring-cloud-stream/wiki/Spring-Cloud-Stream-2.0.0-Release-Notes

Use this curl command to start a specific binding via the actuator endpoint. Replace `inOne` with the actual channel name.

```bash
curl -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/start/inOne
```

--------------------------------

### Binder Definition Example

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/spring-cloud-stream/overview-binder-api.adoc

Specifies a binder implementation in the `META-INF/spring.binders` file. This example shows how to register the Kafka binder.

```properties
kafka:\
org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration
```

--------------------------------

### Start Kafka Cluster and Schema Registry

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/tools/kafka/docker-compose/README.adoc

Starts the Kafka cluster (using kafka-cluster.yml) and the Confluent Schema Registry (using schema-registry.yml). The Schema Registry will be available at http://localhost:8081.
```shell
docker-compose -f ./kafka-cluster.yml -f ./schema-registry.yml up
```

--------------------------------

### Binder Definition Example

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/spring-cloud-stream/overview-binder-api.adoc

An example of a `META-INF/spring.binders` file entry, which registers a binder implementation with the framework. This file is placed on the classpath and maps a binder name (e.g., `kafka`) to its configuration class.

````APIDOC
## Binder Definition Example

### Description
Example of a `META-INF/spring.binders` file entry for registering a binder.

### File
`META-INF/spring.binders`

### Content Example
```
kafka:\
org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration
```

### Explanation
* `kafka`: The name used to refer to this binder.
* `org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration`: The fully qualified class name of the binder's configuration class.
````

--------------------------------

### Example Kafka Streams Processor Beans

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/kafka/kafka-streams-binder/manually-starting-processors-selectively.adoc

Defines three example Kafka Streams processor beans of different types (Function, Consumer, BiFunction). These can be used to demonstrate selective auto-startup control.

```java
// Processor bodies are omitted in the source; only the bean signatures matter here.
@Bean
public Function<KStream<?, ?>, KStream<?, ?>> process1() {
}

@Bean
public Consumer<KStream<?, ?>> process2() {
}

@Bean
public BiFunction<KStream<?, ?>, KTable<?, ?>, KStream<?, ?>> process3() {
}
```

--------------------------------

### FileMessageBinderProvisioner Implementation

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/spring-cloud-stream/overview-custom-binder-impl.adoc

Example implementation of `ProvisioningProvider` that trims destination names. It is responsible for provisioning consumer and producer destinations.
```java
public class FileMessageBinderProvisioner implements ProvisioningProvider<ConsumerProperties, ProducerProperties> {

    @Override
    public ProducerDestination provisionProducerDestination(
            final String name,
            final ProducerProperties properties) {

        return new FileMessageDestination(name);
    }

    @Override
    public ConsumerDestination provisionConsumerDestination(
            final String name,
            final String group,
            final ConsumerProperties properties) {

        return new FileMessageDestination(name);
    }

    // A single destination type serves both the producer and the consumer side.
    private class FileMessageDestination implements ProducerDestination, ConsumerDestination {

        private final String destination;

        private FileMessageDestination(final String destination) {
            this.destination = destination;
        }

        @Override
        public String getName() {
            return destination.trim();
        }

        @Override
        public String getNameForPartition(int partition) {
            throw new UnsupportedOperationException("Partitioning is not implemented for file messaging.");
        }
    }
}
```

--------------------------------

### Manually Start a Processor Binding Programmatically

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/kafka/kafka-streams-binder/manually-starting-processors-selectively.adoc

Programmatically control the state of a Kafka Streams processor binding using the `BindingsEndpoint` API. This example starts the `process3-in-0` binding when the application starts.

```java
@Autowired
BindingsEndpoint endpoint;

@Bean
public ApplicationRunner runner() {
    return args -> {
        endpoint.changeState("process3-in-0", State.STARTED);
    };
}
```

--------------------------------

### Build and Test Project (Maven Wrapper)

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/README.adoc

Compile and run tests for the project using the Maven wrapper. Ensure JDK 17 is installed.
```bash
./mvnw install
```

--------------------------------

### Start a Kafka Streams Binding

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/kafka/kafka-streams-binder/binding-visualization-and-control-in-binder.adoc

Send a POST request to the `/actuator/bindings/{bindingName}` endpoint with a JSON payload specifying the desired state `STARTED` to resume message processing for a specific binding.

```bash
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/consumer-in-0
```

--------------------------------

### Example Application YAML for Spring Cloud Stream

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/spring-cloud-stream/configuration-options.adoc

Demonstrates how to configure application name and function definitions, along with binder-specific environment properties.

```yaml
my:
  prop: foo
spring:
  application:
    name: "messaging-test"
  cloud:
    function:
      definition: "produce;consume"
    stream:
      binders:
        rabbit:
          environment:
            my:
              prop: bar
```

--------------------------------

### Start Kafka Cluster with Zookeeper

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/tools/kafka/docker-compose/README.adoc

Use this command to start a multi-node Kafka cluster with 3 brokers using Zookeeper for leader election. The brokers will be available at localhost:9091, localhost:9092, and localhost:9093.

```shell
docker-compose -f ./kafka-cluster.yml up
```

--------------------------------

### Manually Start a Processor Binding via REST

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/kafka/kafka-streams-binder/manually-starting-processors-selectively.adoc

Control the state of a Kafka Streams processor binding using the Spring Boot Actuator's bindings endpoint. This example shows how to start a binding named `process3-in-0` using curl.
```bash
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/process3-in-0
```

--------------------------------

### Kafka Processor Function Example

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/kafka/kafka_tips.adoc

A basic Kafka processor function that throws an exception for every message. This serves as a starting point for demonstrating DLQ functionality.

```java
@Bean
public Consumer<String> processData() {
    return s -> {
        // Fail on every record so that each one is routed to the DLQ.
        throw new RuntimeException();
    };
}
```

--------------------------------

### FileMessageHandler Implementation

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/spring-cloud-stream/overview-custom-binder-impl.adoc

Example implementation of `MessageHandler`, which provides the logic required to produce an event.

```java
public class FileMessageHandler implements MessageHandler{

    @Override
```

--------------------------------

### Example of Linear Git Log Output

Source: https://github.com/spring-cloud/spring-cloud-stream/wiki/Contributor-Guidelines

This is an example of the expected output from `git log --graph --pretty=oneline` when a linear history is maintained.

```git
* c129a02e6c752b49bacd4a445092a44f66c2a1e9 GH-2721 Increase Timers on JDBC Delayer Tests
* 14e556ce23d49229c420632cef608630b1d82e7d GH-2620 Fix Debug Log
* 6140aa7b2cfb6ae309c55a157e94b44e5d0bea4f GH-3037 Fix JDBC MS Discard After Completion
* 077f2b24ea871a3937c513e08241d1c6cb9c9179 Update Spring Social Twitter to 1.0.5
* 6d4f2b46d859c903881a561c35aa28df68f8faf3 GH-3053 Allow task-executor on
* 56f9581b85a8a40bbcf2461ffc0753212669a68d Update Spring Social Twitter version to 1.0.4
```

--------------------------------

### Function (Processor) Example

Source: https://context7.com/spring-cloud/spring-cloud-stream/llms.txt

Defines a message handler that consumes and produces messages. The framework auto-creates input and output bindings.
```java
@SpringBootApplication
public class MyFunctionBootApp {

    public static void main(String[] args) {
        SpringApplication.run(MyFunctionBootApp.class);
    }

    // Input binding: toUpperCase-in-0
    // Output binding: toUpperCase-out-0
    @Bean
    public Function<String, String> toUpperCase() {
        return s -> s.toUpperCase();
    }
}
```

```properties
spring.cloud.function.definition=toUpperCase
spring.cloud.stream.bindings.toUpperCase-in-0.destination=input-topic
spring.cloud.stream.bindings.toUpperCase-out-0.destination=output-topic
spring.cloud.stream.bindings.toUpperCase-in-0.group=myGroup
```

--------------------------------

### Example Kafka Streams Application

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/kafka/kafka-streams-binder/binding-visualization-and-control-in-binder.adoc

A sample Spring Boot application demonstrating Kafka Streams consumers and functions, which will have automatically named bindings.

```java
@SpringBootApplication
public class KafkaStreamsApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaStreamsApplication.class, args);
    }

    // Bound as consumer-in-0
    @Bean
    public Consumer<KStream<String, String>> consumer() {
        return s -> s.foreach((key, value) -> System.out.println(value));
    }

    // Bound as function-in-0 and function-out-0
    @Bean
    public Function<KStream<String, String>, KStream<String, String>> function() {
        return ks -> ks;
    }
}
```

--------------------------------

### List Bindings Actuator Endpoint

Source: https://github.com/spring-cloud/spring-cloud-stream/wiki/Spring-Cloud-Stream-2.0.0-Release-Notes

Send a GET request to this URL to retrieve the list of existing bindings.

```bash
http://localhost:8080/actuator/bindings/
```

--------------------------------

### Example Kafka Streams Processor

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/kafka/kafka-streams-binder/partition-support-on-the-outbound.adoc

This is a standard Kafka Streams processor function. It's shown here for context before the partitioning configuration.

```java
@Bean
public Function<KStream<?, ?>, KStream<?, ?>> process() {
    ...
}
```

--------------------------------

### Enabling Schema Registry Client

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/schema-registry/spring-cloud-stream-schema-registry.adoc

Example of how to enable the Schema Registry Client in a Spring Boot application using the `@EnableSchemaRegistryClient` annotation.

````APIDOC
## Enabling Schema Registry Client

### Description
To enable the Schema Registry Client in your Spring Cloud Stream application, you can use the `@EnableSchemaRegistryClient` annotation on your main application class.

### Usage
```java
@SpringBootApplication
@EnableSchemaRegistryClient
public class ConsumerApplication {

}
```

### Notes
The default implementation (`DefaultSchemaRegistryClient`) does not cache responses. If you need caching, you can enable it via properties or by using the client directly and overriding its behavior.
````

--------------------------------

### Supplier (Source) Example - Imperative and Reactive

Source: https://context7.com/spring-cloud/spring-cloud-stream/llms.txt

Produces messages without consuming input. The framework polls imperative Suppliers or subscribes to reactive Suppliers.
```java
@SpringBootApplication
public class SourceApplication {

    // Imperative: polled every second → sends "Hello from Supplier" to output
    @Bean
    public Supplier<String> stringSupplier() {
        return () -> "Hello from Supplier";
    }

    // Reactive: triggered once, emits a continuous stream
    @Bean
    public Supplier<Flux<String>> reactiveSupplier() {
        return () -> Flux.fromStream(
                Stream.generate(() -> "Hello from reactive supplier")
        ).delayElements(Duration.ofSeconds(1)).share();
    }

    // Pollable reactive: finite stream re-triggered by the framework
    @Bean
    @PollableBean
    public Supplier<Flux<String>> pollableReactiveSupplier() {
        return () -> Flux.just("batch-item-1", "batch-item-2");
    }
}
```

```properties
# Customize polling interval (default 1000ms)
spring.integration.poller.fixed-delay=2000

# Per-binding polling config
spring.cloud.stream.bindings.stringSupplier-out-0.producer.poller.fixed-delay=5000
```

--------------------------------

### Testing PollableMessageSource

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/spring-cloud-stream/spring_integration_test_binder.adoc

This example shows how to test applications using `PollableMessageSource`. It configures a test binder with a pollable source and retrieves the resulting messages from an `OutputDestination`, where they can be inspected or asserted on.
```java
@Test
public void samplePollingTest() {
    ApplicationContext context = new SpringApplicationBuilder(SamplePolledConfiguration.class)
            .web(WebApplicationType.NONE)
            .run("--spring.jmx.enabled=false", "--spring.cloud.stream.pollable-source=myDestination");
    OutputDestination destination = context.getBean(OutputDestination.class);
    System.out.println("Message 1: " + new String(destination.receive().getPayload()));
    System.out.println("Message 2: " + new String(destination.receive().getPayload()));
    System.out.println("Message 3: " + new String(destination.receive().getPayload()));
}

@EnableTestBinder
@EnableAutoConfiguration
public static class SamplePolledConfiguration {

    @Bean
    public ApplicationRunner poller(PollableMessageSource polledMessageSource, StreamBridge output, TaskExecutor taskScheduler) {
        return args -> {
            taskScheduler.execute(() -> {
                for (int i = 0; i < 3; i++) {
                    try {
                        // poll() returns false when no message was available; back off before retrying
                        if (!polledMessageSource.poll(m -> {
                            String newPayload = ((String) m.getPayload()).toUpperCase();
                            output.send("myOutput", newPayload);
                        })) {
                            Thread.sleep(2000);
                        }
                    }
                    catch (Exception e) {
                        // handle failure
                    }
                }
            });
        };
    }
}
```

--------------------------------

### Java Example: Receiving Batched Messages with Headers

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/rabbit/rabbit_overview/receiving-batch.adoc

Demonstrates consuming messages as a `Message<List<Thing>>` to access consolidated headers for each message in the batch. The headers are available under `AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS`.
```java
@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    Consumer<Message<List<Thing>>> input() {
        return msg -> {
            List<Thing> things = msg.getPayload();
            System.out.println("Received " + things.size());
            // One header map per batched message, in the same order as the payload list
            @SuppressWarnings("unchecked")
            List<Map<String, Object>> headers =
                    (List<Map<String, Object>>) msg.getHeaders().get(AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS);
            for (int i = 0; i < things.size(); i++) {
                System.out.println(things.get(i) + " myHeader=" + headers.get(i).get("myHeader"));
                // ...
            }
        };
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value1\"}", msg -> {
                msg.getMessageProperties().setHeader("myHeader", "headerValue1");
                return msg;
            });
            template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value2\"}", msg -> {
                msg.getMessageProperties().setHeader("myHeader", "headerValue2");
                return msg;
            });
        };
    }

    public static class Thing {

        private String field;

        public Thing() {
        }

        public Thing(String field) {
            this.field = field;
        }

        public String getField() {
            return this.field;
        }

        public void setField(String field) {
            this.field = field;
        }

        @Override
        public String toString() {
            return "Thing [field=" + this.field + "]";
        }
    }
}
```

--------------------------------

### Add Spring Cloud Stream Dependency (Maven)

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/spring-cloud-stream/overview-custom-binder-impl.adoc

Include the spring-cloud-stream dependency in your Maven project to get started.

```xml
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
    <version>${spring.cloud.stream.version}</version>
</dependency>
```

--------------------------------

### Function with Consumer Observability

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/kafka/kafka-reactive-binder/reactive_observability.adoc

Example of a Spring Cloud Stream Function that enables observability for incoming Kafka records.
It starts a new observation for each received record and propagates context.

```java
@Bean
Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<Message<String>>> receive(ObservationRegistry observationRegistry) {
    return s -> s.flatMap(record -> {
        // Start one observation per received record
        Observation receiverObservation = KafkaReceiverObservation.RECEIVER_OBSERVATION.start(
                null,
                KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
                () -> new KafkaRecordReceiverContext(record, "user.receiver", "localhost:9092"),
                observationRegistry
        );

        return Mono.deferContextual(contextView -> Mono.just(record)
                        .map(rec -> new String(rec.value()).toLowerCase())
                        .map(rec -> MessageBuilder.withPayload(rec)
                                .setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, contextView)
                                .build()))
                .doOnTerminate(receiverObservation::stop)
                .doOnError(receiverObservation::error)
                .contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, receiverObservation));
    });
}
```

--------------------------------

### Include Spring Boot Starter Web Dependency

Source: https://github.com/spring-cloud/spring-cloud-stream/wiki/Spring-Cloud-Stream-2.0.0-Release-Notes

Add this dependency to use Spring Boot Starter Web.

```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
```

--------------------------------

### DLQ Error Message Example

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/spring-cloud-stream/overview-error-handling.adoc

An example of the additional information, relevant to the original error, that is included in an error message when DLQ is configured.

```text
. . . .
x-exception-stacktrace: org.springframework.messaging.MessageHandlingException: nested exception is
      org.springframework.messaging.MessagingException: has an error, failedMessage=GenericMessage [payload=byte[15],
      headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=input.hello, amqp_deliveryTag=1,
      deliveryAttempt=3, amqp_consumerQueue=input.hello, amqp_redelivered=false, id=a15231e6-3f80-677b-5ad7-d4b1e61e486e,
      amqp_consumerTag=amq.ctag-skBFapilvtZhDsn0k3ZmQg, contentType=application/json, timestamp=1522327846136}]
      at org.spring...integ...han...MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:107)
      at. . . . .
Payload: blah
```

--------------------------------

### Custom AvroSchemaMessageConverter Example

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/schema-registry/spring-cloud-stream-schema-registry.adoc

Provides an example of how to define a custom `AvroSchemaMessageConverter` bean. If you provide a custom converter, the default one is not created.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.cloud.stream.schema.avro.AvroSchemaMessageConverter;

@Configuration
public class CustomAvroConverter {

    @Bean
    public AvroSchemaMessageConverter avroSchemaMessageConverter() {
        // Custom configuration for AvroSchemaMessageConverter
        return new AvroSchemaMessageConverter();
    }
}
```

--------------------------------

### Java Example: Receiving Batched Payloads

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/rabbit/rabbit_overview/receiving-batch.adoc

Demonstrates a Spring Boot application that consumes messages as a List of payloads. Ensure the consumer binding is configured for batch mode.
```java
@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    Consumer<List<Thing>> input() {
        return list -> {
            System.out.println("Received " + list.size());
            list.forEach(thing -> {
                System.out.println(thing);
                // ...
            });
        };
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value1\"}");
            template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value2\"}");
        };
    }

    public static class Thing {

        private String field;

        public Thing() {
        }

        public Thing(String field) {
            this.field = field;
        }

        public String getField() {
            return this.field;
        }

        public void setField(String field) {
            this.field = field;
        }

        @Override
        public String toString() {
            return "Thing [field=" + this.field + "]";
        }
    }
}
```

--------------------------------

### Batch Listener with Kafka Headers

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/kafka/kafka-binder/consume-batches.adoc

Example demonstrating how to access headers when using `Message<List<?>>` for batch consumption. Headers are provided as a `Map` where values are `Collection`s, requiring manual correlation with the payload.

```java
// `message` is the batch handed to the listener, typed as Message<List<?>>
List<?> payload = message.getPayload();
MessageHeaders headers = message.getHeaders();
// headers.get("kafka_batchConvertedHeaders") contains a list of maps;
// each map contains the headers for the corresponding record in the payload list.
```

--------------------------------

### Basic Kafka Binder Configuration

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/kafka/kafka-streams-binder/multi-binders-with-based-binders-and-regular-binder.adoc

This configuration sets up a Kafka binder named `kafka3` for a processor, specifying its destination topics.
```properties
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.binder=kafka3
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
spring.cloud.stream.bindings.kstreamProcess-out-0.binder=kafka3
```

--------------------------------

### Custom KafkaBinderHealth Implementation Example

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/kafka/kafka-binder/custom-health-ind.adoc

Implement the `KafkaBinderHealth` interface to create a custom health check for Kafka brokers and topics. This example checks broker connectivity and topic connectivity.

```java
public class KafkaBinderHealthImplementation implements KafkaBinderHealth {

    @Value("${spring.cloud.bus.destination}")
    private String topic;

    private final AdminClient client;

    public KafkaBinderHealthImplementation(final KafkaAdmin admin) {
        // More about configuring Kafka
        // https://docs.spring.io/spring-kafka/reference/html/#configuring-topics
        this.client = AdminClient.create(admin.getConfigurationProperties());
    }

    @Override
    public Health health() {
        if (!checkBrokersConnection()) {
            logger.error("Error when connect brokers");
            return Health.down().withDetail("BrokersConnectionError", "Error message").build();
        }
        if (!checkTopicConnection()) {
            logger.error("Error when trying to connect with specific topic");
            return Health.down().withDetail("TopicError", "Error message with topic name").build();
        }
        return Health.up().build();
    }

    public boolean checkBrokersConnection() {
        // Your implementation
    }

    public boolean checkTopicConnection() {
        // Your implementation
    }
}
```

--------------------------------

### Configure Consumer for Partitioning (YAML)

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/rabbit/rabbit_partitions.adoc

This YAML configuration sets up the consumer binding named `listen-in-0` to receive messages from the partitioned destination.
It specifies the `group`, enables `partitioned` consumption, and sets the `instance-index` to 0, indicating it will consume from the first partition.

```yaml
spring:
  cloud:
    stream:
      bindings:
        listen-in-0:
          destination: partitioned.destination
          group: myGroup
          consumer:
            partitioned: true
            instance-index: 0
```

--------------------------------

### Configure Bindings for Multi-Binder Scenario

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/kafka/kafka-streams-binder/multi-binders-with-based-binders-and-regular-binder.adoc

Map function bindings to specific binders when using multiple Kafka clusters. This example shows routing data from cluster 1 to cluster 2 using the regular `process` function. Comments sit on their own lines because a `#` after a value in a properties file becomes part of that value.

```properties
spring.cloud.function.definition=process;kstreamProcess

# From cluster 1 to cluster 2 with regular process function
# source from cluster 1
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-in-0.binder=kafka1
# send to cluster 2
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.process-out-0.binder=kafka2
```

--------------------------------

### Manually Start Kafka Streams Processors

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/kafka/kafka-streams-binder/manually-starting-processors.adoc

Disable auto-startup for Kafka Streams processors and manually start them using `StreamsBuilderFactoryManager`. This is useful for applications with large state stores or when integrating with liveness probes.
```java
@Bean
public ApplicationRunner runner(StreamsBuilderFactoryManager sbfm) {
    return args -> {
        sbfm.start();
    };
}
```

--------------------------------

### Configure Dynamic Destinations with RabbitMQ Binder

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/spring-cloud-stream/event-routing.adoc

Implement `NewDestinationBindingCallback` to configure producer properties for dynamically created destinations. This example shows how to set RabbitMQ-specific properties like queue groups and DLQ configuration.

```java
@Bean
public NewDestinationBindingCallback<RabbitProducerProperties> dynamicConfigurer() {
    return (name, channel, props, extended) -> {
        // props: common producer properties; extended: RabbitMQ-specific producer properties
        props.setRequiredGroups("bindThisQueue");
        extended.setQueueNameGroupOnly(true);
        extended.setAutoBindDlq(true);
        extended.setDeadLetterQueueName("myDLQ");
    };
}
```

--------------------------------

### Add Web Dependency

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/spring-cloud-stream/binding_visualization_control.adoc

Include the spring-boot-starter-web dependency to enable Actuator endpoints for web applications.

```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
```

--------------------------------

### Configure Kafka Consumer Start Offset

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/kafka/kafka_tips.adoc

Use `spring.cloud.stream.kafka.bindings.<binding-name>.consumer.startOffset` to control whether a consumer starts from the earliest or latest available offset when first launched or after a rebalance. Defaults to `earliest` for consumers with a `group.id` and `latest` for anonymous consumers.
```properties
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.startOffset=earliest
```

```properties
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.startOffset=latest
```

--------------------------------

### Configure Kafka Streams Binder to Start from Latest Offset

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/kafka/kafka_tips.adoc

Use these properties to configure specific bindings within a Kafka Streams binder application to start consuming from the latest offset instead of the default earliest. This setting is ignored if committed offsets already exist.

```properties
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-0.consumer.startOffset: latest
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-1.consumer.startOffset: latest
```

--------------------------------

### Add RabbitMQ Starter Dependency (Maven)

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/intro.adoc

Include this starter dependency to set up the RabbitMQ binder in your Spring Cloud Stream application with minimal configuration.

```xml
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
```

--------------------------------

### Reactive Function with Manual Observation Tap

Source: https://context7.com/spring-cloud/spring-cloud-stream/llms.txt

This example demonstrates how to manually attach Micrometer observations to individual items within a reactive stream using `.tap(Micrometer.observation(registry))`. Ensure `Hooks.enableAutomaticContextPropagation()` is called.
```java
// Reactive function with manual observation tap
@SpringBootApplication
public class DemoStreamApplication {

    public static void main(String[] args) {
        Hooks.enableAutomaticContextPropagation();
        SpringApplication.run(DemoStreamApplication.class, args);
    }

    @Bean
    public Function<Flux<String>, Flux<String>> uppercase(ObservationRegistry registry) {
        return flux -> flux.flatMap(item ->
                Mono.just(item)
                        .map(String::toUpperCase)
                        .tap(Micrometer.observation(registry)) // attach observation per item
        );
    }
}
```

--------------------------------

### Stop Kafka Cluster with Zookeeper

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/tools/kafka/docker-compose/README.adoc

Use this command to stop the multi-node Kafka cluster started with Zookeeper.

```shell
docker-compose -f ./kafka-cluster.yml down
```

--------------------------------

### Configure Producer for Partitioning (YAML)

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/rabbit/rabbit_partitions.adoc

This YAML configuration enables partitioning for the producer binding named `generate-out-0`. It specifies the destination, sets `partitioned` to true, defines the `partition-key-expression` to use the `partitionKey` header, sets the `partition-count`, and lists the `required-groups` for consumer provisioning.

```yaml
spring:
  cloud:
    stream:
      bindings:
        generate-out-0:
          destination: partitioned.destination
          producer:
            partitioned: true
            partition-key-expression: headers['partitionKey']
            partition-count: 2
            required-groups:
            - myGroup
```

--------------------------------

### Stop Kafka Cluster and Schema Registry

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/tools/kafka/docker-compose/README.adoc

Stops the Kafka cluster and the Confluent Schema Registry that were started together.
```shell
docker-compose -f ./kafka-cluster.yml -f ./schema-registry.yml down
```

--------------------------------

### FileMessageProducer Implementation

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/spring-cloud-stream/overview-custom-binder-impl.adoc

Example implementation of `MessageProducer` that extends `MessageProducerSupport`. It polls a file, archives each new message, and discards a payload that repeats the previous one.

```java
// MILLISECONDS, CREATE and APPEND are assumed to be static imports from
// java.util.concurrent.TimeUnit and java.nio.file.StandardOpenOption.
public class FileMessageProducer extends MessageProducerSupport {

    public static final String ARCHIVE = "archive.txt";
    private final ConsumerDestination destination;
    private String previousPayload;

    public FileMessageProducer(ConsumerDestination destination) {
        this.destination = destination;
    }

    @Override
    public void doStart() {
        receive();
    }

    private void receive() {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);

        // Poll the file every 50 ms and forward any new payload
        executorService.scheduleWithFixedDelay(() -> {
            String payload = getPayload();

            if (payload != null) {
                Message<String> receivedMessage = MessageBuilder.withPayload(payload).build();
                archiveMessage(payload);
                sendMessage(receivedMessage);
            }

        }, 0, 50, MILLISECONDS);
    }

    // Returns the last line of the file, or null if it matches the previous payload
    private String getPayload() {
        try {
            List<String> allLines = Files.readAllLines(Paths.get(destination.getName()));
            String currentPayload = allLines.get(allLines.size() - 1);

            if (!currentPayload.equals(previousPayload)) {
                previousPayload = currentPayload;
                return currentPayload;
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        return null;
    }

    private void archiveMessage(String payload) {
        try {
            Files.write(Paths.get(ARCHIVE), (payload + "\n").getBytes(), CREATE, APPEND);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

}
```

--------------------------------

### Add Observability Dependencies

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/observability.adoc

Include Spring Boot Actuator for observability endpoints and reactor-core-micrometer for Reactor integration.
Add a tracer bridge such as `micrometer-tracing-bridge-brave` for tracing capabilities.

```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core-micrometer</artifactId>
</dependency>
```

```xml
<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-tracing-bridge-brave</artifactId>
</dependency>
```

--------------------------------

### Include Spring Boot Starter WebFlux Dependency

Source: https://github.com/spring-cloud/spring-cloud-stream/wiki/Spring-Cloud-Stream-2.0.0-Release-Notes

Add this dependency to use Spring Boot Starter WebFlux.

```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
```

--------------------------------

### Example JSON Payload for Person Class

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/preface.adoc

Illustrates the JSON structure expected for a message payload that will be converted to the `Person` class.

```json
{"name":"Sam Spade"}
```

--------------------------------

### Configure Binder Selection

Source: https://context7.com/spring-cloud/spring-cloud-stream/llms.txt

Specify the binder to use globally or per binding when multiple binders (e.g., Kafka, RabbitMQ) are present on the classpath. This lets each binding use a different message transport.

```properties
# Global default
spring.cloud.stream.default-binder=rabbit

# Per-binding override: read from Kafka, write to Rabbit
spring.cloud.stream.bindings.process-in-0.binder=kafka
spring.cloud.stream.bindings.process-out-0.binder=rabbit
```

--------------------------------

### Enable Publisher Confirms and Returns

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/rabbit/rabbit_overview/error-channels.adoc

Configure the connection factory to enable publisher confirms and returns for producer error channel functionality.

```java
ccf.setPublisherConfirms(true);
ccf.setPublisherReturns(true);
```

--------------------------------

### Override MessageConverter with Custom Implementation

Source: https://github.com/spring-cloud/spring-cloud-stream/wiki/Spring-Cloud-Stream-2.0.0-Release-Notes

Configure a custom MessageConverter by annotating a @Bean with @StreamMessageConverter. This example shows how to override with an AlwaysStringKryoMessageConverter.

```java
@Bean
@StreamMessageConverter
public AlwaysStringKryoMessageConverter kryoOverrideMessageConverter() {
    return new AlwaysStringKryoMessageConverter(MimeType.valueOf("application/x-java-object"));
}
```

--------------------------------

### Branching KStream to Multiple Outputs

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/kafka/kafka-streams-binder/programming-model.adoc

Illustrates how to process an input KStream and branch it into multiple output KStreams, each potentially going to a different topic. This example uses predicates to filter words and count them within time windows.

```java
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>[]> process() {

    Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
    Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
    Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");

    return input -> {
        final Map<String, KStream<Object, WordCount>> stringKStreamMap = input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, value) -> value)
                .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
                .count(Materialized.as("WordCounts-branch"))
                .toStream()
                .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
```

--------------------------------

### Stop Kafka Cluster and Control Center UI

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/tools/kafka/docker-compose/README.adoc

Stops the Kafka cluster and the Confluent Control Center UI that were started together.

```shell
docker-compose -f ./kafka-cluster.yml -f ./control-center-ui.yml down
```

--------------------------------

### Kafka Producer with Custom Header

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/kafka/kafka_tips.adoc

Example of a Kafka producer that sets a custom header 'foo' with value 'bar' on the outgoing message.

```java
@Bean
public Supplier<Message<String>> supply() {
    return () -> MessageBuilder.withPayload("foo").setHeader("foo", "bar").build();
}
```

--------------------------------

### Run Standalone Schema Registry Server

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/schema-registry/spring-cloud-stream-schema-registry.adoc

Download and run the schema registry server as a standalone JAR. This is useful for testing or when a dedicated schema registry instance is needed.

```bash
wget https://repo1.maven.org/maven2/org/springframework/cloud/spring-cloud-stream-schema-registry-server/4.0.3/spring-cloud-stream-schema-registry-server-4.0.3.jar
java -jar ./spring-cloud-stream-schema-registry-server-4.0.3.jar
```

--------------------------------

### Running a Kafka Streams Processor Application

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/kafka/kafka-streams-binder/programming-model.adoc

Command to run a built uber-jar application, specifying input and output binding destinations for Kafka topics.

```bash
java -jar wordcount-processor.jar \
  --spring.cloud.stream.bindings.process-in-0.destination=words \
  --spring.cloud.stream.bindings.process-out-0.destination=counts
```

--------------------------------

### Define a Kafka Streams Processing Function

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/kafka/kafka-streams-binder/ancillaries-to-the-programming-model.adoc

Example of a Spring bean that defines a Kafka Streams processing function using `BiFunction`.

```java
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
    ...
}
```

--------------------------------

### Download EditorConfig and Spring Format

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/README.adoc

Download the `.editorconfig` file from the Spring Cloud Build repository into your project root and create an empty `.springformat` file next to it. These files help apply default formatting rules.

```bash
$ curl https://raw.githubusercontent.com/spring-cloud/spring-cloud-build/master/.editorconfig -o .editorconfig
$ touch .springformat
```

--------------------------------

### Manual Acknowledgement Consumer Example

Source: https://github.com/spring-cloud/spring-cloud-stream/blob/main/docs/modules/ROOT/pages/kafka/kafka-binder/manual-ack.adoc

Set `spring.cloud.stream.kafka.bindings.input.consumer.ackMode` to `MANUAL`. This consumer retrieves the `Acknowledgment` from the message headers and explicitly calls `acknowledge()`.

```java
@SpringBootApplication
public class ManuallyAcknowdledgingConsumer {

    public static void main(String[] args) {
        SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
    }

    @Bean
    public Consumer<Message<?>> process() {
        return message -> {
            Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
            if (acknowledgment != null) {
                System.out.println("Acknowledgment provided");
                acknowledgment.acknowledge();
            }
        };
    }
}
```
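
The property above refers to a binding called `input`. Under the functional programming model, the binding for a `Consumer` bean named `process` would normally be named `process-in-0` by default. The following is a minimal configuration sketch under that assumption. The destination `my-topic` and the group `my-group` are hypothetical names chosen for illustration and do not come from the source.

```properties
# Assumption: the default functional binding name for the `process` bean
spring.cloud.function.definition=process

# Hypothetical destination and consumer group
spring.cloud.stream.bindings.process-in-0.destination=my-topic
spring.cloud.stream.bindings.process-in-0.group=my-group

# Kafka binder consumer property that switches the binding to manual acknowledgment
spring.cloud.stream.kafka.bindings.process-in-0.consumer.ackMode=MANUAL
```

If the binding is declared under a different name, substitute that name in each property key. Without `ackMode=MANUAL`, the `Acknowledgment` header may be absent, which is why the consumer checks it for `null`.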