Hands-On Reactive Programming with Clojure

Retry

The last error handling combinator we'll examine is retry. This combinator is useful when we know that an error or exception is only transient, so we should probably give it another shot by resubscribing to the observable.

First, we'll create an observable that fails when it is subscribed to for the first time. However, the next time it is subscribed to, it succeeds and emits a new item:

(defn retry-obs [] 
  (let [errored (atom false)] 
    (rx/observable* 
     (fn [observer] 
       (if @errored 
         (rx/on-next observer 20) 
         (do (reset! errored true) 
             (throw (Exception. "Oops. Something went wrong")))))))) 

Let's see what happens if we simply subscribe to it:

(rx/subscribe (retry-obs) 
              (fn [v] (prn-to-repl "result is " v))) 
 
;; Exception Oops. Something went wrong  rx-playground.core/retry-obs/fn--1476 

As expected, the exception simply bubbles up, as in our first example. However, we know—for the purposes of this example—that this is a transient failure. Let's see what changes if we use retry:

(rx/subscribe (->> (retry-obs) 
                   (.retry)) 
              (fn [v] (prn-to-repl "result is " v))) 
 
;; "result is " 20 

Now, retry resubscribes to the observable for us after the first failure and, as expected, we get the correct output.
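To build some intuition for what resubscribing means, here is a toy model written in plain Clojure. It does not use RxJava at all: an "observable" is assumed to be just a function that takes a map of :on-next and :on-error callbacks, and the names flaky-observable, retry-model, and received are hypothetical ones chosen for this sketch. The real operator does considerably more (unsubscription and completion, for instance), so treat this only as an illustration of the core idea:

```clojure
;; A toy model, not RxJava: an "observable" is a function that takes an
;; observer map with :on-next and :on-error callbacks.

(defn flaky-observable
  "Fails on its first subscription and emits 20 on later ones."
  []
  (let [errored (atom false)]
    (fn [{:keys [on-next on-error]}]
      (if @errored
        (on-next 20)
        (do (reset! errored true)
            (on-error (Exception. "Oops. Something went wrong")))))))

(defn retry-model
  "Wraps `source` so that an error triggers a fresh subscription to it
  instead of reaching the downstream observer."
  [source]
  (fn subscribe [observer]
    (source (assoc observer :on-error (fn [_ex] (subscribe observer))))))

;; Collects everything the downstream observer sees.
(def received (atom []))

((retry-model (flaky-observable))
 {:on-next  #(swap! received conj %)
  :on-error #(swap! received conj [:error (.getMessage %)])})

@received
;; => [20]
```

The downstream observer never sees the first failure. The wrapper swallows it and subscribes to the source a second time, which is when the value 20 arrives.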

It's important to note that retry will attempt to resubscribe indefinitely until it succeeds. This might not be what you want, so RxJava provides variations: overloads of retry that accept either a maximum number of retries or a predicate function that decides whether another attempt should be made, and retryWhen, which gives finer control over when and if resubscription happens.
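The idea of letting a predicate decide when to stop retrying can be sketched in plain Clojure, again without RxJava. The helper names below (retry-while, flaky-source, and at-most-three-retries) are hypothetical and exist only for this illustration. The predicate is assumed to receive the number of the failed attempt and the exception, loosely mirroring the shape of the predicate-based variation described above:

```clojure
;; Plain-Clojure sketch of predicate-controlled retrying (no RxJava involved).

(defn retry-while
  "Calls the zero-argument function `f`. If it throws, calls
  `(should-retry? attempt ex)` with the 1-based number of the failed attempt
  and the exception; retries when that returns truthy, otherwise rethrows."
  [should-retry? f]
  (loop [attempt 1]
    ;; `recur` is not allowed inside `catch`, so the catch clause returns a
    ;; marker value and the decision to loop is made outside the `try`.
    (let [result (try
                   {:value (f)}
                   (catch Exception ex
                     (if (should-retry? attempt ex)
                       ::retry
                       (throw ex))))]
      (if (= result ::retry)
        (recur (inc attempt))
        (:value result)))))

(defn flaky-source
  "Returns a function that throws on its first `failures` calls and then
  returns 20, mimicking a transient error."
  [failures]
  (let [calls (atom 0)]
    (fn []
      (if (<= (swap! calls inc) failures)
        (throw (Exception. "Oops. Something went wrong"))
        20))))

(defn at-most-three-retries
  "Predicate that allows a retry after each of the first three failures."
  [attempt _ex]
  (<= attempt 3))

(retry-while at-most-three-retries (flaky-source 1))
;; => 20
```

With a source that keeps failing, such as (flaky-source 10), the same call gives up after the fourth failed attempt and rethrows the exception instead of looping forever.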

All of these operators give us the tools we need to build reliable reactive applications, and we should always keep them in mind as part of our toolbox. For more information, refer to the RxJava wiki page on the subject: https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators.