aboutsummaryrefslogtreecommitdiff
path: root/third_party/websockify/other/websockify.clj
blob: ad1cc3365ec71333da4bf014926cac670f19ba2b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
(ns websockify
  ;(:use ring.adapter.jetty)
  (:require [clojure.tools.cli :as cli]
            [clojure.string :as string])

  (:import
   
   ;; Netty TCP Client 
   [java.util.concurrent Executors]
   [java.net InetSocketAddress]
   [org.jboss.netty.channel
    Channels SimpleChannelHandler ChannelPipelineFactory]
   [org.jboss.netty.buffer ChannelBuffers]
   [org.jboss.netty.channel.socket.nio NioClientSocketChannelFactory]
   [org.jboss.netty.bootstrap ClientBootstrap]
   [org.jboss.netty.handler.codec.base64 Base64]
   [org.jboss.netty.util CharsetUtil]
   
   ;; Jetty WebSocket Server
   [org.eclipse.jetty.server Server]
   [org.eclipse.jetty.server.nio BlockingChannelConnector]
   [org.eclipse.jetty.servlet
    ServletContextHandler ServletHolder DefaultServlet]
   [org.eclipse.jetty.websocket
    WebSocket WebSocket$OnTextMessage
    WebSocketClientFactory WebSocketClient WebSocketServlet]))


;; TCP / NIO

;; (defn tcp-channel [host port]
;;   (try
;;     (let [address (InetSocketAddress. host port)
;;          channel (doto (SocketChannel/open)
;;                    (.connect address))]
;;       channel)
;;     (catch Exception e
;;       (println (str "Failed to connect to'" host ":" port "':" e))
;;       nil)))

;; http://docs.jboss.org/netty/3.2/guide/html/start.html#d0e51
;; http://stackoverflow.com/questions/5453602/highly-concurrent-http-with-netty-and-nio
;; https://github.com/datskos/ring-netty-adapter/blob/master/src/ring/adapter/netty.clj


;; A single channel factory (and therefore a single pair of boss/worker
;; thread pools) is shared by every outbound connection.  Building a new
;; factory per connection, as was done previously, leaks its threads because
;; nothing ever releases them.  Wrapped in a delay so the pools are only
;; created when the first connection is made.
(defonce ^{:private true} client-channel-factory
  (delay (NioClientSocketChannelFactory.
          (Executors/newCachedThreadPool)
          (Executors/newCachedThreadPool))))

(defn netty-client
  "Opens a TCP connection to host:port using Netty and returns its Channel.

  open, close and message are two-argument functions (ctx, event) invoked
  from the channel handler on channelConnected, channelDisconnected and
  messageReceived respectively.  Exceptions raised in the pipeline are
  only printed.

  Blocks until the connect attempt has finished.  If the attempt failed
  the cause is printed and the (unconnected) channel is still returned,
  so the return type is the same in both cases."
  [host port open close message]
  (let [handler (proxy [SimpleChannelHandler] []
                  (channelConnected [ctx e] (open ctx e))
                  (channelDisconnected [ctx e] (close ctx e))
                  (messageReceived [ctx e] (message ctx e))
                  (exceptionCaught [ctx e]
                    (println "exceptionCaught:" e)))
        pipeline (proxy [ChannelPipelineFactory] []
                   (getPipeline []
                     (doto (Channels/pipeline)
                       (.addLast "handler" handler))))
        bootstrap (doto (ClientBootstrap. @client-channel-factory)
                    (.setPipelineFactory pipeline)
                    (.setOption "tcpNoDelay" true)
                    (.setOption "keepAlive" true))
        channel-future (.. bootstrap
                           (connect (InetSocketAddress. host port))
                           (awaitUninterruptibly))]
    ;; Make a failed connect visible instead of handing back a dead
    ;; channel without any indication of what went wrong.
    (when-not (.isSuccess channel-future)
      (println (str "Failed to connect to '" host ":" port "':")
               (.getCause channel-future)))
    (.getChannel channel-future)))



;; WebSockets

