RxJava in Different Flavours of Java

5 August 2014

The first official release of the .NET Reactive Extensions (Rx) was three years ago. Since then, interest in Reactive Programming has gone from strength to strength as developers look for ways to build applications which accommodate impatient and fault-intolerant users: I want to be able to process all the data on the internet from my phone, and I want to be able to do it now!

Reactive Extensions is a library for composing asynchronous and event-based programs using observable sequences. It lets you write concise, readable code in a functional style, which makes asynchronous processing easier to reason about.

The Reactive Extensions programming model is pretty straightforward. You create Observables. The Observables emit data items which can be filtered and transformed. You can subscribe to the Observable to react to data as it is published. The Observables will tell Observers if there is an error during processing and when they have finished emitting data. The Observables can be implemented in a synchronous or asynchronous fashion; this is hidden from the Observers who are only interested in receiving data as it is emitted.
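
To make that contract concrete, here is a minimal sketch in plain Java. It does not use RxJava at all: SimpleObserver and SimpleObservable are hypothetical stand-ins invented for illustration, and the real library's classes are considerably richer. The sketch only shows the order of events an Observer sees: zero or more onNext calls, followed by either onCompleted or onError.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ObserverContractSketch {

    // Hypothetical stand-in for an Observer; NOT the RxJava interface.
    interface SimpleObserver<T> {
        void onNext(T item);
        void onError(Throwable error);
        void onCompleted();
    }

    // Hypothetical stand-in for an Observable that emits a fixed list of items.
    static class SimpleObservable<T> {
        private final List<T> items;

        SimpleObservable(List<T> items) {
            this.items = items;
        }

        // Pushes every item to the observer, then signals completion.
        // An exception thrown while emitting is routed to onError instead.
        void subscribe(SimpleObserver<T> observer) {
            try {
                for (T item : items) {
                    observer.onNext(item);
                }
            } catch (Exception e) {
                observer.onError(e);
                return;
            }
            observer.onCompleted();
        }
    }

    // Subscribes a recording observer and returns the events it saw, in order.
    public static List<String> record(List<Integer> source) {
        final List<String> events = new ArrayList<String>();
        new SimpleObservable<Integer>(source).subscribe(new SimpleObserver<Integer>() {
            @Override
            public void onNext(Integer item) {
                events.add("next " + item);
            }

            @Override
            public void onError(Throwable error) {
                events.add("error " + error.getMessage());
            }

            @Override
            public void onCompleted() {
                events.add("completed");
            }
        });
        return events;
    }

    public static void main(String[] args) {
        // prints [next 1, next 2, next 3, completed]
        System.out.println(record(Arrays.asList(1, 2, 3)));
    }
}
```

The observer never asks for data; it is simply called back as items arrive, which is why the same observer code works whether the source emits synchronously or from another thread.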

The nice folks at Netflix have ported Reactive Extensions to the JVM as RxJava. There is a good post on how and why they are using this programming model.

I recently walked through a couple of the Netflix "How to start" examples in Java, so I thought I would post them online, as most of the samples are shown in Groovy and Clojure.

Creating an Observable from an Existing Data Structure

The library provides methods to convert existing objects, lists or arrays into Observables. In the following example we use the from method to convert an array into an Observable which will synchronously emit its items. The subscriber supplies three callbacks corresponding to onNext, onError and onCompleted, which are called by the Observable when it emits an item, encounters an error and completes, respectively.

// In Java 7
// Uses rx.Observable, rx.functions.Action0 and rx.functions.Action1
Integer[] numbers = { 0, 1, 2, 3, 4, 5 };
Observable<Integer> numberObservable = Observable.from(numbers);

numberObservable.subscribe(
        new Action1<Integer>() {
            @Override
            public void call(Integer incomingNumber) {
                System.out.println(incomingNumber);
            }
        },
        new Action1<Throwable>() {
            @Override
            public void call(Throwable error) {
                System.out.println("Error in synchronous observable");
            }
        },
        new Action0() {
            @Override
            public void call() {
                System.out.println("This observable is finished");
            }

        }
);

// In Java 8
Integer[] numbers = { 0, 1, 2, 3, 4, 5 };
Observable<Integer> numberObservable = Observable.from(numbers);

numberObservable.subscribe(
        incomingNumber -> System.out.println("incomingNumber " + incomingNumber),
        error -> System.out.println("Something went wrong: " + error.getMessage()),
        () -> System.out.println("This observable is finished")
);

The output (from the Java 8 version):

incomingNumber 0
incomingNumber 1
incomingNumber 2
incomingNumber 3
incomingNumber 4
incomingNumber 5
This observable is finished

Creating an Observable via the create() method

The other way to create Observables is by calling the create method and implementing your own logic to call onNext, onError and onCompleted. In this example we create a synchronous Observable which pushes values to our subscriber.

// In Java 7
Observable<String> createdObservable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        // Emit ten values, skipping emission once the subscriber has unsubscribed
        for (int ii = 0; ii < 10; ii++) {
            if (!subscriber.isUnsubscribed()) {
                subscriber.onNext("Pushed value " + ii);
            }
        }

        // Signal that no more values will follow
        if (!subscriber.isUnsubscribed()) {
            subscriber.onCompleted();
        }
    }
});

createdObservable.subscribe(
        new Action1<String>() {
            @Override
            public void call(String incomingValue) {
                System.out.println(incomingValue);
            }
        },
        new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                System.out.println("Something went wrong in the observable");
            }
        },
        new Action0() {
            @Override
            public void call() {
                System.out.println("No more values will be pushed.");
            }
        }
);

// In Java 8
Observable.OnSubscribe<String> subscribeFunction = subscriber -> {
    // Emit ten values, skipping emission once the subscriber has unsubscribed
    for (int ii = 0; ii < 10; ii++) {
        if (!subscriber.isUnsubscribed()) {
            subscriber.onNext("Pushed value " + ii);
        }
    }

    if (!subscriber.isUnsubscribed()) {
        subscriber.onCompleted();
    }
};
Observable<String> createdObservable = Observable.create(subscribeFunction);

createdObservable.subscribe(
        incomingValue -> System.out.println("incomingValue " + incomingValue),
        error -> System.out.println("Something went wrong: " + error.getMessage()),
        () -> System.out.println("This observable is finished")
);

The output (from the Java 8 version):

incomingValue Pushed value 0
incomingValue Pushed value 1
incomingValue Pushed value 2
incomingValue Pushed value 3
incomingValue Pushed value 4
incomingValue Pushed value 5
incomingValue Pushed value 6
incomingValue Pushed value 7
incomingValue Pushed value 8
incomingValue Pushed value 9
This observable is finished
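
The isUnsubscribed check in the loop above is easy to overlook, so here is a small plain-Java sketch of why it is there. LimitedSubscriber is a hypothetical class made up for this illustration (it is not part of RxJava): it flags itself as unsubscribed after receiving a set number of items, and the producer loop checks that flag before every emission.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class UnsubscribeCheckSketch {

    // Hypothetical subscriber that unsubscribes itself after `limit` items.
    static class LimitedSubscriber {
        private final AtomicBoolean unsubscribed = new AtomicBoolean(false);
        private final List<String> received = new ArrayList<String>();
        private final int limit;

        LimitedSubscriber(int limit) {
            this.limit = limit;
        }

        boolean isUnsubscribed() {
            return unsubscribed.get();
        }

        void onNext(String value) {
            received.add(value);
            if (received.size() >= limit) {
                unsubscribed.set(true); // no longer interested in further values
            }
        }
    }

    // Producer loop in the same shape as the create() example:
    // it checks the flag before each emission and stops once it is set.
    public static List<String> emitTo(int limit) {
        LimitedSubscriber subscriber = new LimitedSubscriber(limit);
        for (int ii = 0; ii < 10; ii++) {
            if (subscriber.isUnsubscribed()) {
                break;
            }
            subscriber.onNext("Pushed value " + ii);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        // prints [Pushed value 0, Pushed value 1, Pushed value 2]
        System.out.println(emitTo(3));
    }
}
```

Without the check, the producer would keep doing work for a subscriber that has stopped listening, which matters once the source is expensive or unbounded.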

A Filtered Asynchronous Observable

The RxJava library provides many operators for filtering your Observables. In this case we have created an asynchronous Observable and applied the skip operator, which ignores the first n items emitted (here, the first five).

// In Java 7
Observable<String> asyncObservable = Observable.create(new Observable.OnSubscribe<String>() {

    @Override
    public void call(final Subscriber<? super String> subscriber) {
        // Emit from a separate thread so that call() returns immediately
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int ii = 0; ii < 10; ii++) {
                    if (!subscriber.isUnsubscribed()) {
                        subscriber.onNext("Pushing value from async thread " + ii);
                    }
                }
            }
        });
        thread.start();
    }
});

asyncObservable.skip(5).subscribe(new Action1<String>() {
    @Override
    public void call(String incomingValue) {
        System.out.println(incomingValue);
    }
});

// In Java 8
Observable.OnSubscribe<String> subscribeFunction = s -> asyncProcessingOnSubscribe(s);

Observable<String> asyncObservable = Observable.create(subscribeFunction);

asyncObservable.skip(5).subscribe(incomingValue -> System.out.println(incomingValue));

// Helper methods, defined on the enclosing class

private void asyncProcessingOnSubscribe(Subscriber<? super String> subscriber) {
    // Emit from a separate thread so that the subscribe call returns immediately
    Thread thread = new Thread(() -> produceSomeValues(subscriber));
    thread.start();
}

private void produceSomeValues(Subscriber<? super String> subscriber) {
    for (int ii = 0; ii < 10; ii++) {
        if (!subscriber.isUnsubscribed()) {
            subscriber.onNext("Pushing value from async thread " + ii);
        }
    }
}

The output...

Pushing value from async thread 5
Pushing value from async thread 6
Pushing value from async thread 7
Pushing value from async thread 8
Pushing value from async thread 9
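
To show what skip is doing conceptually, here is a rough plain-Java sketch. The skipping helper is hypothetical and is not how RxJava implements the operator; it simply wraps a downstream consumer, counts the items it has seen, and only forwards items once the first n have been dropped. It assumes all items arrive on a single thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class SkipSketch {

    // Wraps a downstream consumer so that the first `count` items are dropped.
    // Hypothetical helper for illustration only; not thread-safe.
    public static <T> Consumer<T> skipping(final int count, final Consumer<T> downstream) {
        return new Consumer<T>() {
            private int seen = 0;

            @Override
            public void accept(T item) {
                if (seen < count) {
                    seen++; // still inside the skipped prefix
                } else {
                    downstream.accept(item);
                }
            }
        };
    }

    // Pushes ten values through a skip-five wrapper and returns what got through.
    public static List<String> run() {
        List<String> received = new ArrayList<>();
        Consumer<String> subscriber = skipping(5, received::add);
        for (int ii = 0; ii < 10; ii++) {
            subscriber.accept("Pushing value " + ii);
        }
        return received;
    }

    public static void main(String[] args) {
        // prints the values numbered 5 to 9
        for (String value : run()) {
            System.out.println(value);
        }
    }
}
```

The producer still emits all ten values; the filtering happens between the source and the subscriber, which is why the first value printed above is number 5.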

Processing halted when error encountered

The Observables provide a mechanism for reporting errors to all subscribers. In this sample we generate an error which is reported to the Observer.

// In Java 8
Observable.OnSubscribe<String> subscribeFunction = s -> produceValuesAndAnError(s);

Observable<String> createdObservable = Observable.create(subscribeFunction);

createdObservable.subscribe(
        incomingValue -> System.out.println("incoming " + incomingValue),
        error -> System.out.println("Something went wrong " + error.getMessage()),
        () -> System.out.println("This observable is finished")
);

// Helper method, defined on the enclosing class
private void produceValuesAndAnError(Subscriber<? super String> subscriber) {
    try {
        for (int ii = 0; ii < 50; ii++) {
            if (!subscriber.isUnsubscribed()) {
                subscriber.onNext("Pushed value " + ii);
            }

            // Simulate a failure part-way through the sequence
            if (ii == 5) {
                throw new RuntimeException("Something has gone wrong here");
            }
        }

        if (!subscriber.isUnsubscribed()) {
            subscriber.onCompleted();
        }
    } catch (Throwable throwable) {
        // Report the failure to the subscriber; no further values are emitted
        subscriber.onError(throwable);
    }
}

The output...

incoming Pushed value 0
incoming Pushed value 1
incoming Pushed value 2
incoming Pushed value 3
incoming Pushed value 4
incoming Pushed value 5
Something went wrong Something has gone wrong here

Article By
Richard Bell

Head of Delivery