2 * Copyright 2014 IBM Corp.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 var ws = require("ws");
18 var util = require("util");
24 var activeConnections = [];
32 function init(_server,_settings) {
39 if (!settings.disableEditor) {
40 var webSocketKeepAliveTime = settings.webSocketKeepAliveTime || 15000;
41 var path = settings.httpAdminRoot || "/";
42 path = path + (path.slice(-1) == "/" ? "":"/") + "comms";
43 wsServer = new ws.Server({server:server,path:path});
45 wsServer.on('connection',function(ws) {
46 activeConnections.push(ws);
47 ws.on('close',function() {
48 for (var i=0;i<activeConnections.length;i++) {
49 if (activeConnections[i] === ws) {
50 activeConnections.splice(i,1);
55 ws.on('message', function(data,flags) {
58 msg = JSON.parse(data);
60 util.log("[red:comms] received malformed message : "+err.toString());
64 handleRemoteSubscription(ws,msg.subscribe);
67 ws.on('error', function(err) {
68 util.log("[red:comms] error : "+err.toString());
72 wsServer.on('error', function(err) {
73 util.log("[red:comms] server error : "+err.toString());
76 lastSentTime = Date.now();
78 heartbeatTimer = setInterval(function() {
80 if (now-lastSentTime > webSocketKeepAliveTime) {
81 publish("hb",lastSentTime);
83 }, webSocketKeepAliveTime);
89 clearInterval(heartbeatTimer);
96 function publish(topic,data,retain) {
98 retained[topic] = data;
100 delete retained[topic];
102 lastSentTime = Date.now();
103 activeConnections.forEach(function(conn) {
104 publishTo(conn,topic,data);
108 function publishTo(ws,topic,data) {
109 var msg = JSON.stringify({topic:topic,data:data});
113 util.log("[red:comms] send error : "+err.toString());
117 function handleRemoteSubscription(ws,topic) {
118 var re = new RegExp("^"+topic.replace(/([\[\]\?\(\)\\\\$\^\*\.|])/g,"\\$1").replace(/\+/g,"[^/]+").replace(/\/#$/,"(\/.*)?")+"$");
119 for (var t in retained) {
121 publishTo(ws,t,retained[t]);