Skip to content

Commit 4557bed

Browse files
committed
refactor video sample
1 parent 38a758e commit 4557bed

File tree

1 file changed

+46
-64
lines changed

1 file changed

+46
-64
lines changed

rxjava/expt1/src/expt1/core.clj

Lines changed: 46 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,10 @@
3434
;;; items from an oseq (observable sequence) by mutating side-effects
3535
;;; (horrors!).
3636

37+
(defn- or-default [val default] (if val (first val) default))
38+
3739
(defn- subscribe-collectors [obl & optional-wait-time]
38-
(let [wait-time
39-
(if optional-wait-time
40-
(first optional-wait-time)
41-
1000)
40+
(let [wait-time (or-default optional-wait-time 1000)
4241
;; Keep a sequence of all values sent:
4342
onNextCollector (agent [])
4443
;; Only need one value if the observable errors out:
@@ -458,78 +457,61 @@
458457
;;; | .` / -_) _| _| | \ \ / \ V /| / _` / -_) _ (_-<
459458
;;; |_|\_\___|\__|_| |_|_/_\_\ \_/ |_\__,_\___\___/__/
460459

461-
(defn getUser [userId]
462-
"Asynchronously fetch user data
463-
464-
return Observable<Map>"
465-
(Observable/create
466-
(fn [observer]
467-
(let [f (future
468-
(try
469-
;; simulate fetching user data via network service call with latency
470-
(Thread/sleep 60)
471-
(-> observer
472-
(.onNext {:user-id userId
473-
:name "Sam Harris"
474-
:preferred-language (if (= 0 (rand-int 2)) "en-us" "es-us") }))
475-
(-> observer .onCompleted)
476-
(catch Exception e (-> observer (.onError e))))) ]
477-
;; a subscription that cancels the future if unsubscribed
478-
(Subscriptions/create #(future-cancel f))))))
460+
(defn simulatedSlowMapObjectObservable [nullaryFnToMapObject & optionalDelayMSec]
461+
(let [delay (or-default optionalDelayMSec 50)]
462+
(Observable/create
463+
(fn [observer]
464+
(let [f (future
465+
(try
466+
;; simulate fetching user data via network service call with latency
467+
(Thread/sleep delay)
468+
(-> observer (.onNext (nullaryFnToMapObject)))
469+
(-> observer .onCompleted)
470+
(catch Exception e (-> observer (.onError e))))) ]
471+
;; a subscription that cancels the future if unsubscribed
472+
(Subscriptions/create #(future-cancel f)))))))
479473

474+
(defn getUser [userId]
475+
"Asynchronously fetch user data. Returns Observable<Map>"
476+
(simulatedSlowMapObjectObservable
477+
(fn []
478+
{:user-id userId
479+
:name "Sam Harris"
480+
:preferred-language (if (= 0 (rand-int 2)) "en-us" "es-us") })
481+
60))
480482

481483
(defn getVideoBookmark [userId, videoId]
482-
"Asynchronously fetch bookmark for video
483-
484-
return Observable<Integer>"
485-
(Observable/create
486-
(fn [observer]
487-
(let [f (future
488-
(try
489-
;; simulate fetching user data via network service call with latency
490-
(Thread/sleep 20)
491-
(-> observer
492-
(.onNext {:video-id videoId
493-
;; 50/50 chance of giving back position 0 or 0-2500
494-
:position (if (= 0 (rand-int 2)) 0 (rand-int 2500))}))
495-
(-> observer .onCompleted)
496-
(catch Exception e (-> observer (.onError e)))))]
497-
;; a subscription that cancels the future if unsubscribed
498-
(Subscriptions/create #(future-cancel f))))))
484+
"Asynchronously fetch bookmark for video. Returns Observable<Integer>"
485+
(simulatedSlowMapObjectObservable
486+
(fn []
487+
{:video-id videoId
488+
;; 50/50 chance of giving back position 0 or 0-2500
489+
:position (if (= 0 (rand-int 2)) 0 (rand-int 2500))})
490+
20))
499491

500492
(defn getVideoMetadata [videoId, preferredLanguage]
501-
"Asynchronously fetch movie metadata for a given language
502-
503-
return Observable<Map>"
504-
(Observable/create
505-
(fn [observer]
506-
(let [f (future
507-
(try
508-
;; simulate fetching video data via network service call with latency
509-
(Thread/sleep 50)
510-
;; contrived metadata for en-us or es-us
511-
(if (= "en-us" preferredLanguage)
512-
(-> observer (.onNext {:video-id videoId
513-
:title "House of Cards: Episode 1"
514-
:director "David Fincher"
515-
:duration 3365})))
516-
(if (= "es-us" preferredLanguage)
517-
(-> observer (.onNext {:video-id videoId
518-
:title "Cámara de Tarjetas: Episodio 1"
519-
:director "David Fincher"
520-
:duration 3365})))
521-
(-> observer .onCompleted)
522-
(catch Exception e (-> observer (.onError e))))) ]
523-
;; a subscription that cancels the future if unsubscribed
524-
(Subscriptions/create #(future-cancel f))))))
493+
"Asynchronously fetch movie metadata for a given language. Return Observable<Map>"
494+
(simulatedSlowMapObjectObservable
495+
(fn []
496+
(if (= "en-us" preferredLanguage)
497+
{:video-id videoId
498+
:title "House of Cards: Episode 1"
499+
:director "David Fincher"
500+
:duration 3365})
501+
(if (= "es-us" preferredLanguage)
502+
{:video-id videoId
503+
:title "Cámara de Tarjetas: Episodio 1"
504+
:director "David Fincher"
505+
:duration 3365}))
506+
50))
525507

526508

527509
(defn getVideoForUser [userId videoId]
528510
"Get video metadata for a given userId
529511
- video metadata
530512
- video bookmark position
531513
- user data
532-
return Observable<Map>"
514+
Returns Observable<Map>"
533515
(let [user-observable
534516
(-> (getUser userId)
535517
(.map (fn [user] {:user-name (:name user)

0 commit comments

Comments
 (0)