Understanding RxJava and how it can be used in an application

In this article, we explain the basic concepts of RxJava. It is written to help beginners, but if you are already familiar with RxJava, you can also use it as a refresher.

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.

What is the Observer pattern?

The Observer pattern is a software design pattern in which an object, called the Subject, maintains a list of its dependents, called Observers, and notifies them automatically of any state changes, usually by calling one of their methods.

The diagram below shows how the Observer pattern works. We will not delve deeper into it, since it is outside the scope of this article, but it is worth a brief look because RxJava builds on this concept. In terms of this pattern, the Subject plays the same role as an Observable in RxJava.

Brief description of how subjects and observers are related. This image is from https://en.wikipedia.org/wiki/Observer_pattern#/media/File:W3sDesign_Observer_Design_Pattern_UML.jpg
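To make the relationship concrete, here is a minimal plain-Java sketch of the pattern, without RxJava. The names Subject, Observer, attach, detach, setState and update are chosen for illustration only; they are assumptions, not part of any library.

```java
import java.util.ArrayList;
import java.util.List;

public class ObserverPatternSketch {

    /** An observer is notified through one of its methods. */
    interface Observer {
        void update(String state);
    }

    /** The subject keeps the list of observers and notifies them of state changes. */
    static class Subject {
        private final List<Observer> observers = new ArrayList<>();

        void attach(Observer observer) {
            observers.add(observer);
        }

        void detach(Observer observer) {
            observers.remove(observer);
        }

        void setState(String state) {
            // Notify every registered observer, in registration order.
            for (Observer observer : observers) {
                observer.update(state);
            }
        }
    }

    /** Attaches two observers, changes the state once, and returns what each one received. */
    static List<String> demo() {
        List<String> received = new ArrayList<>();
        Subject subject = new Subject();
        subject.attach(state -> received.add("first:" + state));
        subject.attach(state -> received.add("second:" + state));
        subject.setState("ready");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [first:ready, second:ready]
    }
}
```

RxJava follows the same idea, but the notification is split into several callbacks (for an item, for an error, and for completion) instead of a single update method.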

Building Blocks of RxJava

There are two basic building blocks of RxJava:

  1. Observables: These represent the source of the data. An observable starts emitting data when a subscriber (observer) starts listening to it. It can emit any number of items, including zero, and it terminates either successfully or with an error.
  2. Subscribers: These are the observers. An observable can have more than one subscriber. Each time the observable emits an item, onNext() is called on each subscriber. When the observable completes its data flow successfully, onComplete() is called on each subscriber, and when there is an error in the data flow, onError() is called instead.

There are five types of observables:

We can categorize observables based on how they emit their data.

  1. Observable: This emits zero or more items and terminates with success or an error. For example, when we download data from the internet and there is more than one file to download, we could use an Observable so that it emits the values one after another. This class is non-backpressured, and the Observable operators, by default, run with a buffer size of 128 elements.
https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/legend.png

2. Flowable: This is just like Observable, except that it supports backpressure. The difference matters when the source emits more values than the observers can consume. For example, if the source emits the values 1 to 1000000 all at once and the observer can take only 200 items at a time, the unconsumed items pile up in memory and we may end up with an OutOfMemoryError. That is where BackpressureStrategy comes in handy.

BackpressureStrategy

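The following are the backpressure strategies we can adopt:

a. Drop: This discards newly emitted items when the downstream cannot keep up.

b. Buffer: This buffers all the emitted items from the producer until the downstream consumes them. The buffer is unbounded, so watch out for an OutOfMemoryError.

c. Latest: This keeps only the most recent item, overwriting any earlier item that has not been consumed yet.

d. Error: This signals a MissingBackpressureException when the downstream cannot keep up.

e. Missing: No strategy is applied. Items are emitted without buffering or dropping, so the downstream has to deal with any overflow, and an error will surface sooner or later somewhere downstream.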

https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/legend.png
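The strategies are easier to compare side by side. The sketch below is only an illustration under simple assumptions: a synchronous source pushes the values 1 to 10 at once, while the subscriber asks for 3 items up front and for 1 more after the source has finished. The class and method names (BackpressureSketch, source, consume) are made up for this example; Flowable.create, BackpressureStrategy and the Reactive Streams Subscriber and Subscription types come from the RxJava 3 dependency.

```java
import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Flowable;
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class BackpressureSketch {

    /** A source that pushes 1..10 immediately, regardless of downstream demand. */
    static Flowable<Integer> source(BackpressureStrategy strategy) {
        return Flowable.create(emitter -> {
            for (int i = 1; i <= 10; i++) {
                emitter.onNext(i);
            }
            emitter.onComplete();
        }, strategy);
    }

    /** Requests 3 items up front, then 1 more after the source has finished emitting. */
    static List<String> consume(BackpressureStrategy strategy) {
        List<String> events = new ArrayList<>();
        Subscription[] subscription = new Subscription[1];
        source(strategy).subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                subscription[0] = s;
                s.request(3);
            }

            @Override
            public void onNext(Integer item) {
                events.add("next " + item);
            }

            @Override
            public void onError(Throwable e) {
                events.add("error " + e.getClass().getSimpleName());
            }

            @Override
            public void onComplete() {
                events.add("complete");
            }
        });
        // The source runs synchronously, so it has already finished emitting here.
        subscription[0].request(1);
        return events;
    }

    public static void main(String[] args) {
        // DROP:   [next 1, next 2, next 3, complete]
        System.out.println(consume(BackpressureStrategy.DROP));
        // LATEST: [next 1, next 2, next 3, next 10, complete]
        System.out.println(consume(BackpressureStrategy.LATEST));
        // ERROR:  [next 1, next 2, next 3, error MissingBackpressureException]
        System.out.println(consume(BackpressureStrategy.ERROR));
    }
}
```

With Drop, the items 4 to 10 are lost. With Latest, only the last item, 10, survives until the next request. With Error, the stream fails as soon as an item arrives that nobody asked for.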

3. Single: This is used when the source emits exactly one successful value or an error. There is no onComplete() as there is for Flowable and Observable; the observer receives either onSuccess() or onError(). We can use this for network calls, since a request normally yields a single response.

https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.legend.png

4. Maybe: This is used when the source emits either one item, no item at all, or an error.

https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.legend.png

5. Completable: This is used when the source has to do some work without necessarily emitting a value. It only signals completion or an error.

https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.png
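As a rough illustration of how Single, Maybe and Completable differ in the callbacks they deliver, consider the following sketch. The class name SingleValueTypes, the run method and the logged strings are hypothetical; the factory methods and subscribe overloads used are from RxJava 3.

```java
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Single;
import java.util.ArrayList;
import java.util.List;

public class SingleValueTypes {

    /** Subscribes to one source of each type and records which callbacks fire. */
    static List<String> run() {
        List<String> events = new ArrayList<>();

        // Single: exactly one value (onSuccess) or an error; there is no onComplete.
        Single.just("profile")
                .subscribe(
                        value -> events.add("single success: " + value),
                        error -> events.add("single error"));

        // Maybe: one value, or no value (onComplete), or an error.
        Maybe.<String>empty()
                .subscribe(
                        value -> events.add("maybe success: " + value),
                        error -> events.add("maybe error"),
                        () -> events.add("maybe complete without an item"));

        // Completable: runs some work and only reports completion or an error.
        Completable.fromAction(() -> events.add("action ran"))
                .subscribe(
                        () -> events.add("completable complete"),
                        error -> events.add("completable error"));

        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

All three sources here are synchronous, so the list is fully populated by the time run() returns.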

After this explanation of the various types of observables, we can select whichever of these categories our application demands.

Now let’s use an example to enhance our understanding.

  1. Observables and Subscribers

We are going to learn how to create observables and subscribers together.

If you want to follow along with the examples, please add RxJava to your Gradle dependencies using

implementation "io.reactivex.rxjava3:rxjava:3.0.1"

  1. Using the create() method to create an Observable
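The snippet for this step is not shown here, so the following is only a minimal sketch of what it could look like, written to produce the same two messages as the Logcat output in this section. It is plain Java so that it runs outside Android: it collects the messages in a list and prints them with System.out, where an Android app would call Log.d with the "RxJava Create" tag. The class name CreateExample and the run method are assumptions.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
import java.util.ArrayList;
import java.util.List;

public class CreateExample {

    /** Creates an observable with create(), subscribes to it, and returns the logged messages. */
    static List<String> run() {
        List<String> log = new ArrayList<>();

        // The lambda is the ObservableOnSubscribe: it runs once per subscriber.
        Observable<String> observable = Observable.create(emitter -> {
            emitter.onNext("Emitting new Stuff !!");
            emitter.onComplete();
        });

        observable.subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                // Nothing to do here; the Disposable could be kept to cancel later.
            }

            @Override
            public void onNext(String item) {
                log.add(item);
            }

            @Override
            public void onError(Throwable e) {
                log.add("Error: " + e.getMessage());
            }

            @Override
            public void onComplete() {
                log.add("Observable consumed");
            }
        });

        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```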
After the code snippet is run, we get this result in the Logcat:

com.bisapp.rxjavaexamples D/RxJava Create: Emitting new Stuff !!
com.bisapp.rxjavaexamples D/RxJava Create: Observable consumed

Now that we know how to create observables using the Observable class, we could also create them with the Flowable, Single, Maybe and Completable classes. You can try your hand at it.😉

Convenience methods to create observables

Apart from the create method on the Observable class, we could also use the following methods to create observables.

  1. Observable.just("Hello"): This creates an observable as a wrapper around other data types.
  2. Observable.fromIterable(): This takes an Iterable and emits its values in their order in the data structure.
  3. Observable.fromArray(): This likewise takes an array and emits its values in their order in the data structure.
  4. Observable.fromCallable(): This creates an observable for a Callable.
  5. Observable.fromFuture(): This creates an observable for a Future.
  6. Observable.interval(): An observable that emits an increasing sequence of Long values at a given time interval.
  7. Observable.range(): An observable that emits a sequence of integers, given the start value and the number of values to emit.

Similar methods exist for the other types, e.g. Flowable, Single and Maybe.
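The creation methods listed above can be tried out with a short sketch like the one below. The class name CreationMethods and the collect helper are hypothetical; collect simply gathers everything a source emits into a list by using toList() and blockingGet(), which is convenient for a demo but should not be used on the Android main thread.

```java
import io.reactivex.rxjava3.core.Observable;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class CreationMethods {

    /** Collects everything an observable emits into a list, blocking until it completes. */
    static <T> List<T> collect(Observable<T> source) {
        return source.toList().blockingGet();
    }

    public static void main(String[] args) {
        // Wraps a single value.
        System.out.println(collect(Observable.just("Hello")));

        // Emits the elements of an Iterable or an array in order.
        System.out.println(collect(Observable.fromIterable(Arrays.asList("a", "b", "c"))));
        System.out.println(collect(Observable.fromArray(1, 2, 3)));

        // Defers the computation until someone subscribes.
        System.out.println(collect(Observable.fromCallable(() -> "computed lazily")));

        // Emits the result of a Future once it is available.
        System.out.println(collect(Observable.fromFuture(CompletableFuture.completedFuture("from future"))));

        // range(start, count): three integers starting at 5.
        System.out.println(collect(Observable.range(5, 3))); // prints [5, 6, 7]

        // interval() never completes on its own, so take(3) limits it to three ticks.
        System.out.println(collect(Observable.interval(100, TimeUnit.MILLISECONDS).take(3))); // prints [0, 1, 2]
    }
}
```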

Disposables

Observers should not listen to observables forever. When we no longer need an observer to listen, we need to detach it from its subscription. Subscribing gives us the subscription back in the form of a Disposable. We can also use the CompositeDisposable class to store all our subscriptions and then dispose of them together when we do not need them again. If we do not unsubscribe, the references that are kept alive may eventually lead to an OutOfMemoryError.

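import android.util.Log;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.ObservableEmitter;
import io.reactivex.rxjava3.core.ObservableOnSubscribe;
import io.reactivex.rxjava3.disposables.CompositeDisposable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.observers.DisposableObserver;

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<String> emitter) {
        emitter.onNext("Emitting new Stuff !!");
        emitter.onComplete();
    }
});

// subscribeWith() returns the observer itself, which is also a Disposable
Disposable disposable = observable.subscribeWith(new DisposableObserver<String>() {
    @Override
    public void onNext(String item) {
        Log.d("RxJava Create", item);
    }

    @Override
    public void onError(Throwable e) {
        Log.e("RxJava Create", "Something went wrong", e);
    }

    @Override
    public void onComplete() {
        Log.d("RxJava Create", "Observable consumed");
    }
});

CompositeDisposable compositeDisposable = new CompositeDisposable();
compositeDisposable.add(disposable);

// When we do not need the listeners anymore, we need to dispose of them all.
// Mostly I dispose of them in onDestroy() of the activity or fragment.
compositeDisposable.dispose();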

Conclusion

We have talked about how RxJava is used and about its building blocks, with detailed explanations and some examples. In the next article, we will talk about operators in RxJava. There are a whole lot of operators in RxJava that are helpful and fun to use to make our code simple and sweet. You cannot afford to miss that. You can follow me too, and comment if anything is confusing or if you want to add your own ideas to the article.

Thanks so much for your time.

If you find this article helpful, please share it with the community!

References

http://reactivex.io/RxJava/javadoc/

Senior Android Developer at ECOM Trading