;; http://wiki.eclipse.org/Jetty/Feature/WebSockets
(defn make-websocket-servlet
  "Builds a WebSocketServlet proxy.

  Plain GET requests are forwarded to the dispatcher registered under
  this servlet's own name.  Each WebSocket connect prints a marker line
  and yields a WebSocket$OnTextMessage whose onOpen / onClose / onMessage
  callbacks delegate to the supplied open, close and message functions,
  passing the socket object itself as the first argument."
  [open close message]
  (proxy [WebSocketServlet] []
    (doGet [request response]
      (let [context (proxy-super getServletContext)
            servlet-name (proxy-super getServletName)
            dispatcher (.getNamedDispatcher context servlet-name)]
        (.forward dispatcher request response)))
    ;; NOTE(review): the second argument is unused here; in the Jetty
    ;; WebSocketServlet API it appears to be the requested sub-protocol
    ;; rather than a response object -- confirm against the Jetty version
    ;; in use.
    (doWebSocketConnect [request protocol]
      (println "doWebSocketConnect")
      (reify WebSocket$OnTextMessage
        (onOpen [socket connection] (open socket connection))
        (onClose [socket code reason] (close socket code reason))
        (onMessage [socket data] (message socket data))))))

(defn websocket-server
  "Assembles (but does not start) a Jetty Server listening on port.

  Keyword options:
    :open    (fn [socket connection])   called when a WebSocket opens
    :close   (fn [socket code reason])  called when a WebSocket closes
    :message (fn [socket data])         called for each text message
    :ws-path servlet path for the WebSocket endpoint (default \"/websocket\")
    :web     when truthy, directory served by a DefaultServlet at \"/\"

  The default callbacks just print what happened.  Returns the Server."
  [port & {:keys [open close message ws-path web]
           :or {ws-path "/websocket"

                open (fn [_ conn]
                       (println "New websocket client:" conn))
                close (fn [_ code reason]
                        (println "Websocket client closed:" code reason))
                message (fn [_ data]
                          (println "Websocket message:" data))}}]
  (let [static-holder (ServletHolder. (DefaultServlet.))
        ws-holder (ServletHolder.
                   (make-websocket-servlet open close message))
        context (ServletContextHandler.)
        connector (BlockingChannelConnector.)
        server (Server.)]
    ;; Static file servlet configuration (only registered below if :web
    ;; was supplied).
    (.setInitParameter static-holder "dirAllowed" "true")
    (.setInitParameter static-holder "resourceBase" web)

    ;; WebSocket endpoint lives under the root context.
    (.setContextPath context "/")
    (.addServlet context ws-holder ws-path)

    ;; Connections are never timed out for being idle.
    (.setPort connector port)
    (.setMaxIdleTime connector Integer/MAX_VALUE)

    (.setHandler server context)
    (.addConnector server connector)

    (when web
      (.addServlet context static-holder "/"))
    server))



;; Websockify

;; Proxy configuration.  start-websockify stores a map with :target-host
;; and :target-port here; client-open reads it when connecting to the
;; target.
(defonce settings (atom {}))

;; WebSocket client to TCP target mappings
;;
;; clients: WebSocket handler object -> {:client <WebSocket connection>
;;                                        :target <target channel>}
;; targets: target channel -> WebSocket connection
;; Both are populated by client-open and cleared by client-close,
;; start-websockify and stop-websockify.

(defonce clients (atom {}))
(defonce targets (atom {}))


(defn target-open
  "Channel-connected callback for the target side: logs that the
  connection to the target is up.  Both arguments are ignored."
  [ctx e]
  ;; (println "channelConnected:" e)
  (println "Connected to target"))

(defn target-close
  "Channel-disconnected callback for the target side.  Logs the event
  and, if a WebSocket connection is registered in targets for this
  channel, disconnects that connection."
  [ctx e]
  ;; (println "channelDisconnected:" e)
  (println "Target closed")
  (let [ws-connection (@targets (.getChannel ctx))]
    (when ws-connection
      (.disconnect ws-connection))))

(defn target-message
  "Message-received callback for the target side.  Base64-encodes the
  received buffer (the `false` argument to Base64/encode is passed
  through unchanged from the original code) and sends the resulting text
  to the WebSocket connection registered in targets for this channel.

  If no connection is registered -- e.g. the client already went away
  and client-close removed the mapping, or data arrived before
  client-open finished registering it -- the data is dropped with a log
  line instead of failing with a NullPointerException."
  [ctx e]
  (if-let [client (get @targets (.getChannel ctx))]
    (let [b64 (Base64/encode (.getMessage e) false)]
      #_(println "sending to client:" (.toString b64 CharsetUtil/UTF_8))
      (.sendMessage client (.toString b64 CharsetUtil/UTF_8)))
    (println "Dropping data from target: no WebSocket client registered")))

(defn client-open
  "WebSocket onOpen callback.  Connects to the target host/port found in
  settings and records the pairing in both lookup tables: clients (keyed
  by the WebSocket handler object) and targets (keyed by the target
  channel)."
  [this connection]
  ;; (println "Got WebSocket connection:" connection)
  (println "New client")
  (let [{:keys [target-host target-port]} @settings
        target (netty-client target-host target-port
                             target-open target-close target-message)
        entry {:client connection
               :target target}]
    (swap! clients assoc this entry)
    (swap! targets assoc target connection)))

(defn client-close
  "WebSocket onClose callback.  If a target channel is registered for
  this WebSocket it is closed and removed from targets; the WebSocket's
  own entry is then removed from clients.  code and message are unused."
  [this code message]
  (println "WebSocket connection closed")
  (let [target (:target (@clients this))]
    (when target
      (println "Closing target")
      (.close target)
      (println "Target closed")
      (swap! targets dissoc target)))
  (swap! clients dissoc this))

(defn client-message
  "WebSocket onMessage callback.  Treats data as Base64 text, decodes it
  and writes the resulting bytes to the target channel registered in
  clients for this WebSocket.

  If no target is registered (for instance because client-open failed
  before recording one) the data is dropped with a log line instead of
  failing with a NullPointerException inside the WebSocket callback."
  [this data]
  #_(println "WebSocket onMessage:" data)
  (if-let [target (:target (get @clients this))]
    (let [encoded (ChannelBuffers/copiedBuffer data CharsetUtil/UTF_8)
          decoded (Base64/decode encoded)]
      #_(println "Sending" (.readableBytes decoded) "bytes to target")
      (.write target decoded))
    (println "Dropping data from client: no target registered")))

(defn start-websockify
  "Resets the proxy state, stores the target address in settings, then
  builds and starts a WebSocket server on listen-port with its endpoint
  at \"/websockify\" wired to client-open / client-close / client-message.

  Keyword options (with defaults):
    :listen-port 6080
    :target-host \"localhost\"
    :target-port 5900
    :web         directory to serve static files from (none by default)

  As a side effect this (re)defines the top-level function
  stop-websockify, which closes over the server just started."
  [& {:keys [listen-port target-host target-port web]
      :or {target-port 5900
           target-host "localhost"
           listen-port 6080}}]

  ;; Start from empty lookup tables.
  (reset! clients {})
  (reset! targets {})

  (reset! settings {:target-host target-host
                    :target-port target-port})

  (let [server (websocket-server listen-port
                                 :web web
                                 :ws-path "/websockify"
                                 :open client-open
                                 :close client-close
                                 :message client-message)]

    (.start server)

    (if-not web
      (println "Not serving web requests")
      (println "Serving web requests from:" web))

    ;; Defined here, rather than at top level, so that it captures the
    ;; server instance created above.
    (defn stop-websockify []
      (doseq [{ws-connection :client target :target} (vals @clients)]
        (.disconnect ws-connection)
        (.close target))
      (.stop server)
      (reset! clients {})
      (reset! targets {})
      nil)))

(defn -main
  "Command-line entry point.

  Expects two positional arguments: the port to listen on and the target
  as host:port.  Recognised options are -v/--[no-]verbose, --web DIR and
  -h/--help.

  --help prints the usage banner and exits with status 0.  A wrong
  number of positional arguments, a target that is not of the form
  host:port, or a port that is not a valid integer prints a message
  (where applicable) plus the banner and exits with status 1.  Otherwise
  the proxy is started via start-websockify.  Returns nil."
  [& args]
  (let [[options args banner]
        (cli/cli
         args
         ["-v" "--[no-]verbose" "Verbose output"]
         ["--web" "Run webserver with root at given location"]
         ["-h" "--help" "Show help" :default false :flag true])
        ;; Report a usage error and terminate with a failure status.
        fail (fn [& msg]
               (when (seq msg)
                 (println (apply str msg)))
               (println banner)
               (System/exit 1))
        ;; Parse a port number, turning a malformed value into a usage
        ;; error rather than an uncaught NumberFormatException.
        parse-port (fn [label ^String s]
                     (try
                       (Integer/parseInt s)
                       (catch NumberFormatException _
                         (fail "Invalid " label ": " s))))]
    (when (:help options)
      (println banner)
      (System/exit 0))
    (when (not= 2 (count args))
      (fail))
    (println options)
    (println args)
    (let [[listen target] args
          ;; Split on the last colon: everything before it is the host,
          ;; the digits after it are the port.
          [_ target-host target-port] (re-matches #"(.+):(\d+)" target)]
      (when-not target-host
        (fail "Target must be given as host:port, got: " target))
      (start-websockify :listen-port (parse-port "listen port" listen)
                        :target-host target-host
                        :target-port (parse-port "target port" target-port)
                        :web (:web options))))
  nil)