### Running a Mono in a New Thread

Source: https://projectreactor.io/docs/core/release/reference/coreFeatures/schedulers.html

This example demonstrates how to execute a Mono's operations and subscription in a new thread by manually creating and starting a `Thread`. Operators and callbacks run on the thread that calls `subscribe()`, not on the thread that assembled the Mono.

```java
public static void main(String[] args) throws InterruptedException {
    // The Mono is assembled on the main thread
    final Mono<String> mono = Mono.just("hello ");

    Thread t = new Thread(() -> mono
        .map(msg -> msg + "thread ")
        // It is subscribed to on the new thread...
        .subscribe(v ->
            // ...so both map and this callback run on that thread
            System.out.println(v + Thread.currentThread().getName())
        )
    );
    t.start();
    t.join();
}
```

--------------------------------

### Output of the handle example

Source: https://projectreactor.io/docs/core/release/reference/coreFeatures/programmatically-creating-sequence.html

The expected output when the `handle` example is executed, showing the successfully mapped alphabet letters.

```text
M
I
T
```

--------------------------------

### Cold Flux Example

Source: https://projectreactor.io/docs/core/release/reference/advancedFeatures/reactor-hotCold.html

Demonstrates a cold Flux, where each subscription triggers a new emission of all data. Use it when each subscriber should receive the full sequence independently.

```java
Flux<String> source = Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
                          .map(String::toUpperCase);

source.subscribe(d -> System.out.println("Subscriber 1: "+d));
source.subscribe(d -> System.out.println("Subscriber 2: "+d));
```

--------------------------------

### Using TupleUtils with Constructor Reference (Java)

Source: https://projectreactor.io/docs/core/release/reference/apdx-reactorExtra.html

Rewrites the tuple-mapping example using TupleUtils and a constructor reference, a more concise way to map tuple elements to a Customer object.

```java
.map(TupleUtils.function(Customer::new));
```

--------------------------------

### Read from and Write to Context

Source: https://projectreactor.io/docs/core/release/reference/advancedFeatures/context.html

This example demonstrates how to both read from and write to the Reactor Context within a Mono. It uses `contextWrite` to put a value and `deferContextual` to read it.

```java
String key = "message";
Mono<String> r = Mono.just("Hello")
    .flatMap(s -> Mono.deferContextual(ctx ->
         Mono.just(s + " " + ctx.get(key))))
    .contextWrite(ctx -> ctx.put(key, "World"));

StepVerifier.create(r)
            .expectNext("Hello World")
            .verifyComplete();
```

--------------------------------

### Example Output of Grouped Strings

Source: https://projectreactor.io/docs/core/release/reference/advancedFeatures/advanced-three-sorts-batching.html

Illustrates the console output when grouping strings by their first character with a specific data order and concurrency, potentially leading to deadlocks.

```text
a
alpha
air
aim
apple
```

--------------------------------

### Example of Operator Chain for Debugging

Source: https://projectreactor.io/docs/core/release/reference/debugging.html

This example demonstrates a more complex operator chain where understanding the error propagation sites is crucial for debugging. It highlights how errors can travel through multiple transformations before being observed.

```java
FakeRepository.findAllUserByName(Flux.just("pedro", "simon", "stephane"))
              .transform(FakeUtils1.applyFilters)
              .transform(FakeUtils2.enrichUser)
              .blockLast();
```

--------------------------------

### Using StepVerifier with Virtual Time

Source: https://projectreactor.io/docs/core/release/reference/testing.html

Demonstrates how to use StepVerifier with virtual time to test time-based operators like Mono.delay. It shows the setup for virtual time and the sequence of expectations.

```java
StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofDays(1)))
//... continue expectations here
```

```java
StepVerifier.withVirtualTime(() -> Mono.delay(Duration.ofDays(1)))
    .expectSubscription()
    .expectNoEvent(Duration.ofDays(1))
    .expectNext(0L)
    .verifyComplete();
```

--------------------------------

### Log error and re-throw in imperative code

Source: https://projectreactor.io/docs/core/release/reference/coreFeatures/error-handling.html

This example shows the imperative approach to catching an exception, logging a message, and re-throwing the original error.

```java
try {
    return callExternalService(k);
}
catch (RuntimeException error) {
    //make a record of the error
    log("uh oh, falling back, service failed for key " + k);
    throw error;
}
```

--------------------------------

### Hot Flux Example with Sinks

Source: https://projectreactor.io/docs/core/release/reference/advancedFeatures/reactor-hotCold.html

Simulates a hot Flux using `Sinks.Many` for programmatic emission. Late subscribers only receive elements emitted after their subscription. `tryEmitNext` with `orThrow` is suitable for tests.

```java
// FAIL_FAST is a static import of Sinks.EmitFailureHandler.FAIL_FAST
Sinks.Many<String> hotSource = Sinks.unsafe().many().multicast().directBestEffort();

Flux<String> hotFlux = hotSource.asFlux().map(String::toUpperCase);

hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: "+d));

hotSource.emitNext("blue", FAIL_FAST);
hotSource.tryEmitNext("green").orThrow();

hotFlux.subscribe(d -> System.out.println("Subscriber 2 to Hot Source: "+d));

hotSource.emitNext("orange", FAIL_FAST);
hotSource.emitNext("purple", FAIL_FAST);
hotSource.emitComplete(FAIL_FAST);
```

--------------------------------

### Dynamic Fallback with onErrorResume and Error Filtering

Source: https://projectreactor.io/docs/core/release/reference/coreFeatures/error-handling.html

This example demonstrates how to use onErrorResume with a function to dynamically choose a fallback strategy based on the type of error encountered.

```java
Flux.just("timeout1", "unknown", "key2")
    .flatMap(k -> callExternalService(k)
        .onErrorResume(error -> {
            if (error instanceof TimeoutException)
                return getFromCache(k);
            else if (error instanceof UnknownKeyException)
                return registerNewEntry(k, "DEFAULT");
            else
                return Flux.error(error);
        })
    );
```

--------------------------------

### Imperative Fallback Method Example

Source: https://projectreactor.io/docs/core/release/reference/coreFeatures/error-handling.html

This is the imperative equivalent of catching an error and retrieving data from a cache.

```java
String v1;
try {
    v1 = callExternalService("key1");
}
catch (Throwable error) {
    v1 = getFromCache("key1");
}

String v2;
try {
    v2 = callExternalService("key2");
}
catch (Throwable error) {
    v2 = getFromCache("key2");
}
```

--------------------------------

### Example of Callback Hell in Java

Source: https://projectreactor.io/docs/core/release/reference/reactiveProgramming.html

This snippet uses nested callbacks to show how hard asynchronous operations are to compose and how quickly the code becomes unmaintainable. It fetches a user's favorites and, when the list is empty, falls back to fetching suggestions.

```java
userService.getFavorites(userId, new Callback<List<String>>() {
  public void onSuccess(List<String> list) {
    if (list.isEmpty()) {
      suggestionService.getSuggestions(new Callback<List<Favorite>>() {
        public void onSuccess(List<Favorite> list) {
          UiUtils.submitOnUiThread(() -> {
            list.stream()
                .limit(5)
                .forEach(uiList::show);
          });
        }

        public void onError(Throwable error) {
          UiUtils.errorPopup(error);
        }
      });
    } else {
      list.stream()
          .limit(5)
          .forEach(favId -> favoriteService.getDetails(favId,
            new Callback<Favorite>() {
              public void onSuccess(Favorite details) {
                UiUtils.submitOnUiThread(() -> uiList.show(details));
              }

              public void onError(Throwable error) {
                UiUtils.errorPopup(error);
              }
            }
          ));
    }
  }

  public void onError(Throwable error) {
    UiUtils.errorPopup(error);
  }
});
```

--------------------------------

### Automatic Connection with autoConnect

Source: https://projectreactor.io/docs/core/release/reference/advancedFeatures/advanced-broadcast-multiple-subscribers-connectableflux.html

Shows how to use `autoConnect(n)` to automatically trigger the subscription to the source Flux once `n` subscribers have been attached. In this example, the source is subscribed to when the second subscriber is added.

```java
Flux<Integer> source = Flux.range(1, 3)
                           .doOnSubscribe(s -> System.out.println("subscribed to source"));

Flux<Integer> autoCo = source.publish().autoConnect(2);

autoCo.subscribe(System.out::println, e -> {}, () -> {});
System.out.println("subscribed first");
Thread.sleep(500);
System.out.println("subscribing second");
autoCo.subscribe(System.out::println, e -> {}, () -> {});
```

--------------------------------

### Using subscribeOn to Schedule Subscription

Source: https://projectreactor.io/docs/core/release/reference/coreFeatures/schedulers.html

This example demonstrates how to use subscribeOn to switch the execution context for the entire operator chain.
A scheduler backed by four threads is created, the Flux is assembled with `subscribeOn`, and the Flux is then subscribed to on a separate thread. `subscribeOn` immediately shifts that subscription onto one of the scheduler's threads.

```java
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);

final Flux<String> flux = Flux
    .range(1, 2)
    .map(i -> 10 + i)
    .subscribeOn(s)
    .map(i -> "value " + i);

// start() is needed for the subscription to actually happen
new Thread(() -> flux.subscribe(System.out::println)).start();
```

--------------------------------

### Using TupleUtils with Lambda (Java)

Source: https://projectreactor.io/docs/core/release/reference/apdx-reactorExtra.html

This example shows the verbose baseline that TupleUtils is meant to simplify: each tuple element is extracted by hand inside a `map` lambda before being passed to the `Customer` constructor.

```java
.map(tuple -> {
  String firstName = tuple.getT1();
  String lastName = tuple.getT2();
  String address = tuple.getT3();

  return new Customer(firstName, lastName, address);
});
```

--------------------------------

### Exponential Backoff with retryWhen and Retry.backoff

Source: https://projectreactor.io/docs/core/release/reference/faq.html

Implement exponential backoff for retries using `Retry.backoff`. This example configures retries with a maximum of 3 attempts and no jitter, prints each error as it occurs, and prints a message after each retry. It customizes the behavior on retry exhaustion to directly emit the cause.

```java
AtomicInteger errorCount = new AtomicInteger();
Flux<String> flux =
Flux.<String>error(new IllegalStateException("boom"))
    .doOnError(e -> {
        errorCount.incrementAndGet();
        System.out.println(e + " at " + LocalTime.now());
    })
    .retryWhen(Retry
        .backoff(3, Duration.ofMillis(100)).jitter(0d)
        .doAfterRetry(rs -> System.out.println("retried at " + LocalTime.now() + ", attempt " + rs.totalRetries()))
        .onRetryExhaustedThrow((spec, rs) -> rs.failure())
    );
```

--------------------------------

### Mono zipWith Never Called Example

Source: https://projectreactor.io/docs/core/release/reference/faq.html

Demonstrates a scenario where the `zipWith` combinator is never called because the preceding method returns `Mono<Void>`, which completes without emitting a value.

```java
myMethod.process("a") // this method returns Mono<Void>
        .zipWith(myMethod.process("b"), combinator) //this is never called
        .subscribe();
```

--------------------------------

### Convert Sink to Flux

Source: https://projectreactor.io/docs/core/release/reference/coreFeatures/sinks.html

Presents a Sinks.Many sink as a Flux view for downstream consumption. This example takes elements while they are less than 10, logs them, and blocks until the last element is received.

```java
Flux<Integer> fluxView = replaySink.asFlux();
fluxView
    .takeWhile(i -> i < 10)
    .log()
    .blockLast();
```

--------------------------------

### Custom Retry Logic with retryWhen

Source: https://projectreactor.io/docs/core/release/reference/coreFeatures/error-handling.html

Implement custom retry strategies using retryWhen. This example emulates retry(3) by taking the first 3 signals from the retry companion. Because `retryWhen` takes a `Retry`, the companion function is adapted with `Retry.from`.

```java
Flux<String> flux = Flux
    .<String>error(new IllegalArgumentException())
    .doOnError(System.out::println)
    .retryWhen(Retry.from(companion ->
        companion.take(3)));
```

--------------------------------

### Activate Global Debug Mode

Source: https://projectreactor.io/docs/core/release/reference/debugging.html

Call `Hooks.onOperatorDebug()` at application start, before instantiating Flux or Mono, to enable stack trace capture for all operators.

```java
Hooks.onOperatorDebug();
```

--------------------------------

### Create Mono and Flux with Factory Methods

Source: https://projectreactor.io/docs/core/release/reference/coreFeatures/simple-ways-to-create-a-flux-or-mono-and-subscribe-to-it.html

Use factory methods such as `Mono.empty()`, `Mono.just()`, and `Flux.range()` to create Mono and Flux instances. `Flux.range(start, count)` emits `count` consecutive integers beginning at `start`.

```java
Mono<String> noData = Mono.empty();

Mono<String> data = Mono.just("foo");

Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);
```

--------------------------------

### Custom Retry Logic with Retry.from

Source: https://projectreactor.io/docs/core/release/reference/coreFeatures/error-handling.html

Implement custom retry logic by adapting a `Function` lambda. This example retries up to 3 times, counting each error, and then throws the original exception.

```java
AtomicInteger errorCount = new AtomicInteger();
Flux<String> flux =
    Flux.<String>error(new IllegalArgumentException())
        .doOnError(e -> errorCount.incrementAndGet())
        .retryWhen(Retry.from(companion ->
            companion.map(rs -> {
                if (rs.totalRetries() < 3) return rs.totalRetries();
                else throw Exceptions.propagate(rs.failure());
            })
        ));
```

--------------------------------

### Resource Cleanup with `create` and `push`

Source: https://projectreactor.io/docs/core/release/reference/producing.html

Demonstrates how to handle resource cleanup when using `Flux.create` or `Flux.push`. `onCancel` runs only on cancellation, while `onDispose` runs when the Flux completes, errors out, or is cancelled.

```java
Flux.create(sink -> {
    // Simulate resource acquisition
    Resource resource = acquireResource();
    sink.next("Using resource");
    sink.onCancel(() -> releaseResource(resource)); // Cleanup on cancellation only
    sink.complete();
});

Flux.push(sink -> {
    // Simulate resource acquisition
    Resource resource = acquireResource();
    sink.next("Using resource");
    sink.onDispose(() -> releaseResource(resource)); // Cleanup on complete, error, or cancel
    sink.complete();
});
```

--------------------------------

### Imperative Dynamic Fallback Value

Source: https://projectreactor.io/docs/core/release/reference/coreFeatures/error-handling.html

An imperative example of catching an error and returning a specific error-holding wrapper.

```java
try {
    Value v = erroringMethod();
    return MyWrapper.fromValue(v);
}
catch (Throwable error) {
    return MyWrapper.fromError(error);
}
```

--------------------------------

### Create a Replay Sink

Source: https://projectreactor.io/docs/core/release/reference/coreFeatures/sinks.html

Initializes a Sinks.Many sink that replays all pushed data to subscribers.

```java
Sinks.Many<Integer> replaySink = Sinks.many().replay().all();
```

--------------------------------

### Create and Subscribe to a Flux (No Arguments)

Source: https://projectreactor.io/docs/core/release/reference/coreFeatures/simple-ways-to-create-a-flux-or-mono-and-subscribe-to-it.html

Demonstrates the simplest way to subscribe to a Flux without providing any consumers. This subscribes to the Flux but does not process or log emitted items.

```java
Flux<Integer> ints = Flux.range(1, 3);
ints.subscribe();
```

--------------------------------

### Test Command Empty Path with PublisherProbe

Source: https://projectreactor.io/docs/core/release/reference/testing.html

Demonstrates using PublisherProbe to verify that the fallback path (doWhenEmpty) is executed when the command source is empty. It asserts subscription, request, and non-cancellation of the probe.

```java
@Test
public void testCommandEmptyPathIsUsed() {
    PublisherProbe<Void> probe = PublisherProbe.empty();

    StepVerifier.create(processOrFallback(Mono.empty(), probe.mono()))
                .verifyComplete();

    probe.assertWasSubscribed();
    probe.assertWasRequested();
    probe.assertWasNotCancelled();
}
```

--------------------------------

### Manual Connection with ConnectableFlux

Source: https://projectreactor.io/docs/core/release/reference/advancedFeatures/advanced-broadcast-multiple-subscribers-connectableflux.html

Demonstrates how to manually connect a ConnectableFlux to its source after multiple subscribers have been set up. The source subscription is triggered only when `connect()` is called.

```java
Flux<Integer> source = Flux.range(1, 3)
                           .doOnSubscribe(s -> System.out.println("subscribed to source"));

ConnectableFlux<Integer> co = source.publish();

co.subscribe(System.out::println, e -> {}, () -> {});
co.subscribe(System.out::println, e -> {}, () -> {});

System.out.println("done subscribing");
Thread.sleep(500);
System.out.println("will now connect");

co.connect();
```

--------------------------------

### Test Zip Optional All Subscribed

Source: https://projectreactor.io/docs/core/release/reference/faq.html

Demonstrates using the singleOptional operator with zip to ensure all publishers are subscribed to, even when they are empty-completed. This preserves the desired subscription semantics.

```java
@Test
public void testZipOptionalAllSubscribed() {
    AtomicInteger cnt = new AtomicInteger();

    Mono<String> mono1 = Mono.create(sink -> {
        cnt.incrementAndGet();
        sink.success();
    });
    Mono<String> mono2 = Mono.create(sink -> {
        cnt.incrementAndGet();
        sink.success();
    });
    Mono<String> mono3 = Mono.create(sink -> {
        cnt.incrementAndGet();
        sink.success();
    });

    Mono<Optional<String>> zippedMono =
        Mono.zip(
            mono1.singleOptional(),
            Mono.zip(mono2.singleOptional(), mono3.singleOptional(), (v1, v2) -> v1),
            (v1, v2) -> v1);

    zippedMono.subscribe();

    assertEquals(3, cnt.get());
}
```

--------------------------------

### Subscribe to a Flux with a BaseSubscriber

Source: https://projectreactor.io/docs/core/release/reference/coreFeatures/simple-ways-to-create-a-flux-or-mono-and-subscribe-to-it.html

Demonstrates how to subscribe to a Flux using a custom BaseSubscriber implementation. BaseSubscriber instances are single-use: create a new one for each subscription.

```java
SampleSubscriber<Integer> ss = new SampleSubscriber<Integer>();
Flux<Integer> ints = Flux.range(1, 4);
ints.subscribe(ss);
```

--------------------------------

### Customizing Initial Request with BaseSubscriber

Source: https://projectreactor.io/docs/core/release/reference/coreFeatures/simple-ways-to-create-a-flux-or-mono-and-subscribe-to-it.html

Use BaseSubscriber to customize the initial request.
Override hookOnSubscribe to manually request a specific number of elements. Take care to request enough elements for the sequence to advance; otherwise the Flux can get stuck.

```java
Flux.range(1, 10)
    .doOnRequest(r -> System.out.println("request of " + r))
    .subscribe(new BaseSubscriber<Integer>() {

        @Override
        public void hookOnSubscribe(Subscription subscription) {
            request(1);
        }

        @Override
        public void hookOnNext(Integer integer) {
            System.out.println("Cancelling after having received " + integer);
            cancel();
        }
    });
```

--------------------------------

### Add Description and Force Stack Trace with checkpoint()

Source: https://projectreactor.io/docs/core/release/reference/debugging.html

Use checkpoint("description", true) to augment the traceback with a custom description while still including the stack trace. This is useful for identifying specific assembly sites. The description appears in the `described as [...]` part of the first line.

```text
Assembly trace from producer [reactor.core.publisher.ParallelSource], described as [descriptionCorrelation1234] :
	reactor.core.publisher.ParallelFlux.checkpoint(ParallelFlux.java:215)
	reactor.core.publisher.FluxOnAssemblyTest.parallelFluxCheckpointDescriptionAndForceStack(FluxOnAssemblyTest.java:225)
Error has been observed at the following site(s):
	|_	ParallelFlux.checkpoint ⇢ reactor.core.publisher.FluxOnAssemblyTest.parallelFluxCheckpointDescriptionAndForceStack(FluxOnAssemblyTest.java:225)
```

--------------------------------

### Overlapping Windows with `window(maxSize, skip)`

Source: https://projectreactor.io/docs/core/release/reference/advancedFeatures/advanced-three-sorts-batching.html

Demonstrates how to create overlapping windows using `window(maxSize, skip)`. Elements are included in multiple windows when `maxSize` is greater than `skip`. Empty windows are represented as -1.

```java
StepVerifier.create(
    Flux.range(1, 10)
        .window(5, 3) //overlapping windows
        .concatMap(g -> g.defaultIfEmpty(-1)) //show empty windows as -1
    )
        .expectNext(1, 2, 3, 4, 5)
        .expectNext(4, 5, 6, 7, 8)
        .expectNext(7, 8, 9, 10)
        .expectNext(10)
        .verifyComplete();
```

--------------------------------

### Handling Checked Exceptions in map Operator

Source: https://projectreactor.io/docs/core/release/reference/coreFeatures/error-handling.html

Illustrates how to handle checked exceptions (like IOException) within a map operator by wrapping them or recovering. This example shows a conversion method that can throw an IOException.

```java
public String convert(int i) throws IOException {
    if (i > 3) {
        throw new IOException("boom " + i);
    }
    return "OK " + i;
}
```

--------------------------------

### Create StepVerifier with Initial Context and Assert Context Propagation

Source: https://projectreactor.io/docs/core/release/reference/testing.html

Use StepVerifierOptions to provide an initial Context when creating a StepVerifier. This snippet demonstrates asserting the presence and content of the propagated Context before verifying the sequence data.

```java
StepVerifier.create(Mono.just(1).map(i -> i + 10),
                    StepVerifierOptions.create().withInitialContext(Context.of("thing1", "thing2")))
            .expectAccessibleContext()
            .contains("thing1", "thing2")
            .then()
            .expectNext(11)
            .verifyComplete();
```

--------------------------------

### Specify Nullability for Arrays and Varargs

Source: https://projectreactor.io/docs/core/release/reference/advancedFeatures/null-safety.html

Differentiate nullability of array elements from the array itself using specific syntax. For example, '@Nullable Object[]' allows null elements, while 'Object @Nullable []' allows a null array.

```java
// @Nullable Object[] array: individual elements can be null, but the array itself cannot.
// Object @Nullable [] array: individual elements cannot be null, but the array itself can.
// @Nullable Object @Nullable [] array: both individual elements and the array can be null.
```

--------------------------------

### Convert Java Mono/Flux to Kotlin Equivalents

Source: https://projectreactor.io/docs/core/release/reference/kotlin.html

Kotlin extensions provide a more concise way to create Mono and Flux instances compared to Java. These extensions require importing the relevant `reactor.kotlin` packages.

```kotlin
"foo".toMono()
```

```kotlin
list.toFlux()
```

```kotlin
RuntimeException().toMono()
```

```kotlin
RuntimeException().toFlux()
```

--------------------------------

### Overlapping Buffers with Flux.buffer()

Source: https://projectreactor.io/docs/core/release/reference/advancedFeatures/advanced-three-sorts-batching.html

Demonstrates creating overlapping buffers with `buffer(maxSize, skip)`: each buffer holds up to `maxSize` elements, and a new buffer starts every `skip` elements. When `maxSize` is greater than `skip`, elements appear in multiple buffers.

```java
StepVerifier.create(
    Flux.range(1, 10)
        .buffer(5, 3) //overlapping buffers
    )
        .expectNext(Arrays.asList(1, 2, 3, 4, 5))
        .expectNext(Arrays.asList(4, 5, 6, 7, 8))
        .expectNext(Arrays.asList(7, 8, 9, 10))
        .expectNext(Collections.singletonList(10))
        .verifyComplete();
```

--------------------------------

### Wrap Scheduler with Micrometer Metrics

Source: https://projectreactor.io/docs/core/release/reference/metrics.html

Wrap an existing Scheduler with Micrometer metrics to measure task execution times. Provide the original scheduler, a MeterRegistry, a meter name prefix, and optional tags.

```java
Scheduler originalScheduler = Schedulers.newParallel("test", 4);

Scheduler schedulerWithMetrics = Micrometer.timedScheduler(
    originalScheduler,
    applicationDefinedMeterRegistry,
    "testingMetrics",
    Tags.of(Tag.of("additionalTag", "yes"))
);
```

--------------------------------

### Enhanced Stack Trace Example

Source: https://projectreactor.io/docs/core/release/reference/debugging.html

This is a representation of an enhanced stack trace when operator stack trace debugging is enabled. It includes truncated original traces, suppressed exceptions with assembly traces, and error observation sites.

```text
java.lang.IndexOutOfBoundsException: Source emitted more than one item
	at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:127)
	Suppressed: The stacktrace has been enhanced by Reactor, refer to additional information below:
Assembly trace from producer [reactor.core.publisher.MonoSingle] :
	reactor.core.publisher.Flux.single(Flux.java:7915)
	reactor.guide.GuideTests.scatterAndGather(GuideTests.java:1017)
Error has been observed at the following site(s):
	*_______Flux.single ⇢ at reactor.guide.GuideTests.scatterAndGather(GuideTests.java:1017)
	|_ Mono.subscribeOn ⇢ at reactor.guide.GuideTests.debuggingActivated(GuideTests.java:1071)
Original Stack Trace:
		at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:127)
...
...
		at reactor.core.publisher.Mono.subscribeWith(Mono.java:4363)
		at reactor.core.publisher.Mono.subscribe(Mono.java:4223)
		at reactor.core.publisher.Mono.subscribe(Mono.java:4159)
		at reactor.core.publisher.Mono.subscribe(Mono.java:4131)
		at reactor.guide.GuideTests.debuggingActivated(GuideTests.java:1067)
```

--------------------------------

### Initialize ReactorDebugAgent

Source: https://projectreactor.io/docs/core/release/reference/debugging.html

Initialize the ReactorDebugAgent.
This should be done early in your application's lifecycle, ideally before other operations.

```java
ReactorDebugAgent.init();
```

--------------------------------

### Test Zip Empty Completion All Subscribed

Source: https://projectreactor.io/docs/core/release/reference/faq.html

Demonstrates a scenario where zip subscribes to all provided monos when they are empty-completed. This behavior is not guaranteed in all cases.

```java
@Test
public void testZipEmptyCompletionAllSubscribed() {
    AtomicInteger cnt = new AtomicInteger();

    Mono<String> mono1 = Mono.create(sink -> {
        cnt.incrementAndGet();
        sink.success();
    });
    Mono<String> mono2 = Mono.create(sink -> {
        cnt.incrementAndGet();
        sink.success();
    });

    Mono<String> zippedMono = Mono.zip(mono1, mono2, (v1, v2) -> v1);

    zippedMono.subscribe();

    assertEquals(2, cnt.get());
}
```

--------------------------------

### Instrument Publisher with Micrometer Metrics

Source: https://projectreactor.io/docs/core/release/reference/metrics.html

Enable metrics collection for a reactive pipeline using the tap operator and Micrometer.metrics API. This allows monitoring signals within the pipeline.

```java
listenToEvents()
    .name("events")
    .tap(Micrometer.metrics(
        applicationDefinedMeterRegistry
    ))
    .doOnNext(event -> log.info("Received {}", event))
    .delayUntil(this::processEvent)
    .retry()
    .subscribe();
```

--------------------------------

### Test Empty Path Execution

Source: https://projectreactor.io/docs/core/release/reference/testing.html

Verifies that the `processOrFallback` method correctly uses the fallback publisher when the source is empty. This test ensures the `switchIfEmpty` logic is functioning as expected.

```java
@Test
public void testEmptyPathIsUsed() {
    StepVerifier.create(processOrFallback(Mono.empty(), Mono.just("EMPTY_PHRASE")))
                .expectNext("EMPTY_PHRASE")
                .verifyComplete();
}
```

--------------------------------

### Subscribing to a Flux with Debugging

Source: https://projectreactor.io/docs/core/release/reference/debugging.html

This snippet subscribes to a Flux and prints both its values and the stack trace of any error, which is where a debugging traceback would show up. Ensure the Flux is properly instantiated before subscribing.

```java
toDebug
    .subscribeOn(Schedulers.immediate())
    .subscribe(System.out::println, Throwable::printStackTrace);
```

--------------------------------

### Test Zip Empty Completion One Subscribed

Source: https://projectreactor.io/docs/core/release/reference/faq.html

Illustrates a case where zip does not subscribe to all monos due to early completion of one of the zipped publishers. This highlights the non-guaranteed subscription behavior.

```java
@Test
public void testZipEmptyCompletionOneSubscribed() {
    AtomicInteger cnt = new AtomicInteger();

    Mono<String> mono1 = Mono.create(sink -> {
        cnt.incrementAndGet();
        sink.success();
    });
    Mono<String> mono2 = Mono.create(sink -> {
        cnt.incrementAndGet();
        sink.success();
    });
    Mono<String> mono3 = Mono.create(sink -> {
        cnt.incrementAndGet();
        sink.success();
    });

    Mono<String> zippedMono =
        Mono.zip(mono1, Mono.zip(mono2, mono3, (v1, v2) -> v1), (v1, v2) -> v1);

    zippedMono.subscribe();

    assertEquals(1, cnt.get());
}
```

--------------------------------

### Log onNext Signals with Contextual MDC

Source: https://projectreactor.io/docs/core/release/reference/faq.html

This controller method uses a `logOnNext` helper (defined elsewhere) with `doOnEach` to log onNext signals while including contextual information from Reactor's Context in MDC. Ensure the key is present in the Context to avoid errors.

```java
@GetMapping("/byPrice")
public Flux<Restaurant> byPrice(@RequestParam Double maxPrice, @RequestHeader(required = false, name = "X-UserId") String userId) {
    // Take the contextual information from the request header
    String apiId = userId == null ? "" : userId;

    return restaurantService.byPrice(maxPrice)
        // Log each onNext signal through the helper
        .doOnEach(logOnNext(r -> LOG.debug("found restaurant {} for ${}",
            r.getName(), r.getPricePerPerson())))
        // Make the value available to the helper through the Context
        .contextWrite(Context.of("CONTEXT_KEY", apiId));
}
```

--------------------------------

### Context Write Before Read

Source: https://projectreactor.io/docs/core/release/reference/advancedFeatures/context.html

Demonstrates that `contextWrite` is executed before `deferContextual` during subscription, making the written value available for reading.

```java
String key = "message";
Mono<String> r = Mono.just("Hello")
    // Runs second: reads the value written downstream
    .flatMap(s -> Mono.deferContextual(ctx ->
         Mono.just(s + " " + ctx.get(key))))
    // Runs first, at subscription time: writes the value
    .contextWrite(ctx -> ctx.put(key, "World"));

StepVerifier.create(r)
            .expectNext("Hello World") // the written value was visible to the read
            .verifyComplete();
```

--------------------------------

### Instrument Publisher with Custom Tags

Source: https://projectreactor.io/docs/core/release/reference/metrics.html

Add custom tags to metrics collected from a reactive pipeline. Use the .tag() operator before tapping Micrometer.metrics to assign specific labels to your metrics.

```java
listenToEvents()
    .name("events")
    .tag("source", "kafka")
    .tap(Micrometer.metrics(applicationDefinedRegistry))
    .doOnNext(event -> log.info("Received {}", event))
    .delayUntil(this::processEvent)
    .retry()
    .subscribe();
```

--------------------------------

### Using defaultIfEmpty Before zipWhen

Source: https://projectreactor.io/docs/core/release/reference/faq.html

Shows how to use defaultIfEmpty to provide a value for an empty Mono, ensuring subsequent operators like zipWhen are called.

```java
myMethod.emptySequenceForKey("a") // this method returns empty Mono<String>
        .defaultIfEmpty("") // this converts empty sequence to just the empty String
        .zipWhen(aString -> myMethod.process("b")) //this is called with the empty String
        .subscribe();
```

--------------------------------

### Predicate-Based Windowing with `windowWhile`

Source: https://projectreactor.io/docs/core/release/reference/advancedFeatures/advanced-three-sorts-batching.html

Illustrates predicate-based windowing using `windowWhile`. A window stays open while the predicate holds for consecutive elements. An element that fails the predicate is discarded, closes the current window, and opens a new one, which can produce empty windows (shown here as -1).

```java
StepVerifier.create(
    Flux.just(1, 3, 5, 2, 4, 6, 11, 12, 13)
        .windowWhile(i -> i % 2 == 0)
        .concatMap(g -> g.defaultIfEmpty(-1))
    )
        .expectNext(-1, -1, -1) //respectively triggered by odd 1 3 5
        .expectNext(2, 4, 6) // triggered by 11
        .expectNext(12) // triggered by 13
        // however, no empty completion window is emitted (would contain extra matching elements)
        .verifyComplete();
```

--------------------------------

### Equivalent Try-Catch Block in Java

Source: https://projectreactor.io/docs/core/release/reference/coreFeatures/error-handling.html

Illustrates the imperative programming equivalent of the Reactor Flux error handling, showing how an exception in the loop is caught.

```java
try {
    for (int i = 1; i < 11; i++) {
        String v1 = doSomethingDangerous(i);
        String v2 = doSecondTransform(v1);
        System.out.println("RECEIVED " + v2);
    }
} catch (Throwable t) {
    System.err.println("CAUGHT " + t);
}
```

--------------------------------

### Process or Fallback Mono with SwitchIfEmpty

Source: https://projectreactor.io/docs/core/release/reference/testing.html

This method handles a command source, executing a task if a command is present, or performing a fallback action if the source is empty. It returns `Mono<Void>` to signify completion without data.

```java
public Mono<Void> processOrFallback(Mono<String> commandSource, Mono<Void> doWhenEmpty) {
    return commandSource
        .flatMap(command -> executeCommand(command).then())
        .switchIfEmpty(doWhenEmpty);
}
```

--------------------------------

### Thread Affinity with publishOn()

Source: https://projectreactor.io/docs/core/release/reference/faq.html

Demonstrates how multiple `publishOn` operators switch execution contexts within a Flux. Placement matters: each `publishOn` affects the operators downstream of it until the next `publishOn`, so here `map` runs on `scheduler1` and `doOnNext` runs on `scheduler2`.

```java
Sinks.Many<Integer> dataSinks = Sinks.many().unicast().onBackpressureBuffer();

Flux<Integer> source = dataSinks.asFlux();
source.publishOn(scheduler1)
    .map(i -> transform(i))
    .publishOn(scheduler2)
    .doOnNext(i -> processNext(i))
    .subscribe();
```

--------------------------------

### Combine Flux Results Asynchronously with Reactor

Source: https://projectreactor.io/docs/core/release/reference/reactiveProgramming.html

Shows how to achieve similar asynchronous combination logic using Project Reactor's `Flux` and `Mono`. Preferred for new reactive applications.

```java
Flux<String> ids = ifhrIds();

Flux<String> combinations =
    ids.flatMap(id -> {
        Mono<String> nameTask = ifhrName(id);
        Mono<Integer> statTask = ifhrStat(id);

        return nameTask.zipWith(statTask,
            (name, stat) -> "Name " + name + " has stats " + stat);
    });

Mono<List<String>> result = combinations.collectList();

List<String> results = result.block();
assertThat(results).containsExactly(
    "Name NameJoe has stats 103",
    "Name NameBart has stats 104",
    "Name NameHenry has stats 105",
    "Name NameNicole has stats 106",
    "Name NameABSLAJNFOAJNFOANFANSF has stats 121"
);
```

--------------------------------

### Initialize and Process Existing Classes

Source: https://projectreactor.io/docs/core/release/reference/debugging.html

Initialize the `ReactorDebugAgent`, then re-process the classes that are already loaded. This is useful in environments such as JUnit 5 tests, where the agent cannot be initialized eagerly before those classes load.
```java
ReactorDebugAgent.init();
ReactorDebugAgent.processExistingClasses();
```

--------------------------------

### Observe Reactive Chain with Custom Observation

Source: https://projectreactor.io/docs/core/release/reference/metrics.html

Customize Micrometer's Observation by passing your own supplier function to `Micrometer.observation`. This lets you provide a custom `ObservationConvention`, `Context` supplier, and `ObservationRegistry`. Use the `tap` operator to integrate the custom observation into the reactive chain.

```java
listenToEvents()
    .name("events") // (1)
    .doOnNext(event -> log.info("Received {}", event))
    .delayUntil(this::processEvent)
    .tap(Micrometer.observation(
        applicationDefinedRegistry,
        registry -> Observation.createNotStarted(
            myConvention,
            myContextSupplier,
            registry)))
    .retry()
    .subscribe();
```

--------------------------------

### Minimalistic BaseSubscriber Implementation

Source: https://projectreactor.io/docs/core/release/reference/coreFeatures/simple-ways-to-create-a-flux-or-mono-and-subscribe-to-it.html

A minimal `BaseSubscriber` implementation. It overrides the `hookOnSubscribe` and `hookOnNext` hooks and manages demand manually, requesting one element at subscription and one more after each element received.

```java
import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;

public class SampleSubscriber<T> extends BaseSubscriber<T> {

    @Override
    public void hookOnSubscribe(Subscription subscription) {
        System.out.println("Subscribed");
        request(1);
    }

    @Override
    public void hookOnNext(T value) {
        System.out.println(value);
        request(1);
    }
}
```
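The request-one-at-a-time pattern used by the subscriber above can be traced without Reactor on the classpath. The following is a minimal sketch that swaps in the JDK's `java.util.concurrent.Flow` interfaces (which mirror Reactive Streams) in place of `BaseSubscriber`. The names `RequestOneSketch`, `OneAtATimeSubscriber`, and `ListPublisher` are hypothetical and exist only for illustration; they are not part of Reactor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class RequestOneSketch {

    /** Hypothetical subscriber: requests one element up front, then one more per element. */
    static final class OneAtATimeSubscriber<T> implements Flow.Subscriber<T> {
        final List<String> log = new ArrayList<>();
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            log.add("Subscribed");
            subscription.request(1); // initial demand of one element
        }

        @Override
        public void onNext(T item) {
            log.add(String.valueOf(item));
            subscription.request(1); // ask for the next element only after handling this one
        }

        @Override
        public void onError(Throwable throwable) {
            log.add("Error: " + throwable);
        }

        @Override
        public void onComplete() {
            log.add("Completed");
        }
    }

    /** Hypothetical synchronous publisher that emits only as many items as were requested. */
    static final class ListPublisher<T> implements Flow.Publisher<T> {
        private final List<T> items;

        ListPublisher(List<T> items) {
            this.items = items;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super T> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int index;
                private long demand;
                private boolean draining;
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) {
                        return;
                    }
                    if (n <= 0) {
                        done = true;
                        subscriber.onError(new IllegalArgumentException("demand must be positive"));
                        return;
                    }
                    demand += n;
                    if (draining) {
                        return; // re-entrant call from onNext: the outer loop sees the new demand
                    }
                    draining = true;
                    while (demand > 0 && index < items.size() && !done) {
                        demand--;
                        subscriber.onNext(items.get(index++));
                    }
                    draining = false;
                    if (index == items.size() && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Runs the sketch and returns the signals the subscriber observed, in order. */
    static List<String> run() {
        OneAtATimeSubscriber<Integer> subscriber = new OneAtATimeSubscriber<>();
        new ListPublisher<>(List.of(1, 2, 3, 4)).subscribe(subscriber);
        return subscriber.log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running `main` prints `Subscribed`, then `1` through `4`, then `Completed`, one per line. Each element is delivered only after the subscriber has requested it, which is the same demand-driven flow the `request(1)` calls in the hooks above set up.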