Skip to content

Commit 441064f

Browse files
committed
variable wait-time
1 parent df7e510 commit 441064f

1 file changed

Lines changed: 30 additions & 6 deletions

File tree

rxjava/expt1/src/expt1/temp.clj

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,12 @@
3131
;;; items from an oseq (observable sequence) by mutating side-effects
3232
;;; (horrors!).
3333

34-
(defn- subscribe-collectors [obl]
35-
(let [;; Keep a sequence of all values sent:
34+
(defn- subscribe-collectors [obl & optional-wait-time]
35+
(let [wait-time
36+
(if optional-wait-time
37+
(first optional-wait-time)
38+
1000)
39+
;; Keep a sequence of all values sent:
3640
onNextCollector (agent [])
3741
;; Only need one value if the observable errors out:
3842
onErrorCollector (atom nil)
@@ -59,10 +63,10 @@
5963
;; return too quickly, allowing this onCompleted await
6064
;; to return, losing some messages. This code depends
6165
;; on order-of-evaluation assumptions in the map.
62-
:onCompleted (deref onCompletedCollector 1000 false)
66+
:onCompleted (deref onCompletedCollector wait-time false)
6367
;; Wait for everything that has been sent to the agent
6468
;; to drain (presumably internal message queues):
65-
:onNext (do (await-for 1000 onNextCollector)
69+
:onNext (do (await-for wait-time onNextCollector)
6670
;; Then produce the results:
6771
@onNextCollector)
6872
;; If we ever saw an error, here it is:
@@ -383,9 +387,29 @@
383387
;; A subscription that cancels the future if unsubscribed:
384388
(Subscriptions/create #(future-cancel f))))))
385389

386-
(-> (asynchronousWikipediaArticleObservable ["Tiger" "Elephant"])
390+
#_(-> (asynchronousWikipediaArticleObservable ["Tiger" "Elephant"])
391+
(.subscribe #(println "--- Article ---\n" (subs (:body %) 0 90) "..."))
392+
)
387393

388-
(.subscribe #(println "--- Article ---\n" (subs (:body %) 0 125) "..."))
394+
#_(-> (asynchronousWikipediaArticleObservable ["Tiger" "Elephant"])
395+
(subscribe-collectors)
396+
(nth 1)
389397
)
390398

399+
(->>
400+
((subscribe-collectors
401+
(asynchronousWikipediaArticleObservable ["Atom" "Molecule"])
402+
5000)
403+
:onNext)
404+
(map :trace-redirects)
405+
)
406+
407+
#_(let [collectors (subscribe-collectors (asynchronousWikipediaArticleObservable ["Tiger" "Elephant"]))]
408+
(map
409+
(fn [collector]
410+
(into
411+
collector
412+
{:body (subs (:body collector) 0 90)}))
413+
collectors))
414+
391415
(+ 4 3)

0 commit comments

Comments
 (0)