[CCSDK-28] populated the seed code for dgbuilder
[ccsdk/distribution.git] / dgbuilder / core_nodes / io / 22-websocket.js
diff --git a/dgbuilder/core_nodes/io/22-websocket.js b/dgbuilder/core_nodes/io/22-websocket.js
new file mode 100644 (file)
index 0000000..72eda50
--- /dev/null
@@ -0,0 +1,185 @@
+/**\r
+ * Copyright 2013 IBM Corp.\r
+ *\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ **/\r
+\r
+module.exports = function(RED) {\r
+    "use strict";\r
+    var ws = require("ws"),\r
+        inspect = require("sys").inspect;\r
+\r
+    // A node red node that sets up a local websocket server\r
+    function WebSocketListenerNode(n) {\r
+        // Create a RED node\r
+        RED.nodes.createNode(this,n);\r
+\r
+        var node = this;\r
+\r
+        // Store local copies of the node configuration (as defined in the .html)\r
+        node.path = n.path;\r
+        node.wholemsg = (n.wholemsg === "true");\r
+\r
+        node._inputNodes = [];    // collection of nodes that want to receive events\r
+\r
+        var path = RED.settings.httpNodeRoot || "/";\r
+        path = path + (path.slice(-1) == "/" ? "":"/") + (node.path.charAt(0) == "/" ? node.path.substring(1) : node.path);\r
+\r
+        // Workaround https://github.com/einaros/ws/pull/253\r
+        // Listen for 'newListener' events from RED.server\r
+        node._serverListeners = {};\r
+\r
+        var storeListener = function(/*String*/event,/*function*/listener){\r
+            if(event == "error" || event == "upgrade" || event == "listening"){\r
+                node._serverListeners[event] = listener;\r
+            }\r
+        }\r
+\r
+        node._clients = {};\r
+\r
+        RED.server.addListener('newListener',storeListener);\r
+\r
+        // Create a WebSocket Server\r
+        node.server = new ws.Server({server:RED.server,path:path});\r
+\r
+        // Workaround https://github.com/einaros/ws/pull/253\r
+        // Stop listening for new listener events\r
+        RED.server.removeListener('newListener',storeListener);\r
+\r
+        node.server.on('connection', function(socket){\r
+            var id = (1+Math.random()*4294967295).toString(16);\r
+            node._clients[id] = socket;\r
+            socket.on('close',function() {\r
+                delete node._clients[id];\r
+            });\r
+            socket.on('message',function(data,flags){\r
+                node.handleEvent(id,socket,'message',data,flags);\r
+            });\r
+            socket.on('error', function(err) {\r
+                node.warn("An error occured on the ws connection: "+inspect(err));\r
+            });\r
+        });\r
+\r
+        node.on("close", function() {\r
+            // Workaround https://github.com/einaros/ws/pull/253\r
+            // Remove listeners from RED.server\r
+            var listener = null;\r
+            for(var event in node._serverListeners) {\r
+                if (node._serverListeners.hasOwnProperty(event)) {\r
+                    listener = node._serverListeners[event];\r
+                    if(typeof listener === "function"){\r
+                        RED.server.removeListener(event,listener);\r
+                    }\r
+                }\r
+            }\r
+            node._serverListeners = {};\r
+            node.server.close();\r
+            node._inputNodes = [];\r
+        });\r
+    }\r
+    RED.nodes.registerType("websocket-listener",WebSocketListenerNode);\r
+\r
+    WebSocketListenerNode.prototype.registerInputNode = function(/*Node*/handler){\r
+        this._inputNodes.push(handler);\r
+    }\r
+\r
+    WebSocketListenerNode.prototype.handleEvent = function(id,/*socket*/socket,/*String*/event,/*Object*/data,/*Object*/flags){\r
+        var msg;\r
+        if (this.wholemsg) {\r
+            try {\r
+                msg = JSON.parse(data);\r
+            }\r
+            catch(err) {\r
+                msg = { payload:data };\r
+            }\r
+        } else {\r
+            msg = {\r
+                payload:data\r
+            };\r
+        }\r
+        msg._session = {type:"websocket",id:id};\r
+\r
+        for (var i = 0; i < this._inputNodes.length; i++) {\r
+            this._inputNodes[i].send(msg);\r
+        }\r
+    }\r
+\r
+    WebSocketListenerNode.prototype.broadcast = function(data){\r
+        try {\r
+            for (var i = 0; i < this.server.clients.length; i++) {\r
+                this.server.clients[i].send(data);\r
+            }\r
+        }\r
+        catch(e) { // swallow any errors\r
+            this.warn("ws:"+i+" : "+e);\r
+        }\r
+    }\r
+\r
+    WebSocketListenerNode.prototype.send = function(id,data) {\r
+        var session = this._clients[id];\r
+        if (session) {\r
+            try {\r
+                session.send(data);\r
+            }\r
+            catch(e) { // swallow any errors\r
+            }\r
+        }\r
+    }\r
+\r
+    function WebSocketInNode(n) {\r
+        RED.nodes.createNode(this,n);\r
+        this.server = n.server;\r
+        var node = this;\r
+        this.serverConfig = RED.nodes.getNode(this.server);\r
+        if (this.serverConfig) {\r
+            this.serverConfig.registerInputNode(this);\r
+        } else {\r
+            this.error("Missing server configuration");\r
+        }\r
+    }\r
+    RED.nodes.registerType("websocket in",WebSocketInNode);\r
+\r
+    function WebSocketOutNode(n) {\r
+        RED.nodes.createNode(this,n);\r
+        var node = this;\r
+        this.server = n.server;\r
+        this.serverConfig = RED.nodes.getNode(this.server);\r
+        if (!this.serverConfig) {\r
+            this.error("Missing server configuration");\r
+        }\r
+        this.on("input", function(msg) {\r
+            var payload;\r
+            if (this.serverConfig.wholemsg) {\r
+                delete msg._session;\r
+                payload = JSON.stringify(msg);\r
+            } else {\r
+                if (!Buffer.isBuffer(msg.payload)) { // if it's not a buffer make sure it's a string.\r
+                    payload = RED.util.ensureString(msg.payload);\r
+                }\r
+                else {\r
+                    payload = msg.payload;\r
+                }\r
+            }\r
+            if (msg._session && msg._session.type == "websocket") {\r
+                node.serverConfig.send(msg._session.id,payload);\r
+            } else {\r
+                node.serverConfig.broadcast(payload,function(error){\r
+                    if (!!error) {\r
+                        node.warn("An error occurred while sending:" + inspect(error));\r
+                    }\r
+                });\r
+            }\r
+        });\r
+    }\r
+    RED.nodes.registerType("websocket out",WebSocketOutNode);\r
+}\r