|
31 | 31 | ;;; items from an oseq (observable sequence) by mutating side-effects |
32 | 32 | ;;; (horrors!). |
33 | 33 |
|
34 | | -(defn- subscribe-collectors [obl] |
35 | | - (let [;; Keep a sequence of all values sent: |
| 34 | +(defn- subscribe-collectors [obl & optional-wait-time] |
| 35 | + (let [wait-time |
| 36 | + (if optional-wait-time |
| 37 | + (first optional-wait-time) |
| 38 | + 1000) |
| 39 | + ;; Keep a sequence of all values sent: |
36 | 40 | onNextCollector (agent []) |
37 | 41 | ;; Only need one value if the observable errors out: |
38 | 42 | onErrorCollector (atom nil) |
|
59 | 63 | ;; return too quickly, allowing this onCompleted await |
60 | 64 | ;; to return, losing some messages. This code depends |
61 | 65 | ;; on order-of-evaluation assumptions in the map. |
62 | | - :onCompleted (deref onCompletedCollector 1000 false) |
| 66 | + :onCompleted (deref onCompletedCollector wait-time false) |
63 | 67 | ;; Wait for everything that has been sent to the agent |
64 | 68 | ;; to drain (presumably internal message queues): |
65 | | - :onNext (do (await-for 1000 onNextCollector) |
| 69 | + :onNext (do (await-for wait-time onNextCollector) |
66 | 70 | ;; Then produce the results: |
67 | 71 | @onNextCollector) |
68 | 72 | ;; If we ever saw an error, here it is: |
|
383 | 387 | ;; A subscription that cancels the future if unsubscribed: |
384 | 388 | (Subscriptions/create #(future-cancel f)))))) |
385 | 389 |
|
386 | | -(-> (asynchronousWikipediaArticleObservable ["Tiger" "Elephant"]) |
| 390 | +#_(-> (asynchronousWikipediaArticleObservable ["Tiger" "Elephant"]) |
| 391 | + (.subscribe #(println "--- Article ---\n" (subs (:body %) 0 90) "...")) |
| 392 | + ) |
387 | 393 |
|
388 | | - (.subscribe #(println "--- Article ---\n" (subs (:body %) 0 125) "...")) |
| 394 | +#_(-> (asynchronousWikipediaArticleObservable ["Tiger" "Elephant"]) |
| 395 | + (subscribe-collectors) |
| 396 | + (nth 1) |
389 | 397 | ) |
390 | 398 |
|
| 399 | +(->> |
| 400 | + ((subscribe-collectors |
| 401 | + (asynchronousWikipediaArticleObservable ["Atom" "Molecule"]) |
| 402 | + 5000) |
| 403 | + :onNext) |
| 404 | + (map :trace-redirects) |
| 405 | + ) |
| 406 | + |
| 407 | +#_(let [collectors (subscribe-collectors (asynchronousWikipediaArticleObservable ["Tiger" "Elephant"]))] |
| 408 | + (map |
| 409 | + (fn [collector] |
| 410 | + (into |
| 411 | + collector |
| 412 | + {:body (subs (:body collector) 0 90)})) |
| 413 | + collectors)) |
| 414 | + |
391 | 415 | (+ 4 3) |
0 commit comments