File tree Expand file tree Collapse file tree
Expand file tree Collapse file tree Original file line number Diff line number Diff line change 33 clojure.string
44 clojure.pprint)
55 (:refer-clojure :exclude [distinct])
6- (:import [rx Observable])
6+ (:import [rx Observable subscriptions.Subscriptions ])
77 )
88
99; ;; Load this file into LightTable (using the workspace tab) and "make
1717(defn- subscribe-collectors [obl]
1818 (let [onNextCollector (atom [])
1919 onErrorCollector (atom nil )
20- onCompletedCollector (atom false )
21- awaiter (agent false )]
22- (letfn [(collect-next [item] (swap! onNextCollector conj item))
23- (collect-error [excp] (reset! onErrorCollector excp))
24- (collect-completed [ ] (reset! onCompletedCollector true )
25- (set awaiter (fn [_] true )))
26- (report-collectors [ ] {:onNext @onNextCollector
27- :onError @onErrorCollector
28- :onCompleted @onCompletedCollector})]
20+ onCompletedCollector (promise )]
21+ (letfn [(collect-next [item] (swap! onNextCollector conj item))
22+ (collect-error [excp] (reset! onErrorCollector excp))
23+ (collect-completed [ ] (deliver onCompletedCollector true ))
24+ (report-collectors [ ]
25+ {:onNext @onNextCollector
26+ :onError @onErrorCollector
27+ :onCompleted (deref onCompletedCollector 1000 false )})]
2928 (-> obl
3029 (.subscribe collect-next collect-error collect-completed))
31- (if (await-for 10000 awaiter)
32- (report-collectors )
33- (throw (Exception. " observable timed out (10 sec)" ))))
34- ))
30+ (report-collectors )
31+ )))
3532
3633
3734; ;; ___ _ _ _ _
You can’t perform that action at this time.
0 commit comments