RxJS Operators In-Depth - Part 1

What is an operator? Why do they exist? Observables are "sets".

The first thing to understand about operators is why they exist. They exist because observables, as a type, allow us to treat events (or values over time) as sets. Not a set like a JavaScript Set, but rather a set in the mathematical sense: a collection of things (akin to the concepts set forth by set theory).

In more elementary terms, any well-defined set will have operations that can be performed on it that transform it into a new set of the same type. For example, let's say we have a truckload of apples. We could transform that into a truckload of sliced apples with an apple slicing machine. That same apple slicing machine could then be used on any truckload of apples to transform them into a truckload of sliced apples. In this case, the apple slicing machine would be considered an "operator" that maps apples to apple slices. Likewise, we could have truckloads of sugar, flour, eggs, etc. and combine those with truckloads of apple slices to make truckloads of apple pies using some sort of pie-making machine. So in this example, a truckload is the type of set, an apple slicing machine or a pie-making machine would be "operators", and sugar, apples, apple slices, eggs, etc. would just be values carried by our type of set.

All types of sets have different operations that can be applied to them. For example, one of the most common types of sets we use is the array. JavaScript arrays can be mapped, reduced, filtered, combined, expanded, and more. Arrays are one-dimensional, finite, synchronous sets of values with their own unique properties.

Observables are sets of events, or values over time. This makes observables two-dimensional: they emit a linear set of values that could be mapped, reduced, filtered, combined, expanded, etc., just like arrays, but the values may arrive asynchronously, adding an additional dimension of time. The temporal nature of observables means operations on them could include things like delays, timeouts, notified completions, etc. This is why there are so many more possible operations on observables than there are on something like an array.

Operators are machinery that can perform an operation on an observable, transforming it into a new observable.
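
To make the array comparison concrete, here is a minimal sketch that runs the same filter and map over an array and over an observable. The helper names (`isEven`, `timesTen`) and the 100ms delay are just assumptions picked for illustration. It relies on `of` emitting synchronously on subscribe, which is why the results can be collected right away.

```typescript
import { of, filter, map, delay } from 'rxjs';

// Hypothetical helpers, shared by both versions.
const isEven = (n: number) => n % 2 === 0;
const timesTen = (n: number) => n * 10;

// An array is a finite, synchronous set: the result exists as soon as the calls return.
const arrayResult = [1, 2, 3, 4].filter(isEven).map(timesTen);
// arrayResult is [20, 40]

// The same two operations applied to an observable.
const observableResult: number[] = [];
of(1, 2, 3, 4)
  .pipe(filter(isEven), map(timesTen))
  .subscribe((value) => observableResult.push(value));
// observableResult is [20, 40]

// An operation with no array counterpart, because it deals with time:
// each value would arrive 100ms after the source emits it.
const delayed$ = of(1, 2, 3, 4).pipe(filter(isEven), map(timesTen), delay(100));
```

Nothing happens with `delayed$` until something subscribes to it, which is another difference from the array version.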

A basic operator, by implementation

A basic operator implementation is any function that takes a source observable and returns a new result observable that consumes the source. That is to say, when you subscribe to the result, you subscribe to the source. So a very basic operator that doubles numbers from an observable might look like this:

A basic "doubling" operator

ts
import { Observable, of } from 'rxjs';

const double = (source: Observable<number>) =>
  // Typing the result as Observable<number> lets `double` be piped into itself.
  new Observable<number>((subscriber) => {
    const subscription = source.subscribe({
      // Here we alter our value and "send it along" to our consumer.
      next: (value) => subscriber.next(2 * value),
      // We have to make sure errors and completions are also forwarded to the consumer.
      error: (err) => subscriber.error(err),
      complete: () => subscriber.complete(),
    });
    return () => {
      // We must make sure to tear down our subscription
      // when the returned observable is finalized.
      subscription.unsubscribe();
    };
  });

// Usage like so:

of(1, 2, 3, 4).pipe(double).subscribe(console.log);

// Output:
// 2
// 4
// 6
// 8
 

What is Observable.prototype.pipe?

At this point, there might be some confusion about what this pipe method on our observable above is. The short version? pipe doesn't do much other than pass the observable instance to the first operator function ((source: Observable<T>) => Observable<R>) you give it, then hand the return value of each one to the next operator function in the chain, in order. For example, we could use our double operator above more than once by simply passing it to the pipe method twice:

ts
of(1, 2, 3, 4).pipe(double, double).subscribe(console.log);
 
// Output:
// 4
// 8
// 12
// 16
 

This is exactly equivalent to doing this, because each piped operator simply returns a new result observable:

ts
const doubled = of(1, 2, 3, 4).pipe(double);
 
const doubledAgain = doubled.pipe(double);
 
doubledAgain.subscribe(console.log);
 
// Output:
// 4
// 8
// 12
// 16
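
If it helps, the idea behind pipe can be sketched as a simple reduce over the functions you give it. This is only a sketch of the concept, not the RxJS source: `pipeThrough` and `Unary` are hypothetical names, and the example feeds it plain numbers instead of observables to show that nothing about the mechanism is observable-specific.

```typescript
// Hypothetical type: any function of one argument.
type Unary<T, R> = (input: T) => R;

// Hypothetical helper: hand `source` to the first function,
// then hand each return value to the next function in order.
function pipeThrough(source: unknown, ...fns: Array<Unary<any, any>>): unknown {
  return fns.reduce((previous, fn) => fn(previous), source);
}

const addOne = (n: number) => n + 1;
const timesTwo = (n: number) => n * 2;

const piped = pipeThrough(1, addOne, timesTwo);
// piped is 4, the same as timesTwo(addOne(1))
```

Swap the number for an observable and the plain functions for operator functions, and you have the same chaining that `of(1, 2, 3, 4).pipe(double, double)` performs.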
 

Building a reusable operator with higher-order functions

What if we wanted to make an operator that could do more than just "double"? What if we wanted to be able to "triple", or multiply by an arbitrary number? Functional programming and "higher-order functions" make that easy. A "higher-order function", most simply explained, is a function that takes a function as an argument or returns a function. In our case, we care about the "returns a function" part: we could make a multiply operator that allowed us to pass a multiplier to it, using our double operator as the basis, like so:

ts
import { Observable, of } from 'rxjs';

const multiply = (multiplier: number) => (source: Observable<number>) =>
  // Typing the result as Observable<number> keeps the operator chainable.
  new Observable<number>((subscriber) => {
    const subscription = source.subscribe({
      next: (value) => subscriber.next(multiplier * value),
      error: (err) => subscriber.error(err),
      complete: () => subscriber.complete(),
    });
    return () => {
      subscription.unsubscribe();
    };
  });

// Usage like so:

of(1, 2, 3, 4).pipe(multiply(2)).subscribe(console.log);

// Output:
// 2
// 4
// 6
// 8
 

Even better, with this functional programming practice, you can recreate double and reuse it, just like in our previous example, if you so choose. This is done by simply calling the multiply function, then retaining and reusing its returned operator function:

ts
const double = multiply(2);
 
const doubled = of(1, 2, 3, 4).pipe(double);
 
const doubledAgain = doubled.pipe(double);
 
doubledAgain.subscribe(console.log);
 
// Output:
// 4
// 8
// 12
// 16
 

This brings us to some slightly confusing terminology: is multiply an "operator"? Or does it return an "operator"? This is sort of splitting hairs, but within the RxJS core team, we generally refer to the higher-order functions as "operators", and to their return values as "operator functions". Ultimately, how you refer to these things doesn't matter as long as the people around you understand what you're talking about. (I've also heard "operator" and "operator instance".)

Operators are "declarative"

Yes, yes. There's imperative stuff under the hood. And/or you're imperatively adding operator functions to pipe, or however you want to split hairs about this. But operators are made in such a way that they can be moved around declaratively within the observable pipeline, and they can have different effects depending on where they sit. For example, maybe you've got an operator to filter out odd numbers. The results will be very different depending on where you put that in your pipe. What's interesting is that you don't really have to alter the surrounding code: the operators "operate" on incoming values with no care for what's "upstream" from them, other than that the incoming type is correct (in this case, number):

ts
import { of, map, filter } from 'rxjs';
 
const double = map((n: number) => 2 * n);
const onlyEvens = filter((n: number) => n % 2 === 0);
 
const source = of(1, 2, 3, 4, 5);
 
source.pipe(onlyEvens, double).subscribe(console.log);
// Logs: 4, 8
 
source.pipe(double, onlyEvens).subscribe(console.log);
// Logs: 2, 4, 6, 8, 10
 

Getting "Deep": You're building functions declaratively.

In my last post, I showed how observables are really just specialized functions. Therefore, if RxJS operators are used to wrap a source observable with a new result observable, you're essentially taking a specialized function and wrapping it in a new specialized function of the same shape that will call the original. So basically you're using functional programming to build new functions declaratively. This is really just a different way to think about observables. I'm not really asking any of you to build functions like this (imperative code is far better for building arbitrary functions), but consider naming operators and observables based on what they do, rather than just what they emit. Just something to think about for now. These concepts will become clearer when you have a better understanding of side effects and how they relate to functions (and therefore observables). I'll likely write more about this in future posts as well.
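
Here is a rough sketch of that "functions wrapping functions" idea with no RxJS involved at all. Every name in it (`ObserverLike`, `ObservableFn`, `oneTwoThree`, `doubleFn`) is made up for illustration, and error handling is left out to keep it short, so treat it as a thinking aid rather than how observables are really implemented.

```typescript
// Hypothetical, simplified shapes. These are not RxJS types.
type ObserverLike<T> = { next: (value: T) => void; complete: () => void };
type Teardown = () => void;
type ObservableFn<T> = (observer: ObserverLike<T>) => Teardown;

// A "source" that is nothing more than a function.
const oneTwoThree: ObservableFn<number> = (observer) => {
  [1, 2, 3].forEach((n) => observer.next(n));
  observer.complete();
  return () => {}; // Nothing to clean up for a synchronous source.
};

// An "operator" takes one such function and returns a new function
// of the same shape that calls the original.
const doubleFn =
  (source: ObservableFn<number>): ObservableFn<number> =>
  (observer) =>
    source({
      next: (value) => observer.next(value * 2),
      complete: () => observer.complete(),
    });

const log: string[] = [];
doubleFn(oneTwoThree)({
  next: (value) => log.push(`next ${value}`),
  complete: () => log.push('complete'),
});
// log is ['next 2', 'next 4', 'next 6', 'complete']
```

Calling `doubleFn(oneTwoThree)` doesn't run anything by itself. It only builds a new function, which is the "declarative" part.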

Important tips

When creating your own operators:

  1. Operators should be tested thoroughly. (This is something I'll get to in an upcoming post.)
  2. Try to reuse RxJS operators whenever possible. They're very well tested and cover a lot of weird corner cases you may not think of (see tip #1).
  3. Always make sure your operators are properly passing along all 3 notification types: next, error, and complete.
  4. Always make sure you're cleaning up any subscriptions created within your operators.
  5. Operators should usually not (almost never) create subscriptions outside of the returned observable.
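
As one possible illustration of tip 2, the multiply operator from earlier could be rebuilt on top of the `map` operator that RxJS already ships. This is a sketch and not the only way to do it. The `log` array exists only to make the notifications visible.

```typescript
import { map, of, OperatorFunction } from 'rxjs';

// `map` already forwards errors and completions and cleans up its own
// subscription, so none of that has to be written by hand here.
const multiply = (multiplier: number): OperatorFunction<number, number> =>
  map((value: number) => value * multiplier);

const log: string[] = [];
of(1, 2, 3)
  .pipe(multiply(3))
  .subscribe({
    next: (value) => log.push(`next ${value}`),
    error: (err) => log.push(`error ${err}`),
    complete: () => log.push('complete'),
  });
// log is ['next 3', 'next 6', 'next 9', 'complete']
```

The completion shows up in the log without any forwarding code of our own, which is the point of leaning on the existing operators.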