RxJS Operators In-Depth - Part 2 - Subscription Chains
Before you read this, you should probably check out part 1.
The Subscriber: The Real Workhorse
I talked a bit about subscribers in my article about creating an observable from scratch. While Observable gets all of the glory in RxJS, because it's the main type people interact with, it's the subscriber that actually does all of the work. The observable itself is little more than a wrapper around a function that creates a subscriber and sets it off on its life of listening to the producer. (A producer is whatever is producing values, whether that's a web socket, mouse movements, a loop over an array, or anything else.)
Subscribers provide several guarantees:
- You can call `next` 0 to n times.
- You can only call `error` or `complete` once, then the subscriber is `closed`.
- `next`, `error`, and `complete` do nothing when called on a `closed` subscriber.
- The subscriber is linked to the subscription such that:
  - If you call `unsubscribe` on the subscription, the subscriber is `closed`.
  - If you call `error` or `complete` on the subscriber, the subscription is `closed`.
  - All three scenarios, `unsubscribe`, `error`, and `complete`, will execute any registered finalization (aka teardown) logic.
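To make those guarantees concrete, here's a rough, hand-rolled sketch. `SketchSubscriber` and its `add` method are hypothetical names I'm using for illustration; this is not RxJS's actual `Subscriber` class, just the smallest thing I could write that enforces the rules listed above.

```ts
type Observer<T> = {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
};

// Hypothetical, simplified stand-in for a subscriber (not RxJS's real class).
class SketchSubscriber<T> {
  closed = false;
  private destination: Observer<T>;
  private finalizers: Array<() => void> = [];

  constructor(destination: Observer<T>) {
    this.destination = destination;
  }

  // Register finalization (teardown) logic to run when the subscriber closes.
  add(finalizer: () => void): void {
    this.finalizers.push(finalizer);
  }

  next(value: T): void {
    // Does nothing once closed.
    if (!this.closed) this.destination.next(value);
  }

  error(err: unknown): void {
    if (this.closed) return;
    this.closed = true;
    try {
      this.destination.error(err);
    } finally {
      this.finalize();
    }
  }

  complete(): void {
    if (this.closed) return;
    this.closed = true;
    try {
      this.destination.complete();
    } finally {
      this.finalize();
    }
  }

  unsubscribe(): void {
    if (this.closed) return;
    this.closed = true;
    this.finalize();
  }

  private finalize(): void {
    // Empty the list as we go so each finalizer runs at most once.
    for (const finalizer of this.finalizers.splice(0)) finalizer();
  }
}

const log: string[] = [];
let finalizeCount = 0;

const subscriber = new SketchSubscriber<number>({
  next: (value) => { log.push(`next ${value}`); },
  error: (err) => { log.push(`error ${String(err)}`); },
  complete: () => { log.push('complete'); },
});
subscriber.add(() => { finalizeCount += 1; });

subscriber.next(1);
subscriber.next(2);
subscriber.complete();
subscriber.next(3); // ignored: already closed
subscriber.error(new Error('too late')); // ignored: already closed
subscriber.unsubscribe(); // no-op: finalization already ran

// log is ['next 1', 'next 2', 'complete'] and finalizeCount is 1
```

The sketch runs the handler first and finalization second. That ordering is my choice for the sketch; the guarantees above only say that finalization happens.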
Operator Chains Are Subscriber Chains
When you subscribe to an observable created with operators, you're setting up a "chain" of subscribers. As we saw in our basic operator example in part 1, each operator is effectively setting up a new subscriber that passes signals along to another subscriber in the chain. Let's have a look at our example from the other article:
```ts
import { Observable } from 'rxjs';

const double = (source: Observable<number>) =>
  new Observable((subscriber) => {
    // We have `subscriber` above, but also below,
    // the call to `source.subscribe` is creating a subscriber
    // that is calling the passed `next`, `error`, and `complete`
    // handlers.
    const subscription = source.subscribe({
      // Next in one subscriber passes to next in another subscriber.
      next: (value) => subscriber.next(2 * value),
      // Error in one subscriber passes to error in another subscriber.
      error: (err) => subscriber.error(err),
      // Complete in one subscriber passes to complete in another subscriber.
      complete: () => subscriber.complete(),
    });

    return () => {
      // We must make sure to tear down our subscription
      // when the returned observable is finalized.
      subscription.unsubscribe();
    };
  });
```
This concept will be especially important when we start talking about error propagation in RxJS.
Three "Channels" - Next, Error, and Complete
In observable subscriptions, producers communicate with consumers by pushing three different signals. This might be mundane knowledge at this point, but just to be thorough:
- Next - When the producer is pushing an event or value to the consumer.
- Error - When the producer is pushing an error to the consumer and has stopped sending values.
- Complete - When the producer is pushing a notification to the consumer that it has successfully finished sending values without error.
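As a small illustration of the three signals, here's a sketch of a producer that loops over a list of reads and pushes to a consumer. The `produce` and `recorder` helpers are hypothetical names made up for this example; nothing here is RxJS API.

```ts
type Observer<T> = {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
};

// Hypothetical producer: performs each read in turn and pushes the result.
// A failed read pushes an error and stops; otherwise it completes at the end.
function produce(reads: Array<() => number>, consumer: Observer<number>): void {
  for (const read of reads) {
    let value: number;
    try {
      value = read();
    } catch (err) {
      consumer.error(err); // Error: the producer has stopped sending values.
      return;
    }
    consumer.next(value); // Next: a value is pushed to the consumer.
  }
  consumer.complete(); // Complete: finished successfully, without error.
}

// Hypothetical consumer that records every signal it receives.
function recorder(log: string[]): Observer<number> {
  return {
    next: (value) => { log.push(`next ${value}`); },
    error: (err) => { log.push(`error ${(err as Error).message}`); },
    complete: () => { log.push('complete'); },
  };
}

const healthy: string[] = [];
produce([() => 1, () => 2, () => 3], recorder(healthy));
// healthy is ['next 1', 'next 2', 'next 3', 'complete']

const failing: string[] = [];
produce(
  [() => 1, (): number => { throw new Error('oops'); }, () => 3],
  recorder(failing),
);
// failing is ['next 1', 'error oops']
```

Note that a given run ends with either an error or a completion, never both.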
Visualizing This: The Telephone-Line Chart
These three channels can be visually represented as a sort of "telephone-line" chart of the push communications between the subscribers from our operators. (Note that the "telephone-line" chart is something I made up for a talk a while back, not really some googlable d3 chart type, haha.)
For example, if we were to have the following:
```ts
import { EMPTY } from 'rxjs';
import { catchError, filter, map } from 'rxjs/operators';

/**
 * The subscriber chain of this will correlate to the diagrams below.
 * (`source$` is assumed to be an Observable<number> defined elsewhere.)
 */
source$
  .pipe(
    filter((n) => n % 2 === 0),
    map((n) => {
      if (Math.random() < 0.1) {
        // A ten percent chance of erroring
        throw new Error('oops');
      }
      return n * 100;
    }),
    catchError((err) => {
      console.error(err);
      return EMPTY;
    }),
  )
  .subscribe({
    next: console.log,
    complete: () => console.log('done'),
  });
```
...We could then summarize that in a "telephone line chart" depicting all of the subscribers involved like so:
In the chart above, each "box" represents a different Subscriber, 5 in all: One for the source observable, one for each operator in the pipeline, and one for our subscribe call at the end. The circles in the boxes represent next, error, and complete handlers (methods) on the subscribers. The arrows are meant to show how things are "wired up" — that is to say that next can or will call next on the following subscriber that is down the chain.
Visualizing Emitted Values/Events
Given the above example, we could visualize an event being emitted by the source by simply highlighting the path it might take:
Let's start with what happens when `source$` emits the number `2`:
- Source subscriber receives a `2` and sends it along to the filter subscriber.
- The `2` passes the filter subscriber's predicate of `n % 2 === 0`, so it sends the `2` along to the map subscriber.
- The map subscriber successfully maps `2` to `200` with its mapping function of `n * 100` and sends that `200` along to the catchError subscriber. (In this scenario, we're saying we don't throw a random error here; more on that below.)
- The catchError subscriber doesn't do anything with nexted values; it only cares about error signals, so it sends the `200` along to our consumer subscriber.
- The consumer subscriber receives the `200` and logs it with its handler, `console.log`.
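The walk above can be sketched with plain objects standing in for each subscriber. This is a simplified, hand-wired stand-in and not how RxJS builds the chain internally: the variable names are mine, the random failure in the mapping function is left out, and the source subscriber is represented by calling the filter subscriber directly.

```ts
type Observer<T> = {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
};

const trace: string[] = [];

// The consumer: stands in for `.subscribe({ next: console.log, ... })`.
const consumer: Observer<number> = {
  next: (value) => { trace.push(`consumer logs ${value}`); },
  error: (err) => { trace.push(`consumer error ${String(err)}`); },
  complete: () => { trace.push('consumer done'); },
};

// catchError: only cares about errors, so nexted values pass straight through.
const catchErrorSubscriber: Observer<number> = {
  next: (value) => {
    trace.push(`catchError passes ${value}`);
    consumer.next(value);
  },
  // Simplified stand-in for "run the callback, then switch to EMPTY".
  error: () => consumer.complete(),
  complete: () => consumer.complete(),
};

// map: applies n * 100 and sends the result along.
const mapSubscriber: Observer<number> = {
  next: (n) => {
    const mapped = n * 100;
    trace.push(`map ${n} -> ${mapped}`);
    catchErrorSubscriber.next(mapped);
  },
  error: (err) => catchErrorSubscriber.error(err),
  complete: () => catchErrorSubscriber.complete(),
};

// filter: only even numbers continue down the chain.
const filterSubscriber: Observer<number> = {
  next: (n) => {
    if (n % 2 === 0) {
      trace.push(`filter passes ${n}`);
      mapSubscriber.next(n);
    } else {
      trace.push(`filter drops ${n}`);
    }
  },
  error: (err) => mapSubscriber.error(err),
  complete: () => mapSubscriber.complete(),
};

// The source subscriber "emits" by calling next on the filter subscriber.
filterSubscriber.next(1);
filterSubscriber.next(2);

// trace is:
// ['filter drops 1', 'filter passes 2', 'map 2 -> 200',
//  'catchError passes 200', 'consumer logs 200']
```

I also pushed a `1` through to show the other branch of the filter: the value stops there and nothing downstream hears about it.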
Visualizing Errors (and Completions)
We can also now visualize what would happen in the event that our mapping function above hits that 10% chance of throwing an error. Let's have a look:
- Source subscriber receives a `2` and sends it along to the filter subscriber.
- The `2` passes the filter subscriber's predicate of `n % 2 === 0`, so it sends the `2` along to the map subscriber.
- This time the map subscriber's check of `Math.random() < 0.1` causes it to throw! Things get different: we no longer send the value along to the following next handler. Instead, we send an error signal along to the catchError subscriber's error handler.
- Because of the guarantees subscribers provide, calling the `error` handler on `catchError`'s subscriber means that its subscriber is now "closed", and no more next, error, or complete calls can be made on it. This also means all subscribers before it in the chain will be notified of unsubscription and are also closed. (NOTE: If we had a `finalize` operator before this point, it would be called.)
- The callback we provided to `catchError` will be called and the error will be logged (because of the callback, not because of some feature in RxJS). The callback returns `EMPTY`, which is an observable that synchronously completes.
- The new `EMPTY` subscriber takes the place of the `catchError` subscriber and notifies the consumer subscriber of completion.
- At this point, having called complete, both remaining subscribers are closed, because of the aforementioned guarantees.
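The error path can be sketched the same way. `ChainLink` and `setUpstream` are hypothetical names for this illustration, the 10% random failure is replaced with a deterministic throw, and `EMPTY` is stood in for by completing the consumer directly. The sketch closes a link before invoking its handler so the trace follows the order of the steps above; RxJS's internals may sequence these slightly differently.

```ts
type Observer<T> = {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
};

const trace: string[] = [];

// Hypothetical stand-in for one "box" in the chart (not RxJS's Subscriber).
class ChainLink<T> {
  closed = false;
  private name: string;
  private handlers: Observer<T>;
  private upstream: { unsubscribe(): void } | null = null;

  constructor(name: string, handlers: Observer<T>) {
    this.name = name;
    this.handlers = handlers;
  }

  // Remember which link feeds this one, so closing propagates "backwards".
  setUpstream(upstream: { unsubscribe(): void }): void {
    this.upstream = upstream;
  }

  next(value: T): void {
    if (!this.closed) this.handlers.next(value);
  }

  error(err: unknown): void {
    if (this.closed) return;
    this.unsubscribe();
    this.handlers.error(err);
  }

  complete(): void {
    if (this.closed) return;
    this.unsubscribe();
    this.handlers.complete();
  }

  unsubscribe(): void {
    if (this.closed) return;
    this.closed = true;
    trace.push(`${this.name} closed`);
    if (this.upstream) this.upstream.unsubscribe();
  }
}

// Deterministic stand-in for the mapping function hitting its 10% chance.
const project = (n: number): number => {
  if (n === 2) throw new Error('oops');
  return n * 100;
};

const consumer = new ChainLink<number>('consumer', {
  next: (value) => { trace.push(`consumer logs ${value}`); },
  error: (err) => { trace.push(`consumer error ${String(err)}`); },
  complete: () => { trace.push('consumer done'); },
});

const catchErrorLink = new ChainLink<number>('catchError', {
  next: (value) => consumer.next(value),
  error: (err) => {
    // The user-provided callback runs and logs the error...
    trace.push(`callback logged ${(err as Error).message}`);
    // ...and the EMPTY it returns is stood in for by completing the consumer.
    consumer.complete();
  },
  complete: () => consumer.complete(),
});

const mapLink = new ChainLink<number>('map', {
  next: (n) => {
    let result: number;
    try {
      result = project(n);
    } catch (err) {
      // A throw becomes an error signal to the next subscriber.
      catchErrorLink.error(err);
      return;
    }
    catchErrorLink.next(result);
  },
  error: (err) => catchErrorLink.error(err),
  complete: () => catchErrorLink.complete(),
});

const filterLink = new ChainLink<number>('filter', {
  next: (n) => { if (n % 2 === 0) mapLink.next(n); },
  error: (err) => mapLink.error(err),
  complete: () => mapLink.complete(),
});

consumer.setUpstream(catchErrorLink);
catchErrorLink.setUpstream(mapLink);
mapLink.setUpstream(filterLink);

filterLink.next(2); // the mapping function throws on this one
filterLink.next(4); // ignored: the filter link is already closed

// trace is:
// ['catchError closed', 'map closed', 'filter closed',
//  'callback logged oops', 'consumer closed', 'consumer done']
```

The second `next` call never reaches the mapping function, which is the "closed subscribers do nothing" guarantee at work.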
Summary
- A Subscriber, a combination of subscription and observer, is set up by calling `subscribe` on an observable.
- Subscribers provide guarantees that you cannot `next`, `error`, or `complete` once they're closed, and that they are closed when their `error` or `complete` method is called.
- Subscribers do all of the "work" in RxJS, not observables, which are just templates for setting up subscribers.
- Operators chain subscribers together, linking their next, error, and complete calls as appropriate for each operator.
- Subscribers will also finalize when closed, unsubscribing from (and therefore closing and finalizing) any subscriber "before" them in the chain.
- REITERATING THIS: If any subscriber in the chain has its `error` or `complete` handler called, it is finalized and closed, as are all subscribers "before" it in the chain.