|
34 | 34 | ;;; items from an oseq (observable sequence) by mutating side-effects |
35 | 35 | ;;; (horrors!). |
36 | 36 |
|
| 37 | +(defn- or-default [val default] (if val (first val) default)) |
| 38 | + |
37 | 39 | (defn- subscribe-collectors [obl & optional-wait-time] |
38 | | - (let [wait-time |
39 | | - (if optional-wait-time |
40 | | - (first optional-wait-time) |
41 | | - 1000) |
| 40 | + (let [wait-time (or-default optional-wait-time 1000) |
42 | 41 | ;; Keep a sequence of all values sent: |
43 | 42 | onNextCollector (agent []) |
44 | 43 | ;; Only need one value if the observable errors out: |
|
458 | 457 | ;;; | .` / -_) _| _| | \ \ / \ V /| / _` / -_) _ (_-< |
459 | 458 | ;;; |_|\_\___|\__|_| |_|_/_\_\ \_/ |_\__,_\___\___/__/ |
460 | 459 |
|
461 | | -(defn getUser [userId] |
462 | | - "Asynchronously fetch user data |
463 | | -
|
464 | | - return Observable<Map>" |
465 | | - (Observable/create |
466 | | - (fn [observer] |
467 | | - (let [f (future |
468 | | - (try |
469 | | - ;; simulate fetching user data via network service call with latency |
470 | | - (Thread/sleep 60) |
471 | | - (-> observer |
472 | | - (.onNext {:user-id userId |
473 | | - :name "Sam Harris" |
474 | | - :preferred-language (if (= 0 (rand-int 2)) "en-us" "es-us") })) |
475 | | - (-> observer .onCompleted) |
476 | | - (catch Exception e (-> observer (.onError e))))) ] |
477 | | - ;; a subscription that cancels the future if unsubscribed |
478 | | - (Subscriptions/create #(future-cancel f)))))) |
| 460 | +(defn simulatedSlowMapObjectObservable [nullaryFnToMapObject & optionalDelayMSec] |
| 461 | + (let [delay (or-default optionalDelayMSec 50)] |
| 462 | + (Observable/create |
| 463 | + (fn [observer] |
| 464 | + (let [f (future |
| 465 | + (try |
| 466 | + ;; simulate fetching user data via network service call with latency |
| 467 | + (Thread/sleep delay) |
| 468 | + (-> observer (.onNext (nullaryFnToMapObject))) |
| 469 | + (-> observer .onCompleted) |
| 470 | + (catch Exception e (-> observer (.onError e))))) ] |
| 471 | + ;; a subscription that cancels the future if unsubscribed |
| 472 | + (Subscriptions/create #(future-cancel f))))))) |
479 | 473 |
|
| 474 | +(defn getUser [userId] |
| 475 | + "Asynchronously fetch user data. Returns Observable<Map>" |
| 476 | + (simulatedSlowMapObjectObservable |
| 477 | + (fn [] |
| 478 | + {:user-id userId |
| 479 | + :name "Sam Harris" |
| 480 | + :preferred-language (if (= 0 (rand-int 2)) "en-us" "es-us") }) |
| 481 | + 60)) |
480 | 482 |
|
481 | 483 | (defn getVideoBookmark [userId, videoId] |
482 | | - "Asynchronously fetch bookmark for video |
483 | | -
|
484 | | - return Observable<Integer>" |
485 | | - (Observable/create |
486 | | - (fn [observer] |
487 | | - (let [f (future |
488 | | - (try |
489 | | - ;; simulate fetching user data via network service call with latency |
490 | | - (Thread/sleep 20) |
491 | | - (-> observer |
492 | | - (.onNext {:video-id videoId |
493 | | - ;; 50/50 chance of giving back position 0 or 0-2500 |
494 | | - :position (if (= 0 (rand-int 2)) 0 (rand-int 2500))})) |
495 | | - (-> observer .onCompleted) |
496 | | - (catch Exception e (-> observer (.onError e)))))] |
497 | | - ;; a subscription that cancels the future if unsubscribed |
498 | | - (Subscriptions/create #(future-cancel f)))))) |
| 484 | + "Asynchronously fetch bookmark for video. Returns Observable<Integer>" |
| 485 | + (simulatedSlowMapObjectObservable |
| 486 | + (fn [] |
| 487 | + {:video-id videoId |
| 488 | + ;; 50/50 chance of giving back position 0 or 0-2500 |
| 489 | + :position (if (= 0 (rand-int 2)) 0 (rand-int 2500))}) |
| 490 | + 20)) |
499 | 491 |
|
500 | 492 | (defn getVideoMetadata [videoId, preferredLanguage] |
501 | | - "Asynchronously fetch movie metadata for a given language |
502 | | -
|
503 | | - return Observable<Map>" |
504 | | - (Observable/create |
505 | | - (fn [observer] |
506 | | - (let [f (future |
507 | | - (try |
508 | | - ;; simulate fetching video data via network service call with latency |
509 | | - (Thread/sleep 50) |
510 | | - ;; contrived metadata for en-us or es-us |
511 | | - (if (= "en-us" preferredLanguage) |
512 | | - (-> observer (.onNext {:video-id videoId |
513 | | - :title "House of Cards: Episode 1" |
514 | | - :director "David Fincher" |
515 | | - :duration 3365}))) |
516 | | - (if (= "es-us" preferredLanguage) |
517 | | - (-> observer (.onNext {:video-id videoId |
518 | | - :title "Cámara de Tarjetas: Episodio 1" |
519 | | - :director "David Fincher" |
520 | | - :duration 3365}))) |
521 | | - (-> observer .onCompleted) |
522 | | - (catch Exception e (-> observer (.onError e))))) ] |
523 | | - ;; a subscription that cancels the future if unsubscribed |
524 | | - (Subscriptions/create #(future-cancel f)))))) |
| 493 | + "Asynchronously fetch movie metadata for a given language. Return Observable<Map>" |
| 494 | + (simulatedSlowMapObjectObservable |
| 495 | + (fn [] |
| 496 | + (if (= "en-us" preferredLanguage) |
| 497 | + {:video-id videoId |
| 498 | + :title "House of Cards: Episode 1" |
| 499 | + :director "David Fincher" |
| 500 | + :duration 3365}) |
| 501 | + (if (= "es-us" preferredLanguage) |
| 502 | + {:video-id videoId |
| 503 | + :title "Cámara de Tarjetas: Episodio 1" |
| 504 | + :director "David Fincher" |
| 505 | + :duration 3365})) |
| 506 | + 50)) |
525 | 507 |
|
526 | 508 |
|
527 | 509 | (defn getVideoForUser [userId videoId] |
528 | 510 | "Get video metadata for a given userId |
529 | 511 | - video metadata |
530 | 512 | - video bookmark position |
531 | 513 | - user data |
532 | | - return Observable<Map>" |
| 514 | + Returns Observable<Map>" |
533 | 515 | (let [user-observable |
534 | 516 | (-> (getUser userId) |
535 | 517 | (.map (fn [user] {:user-name (:name user) |
|
0 commit comments