[CCSDK-28] populated the seed code for dgbuilder
[ccsdk/distribution.git] / dgbuilder / core_nodes / io / lib / mqtt.js
1 /**
2  * Copyright 2013 IBM Corp.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  **/
16 var util = require("util");
17 var mqtt = require("mqtt");
18 var events = require("events");
19 //var inspect = require("sys").inspect;
20
21 //var Client = module.exports.Client = function(
22    
// Module-level default broker endpoint. Nothing visible in this file reads
// these two variables (the MQTTClient constructor applies its own fallbacks),
// but they are kept so the module scope is unchanged.
var host = "localhost";
var port = 1883;
25
/**
 * Event-emitting wrapper around a single MQTT broker connection.
 *
 * @param {number} [port] broker port; falls back to 1883 when falsy
 * @param {string} [host] broker host; falls back to "localhost" when falsy
 */
function MQTTClient(port,host) {
   var instance = this;

   instance.port = port ? port : 1883;
   instance.host = host ? host : "localhost";
   instance.messageId = 1;
   // messageId -> topic for subscribe/unsubscribe requests awaiting an ack
   instance.pendingSubscriptions = {};
   // messageId -> packet for inbound messages stored until their pubrel arrives
   instance.inboundMessages = {};
   instance.lastOutbound = Date.now();
   instance.lastInbound = Date.now();
   instance.connected = false;

   // Advances the packet id counter and returns the new value; once the
   // counter would exceed 0xFFFF it restarts at 1.
   instance._nextMessageId = function() {
      var candidate = this.messageId + 1;
      this.messageId = (candidate > 0xFFFF) ? 1 : candidate;
      return this.messageId;
   };

   events.EventEmitter.call(instance);
}
util.inherits(MQTTClient, events.EventEmitter);
46
/**
 * Open the broker connection and register all protocol packet handlers.
 * Does nothing when the client is already marked as connected.
 *
 * The passed options object is stored on the client as `this.options` and is
 * filled in with defaults: `keepalive` (15 when falsy), `clean` (true unless
 * explicitly provided), `protocolId` ('MQIsdp') and `protocolVersion` (3).
 *
 * Events emitted on this client from the handlers registered here:
 *   'connect'        - connack with return code 0
 *   'connectionlost' - connection error, rejected connack, or loss of an
 *                      established connection
 *   'disconnect'     - socket closed while not marked as connected
 *   'subscribe'      - (topic, grantedQos) on suback
 *   'unsubscribe'    - (topic, granted) on unsuback; `granted` is undefined
 *                      when the packet carries no `granted` list
 *   'message'        - (topic, payload, qos, retain) for inbound publishes
 *
 * @param {Object} [options] connection options handed to the mqtt connection
 */
MQTTClient.prototype.connect = function(options) {
   if (this.connected) {
      return;
   }
   var self = this;
   options = options||{};
   self.options = options;
   self.options.keepalive = options.keepalive||15;
   // Default to a clean session, but honour an explicit `clean: false`
   // (the previous `clean||true` could never evaluate to false).
   if (options.clean === undefined || options.clean === null) {
      self.options.clean = true;
   }
   self.options.protocolId = 'MQIsdp';
   self.options.protocolVersion = 3;

   self.client = mqtt.createConnection(self.port,self.host,function(err,client) {
      if (err) {
         self.connected = false;
         clearInterval(self.watchdog);
         self.connectionError = true;
         self.emit('connectionlost',err);
         return;
      }

      client.on('close',function(e) {
         clearInterval(self.watchdog);
         // After a connection error 'connectionlost' was already emitted.
         if (!self.connectionError) {
            if (self.connected) {
               self.connected = false;
               self.emit('connectionlost',e);
            } else {
               self.emit('disconnect');
            }
         }
      });

      client.on('error',function(e) {
         clearInterval(self.watchdog);
         if (self.connected) {
            self.connected = false;
            self.emit('connectionlost',e);
         }
      });

      client.on('connack',function(packet) {
         if (packet.returnCode === 0) {
            // Never leave an earlier watchdog running next to the new one.
            clearInterval(self.watchdog);
            // The watchdog runs every half keepalive period (keepalive is in
            // seconds, hence *500 ms). When either direction has been idle
            // for longer than that it sends a ping; if the previous ping is
            // still unanswered it drops the connection instead.
            self.watchdog = setInterval(function() {
               var now = Date.now();
               var idleLimit = self.options.keepalive*500;
               if (now - self.lastOutbound > idleLimit || now - self.lastInbound > idleLimit) {
                  if (self.pingOutstanding) {
                     try {
                        self.client.disconnect();
                     } catch (disconnectErr) {
                        // Best effort: the connection is being abandoned anyway.
                     }
                  } else {
                     self.lastOutbound = Date.now();
                     self.lastInbound = Date.now();
                     self.pingOutstanding = true;
                     self.client.pingreq();
                  }
               }
            },self.options.keepalive*500);
            self.pingOutstanding = false;
            self.lastInbound = Date.now();
            self.lastOutbound = Date.now();
            self.connected = true;
            self.connectionError = false;
            self.emit('connect');
         } else {
            self.connected = false;
            self.emit('connectionlost');
         }
      });

      client.on('suback',function(packet) {
         self.lastInbound = Date.now();
         var topic = self.pendingSubscriptions[packet.messageId];
         self.emit('subscribe',topic,packet.granted[0]);
         delete self.pendingSubscriptions[packet.messageId];
      });

      client.on('unsuback',function(packet) {
         self.lastInbound = Date.now();
         var topic = self.pendingSubscriptions[packet.messageId];
         // Guard the lookup: indexing a missing `granted` list would throw.
         var granted = packet.granted ? packet.granted[0] : undefined;
         self.emit('unsubscribe',topic,granted);
         delete self.pendingSubscriptions[packet.messageId];
      });

      client.on('publish',function(packet) {
         self.lastInbound = Date.now();
         if (packet.qos < 2) {
            self.emit('message',packet.topic,packet.payload,packet.qos,packet.retain);
         } else {
            // QoS 2: hold the message until the matching pubrel arrives.
            self.inboundMessages[packet.messageId] = packet;
            self.lastOutbound = Date.now();
            self.client.pubrec(packet);
         }
         if (packet.qos == 1) {
            self.lastOutbound = Date.now();
            self.client.puback(packet);
         }
      });

      client.on('pubrel',function(packet) {
         self.lastInbound = Date.now();
         var stored = self.inboundMessages[packet.messageId];
         if (stored) {
            self.emit('message',stored.topic,stored.payload,stored.qos,stored.retain);
            delete self.inboundMessages[packet.messageId];
         }
         self.lastOutbound = Date.now();
         self.client.pubcomp(packet);
      });

      client.on('puback',function(packet) {
         // outbound qos-1 complete
         self.lastInbound = Date.now();
      });

      client.on('pubrec',function(packet) {
         self.lastInbound = Date.now();
         self.lastOutbound = Date.now();
         self.client.pubrel(packet);
      });

      client.on('pubcomp',function(packet) {
         // outbound qos-2 complete
         self.lastInbound = Date.now();
      });

      client.on('pingresp',function(packet) {
         self.lastInbound = Date.now();
         self.pingOutstanding = false;
      });

      // Use `self`, not `this`: inside this callback `this` is not the
      // MQTTClient instance.
      self.lastOutbound = Date.now();
      self.connectionError = false;
      client.connect(self.options);
   });
};
186
/**
 * Send a subscribe request for a single topic. Does nothing while the client
 * is not connected. The topic is recorded in `pendingSubscriptions` under the
 * request's message id.
 *
 * @param {string} topic topic (filter) to subscribe to
 * @param {number} [qos] requested QoS; defaults to 0 when omitted, matching
 *                       the default used by publish()
 */
