|
73 | 73 | ;; Use a promise for 'completed' so we can wait for it on |
74 | 74 | ;; another thread: |
75 | 75 | onCompletedCollector (promise )] |
76 | | - (letfn [;; When observable sends a value, relay it to our agent: |
| 76 | + (letfn [ ;; When observable sends a value, relay it to our agent: |
77 | 77 | (collect-next [item] (send onNextCollector (fn [state] (conj state item)))) |
78 | 78 | ;; If observable errors out, just set our exception; |
79 | 79 | (collect-error [excp] (reset! onErrorCollector excp)) |
|
82 | 82 | ;; In all cases, report out the back end with this: |
83 | 83 | (report-collectors [ ] |
84 | 84 | (identity ;; pdump |
85 | | - {;; Wait at most 1 second for the promise to complete; |
| 85 | + { ;; Wait at most "wait-time" for the promise to complete; |
86 | 86 | ;; if it does not complete, then produce 'false'. We |
87 | 87 | ;; must wait on the onCompleted BEFORE waiting on the |
88 | 88 | ;; onNext because the agent's await-for in onNext only |
|
235 | 235 | ;;; "distinctUntilChanged", but RxJava 0.9.0 doesn't seem to |
236 | 236 | ;;; have them yet. We can fake them as follows: |
237 | 237 |
|
238 | | -(-> (Observable/toObservable ["one" "two" "three"]) |
| 238 | +(-> (from-seq ["one" "two" "three"]) |
239 | 239 | (.mapMany (comp from-seq string-explode)) |
240 | 240 |
|
241 | 241 | ;; The following two effect an implementation of "distinct". |
|
258 | 258 | (.reduce #{} conj) |
259 | 259 | (.mapMany from-seq))) |
260 | 260 |
|
261 | | -(-> (Observable/toObservable ["one" "two" "three"]) |
| 261 | +(-> (from-seq ["one" "two" "three"]) |
262 | 262 | (.mapMany (comp from-seq string-explode)) |
263 | 263 | distinct |
264 | 264 | subscribe-collectors |
|
284 | 284 | ;;; distinct-until-changed: we only need to remember one back. Still, to make |
285 | 285 | ;;; the point: |
286 | 286 |
|
287 | | -(-> (Observable/toObservable ["onnnnne" "tttwo" "thhrrrrree"]) |
| 287 | +(-> (from-seq ["onnnnne" "tttwo" "thhrrrrree"]) |
288 | 288 | (.mapMany (comp from-seq string-explode)) |
289 | 289 |
|
290 | 290 | ;; The following two effect "distinctUntilChanged". |
|
310 | 310 | ;;; must be defined outside the mapMany and the function that mapMany applies. |
311 | 311 | ;;; However, this solution will not materialize the entire input sequence. |
312 | 312 |
|
313 | | -(let [exploded (-> (Observable/toObservable ["onnnnne" "tttwo" "thhrrrrree"]) |
| 313 | +(let [exploded (-> (from-seq ["onnnnne" "tttwo" "thhrrrrree"]) |
314 | 314 | (.mapMany (comp from-seq string-explode))) |
315 | 315 | ;; Must define this container outside the mapMany and the function |
316 | 316 | ;; that napMany applies. |
|
325 | 325 | (ref-set last-container [x]) |
326 | 326 | (return x))))))) |
327 | 327 | subscribe-collectors |
328 | | - pdump)) |
| 328 | + pdump)) |
329 | 329 |
|
330 | 330 | ;;; Package and test: |
331 | 331 |
|
|
338 | 338 | (if (and l (= x l)) |
339 | 339 | (Observable/empty) |
340 | 340 | (do |
341 | | - (ref-set last-container [x]) |
| 341 | + (ref-set last-container [x]) |
342 | 342 | (return x)))))))))) |
343 | 343 |
|
344 | | -(-> (Observable/toObservable ["onnnnne" "tttwo" "thhrrrrree"]) |
| 344 | +(-> (from-seq ["onnnnne" "tttwo" "thhrrrrree"]) |
345 | 345 | (.mapMany (comp from-seq string-explode)) |
346 | 346 | distinct-until-changed |
347 | 347 | subscribe-collectors |
|
350 | 350 |
|
351 | 351 | ;;; It's well-behaved on an empty input: |
352 | 352 |
|
353 | | -(-> (Observable/toObservable []) |
| 353 | +(-> (from-seq []) |
354 | 354 | (.mapMany (comp from-seq string-explode)) |
355 | 355 | distinct-until-changed |
356 | 356 | subscribe-collectors |
|
0 commit comments