Oscar Funes

Published on

Observables

The observable pattern piqued my curiosity a few months back, but I hadn’t had the time to investigate it beyond some introductory videos on YouTube.

This post is my way of actually putting in the time and effort to read about it, understand it, and try to explain it in my own words. If you want to check other examples, you can go to my repository on GitHub.

My initial mental model for this was an array. You usually work with arrays through the higher-order functions available on them, like map, forEach, reduce, et al.

const data = [1, 2, 3]
// forEach
data.forEach((element) => console.log(element))
// every
data.every((element) => element % 2)

Observables are similar, but the difference is that you don’t know for sure how long the array is (it could go on infinitely), or even when the next element will “show up”. Since most documentation talks about sequences of data or events, most of the examples you’ll find are about user interface events, like the click of a button or the input of characters into a field.

My day to day work is as a Node.js backend developer, so my examples will be based on that.

The first thing I noticed was that, if you google rxjs, you get two repos on GitHub:

  1. Reactive-Extensions/RxJS (which is v4 of RxJS)

  2. ReactiveX/rxjs (which is v5 of RxJS)

I’m not really sure what’s going on with this, but I’ll be using the latest, which is rxjs@5. The documentation I’ll be referring to is here.

Observables do not begin their execution until an observer subscribes. The subscribe method of a source accepts 3 parameters, which are functions for the 3 kinds of notification an Observable emits: next, error, and complete.

  1. Next will be called each time a new element is pushed from the Observable downstream to observers.

  2. Error will be called when the Observable ends with an error.

  3. Complete will be called when an Observable ends correctly.
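To make the laziness and the three callbacks concrete, here is a minimal hand-rolled sketch. It is not the rxjs implementation: createObservable is a hypothetical helper that only mimics the subscribe contract described above, under the assumption that nothing is delivered after complete or error.

```javascript
// Hand-rolled stand-in for an Observable, NOT the rxjs implementation.
// `producer` is only stored here; it runs when subscribe() is called.
const createObservable = (producer) => ({
  subscribe(next, error, complete) {
    let done = false
    producer({
      next: (value) => { if (!done) next(value) },
      error: (err) => { if (!done) { done = true; error(err) } },
      complete: () => { if (!done) { done = true; complete() } }
    })
  }
})

const log = []

const numbers = createObservable((observer) => {
  log.push('producer started')
  observer.next(1)
  observer.next(2)
  observer.complete()
  observer.next(3) // ignored: nothing is delivered after complete
})

log.push('before subscribe')
numbers.subscribe(
  (value) => log.push(`next ${value}`),
  (err) => log.push(`error ${err.message}`),
  () => log.push('complete')
)

console.log(log)
```

The log shows that the producer does not start until subscribe is called, and that complete closes the sequence.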

Creating an Observable from an EventEmitter

When thinking of a stream of data, the first example that comes to mind for me is an HTTP server. Let’s create a really simple one:

const http = require('http')
const port = 3004

const server = http.createServer((request, response) => {
  console.log('New request', request.url)
  response.end('Hello world!')
})

server.listen(port, (err) => {
  if (err) {
    return console.log('error:', err)
  }
  console.log('listening on:', port)
})

The callback we pass to .createServer is actually set as an event listener for the request event of the server object. So instead of wrapping the observable around the createServer function, we’ll just create the server and set up an observable from the request event.

const http = require('http')
const port = 3004
const Rx = require('rxjs/Rx') // rxjs@5 entry point; the path is case-sensitive on Linux

const server = http.createServer()

const requests = Rx.Observable.fromEvent(server, 'request', Array.of)

requests.subscribe(([request, response]) => {
  console.log('new request', request.url)
  response.end('Hello world!')
})

server.listen(port, (err) => {
  if (err) return console.log('Error:', err)

  console.log('Listening on:', port)
})

After struggling for a while with not seeing the second parameter of the callback, I googled and stumbled upon the answer. You need to pass a third parameter to fromEvent, which is a function that transforms the listener’s arguments in any way you want. I simply passed Array.of, a function that creates an array from any number of arguments.
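The effect of that third parameter can be sketched with a plain EventEmitter. This is a hand-rolled illustration, not the rxjs source: fromEventSketch is a hypothetical helper, and it assumes that without a selector only the first listener argument is forwarded, which matches the behavior I ran into.

```javascript
const EventEmitter = require('events')

// Hypothetical helper, not rxjs's fromEvent: forwards event arguments to `next`.
// Without a selector only the first argument is passed along.
const fromEventSketch = (emitter, eventName, selector) => ({
  subscribe(next) {
    const listener = (...args) => next(selector ? selector(...args) : args[0])
    emitter.on(eventName, listener)
    // Return a function that removes the listener again
    return () => emitter.removeListener(eventName, listener)
  }
})

const emitter = new EventEmitter()
const received = []

const unsubscribeFirst = fromEventSketch(emitter, 'request')
  .subscribe((value) => received.push(value))
const unsubscribeBoth = fromEventSketch(emitter, 'request', Array.of)
  .subscribe((value) => received.push(value))

emitter.emit('request', 'req', 'res')

unsubscribeFirst()
unsubscribeBoth()
emitter.emit('request', 'ignored', 'ignored') // nobody is listening anymore

console.log(received)
```

The first subscriber only sees 'req', while the one created with Array.of receives both arguments packed into a single array, which is what makes the ([request, response]) destructuring possible.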

Creating an Observable with create

What happens if you want to wrap an asynchronous operation that doesn’t follow the common callback pattern? In this example, I want to do an HTTP PUT operation, but I don’t really care about the payload or the response. I just care that it doesn’t end in an error.

const Rx = require('rxjs/Rx') // rxjs@5 entry point; the path is case-sensitive on Linux
const wreck = require('wreck')

const operation = (uri) =>
  Rx.Observable.create((observable) => {
    observable.next('Trying to do a PUT')
    wreck.put(uri, { json: true }, (err, response, payload) => {
      if (err) {
        return observable.error(err)
      }
      observable.complete()
    })
  })

operation('https://jsonplaceholder.typicode.com/users/1').subscribe(
  (message) => console.log(message),
  (err) => console.log(err),
  () => console.log('Operation is complete')
)

wreck.put wraps any 4XX and 5XX HTTP response in an error object. If you notice, I emit a next to notify the attempt to do the PUT. The operation returns an observable, and subscribing to it begins the HTTP PUT to the endpoint passed as a parameter. If it fails, I can retry indefinitely with the .retry operator, or a limited number of times by passing a count, as in .retry(5). I can even implement a rudimentary incremental backoff with the .retryWhen operator.

operation('https://jsonplaceholder.typicode.com/users/1')
  .retry(5)
  .subscribe(
    (message) => console.log(message),
    (err) => console.log(err),
    () => console.log('Operation is complete')
  )
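Conceptually, retrying means subscribing to the source again when it errors. The following is a hand-rolled sketch of that idea, not the rxjs operator: createObservable, retrySketch, and the flaky source are hypothetical names, and everything runs synchronously to keep the example small.

```javascript
// Hand-rolled stand-in for an Observable, NOT the rxjs implementation
const createObservable = (producer) => ({
  subscribe(next, error, complete) {
    producer({ next, error, complete })
  }
})

// Resubscribes to `source` after an error, at most `count` times,
// then forwards the last error downstream.
const retrySketch = (source, count) =>
  createObservable((observer) => {
    let retriesLeft = count
    const attempt = () =>
      source.subscribe(
        observer.next,
        (err) => (retriesLeft-- > 0 ? attempt() : observer.error(err)),
        observer.complete
      )
    attempt()
  })

// A source that fails on its first two executions and then completes
let calls = 0
const flaky = createObservable((observer) => {
  calls += 1
  observer.next(`attempt ${calls}`)
  if (calls < 3) return observer.error(new Error('boom'))
  observer.complete()
})

const events = []
retrySketch(flaky, 5).subscribe(
  (message) => events.push(message),
  (err) => events.push(`error: ${err.message}`),
  () => events.push('complete')
)

console.log(events)
```

Note that the next notification is emitted again on every attempt, because each retry is a fresh execution of the source.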

Using the incremental backoff approach with .retryWhen

operation('https://unreachable.tld/does/not/work')
  .retryWhen((attempts) => {
    return attempts
      .zip(Rx.Observable.range(1, 5), (a, i) => i * 1000)
      .flatMap((i) => Rx.Observable.of(1).delay(i))
  })
  .subscribe(
    (message) => console.log(message),
    (err) => console.log(err),
    () => console.log('Operation is complete')
  )

This is a more complicated example. The attempts parameter is itself an Observable, which emits an element each time the source observable fails. So each time the PUT operation fails, the error is pushed to the attempts observable.

Next, attempts gets combined with an Observable created from a range from 1 to 5, so the combination only emits 5 elements. The second parameter of zip is the function that merges the elements of both Observables. Since I only care about the elements of the range, they get multiplied by 1000 and returned.

The last step uses flatMap to return a new Observable that emits an item after the given delay. Operators usually take an element and return an element so the chain can continue, so when the function returns another Observable, the result needs to be flattened. If I had just used map, I would get an Observable(Observable(element)); by using flatMap, I’m taking it out one level and just passing the elements along.

After the 5 elements of the range have been emitted, any new element coming from the attempts Observable is ignored and the resulting observable completes, which in turn completes the PUT operation observable. This effectively ends the operation after the first attempt and 5 retries.
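To see which delays that zip produces, the same pairing can be done on plain arrays. This is only an illustration under that simplification: range and zipWith here are hypothetical array-based stand-ins, not the rxjs operators, and the 7 failures are made up.

```javascript
// Array-based stand-ins for the operators, for illustration only.
// range(start, count) mirrors the (start, count) arguments used above.
const range = (start, count) =>
  Array.from({ length: count }, (_, k) => start + k)

// Pairs elements by position and stops at the shorter input, like zip
const zipWith = (left, right, project) =>
  range(0, Math.min(left.length, right.length))
    .map((k) => project(left[k], right[k]))

// Pretend the PUT failed 7 times in a row
const failures = range(1, 7).map((n) => new Error(`failure ${n}`))

// Same projection as in the retryWhen example: ignore the error, keep i * 1000
const delays = zipWith(failures, range(1, 5), (a, i) => i * 1000)

console.log(delays)
```

Only the first 5 failures get a delay, growing from 1 to 5 seconds; the 6th and 7th have no partner in the range and are dropped.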

Final thoughts

Observables are a great way to handle asynchronous workflows. One thing I found you have to be aware of is that each time you subscribe, you actually get a new execution of the observable. So with our operation, if we subscribe twice, we would actually be doing 2 different PUT operations. To handle this case, where you want the same operation multicast to different observers, you would need to use a Subject.

I also find it amazing that you can wrap virtually anything you want. You can even return Promises from inside the operators, and they will get resolved and the results passed along. I’ll be looking for cases where an Observable makes more sense than a promise, a callback, or events.
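The difference between subscribing twice and sharing one execution can be sketched without rxjs. Both createObservable and createSubject below are hypothetical, hand-rolled helpers that only model next; a real Subject also handles error, complete, and unsubscription.

```javascript
// Hand-rolled stand-in for an Observable, NOT the rxjs implementation
const createObservable = (producer) => ({
  subscribe(next) {
    producer({ next })
  }
})

let executions = 0
const operation = createObservable((observer) => {
  executions += 1 // stands in for the side effect, e.g. the HTTP PUT
  observer.next(`result of execution ${executions}`)
})

// Two plain subscriptions: the producer runs once per subscriber
const separate = []
operation.subscribe((value) => separate.push(`A: ${value}`))
operation.subscribe((value) => separate.push(`B: ${value}`))

// Minimal subject: keeps a list of observers and fans each value out to all
const createSubject = () => {
  const observers = []
  return {
    subscribe: (next) => { observers.push(next) },
    next: (value) => observers.forEach((observer) => observer(value))
  }
}

const subject = createSubject()
const shared = []
subject.subscribe((value) => shared.push(`A: ${value}`))
subject.subscribe((value) => shared.push(`B: ${value}`))

// Only the subject subscribes to the source, so the producer runs once more
operation.subscribe(subject.next)

console.log(separate, shared, executions)
```

With the subject in between, both observers receive the value of the same execution instead of triggering one each.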

Conclusion

After this initial attempt at using Observables, I find them rather interesting. Here we just looked at a few examples of how to create observables. We didn’t look deeply into operators and how to manipulate the elements or combine different observables. So I’ll leave it to you to read through the operators, Subjects, and the more advanced stuff.