Taming the Async Beast with FRP and RxJS

The Problem

We’ve recently been working on an in-browser vision mixer for the BBC (previous blog posts here, here, here, and here). Live vision mixing involves keeping track of a large number of interdependent data streams. Our application receives timing data for video tapes and live video streams via webrtc data channels and websocket connections, and sends video and audio authoring decisions over other websockets to the live rendering backend.

Many of the data streams we’re handling are interdependent. We don’t want to send the renderer an authoring decision to cut to a video tape until that tape is loaded and ready to play, so the decision has to wait on the tape’s status. If the authoring websocket has closed in the meantime, we need to reconnect to it and then retry sending that decision.

Orchestrating interdependent asynchronous data streams is a fundamentally complex problem.

Promises are one popular solution for composing asynchronous operations and safely transforming the results, but they have a number of limitations. The primary issue is that they cannot be cancelled, so teardown has to be handled separately somehow. We could use the excellent fluture or Task Future libraries instead, both of which support cancellation (and are lazy, chainable and fantasy-land compliant), but futures and promises each represent one single future value (or error), not a stream of many values. The team working on this project are fans of futures (less so of promises) and were aiming to write the majority of the codebase in a functional style using folktale and ramda (and react-redux), so we wanted a functional, composable way to handle ongoing streams of data that could sit comfortably within the rest of the codebase.

A Solution

After some debate, we decided to use FRP (functional reactive programming) powered by the observable pattern. Having used RxJS (with redux-observable) for smaller projects in the past, we were confident that it could be an elegant solution to our problem. You can find out more about RxJS here and here but, in short, it’s a library that allows subscribers to listen to and transform the output of a data stream as per the observer pattern, and allows the observable (the thing subscribed to) to “complete” its stream when it runs out of data (or whatever), similar to an iterator from the iterator pattern. Observables also allow their subscribers to terminate them at any point, and typically observables will encapsulate teardown logic related to their data source – a websocket, long-poll, webrtc data channel, or similar.
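
To make the subscribe / complete / teardown contract a bit more concrete, here is a minimal hand-rolled sketch. It is not RxJS, and it is not how RxJS is implemented: TinyObservable and fakeSocket are hypothetical names invented for this illustration, and error handling is left out entirely.

```javascript
// Illustrative only: a tiny observable with next/complete and teardown.
class TinyObservable {
  // `producer` receives an observer and may return a teardown function.
  constructor(producer) {
    this.producer = producer;
  }

  subscribe(observer) {
    let closed = false;
    let teardown = null;

    const unsubscribe = () => {
      if (closed) return;
      closed = true;
      if (teardown) teardown();
    };

    const returned = this.producer({
      next: value => { if (!closed) observer.next(value); },
      complete: () => {
        if (closed) return;
        observer.complete();
        unsubscribe();
      },
    });

    teardown = typeof returned === 'function' ? returned : () => {};
    // The producer may have completed synchronously, before teardown was known.
    if (closed) teardown();

    return { unsubscribe };
  }
}

// Hypothetical stand-in for a websocket or data channel.
const fakeSocket = {
  listeners: new Set(),
  on(fn) { this.listeners.add(fn); },
  off(fn) { this.listeners.delete(fn); },
  emit(msg) { [...this.listeners].forEach(fn => fn(msg)); },
};

// The teardown logic lives alongside the data source it cleans up.
const messages = new TinyObservable(observer => {
  const handler = msg => observer.next(msg);
  fakeSocket.on(handler);
  return () => fakeSocket.off(handler);
});

const log = [];
const subscription = messages.subscribe({
  next: msg => log.push(msg),
  complete: () => log.push('done'),
});

fakeSocket.emit('tape-loaded');
fakeSocket.emit('tape-ready');
subscription.unsubscribe();
fakeSocket.emit('ignored'); // nobody is listening any more

console.log(log); // ['tape-loaded', 'tape-ready']
console.log(fakeSocket.listeners.size); // 0
```

The point is that the subscriber never touches the socket’s listener list itself: unsubscribing is enough to run whatever cleanup the observable was created with.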

RxJS implements the observer pattern in a functional way that allows developers to compose together observables, just as they’d compose functions or types. RxJS has its roots in functional reactive programming and leverages the power of monadic composition to chain together streams while also ensuring that teardown logic is preserved and handled as you’d expect.
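
As a rough sketch of what “composition that preserves teardown” means, the snippet below treats an observable as a plain function that takes an observer and returns an unsubscribe function. Everything here (createSource, map, take, pipe) is a hand-written stand-in made up for this example, not the RxJS operators of the same names, and errors are ignored.

```javascript
// Illustrative only: observable = observer => unsubscribe function.

// A manually driven source that lets us count live subscriptions.
const createSource = () => {
  const observers = new Set();
  return {
    activeSubscriptions: () => observers.size,
    push: value => [...observers].forEach(o => o.next(value)),
    observable: observer => {
      observers.add(observer);
      return () => observers.delete(observer);
    },
  };
};

// map: transform each value; the source's unsubscribe is passed straight through.
const map = fn => source => observer =>
  source({
    next: value => observer.next(fn(value)),
    complete: () => observer.complete(),
  });

// take: complete after `count` values, then unsubscribe from the source.
const take = count => source => observer => {
  let seen = 0;
  let stopped = false;
  let unsubscribeSource = null;

  const stop = () => {
    if (stopped) return;
    stopped = true;
    if (unsubscribeSource) unsubscribeSource();
  };

  unsubscribeSource = source({
    next: value => {
      if (stopped) return;
      seen += 1;
      observer.next(value);
      if (seen >= count) {
        observer.complete();
        stop();
      }
    },
    complete: () => {
      if (stopped) return;
      observer.complete();
      stop();
    },
  });

  // The source may have finished synchronously, before we held its unsubscribe.
  if (stopped) unsubscribeSource();
  return stop;
};

// pipe: apply operators left to right.
const pipe = (...operators) => source =>
  operators.reduce((acc, operator) => operator(acc), source);

const tape = createSource();
const received = [];
const doubledFirstTwo = pipe(map(x => x * 2), take(2))(tape.observable);

doubledFirstTwo({
  next: value => received.push(value),
  complete: () => received.push('complete'),
});

tape.push(1);
tape.push(2);
tape.push(3); // never delivered: take(2) has already unsubscribed

console.log(received); // [2, 4, 'complete']
console.log(tape.activeSubscriptions()); // 0
```

Each operator only knows about the stream directly upstream of it, yet stopping the outermost stream still unwinds the whole chain back to the original source.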

Why FRP and Observables?

The elegance and power of observables is much more easily demonstrated than explained in a wordy paragraph. I’ll run through the basics and let your imagination think through the potential of it all.

A simple RxJS observable looks like this:

Observable.of(1, 2, 3)

It can be subscribed to as follows:

Observable.of(1, 2, 3).subscribe({
  next: val => console.log(`Next: ${val}`),
  error: err => console.error(err),
  complete: () => console.log('Completed!')
})

Which would emit the following to the console:

Next: 1
Next: 2
Next: 3
Completed!

We can also transform the data just as we’d transform values in an array:

Observable.of(1, 2, 3).map(x => x * 2).filter(x => x !== 4).subscribe(...)

Observables can also be asynchronous:

Observable.interval(1000).subscribe(...)
0 [a second passes]
1 [a second passes]
2 [a second passes]

Observables can represent event streams:

Observable.fromEvent(window, 'mousemove').subscribe(...)
[Event Object]
[Event Object]
[Event Object]

Which can also be transformed:

Observable.fromEvent(window, 'mousemove')
  .map(ev => [ev.clientX, ev.clientY])
  .subscribe(...)
[211, 120]
[214, 128]
[218, 139]

We can cancel the subscription, which will clean up the event listener:

const subscription = Observable.fromEvent(window, 'mousemove')
  .map(ev => [ev.clientX, ev.clientY])
  .subscribe(...)

subscription.unsubscribe() // removes the mousemove listener

Or we can unsubscribe in a dot-chained functional way:

Observable.of(1, 2, 3)
  .take(2)  // After receiving two values, complete the observable early
  .subscribe(...)

Observable.fromEvent(window, 'mousemove')
  .map(ev => [ev.clientX, ev.clientY])
  // Stop emitting when the user clicks
  .takeUntil(Observable.fromEvent(window, 'click'))
  .subscribe(...)

Note that those last examples left no variables lying around. They are entirely self-contained bits of functionality that clean up after themselves.

Many common asynchronous stream use-cases are catered for natively, in such a way that the “operators” (the observable methods e.g. “throttle”, “map”, “delay”, “filter”) take care of all of the awkward state required to track emitted values over time.

Observable.fromEvent(window, 'mousemove')
  .throttleTime(1000) // only allow one event through per second
  .subscribe(...)
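
For comparison, here is roughly the bookkeeping you would carry yourself without an operator. This is a hand-rolled, leading-edge throttle written for illustration only; createThrottledHandler is a hypothetical helper, the clock is injected so the example is deterministic, and the timestamps and positions are made up.

```javascript
// Illustrative only: the state a throttling operator keeps on our behalf.
// `now` defaults to Date.now but can be swapped for a fake clock.
const createThrottledHandler = (handler, intervalMs, now = Date.now) => {
  let lastEmittedAt = -Infinity; // must be remembered between events

  return event => {
    const time = now();
    if (time - lastEmittedAt < intervalMs) return; // too soon: drop the event
    lastEmittedAt = time;
    handler(event);
  };
};

// Simulated clock and events (made-up values).
let fakeTime = 0;
const seen = [];
const onMove = createThrottledHandler(pos => seen.push(pos), 1000, () => fakeTime);

[[0, 'a'], [400, 'b'], [999, 'c'], [1000, 'd'], [1500, 'e'], [2100, 'f']]
  .forEach(([time, pos]) => {
    fakeTime = time;
    onMove(pos);
  });

console.log(seen); // ['a', 'd', 'f']
```

Even this small version needs a mutable timestamp that outlives each event, and in a real page we would also have to remember to remove the event listener ourselves.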

… and that’s barely scratching the surface.

The Benefits

Many of the benefits of RxJS are the benefits of functional programming: the avoidance of state, and the readability and testability of short, pure functions. By encapsulating the side-effects associated with your application in a generic, composable way, developers can maximise the reusability of the asynchronous logic in their codebase.

By seeing the application as a series of data transformations between the external application interfaces, we can describe those transformations by composing short, pure functions and lazily applying data to them as it is emitted in real-time.

Messy, temporary, imperative variables are replaced by closures, which give observables access to previously emitted values in a localised way. That limits the amount of application logic and state a developer must hold in their head at any given time.
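
A small sketch of that idea: a scan-style operator keeps its running value inside a closure that is created afresh for every subscription. As before, observables are modelled as plain functions and scan and fromArray are hand-written stand-ins for illustration, not the RxJS versions.

```javascript
// Illustrative only: observable = observer => unsubscribe function.

// scan: like Array.prototype.reduce, but emits every intermediate result.
const scan = (reducer, seed) => source => observer => {
  let accumulated = seed; // private to this one subscription
  return source({
    next: value => {
      accumulated = reducer(accumulated, value);
      observer.next(accumulated);
    },
    complete: () => observer.complete(),
  });
};

// Hypothetical synchronous source that emits each array item, then completes.
const fromArray = values => observer => {
  values.forEach(value => observer.next(value));
  observer.complete();
  return () => {};
};

const totals = [];
const runningTotal = scan((sum, x) => sum + x, 0)(fromArray([1, 2, 3]));

// Two independent subscriptions: neither sees the other's accumulated state.
runningTotal({ next: value => totals.push(value), complete: () => {} });
runningTotal({ next: value => totals.push(value), complete: () => {} });

console.log(totals); // [1, 3, 6, 1, 3, 6]
```

No variable outside the operator ever holds the running total, so there is nothing to reset or clean up between subscriptions.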

Did It Work?

Sort of. We spent a lot of our time in a state of low-level fury at RxJS, so much so that we’ve written up a long list of complaints in another post.

There are some good bits though:

FRP and the observable pattern are both transformative approaches to writing complex asynchronous javascript code, producing fewer bugs and drastically improving the reusability of our codebase.

RxJS operators can encapsulate extremely complex asynchronous operations and elegantly describe dependencies in a terse, declarative way that leaves no state lying around.

In multiple standups throughout the project we’ve enthusiastically raved about how these operators have turned a fundamentally complex part of our implementation into a two-line solution. Sure, those two lines usually took a long time to craft and get right, but once they work, it’s difficult to fit many bugs into just two lines of code (compared with the hundreds of lines of imperative code we’d otherwise need to write if we rolled our own).

That said, RxJS is a functional approach to writing code so developers should expect to incur a penalty if they’re new to the paradigm as they go from an imperative, object-oriented approach to system design to a functional, data-flow-driven approach instead. There is also a very steep learning curve required to feel the benefits of RxJS as developers familiarise themselves with the toolbox and the idiosyncrasies.

Would We Use It Again?

Despite the truly epic list of shortcomings, I would still recommend an FRP approach to complex async javascript projects. In future we’ll be trying out most.js to see if it solves the myriad of problems we found with RxJS. If it doesn’t, I’d consider implementing an improved Observable that keeps its hands off my errors.

It’s also worth mentioning that we used RxJS with react-redux to handle all redux side-effects. We used redux-observable to achieve this and it was terrific. We’ll undoubtedly be using redux-observable again.

