Understanding your Operators in RxJava

RxJava has made asynchronous programming on Android much easier: doing work on a background thread and listening for the result on the main thread is now straightforward.

With RxJava operators, you can combine observables and process them together, run work at a specific interval, and do a lot more, all with very little code.

In this tutorial, we discuss the operators that are used most often in RxJava to make our work easier.

I have a full series of tutorials on RxJava operators on my YouTube channel, which explains everything you need to know about RxJava.

What Is an RxJava Operator?

Operators operate on an Observable and return an Observable, and most of them support chaining. This allows you to apply operators one after the other in a chain, where each operator modifies the Observable that results from the previous operator.

There are other patterns, like the Builder Pattern, in which a variety of methods of a particular class operate on an item of that same class by modifying that object through the operation of the method. These patterns also allow you to chain the methods in a similar way. But while in the Builder Pattern, the order in which the methods appear in the chain does not usually matter, with the Observable operators order matters.

A chain of Observable operators does not operate independently on the original Observable that originates the chain. Instead, the operators work in turn, each one operating on the Observable generated by the operator immediately before it in the chain.
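To make the point about ordering concrete, here is a minimal sketch, assuming RxJava 2.x (the io.reactivex package) is on the classpath. The class and method names are made up for illustration. The same two operators are applied in a different order and produce different results:

```java
import io.reactivex.Observable;
import java.util.List;

// Hypothetical class name, used only for this illustration.
public class OperatorOrderSketch {

    // filter first, then map: keep the even numbers, then add 1 to each
    public static List<Integer> filterThenMap() {
        return Observable.range(1, 6)          // 1, 2, 3, 4, 5, 6
                .filter(n -> n % 2 == 0)       // 2, 4, 6
                .map(n -> n + 1)               // 3, 5, 7
                .toList()
                .blockingGet();
    }

    // map first, then filter: add 1 to each number, then keep the even results
    public static List<Integer> mapThenFilter() {
        return Observable.range(1, 6)          // 1, 2, 3, 4, 5, 6
                .map(n -> n + 1)               // 2, 3, 4, 5, 6, 7
                .filter(n -> n % 2 == 0)       // 2, 4, 6
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(filterThenMap()); // [3, 5, 7]
        System.out.println(mapThenFilter()); // [2, 4, 6]
    }
}
```

Each operator only ever sees what the operator before it emitted, which is why swapping filter and map changes the output.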

Operators are grouped into categories according to the role they play in relation to observables.

Creating Observables

These operators originate new observables.

a. Create: This creates an observable from scratch by calling the observer's methods programmatically. The example below uses the create method on Observable, but it is also available on Single, Flowable, Maybe and Completable. Please check my previous article on RxJava to see how these other types of observables work.

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<String> emitter) {
        // emit a single item, then signal completion
        emitter.onNext("Emitting new Stuff !!");
        emitter.onComplete();
    }
});

b. Defer: This does not create the observable until an observer subscribes, and it creates a fresh observable for each observer. To understand when defer is useful, we create one observable with the just() method and another with defer() and compare how each behaves.
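A minimal sketch of such a comparison, assuming RxJava 2.x. The class name, the page field and the returned list (used here in place of Log.d so the sketch runs outside Android) are illustrative assumptions, not the original listing:

```java
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical class, used only for this illustration.
public class DeferSketch {

    // Assumed mutable state that changes between creation and subscription.
    private String page = "First page";

    public List<String> deferExample() {
        List<String> log = new ArrayList<>();

        // just() reads the value of page right now, when the observable is created.
        Observable<String> justObservable = Observable.just(page);

        // defer() runs this lambda only when an observer subscribes.
        Observable<String> deferObservable = Observable.defer(() -> Observable.just(page));

        // The value changes after both observables were created.
        page = "Second Page";

        justObservable.subscribe(v -> log.add("Just: " + v));
        deferObservable.subscribe(v -> log.add("Defer: " + v));
        return log;
    }

    public static void main(String[] args) {
        new DeferSketch().deferExample().forEach(System.out::println);
    }
}
```

In this sketch just() captured the value of page when the observable was assembled, while defer() read it only at subscription time.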

Calling the deferExample method will give the following results in the logcat.

com.bisapp.rxjavaexamples D/RxJava Just: First page
com.bisapp.rxjavaexamples D/RxJava Defer: Second Page

c. Empty/Never/Error: These create observables with precise, limited behaviors. Empty creates an observable that emits no items and terminates normally. Never creates an observable that emits no items and never terminates. Error creates an observable that emits no items and terminates with an error.

//Creating Observable with empty
Observable.empty()
        .subscribe(value -> {
            // never called: empty() emits no items and simply completes
        });

//Creating Observable with error; this test shows that the error is delivered.
@Test
public void testErrorOccurred() {
    Throwable throwable = new Throwable("Error happened!!!");
    Observable.error(throwable).test().assertError(throwable);
}

//Creating Observable with never; run this test to see for yourself that it
//does not terminate. If you use assertTerminated() instead, the test will fail.
@Test
public void testSubscriptionNotTerminating() {
    Observable.never().test().assertNotTerminated();
}

d. From: This creates observables from other data types or data structures. Let’s look at three of the from methods: fromArray(), fromCallable() and fromFuture(). The following examples show how they are used.

//Creating Observable using fromArray()
@Test
public void testObservableFromArray() {
String[] items = {"First", "Second", "Third"};
Observable.fromArray(items).test()
.assertComplete()
.assertValueCount(items.length)
.assertTerminated();
}
@Test
public void testObservableFromCallable() {
//this creates observable from callable object
Observable.fromCallable(new Callable<String>() {
@Override
public String call() throws Exception {
int sum = 3 + 5;
return String.valueOf(sum);
}
}).test()
.assertTerminated()
.assertComplete()
.assertValue(value ->{
return value.equalsIgnoreCase("8");
});
}
@Test
public void testObservableFromFuture() {
//this creates observable from Future Task object
FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
@Override
public String call() throws Exception {
int sum = 3 + 5;
return String.valueOf(sum);
}
});

TestObserver<String> testSubscriber = new TestObserver<>();
TestScheduler testScheduler = new TestScheduler();
Observable.fromFuture(futureTask, 5,TimeUnit.SECONDS, testScheduler).doOnSubscribe(disposable -> {
futureTask.run();
//don't forget to call the run() on
// futureTask lest the test will hang since the task has not started
}).subscribe(testSubscriber);

testScheduler.triggerActions();

testSubscriber
.assertValue(value->{
return value.equalsIgnoreCase("8");
})
.assertComplete();
}

e. Interval: This creates an observable that emits a sequence of increasing numbers (Long values starting at 0), spaced by the given time interval. The sequence is infinite.

@Test
public void testObservableInterval() {
// This will not terminate, it will emit values from 0 to infinity
Observable.interval(2, TimeUnit.SECONDS)
.test()
.assertNotTerminated();

}

f. Just: This converts an object or a set of objects into an observable that emits those objects. The just method is similar to the from methods, except that the from methods take an array or an Iterable as input.

@Test
public void testObservableJust() {
//This will terminate, it will emit just 2.
Observable.just(2)
.test()
.assertValue(v->{
return v.equals(2);
});

}

g. Range: This creates an observable that emits a sequence of consecutive integers. It is like Interval in that it emits increasing numbers, but it takes a start value and a count, emits the values immediately, and then terminates.

@Test
public void testObservableRange() {
// This will terminate after emitting 9 values,
// starting from 2 (that is, 2 through 10)

Observable.range(2,9)
.test()
.assertValueCount(9);

}

i. Repeat: This creates an observable that emits a particular item or sequence of items repeatedly. It can also accept a parameter that sets the number of repetitions. The example below emits 2 and 9 eight times.

private void observableFromRepeat() {
Observable.just(2, 9)
.repeat(8)
.subscribe(v -> {
Log.d("Rx Java", String.valueOf(v) );
});
}

j. Timer: This creates an observable that emits a single item after a given delay.

private void observableFromTimer(){
//This will emit a single 0 after a delay of 2 seconds and then complete
Observable.timer(2,TimeUnit.SECONDS)
.subscribe(v -> {
Log.d("Rx Java", String.valueOf(v));
});
}

Transforming Observables

a. Buffer: This periodically gathers items from an observable into bundles and emits the bundles instead of emitting the items one at a time.

private void observableFromBuffer() {
//This will emit a List of 8 items at a time
Observable.interval(2, TimeUnit.SECONDS)
.buffer(8)
.subscribe(v -> {
Log.d("Rx Java", String.valueOf(v));
});
}

b. FlatMap: This transforms each item emitted by the source observable into an observable, then flattens the emissions from all of those observables into a single observable.

private void observableFromFlatMap() {
//This will emit value => Item from + emitted value from interval
Observable.interval(1, TimeUnit.SECONDS)
.flatMap(new Function<Long, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Long aLong) throws Exception {
String item = "Item from " + aLong;
return Observable.just(item);
}
})
.subscribe(v -> {
Log.d("Rx Java", String.valueOf(v));
});
}

Differences Between FlatMap, ConcatMap and SwitchMap

FlatMap and ConcatMap work in the same way; the difference is in the order of the items. Both take each emission from the original source, apply a function that turns it into a new observable, and emit the items of those observables. FlatMap subscribes to all of these inner observables as soon as they are created and merges their emissions, so when the inner observables work asynchronously the original order might not be preserved; which item arrives first depends on how expensive each individual piece of work is. ConcatMap subscribes to the inner observables one at a time, so the order is preserved, but this can be costly because each inner observable has to wait for the previous one to complete.

SwitchMap differs from FlatMap and ConcatMap in that it unsubscribes from the previous inner observable whenever the source emits a new item, so only the emissions of the latest inner observable are kept.
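The difference is easiest to see when the inner observables take different amounts of time. The following is a minimal sketch, assuming RxJava 2.x; the class name, the run and delayFor helpers, and the chosen delays are made up for illustration. It uses a TestScheduler so that time is virtual and the result is repeatable:

```java
import io.reactivex.Observable;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.TestScheduler;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical class, used only for this illustration.
public class MapVariantsSketch {

    // Assumed cost of the work for each item: "a" -> 3s, "b" -> 2s, "c" -> 1s
    private static long delayFor(String s) {
        return 'd' - s.charAt(0);
    }

    public static List<String> run(String operator) {
        TestScheduler scheduler = new TestScheduler();
        List<String> received = new ArrayList<>();

        // Each item becomes an inner observable that emits it after its delay.
        Function<String, Observable<String>> slowWork = s ->
                Observable.just(s).delay(delayFor(s), TimeUnit.SECONDS, scheduler);

        Observable<String> source = Observable.just("a", "b", "c");
        Observable<String> result;
        switch (operator) {
            case "flatMap":
                result = source.flatMap(slowWork);
                break;
            case "concatMap":
                result = source.concatMap(slowWork);
                break;
            default:
                result = source.switchMap(slowWork);
                break;
        }

        result.subscribe(received::add);
        // Move virtual time forward far enough for all delayed work to finish.
        scheduler.advanceTimeBy(10, TimeUnit.SECONDS);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run("flatMap"));   // [c, b, a]
        System.out.println(run("concatMap")); // [a, b, c]
        System.out.println(run("switchMap")); // [c]
    }
}
```

With flatMap the fastest inner observable wins, with concatMap the source order is kept, and with switchMap only the inner observable for the last source item survives.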

c. GroupBy: This divides an observable into a set of observables, each of which emits a different group of items from the original observable, organized by key.
The following example groups the items by their length.

private void observableFromGroupBy() {
//This will emit value =>
// Rx Java: [a, d]
// Rx Java: [bb, cc]
// Rx Java: [eee]

Observable<String> mySource = Observable.just("a", "bb", "cc", "d", "eee");
mySource.groupBy(s -> s.length())
.flatMapSingle(new Function<GroupedObservable<Integer, String>, SingleSource<?>>() {
@Override
public SingleSource<?> apply(GroupedObservable<Integer,
String> integerStringGroupedObservable) throws Exception {
return integerStringGroupedObservable.toList();
}
})
.subscribe(v -> {
Log.d("Rx Java", String.valueOf(v));
});
}

d. Map: This transforms the items emitted by the observable by applying a function to each item. The following example transforms each item into its length.

private void observableFromMap() {
//This will emit value =>
// Rx Java: 1
// Rx Java: 2
// Rx Java: 2
// Rx Java: 1
// Rx Java: 3

Observable<String> mySource = Observable.just("a", "bb", "cc", "d", "eee");
mySource.map(new Function<String, Object>() {
@Override
public Object apply(String s) throws Exception {
return s.length();
}
}).subscribe(v -> {
Log.d("Rx Java", String.valueOf(v));
});
}

e. Scan: This applies a function to each item emitted by an observable, sequentially, and emits each successive result. The function receives the previous result together with the current emission from the source observable. The following example uses range(1,10) and scan() to concatenate the previous result with the current emission.

private void observableFromScan(){

Observable.range(1,10)
.scan("", new BiFunction<String, Integer, String>() {
@Override
public String apply(String s, Integer integer)
throws Exception {
return s + "" +integer;
}
})
.subscribe(v -> {
Log.d("Rx Java", String.valueOf(v));
});

//Rx Java:           (the seed, an empty string, is emitted first)
//Rx Java: 1
//Rx Java: 12
//Rx Java: 123
//Rx Java: 1234
//Rx Java: 12345
//Rx Java: 123456
//Rx Java: 1234567
//Rx Java: 12345678
//Rx Java: 123456789
//Rx Java: 12345678910

//As the results show, each emission contains the
// previous result, because the accumulator function
// concatenates it with the current item.

}

f. Window: This periodically subdivides the items from an observable into observable windows and emits these windows rather than emitting the items one at a time.

@Test
public void testObservableWindow() {
Observable
.range(0, 10)
.window(2,TimeUnit.SECONDS)
.subscribe(v->{
v.subscribe(System.out::print);
});

}
//This gives a result of
0123456789

Filtering Observables

a. Debounce: This emits an item from the observable only when a particular timespan has passed without it emitting another item.

@Test
public void testObservableDebounce() {
Observable.range(1, 1200)
.debounce(3, TimeUnit.MICROSECONDS)
.subscribe(v->{
System.out.println(v);
});

}
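//This gives a result such as the following; the exact values depend on timing and vary between runs
97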
143
252
1200

b. Distinct: This suppresses duplicate items emitted by the observable. As the name implies, distinct items only.

@Test
public void testObservableDistinct() {

Observable.fromArray(1, 1,2,4,5,4,5,4,3,5)
.distinct()
.subscribe(v->{
System.out.println(v);
});

}
1
2
4
5
3

c. ElementAt: This emits only the item at index n of the items emitted by an observable.

@Test
public void testObservableElementAt() {

Observable.fromArray(1, 1,2,4,5,4,5,4,3,5)
.elementAt(4)
.subscribe(v->{
System.out.println(v);
});

}
This prints 5, since 5 is the item at index 4 (indices start at 0).

d. Filter: This emits only those items from an observable that pass a predicate test. The following example uses filter to keep only the even numbers.

@Test
public void testObservableFilter() {

Observable.range(1,10)
.filter(integer -> integer % 2 == 0)
.subscribe(v -> {
System.out.println(v);
});

}
2
4
6
8
10

c. First: This emits only the first item from an observable, or a default item if the observable is empty.

@Test
public void testObservableFirst() {

Observable.range(1,10)
.first(2)//this is default item
.subscribe(v -> {
System.out.println(v);
});

}
This will emit the first item which is 1.

d. IgnoreElements: This does not emit any item from the observable but mirrors its termination notification.

@Test
public void testObservableIgnoreElement() {

Observable.range(1,10)
.ignoreElements()
.subscribe();

}

e. Last: This emits only the last item from the observable, or a default item if the observable is empty.

@Test
public void testObservableLast() {

Observable.range(1,10)
.last(3) //this is default item
.subscribe(System.out::println);

}
This emits the last item in the range which is 10.

f. Sample: This emits the most recent item emitted by an observable within each periodic time interval.

@Test
public void testObservableSample() {

Observable.range(1,900)
.sample(1,TimeUnit.MICROSECONDS)
.subscribe(System.out::println);

}
//This gives a result such as the following; the exact values depend on timing
268
648
900

g. Skip: This suppresses the first n items emitted by an observable.

@Test
public void testObservableSkip() {

Observable.range(1,12)
.skip(7)
.subscribe(System.out::println);

}
8
9
10
11
12

h. SkipLast: This suppresses the last n items emitted by the observable.

@Test
public void testObservableSkipLast() {

Observable.range(1,12)
.skipLast(7)
.subscribe(System.out::println);

}
1
2
3
4
5

i. Take: This emits only the first n items emitted by an Observable.

@Test
public void testObservableTake() {

Observable.range(1,12)
.take(3)
.subscribe(System.out::println);

}
1
2
3

j. TakeLast: This emits only the last n items emitted by an observable.

@Test
public void testObservableTakeLast() {

Observable.range(1,12)
.takeLast(3)
.subscribe(System.out::println);

}
10
11
12

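Combining Observables

a. CombineLatest: When an item is emitted by either of two Observables, this combines the latest item emitted by each Observable via a specified function and emits items based on the results of this function. In the example below both sources emit synchronously on subscription, so the first source has already reached its last item, "ysd", before the second one emits anything.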

@Test
public void testObservableCombineLatest() {

Observable<Integer> integerObservable = Observable.just(1, 3, 5, 6);
Observable<String> stringObservable = Observable.just("a", "r", "ysd");
Observable.combineLatest(stringObservable, integerObservable, new BiFunction<String, Integer, Object>() {
@Override
public Object apply(String s, Integer integer) throws Exception {
return s + integer;
}
}).subscribe(System.out::println);

}
ysd1
ysd3
ysd5
ysd6

b. Join: This combines items emitted by two Observables whenever an item from one Observable is emitted during a time window defined according to an item emitted by the other Observable. The first function returns an observable whose lifetime determines how long the window of each item from the first observable stays open, and the second function does the same for items from the second observable. If you find the Join operator challenging, I recommend this blog, which treats it in detail.

public void testObservableJoin() {

Observable<Long> firstObservable = Observable.interval(0, 2, TimeUnit.SECONDS).take(3);
Observable<Long> secondObservable = Observable.interval(2, 1, TimeUnit.SECONDS).take(3);
firstObservable.join(secondObservable, new Function<Long, Observable<Long>>() {
@Override
public Observable<Long> apply(Long aLong) throws Exception {
return Observable.timer(2, TimeUnit.SECONDS);
}
}, new Function<Long, Observable<Long>>() {
@Override
public Observable<Long> apply(Long bLong) throws Exception {
return Observable.timer(1, TimeUnit.SECONDS);
}
}, new BiFunction<Long, Long, String>() {
@Override
public String apply(Long aLong, Long tRight) throws Exception {
return aLong + " null " + tRight;
}
}).subscribe(v->{
Log.d("Rx Java", String.valueOf(v));
});

}
This will be the results of this method:
Rx Java: 0 null 0
Rx Java: 1 null 0
Rx Java: 1 null 1
Rx Java: 2 null 1
Rx Java: 2 null 2

c. Merge: This combines multiple observables into one by merging their emissions. Merge may interleave the items emitted by the merged Observables (a similar operator, Concat, does not interleave items, but emits all of each source Observable’s items in turn before beginning to emit items from the next source Observable).

@Test
public void testObservableMerge() {

TestObserver<String> testSubscriber = new TestObserver<>();

Observable.merge(
Observable.fromArray(new String[]{"I", "am"}),
Observable.fromArray(new String[]{"trying to learn", "RxJava"})
).subscribe(testSubscriber);

testSubscriber.assertValues("I", "am", "trying to learn", "RxJava");

}

d. StartWith: This emits a specified sequence of items before beginning to emit the items from the source Observable.

@Test
public void testObservableStartWith() {

TestObserver<String> testSubscriber = new TestObserver<>();
Observable<String> first = Observable.fromArray(new String[]{"I", "am"});
Observable<String> second = Observable.fromArray(new String[]{"trying to learn", "RxJava"});

first.startWith(second).subscribe(testSubscriber);

testSubscriber.assertValues( "trying to learn", "RxJava","I", "am");

}

e. Zip: This combines the emissions of multiple Observables via a specified function and emits a single item for each combination based on the results of this function.

@Test
public void testObservableZip() {

TestObserver<String> testSubscriber = new TestObserver<>();
List<String> zippedStrings = new ArrayList<>();

Observable.zip(
Observable.fromArray(new String[]{"Simple", "Moderate", "Complex"}),
Observable.fromArray(new String[]{"Solutions", "Success", "Hierarchy"}),
(str1, str2) -> str1 + " " + str2).subscribe(zippedStrings::add);

Assert.assertThat(zippedStrings.size(), is(3));
// the items are paired by position: first with first, second with second, and so on
Assert.assertEquals(Arrays.asList("Simple Solutions", "Moderate Success", "Complex Hierarchy"), zippedStrings);

}

Error Handling Operators

a. Catch: This operator intercepts an onError notification from the source Observable and, instead of passing it through to any observers, replaces it with some other item or sequence of items, potentially allowing the resulting Observable to terminate normally or not to terminate at all.

@Test
public void testObservableCatch() {
    AtomicReference<String> string = new AtomicReference<>();

    //This is using onErrorReturn()
    Observable.error(new Throwable("Error One"))
            .onErrorReturn(throwable -> {
                string.set("An error occurred");
                return string.get();
            }).subscribe(v -> {
                System.out.println(v);
            }, err -> {
                System.out.println(err.getLocalizedMessage());
            });

    //This is using onErrorReturnItem()
    Observable.error(new Throwable("Error One"))
            .onErrorReturnItem("This is what I want when there is an error")
            .subscribe(v -> {
                System.out.println(v);
            });

    //This is using onErrorResumeNext()
    Observable.error(new Throwable("Error One"))
            .onErrorResumeNext(Observable.just("This is just an error result !!!"))
            .subscribe(v -> {
                System.out.println(v);
            });
}

b. Retry: This is used when a source Observable emits an error and you want to resubscribe to it in the hope that it will complete without error.

@Test
public void testObservableRetry(){
Integer ATTEMPTS = 4;
AtomicReference<Integer> COPY_ATTEMPTS = new AtomicReference<>(4);
Observable<String> retryObservable = Observable.fromCallable(() -> {
if (Math.random() < 0.1) {
return "Working";
} else {
System.out.println(COPY_ATTEMPTS.
getAndSet(COPY_ATTEMPTS.get() - 1));
throw new RuntimeException("Transient");
}
});

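retryObservable
        .retryWhen(throwableObservable -> throwableObservable
                // pair each error with an attempt number from 1 to ATTEMPTS
                .zipWith(Observable.range(1, ATTEMPTS), (throwable, attempt) ->
                        attempt < ATTEMPTS
                                ? Observable.timer(20, TimeUnit.SECONDS) // wait, then retry
                                : Observable.<Long>error(throwable))     // out of attempts
                // subscribe to the timer or the error so that it actually takes effect
                .flatMap(signal -> signal))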
.subscribe(v -> {
System.out.println(v);
},err->{
System.out.println(err.getLocalizedMessage());
});
}
//Try this test function to see the behavior of retryWhen()
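For the simpler retry(n) variant, here is a minimal, repeatable sketch, assuming RxJava 2.x. The class name, the method name and the call counter are made up for illustration. The source fails on its first two subscriptions and succeeds on the third:

```java
import io.reactivex.Observable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class, used only for this illustration.
public class RetrySketch {

    // calls counts how many times the source was subscribed to
    public static String fetchWithRetry(AtomicInteger calls) {
        Observable<String> flaky = Observable.fromCallable(() -> {
            // fail on the first two subscriptions, succeed on the third
            if (calls.incrementAndGet() < 3) {
                throw new RuntimeException("Transient");
            }
            return "Working";
        });

        // retry(3) resubscribes up to three times after an error
        return flaky.retry(3).blockingFirst();
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        System.out.println(fetchWithRetry(calls)); // Working
        System.out.println(calls.get());           // 3
    }
}
```

Because fromCallable runs the callable again on every subscription, each retry gives the work a fresh chance to succeed.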

Conditional and Boolean Operators

a. All: This determines whether all items emitted by an Observable meet some criteria.

@Test
public void testObservableAll(){
Observable.just(2,4).
all(integer -> integer % 2 == 0).subscribe(v->{
System.out.println(v);
});
}
//This returns true

b. Contains: This determines whether an Observable emits a particular item or not.

@Test
public void testObservableContains(){
Observable.just(2,4).
contains(4).subscribe(v->{
System.out.println(v);
});
}
//This returns true

c. Amb: Given two or more source Observables, this emits all of the items from only the first of these Observables to emit an item.

@Test
public void testObservableAmb(){
Observable.ambArray(Observable.just("Item one"),Observable.range(1,4)).
subscribe(v -> {
System.out.println(v);
});
}
This prints Item one, since that Observable was the first to emit.

Thank you for staying with me through this long journey. Share and like if you found it interesting. You can find these examples in my GitHub project.

Senior Android Developer at ECOM Trading