|
411 | 411 | ;;; /_/ \_\/__/\_, |_||_\__|_||_| \_/\_/\___|_.__/ |_| \__,_\__, \___/__/ |
412 | 412 | ;;; |__/ |___/ |
413 | 413 |
|
414 | | -(defn asynchronousWikipediaArticleObservable [names] |
415 | | - "Fetch a list of Wikipedia articles asynchronously. |
| 414 | +(defn asynchWikipediaArticle [names] |
| 415 | + "Fetch a list of Wikipedia articles asynchronously |
| 416 | + with proper error handling. |
416 | 417 |
|
417 | 418 | return Observable<String> of HTML" |
418 | 419 | (Observable/create |
419 | 420 | (fn [observer] |
420 | 421 | (let [f (future |
421 | | - (doseq [name names] |
422 | | - (-> observer |
423 | | - ;; Use "enlive" to parse & scrape html: |
424 | | - (.onNext |
425 | | - (html/html-resource |
426 | | - (java.net.URL. |
427 | | - (str "http://en.wikipedia.org/wiki/" name)))) |
428 | | - ;; Netflix originally used strings, but... |
429 | | - #_(.onNext (http/get |
430 | | - (str "http://en.wikipedia.org/wiki/" name))) |
431 | | - ) |
432 | | - ) |
433 | | - ;; After sending response to onNext, complete the |
434 | | - ;; sequence: |
| 422 | + (try |
| 423 | + (doseq [name names] |
| 424 | + (-> observer |
| 425 | + ;; Use "enlive" to parse & scrape html: |
| 426 | + (.onNext |
| 427 | + (html/html-resource |
| 428 | + (java.net.URL. |
| 429 | + (str "http://en.wikipedia.org/wiki/" name)))) |
| 430 | + ;; Netflix originally used strings, but... |
| 431 | + )) |
| 432 | + ;; (catch Exception e (prn "exception"))) |
| 433 | + (catch Exception e (-> observer (.onError e)))) |
| 434 | + ;; after sending response to onNext we complete the sequence |
435 | 435 | (-> observer .onCompleted))] |
436 | | - ;; A subscription that cancels the future if unsubscribed: |
| 436 | + ;; a subscription that cancels the future if unsubscribed |
437 | 437 | (Subscriptions/create #(future-cancel f)))))) |
438 | 438 |
|
439 | 439 | ;;; There is something in the "Atom" web page that xml/parse does not |
|
447 | 447 |
|
448 | 448 | (->> |
449 | 449 | ((subscribe-collectors |
450 | | - (asynchronousWikipediaArticleObservable ["Atom" "Molecule"]) |
| 450 | + (asynchWikipediaArticle ["Lion" "NonExistentTitle" "Bear"]) |
451 | 451 | 5000) |
452 | 452 | :onNext) |
453 | 453 | (map #(html/select % [:title])) |
454 | 454 | (pdump)) |
455 | 455 |
|
456 | | -(pdump (+ 4 3)) |
| 456 | +;;; _ _ _ __ _ _ __ ___ _ |
| 457 | +;;; | \| |___| |_ / _| (_)_ __ \ \ / (_)__| |___ ___ ___ |
| 458 | +;;; | .` / -_) _| _| | \ \ / \ V /| / _` / -_) _ (_-< |
| 459 | +;;; |_|\_\___|\__|_| |_|_/_\_\ \_/ |_\__,_\___\___/__/ |
| 460 | + |
| 461 | +(defn getUser [userId] |
| 462 | + "Asynchronously fetch user data |
| 463 | +
|
| 464 | + return Observable<Map>" |
| 465 | + (Observable/create |
| 466 | + (fn [observer] |
| 467 | + (let [f (future |
| 468 | + (try |
| 469 | + ;; simulate fetching user data via network service call with latency |
| 470 | + (Thread/sleep 60) |
| 471 | + (-> observer |
| 472 | + (.onNext {:user-id userId |
| 473 | + :name "Sam Harris" |
| 474 | + :preferred-language (if (= 0 (rand-int 2)) "en-us" "es-us") })) |
| 475 | + (-> observer .onCompleted) |
| 476 | + (catch Exception e (-> observer (.onError e))))) ] |
| 477 | + ;; a subscription that cancels the future if unsubscribed |
| 478 | + (Subscriptions/create #(future-cancel f)))))) |
| 479 | + |
| 480 | + |
| 481 | +(defn getVideoBookmark [userId, videoId] |
| 482 | + "Asynchronously fetch bookmark for video |
| 483 | +
|
| 484 | + return Observable<Integer>" |
| 485 | + (Observable/create |
| 486 | + (fn [observer] |
| 487 | + (let [f (future |
| 488 | + (try |
| 489 | + ;; simulate fetching user data via network service call with latency |
| 490 | + (Thread/sleep 20) |
| 491 | + (-> observer |
| 492 | + (.onNext {:video-id videoId |
| 493 | + ;; 50/50 chance of giving back position 0 or 0-2500 |
| 494 | + :position (if (= 0 (rand-int 2)) 0 (rand-int 2500))})) |
| 495 | + (-> observer .onCompleted) |
| 496 | + (catch Exception e (-> observer (.onError e)))))] |
| 497 | + ;; a subscription that cancels the future if unsubscribed |
| 498 | + (Subscriptions/create #(future-cancel f)))))) |
| 499 | + |
| 500 | +(defn getVideoMetadata [videoId, preferredLanguage] |
| 501 | + "Asynchronously fetch movie metadata for a given language |
| 502 | +
|
| 503 | + return Observable<Map>" |
| 504 | + (Observable/create |
| 505 | + (fn [observer] |
| 506 | + (let [f (future |
| 507 | + (try |
| 508 | + ;; simulate fetching video data via network service call with latency |
| 509 | + (Thread/sleep 50) |
| 510 | + ;; contrived metadata for en-us or es-us |
| 511 | + (if (= "en-us" preferredLanguage) |
| 512 | + (-> observer (.onNext {:video-id videoId |
| 513 | + :title "House of Cards: Episode 1" |
| 514 | + :director "David Fincher" |
| 515 | + :duration 3365}))) |
| 516 | + (if (= "es-us" preferredLanguage) |
| 517 | + (-> observer (.onNext {:video-id videoId |
| 518 | + :title "Cámara de Tarjetas: Episodio 1" |
| 519 | + :director "David Fincher" |
| 520 | + :duration 3365}))) |
| 521 | + (-> observer .onCompleted) |
| 522 | + (catch Exception e (-> observer (.onError e))))) ] |
| 523 | + ;; a subscription that cancels the future if unsubscribed |
| 524 | + (Subscriptions/create #(future-cancel f)))))) |
| 525 | + |
| 526 | + |
| 527 | +(defn getVideoForUser [userId videoId] |
| 528 | + "Get video metadata for a given userId |
| 529 | + - video metadata |
| 530 | + - video bookmark position |
| 531 | + - user data |
| 532 | + return Observable<Map>" |
| 533 | + (let [user-observable |
| 534 | + (-> (getUser userId) |
| 535 | + (.map (fn [user] {:user-name (:name user) |
| 536 | + :language (:preferred-language user)}))) |
| 537 | + bookmark-observable |
| 538 | + (-> (getVideoBookmark userId videoId) |
| 539 | + (.map (fn [bookmark] {:viewed-position (:position bookmark)}))) |
| 540 | + |
| 541 | + ;; getVideoMetadata requires :language from user-observable so nest inside map function |
| 542 | + video-metadata-observable |
| 543 | + (-> user-observable |
| 544 | + (.mapMany |
| 545 | + ;; fetch metadata after a response from user-observable is received |
| 546 | + (fn [user-map] |
| 547 | + (getVideoMetadata videoId (:language user-map)))))] |
| 548 | + ;; now combine 3 async sequences using zip |
| 549 | + (-> (Observable/zip |
| 550 | + bookmark-observable video-metadata-observable user-observable |
| 551 | + (fn [bookmark-map metadata-map user-map] |
| 552 | + {:bookmark-map bookmark-map |
| 553 | + :metadata-map metadata-map |
| 554 | + :user-map user-map})) |
| 555 | + ;; and transform into a single response object |
| 556 | + (.map (fn [data] |
| 557 | + {:video-id videoId |
| 558 | + :video-metadata (:metadata-map data) |
| 559 | + :user-id userId |
| 560 | + :language (:language (:user-map data)) |
| 561 | + :bookmark (:viewed-position (:bookmark-map data)) }))))) |
| 562 | + |
| 563 | +(-> ( getVideoForUser 12345 78965) |
| 564 | + (subscribe-collectors) |
| 565 | + (pdump) |
| 566 | + ) |
| 567 | + |
| 568 | + |
| 569 | +(pdump (* 6 (+ 4 3))) |
0 commit comments