|
1 | 1 | (ns ws-intro.core |
2 | 2 | (:require [clojure.data.json :as json] |
3 | | - [clojure.string :as s]) |
4 | | - (:import [org.webbitserver WebServer WebServers WebSocketHandler] |
5 | | - [org.webbitserver.handler StaticFileHandler])) |
| 3 | + [clojure.string :as s] |
| 4 | + [clojure.pprint :as pp]) |
| 5 | + (:import [org.webbitserver WebServer WebServers WebSocketHandler] |
| 6 | + [org.webbitserver.handler StaticFileHandler ] |
| 7 | + [rx Observable Observer Subscription ] |
| 8 | + [rx.subscriptions Subscriptions ] |
| 9 | + [rx.util AtomicObservableSubscription ] |
| 10 | + )) |
| 11 | + |
| 12 | +;;; ================================================================ |
| 13 | + |
| 14 | +(defn ^Observable observable |
| 15 | + "Create an observable from the given handler. When subscribed to, (handler observer) |
| 16 | + is called at which point, handler can start emitting values, etc." |
| 17 | + [handler] |
| 18 | + (Observable/create handler)) |
| 19 | + |
| 20 | +(defn getMock [] (json/read-str (slurp "traffic.json"))) |
| 21 | + |
| 22 | +(defn mockObservable [mock] |
| 23 | + (observable |
| 24 | + (fn [observer] |
| 25 | + (let [f (future |
| 26 | + (-> observer (.onNext "1")) |
| 27 | + (Thread/sleep 1000) |
| 28 | + (-> observer (.onNext "2")) |
| 29 | + (Thread/sleep 1000) |
| 30 | + (-> observer (.onNext "3")) |
| 31 | + (-> observer (.onCompleted))) |
| 32 | + ] |
| 33 | + (Subscriptions/create #(future-cancel f)))))) |
| 34 | + |
| 35 | +;;; ================================================================ |
| 36 | + |
| 37 | +(defn on-message [conn json-message] |
| 38 | + (println "WEBSOCKET MESSAGE" conn json-message) |
| 39 | + (let [msg (-> |
| 40 | + json-message |
| 41 | + json/read-json |
| 42 | + (get-in [:data :message]))] |
| 43 | + (.send conn (json/json-str |
| 44 | + {:type "upcased" |
| 45 | + :message (s/upper-case msg)})) |
| 46 | + )) |
6 | 47 |
|
7 | 48 | (defn -main [] |
8 | 49 | "Thanks to blog.jayfields.com" |
9 | | - (doto (WebServers/createWebServer 8080) |
10 | | - (.add "/websocket" |
11 | | - (proxy [WebSocketHandler] [] |
12 | | - (onOpen [c ] (println "websocket opened" c)) |
13 | | - (onClose [c ] (println "websocket closed" c)) |
14 | | - (onMessage [c j] (println "websocket message" c j)))) |
15 | | - (.add (StaticFileHandler. ".")) |
16 | | - (.start))) |
| 50 | + (pp/pprint (getMock)) |
| 51 | + (let [server (WebServers/createWebServer 8080)] |
| 52 | + (doto server |
| 53 | + (.add "/websocket" |
| 54 | + (proxy [WebSocketHandler] [] |
| 55 | + |
| 56 | + (onOpen [conn] |
| 57 | + (println "WEBSOCKET OPENED" conn) |
| 58 | + (-> (mockObservable (getMock)) |
| 59 | + (.subscribe |
| 60 | + println |
| 61 | + )) |
| 62 | + ) |
| 63 | + |
| 64 | + (onClose [c ] |
| 65 | + (println "WEBSOCKET CLOSED" c) |
| 66 | + ) |
| 67 | + |
| 68 | + (onMessage [c j] |
| 69 | + ;(println "WEBSOCKET MESSAGE" c j) |
| 70 | + (on-message c j) |
| 71 | + ) |
| 72 | + )) |
| 73 | + (.add (StaticFileHandler. ".")) |
| 74 | + (.start)))) |
17 | 75 |
|
18 | 76 | (defn foo |
19 | 77 | "I don't do a whole lot." |
|
0 commit comments