Reactive Programming - RxJS vs EventEmitter in Node.js

Kishore Yekkanti picture Kishore Yekkanti · Aug 16, 2014 · Viewed 17.6k times · Source

Recently I've started looking at RxJS and RxJava(from Netflix) libraries which work on the concept of Reactive Programming.

Node.js works on the basis of event loops, which provides you all the arsenal for asynchronous programming and the subsequent node libraries like "cluster" help you to get best out of your multi-core machine. And Node.js also provides you the EventEmitter functionality where you can subscribe to events and act upon it asynchronously.

On the other hand if I understand correctly RxJS (and Reactive Programming in general) works on the principle of event streams, subscribing to event streams, transforming the event stream data asynchronously.

So, the question is what does using Rx packages in Node.js mean. How different is the Node's event loop, event emitter & subscriptions to the Rx's streams and subscriptions.

Answer

André Staltz picture André Staltz · Feb 6, 2016

Observables are not like EventEmitters. They may act like EventEmitters in some cases, namely when they are multicasted using RxJS Subjects, but usually they don't act like EventEmitters.

In short, an RxJS Subject is like an EventEmitter, but an RxJS Observable is a more generic interface. Observables are more similar to functions with zero arguments.

Consider the following:


function foo() {
  console.log('Hello');
  return 42;
}

var x = foo.call(); // same as foo()
console.log(x);
var y = foo.call(); // same as foo()
console.log(y);

Of course we all expect to see as output:

"Hello"
42
"Hello"
42

You can write the same behavior above, but with Observables:

var foo = Rx.Observable.create(function (observer) {
  console.log('Hello');
  observer.next(42);
});

foo.subscribe(function (x) {
  console.log(x);
});
foo.subscribe(function (y) {
  console.log(y);
});

And the output is the same:

"Hello"
42
"Hello"
42

That's because both functions and Observables are lazy computations. If you don't call the function, the console.log('Hello') won't happen. Also with Observables, if you don't "call" (subscribe), the console.log('Hello') won't happen. Plus, "calling" or "subscribing" is an independent operation: two function calls trigger two separate side effects, and two Observable subscribes trigger two separate side effects. As opposed to EventEmitters which share the side effects and have eager execution regardless of the existence of subscribers, Observables have no shared execution and are lazy.


So far, no difference between the behavior of a function and an Observable. This StackOverflow question would have been better phrased as "RxJS Observables vs functions?".

Some people claim that Observables are asynchronous. That is not true. If you surround a function call with logs, like this:

console.log('before');
console.log(foo.call());
console.log('after');

You will obviously see the output:

"before"
"Hello"
42
"after"

And this is the same behavior with Observables:

console.log('before');
foo.subscribe(function (x) {
  console.log(x);
});
console.log('after');

And the output:

"before"
"Hello"
42
"after"

Which proves the subscription of foo was entirely synchronous, just like a function.


So what is really the difference between an Observable and a function?

Observables can "return" multiple values over time, something which functions cannot. You can't do this:

function foo() {
  console.log('Hello');
  return 42;
  return 100; // dead code. will never happen
}

Functions can only return one value. Observables, however, can do this:

var foo = Rx.Observable.create(function (observer) {
  console.log('Hello');
  observer.next(42);
  observer.next(100); // "return" another value
  observer.next(200);
});

console.log('before');
foo.subscribe(function (x) {
  console.log(x);
});
console.log('after');

With synchronous output:

"before"
"Hello"
42
100
200
"after"

But you can also "return" values asynchronously:

var foo = Rx.Observable.create(function (observer) {
  console.log('Hello');
  observer.next(42);
  observer.next(100);
  observer.next(200);
  setTimeout(function () {
    observer.next(300);
  }, 1000);
});

With output:

"before"
"Hello"
42
100
200
"after"
300

To conclude,

  • func.call() means "give me one value immediately (synchronously)"
  • obsv.subscribe() means "give me values. Maybe many of them, maybe synchronously, maybe asynchronously"

That's how Observables are a generalization of functions (that have no arguments).