### Gradle Build Output Example

Source: https://github.com/reactivex/rxjava/wiki/Getting-Started

Example output of a successful Gradle build process for RxJava.

```bash
$ ./gradlew build
:rxjava:compileJava
:rxjava:processResources UP-TO-DATE
:rxjava:classes
:rxjava:jar
:rxjava:sourcesJar
:rxjava:signArchives SKIPPED
:rxjava:assemble
:rxjava:licenseMain UP-TO-DATE
:rxjava:licenseTest UP-TO-DATE
:rxjava:compileTestJava
:rxjava:processTestResources UP-TO-DATE
:rxjava:testClasses
:rxjava:test
:rxjava:check
:rxjava:build

BUILD SUCCESSFUL

Total time: 30.758 secs
```

--------------------------------

### Hello World in Groovy

Source: https://github.com/reactivex/rxjava/wiki/How-To-Use-RxJava

A 'Hello World' example in Groovy using Observable.from and a lambda for subscription.

```groovy
def hello(String[] names) {
    Observable.from(names).subscribe { println "Hello ${it}!" }
}
```

--------------------------------

### Hello World in Scala

Source: https://github.com/reactivex/rxjava/wiki/How-To-Use-RxJava

A 'Hello World' example in Scala using Observable.from and a lambda for subscription.

```scala
import rx.lang.scala.Observable

def hello(names: String*) {
  Observable.from(names) subscribe { n =>
    println(s"Hello $n!")
  }
}
```

--------------------------------

### Observable fromStream Example

Source: https://github.com/reactivex/rxjava/wiki/What's-different-in-3.0

Demonstrates creating an Observable from a Java Stream.

```java
Observable stream2 = Observable.fromStream(list.stream());
```

--------------------------------

### flatMapSingle Example

Source: https://github.com/reactivex/rxjava/wiki/Transforming-Observables

Applies a function to each item emitted by a source, where the function returns a SingleSource. Merges the items from these SingleSources. This example uses Single.timer to delay emission.
```java
Observable.just(4, 2, 1, 3)
    .flatMapSingle(x -> Single.timer(x, TimeUnit.SECONDS).map(i -> x))
    .blockingSubscribe(System.out::print);

// prints 1234
```

--------------------------------

### Hello World Execution in Java

Source: https://github.com/reactivex/rxjava/wiki/How-To-Use-RxJava

Demonstrates the execution of the Java 'Hello World' example with sample arguments and expected output.

```java
hello("Ben", "George");

// prints:
// Hello Ben!
// Hello George!
```

--------------------------------

### Hello World Execution in Groovy

Source: https://github.com/reactivex/rxjava/wiki/How-To-Use-RxJava

Demonstrates the execution of the Groovy 'Hello World' example with sample arguments and expected output.

```groovy
hello("Ben", "George")

// prints:
// Hello Ben!
// Hello George!
```

--------------------------------

### Hello World Execution in Clojure

Source: https://github.com/reactivex/rxjava/wiki/How-To-Use-RxJava

Demonstrates the execution of the Clojure 'Hello World' example with sample arguments and expected output.

```clojure
(hello ["Ben" "George"])

; prints:
; Hello Ben!
; Hello George!
```

--------------------------------

### Flowable fromStream Example

Source: https://github.com/reactivex/rxjava/wiki/What's-different-in-3.0

Demonstrates creating a Flowable from a Java Stream.

```java
Flowable stream = Flowable.fromStream(IntStream.range(1, 10).boxed());
```

--------------------------------

### throttleFirst() example

Source: https://github.com/reactivex/rxjava/wiki/Filtering-Observables

Emits only the first item emitted by a reactive source during sequential time windows of a specified duration. This example uses Schedulers.io() and blockingSubscribe, and requires TimeUnit.
```java
// Diagram:
// -A----B-C-------D-----E-|-->
//  a---------1s
//                 d-------|-->
// -A--------------D-------|-->

Observable<String> source = Observable.create(emitter -> {
    emitter.onNext("A");

    Thread.sleep(500);
    emitter.onNext("B");

    Thread.sleep(200);
    emitter.onNext("C");

    Thread.sleep(800);
    emitter.onNext("D");

    Thread.sleep(600);
    emitter.onNext("E");
    emitter.onComplete();
});

source.subscribeOn(Schedulers.io())
        .throttleFirst(1, TimeUnit.SECONDS)
        .blockingSubscribe(
                item -> System.out.println("onNext: " + item),
                Throwable::printStackTrace,
                () -> System.out.println("onComplete"));

// prints:
// onNext: A
// onNext: D
// onComplete
```

--------------------------------

### fromStream example

Source: https://github.com/reactivex/rxjava/wiki/What's-different-in-3.0

Demonstrates how to use `Flowable.fromStream` and `Observable.fromStream` to convert Java Streams into reactive types.

```APIDOC
## fromStream example

### Description
This example shows how to create a `Flowable` from an `IntStream` and an `Observable` from a `Stream` of a list.

### Code
```java
Flowable stream = Flowable.fromStream(IntStream.range(1, 10).boxed());

Observable stream2 = Observable.fromStream(list.stream());
```
```

--------------------------------

### Replay with Scheduler Example

Source: https://github.com/reactivex/rxjava/wiki/What's-different-in-3.0

Demonstrates how to use `replay()` and `observeOn()` with a `Scheduler`. This pattern is useful when you need to control the threading for the replayed observable.

```java
ConnectableFlowable connectable = source.replay();
Flowable flowable = connectable.observeOn(Schedulers.io());

// hand flowable to consumers
flowable.subscribe();

connectable.connect();
```

--------------------------------

### flatMapIterable Example

Source: https://github.com/reactivex/rxjava/wiki/What's-different-in-3.0

Demonstrates the use of flatMapIterable to transform each item into an iterable and flatten the results.
```java
Flowable.range(1, 10)
    .parallel()
    .runOn(Schedulers.computation())
    .flatMapIterable(v -> Arrays.asList(v, v + 1));
```

--------------------------------

### take() example

Source: https://github.com/reactivex/rxjava/wiki/Filtering-Observables

Emits only the first *n* items from a source Observable. Requires the Observable class.

```java
Observable source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

source.take(4)
    .subscribe(System.out::println);

// prints:
// 1
// 2
// 3
// 4
```

--------------------------------

### Example of flatMap with just() and empty()

Source: https://github.com/reactivex/rxjava/wiki/Writing-operators-for-2.0

Demonstrates how `just()` and `empty()` might appear within a `flatMap` operation. Recognizing their emission properties can lead to optimizations.

```java
source.flatMap(v -> {
    if (v % 2 == 0) {
        return just(v);
    }
    return empty();
})
```

--------------------------------

### flatMap Example

Source: https://github.com/reactivex/rxjava/blob/4.x/docs/Transforming-Observables.md

Applies a function returning a reactive source to each item and merges the results. The order of emissions is not guaranteed.

```java
Observable.just("A", "B", "C")
    .flatMap(a -> {
        return Observable.intervalRange(1, 3, 0, 1, TimeUnit.SECONDS)
                .map(b -> '(' + a + ", " + b + ')');
    })
    .blockingSubscribe(System.out::println);

// prints (not necessarily in this order):
// (A, 1)
// (C, 1)
// (B, 1)
// (A, 2)
// (C, 2)
// (B, 2)
// (A, 3)
// (C, 3)
// (B, 3)
```

--------------------------------

### fromStream Example

Source: https://github.com/reactivex/rxjava/wiki/What's-different-in-3.0

Demonstrates converting a java.util.stream.Stream into a Flowable or Observable. Note that primitive streams must be boxed first.
```java
Flowable.fromStream(streamOfObjects);

Observable.fromStream(streamOfObjects);
```

--------------------------------

### Maybe fromCompletionStage Example

Source: https://github.com/reactivex/rxjava/wiki/What's-different-in-3.0

Wraps a CompletionStage into a Maybe. Note that cancellation of the Maybe does not cancel the underlying CompletionStage.

```java
Maybe failed = Maybe.fromCompletionStage(
    CompletableFuture.completedFuture(0)
        .thenAccept(v -> { throw new RuntimeException(); })
);
```

--------------------------------

### reduce Example

Source: https://github.com/reactivex/rxjava/wiki/Mathematical-and-Aggregate-Operators

Applies a function to each emitted item sequentially and emits only the final accumulated value.

```java
Observable.range(1, 5)
    .reduce((product, x) -> product * x)
    .subscribe(System.out::println);

// prints 120
```

--------------------------------

### fromCompletionStage example

Source: https://github.com/reactivex/rxjava/wiki/What's-different-in-3.0

Illustrates how to wrap `CompletionStage` instances (like `CompletableFuture`) into reactive types such as `Flowable`, `Observable`, `Maybe`, `Single`, and `Completable`.

```APIDOC
## fromCompletionStage example

### Description
This example demonstrates converting `CompletionStage` instances into reactive types. It shows usage with `Flowable`, `Observable`, and `Maybe`, including handling potential errors within the `CompletionStage`.
### Code
```java
Flowable someAsync = Flowable.fromCompletionStage(
    operation.getAsync()
);

Observable otherAsync = Observable.fromCompletionStage(
    CompletableFuture.completedFuture(1)
);

Maybe failed = Maybe.fromCompletionStage(
    CompletableFuture.completedFuture(0)
        .thenAccept(v -> { throw new RuntimeException(); })
);
```
```

--------------------------------

### flatMapSingle Example with Empty Maybe Source

Source: https://github.com/reactivex/rxjava/blob/4.x/docs/Transforming-Observables.md

Demonstrates the behavior of `Maybe::flatMapSingle` when the source Maybe is empty. It results in an onError notification.

```java
Maybe emptySource = Maybe.empty();
Single result = emptySource.flatMapSingle(x -> Single.just(x));

result.subscribe(
        x -> System.out.println("onSuccess will not be printed!"),
        error -> System.out.println("onError: Source was empty!"));
```

--------------------------------

### reduceWith Example

Source: https://github.com/reactivex/rxjava/wiki/Mathematical-and-Aggregate-Operators

Applies a function to each emitted item sequentially, accumulating into a mutable data structure, and emits the final structure.

```java
Observable.just(1, 2, 2, 3, 4, 4, 4, 5)
    .reduceWith(TreeSet::new, (set, x) -> {
        set.add(x);
        return set;
    })
    .subscribe(System.out::println);

// prints [1, 2, 3, 4, 5]
```

--------------------------------

### From Future with Scheduler

Source: https://github.com/reactivex/rxjava/wiki/What's-different-in-3.0

Provides examples of using `Flowable.fromFuture()` with explicit `subscribeOn()` calls to manage threading. This replaces the convenience overloads that were removed.
```java
Flowable.fromFuture(future).subscribeOn(Schedulers.io());
```

```java
Flowable.fromFuture(future, 5, TimeUnit.SECONDS)
    .subscribeOn(Schedulers.io());
```

--------------------------------

### Fetch Wikipedia Articles Asynchronously in Clojure

Source: https://github.com/reactivex/rxjava/wiki/How-To-Use-RxJava

An example in Clojure demonstrating asynchronous fetching of Wikipedia articles. It uses a future to perform HTTP GET requests and emits the HTTP response for each article; the subscriber then reads the HTML from the response's `:body`.

```clojure
(defn fetchWikipediaArticleAsynchronously [wikipediaArticleNames]
  "Fetch a list of Wikipedia articles asynchronously.
   return Observable of HTML"
  (Observable/create
    (fn [subscriber]
      (let [f (future
                (doseq [articleName wikipediaArticleNames]
                  (-> subscriber (.onNext (http/get (str "http://en.wikipedia.org/wiki/" articleName)))))
                ; after sending response to onnext we complete the sequence
                (-> subscriber .onCompleted))]
        ))))
```

```clojure
(-> (fetchWikipediaArticleAsynchronously ["Tiger" "Elephant"])
  (.subscribe #(println "--- Article ---" (subs (:body %) 0 125) "...")))
```

--------------------------------

### Emit Increasing Longs with interval in Java

Source: https://context7.com/reactivex/rxjava/llms.txt

Use `interval` to periodically emit an ever-increasing `Long`, starting at 0. By default, it runs on `Schedulers.computation()`. This example prints 'Tick' or 'Tock' every second.

```java
Observable<Long> clock = Observable.interval(1, TimeUnit.SECONDS);

clock.subscribe(tick -> {
    if (tick % 2 == 0) System.out.println("Tick");
    else System.out.println("Tock");
});

// prints "Tick", "Tock", "Tick", ... every second
```

--------------------------------

### Select Element by Position with `elementAt`, `first`, `last`

Source: https://context7.com/reactivex/rxjava/llms.txt

Use `elementAt` to get an item at a specific index, `first` to get the first item (with a default if the source is empty), and `lastOrError` to get the last item or throw an error if the source is empty.
```java
// elementAt: get factorial at index 5 (720)
Observable.<Long, Long>generate(() -> 1L, (s, e) -> {
        e.onNext(s);
        return s + 1L;
    })
    .scan((p, x) -> p * x)
    .elementAt(5)
    .subscribe(System.out::println); // 720
```

```java
// first with default
Observable.just("A", "B", "C").first("D").subscribe(System.out::println); // A
```

```java
// lastOrError on empty source
Observable.empty().lastOrError()
    .subscribe(
        v -> {},
        e -> System.out.println("onError: " + e)); // onError: NoSuchElementException
```

--------------------------------

### CompositeException stacktrace example

Source: https://github.com/reactivex/rxjava/wiki/What's-different-in-3.0

This example demonstrates the nested structure of exceptions within a CompositeException, showing how multiple errors are aggregated and displayed.

```text
Multiple exceptions (2)
|-- io.reactivex.rxjava3.exceptions.TestException: ex3
    at io.reactivex.rxjava3.exceptions.CompositeExceptionTest.nestedMultilineMessage(CompositeExceptionTest.java:341)
|-- io.reactivex.rxjava3.exceptions.TestException: ex4
    at io.reactivex.rxjava3.exceptions.CompositeExceptionTest.nestedMultilineMessage(CompositeExceptionTest.java:342)
  |-- io.reactivex.rxjava3.exceptions.CompositeException: 2 exceptions occurred.
      at io.reactivex.rxjava3.exceptions.CompositeExceptionTest.nestedMultilineMessage(CompositeExceptionTest.java:337)
    |-- io.reactivex.rxjava3.exceptions.CompositeException.ExceptionOverview:
        Multiple exceptions (2)
        |-- io.reactivex.rxjava3.exceptions.TestException: ex1
            at io.reactivex.rxjava3.exceptions.CompositeExceptionTest.nestedMultilineMessage(CompositeExceptionTest.java:335)
        |-- io.reactivex.rxjava3.exceptions.TestException: ex2
            at io.reactivex.rxjava3.exceptions.CompositeExceptionTest.nestedMultilineMessage(CompositeExceptionTest.java:336)
```

--------------------------------

### fromOptional Examples

Source: https://github.com/reactivex/rxjava/wiki/What's-different-in-3.0

Illustrates creating RxJava sources (Flowable, Observable, Maybe) from java.util.Optional instances. Handles empty, present, and nullable values.

```java
Flowable zero = Flowable.fromOptional(Optional.empty());

Observable one = Observable.fromOptional(Optional.of(1));

Maybe maybe = Maybe.fromOptional(Optional.ofNullable(valueMaybeNull));
```

--------------------------------

### filter Example

Source: https://github.com/reactivex/rxjava/blob/4.x/docs/Filtering-Observables.md

Filters emissions from the source Observable, only allowing items that satisfy the given predicate to pass through. This example filters for even numbers.

```java
Observable.just(1, 2, 3, 4, 5, 6)
    .filter(x -> x % 2 == 0)
    .subscribe(System.out::println);
```

--------------------------------

### RxJava Hello World Example

Source: https://github.com/reactivex/rxjava/blob/4.x/README.md

A basic 'Hello World' program using RxJava 4. Note that RxJava 4 components are in 'io.reactivex.rxjava4' and base classes/interfaces are in 'io.reactivex.rxjava4.core'.
```java
package rxjava.examples;

import io.reactivex.rxjava4.core.*;

public class HelloWorld {
    public static void main(String[] args) {
        Flowable.just("Hello world").subscribe(System.out::println);
    }
}
```

--------------------------------

### Flowable concatArrayEagerDelayError Example

Source: https://github.com/reactivex/rxjava/blob/4.x/README.md

Shows an example of an operator name combining multiple suffixes: `concatArrayEagerDelayError`. This indicates eager processing of an array of sources with delayed error handling.

```java
Flowable concatArrayEagerDelayError(Publisher... sources);
```

--------------------------------

### Clone and Build RxJava Source

Source: https://github.com/reactivex/rxjava/wiki/Getting-Started

Commands to clone the RxJava repository and build it using Gradle.

```bash
$ git clone git@github.com:ReactiveX/RxJava.git
$ cd RxJava/
$ ./gradlew build
```

--------------------------------

### flatMapSingle Example with Timed Delay

Source: https://github.com/reactivex/rxjava/blob/4.x/docs/Transforming-Observables.md

Applies a function to each item emitted by an Observable, where the function returns a Single. Merges the results from these Singles. This example uses Single.timer to introduce a delay.

```java
Observable.just(4, 2, 1, 3)
    .flatMapSingle(x -> Single.timer(x, TimeUnit.SECONDS).map(i -> x))
    .blockingSubscribe(System.out::print);
```

--------------------------------

### concatMapCompletableDelayError Example

Source: https://github.com/reactivex/rxjava/wiki/Transforming-Observables

Utilize `concatMapCompletableDelayError` to handle errors from `CompletableSource` emissions. Errors are delayed until all sources terminate, allowing other operations to complete. This example shows error handling and sequential processing with delays.
```java
Observable source = Observable.just(2, 1, 3);
Completable completable = source.concatMapCompletableDelayError(x -> {
    if (x.equals(2)) {
        return Completable.error(new IOException("Processing of item \"" + x + "\" failed!"));
    } else {
        return Completable.timer(1, TimeUnit.SECONDS)
            .doOnComplete(() -> System.out.println("Info: Processing of item \"" + x + "\" completed"));
    }
});

completable.doOnError(error -> System.out.println("Error: " + error.getMessage()))
    .onErrorComplete()
    .blockingAwait();

// prints:
// Info: Processing of item "1" completed
// Info: Processing of item "3" completed
// Error: Processing of item "2" failed!
```

--------------------------------

### Hello World in Java (Lambda)

Source: https://github.com/reactivex/rxjava/wiki/How-To-Use-RxJava

A simple 'Hello World' implementation in Java using RxJava's Flowable and lambda expressions for subscription.

```java
public static void hello(String... args) {
    Flowable.fromArray(args).subscribe(s -> System.out.println("Hello " + s + "!"));
}
```

--------------------------------

### Handle backpressure with Flowable.create and BackpressureStrategy

Source: https://context7.com/reactivex/rxjava/llms.txt

Implement backpressure handling for fast producers using Flowable.create with strategies like BUFFER, DROP, LATEST, ERROR, or MISSING. This example demonstrates buffering and testing with a limited requested count.
```java
import io.reactivex.rxjava4.core.BackpressureStrategy;
import io.reactivex.rxjava4.subscribers.TestSubscriber;

Flowable flowable = Flowable.create(emitter -> {
    for (int i = 0; i < 1000; i++) {
        if (emitter.isCancelled()) return;
        emitter.onNext(i);
    }
    emitter.onComplete();
}, BackpressureStrategy.BUFFER); // also: DROP, LATEST, ERROR, MISSING

TestSubscriber subscriber = new TestSubscriber<>(10); // request only 10
flowable.subscribe(subscriber);
subscriber.assertValueCount(10);
subscriber.assertNoErrors();
```

--------------------------------

### concatMapCompletable Example

Source: https://github.com/reactivex/rxjava/wiki/Transforming-Observables

Use `concatMapCompletable` to apply a function returning a `CompletableSource` to each item, subscribing to them sequentially. The resulting `Completable` completes when all sources are done. This example demonstrates processing items with delays and logging completion.

```java
Observable<Integer> source = Observable.just(2, 1, 3);
Completable completable = source.concatMapCompletable(x -> {
    return Completable.timer(x, TimeUnit.SECONDS)
        .doOnComplete(() -> System.out.println("Info: Processing of item \"" + x + "\" completed"));
});

completable.doOnComplete(() -> System.out.println("Info: Processing of all items completed"))
    .blockingAwait();

// prints:
// Info: Processing of item "2" completed
// Info: Processing of item "1" completed
// Info: Processing of item "3" completed
// Info: Processing of all items completed
```

--------------------------------

### RxJava retryUntil Operator Example

Source: https://github.com/reactivex/rxjava/wiki/Error-Handling-Operators

Illustrates the retryUntil operator, which resubscribes to the source observable until a BooleanSupplier returns true. This example uses a LongAdder to track errors and retries until the error count reaches 3.
```java
LongAdder errorCounter = new LongAdder();
Observable<Long> source = Observable.interval(0, 1, TimeUnit.SECONDS)
        .flatMap(x -> {
            if (x >= 2) return Observable.error(new IOException("Something went wrong!"));
            else return Observable.just(x);
        })
        .doOnError((error) -> errorCounter.increment());

source.retryUntil(() -> errorCounter.intValue() >= 3)
        .blockingSubscribe(
                x -> System.out.println("onNext: " + x),
                error -> System.err.println("onError: " + error.getMessage()));
```

--------------------------------

### Import RxJava as Eclipse Project

Source: https://github.com/reactivex/rxjava/wiki/How-to-Contribute

Use this Gradle command to prepare the project for import into Eclipse as a standard Eclipse project.

```bash
./gradlew eclipse
```

--------------------------------

### Ambiguous concatWith Lambda Example

Source: https://github.com/reactivex/rxjava/blob/4.x/README.md

This example shows an incorrect attempt to use a lambda expression with `concatWith`, which fails due to ambiguity among multiple overloads. The compiler may flag this as an error in newer versions.

```java
someSource.concatWith(s -> Single.just(2))
    .subscribe(System.out::println, Throwable::printStackTrace);
```

--------------------------------

### RxJava retryWhen Operator Example

Source: https://github.com/reactivex/rxjava/wiki/Error-Handling-Operators

Shows the retryWhen operator, which allows for custom retry logic. Errors are passed to a separate observable, which determines whether to resubscribe. This example limits retries to 3 and introduces a delay between retries.

```java
Observable<Long> source = Observable.interval(0, 1, TimeUnit.SECONDS)
        .flatMap(x -> {
            if (x >= 2) return Observable.error(new IOException("Something went wrong!"));
            else return Observable.just(x);
        });

source.retryWhen(errors -> {
    return errors.map(error -> 1)

    // Count the number of errors.
    .scan(Math::addExact)

    .doOnNext(errorCount -> System.out.println("No. of errors: " + errorCount))

    // Limit the maximum number of retries.
    .takeWhile(errorCount -> errorCount < 3)

    // Signal resubscribe event after some delay.
    .flatMapSingle(errorCount -> Single.timer(errorCount, TimeUnit.SECONDS));
}).blockingSubscribe(
        x -> System.out.println("onNext: " + x),
        Throwable::printStackTrace,
        () -> System.out.println("onComplete"));
```

--------------------------------

### flatMapMaybe Example

Source: https://github.com/reactivex/rxjava/wiki/Transforming-Observables

Applies a function to each item that returns a MaybeSource, merging the results. Use when each source item may produce zero or one output item.

```java
Observable.just(9.0, 16.0, -4.0)
    .flatMapMaybe(x -> {
        if (x.compareTo(0.0) < 0) return Maybe.empty();
        else return Maybe.just(Math.sqrt(x));
    })
    .subscribe(
        System.out::println,
        Throwable::printStackTrace,
        () -> System.out.println("onComplete"));

// prints:
// 3.0
// 4.0
// onComplete
```

--------------------------------

### Hello World in Clojure

Source: https://github.com/reactivex/rxjava/wiki/How-To-Use-RxJava

A 'Hello World' implementation in Clojure using Observable/from and a lambda for subscription.

```clojure
(defn hello
  [&rest]
  (-> (Observable/from &rest)
    (.subscribe #(println (str "Hello " % "!")))))
```

--------------------------------

### Buffer with Boundary Supplier Emulation

Source: https://github.com/reactivex/rxjava/wiki/What's-different-in-3.0

Demonstrates how to emulate the removed `buffer` operator with a boundary supplier using `Observable.defer`, `take(1)`, and `repeat()`. This allows for dynamic boundary creation.

```java
source.buffer(Observable.defer(supplier).take(1).repeat())
```

--------------------------------

### sumLong Example

Source: https://github.com/reactivex/rxjava/wiki/Mathematical-and-Aggregate-Operators

Calculates the sum of a range of long numbers emitted by an Observable.
```java
Observable<Long> numbers = Observable.rangeLong(1L, 100L);
MathObservable.sumLong(numbers).subscribe((Long sum) -> System.out.println(sum));

// prints 5050
```

--------------------------------

### connect()

Source: https://github.com/reactivex/rxjava/blob/4.x/docs/Alphabetical-List-of-Observable-Operators.md

Instructs a Connectable Observable to begin emitting items.

```APIDOC
## connect()

### Description
Instructs a Connectable Observable to begin emitting items.

### Method
N/A (Method signature)

### Endpoint
N/A

### Parameters
None

### Request Example
N/A

### Response
N/A
```

--------------------------------

### window Example in RxJava

Source: https://github.com/reactivex/rxjava/wiki/Transforming-Observables

Illustrates the use of the window operator to group items into observable windows based on specified sizes and skips. Each window is then processed to join its elements into a delimited string.

```java
Observable.range(1, 10)

    // Create windows containing at most 2 items, and skip 3 items before starting a new window.
    .window(2, 3)
    .flatMapSingle(window -> {
        return window.map(String::valueOf)
                .reduce(new StringJoiner(", ", "[", "]"), StringJoiner::add);
    })
    .subscribe(System.out::println);

// prints:
// [1, 2]
// [4, 5]
// [7, 8]
// [10]
```

--------------------------------

### count Example

Source: https://github.com/reactivex/rxjava/wiki/Mathematical-and-Aggregate-Operators

Counts the number of items emitted by an Observable and emits the count as a Long.

```java
Observable.just(1, 2, 3).count().subscribe(System.out::println);

// prints 3
```

--------------------------------

### flatMapPublisher Example

Source: https://github.com/reactivex/rxjava/blob/4.x/docs/Transforming-Observables.md

Transforms the item emitted by a source Single into a Flowable by splitting a string and emitting each part. Subscribes to the resulting Flowable and prints each name.
```java
Single<String> source = Single.just("Kirk, Spock, Chekov, Sulu");
Flowable<String> names = source.flatMapPublisher(text -> {
    return Flowable.fromArray(text.split(","))
            .map(String::strip);
});

names.subscribe(name -> System.out.println("onNext: " + name));
```

--------------------------------

### takeLast() example

Source: https://github.com/reactivex/rxjava/wiki/Filtering-Observables

Emits only the last *n* items from a source Observable. Requires the Observable class.

```java
Observable source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

source.takeLast(4)
    .subscribe(System.out::println);

// prints:
// 7
// 8
// 9
// 10
```

--------------------------------

### Download Dependencies Command

Source: https://github.com/reactivex/rxjava/wiki/Getting-Started

Execute this command with a Maven pom file to download RxJava and its dependencies.

```bash
$ mvn -f download-rxjava-pom.xml dependency:copy-dependencies
```

--------------------------------

### RxJava `just` with explicit backpressure request

Source: https://github.com/reactivex/rxjava/wiki/Backpressure-(2.0)

Demonstrates creating a backpressure-aware source with `Flowable.just`. The `onStart` method explicitly requests 0 items, preventing any emissions.

```java
Flowable.just(1).subscribe(new DisposableSubscriber<Integer>() {
    @Override
    public void onStart() {
        request(0);
    }

    @Override
    public void onNext(Integer v) {
        System.out.println(v);
    }

    // the rest is omitted for brevity
});
```

--------------------------------

### elementAt Example

Source: https://github.com/reactivex/rxjava/wiki/Filtering-Observables

Emits the item at a specific index from the source. If the index is out of bounds, no item is emitted.
```java
Observable<Long> source = Observable.<Long, Long>generate(() -> 1L, (state, emitter) -> {
    emitter.onNext(state);

    return state + 1L;
}).scan((product, x) -> product * x);

Maybe<Long> element = source.elementAt(5);
element.subscribe(System.out::println);

// prints 720
```

--------------------------------

### StartWith Operator Example

Source: https://github.com/reactivex/rxjava/wiki/Combining-Observables

Use the startWith operator to emit a specified sequence of items before the source Observable begins emitting its items. This is useful for prepending data.

```java
Observable names = Observable.just("Spock", "McCoy");
names.startWith("Kirk").subscribe(item -> System.out.println(item));

// prints Kirk, Spock, McCoy
```

--------------------------------

### Perform Clean Build with Gradle

Source: https://github.com/reactivex/rxjava/wiki/Getting-Started

Command to perform a clean build of the RxJava project using Gradle.

```bash
$ ./gradlew clean build
```

--------------------------------

### Hot Source Backpressure Example

Source: https://github.com/reactivex/rxjava/wiki/Backpressure-(2.0)

Demonstrates a hot source (`PublishProcessor`) producing items faster than a background thread can consume, potentially leading to buffer overflow and `OutOfMemoryError` without backpressure handling.

```java
PublishProcessor<Integer> source = PublishProcessor.create();

source
    .observeOn(Schedulers.computation())
    .subscribe(v -> compute(v), Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

Thread.sleep(10_000);
```
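
One way to picture what a drop-style backpressure strategy does to the hot source above is a bounded buffer that rejects items once it is full. The following is a minimal JDK-only sketch of that idea, not RxJava's implementation: the class name `DropOnOverflowSketch`, the helper `offerAll`, and the capacity of 128 are all hypothetical choices made for illustration. In RxJava itself, the comparable behaviour would normally come from operators such as `onBackpressureDrop()` or `onBackpressureBuffer()` on `Flowable`.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical illustration class; not part of RxJava.
class DropOnOverflowSketch {

    /**
     * Offers the integers 0..count-1 to a bounded buffer that has no consumer
     * attached, and returns a two-element array: {accepted, dropped}.
     */
    static int[] offerAll(int count, int capacity) {
        ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        int dropped = 0;
        for (int i = 0; i < count; i++) {
            // offer() never blocks: it returns false when the buffer is full,
            // so the producer keeps running and memory use stays bounded.
            if (buffer.offer(i)) {
                accepted++;
            } else {
                dropped++;
            }
        }
        return new int[] { accepted, dropped };
    }

    public static void main(String[] args) {
        // Same item count as the hot source example; capacity is an assumption.
        int[] result = offerAll(1_000_000, 128);
        System.out.println("accepted=" + result[0] + ", dropped=" + result[1]);
        // prints: accepted=128, dropped=999872
    }
}
```

Because nothing drains the buffer in this sketch, every item after the first 128 is dropped. With a consumer running on another thread, the number of dropped items would depend on how fast it drains the buffer, which is the trade-off a drop strategy accepts in exchange for bounded memory.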