Learning Observable By Building Observable

RxJSJavaScript

This is a rehash of an old article I wrote in 2016, and a talk I've given more than a few times. I want to modernize the content a little, and hopefully simplify it. The goal is to help people understand what an observable is. Not just an RxJS observable, but any observable (yes, there's more than one), as a type.

Observables Are Just Functions #

To understand this, I want to place two side-by-side examples. One that is an RxJS observable, and one that is just a function. Both of these examples have the exact same output:

An RxJS Observable

This is an example of a simple observable created with RxJS that emits three numbers and completes. If you're already familiar with RxJS, this is equivalent to of(1, 2, 3).

ts
import { Observable } from 'rxjs';
 
const source = new Observable<number>((subscriber) => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});
 
// Usage
console.log('start');
source.subscribe({
next: console.log,
complete: () => console.log('done'),
});
console.log('stop');

A function "observable"

ts
/**
* A simple object with a `next` and `complete` callback on it.
*/
interface Observer<T> {
next: (value: T) => void;
complete: () => void;
}
 
/**
* A function that takes a simple object with callbacks
* and does something them.
*/
const source = (subscriber: Observer<number>) => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
};
 
// Usage
console.log('start');
source({
next: console.log,
complete: () => console.log('done'),
});
console.log('stop');

output (of both!)

txt
"start"
1
2
3
"done"
"stop"

What I want you to notice above are the similarities. In both, you pass in an object with a next and complete method on it. In both, you call next and complete in the body of a function. In both, the body of the function is not executed until later, when you call source.subscribe() or simply call the function directly as source() in the other example. This is because Observables are just specialized functions.

Why not just use functions? What's "special" about Observable? #

Fair point. If you were very careful, you probably could use ordinary functions for some of these use cases. The problem is that the functions-only version isn't quite "safe". As just one example, if the semantics of observable are that next should never be called after complete, we're going to need some guarantees around that.

ts
const source = function (subscriber: Observer<number>) {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
subscriber.next(4); // Oops, this shouldn't be allowed!
};

The above example will now log "done" then 4 right after. It wasn't "done" at all! So we want to provide some way to guarantee our subscriber's next method can't be called after complete. This can be done by wrapping our function in a class.

ts
/**
* A class used to wrap a user-provided Observer. Since the
* observer is just a plain objects with a couple of callbacks on it,
* this type will wrap that to ensure `next` does nothing if called after
* `complete` has been called, and that nothing happens if `complete`
* is called more than once.
*/
class SafeSubscriber<T> {
closed = false;
 
constructor(private destination: Observer<T>) {}
 
next(value: T) {
// Check to see if this is "closed" before nexting.
if (!this.closed) {
this.destination.next(value);
}
}
 
complete() {
// Make sure we're not completing an already "closed" subscriber.
if (!this.closed) {
// We're closed now.
this.closed = true;
this.destination.complete();
}
}
}
 
/**
* A class to wrap our function, to ensure that when the function is
* called with an observer, that observer is wrapped with a SafeSubscriber
*/
class Observable<T> {
constructor(private _wrappedFunc: (subscriber: Observer<T>) => void) {}
 
subscribe(observer: Observer<T>): void {
// We can wrap our observer in a "safe subscriber" that
// does the work of making sure it's not closed.
const subscriber = new SafeSubscriber(observer);
this._wrappedFunc(subscriber);
}
}
 
// Usage
// Now 4 won't be nexted after we complete.
const source = new Observable((subscriber) => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
subscriber.next(4); // this does nothing.
});

Handling Partial Observers #

