Skip to content

Commit df7e510

Browse files
committed
improvements to the precis
1 parent 1cca179 commit df7e510

11 files changed

Lines changed: 1055 additions & 12 deletions

File tree

rxjava/expt1/src/expt1/core.clj

Lines changed: 57 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
;;; Load this file into LightTable (using LightTable's workspace tab)
2+
;;; and "make current editor an instarepl" (using LightTable's command
3+
;;; tab).
4+
15
(ns expt1.core
26
(:require [expt1.k2 :as k2]
37
clojure.string
@@ -6,30 +10,70 @@
610
(:import [rx Observable subscriptions.Subscriptions])
711
)
812

9-
;;; Load this file into LightTable (using the workspace tab) and "make
10-
;;; current editor an instarepl" (using the command tab).
13+
(defmacro pdump [x]
14+
`(let [x# ~x]
15+
(do (println "----------------")
16+
(clojure.pprint/pprint '~x)
17+
(println "~~>")
18+
(clojure.pprint/pprint x#)
19+
(println "----------------")
20+
x#)))
21+
22+
;;;
23+
;;; ___ _ ___ _
24+
;;; / __|___ _ _ ___ _ _(_)__ / _ \| |__ ___ ___ _ ___ _____ _ _
25+
;;; | (_ / -_) ' \/ -_) '_| / _| | (_) | '_ (_-</ -_) '_\ V / -_) '_|
26+
;;; \___\___|_||_\___|_| |_\__| \___/|_.__/__/\___|_| \_/\___|_|
1127

1228
;;; The current rx library has none of the co-monadic operators such
1329
;;; as "first" and "last". Let us make atomic, external collectors for
1430
;;; extracting items from an oseq (observable sequence) by mutating
1531
;;; side-effects (horrors!).
1632

1733
(defn- subscribe-collectors [obl]
18-
(let [onNextCollector (atom [])
34+
(let [;; Keep a sequence of all values sent:
35+
onNextCollector (agent [])
36+
;; Only need one value if the observable errors out:
1937
onErrorCollector (atom nil)
38+
;; Use a promise for 'completed' so we can wait for it on
39+
;; another thread:
2040
onCompletedCollector (promise )]
21-
(letfn [(collect-next [item] (swap! onNextCollector conj item))
41+
(letfn [;; When observable sends a value, relay it to our agent:
42+
(collect-next [item] (send onNextCollector (fn [state] (conj state item))))
43+
;; If observable errors out, just set our exception;
2244
(collect-error [excp] (reset! onErrorCollector excp))
45+
;; When observable completes, deliver on the promise:
2346
(collect-completed [ ] (deliver onCompletedCollector true))
47+
;; In all cases, report out the back end with this:
2448
(report-collectors [ ]
25-
{:onNext @onNextCollector
26-
:onError @onErrorCollector
27-
:onCompleted (deref onCompletedCollector 1000 false)})]
49+
(pdump
50+
{;; Wait at most 1 second for the promise to complete;
51+
;; if it does not complete, then produce 'false'. We
52+
;; must wait on the onCompleted BEFORE waiting on the
53+
;; onNext because the agent's await-for in onNext only
54+
;; waits for messages sent to the agent from THIS
55+
;; thread, and our asynchronous observable may be
56+
;; sending messages to the agent from another thread,
57+
;; say, a future's thread. The agent's await-for will
58+
;; return too quickly, allowing this onCompleted await
59+
;; to return, losing some messages. This code depends
60+
;; on order-of-evaluation assumptions in the map.
61+
:onCompleted (deref onCompletedCollector 1000 false)
62+
;; Wait for everything that has been sent to the agent
63+
;; to drain (presumably internal message queues):
64+
:onNext (do (await-for 1000 onNextCollector)
65+
;; Then produce the results:
66+
@onNextCollector)
67+
;; If we ever saw an error, here it is:
68+
:onError @onErrorCollector
69+
}))]
70+
;; Recognize that the observable 'obl' may run on another thread:
2871
(-> obl
2972
(.subscribe collect-next collect-error collect-completed))
30-
(report-collectors)
31-
)))
32-
73+
;; Therefore, produce results that wait, with timeouts, on both
74+
;; the completion event and on the draining of the (presumed)
75+
;; message queue to the agent.
76+
(report-collectors))))
3377

3478
;;; ___ _ _ _ _
3579
;;; / __| |_ _ _(_)_ _ | |_(_)_ _ __ _
@@ -38,7 +82,9 @@
3882
;;; |___/
3983

4084

41-
;;; First, let's just take the first two numbers out of a vector of
85+
;;; There is a class of operators for shrinking a sequence. They
86+
;;; include "take", "takeUntil", etc.; "skip*"; and "filter". To
87+
;;; start, let's just take the first two numbers out of a vector of
4288
;;; numbers and turn them into oseq. This illustrates "take", a method
4389
;;; that often shortens sequences.
4490

@@ -133,7 +179,6 @@
133179
(subscribe-collectors)
134180
)
135181

136-
137182
;;; _ _ _ _ _
138183
;;; __| (_)__| |_(_)_ _ __| |_
139184
;;; / _` | (_-< _| | ' \/ _| _|

rxjava/expt1/src/expt1/k2.clj

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,11 @@
6868
(comment
6969
(.subscribe (customObservableBlocking) println))
7070

71+
(defmacro dump [x]
72+
`(let [x# ~x]
73+
(do (println x#)
74+
x#)))
75+
7176
(defn customObservableNonBlocking []
7277
"This example shows a custom Observable that does not block
7378
when subscribed to as it spawns a separate thread.

0 commit comments

Comments
 (0)