Skip to content

Commit e0ba9fb

Browse files
committed
distinct and distinctUntilChanged implemented and tested
1 parent 4bd682c commit e0ba9fb

2 files changed

Lines changed: 96 additions & 13 deletions

File tree

rxjava/expt1/src/expt1/core.clj

Lines changed: 90 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
;;; |___/_||_|_| |_|_||_|_\_\_|_||_\__, |
2828
;;; |___/
2929

30+
3031
;;; First, let's just take the first two numbers out of a vector of
3132
;;; numbers and turn them into oseq. This illustrates "take", a method
3233
;;; that often shortens sequences.
@@ -46,6 +47,7 @@
4647
;;; \___|_| \___/\_/\_/|_|_||_\__, |
4748
;;; |___/
4849

50+
4951
;;; Now, let's transform each number x into a vector of numbers, adding
5052
;;; x to some familiar constants, then flattening the results exactly
5153
;;; one time. This is the way to grow a shorter sequence into a longer
@@ -93,6 +95,13 @@
9395

9496
@collector
9597

98+
;;; __
99+
;;; / _|_ _ ___ _ __ ___ ___ ___ __ _
100+
;;; | _| '_/ _ \ ' \___(_-</ -_) _` |
101+
;;; |_| |_| \___/_|_|_| /__/\___\__, |
102+
;;; |_|
103+
104+
96105
;;; We'd like to clean up the ugly #(Observable/toObservable ...) into
97106
;;; a composition, but we can't (comp Observable/toObservable ...) since
98107
;;; it's a Java method and does not implement Clojure IFn. We fix this
@@ -109,6 +118,12 @@
109118

110119
@collector
111120

121+
;;; _
122+
;;; _ _ ___| |_ _ _ _ _ _ _
123+
;;; | '_/ -_) _| || | '_| ' \
124+
;;; |_| \___|\__|\_,_|_| |_||_|
125+
126+
112127
;;; We notice that the monadic "return" is missing from "rxjava 0.9.0",
113128
;;; so we add it as follows. This is doing some junk-work -- puts the
114129
;;; item in a vector just so we can take it out again into an obl.
@@ -126,6 +141,12 @@
126141

127142
@collector
128143

144+
;;; _ _ _ _ _
145+
;;; __| (_)__| |_(_)_ _ __| |_
146+
;;; / _` | (_-< _| | ' \/ _| _|
147+
;;; \__,_|_/__/\__|_|_||_\__|\__|
148+
149+
129150
;;; Rx is supposed to have a couple of operators: "disinct" and
130151
;;; "distinctUntilChanged", but RxJava 0.9.0 doesn't seem to
131152
;;; have them yet. We can fake them as follows:
@@ -149,7 +170,7 @@
149170

150171
@collector
151172

152-
;;; Now, we package and test.
173+
;;; Package and test.
153174

154175
(defn distinct [oseq]
155176
(-> oseq
@@ -166,6 +187,21 @@
166187

167188
@collector
168189

190+
;;; _ _ _ _ _
191+
;;; __| (_)__| |_(_)_ _ __| |_
192+
;;; / _` | (_-< _| | ' \/ _| _|
193+
;;; \__,_|_/__/\__|_|_||_\__|\__|
194+
;;; _ _ _ _ _ ___ _ _
195+
;;; | | | |_ _| |_(_) |/ __| |_ __ _ _ _ __ _ ___ __| |
196+
;;; | |_| | ' \ _| | | (__| ' \/ _` | ' \/ _` / -_) _` |
197+
;;; \___/|_||_\__|_|_|\___|_||_\__,_|_||_\__, \___\__,_|
198+
;;; |___/
199+
200+
201+
;;; The following solution is correct but unacceptable because it consumes
202+
;;; the entire source oseq before producing values. Such is not necessary
203+
;;; with distinct-until-changed: we only need to remember one back. Still,
204+
;;; to make the point:
169205

170206
(reset! collector [])
171207
(->
@@ -178,8 +214,8 @@
178214
(if (and l (= x l)) ; accounts for legit nils
179215
acc
180216
(conj acc x)))))
181-
;; We now have a singleton obl containing a set of unique characters.
182-
;; To promote this back into an obl of chars, we do:
217+
;; We now have a singleton obl containing representatives of runs of non-
218+
;; distinct characters. Slurp it back into the monad:
183219
(.mapMany from-seq)
184220

185221
(.subscribe collect)
@@ -189,6 +225,15 @@
189225

190226
(reset! collector [])
191227

228+
;;; Better is to keep a mutable buffer of length one. It could be an atom
229+
;;; if we had the opposite of "compare-and-set!"; an atomic primitive that
230+
;;; sets the value only if it's NOT equal to its current value. "compare-and
231+
;;; set!" sets the atom to a newval if its current value is equal to an
232+
;;; oldval. It's easy enough to get the desired semantics with a Ref and
233+
;;; software-transactional memory, the only wrinkle being that the container
234+
;;; must be defined outside the mapMany and the function that mapMany applies.
235+
;;; However, this solution will not materialize the entire input sequence.
236+
192237
(let [exploded (->
193238
(Observable/toObservable ["onnnnne" "tttwo" "thhrrrrree"])
194239
(.mapMany (comp from-seq string-explode))
@@ -208,18 +253,53 @@
208253
(.subscribe collect)))
209254
@collector
210255

256+
;;; Package and test:
257+
258+
(defn distinct-until-changed [oseq]
259+
(let [last-container (ref [])]
260+
(-> oseq
261+
(.mapMany (fn [x]
262+
(dosync
263+
(let [l (last @last-container)]
264+
(if (and l (= x l))
265+
(Observable/empty)
266+
(do
267+
(ref-set last-container [x])
268+
(return x))))))))))
269+
211270
(reset! collector [])
271+
(->
272+
(Observable/toObservable ["onnnnne" "tttwo" "thhrrrrree"])
273+
(.mapMany (comp from-seq string-explode))
274+
(distinct-until-changed)
275+
(.subscribe collect)
276+
)
277+
@collector
212278

279+
;;; It's well-behaved on an empty input:
280+
281+
(reset! collector [])
282+
(->
283+
(Observable/toObservable [])
284+
(.mapMany (comp from-seq string-explode))
285+
(distinct-until-changed)
286+
(.subscribe collect)
287+
)
213288
@collector
214-
(defn -main
215-
[& args]
216289

217-
(->
218-
(k2/existingDataFromNumbers)
219-
(Observable/filter (fn [x] (= 0 (mod x 2))))
220-
(.subscribe println))
290+
;;; ___ _ _ ___ _
291+
;;; / _ \| |_| |_ ___ _ _ | __|_ ____ _ _ __ _ __| |___ ___
292+
;;; | (_) | _| ' \/ -_) '_| | _|\ \ / _` | ' \| '_ \ / -_|_-<
293+
;;; \___/ \__|_||_\___|_| |___/_\_\__,_|_|_|_| .__/_\___/__/
294+
;;; |_|
295+
221296

222-
(.subscribe (k2/customObservableBlocking) println)
297+
(reset! collector [])
298+
(.subscribe (k2/customObservableBlocking) collect)
299+
@collector
300+
301+
(defn -main
302+
[& args]
223303

224304
(.subscribe (k2/customObservableNonBlocking) println)
225305

rxjava/expt1/src/expt1/k2.clj

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,8 @@
9696
(fn [observer]
9797
(let [f (future
9898
(doseq [articleName wikipediaArticleNames]
99-
(-> observer (.onNext (http/get (str "http://en.wikipedia.org/wiki/" articleName)))))
99+
(-> observer
100+
(.onNext (http/get (str "http://en.wikipedia.org/wiki/" articleName)))))
100101
; after sending response to onnext we complete the sequence
101102
(-> observer .onCompleted))]
102103
; a subscription that cancels the future if unsubscribed
@@ -250,7 +251,8 @@
250251
(let [f (future
251252
(try
252253
(doseq [articleName wikipediaArticleNames]
253-
(-> observer (.onNext (http/get (str "http://en.wikipedia.org/wiki/" articleName)))))
254+
(-> observer
255+
(.onNext (http/get (str "http://en.wikipedia.org/wiki/" articleName)))))
254256
;(catch Exception e (prn "exception")))
255257
(catch Exception e (-> observer (.onError e))))
256258
; after sending response to onNext we complete the sequence
@@ -260,6 +262,7 @@
260262

261263
; To see output
262264
(comment
263-
(-> (fetchWikipediaArticleAsynchronouslyWithErrorHandling ["Tiger" "NonExistentTitle" "Elephant"])
265+
(-> (fetchWikipediaArticleAsynchronouslyWithErrorHandling
266+
["Tiger" "NonExistentTitle" "Elephant"])
264267
(.subscribe #(println "--- Article ---\n" (subs (:body %) 0 125) "...")
265268
#(println "--- Error ---\n" (.getMessage %)))))

0 commit comments

Comments
 (0)