[CCSDK-28] populated the seed code for dgbuilder
[ccsdk/distribution.git] / dgbuilder / core_nodes / io / lib / mqtt.js
diff --git a/dgbuilder/core_nodes/io/lib/mqtt.js b/dgbuilder/core_nodes/io/lib/mqtt.js
new file mode 100644 (file)
index 0000000..141a888
--- /dev/null
@@ -0,0 +1,254 @@
+/**
+ * Copyright 2013 IBM Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+var util = require("util");
+var mqtt = require("mqtt");
+var events = require("events");
+//var inspect = require("sys").inspect;
+
+//var Client = module.exports.Client = function(
+   
+var port = 1883;
+var host = "localhost";
+
+function MQTTClient(port,host) {
+   this.port = port||1883;
+   this.host = host||"localhost";
+   this.messageId = 1;
+   this.pendingSubscriptions = {};
+   this.inboundMessages = {};
+   this.lastOutbound = (new Date()).getTime();
+   this.lastInbound = (new Date()).getTime();
+   this.connected = false;
+   
+   this._nextMessageId = function() {
+      this.messageId += 1;
+      if (this.messageId > 0xFFFF) {
+         this.messageId = 1;
+      }
+      return this.messageId;
+   }
+   events.EventEmitter.call(this);
+}
+util.inherits(MQTTClient, events.EventEmitter);
+
+MQTTClient.prototype.connect = function(options) {
+   if (!this.connected) {
+       var self = this;
+       options = options||{};
+       self.options = options;
+       self.options.keepalive = options.keepalive||15;
+       self.options.clean = self.options.clean||true;
+       self.options.protocolId = 'MQIsdp';
+       self.options.protocolVersion = 3;
+       
+       self.client = mqtt.createConnection(this.port,this.host,function(err,client) {
+             if (err) {
+                self.connected = false;
+                clearInterval(self.watchdog);
+                self.connectionError = true;
+                //util.log('[mqtt] ['+self.uid+'] connection error 1 : '+inspect(err));
+                self.emit('connectionlost',err);
+                return;
+             }
+             client.on('close',function(e) {
+                   //util.log('[mqtt] ['+self.uid+'] on close');
+                   clearInterval(self.watchdog);
+                   if (!self.connectionError) {
+                       if (self.connected) {
+                          self.connected = false;
+                          self.emit('connectionlost',e);
+                       } else {
+                          self.emit('disconnect');
+                       }
+                   }
+             });
+             client.on('error',function(e) {
+                   //util.log('[mqtt] ['+self.uid+'] on error : '+inspect(e));
+                   clearInterval(self.watchdog);
+                   if (self.connected) {
+                      self.connected = false;
+                      self.emit('connectionlost',e);
+                   }
+             });
+             client.on('connack',function(packet) {
+                   if (packet.returnCode == 0) {
+                      self.watchdog = setInterval(function(self) {
+                            var now = (new Date()).getTime();
+                            
+                            //util.log('[mqtt] ['+self.uid+'] watchdog '+inspect({connected:self.connected,connectionError:self.connectionError,pingOutstanding:self.pingOutstanding,now:now,lastOutbound:self.lastOutbound,lastInbound:self.lastInbound}));
+    
+                            if (now - self.lastOutbound > self.options.keepalive*500 || now - self.lastInbound > self.options.keepalive*500) {
+                               if (self.pingOutstanding) {
+                                  //util.log('[mqtt] ['+self.uid+'] watchdog pingOustanding - disconnect');
+                                  try {
+                                     self.client.disconnect();
+                                  } catch (err) {
+                                  }
+                               } else {
+                                  //util.log('[mqtt] ['+self.uid+'] watchdog pinging');
+                                  self.lastOutbound = (new Date()).getTime();
+                                  self.lastInbound = (new Date()).getTime();
+                                  self.pingOutstanding = true;
+                                  self.client.pingreq();
+                               }
+                            }
+                            
+                      },self.options.keepalive*500,self);
+                      self.pingOutstanding = false;
+                      self.lastInbound = (new Date()).getTime()
+                      self.lastOutbound = (new Date()).getTime()
+                      self.connected = true;
+                      self.connectionError = false;
+                      self.emit('connect');
+                   } else {
+                      self.connected = false;
+                      self.emit('connectionlost');
+                   }
+             });
+             client.on('suback',function(packet) {
+                   self.lastInbound = (new Date()).getTime()
+                   var topic = self.pendingSubscriptions[packet.messageId];
+                   self.emit('subscribe',topic,packet.granted[0]);
+                   delete self.pendingSubscriptions[packet.messageId];
+             });
+             client.on('unsuback',function(packet) {
+                   self.lastInbound = (new Date()).getTime()
+                   var topic = self.pendingSubscriptions[packet.messageId];
+                   self.emit('unsubscribe',topic,packet.granted[0]);
+                   delete self.pendingSubscriptions[packet.messageId];
+             });
+             client.on('publish',function(packet) {
+                   self.lastInbound = (new Date()).getTime()
+                   if (packet.qos < 2) {
+                      var p = packet;
+                      self.emit('message',p.topic,p.payload,p.qos,p.retain);
+                   } else {
+                      self.inboundMessages[packet.messageId] = packet;
+                      this.lastOutbound = (new Date()).getTime()
+                      self.client.pubrec(packet);
+                   }
+                   if (packet.qos == 1) {
+                      this.lastOutbound = (new Date()).getTime()
+                      self.client.puback(packet);
+                   }
+             });
+             
+             client.on('pubrel',function(packet) {
+                   self.lastInbound = (new Date()).getTime()
+                   var p = self.inboundMessages[packet.messageId];
+                   if (p) {
+                       self.emit('message',p.topic,p.payload,p.qos,p.retain);
+                       delete self.inboundMessages[packet.messageId];
+                   }
+                   self.lastOutbound = (new Date()).getTime()
+                   self.client.pubcomp(packet);
+             });
+             
+             client.on('puback',function(packet) {
+                   self.lastInbound = (new Date()).getTime()
+                   // outbound qos-1 complete
+             });
+             
+             client.on('pubrec',function(packet) {
+                   self.lastInbound = (new Date()).getTime()
+                   self.lastOutbound = (new Date()).getTime()
+                   self.client.pubrel(packet);
+             });
+             client.on('pubcomp',function(packet) {
+                   self.lastInbound = (new Date()).getTime()
+                   // outbound qos-2 complete
+             });
+             client.on('pingresp',function(packet) {
+                   //util.log('[mqtt] ['+self.uid+'] received pingresp');
+                   self.lastInbound = (new Date()).getTime()
+                   self.pingOutstanding = false;
+             });
+             
+             this.lastOutbound = (new Date()).getTime()
+             this.connectionError = false;
+             client.connect(self.options);
+       });
+   }
+}
+
+MQTTClient.prototype.subscribe = function(topic,qos) {
+   var self = this;
+   if (self.connected) {
+      var options = {
+         subscriptions:[{topic:topic,qos:qos}],
+         messageId: self._nextMessageId()
+      };
+      this.pendingSubscriptions[options.messageId] = topic;
+      this.lastOutbound = (new Date()).getTime()
+      self.client.subscribe(options);
+   }
+}
+MQTTClient.prototype.unsubscribe = function(topic) {
+   var self = this;
+   if (self.connected) {
+      var options = {
+         topic:topic,
+         messageId: self._nextMessageId()
+      };
+      this.pendingSubscriptions[options.messageId] = topic;
+      this.lastOutbound = (new Date()).getTime()
+      self.client.unsubscribe(options);
+   }
+}
+
+MQTTClient.prototype.publish = function(topic,payload,qos,retain) {
+   var self = this;
+   if (self.connected) {
+   
+      if (!Buffer.isBuffer(payload)) {
+         if (typeof payload === "object") {
+            payload = JSON.stringify(payload);
+         } else if (typeof payload !== "string") {
+            payload = ""+payload;
+         }
+      }
+      var options = {
+         topic: topic,
+         payload: payload,
+         qos: qos||0,
+         retain:retain||false
+      };
+      if (options.qos != 0) {
+         options.messageId = self._nextMessageId();
+      }
+      this.lastOutbound = (new Date()).getTime()
+      self.client.publish(options);
+   }
+}
+
+MQTTClient.prototype.disconnect = function() {
+   var self = this;
+   if (this.connected) {
+       this.connected = false;
+       try {
+           this.client.disconnect();
+       } catch(err) {
+       }
+   }
+}
+MQTTClient.prototype.isConnected = function() {
+    return this.connected;
+}
+module.exports.createClient = function(port,host) {
+   var mqtt_client = new MQTTClient(port,host);
+   return mqtt_client;
+}
+