|
| 1 | +;;; Load this file into LightTable (using LightTable's workspace tab) |
| 2 | +;;; and "make current editor an instarepl" (using LightTable's command |
| 3 | +;;; tab). |
| 4 | + |
1 | 5 | (ns expt1.core |
2 | 6 | (:require [expt1.k2 :as k2] |
3 | 7 | clojure.string |
|
6 | 10 | (:import [rx Observable subscriptions.Subscriptions]) |
7 | 11 | ) |
8 | 12 |
|
9 | | -;;; Load this file into LightTable (using the workspace tab) and "make |
10 | | -;;; current editor an instarepl" (using the command tab). |
| 13 | +(defmacro pdump [x] |
| 14 | + `(let [x# ~x] |
| 15 | + (do (println "----------------") |
| 16 | + (clojure.pprint/pprint '~x) |
| 17 | + (println "~~>") |
| 18 | + (clojure.pprint/pprint x#) |
| 19 | + (println "----------------") |
| 20 | + x#))) |
| 21 | + |
| 22 | +;;; |
| 23 | +;;; ___ _ ___ _ |
| 24 | +;;; / __|___ _ _ ___ _ _(_)__ / _ \| |__ ___ ___ _ ___ _____ _ _ |
| 25 | +;;; | (_ / -_) ' \/ -_) '_| / _| | (_) | '_ (_-</ -_) '_\ V / -_) '_| |
| 26 | +;;; \___\___|_||_\___|_| |_\__| \___/|_.__/__/\___|_| \_/\___|_| |
11 | 27 |
|
12 | 28 | ;;; The current rx library has none of the co-monadic operators such |
13 | 29 | ;;; as "first" and "last". Let us make atomic, external collectors for |
14 | 30 | ;;; extracting items from an oseq (observable sequence) by mutating |
15 | 31 | ;;; side-effects (horrors!). |
16 | 32 |
|
17 | 33 | (defn- subscribe-collectors [obl] |
18 | | - (let [onNextCollector (atom []) |
| 34 | + (let [;; Keep a sequence of all values sent: |
| 35 | + onNextCollector (agent []) |
| 36 | + ;; Only need one value if the observable errors out: |
19 | 37 | onErrorCollector (atom nil) |
| 38 | + ;; Use a promise for 'completed' so we can wait for it on |
| 39 | + ;; another thread: |
20 | 40 | onCompletedCollector (promise )] |
21 | | - (letfn [(collect-next [item] (swap! onNextCollector conj item)) |
| 41 | + (letfn [;; When observable sends a value, relay it to our agent: |
| 42 | + (collect-next [item] (send onNextCollector (fn [state] (conj state item)))) |
| 43 | + ;; If observable errors out, just set our exception; |
22 | 44 | (collect-error [excp] (reset! onErrorCollector excp)) |
| 45 | + ;; When observable completes, deliver on the promise: |
23 | 46 | (collect-completed [ ] (deliver onCompletedCollector true)) |
| 47 | + ;; In all cases, report out the back end with this: |
24 | 48 | (report-collectors [ ] |
25 | | - {:onNext @onNextCollector |
26 | | - :onError @onErrorCollector |
27 | | - :onCompleted (deref onCompletedCollector 1000 false)})] |
| 49 | + (pdump |
| 50 | + {;; Wait at most 1 second for the promise to complete; |
| 51 | + ;; if it does not complete, then produce 'false'. We |
| 52 | + ;; must wait on the onCompleted BEFORE waiting on the |
| 53 | + ;; onNext because the agent's await-for in onNext only |
| 54 | + ;; waits for messages sent to the agent from THIS |
| 55 | + ;; thread, and our asynchronous observable may be |
| 56 | + ;; sending messages to the agent from another thread, |
| 57 | + ;; say, a future's thread. The agent's await-for will |
| 58 | + ;; return too quickly, allowing this onCompleted await |
| 59 | + ;; to return, losing some messages. This code depends |
| 60 | + ;; on order-of-evaluation assumptions in the map. |
| 61 | + :onCompleted (deref onCompletedCollector 1000 false) |
| 62 | + ;; Wait for everything that has been sent to the agent |
| 63 | + ;; to drain (presumably internal message queues): |
| 64 | + :onNext (do (await-for 1000 onNextCollector) |
| 65 | + ;; Then produce the results: |
| 66 | + @onNextCollector) |
| 67 | + ;; If we ever saw an error, here it is: |
| 68 | + :onError @onErrorCollector |
| 69 | + }))] |
| 70 | + ;; Recognize that the observable 'obl' may run on another thread: |
28 | 71 | (-> obl |
29 | 72 | (.subscribe collect-next collect-error collect-completed)) |
30 | | - (report-collectors) |
31 | | - ))) |
32 | | - |
| 73 | + ;; Therefore, produce results that wait, with timeouts, on both |
| 74 | + ;; the completion event and on the draining of the (presumed) |
| 75 | + ;; message queue to the agent. |
| 76 | + (report-collectors)))) |
33 | 77 |
|
34 | 78 | ;;; ___ _ _ _ _ |
35 | 79 | ;;; / __| |_ _ _(_)_ _ | |_(_)_ _ __ _ |
|
38 | 82 | ;;; |___/ |
39 | 83 |
|
40 | 84 |
|
41 | | -;;; First, let's just take the first two numbers out of a vector of |
| 85 | +;;; There is a class of operators for shrinking a sequence. They |
| 86 | +;;; include "take", "takeUntil", etc.; "skip*"; and "filter". To |
| 87 | +;;; start, let's just take the first two numbers out of a vector of |
42 | 88 | ;;; numbers and turn them into oseq. This illustrates "take", a method |
43 | 89 | ;;; that often shortens sequences. |
44 | 90 |
|
|
133 | 179 | (subscribe-collectors) |
134 | 180 | ) |
135 | 181 |
|
136 | | - |
137 | 182 | ;;; _ _ _ _ _ |
138 | 183 | ;;; __| (_)__| |_(_)_ _ __| |_ |
139 | 184 | ;;; / _` | (_-< _| | ' \/ _| _| |
|
0 commit comments