Observables are broken and so is JavaScript

Tags: RxJS, JavaScript, Observable, Iterable

(Okay, neither is "completely broken"... but hear me out...) #

TLDR: #

Teardown your resources as soon as you know you should, before you do anything else!

JavaScript iteration was implemented cleverly, but poorly, and that's why we can't have nice things.


How are Observables broken? #

Over the years in RxJS, we've had a few issues come up that usually relate to reentrancy (meaning an observable operation that observes a source and, when it notifies, causes itself to receive another notification from that source). The basic thing that happens is that we'll have some operator, like take, that notifies of complete, and then the complete handler downstream will next or error or the like into a subject or something that is feeding into take's source. Here's an example:

```ts
import { Subject, tap, take } from 'rxjs';

const subject = new Subject<number>();

subject.pipe(tap(console.log), take(1)).subscribe({
  next: (n) => subject.next(n + 1),
  complete: () => subject.next(42),
});

subject.next(1);

// In every version of RxJS
// (except the latest RxJS alpha at the time of this writing)
// this will log:
// 1
// 2
// 42
```

This gets even more gnarly if there are errors thrown into the mix... consider the following:

```ts
import { Subject, map, take, tap, catchError, of } from 'rxjs';

const subject = new Subject<number>();

subject
  .pipe(
    map((v) => {
      if (v === 42) {
        throw new Error('No Monty Python references!');
      }
      return v;
    }),
    catchError((err) => {
      console.log('LOL! an error? Yes: ' + err?.message);
      return of('wee');
    }),
    tap(console.log),
    take(1),
  )
  .subscribe({
    next: (n) => subject.next(n + 1),
    complete: () => subject.next(42),
  });

subject.next(1);

// 1
// 2
// LOL! an error? Yes: No Monty Python references!
// wee
```

Or... really any considerable work... What if that map had some very expensive calculation in it? It would be executed, even though the downstream subscriber (inside of take) already told us it's no longer interested!

We've had several fixes around this... most recently in the unreleased RxJS 8 alpha: https://github.com/ReactiveX/rxjs/pull/6396

And all of those fixes involve some code stink in the form of needing to get a handle on the subscriber so you can unsubscribe it from within itself before you notify anything downstream; for example, see what was done to fix this in take.

The problem is that this isn't fixed everywhere, and basically any operator that could even have an error will need to do this, or risk having this same breakage.

Take map for example:

```ts
import { Subject, map, tap } from 'rxjs';

const subject = new Subject<number>();

subject
  .pipe(
    tap(console.log),
    map(() => {
      throw new Error('ROFL');
    }),
  )
  .subscribe({
    error: (err) => {
      console.log(`Error: ${err?.message}`);
      subject.next(42);
    },
  });

subject.next(1);

// 1
// Error: ROFL
// 42
```

The Cause: Teardown/unsubscription is incorrectly timed #

Fundamentally, the logic for "what happens when a subscriber complete/error is called?" is currently, roughly as follows:

  1. Flag the subscriber closed
  2. Notify (complete/error)
  3. Unsubscribe/Teardown

However, while coming up with the solution to the issues I listed above, I arrived at a principle for implementing operators that we adopted (IIRC, @cartant was the most active contributor at the time who also agreed with this), and that principle was basically "Teardown your resources as soon as you know you should, before you do anything else". So, if you're in take and you've taken your fill: unsubscribe, then notify your next and complete. This principle solved the reentrancy issues we'd been seeing.

It's also a pretty good principle in general: Don't keep resources around any longer than you have to.

However, it doesn't take long mentally exercising this principle in earnest before you realize there are issues with almost every operator, like map above. It seems rather smelly that in every operator implementation we'd have to trap a Subscriber reference just so it can unsubscribe itself before notifying downstream. Think about that: the subscriber needs a reference to itself so it can unsubscribe before it emits.

So let's take our principle all the way to the actual Subscriber itself:

"Teardown your resources as soon as you know you should, before you do anything else"

...If that's true, then we should tear down our resources as soon as we know we can/should during the lifecycle of the Subscriber itself. ...But we don't.

We actually go out of our way to not do that, executing this code in our Subscriber:

```ts
class Subscriber<T> {
  // ...
  complete(): void {
    if (this.isStopped) {
      handleStoppedNotification(COMPLETE_NOTIFICATION, this);
    } else {
      this.isStopped = true;
      this._complete();
    }
  }

  protected _complete(): void {
    try {
      this.destination.complete();
    } finally {
      this.unsubscribe();
    }
  }
}
```

Or simplified to the important bits:

```ts
class Subscriber<T> {
  // ...
  complete(): void {
    if (!this.isStopped) {
      this.isStopped = true;
      this.destination.complete(); // notify downstream
      this.unsubscribe(); // flag closed, teardown
    }
  }
}
```

Even weirder, our Subscriber isn't closed when it's "stopped". It's only closed after unsubscribe was called. So if you examined it while it's notifying downstream it would be like "yeah, I'm not closed" when you clearly cannot notify anything through it anymore — because of this non-public, mysterious to non-implementors "isStopped" bit. ... yet more stank.

The Solution: Follow the principle for all observables! #

We need to make sure we teardown/unsubscribe before we notify of anything. That is to say, the simplified implementation should look like this:

```ts
class Subscriber<T> {
  // ...
  complete(): void {
    if (!this.closed) {
      this.unsubscribe(); // flag closed, teardown
      this.destination.complete(); // notify downstream
    }
  }
}
```

Because the subscriber itself is the first to know that it no longer needs whatever resources it has had added to it (via add, part of the Subscription it inherited along with unsubscribe), the Subscriber should tear things down before notifying on.

This means we can remove any specialized code that captures the subscriber just so it can unsubscribe it first! Removing the stink.

This would be a breaking change #

Fundamentally, what this means is that things like our finalize operator (which is "finally" in other implementations) would need to have their handler fired before any downstream "complete" notification.
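To make that ordering concrete, here's a minimal sketch (my own toy, not the real RxJS internals) of a subscriber that tears down before notifying downstream, so a finalize-style teardown handler fires before the consumer's complete handler:

```typescript
// Toy subscriber: teardown handlers run BEFORE the downstream complete.
type Teardown = () => void;

class MiniSubscriber<T> {
  private closed = false;
  private teardowns: Teardown[] = [];

  constructor(private destination: { next(v: T): void; complete(): void }) {}

  add(teardown: Teardown) {
    this.teardowns.push(teardown);
  }

  next(value: T) {
    if (!this.closed) this.destination.next(value);
  }

  complete() {
    if (!this.closed) {
      this.closed = true;
      this.teardowns.forEach((t) => t()); // teardown first...
      this.destination.complete(); // ...then notify downstream
    }
  }
}

const log: string[] = [];
const subscriber = new MiniSubscriber<number>({
  next: (v) => log.push(`next ${v}`),
  complete: () => log.push('complete'),
});
subscriber.add(() => log.push('finalized'));
subscriber.next(1);
subscriber.complete();
console.log(log); // ['next 1', 'finalized', 'complete']
```

Note that 'finalized' lands before 'complete', which is exactly the breaking change described above.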

Duality and alignment with other types #

The breaking change above makes sense, though, when you look at Observable as the "dual" of Iterable. There's been plenty of coverage of this duality, starting with @headinthebox's paper here.

If we look at a simplified Iterable implementation (written out as a generator function), we can test this behavior:

```ts
function* producer() {
  try {
    console.log('producer start');
    yield 1;
    yield 2;
    yield 3;
  } finally {
    console.log('producer teardown');
  }
}

function consumer() {
  for (const value of producer()) {
    console.log(`consumer gets: ${value}`);
  }
  console.log('consumer knows producer is done');
}

consumer();

// Logs:
// producer start
// consumer gets: 1
// consumer gets: 2
// consumer gets: 3
// producer teardown
// consumer knows producer is done
```

As you can see, the producer "tears down" or finalizes BEFORE the consumer can possibly know that the producer is done. Research has shown me that this is true for every iteration protocol implementation, including IEnumerable from .NET, which is where Observable, and ultimately RxJS, was born.

JavaScript's iteration design is fundamentally flawed! #

Setting all of that aside: even if we fix this here in the proposed way, any operation which knows it can unsubscribe from the source at the moment before it notifies the consumer of a value still isn't completely fixed. Take take(count), for example. As soon as take realizes that its count has been met, it must unsubscribe from the source. This means that in those scenarios, we'll still need to trap a subscriber instance and tear it down before we next out that last value. It stinks, but there it is.

So what would solve this neatly? Well, what if we could call a "next"-like method on Subscriber that not only sends the last value, but also flags us closed in the process? Let's hypothetically call that... return?

```ts
class Subscriber<T> {
  // ...
  return(lastValue: T) {
    if (!this.closed) {
      this.unsubscribe();
      this.destination.next(lastValue);
      this.destination.complete();
    }
  }
}
```

But wait! That destination.next call still needs to know it should tear down first, and that separate complete call doesn't make sense if the new return can do the whole dance for you!

Okay, but Ben, "Why would we want two different ways of completing an observable?" WE DON'T! The truth is that we could have ONE complete (aka return): if calling it with void (the complete absence of a value, aka no argument at all) were a signal NOT to next, then we could have one "return" (aka "complete") to rule them all:

```ts
class Subscriber<T> {
  // ...
  complete(lastValue?: T) {
    if (!this.closed) {
      this.unsubscribe();
      if (arguments.length > 0) {
        // Not void!
        this.destination.next(lastValue);
      }
      this.destination.complete();
    }
  }
}
```

Therefore, a take implementation could look like this, and everything is solved:

```ts
interface Observer<T> {
  next(value: T): void;
  error(error: any): void;
  complete(value: T): void;
}

interface Subscriber<T> {
  next(value: T): void;
  error(error: any): void;
  complete(value: T): void;
}

class Observable<T> {
  constructor(init: (subscriber: Subscriber<T>) => void) {}

  subscribe(observer: Observer<T>): void {}
}
// -- cut --

function take<T>(count: number) {
  return (source: Observable<T>) =>
    new Observable<T>((subscriber) => {
      let seen = 0;
      source.subscribe({
        next: (value) => {
          seen++;
          if (seen >= count) {
            subscriber.complete(value);
          } else {
            subscriber.next(value);
          }
        },
        error: (err) => subscriber.error(err),
        complete: (...args) => subscriber.complete(...args),
      });
    });
}
```
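Since the snippet above leaves the Observable machinery as stubs, here's a runnable toy version (my own simplified machinery, not RxJS; error handling is omitted for brevity) demonstrating the proposed semantics: complete() optionally carries a final value, and the chain always tears down before anything is notified downstream:

```typescript
// Toy Observable/Subscriber with the hypothetical complete(lastValue?) API.
type Teardown = () => void;

interface Observer<T> {
  next(value: T): void;
  complete(...args: [] | [T]): void;
}

class Subscriber<T> implements Observer<T> {
  closed = false;
  private teardowns: Teardown[] = [];

  constructor(private destination: Observer<T>) {}

  add(teardown: Teardown) {
    this.teardowns.push(teardown);
  }

  unsubscribe() {
    if (!this.closed) {
      this.closed = true;
      this.teardowns.forEach((t) => t());
    }
  }

  next(value: T) {
    if (!this.closed) this.destination.next(value);
  }

  // complete() with no argument completes silently; complete(value) emits
  // a final value. Either way, teardown happens FIRST.
  complete(...args: [] | [T]) {
    if (!this.closed) {
      this.unsubscribe();
      if (args.length > 0) {
        this.destination.next(args[0] as T);
      }
      this.destination.complete();
    }
  }
}

class Observable<T> {
  constructor(private init: (subscriber: Subscriber<T>) => void) {}

  subscribe(observerOrSubscriber: Observer<T> | Subscriber<T>): Subscriber<T> {
    const subscriber =
      observerOrSubscriber instanceof Subscriber
        ? observerOrSubscriber
        : new Subscriber(observerOrSubscriber);
    this.init(subscriber);
    return subscriber;
  }
}

// take completes *with* the final value; no subscriber-trapping needed.
function take<T>(count: number) {
  return (source: Observable<T>) =>
    new Observable<T>((destination) => {
      let seen = 0;
      const operatorSubscriber = new Subscriber<T>({
        next: (value) =>
          ++seen >= count ? destination.complete(value) : destination.next(value),
        complete: () => destination.complete(),
      });
      // Closing the outer subscriber closes the source side too.
      destination.add(() => operatorSubscriber.unsubscribe());
      source.subscribe(operatorSubscriber);
    });
}

const log: string[] = [];

// A synchronous source that stops as soon as its subscriber closes.
const source = new Observable<number>((subscriber) => {
  subscriber.add(() => log.push('source teardown'));
  for (let i = 1; i <= 10 && !subscriber.closed; i++) {
    subscriber.next(i);
  }
});

take<number>(2)(source).subscribe({
  next: (v) => log.push(`next ${v}`),
  complete: () => log.push('complete'),
});

console.log(log); // ['next 1', 'source teardown', 'next 2', 'complete']
```

Notice that the source's teardown runs before the final value even reaches the consumer, which is exactly the ordering the principle demands.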

So why am I saying JavaScript's design for iterables is flawed?

If I'm showing you that a return (aka complete) on Observable that sometimes gets a value and other times does not definitively solves this issue, then certainly that design should have some analog in JavaScript. BUT... what I'm calling complete above is analogous to return in JavaScript, and a value passed via return in JavaScript is "special" and not part of the iteration.

Consider the following:

```ts
function* iter() {
  yield 1;
  yield 2;
  return 3;
}

for (const value of iter()) {
  console.log(value);
}

// 1
// 2
```

WHAT HAPPENED TO THE 3?!

It was eaten by JavaScript's bad iteration design! (I'm sorry if you worked on this. Don't worry, I make bad things too; see this library for plenty of examples.)

In every language BUT JavaScript, iteration is done in two steps:

  1. Can I get the next value? boolean
  2. Get the next value. V.

Leaving only two states you can be in: Either you can get a value and you get one, or you can't get a value.
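That two-step protocol is easy to express in TypeScript (a hypothetical interface of my own, not part of the language), and note that only those two states are representable:

```typescript
// The classic two-step iteration protocol: ask first, then pull.
interface TwoStepIterator<T> {
  hasNext(): boolean;
  next(): T; // only legal to call after hasNext() has returned true
}

function fromArray<T>(items: T[]): TwoStepIterator<T> {
  let i = 0;
  return {
    hasNext: () => i < items.length,
    next: () => {
      if (i >= items.length) throw new Error('no next value');
      return items[i++];
    },
  };
}

const it = fromArray([1, 2, 3]);
const out: number[] = [];
while (it.hasNext()) {
  out.push(it.next());
}
console.log(out); // [1, 2, 3]
```

There's no third state hiding anywhere: either hasNext() says yes and next() hands you a value, or hasNext() says no and you're done.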

But the designers of JavaScript's iterator thought they could do better! They can do it all in one step:

  1. Get an object that gives me a value OR tells me if I'm done. { value: V, done: boolean }

But this creates three possible states! You have a value, and can get more; You don't have a value, and can get no more; OR the magical third state, You DO have a value, and can get no more.

But so far, this seems fine, and almost aligns with the hypothetical change above, right? "complete with a value". So far so good.

BUT I demonstrated above that the return value is not part of the actual iterated values! And therein lies the flaw! You can't possibly implement a take using this mechanism in JavaScript iteration, because it wouldn't work right:

```ts
function* iter() {
  let n = 0;
  while (true) {
    yield n++;
  }
}

// Ideal, but wouldn't work
function* takeBetter<T>(source: Iterable<T>, count: number) {
  let seen = 0;
  for (const value of source) {
    seen++;
    if (seen >= count) {
      return value;
    }
    yield value;
  }
}

for (const value of takeBetter(iter(), 3)) {
  console.log(value);
}

// It only logs two values :(

// 0
// 1
```
```ts
function* iter() {
  let n = 0;
  while (true) {
    yield n++;
  }
}

// -- cut --
// Less ideal, but does work
function* take<T>(source: Iterable<T>, count: number) {
  let seen = 0;
  for (const value of source) {
    seen++;
    yield value;
    if (seen >= count) {
      return 'totally ignored value';
    }
  }
}

for (const value of take(iter(), 3)) {
  console.log(value);
}

// 0
// 1
// 2
```

Okay, but Ben... So what?! It seems just as simple?!

Okay, sure, but it's broken! Before we even yield in that take, we inherently know that our source iterable can tear down. But JavaScript's iteration protocol lacks the mechanism to deal with this appropriately!

```ts
function* iter() {
  try {
    let n = 0;
    while (true) {
      yield n++;
    }
  } finally {
    console.log('cleaned up');
  }
}

for (const value of take(iter(), 3)) {
  console.log(value);
}

// 0
// 1
// 2
// cleaned up
```

In an ideal world, "cleaned up" should be logged before "2". In place of console.log(value) could be something that does a lot of work, or somehow impedes the cleanup that happens in that finally block. In real-world use, there's really no telling what could be happening in there. (This holds true for RxJS's usage as well!) Everything should hold up to the principle: "Teardown your resources as soon as you know you should, before you do anything else!"

But why is it broken in this way? Basically, because JavaScript decided to invent a new "single step" iteration mechanism that made the seemingly obvious choice for the IteratorResult of return 0 be { done: true, value: 0 }. Pretty neat, right? Well, no, actually not at all.

You see, the iteration protocol in JavaScript can't handle including { done: true, value: ??? } in the iteration. And why is that? Because it can't differentiate the { done: true, value: undefined } of a generator literally executing return undefined from the implicit "void" (in practice, undefined) return of all JavaScript functions, both of which are sent with that single iteration step:

```ts
function* iter1() {
  yield 1;
  yield 2;
  return undefined;
}

// vs

function* iter2() {
  yield 1;
  yield 2;
}
```
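We can verify this indistinguishability directly by draining both generators and comparing every result the consumer sees:

```typescript
function* iter1() {
  yield 1;
  yield 2;
  return undefined;
}

function* iter2() {
  yield 1;
  yield 2;
}

// Pull three results from each and compare.
const drain = (it: Iterator<number>) => [it.next(), it.next(), it.next()];
const r1 = drain(iter1());
const r2 = drain(iter2());

console.log(JSON.stringify(r1) === JSON.stringify(r2)); // true
// The third result is { value: undefined, done: true } for BOTH generators,
// so the consumer cannot tell the explicit return from the implicit one.
```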

To accommodate { done: true, value: 'whatever' } being iterated in a for..of loop (or anything else that reads iterables, like rest spreads, destructuring, or Array.from()), you'd then have to deal with an implicit undefined at the end of EVERY iterable:

```ts
declare const getSomeData: () => Iterable<{ isGood: boolean; value: string }>;

// -- cut --
function* readGoodData() {
  const dataRows = getSomeData();
  for (const data of dataRows) {
    if (data.isGood) {
      yield data.value;
    }
  }
}

for (const value of readGoodData()) {
  // If { done: true, value: ??? } is iterated, we'll get a trailing "undefined" here.
  console.log(value);
}
```

JavaScript is flawed? JavaScript iteration is. #

There are two things at play here:

  1. The implicit return of all JavaScript functions being undefined, rather than void being an actual thing (as opposed to the pig that TypeScript puts lipstick on with void).
  2. The iteration mechanism being "let's iterate in a single step".

If all functions' implicit returns were a void that was a real thing, meaning "you can't access or read or use this, or you'll get an error, because it's not to be used; it's void", then we wouldn't be in this mess. But that ship has sailed.

If iteration was done in the "normal" two steps, we'd have this solved as well.

Iterators were introduced as a programming-language concept by Barbara Liskov, creator of the CLU (Cluster) programming language (and 2008 Turing Award recipient), at MIT in 1973. From a paper she contributed to, Ownership Types for Object Encapsulation, published in 2003:

```
interface TEnumeration<enumOwner, TOwner> {
  T<TOwner> getNext();
  boolean hasMoreElements();
}
```

Or in Java:

```java
public interface Iterator<Item>
{
    Item next();
    boolean hasNext();
}
```

Or in .NET:

```csharp
public interface IEnumerator<T>
{
    T Current { get; }
    bool MoveNext();
}
```

Iteration is effectively done by running code to see if you have a next value, then pulling the value in a second call. If iteration was done in two steps, there's no chance anyone can conflate any of this. Consider the following iterable defined with a generator function in JavaScript:

```ts
function* iter() {
  let a = 1;
  yield a;
  a = a + 1;
  yield a;
  a = a + 2;
  yield a;
  console.log('additional code');
}
```

In JavaScript, since everything is done in one step, this works like:

  1. iterator.next() executes let a = 1, returns { done: false, value: 1 }
  2. iterator.next() executes a = a + 1, returns { done: false, value: 2 }
  3. iterator.next() executes a = a + 2, returns { done: false, value: 4 }
  4. iterator.next() executes console.log('additional code'), returns { done: true, value: undefined }

With a more contemporary iteration design, this would look as follows:

  1. iterator.hasNext() executes let a = 1 and returns true.
  2. iterator.next() returns 1.
  3. iterator.hasNext() executes a = a + 1 and returns true.
  4. iterator.next() returns 2.
  5. iterator.hasNext() executes a = a + 2 and returns true.
  6. iterator.next() returns 4.
  7. iterator.hasNext() executes console.log('additional code') and returns false.
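For illustration, we can emulate that two-step shape on top of JavaScript's one-step protocol by buffering one lookahead result (a hypothetical adapter of my own, not a standard API). It still allocates result objects underneath, but the consumer-facing API has only the two states:

```typescript
// A two-step adapter over a JS iterator: hasNext() pulls and buffers the
// next result (this is where the generator body runs), next() hands it out.
interface TwoStep<T> {
  hasNext(): boolean;
  next(): T;
}

function twoStep<T>(it: Iterator<T>): TwoStep<T> {
  let buffered: IteratorResult<T> | null = null;
  return {
    hasNext() {
      if (buffered === null) {
        buffered = it.next();
      }
      return !buffered.done;
    },
    next() {
      if (!this.hasNext()) throw new Error('no next value');
      const value = (buffered as IteratorYieldResult<T>).value;
      buffered = null;
      return value;
    },
  };
}

function* gen() {
  let a = 1;
  yield a;
  a = a + 1;
  yield a;
  a = a + 2;
  yield a;
}

const stepper = twoStep(gen());
const out: number[] = [];
while (stepper.hasNext()) {
  out.push(stepper.next());
}
console.log(out); // [1, 2, 4]
```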

Sure, it's more steps, but we're not allocating an object on every turn, and we can differentiate between a real value being returned and the function simply returning, even in a language like JavaScript where we return undefined implicitly from functions.

I suppose you could differentiate this in JavaScript's mechanism by checking the difference between { done: true } and { done: true, value: undefined }, but ("Yay, JavaScript!") that doesn't really work either if you naively read the value: ({ done: true }).value === undefined. You'd have to check result.done && 'value' in result, but that's a footgun and a bit inefficient.
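A quick runnable check of that footgun (plain objects standing in for iterator results):

```typescript
// Two results a consumer might receive at the end of iteration:
const explicit: { done: boolean; value?: unknown } = { done: true, value: undefined };
const implicit: { done: boolean; value?: unknown } = { done: true };

// The naive read can't tell them apart:
console.log(explicit.value === undefined); // true
console.log(implicit.value === undefined); // true, same answer!

// Property presence is the only distinguishing check:
console.log('value' in explicit); // true
console.log('value' in implicit); // false
```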

So what about an example where we use return in iteration?

```ts
function* iter() {
  let a = 1;
  yield a;
  a = a + 1;
  yield a;
  a = a + 2;
  return a;
}
```

In this hypothetical, the more contemporary iteration design would execute as follows:

  1. iterator.hasNext() executes let a = 1 and returns true.
  2. iterator.next() returns 1.
  3. iterator.hasNext() executes a = a + 1 and returns true.
  4. iterator.next() returns 2.
  5. iterator.hasNext() executes a = a + 2 and returns true.
  6. iterator.next() returns 4.
  7. iterator.hasNext() returns false, with nothing to execute as the function returned.

But then we're hosed if we just wanted to end our iteration early... Because in JavaScript, return; is observably the same as return undefined; as far as the consumer is concerned. This is why in C#, return in a generator function is a compile-time error, and there's yield return and yield break.

How does this hold up to our principle above? "Teardown your resources as soon as you know you should, before you do anything else!" Since you're checking whether you have another value before you allow the consumer to consume it, it holds up perfectly. There's enough time to tear down any upstream resource with an associated method call. In the case of generators, that's the poorly named return method; personally, I think I'd have supplied some sort of finalize() method instead. But I'm not a language designer.

Okay, well how do we fix our take above? #

Now we're left to do some hacky stuff to get this working... In order for anything to "know" that it's done sending values at or before the moment we yield that last value, we'll have to examine the iterator to see if it's actually a generator, and then invoke its return() method.

```ts
function* take<T>(source: Iterable<T>, count: number) {
  let seen = 0;
  for (const value of source) {
    seen++;
    const done = seen >= count;
    // Execute the finally block in the iterator if it
    // happens to be a generator (the cast keeps TypeScript happy,
    // since Iterable<T> doesn't declare a return method).
    const gen = source as { return?: () => unknown };
    if (done && typeof gen.return === 'function') {
      gen.return();
    }
    yield value;
    if (done) {
      return;
    }
  }
}
```
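Running that hacky take against a generator with a finally block (a self-contained copy of the pieces above, logging into an array so the ordering is visible) shows that teardown now happens before the last value reaches the consumer:

```typescript
const log: string[] = [];

// Infinite generator with cleanup, like iter above.
function* naturals() {
  try {
    let n = 0;
    while (true) {
      yield n++;
    }
  } finally {
    log.push('cleaned up');
  }
}

// Self-contained copy of the hacky take.
function* take<T>(source: Iterable<T>, count: number) {
  let seen = 0;
  for (const value of source) {
    seen++;
    const done = seen >= count;
    const gen = source as { return?: () => unknown };
    if (done && typeof gen.return === 'function') {
      gen.return(); // run the source's finally block *before* the last yield
    }
    yield value;
    if (done) {
      return;
    }
  }
}

for (const value of take(naturals(), 3)) {
  log.push(`value ${value}`);
}

console.log(log); // ['value 0', 'value 1', 'cleaned up', 'value 2']
```

'cleaned up' now lands before the final value, which is the ordering the principle asks for.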

Wait, are you saying that Observable complete should have a value? #

No. Not in JavaScript. It simply doesn't make sense, and there are no use cases to support it. It would add an undue level of complexity. Just as generator.return("this value") can only echo the value back to the consumer that executed the call (even though it does cause the innards of the generator itself to jump to whatever finally block it's in), Observables should not be able to push a value through their "return" (complete) to be observed by the consumer. Being the "dual", the roles are reversed: if an iterable's consumer can't pass a value to the producer through return(value), an observable's producer shouldn't be able to pass a value to its consumer through complete(value) (its return). It doesn't make any sense and, like I said, there are no use cases to support it anyhow.

There are probably some thinking types out there who might say, "Well, why not pass { done: boolean, value: T } to next in Observables?" That's no good either! The problem is that next is reserved for observed values from the set of events. { done: true, value: T } is NOT "observed" during iteration of an iterable, so why would it be observed while subscribed to an observable? That doesn't fit the duality of the type.

Summary #

  1. RxJS's Observable, and all Observable implementations, will need to start tearing down as soon as they're able to. That means teardown and other finalization needs to happen before the consumer is notified of complete or error.
  2. JavaScript's iterable design was an avant-garde idea, but it has some rough edges related to the fact that it tries to do too much in one call; they probably should have used an existing design rather than invent a new one.
  3. I ramble a lot. If you made it this far, thank you for putting up with my nonsense. <3