Skip to content

Commit 1cca179

Browse files
committed
experiment with promise for onCompleted
1 parent 5f3bafc commit 1cca179

1 file changed

Lines changed: 11 additions & 14 deletions

File tree

rxjava/expt1/src/expt1/core.clj

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
clojure.string
44
clojure.pprint)
55
(:refer-clojure :exclude [distinct])
6-
(:import [rx Observable])
6+
(:import [rx Observable subscriptions.Subscriptions])
77
)
88

99
;;; Load this file into LightTable (using the workspace tab) and "make
@@ -17,21 +17,18 @@
1717
(defn- subscribe-collectors [obl]
1818
(let [onNextCollector (atom [])
1919
onErrorCollector (atom nil)
20-
onCompletedCollector (atom false)
21-
awaiter (agent false)]
22-
(letfn [(collect-next [item] (swap! onNextCollector conj item))
23-
(collect-error [excp] (reset! onErrorCollector excp))
24-
(collect-completed [ ] (reset! onCompletedCollector true)
25-
(set awaiter (fn [_] true)))
26-
(report-collectors [ ] {:onNext @onNextCollector
27-
:onError @onErrorCollector
28-
:onCompleted @onCompletedCollector})]
20+
onCompletedCollector (promise )]
21+
(letfn [(collect-next [item] (swap! onNextCollector conj item))
22+
(collect-error [excp] (reset! onErrorCollector excp))
23+
(collect-completed [ ] (deliver onCompletedCollector true))
24+
(report-collectors [ ]
25+
{:onNext @onNextCollector
26+
:onError @onErrorCollector
27+
:onCompleted (deref onCompletedCollector 1000 false)})]
2928
(-> obl
3029
(.subscribe collect-next collect-error collect-completed))
31-
(if (await-for 10000 awaiter)
32-
(report-collectors)
33-
(throw (Exception. "observable timed out (10 sec)"))))
34-
))
30+
(report-collectors)
31+
)))
3532

3633

3734
;;; ___ _ _ _ _

0 commit comments

Comments
 (0)