Observable rolling averages
It might not be immediately obvious how we can model rolling averages as observables. What we need to keep in mind is that pretty much anything we can think of as a sequence of values, we can probably model as an observable sequence.
Rolling averages are no different. Let's forget for a moment that the prices are coming from a network call wrapped in an observable. Let's imagine we have all of the values we care about in a Clojure vector:
(def values (range 10))
What we need is a way to process these values in partitions or buffers—of size 5—in such a way that only a single value is dropped at each interaction. In Clojure, we can use the partition function for this purpose:
(doseq [buffer (partition 5 1 values)] (prn buffer)) (0 1 2 3 4) (1 2 3 4 5) (2 3 4 5 6) (3 4 5 6 7) (4 5 6 7 8) ...
The second argument to the partition function is called a step and it is the offset of how many items should be skipped before starting a new partition. Here, we set it to 1 to create the sliding window effect we need.
The big question then is: can we somehow leverage partition when working with observable sequences?
It turns out that RxJava has a transformer called buffer just for this purpose. The previous example can be rewritten as follows:
(-> (rx/seq->o (vec (range 10))) (.buffer 5 1) (rx/subscribe (fn [price] (prn (str "Value: " price)))))
As before, the second argument to buffer is the offset, but it's called skip in the RxJava documentation. If you run this at the REPL, you'll see the following output:
"Value: [0, 1, 2, 3, 4]" "Value: [1, 2, 3, 4, 5]" "Value: [2, 3, 4, 5, 6]" "Value: [3, 4, 5, 6, 7]" "Value: [4, 5, 6, 7, 8]" ...
This is exactly what we want. The only difference is that the buffer method waits until it has enough elements—five, in this case—before proceeding.
Now, we can go back to our program and incorporate this idea with our main function. Here is what it looks like:
(defn -main [& args] (show! main-frame) (let [price-obs (-> (rx/flatmap make-price-obs (Observable/interval 500 TimeUnit/MILLISECONDS)) (.publish)) sliding-buffer-obs (.buffer price-obs 5 1)] (rx/subscribe price-obs (fn [price] (text! price-label (str "Price: " price)))) (rx/subscribe sliding-buffer-obs (fn [buffer] (text! running-avg-label (str "Running average: " (avg buffer))))) (.connect price-obs)))
The preceding snippet works by creating two observables. The first one, price-obs, we created before. The new sliding buffer observable is created using the buffer transformer on price-obs.
Now, we can independently subscribe to each one in order to update the price and rolling average labels. Running the program will display the same screen we saw previously:
You might have noticed two method calls we hadn't seen before: publish and connect.
The publish method returns a connectable observable. This means that the observable won't start emitting values until its connect method has been called. We do this here because we want to make sure that all of the subscribers receive all of the values that were emitted by the original observable.
In conclusion, without much additional code, we implemented all of the requirements in a concise, declarative manner that is easy to maintain and follow. We have also made the previous roll-buffer function completely unnecessary.
The full source code for the CES version of the program is given here for reference:
(ns stock-market-monitor.05frp-price-monitor-rolling-avg (:require [rx.lang.clojure.core :as rx] [seesaw.core :refer :all]) (:import (java.util.concurrent TimeUnit) (rx Observable))) (native!) (def main-frame (frame :title "Stock price monitor" :width 200 :height 100 :on-close :exit)) (def price-label (label "Price: -")) (def running-avg-label (label "Running average: -")) (config! main-frame :content (border-panel :north price-label :center running-avg-label :border 5)) (defn share-price [company-code] (Thread/sleep 200) (rand-int 1000)) (defn avg [numbers] (float (/ (reduce + numbers) (count numbers)))) (defn make-price-obs [_] (rx/return (share-price "XYZ"))) (defn -main [& args] (show! main-frame) (let [price-obs (-> (rx/flatmap make-price-obs (Observable/interval 500 TimeUnit/MILLISECONDS)) (.publish)) sliding-buffer-obs (.buffer price-obs 5 1)] (rx/subscribe price-obs (fn [price] (text! price-label (str "Price: " price)))) (rx/subscribe sliding-buffer-obs (fn [buffer] (text! running-avg-label (str "Running average: " (avg buffer))))) (.connect price-obs)))
Note how, in this version of the program, we didn't have to use a shutdown hook. This is because RxClojure creates daemon threads, which are automatically terminated once the application exits.