RxJS Operators In-Depth - Part 2 - Subscription Chains
Before you read this, you should probably check out part 1.
The Subscriber: The Real Work Horse #
I talked a bit about subscribers in my article about creating an observable from scratch. While Observable gets all of the glory in RxJS, because it's the main type people interact with, it's the subscriber that actually does all of the work. The observable itself is little more than a wrapper around a function that creates a subscriber and sets it on its life of listening to the producer (A producer is whatever thing is producing values, whether that's a web socket, mouse movements, a loop over an array, or anything else).
Subscribers provide several guarantees:
- You can call `next` 0 to n times.
- You can only call `error` or `complete` once, then the subscriber is `closed`.
- `next`, `error`, and `complete` do nothing when called on a `closed` subscriber.
- The subscriber is linked to the subscription such that:
  - If you call `unsubscribe` on the subscription, the subscriber is `closed`.
  - If you call `error` or `complete` on the subscriber, the subscription is `closed`.
  - All three scenarios, `unsubscribe`, `error`, and `complete`, will execute any registered finalization (aka teardown) logic.
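To make those guarantees concrete, here is a minimal hand-rolled sketch. `MiniSubscriber`, `Handlers`, and `calls` are hypothetical names made up for this example, and this is not how RxJS implements its `Subscriber`; it is only a toy model that follows the same rules.

```ts
// A toy model of the guarantees above. `MiniSubscriber` is a hypothetical
// name for illustration only; it is NOT the RxJS implementation.
type Handlers<T> = {
  next?: (value: T) => void;
  error?: (err: unknown) => void;
  complete?: () => void;
};

class MiniSubscriber<T> {
  closed = false;
  private handlers: Handlers<T>;
  private teardowns: Array<() => void> = [];

  constructor(handlers: Handlers<T>) {
    this.handlers = handlers;
  }

  // Register finalization (teardown) logic.
  add(teardown: () => void): void {
    if (this.closed) teardown(); // already closed: run it right away
    else this.teardowns.push(teardown);
  }

  // `next` may be called 0 to n times, but does nothing once closed.
  next(value: T): void {
    if (this.closed) return;
    this.handlers.next?.(value);
  }

  // `error` and `complete` close the subscriber, so only the first call counts.
  error(err: unknown): void {
    if (this.closed) return;
    this.closed = true;
    try {
      this.handlers.error?.(err);
    } finally {
      this.finalize();
    }
  }

  complete(): void {
    if (this.closed) return;
    this.closed = true;
    try {
      this.handlers.complete?.();
    } finally {
      this.finalize();
    }
  }

  // Unsubscribing closes the subscriber and runs teardown as well.
  unsubscribe(): void {
    if (this.closed) return;
    this.closed = true;
    this.finalize();
  }

  private finalize(): void {
    // Empty the list while running it, so each teardown runs at most once.
    for (const teardown of this.teardowns.splice(0)) teardown();
  }
}

const calls: string[] = [];
const subscriber = new MiniSubscriber<number>({
  next: (value) => calls.push(`next ${value}`),
  complete: () => calls.push('complete'),
});
subscriber.add(() => calls.push('teardown'));

subscriber.next(1);
subscriber.next(2);
subscriber.complete();
subscriber.next(3); // ignored: the subscriber is closed
subscriber.error(new Error('too late')); // ignored as well
subscriber.unsubscribe(); // no-op: teardown already ran once

console.log(calls); // ['next 1', 'next 2', 'complete', 'teardown']
```

In this sketch the handler runs before teardown. That ordering is a choice made for the example, not a statement about RxJS internals.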
Operator Chains Are Subscriber Chains #
When you subscribe to an observable created with operators, you're setting up a "chain" of subscribers. As we saw in our basic operator example in part 1, each operator is effectively setting up a new subscriber that passes signals along to another subscriber in the chain. Let's have a look at our example from the other article:
```ts
import { Observable } from 'rxjs';

const double = (source: Observable<number>) =>
  new Observable((subscriber) => {
    // We have `subscriber` above, but also below,
    // the call to `source.subscribe` is creating a subscriber
    // that is calling the passed `next`, `error`, and `complete`
    // handlers.
    const subscription = source.subscribe({
      // Next in one subscriber passes to next in another subscriber.
      next: (value) => subscriber.next(2 * value),
      // Error in one subscriber passes to error in another subscriber.
      error: (err) => subscriber.error(err),
      // Complete in one subscriber passes to complete in another subscriber.
      complete: () => subscriber.complete(),
    });

    return () => {
      // We must make sure to tear down our subscription
      // when the returned observable is finalized.
      subscription.unsubscribe();
    };
  });
```
This concept will be especially important when we start talking about error propagation in RxJS.
Three "Channels" - Next, Error, and Complete #
In observable subscriptions, producers communicate with consumers by pushing three different signals. This might be mundane knowledge at this point, but just to be thorough:
- Next - When the producer is pushing an event or value to the consumer.
- Error - When the producer is pushing an error to the consumer and has stopped sending values.
- Complete - When the producer is pushing a notification to the consumer that it has successfully finished sending values without error.
Visualizing This: The Telephone-Line Chart #
These three channels can be visually represented as a sort of "telephone-line" chart of the push communications between the subscribers from our operators. (Note that the "telephone-line" chart is something I made up for a talk a while back, not really some googlable d3 chart type, haha)
For example, if we were to have the following:
```ts
import { EMPTY } from 'rxjs';
import { catchError, filter, map } from 'rxjs/operators';

/**
 * The subscriber chain of this will correlate to the diagrams below.
 * `source$` stands in for any observable of numbers.
 */
source$
  .pipe(
    filter((n) => n % 2 === 0),
    map((n) => {
      if (Math.random() < 0.1) {
        // A ten percent chance of erroring
        throw new Error('oops');
      }
      return n * 100;
    }),
    catchError((err) => {
      console.error(err);
      return EMPTY;
    }),
  )
  .subscribe({
    next: console.log,
    complete: () => console.log('done'),
  });
```
...We could then summarize that in a "telephone line chart" depicting all of the subscribers involved like so:
In the chart above, each "box" represents a different Subscriber, 5 in all: One for the source observable, one for each operator in the pipeline, and one for our subscribe call at the end. The circles in the boxes represent next, error, and complete handlers (methods) on the subscribers. The arrows are meant to show how things are "wired up" — that is to say that next can or will call next on the following subscriber that is down the chain.
Visualizing Emitted Values/Events #
Given the above example, we could visualize an event being emitted by the source by simply highlighting the path it might take:
Let's start with what happens when source$ emits the number 2:
- Source subscriber receives a `2` and sends it along to the filter subscriber.
- The `2` passes the filter subscriber's predicate of `n % 2 === 0`, so it sends the `2` along to the map subscriber.
- The map subscriber successfully maps `2` to `200` with its mapping function of `n * 100` and sends that `200` along to the catchError subscriber (in this scenario, we're saying we don't throw a random error here; more on that below).
- The catchError subscriber doesn't do anything with nexted values, it only cares about error signals, so it sends the `200` along to our consumer subscriber.
- The consumer subscriber receives the `200` and logs it with its handler `console.log`.
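The walk-through above can be sketched with plain functions standing in for each subscriber's `next` handler. This models only the wiring of the next channel. The names (`sourceNext`, `filterNext`, `mapNext`, `catchErrorNext`, `consumerNext`, `trace`, `received`) are made up for this example, and RxJS sets up the real links for you when you call `subscribe`.

```ts
// Hypothetical stand-ins for each subscriber's `next` handler, declared from
// the consumer back up to the source. Only the next channel is modeled here.
const trace: string[] = [];
const received: number[] = [];

// Consumer subscriber: the `next` handler passed to `subscribe`.
const consumerNext = (value: number): void => {
  trace.push(`consumer got ${value}`);
  received.push(value);
};

// catchError subscriber: passes nexted values straight through.
const catchErrorNext = (value: number): void => {
  trace.push(`catchError passes ${value}`);
  consumerNext(value);
};

// map subscriber: applies n * 100 (the random throw is left out here).
const mapNext = (value: number): void => {
  const mapped = value * 100;
  trace.push(`map ${value} -> ${mapped}`);
  catchErrorNext(mapped);
};

// filter subscriber: only forwards values that pass n % 2 === 0.
const filterNext = (value: number): void => {
  if (value % 2 === 0) {
    trace.push(`filter passes ${value}`);
    mapNext(value);
  } else {
    trace.push(`filter drops ${value}`);
  }
};

// Source subscriber: hands whatever the producer emits to the filter subscriber.
const sourceNext = (value: number): void => {
  trace.push(`source emits ${value}`);
  filterNext(value);
};

sourceNext(2); // travels the whole chain and arrives as 200
sourceNext(3); // stops at the filter subscriber

console.log(trace.join('\n'));
console.log(received); // [200]
```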
Visualizing Errors (and Completions) #
We can also now visualize what would happen in the event that our mapping function above hits that 10% chance of throwing an error. Let's have a look:
- Source subscriber receives a `2` and sends it along to the filter subscriber.
- The `2` passes the filter subscriber's predicate of `n % 2 === 0`, so it sends the `2` along to the map subscriber.
- This time the map subscriber's check of `Math.random() < 0.1` causes it to throw! Things get different: we no longer send the value along to the following next handler; instead we send an error signal along to the catchError subscriber's error handler.
- Because of the guarantees subscribers provide, calling the `error` handler on `catchError`'s subscriber means that its subscriber is now "closed", and no more next, error, or complete calls can be made on it. This also means all subscribers before it in the chain will be notified of unsubscription and are also closed. (NOTE: If we had a `finalize` operator before this point, it would be called.)
- The callback we provided to `catchError` will be called, and the error would be logged (because of the callback, not because of some feature in RxJS), and it returns `EMPTY`, which is an observable that synchronously completes.
- The new `EMPTY` subscriber takes the place of the `catchError` subscriber and notifies the consumer subscriber of completion.
- At this point, having called complete, both remaining subscribers are closed, because of the aforementioned guarantees.
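The error walk-through above can be played out with a small toy model. Everything here (`ToySubscriber`, `ToyHandlers`, `steps`, `project`, `alwaysThrow`) is a hypothetical name invented for this sketch, and it is not the RxJS implementation. It assumes the throw always happens (instead of the 10% chance) so the result is repeatable, and it closes subscribers in the order the steps are narrated, which may not match the exact ordering inside RxJS.

```ts
// A toy model of the error path. NOT the RxJS implementation.
type ToyHandlers = {
  next?: (value: number) => void;
  error?: (err: unknown) => void;
  complete?: () => void;
};

const steps: string[] = [];

class ToySubscriber {
  closed = false;
  // The subscriber "before" this one in the chain, if any.
  upstream: ToySubscriber | null = null;
  name: string;
  handlers: ToyHandlers;

  constructor(name: string, handlers: ToyHandlers) {
    this.name = name;
    this.handlers = handlers;
  }

  next(value: number): void {
    if (!this.closed) this.handlers.next?.(value);
  }

  error(err: unknown): void {
    if (this.closed) return;
    this.close();
    this.handlers.error?.(err);
  }

  complete(): void {
    if (this.closed) return;
    this.close();
    this.handlers.complete?.();
  }

  // Closing also closes every subscriber before this one in the chain.
  close(): void {
    if (this.closed) return;
    this.closed = true;
    steps.push(`${this.name} closed`);
    this.upstream?.close();
  }
}

// Assumption: force the throw so the run is deterministic.
const alwaysThrow = true;
const project = (n: number): number => {
  if (alwaysThrow) throw new Error('oops');
  return n * 100;
};

const consumer = new ToySubscriber('consumer', {
  next: (value) => steps.push(`consumer next ${value}`),
  complete: () => steps.push('consumer complete'),
});

const catchErrorSub = new ToySubscriber('catchError', {
  next: (value) => consumer.next(value),
  error: (err) => {
    // Stand-in for the user's callback: log the error, return EMPTY.
    steps.push(`callback handled: ${err instanceof Error ? err.message : String(err)}`);
    // Stand-in for the EMPTY subscriber: it only completes the consumer.
    consumer.complete();
  },
});

const mapSub = new ToySubscriber('map', {
  next: (n) => {
    let mapped: number;
    try {
      mapped = project(n);
    } catch (err) {
      catchErrorSub.error(err); // error channel instead of next
      return;
    }
    catchErrorSub.next(mapped);
  },
});

const filterSub = new ToySubscriber('filter', {
  next: (n) => {
    if (n % 2 === 0) mapSub.next(n);
  },
});

const sourceSub = new ToySubscriber('source', {
  next: (n) => filterSub.next(n),
});

// Link each subscriber to the one before it.
consumer.upstream = catchErrorSub;
catchErrorSub.upstream = mapSub;
mapSub.upstream = filterSub;
filterSub.upstream = sourceSub;

sourceSub.next(2); // map throws, catchError recovers, consumer completes
sourceSub.next(4); // ignored: every subscriber is closed by now

console.log(steps.join('\n'));
```

Running it logs the catchError, map, filter, and source subscribers closing, then the callback handling `oops`, then the consumer closing and completing. The second `next(4)` adds nothing, because every subscriber in the chain is already closed.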
Summary #
- A Subscriber, a combination of subscription and observer, is set up by calling `subscribe` on an observable.
- Subscribers provide guarantees that you cannot `next`, `error`, or `complete` once they're closed, and that they are closed when their `error` or `complete` method is called.
- Subscribers do all of the "work" in RxJS, not observables, which are just templates for setting up subscribers.
- Operators chain subscribers together, linking their next, error, and complete calls as appropriate for each operator.
- Subscribers will also finalize when closed, unsubscribing from (and therefore closing and finalizing) any subscriber "before" it in the chain.
- REITERATING THIS: If any subscriber in the chain has its `error` or `complete` handler called, it is finalized and closed, as are all subscribers "before" it in the chain.