RxJS Operators In-Depth - Part 2 - Subscription Chains


Before you read this, you should probably check out part 1.

The Subscriber: The Real Workhorse #

I talked a bit about subscribers in my article about creating an observable from scratch. While Observable gets all of the glory in RxJS, because it's the main type people interact with, it's the subscriber that actually does all of the work. The observable itself is little more than a wrapper around a function that creates a subscriber and sets it on its life of listening to the producer. (A producer is whatever is producing values, whether that's a web socket, mouse movements, a loop over an array, or anything else.)

Subscribers provide several guarantees:

  1. You can call next 0 to n times.
  2. You can only call error or complete once, then the subscriber is closed.
  3. next, error, and complete do nothing when called on a closed subscriber.
  4. The subscriber is linked to the subscription such that:
    1. If you call unsubscribe on the subscription, the subscriber is closed.
    2. If you call error or complete on the subscriber, the subscription is closed.
    3. All three scenarios (unsubscribe, error, and complete) will execute any registered finalization (aka teardown) logic.
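
To make these guarantees concrete, here is a minimal sketch in plain TypeScript. It is not RxJS's actual implementation: the `MiniSubscriber` class, the `Handlers` type, and the `addFinalizer` method are hypothetical names made up for illustration, and a single `closed` flag stands in for the subscriber/subscription link.

```typescript
// Hypothetical sketch of the guarantees above. NOT the real RxJS Subscriber.
type Handlers<T> = {
  next?: (value: T) => void;
  error?: (err: unknown) => void;
  complete?: () => void;
};

class MiniSubscriber<T> {
  closed = false;
  private handlers: Handlers<T>;
  private finalizers: Array<() => void> = [];

  constructor(handlers: Handlers<T>) {
    this.handlers = handlers;
  }

  // Guarantee 1 and 3: next works 0 to n times, but only while open.
  next(value: T): void {
    if (!this.closed) this.handlers.next?.(value);
  }

  // Guarantee 2 and 3: error closes the subscriber and is a no-op afterwards.
  error(err: unknown): void {
    if (this.closed) return;
    this.closed = true;
    try {
      this.handlers.error?.(err);
    } finally {
      this.runFinalizers();
    }
  }

  // Guarantee 2 and 3: complete closes the subscriber and is a no-op afterwards.
  complete(): void {
    if (this.closed) return;
    this.closed = true;
    try {
      this.handlers.complete?.();
    } finally {
      this.runFinalizers();
    }
  }

  // Guarantee 4: unsubscribing closes the subscriber and runs teardown.
  unsubscribe(): void {
    if (this.closed) return;
    this.closed = true;
    this.runFinalizers();
  }

  // Registers teardown logic; runs it immediately if already closed.
  addFinalizer(fn: () => void): void {
    if (this.closed) fn();
    else this.finalizers.push(fn);
  }

  private runFinalizers(): void {
    for (const fn of this.finalizers.splice(0)) fn();
  }
}

const log: string[] = [];
const sub = new MiniSubscriber<number>({
  next: (v) => log.push(`next ${v}`),
  complete: () => log.push('complete'),
});
sub.addFinalizer(() => log.push('finalized'));

sub.next(1);
sub.complete();
sub.next(2); // ignored: the subscriber is closed
sub.error(new Error('too late')); // ignored as well

console.log(log); // [ 'next 1', 'complete', 'finalized' ]
```

The design choice worth noticing is that every entry point checks the same `closed` flag first, which is what makes late calls harmless.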

Operator Chains Are Subscriber Chains #

When you subscribe to an observable created with operators, you're setting up a "chain" of subscribers. As we saw in our basic operator example in part 1, each operator is effectively setting up a new subscriber that passes signals along to another subscriber in the chain. Let's have a look at our example from the other article:

```ts
import { Observable } from 'rxjs';

const double = (source: Observable<number>) =>
  new Observable<number>((subscriber) => {
    // We have `subscriber` above, but also below,
    // the call to `source.subscribe` is creating a subscriber
    // that is calling the passed `next`, `error`, and `complete`
    // handlers.
    const subscription = source.subscribe({
      // Next in one subscriber passes to next in another subscriber.
      next: (value) => subscriber.next(2 * value),
      // Error in one subscriber passes to error in another subscriber.
      error: (err) => subscriber.error(err),
      // Complete in one subscriber passes to complete in another subscriber.
      complete: () => subscriber.complete(),
    });
    return () => {
      // We must make sure to tear down our subscription
      // when the returned observable is finalized.
      subscription.unsubscribe();
    };
  });
```

This concept will be especially important when we start talking about error propagation in RxJS.
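
As a rough illustration of signals travelling down such a chain, the sketch below pipes a source through `double` twice. The source, which emits two values and then errors, is a hypothetical stand-in chosen so the result is deterministic; `double` is repeated from the example above so the block runs on its own.

```typescript
import { Observable } from 'rxjs';

const double = (source: Observable<number>) =>
  new Observable<number>((subscriber) => {
    const subscription = source.subscribe({
      next: (value) => subscriber.next(2 * value),
      error: (err) => subscriber.error(err),
      complete: () => subscriber.complete(),
    });
    return () => subscription.unsubscribe();
  });

// Hypothetical source: two values, then an error.
const source$ = new Observable<number>((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.error(new Error('oops'));
});

const signals: string[] = [];

// Two `double` operators means two intermediate subscribers in the chain.
source$.pipe(double, double).subscribe({
  next: (value) => signals.push(`next ${value}`),
  error: (err) => signals.push(`error ${err.message}`),
  complete: () => signals.push('complete'),
});

console.log(signals); // [ 'next 4', 'next 8', 'error oops' ]
```

Each value is doubled once per link, and the error passes through both links untouched before it reaches the consumer.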

Three "Channels" - Next, Error, and Complete #

In observable subscriptions, producers communicate with consumers by pushing three different signals. This might be mundane knowledge at this point, but just to be thorough:

  1. Next - When the producer is pushing an event or value to the consumer.
  2. Error - When the producer is pushing an error to the consumer and has stopped sending values.
  3. Complete - When the producer is notifying the consumer that it has successfully finished sending values without error.
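
A small sketch of the three channels, assuming two made-up producers (`finishes$` and `fails$`) and a hypothetical `record` helper that notes every signal it receives:

```typescript
import { Observable } from 'rxjs';

// Hypothetical helper: subscribes and records each signal as a string.
function record(source: Observable<number>): string[] {
  const signals: string[] = [];
  source.subscribe({
    next: (value) => signals.push(`next(${value})`),
    error: (err) => signals.push(`error(${err.message})`),
    complete: () => signals.push('complete()'),
  });
  return signals;
}

// A producer that finishes successfully.
const finishes$ = new Observable<number>((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
});

// A producer that fails partway through.
const fails$ = new Observable<number>((subscriber) => {
  subscriber.next(1);
  subscriber.error(new Error('connection lost'));
});

const finished = record(finishes$);
const failed = record(fails$);

console.log(finished); // [ 'next(1)', 'next(2)', 'complete()' ]
console.log(failed); // [ 'next(1)', 'error(connection lost)' ]
```

A given subscription ends on at most one of the two terminal channels, never both.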

Visualizing This: The Telephone-Line Chart #

These three channels can be visually represented as a sort of "telephone-line" chart of the push communications between the subscribers from our operators. (Note that the "telephone-line" chart is something I made up for a talk a while back, not really some googlable d3 chart type, haha)

For example, if we were to have the following:

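```ts
import { EMPTY, of } from 'rxjs';
import { catchError, filter, map } from 'rxjs/operators';

// Assumption: `source$` can be any Observable<number>; `of` is just a stand-in.
const source$ = of(1, 2, 3, 4);

/**
 * The subscriber chain of this pipeline correlates to the diagrams below.
 */
source$
  .pipe(
    filter((n) => n % 2 === 0),
    map((n) => {
      if (Math.random() < 0.1) {
        // A ten percent chance of erroring
        throw new Error('oops');
      }
      return n * 100;
    }),
    catchError((err) => {
      console.error(err);
      return EMPTY;
    }),
  )
  .subscribe({
    next: console.log,
    complete: () => console.log('done'),
  });
```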

...We could then summarize that in a "telephone line chart" depicting all of the subscribers involved like so:

[Figure: Basic telephone line chart, showing 5 boxes, each with three nodes, one for next, one for error, and one for complete, connected to one another by arrows.]

In the chart above, each "box" represents a different Subscriber, 5 in all: One for the source observable, one for each operator in the pipeline, and one for our subscribe call at the end. The circles in the boxes represent next, error, and complete handlers (methods) on the subscribers. The arrows are meant to show how things are "wired up" — that is to say that next can or will call next on the following subscriber that is down the chain.

Visualizing Emitted Values/Events #

Given the above example, we could visualize an event being emitted by the source by simply highlighting the path it might take:

Let's start with what happens when source$ emits the number 2:

[Figure: The same as the above chart, only the arrows between the 'next' nodes are highlighted in green.]

  1. Source subscriber receives a 2 and sends it along to the filter subscriber.
  2. The 2 passes the filter subscriber's predicate of n % 2 === 0, so it sends the 2 along to the map subscriber.
  3. The map subscriber successfully maps 2 to 200 with its mapping function of n * 100 and sends that 200 along to the catchError subscriber (In this scenario, we're saying we don't throw a random error here, more on that below).
  4. The catchError subscriber doesn't do anything with nexted values, it only cares about error signals, so it sends the 200 along to our consumer subscriber.
  5. The consumer subscriber receives the 200 and logs it with its handler console.log.
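
One way to watch this path in running code is to drop a `tap` between the links. This is only a sketch: `of(2)` stands in for the source, the random throw is left out so the run is deterministic, and each `tap` adds a subscriber of its own, so the real chain here is longer than the five boxes in the chart.

```typescript
import { EMPTY, of } from 'rxjs';
import { catchError, filter, map, tap } from 'rxjs/operators';

const trace: string[] = [];

of(2)
  .pipe(
    tap((n) => trace.push(`source -> filter: ${n}`)),
    filter((n) => n % 2 === 0),
    tap((n) => trace.push(`filter -> map: ${n}`)),
    map((n) => n * 100),
    tap((n) => trace.push(`map -> catchError: ${n}`)),
    catchError((err) => {
      console.error(err);
      return EMPTY;
    }),
  )
  .subscribe({
    next: (n) => trace.push(`consumer: ${n}`),
    complete: () => trace.push('consumer: done'),
  });

console.log(trace);
// [
//   'source -> filter: 2',
//   'filter -> map: 2',
//   'map -> catchError: 200',
//   'consumer: 200',
//   'consumer: done'
// ]
```

The final 'consumer: done' entry appears because `of(2)` completes after its single value, and that signal travels down the complete channel the same way.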

Visualizing Errors (and Completions) #

We can also now visualize what would happen in the event that our mapping function above hits that 10% chance of throwing an error. Let's have a look:

[Figure: The same as the above chart, only this time the green arrows stop at the map subscriber, and the next node is red.]

  1. Source subscriber receives a 2 and sends it along to the filter subscriber.
  2. The 2 passes the filter subscriber's predicate of n % 2 === 0, so it sends the 2 along to the map subscriber.
  3. This time the map subscriber's check of Math.random() < 0.1 causes it to throw! Now things are different: we no longer send the value along to the following next handler. Instead, we send an error signal along to the catchError subscriber's error handler.

[Figure: All subscribers before and including catchError are grayed out, representing how they are closed and no longer active.]

  4. Because of the guarantees subscribers provide, calling the error handler on catchError's subscriber means that its subscriber is now "closed", and no more next, error, or complete calls can be made on it. This also means all subscribers before it in the chain will be notified of unsubscription and are also closed. (NOTE: If we had a finalize operator before this point, it would be called.)
  5. The callback we provided to catchError is called and logs the error (because of the callback, not because of some feature in RxJS), then returns EMPTY, which is an observable that synchronously completes.

[Figure: All grayed out subscribers have vanished and are replaced with a new EMPTY subscriber, which shows the consumer subscriber being notified of completion.]

  6. The new EMPTY subscriber takes the place of the closed subscribers and notifies the consumer subscriber of completion.

[Figure: Both remaining subscribers are grayed out, representing the fact that they are closed.]

  7. At this point, having called complete, both remaining subscribers are closed because of the aforementioned guarantees.
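
The error path can be sketched deterministically as well. The assumptions here: the source is a hypothetical observable that synchronously emits 2, 4, and 6 and never completes on its own, the throw happens on the value 4 instead of at random, and a `finalize` is placed before `catchError` to observe the teardown mentioned in the note above.

```typescript
import { EMPTY, Observable } from 'rxjs';
import { catchError, filter, finalize, map } from 'rxjs/operators';

const result = {
  mapInputs: [] as number[],
  received: [] as number[],
  finalized: false,
  done: false,
};

// Hypothetical source: emits three even numbers and never completes.
const source$ = new Observable<number>((subscriber) => {
  subscriber.next(2);
  subscriber.next(4);
  subscriber.next(6); // pushed after the error, so the closed chain ignores it
});

source$
  .pipe(
    filter((n) => n % 2 === 0),
    map((n) => {
      result.mapInputs.push(n);
      if (n === 4) {
        // Deterministic stand-in for the Math.random() < 0.1 check
        throw new Error('oops');
      }
      return n * 100;
    }),
    finalize(() => {
      result.finalized = true;
    }),
    catchError((err) => {
      console.error(err);
      return EMPTY;
    }),
  )
  .subscribe({
    next: (n) => result.received.push(n),
    complete: () => {
      result.done = true;
    },
  });

console.log(result);
// { mapInputs: [ 2, 4 ], received: [ 200 ], finalized: true, done: true }
```

The 6 never reaches the mapping function because every subscriber upstream of `catchError` is already closed, and the consumer still gets a completion because EMPTY takes over the rest of the chain.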

Summary #