Another scenario that comes up is a "partial" observer. That is to say, an observer that just has a next method or just has a complete method (or perhaps an error method, but we'll get to that later). We can handle this scenario now easily with our Observable type above, because we can implement that in our SafeSubscriber:

ts
class SafeSubscriber<T> {
closed = false;
 
constructor(private destination: Partial<Observer<T>>) {}
 
next(value: T) {
if (!this.closed) {
this.destination.next?.(value); // Note the ?. check here.
}
}
 
complete() {
if (!this.closed) {
this.closed = true;
this.destination.complete?.(); // And here.
}
}
}

Error Notifications #

Notifying our subscriber of an error is really as simple as adding an additional handler to our Observer and Safe Subscriber. The semantics are very much the same as complete above. Both error and complete are considered to be a termination of the observation.

The Observer just changes slightly, to have an error handler:

ts
interface Observer<T> {
next: (value: T) => void;
complete: () => void;
error: (err: any) => void;
}

After that, we can add an error method to our SafeSubscriber:

ts
class SafeSubscriber<T> {
closed = false;
 
constructor(private destination: Partial<Observer<T>>) {}
 
next(value: T) {
if (!this.closed) {
this.destination.next?.(value);
}
}
 
complete() {
if (!this.closed) {
this.closed = true;
this.destination.complete?.();
}
}
 
error(err: any) {
if (!this.closed) {
this.closed = true;
this.destination.error?.(err);
}
}
}

Data Sources And The Case For Teardown #

The primary use case for observable is to wrap an async data source, for example a WebSocket. To do something like that, you could use our homemade observable above like so:

ts
const helloSocket = new Observable<string>((subscriber) => {
// Open a socket.
const socket = new WebSocket('wss://echo.websocket.org');
 
socket.onopen = () => {
// Once it's open, send some text.
socket.send('Hello, World!');
};
 
socket.onmessage = (e) => {
// When it echoes the text back (in the case of this particular server)
// notify the consumer.
subscriber.next(e.data);
};
 
socket.onclose = (e) => {
// Oh! we closed!
if (e.wasClean) {
// ...because the server said it was done.
subscriber.complete();
} else {
// ...because of something bad. Maybe we lost network or something.
subscriber.error(new Error('Socket closed dirty!'));
}
};
});
 
// Start the websocket and log the echoes
helloSocket.subscribe({
next: console.log,
complete: () => console.log('server closed'),
error: console.error,
});

But now we have a problem. The user that subscribes to our observable has no way to cancel it and close the socket. We need a way to shut things down. If we were just using a function, we could return a function that contained our teardown logic.

teardown if we were just using a function

ts
const source = (subscriber: Observer<string>) => {
const socket = new WebSocket('wss://echo.websocket.org');
 
socket.onopen = () => {
socket.send('Hello, World!');
};
 
socket.onmessage = (e) => {
subscriber.next(e.data);
};
 
socket.onclose = (e) => {
if (e.wasClean) {
subscriber.complete();
} else {
subscriber.error(new Error('Socket closed dirty!'));
}
};
 
return () => {
if (socket.readyState <= WebSocket.OPEN) {
socket.close();
}
};
};
 
const teardown = source({
next: console.log,
complete: () => console.log('done'),
error: console.error,
});
 
// Decide you really don't want to keep the socket open.
teardown();

Another thing that comes up, not necessarily with WebSocket, but with other situations, are situations where the observable author decides they've encountered an error or a completion state and they want to notify the user then teardown. It would be nice to have a unified spot to do this, such that if subscriber.error or subscriber.complete is called, the teardown is invoked ASAP.

We can accomplish all of this with some changes to SafeSubscriber and the addition of a Subscription type.

Adding teardown (via Subscription) to our homemade Observable.

ts
/**
* Our subscription type. This is to manage teardowns.
*/
class Subscription {
private teardowns = new Set<() => void>();
 
add(teardown: () => void) {
this.teardowns.add(teardown);
}
 
unsubscribe() {
for (const teardown of this.teardowns) {
teardown();
}
this.teardowns.clear();
}
}
 
class SafeSubscriber<T> {
closed = false;
 
constructor(
private destination: Partial<Observer<T>>,
private subscription: Subscription,
) {
// Make sure that if the subscription is unsubscribed,
// we don't let any more notifications through this subscriber.
subscription.add(() => (this.closed = true));
}
 
next(value: T) {
if (!this.closed) {
this.destination.next?.(value);
}
}
 
complete() {
if (!this.closed) {
this.closed = true;
this.destination.complete?.();
this.subscription.unsubscribe();
}
}
 
error(err: any) {
if (!this.closed) {
this.closed = true;
this.destination.error?.(err);
this.subscription.unsubscribe();
}
}
}
 
class Observable<T> {
constructor(private _wrappedFunc: (subscriber: Observer<T>) => () => void) {}
 
subscribe(observer: Observer<T>) {
const subscription = new Subscription();
const subscriber = new SafeSubscriber(observer, subscription);
subscription.add(this._wrappedFunc(subscriber));
return subscription;
}
}
 
const helloSocket = new Observable<string>((subscriber) => {
const socket = new WebSocket('wss://echo.websocket.org');
 
socket.onopen = () => {
socket.send('Hello, World!');
};
 
socket.onmessage = (e) => {
subscriber.next(e.data);
};
 
socket.onclose = (e) => {
if (e.wasClean) {
subscriber.complete();
} else {
subscriber.error(new Error('Socket closed dirty!'));
}
};
 
return () => {
if (socket.readyState <= WebSocket.OPEN) {
socket.close();
}
};
});
 
const subscription = helloSocket.subscribe({
next: console.log,
complete: () => console.log('server closed'),
error: console.error,
});
 
// Later, we can unsubscribe!
subscription.unsubscribe();

That's It! Observables are just specialized functions! #

I know that was a LOT of code, hopefully the walkthrough helped you realize what goes into an observable type (like the one from rxjs), and helped demystify it a bit. It's certainly not magic.

I'd challenge you to ask yourself then, "What are observables good for?" and honestly, the shortest answer is: You can use an observable for anything you can use a function for. The biggest difference is a small set of guarantees around the callbacks and perhaps most importantly, around teardown of resources.

It's important to note that the above implementation is NOT something you should be recreating or using in production. I'd recommend (of course) using the RxJS observable as it covers a lot more situations to help keep your code dependable and safe when composing complex streaming behaviors.

In a follow up posts, I'll talk about "Why observables?" and all about "operators", which is generally where people get turned off by RxJS.