[CCSDK-28] populated the seed code for dgbuilder
[ccsdk/distribution.git] / dgbuilder / core_nodes / io / lib / mqttConnectionPool.js
1 /**
2  * Copyright 2013 IBM Corp.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  **/
16 var util = require("util");
17 var mqtt = require("./mqtt");
18 var settings = require(process.env.NODE_RED_HOME+"/red/red").settings;
19
20 var connections = {};
21
22 function matchTopic(ts,t) {
23     if (ts == "#") {
24         return true;
25     }
26     var re = new RegExp("^"+ts.replace(/([\[\]\?\(\)\\\\$\^\*\.|])/g,"\\$1").replace(/\+/g,"[^/]+").replace(/\/#$/,"(\/.*)?")+"$");
27     return re.test(t);
28 }
29
30 module.exports = {
31     get: function(broker,port,clientid,username,password,will) {
32         var id = "["+(username||"")+":"+(password||"")+"]["+(clientid||"")+"]@"+broker+":"+port;
33         if (!connections[id]) {
34             connections[id] = function() {
35                 var uid = (1+Math.random()*4294967295).toString(16);
36                 var client = mqtt.createClient(port,broker);
37                 client.uid = uid;
38                 client.setMaxListeners(0);
39                 var options = {keepalive:15};
40                 options.clientId = clientid || 'mqtt_' + (1+Math.random()*4294967295).toString(16);
41                 options.username = username;
42                 options.password = password;
43                 options.will = will;
44                 var queue = [];
45                 var subscriptions = [];
46                 var connecting = false;
47                 var obj = {
48                     _instances: 0,
49                     publish: function(msg) {
50                         if (client.isConnected()) {
51                             client.publish(msg.topic,msg.payload,msg.qos,msg.retain);
52                         } else {
53                             if (!connecting) {
54                                 connecting = true;
55                                 client.connect(options);
56                             }
57                             queue.push(msg);
58                         }
59                     },
60                     subscribe: function(topic,qos,callback) {
61                         subscriptions.push({topic:topic,qos:qos,callback:callback});
62                         client.on('message',function(mtopic,mpayload,mqos,mretain) {
63                                 if (matchTopic(topic,mtopic)) {
64                                     callback(mtopic,mpayload,mqos,mretain);
65                                 }
66                         });
67                         if (client.isConnected()) {
68                             client.subscribe(topic,qos);
69                         }
70                     },
71                     on: function(a,b){
72                         client.on(a,b);
73                     },
74                     once: function(a,b){
75                         client.once(a,b);
76                     },
77                     connect: function() {
78                         if (client && !client.isConnected() && !connecting) {
79                             connecting = true;
80                             client.connect(options);
81                         }
82                     },
83                     disconnect: function() {
84                         this._instances -= 1;
85                         if (this._instances == 0) {
86                             client.disconnect();
87                             client = null;
88                             delete connections[id];
89                         }
90                     }
91                 };
92                 client.on('connect',function() {
93                         if (client) {
94                             util.log('[mqtt] ['+uid+'] connected to broker tcp://'+broker+':'+port);
95                             connecting = false;
96                             for (var s in subscriptions) {
97                                 var topic = subscriptions[s].topic;
98                                 var qos = subscriptions[s].qos;
99                                 var callback = subscriptions[s].callback;
100                                 client.subscribe(topic,qos);
101                             }
102                             //console.log("connected - publishing",queue.length,"messages");
103                             while(queue.length) {
104                                 var msg = queue.shift();
105                                 //console.log(msg);
106                                 client.publish(msg.topic,msg.payload,msg.qos,msg.retain);
107                             }
108                         }
109                 });
110                 client.on('connectionlost', function(err) {
111                         util.log('[mqtt] ['+uid+'] connection lost to broker tcp://'+broker+':'+port);
112                         connecting = false;
113                         setTimeout(function() {
114                             obj.connect();
115                         }, settings.mqttReconnectTime||5000);
116                 });
117                 client.on('disconnect', function() {
118                         connecting = false;
119                         util.log('[mqtt] ['+uid+'] disconnected from broker tcp://'+broker+':'+port);
120                 });
121
122                 return obj
123             }();
124         }
125         connections[id]._instances += 1;
126         return connections[id];
127     }
128 };