MQTTClient.prototype.subscribe = function(topic,qos) {
   var self = this;
   if (!self.connected) {
      return;
   }
   var options = {
      subscriptions:[{topic:topic,qos:qos||0}],
      messageId: self._nextMessageId()
   };
   self.pendingSubscriptions[options.messageId] = topic;
   self.lastOutbound = Date.now();
   self.client.subscribe(options);
};
/**
 * Send an unsubscribe request for a topic. Does nothing while the client is
 * not connected. The topic is recorded in `pendingSubscriptions` under the
 * request's message id.
 *
 * @param {string} topic topic to unsubscribe from
 */
MQTTClient.prototype.unsubscribe = function(topic) {
   var client = this;
   if (!client.connected) {
      return;
   }
   var requestId = client._nextMessageId();
   client.pendingSubscriptions[requestId] = topic;
   client.lastOutbound = Date.now();
   client.client.unsubscribe({
      topic: topic,
      messageId: requestId
   });
};
211
/**
 * Publish a message. Does nothing while the client is not connected.
 *
 * Buffers and strings are sent unchanged; other objects are serialised with
 * JSON.stringify; every other value is converted by string concatenation.
 * A message id is attached only when the effective QoS is non-zero.
 *
 * @param {string} topic   topic to publish on
 * @param {*}      payload message body
 * @param {number} [qos]   QoS level; 0 when falsy
 * @param {boolean} [retain] retain flag; false when falsy
 */
MQTTClient.prototype.publish = function(topic,payload,qos,retain) {
   var client = this;
   if (!client.connected) {
      return;
   }

   var body = payload;
   if (!Buffer.isBuffer(body)) {
      switch (typeof body) {
         case "object":
            body = JSON.stringify(body);
            break;
         case "string":
            break;
         default:
            body = ""+body;
      }
   }

   var packet = {
      topic: topic,
      payload: body,
      qos: qos||0,
      retain: retain||false
   };
   if (packet.qos != 0) {
      packet.messageId = client._nextMessageId();
   }

   client.lastOutbound = Date.now();
   client.client.publish(packet);
};
236
/**
 * Mark the client as disconnected and ask the underlying connection to
 * disconnect. Does nothing when the client is not marked as connected.
 * Errors thrown by the underlying disconnect call are deliberately ignored.
 */
MQTTClient.prototype.disconnect = function() {
   if (!this.connected) {
      return;
   }
   this.connected = false;
   try {
      this.client.disconnect();
   } catch (ignored) {
      // best effort: nothing to do if the connection is already unusable
   }
};
/**
 * Report the client's current connection flag.
 *
 * @returns {boolean} the value of `this.connected`
 */
MQTTClient.prototype.isConnected = function() {
   var state = this.connected;
   return state;
};
/**
 * Factory exported by this module: builds a new, not yet connected
 * MQTTClient for the given broker endpoint.
 *
 * @param {number} [port] broker port, passed through to MQTTClient
 * @param {string} [host] broker host, passed through to MQTTClient
 * @returns {MQTTClient} the new client instance
 */
module.exports.createClient = function(port,host) {
   return new MQTTClient(port,host);
};
254