An Introduction to RxJS

Jun 28 2016

RxJS is an open source JavaScript library originally created at Microsoft.
RxJS is an implementation of the Reactive Extensions (Rx) in JavaScript. Rx is a reactive programming model that allows developers to compose asynchronous streams of data. It provides a common interface for combining and transforming data from wildly different sources.

In this blog post I will describe the main features and capabilities of this library with examples of how to use them.

Reactive programming

First of all, we need to define what reactive programming is and what problems it solves.
According to Wikipedia, reactive programming is a programming paradigm oriented around data flows and the propagation of change. This means that it should be possible to express static or dynamic data flows with ease in the programming languages used, and that the underlying execution model will automatically propagate changes through the data flow.

In simple words: instead of handling data as something static, we handle it as a stream. The stream can be built from events (such as mouse events or network events), arrays, or any other sequence of data. Reactive programming means reacting to changes in the stream.
The best way to picture it is to imagine working on an Excel sheet. You fill a column with a series of numbers: this is the observable.
Then, at the end, you add a cell that sums them all: the observer.
When the value of one of the cells changes, the sum cell (the observer) reacts and updates its value.
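
The spreadsheet analogy can be sketched in a few lines of plain JavaScript. This is only an illustration and not RxJS code: the createColumn helper and its subscribe and set methods are hypothetical names invented for this example.

```javascript
// A hypothetical "column" of cells that notifies listeners whenever a cell changes.
function createColumn(values) {
  const cells = values.slice();
  const listeners = [];
  return {
    // Register a listener and immediately hand it the current cells.
    subscribe(listener) {
      listeners.push(listener);
      listener(cells.slice());
    },
    // Change one cell and push the new state to every listener.
    set(index, value) {
      cells[index] = value;
      listeners.forEach((listener) => listener(cells.slice()));
    },
  };
}

const column = createColumn([1, 2, 3]); // plays the role of the observable
let sumCell = 0;                        // the "sum cell" kept up to date by the observer

column.subscribe((cells) => {
  sumCell = cells.reduce((total, x) => total + x, 0);
});

console.log(sumCell); // 6
column.set(0, 10);    // changing a cell triggers the observer
console.log(sumCell); // 15
```

Nobody asks the sum cell to recalculate; it is notified of the change and reacts to it.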

Why use reactive programming

Web apps today are becoming more and more complicated and highly interactive. Managing the state of the application, running asynchronous operations, keeping the UI updated and much more demand a lot of effort. Reactive programming lets the developer focus on the functionality rather than on managing all that plumbing. As a developer, I would like to concentrate on the business logic of the application and its UX rather than on synchronizing and managing state.

What is the advantage of reactive programming over other asynchronous methods like promises or pub/sub (publish-subscribe)?

With pub/sub or promises (or callbacks) we can achieve the same result, but we have to implement the code that reacts to the event ourselves. For instance, if we write a function that makes an Ajax call that returns an array of items, converts the items, filters them, sorts them, and shows only the first 10, we have to work hard. Reactive programming gives us the tools to do it easily and efficiently. In pseudocode:

Observable.get(url).map(...).filter(...).sort(...).limit(10)

All that is left for us is to implement the functions that do the logic.
Because reactive programming uses the principles of functional programming, the app can be much more predictable and easier to maintain.

Observables and Observers

The basic building blocks of RxJS are observables and observers. An observable is the stream of data. Observers observe the stream and react to it.

RxJS is based on two design patterns: the Observer and the Iterator. I will explain them both briefly because they help in understanding the concept of an observable.

Observer pattern

The idea is to have an object called the producer and an object called the listener. The listener subscribes to the producer, and when the producer is updated, it notifies its subscribers (listeners), which can then execute their logic.
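
A minimal sketch of the pattern in plain JavaScript might look like the following. The Producer class and its subscribe and update methods are hypothetical names chosen for this example, not part of RxJS.

```javascript
// The producer keeps a list of listeners and notifies each of them on every update.
class Producer {
  constructor() {
    this.listeners = [];
  }
  subscribe(listener) {
    this.listeners.push(listener);
  }
  update(value) {
    this.listeners.forEach((listener) => listener(value));
  }
}

const producer = new Producer();
const received = [];

producer.subscribe((value) => received.push('first listener got ' + value));
producer.subscribe((value) => received.push('second listener got ' + value));

producer.update('news');
console.log(received);
// [ 'first listener got news', 'second listener got news' ]
```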

Iterator pattern

An object that implements 2 methods:

  • next()
  • hasNext()

It allows the consumer of the object to iterate over the object’s items.
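
As a rough sketch, an iterator over an array with exactly these two methods could be written as below. The createIterator helper is a hypothetical name for this example. Note that JavaScript's built-in iterators use a slightly different shape: a single next() method that returns an object with value and done properties.

```javascript
// Hypothetical array iterator exposing the two methods of the pattern.
function createIterator(items) {
  let index = 0;
  return {
    hasNext() {
      return index < items.length;
    },
    next() {
      return items[index++];
    },
  };
}

const iterator = createIterator(['a', 'b', 'c']);
const pulled = [];

// The consumer is in control: it asks for each value when it wants one.
while (iterator.hasNext()) {
  pulled.push(iterator.next());
}

console.log(pulled); // [ 'a', 'b', 'c' ]
```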

An observable combines these two patterns: it iterates over its items, but instead of its consumers asking for the next value, it pushes the value to them.
When using observables we do not need to pull the data from them; instead they notify us of every change.
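
To make the difference between pulling and pushing concrete, here is a small sketch of a push-based source in plain JavaScript. The fromArray helper is hypothetical and written only for this example; the method names onNext and onCompleted simply mirror the RxJS observer methods shown in the next section.

```javascript
// Hypothetical push-based source: it walks the items itself
// and pushes each one to the observer, then signals completion.
function fromArray(items) {
  return {
    subscribe(observer) {
      for (const item of items) {
        observer.onNext(item);
      }
      observer.onCompleted();
    },
  };
}

const log = [];

// The consumer never asks for the next value; it only reacts when one arrives.
fromArray(['a', 'b', 'c']).subscribe({
  onNext: (value) => log.push(value),
  onCompleted: () => log.push('Done!'),
});

console.log(log); // [ 'a', 'b', 'c', 'Done!' ]
```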

Rx Implementation

Rx provides many ways to create observables. The basic one is calling create with a function that receives the observer as a parameter:

var observable = Rx.Observable.create(function(observer) {
  observer.onNext('1');
  observer.onNext('2');
  observer.onCompleted();
});

As we can see, the observable calls the observer's methods.
The observer will be implemented like this:

var observer = Rx.Observer.create(
  function onNext(x) { console.log(x); },
  function onError(error) { console.log(error); },
  function onCompleted() { console.log('Done!'); }
);

It implements 3 methods:

  • onNext
  • onError
  • onCompleted

To activate the observable, we subscribe the observer to it:

observable.subscribe(observer);

The output will be:

1
2
Done!

A shorter version of it can be:

observable.subscribe(
  (x) => console.log(x),
  (error) => console.log(error),
  () => console.log('Done!')
);

Observable operators

As mentioned before, Rx provides many useful operators that create observables:

Array: Rx.Observable.from(['a', 'b', 'c'])
Events: Rx.Observable.fromEvent(document, 'click')
Network: Rx.DOM.get('http://myurl.com')
Interval: Rx.Observable.interval(1000) // the observable will produce an increasing number every second
Range: Rx.Observable.range(1, 10) // the observable will produce the numbers from 1 to 10
fromPromise, fromCallback and more...

The full list can be found in the RxJS documentation.

A great feature of Rx is the ability to manipulate the values of the observable. Rx provides a set of operators that simplify the manipulation. This is done by chaining the operator onto the observable. Because the result of an operator is also an observable, we can build a pipeline of operators:

Rx.Observable.range(1, 10).filter((x) => x < 5).map((x) => x*2).reduce((agg,x) => agg + x)

The result will be:

[1,2,3,4,5,6,7,8,9,10] -> [1,2,3,4] -> [2,4,6,8] -> 20
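
To see why returning an observable from every operator makes such a pipeline possible, here is a stripped-down sketch written from scratch. TinyObservable is a hypothetical class invented for this post, not the RxJS implementation; it ignores errors and cancellation and supports only the four calls used above.

```javascript
// Hypothetical minimal observable, written only for this illustration (not RxJS itself).
class TinyObservable {
  constructor(subscribe) {
    this.subscribe = subscribe; // subscribe(observer) starts pushing values
  }

  // Emits `count` consecutive integers beginning at `start`, then completes.
  static range(start, count) {
    return new TinyObservable((observer) => {
      for (let i = 0; i < count; i++) observer.onNext(start + i);
      observer.onCompleted();
    });
  }

  // Each operator returns a new TinyObservable, which is what makes chaining possible.
  filter(predicate) {
    return new TinyObservable((observer) =>
      this.subscribe({
        onNext: (x) => { if (predicate(x)) observer.onNext(x); },
        onCompleted: () => observer.onCompleted(),
      }));
  }

  map(project) {
    return new TinyObservable((observer) =>
      this.subscribe({
        onNext: (x) => observer.onNext(project(x)),
        onCompleted: () => observer.onCompleted(),
      }));
  }

  // Emits a single accumulated value once the source completes.
  reduce(accumulate) {
    return new TinyObservable((observer) => {
      let hasValue = false;
      let acc;
      this.subscribe({
        onNext: (x) => {
          acc = hasValue ? accumulate(acc, x) : x;
          hasValue = true;
        },
        onCompleted: () => {
          if (hasValue) observer.onNext(acc);
          observer.onCompleted();
        },
      });
    });
  }
}

const output = [];
TinyObservable.range(1, 10)
  .filter((x) => x < 5)
  .map((x) => x * 2)
  .reduce((agg, x) => agg + x)
  .subscribe({
    onNext: (x) => output.push(x),
    onCompleted: () => output.push('Done!'),
  });

console.log(output); // [ 20, 'Done!' ]
```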

It follows the same principles as the array methods, but it works with many more data sources, it can run asynchronously, and it has more features that I will describe later.
The main operators are:

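  • filter: keeps only the values of the sequence that pass a test.
  • map: converts each value.
  • reduce: aggregates the values into a single result.
  • flatMap: maps each value to an observable and merges the results into a single observable.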

More cool features

  • Cancellation: we can cancel an operation by disposing of the subscription. The observable will then stop delivering values to that subscriber.
  • Error handling: errors in asynchronous code are always a pain. Rx provides the onError method on the observer to react to an error and handle it correctly (as with promises).
  • Retries: we can use an operator to retry the operation in case of failure. It saves us the work of implementing it on our own.
  • Rx subjects: a subject is both an observer and an observable. It acts as a mediator between a source and its subscribers, for example to share a single source among several observers.
  • Sample: if we get more events than we can handle, or than we need to handle, sample reduces their frequency by emitting only the most recent value at a fixed interval. flatMapLatest helps in a similar way by dropping the results of earlier inner observables once a newer value arrives.
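
To give a feel for the first and third items, here is a rough sketch of cancellation and retries in plain JavaScript. The createSource and retry helpers are hypothetical and written only for this example; they imitate the idea behind the Rx features and are not the library's implementation, and the retry count here means extra attempts after the first one, which is an assumption of this sketch.

```javascript
// Hypothetical manual source: values are pushed to observers by calling emit().
function createSource() {
  const observers = new Set();
  return {
    emit(value) {
      observers.forEach((observer) => observer(value));
    },
    subscribe(observer) {
      observers.add(observer);
      // Disposing the subscription removes the observer, so it stops receiving values.
      return { dispose: () => observers.delete(observer) };
    },
  };
}

const source = createSource();
const seen = [];
const subscription = source.subscribe((value) => seen.push(value));
source.emit(1);
source.emit(2);
subscription.dispose();
source.emit(3); // not delivered: the subscription was disposed
console.log(seen); // [ 1, 2 ]

// Hypothetical retry helper: resubscribes to `upstream` after an error,
// up to `retries` additional times, before passing the error on.
function retry(upstream, retries) {
  return {
    subscribe(observer) {
      let attemptsLeft = retries;
      const attempt = () => upstream.subscribe({
        onNext: (value) => observer.onNext(value),
        onCompleted: () => observer.onCompleted(),
        onError: (error) => {
          if (attemptsLeft-- > 0) attempt();
          else observer.onError(error);
        },
      });
      attempt();
    },
  };
}

// A source that fails on its first two subscriptions and succeeds on the third.
let calls = 0;
const flaky = {
  subscribe(observer) {
    calls += 1;
    if (calls < 3) {
      observer.onError(new Error('attempt ' + calls + ' failed'));
      return;
    }
    observer.onNext('ok');
    observer.onCompleted();
  },
};

const events = [];
retry(flaky, 3).subscribe({
  onNext: (value) => events.push(value),
  onError: (error) => events.push('error: ' + error.message),
  onCompleted: () => events.push('Done!'),
});
console.log(calls, events); // 3 [ 'ok', 'Done!' ]
```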

Conclusion

In the dynamic world we live in, things happen fast, and the application needs to be up to date all the time. The user doesn’t want to wait or to press a button for it. Reactive programming and RxJS address this. They bring a modern approach to web development and provide a wide variety of tools. The approach is efficient, shortens development time, and makes the code more readable, less bug-prone and much more maintainable.

Amitai B.
Software Developer