Merge "Add Graph/Node to org.ops4j.pax.logging.cfg"
[ccsdk/distribution.git] / dgbuilder / red / comms.js
1 /**
2  * Copyright 2014 IBM Corp.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  **/
16
17 var ws = require("ws");
18 var util = require("util");
19
// HTTP server and settings object handed over through init().
var server, settings;

// WebSocket server created by start(), plus the sockets currently connected to it.
var wsServer;
var activeConnections = [];

// Latest payload per topic for messages published with the retain flag set.
var retained = {};

// Keep-alive bookkeeping: the interval handle and the timestamp of the most
// recent publish (initialised when start() runs).
var heartbeatTimer, lastSentTime;
30
31
/**
 * Stores the collaborators that start() reads later on.
 *
 * @param _server   value passed to the WebSocket server as its `server` option
 * @param _settings runtime settings (start() reads disableEditor,
 *                  webSocketKeepAliveTime and httpAdminRoot from it)
 */
function init(_server,_settings) {
    settings = _settings;
    server = _server;
}
36
/**
 * Starts the editor comms channel, unless settings.disableEditor is set.
 *
 * Creates a WebSocket server on "<httpAdminRoot>/comms" (root defaults to "/"),
 * tracks every connected socket in activeConnections, forwards
 * {"subscribe": topic} requests to handleRemoteSubscription, and arms a
 * heartbeat timer that publishes an "hb" message whenever nothing has been
 * sent for longer than settings.webSocketKeepAliveTime (default 15000 ms).
 */
function start() {
    if (settings.disableEditor) {
        return;
    }

    var webSocketKeepAliveTime = settings.webSocketKeepAliveTime || 15000;
    var path = settings.httpAdminRoot || "/";
    path = path + (path.slice(-1) === "/" ? "" : "/") + "comms";
    wsServer = new ws.Server({server:server,path:path});

    // The parameter is named `socket` so it does not shadow the `ws` module.
    wsServer.on('connection',function(socket) {
        activeConnections.push(socket);

        socket.on('close',function() {
            var idx = activeConnections.indexOf(socket);
            if (idx !== -1) {
                activeConnections.splice(idx,1);
            }
        });

        socket.on('message', function(data,flags) {
            var msg = null;
            try {
                msg = JSON.parse(data);
            } catch(err) {
                util.log("[red:comms] received malformed message : "+err.toString());
                return;
            }
            // JSON.parse accepts the literal `null`; reading a property of it
            // would throw inside this event handler, so check msg first.
            if (msg && msg.subscribe) {
                handleRemoteSubscription(socket,msg.subscribe);
            }
        });

        socket.on('error', function(err) {
            util.log("[red:comms] error : "+err.toString());
        });
    });

    wsServer.on('error', function(err) {
        util.log("[red:comms] server error : "+err.toString());
    });

    lastSentTime = Date.now();

    // Only emit a heartbeat when the channel has been idle for a full period;
    // publish() refreshes lastSentTime on every send.
    heartbeatTimer = setInterval(function() {
        var now = Date.now();
        if (now-lastSentTime > webSocketKeepAliveTime) {
            publish("hb",lastSentTime);
        }
    }, webSocketKeepAliveTime);
}
86
/**
 * Shuts the comms channel down: cancels the heartbeat interval if one was
 * armed, then closes the WebSocket server if one was created.
 */
function stop() {
    if (heartbeatTimer) {
        clearInterval(heartbeatTimer);
    }
    if (!wsServer) {
        return;
    }
    wsServer.close();
}
95
/**
 * Sends a message to every connected client.
 *
 * @param topic  topic the message is published on
 * @param data   payload delivered with the message
 * @param retain when truthy, the payload is remembered for the topic so that
 *               later subscribers receive it; otherwise any previously
 *               retained payload for the topic is dropped
 */
function publish(topic,data,retain) {
    if (!retain) {
        delete retained[topic];
    } else {
        retained[topic] = data;
    }

    // Any send counts as activity for the heartbeat timer.
    lastSentTime = Date.now();

    activeConnections.forEach(function(socket) {
        publishTo(socket,topic,data);
    });
}
107
/**
 * Sends one {topic, data} message, serialised as JSON, to a single socket.
 * An exception thrown by send() is logged and not rethrown.
 *
 * @param ws    socket to write to (only its send() method is used)
 * @param topic topic of the message
 * @param data  payload of the message
 */
function publishTo(ws,topic,data) {
    var envelope = {topic:topic,data:data};
    var payload = JSON.stringify(envelope);
    try {
        ws.send(payload);
    } catch(err) {
        util.log("[red:comms] send error : "+err.toString());
    }
}
116
/**
 * Replays retained messages to a client that has just subscribed.
 *
 * The topic is turned into an anchored regular expression: "+" matches one
 * or more characters other than "/", a trailing "/#" matches the topic
 * itself and anything below it, and every other character is matched
 * literally. Each retained topic that matches is sent to the socket.
 *
 * @param ws    socket that issued the subscription
 * @param topic subscription pattern; values that are not strings are ignored
 */
function handleRemoteSubscription(ws,topic) {
    // The pattern comes straight from a client message, so it may be any JSON
    // value; calling .replace on a non-string would throw.
    if (typeof topic !== "string") {
        return;
    }
    // Escape regex metacharacters first. "{" and "}" are included so that a
    // topic such as "x{2}" is matched literally and not read as a quantifier.
    var escaped = topic.replace(/([\[\]\?\(\)\{\}\\$\^\*\.|])/g,"\\$1");
    var pattern = escaped.replace(/\+/g,"[^/]+").replace(/\/#$/,"(/.*)?");
    var re = new RegExp("^"+pattern+"$");
    Object.keys(retained).forEach(function(t) {
        if (re.test(t)) {
            publishTo(ws,t,retained[t]);
        }
    });
}
125
126
127 module.exports = {
128     init:init,
129     start:start,
130     stop:stop,
131     publish:publish,
132 }