Skip to content

Commit 38a758e

Browse files
committed
Add Netflix video sample
1 parent cca0c8c commit 38a758e

1 file changed

Lines changed: 132 additions & 19 deletions

File tree

rxjava/expt1/src/expt1/core.clj

Lines changed: 132 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -411,29 +411,29 @@
411411
;;; /_/ \_\/__/\_, |_||_\__|_||_| \_/\_/\___|_.__/ |_| \__,_\__, \___/__/
412412
;;; |__/ |___/
413413

414-
(defn asynchronousWikipediaArticleObservable [names]
415-
"Fetch a list of Wikipedia articles asynchronously.
414+
(defn asynchWikipediaArticle [names]
415+
"Fetch a list of Wikipedia articles asynchronously
416+
with proper error handling.
416417
417418
return Observable<String> of HTML"
418419
(Observable/create
419420
(fn [observer]
420421
(let [f (future
421-
(doseq [name names]
422-
(-> observer
423-
;; Use "enlive" to parse & scrape html:
424-
(.onNext
425-
(html/html-resource
426-
(java.net.URL.
427-
(str "http://en.wikipedia.org/wiki/" name))))
428-
;; Netflix originally used strings, but...
429-
#_(.onNext (http/get
430-
(str "http://en.wikipedia.org/wiki/" name)))
431-
)
432-
)
433-
;; After sending response to onNext, complete the
434-
;; sequence:
422+
(try
423+
(doseq [name names]
424+
(-> observer
425+
;; Use "enlive" to parse & scrape html:
426+
(.onNext
427+
(html/html-resource
428+
(java.net.URL.
429+
(str "http://en.wikipedia.org/wiki/" name))))
430+
;; Netflix originally used strings, but...
431+
))
432+
;; (catch Exception e (prn "exception")))
433+
(catch Exception e (-> observer (.onError e))))
434+
;; after sending response to onNext we complete the sequence
435435
(-> observer .onCompleted))]
436-
;; A subscription that cancels the future if unsubscribed:
436+
;; a subscription that cancels the future if unsubscribed
437437
(Subscriptions/create #(future-cancel f))))))
438438

439439
;;; There is something in the "Atom" web page that xml/parse does not
@@ -447,10 +447,123 @@
447447

448448
(->>
449449
((subscribe-collectors
450-
(asynchronousWikipediaArticleObservable ["Atom" "Molecule"])
450+
(asynchWikipediaArticle ["Lion" "NonExistentTitle" "Bear"])
451451
5000)
452452
:onNext)
453453
(map #(html/select % [:title]))
454454
(pdump))
455455

456-
(pdump (+ 4 3))
456+
;;; _ _ _ __ _ _ __ ___ _
457+
;;; | \| |___| |_ / _| (_)_ __ \ \ / (_)__| |___ ___ ___
458+
;;; | .` / -_) _| _| | \ \ / \ V /| / _` / -_) _ (_-<
459+
;;; |_|\_\___|\__|_| |_|_/_\_\ \_/ |_\__,_\___\___/__/
460+
461+
(defn getUser [userId]
462+
"Asynchronously fetch user data
463+
464+
return Observable<Map>"
465+
(Observable/create
466+
(fn [observer]
467+
(let [f (future
468+
(try
469+
;; simulate fetching user data via network service call with latency
470+
(Thread/sleep 60)
471+
(-> observer
472+
(.onNext {:user-id userId
473+
:name "Sam Harris"
474+
:preferred-language (if (= 0 (rand-int 2)) "en-us" "es-us") }))
475+
(-> observer .onCompleted)
476+
(catch Exception e (-> observer (.onError e))))) ]
477+
;; a subscription that cancels the future if unsubscribed
478+
(Subscriptions/create #(future-cancel f))))))
479+
480+
481+
(defn getVideoBookmark [userId, videoId]
482+
"Asynchronously fetch bookmark for video
483+
484+
return Observable<Integer>"
485+
(Observable/create
486+
(fn [observer]
487+
(let [f (future
488+
(try
489+
;; simulate fetching user data via network service call with latency
490+
(Thread/sleep 20)
491+
(-> observer
492+
(.onNext {:video-id videoId
493+
;; 50/50 chance of giving back position 0 or 0-2500
494+
:position (if (= 0 (rand-int 2)) 0 (rand-int 2500))}))
495+
(-> observer .onCompleted)
496+
(catch Exception e (-> observer (.onError e)))))]
497+
;; a subscription that cancels the future if unsubscribed
498+
(Subscriptions/create #(future-cancel f))))))
499+
500+
(defn getVideoMetadata [videoId, preferredLanguage]
501+
"Asynchronously fetch movie metadata for a given language
502+
503+
return Observable<Map>"
504+
(Observable/create
505+
(fn [observer]
506+
(let [f (future
507+
(try
508+
;; simulate fetching video data via network service call with latency
509+
(Thread/sleep 50)
510+
;; contrived metadata for en-us or es-us
511+
(if (= "en-us" preferredLanguage)
512+
(-> observer (.onNext {:video-id videoId
513+
:title "House of Cards: Episode 1"
514+
:director "David Fincher"
515+
:duration 3365})))
516+
(if (= "es-us" preferredLanguage)
517+
(-> observer (.onNext {:video-id videoId
518+
:title "Cámara de Tarjetas: Episodio 1"
519+
:director "David Fincher"
520+
:duration 3365})))
521+
(-> observer .onCompleted)
522+
(catch Exception e (-> observer (.onError e))))) ]
523+
;; a subscription that cancels the future if unsubscribed
524+
(Subscriptions/create #(future-cancel f))))))
525+
526+
527+
(defn getVideoForUser [userId videoId]
528+
"Get video metadata for a given userId
529+
- video metadata
530+
- video bookmark position
531+
- user data
532+
return Observable<Map>"
533+
(let [user-observable
534+
(-> (getUser userId)
535+
(.map (fn [user] {:user-name (:name user)
536+
:language (:preferred-language user)})))
537+
bookmark-observable
538+
(-> (getVideoBookmark userId videoId)
539+
(.map (fn [bookmark] {:viewed-position (:position bookmark)})))
540+
541+
;; getVideoMetadata requires :language from user-observable so nest inside map function
542+
video-metadata-observable
543+
(-> user-observable
544+
(.mapMany
545+
;; fetch metadata after a response from user-observable is received
546+
(fn [user-map]
547+
(getVideoMetadata videoId (:language user-map)))))]
548+
;; now combine 3 async sequences using zip
549+
(-> (Observable/zip
550+
bookmark-observable video-metadata-observable user-observable
551+
(fn [bookmark-map metadata-map user-map]
552+
{:bookmark-map bookmark-map
553+
:metadata-map metadata-map
554+
:user-map user-map}))
555+
;; and transform into a single response object
556+
(.map (fn [data]
557+
{:video-id videoId
558+
:video-metadata (:metadata-map data)
559+
:user-id userId
560+
:language (:language (:user-map data))
561+
:bookmark (:viewed-position (:bookmark-map data)) })))))
562+
563+
(-> ( getVideoForUser 12345 78965)
564+
(subscribe-collectors)
565+
(pdump)
566+
)
567+
568+
569+
(pdump (* 6 (+ 4 3)))

0 commit comments

Comments
 (0)