The first official release of .Net Reactive Extensions (RX) was three years ago. In that time the interest in Reactive Programming has gone from strength to strength as developers look for ways to build applications which accommodate impatient and fault intolerant users; I want to be able to process all the data on the internet from my phone and I want to be able to do it now!
Reactive Extensions is a library for composing asynchronous and event-based programs using observable sequences. This allows you to use functional prose to write consolidated and readable code which makes asynchronous processing easier to reason about.
The Reactive Extensions programming model is pretty straightforward. You create Observables. The Observables emit data items which can be filtered and transformed. You can subscribe to the Observable to react to data as it is published. The Observables will tell Observers if there is an error during processing and when they have finished emitting data. The Observables can be implemented in a synchronous or asynchronous fashion; this is hidden from the Observers who are only interested in receiving data as it is emitted.
The nice folks at Netflix have ported the RX extensions over for the JVM. There is a good post on how and why they are using this programming model.
I recently walked through a couple of the Netflix How to start examples in Java so thought I would post them online as most of the samples are shown in Groovy and Clojure.
Creating an Observable from an Existing Data Structure
The library provides you with methods to convert existing objects, lists or arrays into Observables. In the following example we are using the from method to create an array into an Observable which will synchronously emit it’s items. The subscriber defines onNext, onError and onCompleted which are called by the Observable when it emits items, encounters an error and completes, respectively.
Creating an Observable via the create() method
The other way to create Observables is by calling the create method and implementing your own logic to call onNext, onError and onCompleted. In this example we create a synchronous Observable which pushes values to our subscriber.
A Filtered Asynchronous Observable
The RxJava library provides many methods for filtering your Observables. In this case we have created an asynchronous Observable and applied the skip filter which will ignore the stated number of items.
The Observables provide a mechanism for reporting errors to all subscribers. In this sample we generate an error which is reported to the Observer.