Manipulating observables
Now that we know how to create observables, we should look at what kinds of interesting things we can do with them. In this section, we will see what it means to treat observables as sequences.
We'll start with something simple. Let's print the sum of the first five even integers, starting at zero, from an observable of all non-negative integers:
(rx/subscribe (->> (Observable/interval 1 TimeUnit/MICROSECONDS)
                   (rx/filter even?)
                   (rx/take 5)
                   (rx/reduce +))
              prn-to-repl)
This should be starting to look familiar. We create an interval that emits an increasing sequence of integers, starting at zero, one every microsecond. Then, we keep only the even numbers in this observable. Because the interval never completes on its own, the sequence is unbounded, so we take just the first five elements from it. Finally, we reduce them using +. The result is 20 (0 + 2 + 4 + 6 + 8).
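For comparison, the same pipeline over a plain lazy sequence might look like the sketch below. It uses only clojure.core: (range) stands in for the interval observable, producing 0, 1, 2, and so on without any timing, and sum-first-even is a hypothetical name chosen for illustration.

```clojure
;; Sequence analogue of the observable pipeline above (illustrative sketch).
;; sum-first-even is a hypothetical helper name, not part of RxClojure.
(defn sum-first-even
  "Sums the first n even numbers drawn from the infinite sequence 0, 1, 2, ..."
  [n]
  (->> (range)          ; infinite lazy sequence, starting at zero
       (filter even?)   ; keep only the even numbers
       (take n)         ; bound the otherwise infinite sequence
       (reduce +)))     ; add up the numbers that remain

(sum-first-even 5) ;; => 20
```

The shape of the threading form is the same in both versions. Only the source of the values and the rx/ prefix on each operator differ.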
To drive home the point that programming with observables really is just like operating on sequences, we will look at one more example, where we will combine two different observable sequences. One contains the names of musicians I'm a fan of and the other the names of their respective bands:
(defn musicians []
  (rx/seq->o ["James Hetfield" "Dave Mustaine" "Kerry King"]))

(defn bands []
  (rx/seq->o ["Metallica" "Megadeth" "Slayer"]))
We would like to print a string of the format "Musician name - from: band name" to the REPL for each musician. An added requirement is that the band names should be printed in uppercase for impact.
We'll start by creating another observable that contains the uppercased band names:
(defn uppercased-obs []
  (rx/map (fn [s] (.toUpperCase s)) (bands)))
While not strictly necessary, this gives us a reusable piece of code that can be handy in several places in the program, thus avoiding duplication. Subscribers interested in the original band names can keep subscribing to the bands observable.
With the two observables in hand, we can proceed to combine them:
(-> (rx/map vector (musicians) (uppercased-obs))
    (rx/subscribe (fn [[musician band]]
                    (prn-to-repl (str musician " - from: " band)))))
Once more, this example should feel familiar. The solution we were after was a way to zip the two observables together. RxClojure provides zip behavior through map, much like Clojure's core map function does. We call it with three arguments: a function that will be called with one element from each observable and should return an appropriate representation of the pair, followed by the two observables to zip. In this case, the function is vector, so each pair simply becomes a two-element vector.
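The same zipping behavior can be seen with Clojure's core map over ordinary collections. The following is a minimal sketch, assuming plain vectors in place of the observables; musician-names, band-names, and zip-with-uppercased are hypothetical names used only for this illustration.

```clojure
(require '[clojure.string :as string])

;; Hypothetical plain-vector stand-ins for the musicians and bands observables.
(def musician-names ["James Hetfield" "Dave Mustaine" "Kerry King"])
(def band-names ["Metallica" "Megadeth" "Slayer"])

(defn zip-with-uppercased
  "Pairs each musician with the uppercased band at the same position."
  [musicians bands]
  ;; Given two collections, core map calls vector with one element from each.
  (map vector musicians (map string/upper-case bands)))

(zip-with-uppercased musician-names band-names)
;; => (["James Hetfield" "METALLICA"] ["Dave Mustaine" "MEGADETH"] ["Kerry King" "SLAYER"])
```

As with the observable version, each resulting element is a two-element vector that a consumer can destructure.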
Next, in our subscriber, we destructure the vector so that we can access the musician and band names, and then print the final result to the REPL:
"James Hetfield - from: METALLICA"
"Dave Mustaine - from: MEGADETH"
"Kerry King - from: SLAYER"