|
27 | 27 | ;;; |___/_||_|_| |_|_||_|_\_\_|_||_\__, | |
28 | 28 | ;;; |___/ |
29 | 29 |
|
| 30 | + |
30 | 31 | ;;; First, let's just take the first two numbers out of a vector of |
31 | 32 | ;;; numbers and turn them into oseq. This illustrates "take", a method |
32 | 33 | ;;; that often shortens sequences. |
|
46 | 47 | ;;; \___|_| \___/\_/\_/|_|_||_\__, | |
47 | 48 | ;;; |___/ |
48 | 49 |
|
| 50 | + |
49 | 51 | ;;; Now, let's transform each number x into a vector of numbers, adding |
50 | 52 | ;;; x to some familiar constants, then flattening the results exactly |
51 | 53 | ;;; one time. This is the way to grow a shorter sequence into a longer |
|
93 | 95 |
|
94 | 96 | @collector |
95 | 97 |
|
| 98 | +;;; __ |
| 99 | +;;; / _|_ _ ___ _ __ ___ ___ ___ __ _ |
| 100 | +;;; | _| '_/ _ \ ' \___(_-</ -_) _` | |
| 101 | +;;; |_| |_| \___/_|_|_| /__/\___\__, | |
| 102 | +;;; |_| |
| 103 | + |
| 104 | + |
96 | 105 | ;;; We'd like to clean up the ugly #(Observable/toObservable ...) into |
97 | 106 | ;;; a composition, but we can't (comp Observable/toObservable ...) since |
98 | 107 | ;;; it's a Java method and does not implement Clojure IFn. We fix this |
|
109 | 118 |
|
110 | 119 | @collector |
111 | 120 |
|
| 121 | +;;; _ |
| 122 | +;;; _ _ ___| |_ _ _ _ _ _ _ |
| 123 | +;;; | '_/ -_) _| || | '_| ' \ |
| 124 | +;;; |_| \___|\__|\_,_|_| |_||_| |
| 125 | + |
| 126 | + |
112 | 127 | ;;; We notice that the monadic "return" is missing from "rxjava 0.9.0", |
113 | 128 | ;;; so we add it as follows. This is doing some junk-work -- puts the |
114 | 129 | ;;; item in a vector just so we can take it out again into an obl. |
|
126 | 141 |
|
127 | 142 | @collector |
128 | 143 |
|
| 144 | +;;; _ _ _ _ _ |
| 145 | +;;; __| (_)__| |_(_)_ _ __| |_ |
| 146 | +;;; / _` | (_-< _| | ' \/ _| _| |
| 147 | +;;; \__,_|_/__/\__|_|_||_\__|\__| |
| 148 | + |
| 149 | + |
129 | 150 | ;;; Rx is supposed to have a couple of operators: "disinct" and |
130 | 151 | ;;; "distinctUntilChanged", but RxJava 0.9.0 doesn't seem to |
131 | 152 | ;;; have them yet. We can fake them as follows: |
|
149 | 170 |
|
150 | 171 | @collector |
151 | 172 |
|
152 | | -;;; Now, we package and test. |
| 173 | +;;; Package and test. |
153 | 174 |
|
154 | 175 | (defn distinct [oseq] |
155 | 176 | (-> oseq |
|
166 | 187 |
|
167 | 188 | @collector |
168 | 189 |
|
| 190 | +;;; _ _ _ _ _ |
| 191 | +;;; __| (_)__| |_(_)_ _ __| |_ |
| 192 | +;;; / _` | (_-< _| | ' \/ _| _| |
| 193 | +;;; \__,_|_/__/\__|_|_||_\__|\__| |
| 194 | +;;; _ _ _ _ _ ___ _ _ |
| 195 | +;;; | | | |_ _| |_(_) |/ __| |_ __ _ _ _ __ _ ___ __| | |
| 196 | +;;; | |_| | ' \ _| | | (__| ' \/ _` | ' \/ _` / -_) _` | |
| 197 | +;;; \___/|_||_\__|_|_|\___|_||_\__,_|_||_\__, \___\__,_| |
| 198 | +;;; |___/ |
| 199 | + |
| 200 | + |
| 201 | +;;; The following solution is correct but unacceptable because it consumes |
| 202 | +;;; the entire source oseq before producing values. Such is not necessary |
| 203 | +;;; with distinct-until-changed: we only need to remember one back. Still, |
| 204 | +;;; to make the point: |
169 | 205 |
|
170 | 206 | (reset! collector []) |
171 | 207 | (-> |
|
178 | 214 | (if (and l (= x l)) ; accounts for legit nils |
179 | 215 | acc |
180 | 216 | (conj acc x))))) |
181 | | - ;; We now have a singleton obl containing a set of unique characters. |
182 | | - ;; To promote this back into an obl of chars, we do: |
| 217 | + ;; We now have a singleton obl containing representatives of runs of non- |
| 218 | + ;; distinct characters. Slurp it back into the monad: |
183 | 219 | (.mapMany from-seq) |
184 | 220 |
|
185 | 221 | (.subscribe collect) |
|
189 | 225 |
|
190 | 226 | (reset! collector []) |
191 | 227 |
|
| 228 | +;;; Better is to keep a mutable buffer of length one. It could be an atom |
| 229 | +;;; if we had the opposite of "compare-and-set!"; an atomic primitive that |
| 230 | +;;; sets the value only if it's NOT equal to its current value. "compare-and |
| 231 | +;;; set!" sets the atom to a newval if its current value is equal to an |
| 232 | +;;; oldval. It's easy enough to get the desired semantics with a Ref and |
| 233 | +;;; software-transactional memory, the only wrinkle being that the container |
| 234 | +;;; must be defined outside the mapMany and the function that mapMany applies. |
| 235 | +;;; However, this solution will not materialize the entire input sequence. |
| 236 | + |
192 | 237 | (let [exploded (-> |
193 | 238 | (Observable/toObservable ["onnnnne" "tttwo" "thhrrrrree"]) |
194 | 239 | (.mapMany (comp from-seq string-explode)) |
|
208 | 253 | (.subscribe collect))) |
209 | 254 | @collector |
210 | 255 |
|
| 256 | +;;; Package and test: |
| 257 | + |
| 258 | +(defn distinct-until-changed [oseq] |
| 259 | + (let [last-container (ref [])] |
| 260 | + (-> oseq |
| 261 | + (.mapMany (fn [x] |
| 262 | + (dosync |
| 263 | + (let [l (last @last-container)] |
| 264 | + (if (and l (= x l)) |
| 265 | + (Observable/empty) |
| 266 | + (do |
| 267 | + (ref-set last-container [x]) |
| 268 | + (return x)))))))))) |
| 269 | + |
211 | 270 | (reset! collector []) |
| 271 | +(-> |
| 272 | + (Observable/toObservable ["onnnnne" "tttwo" "thhrrrrree"]) |
| 273 | + (.mapMany (comp from-seq string-explode)) |
| 274 | + (distinct-until-changed) |
| 275 | + (.subscribe collect) |
| 276 | +) |
| 277 | +@collector |
212 | 278 |
|
| 279 | +;;; It's well-behaved on an empty input: |
| 280 | + |
| 281 | +(reset! collector []) |
| 282 | +(-> |
| 283 | + (Observable/toObservable []) |
| 284 | + (.mapMany (comp from-seq string-explode)) |
| 285 | + (distinct-until-changed) |
| 286 | + (.subscribe collect) |
| 287 | +) |
213 | 288 | @collector |
214 | | -(defn -main |
215 | | - [& args] |
216 | 289 |
|
217 | | - (-> |
218 | | - (k2/existingDataFromNumbers) |
219 | | - (Observable/filter (fn [x] (= 0 (mod x 2)))) |
220 | | - (.subscribe println)) |
| 290 | +;;; ___ _ _ ___ _ |
| 291 | +;;; / _ \| |_| |_ ___ _ _ | __|_ ____ _ _ __ _ __| |___ ___ |
| 292 | +;;; | (_) | _| ' \/ -_) '_| | _|\ \ / _` | ' \| '_ \ / -_|_-< |
| 293 | +;;; \___/ \__|_||_\___|_| |___/_\_\__,_|_|_|_| .__/_\___/__/ |
| 294 | +;;; |_| |
| 295 | + |
221 | 296 |
|
222 | | - (.subscribe (k2/customObservableBlocking) println) |
| 297 | +(reset! collector []) |
| 298 | +(.subscribe (k2/customObservableBlocking) collect) |
| 299 | +@collector |
| 300 | + |
| 301 | +(defn -main |
| 302 | + [& args] |
223 | 303 |
|
224 | 304 | (.subscribe (k2/customObservableNonBlocking) println) |
225 | 305 |
|
|
0 commit comments