Fix license issues
[sdnc/oam.git] / dgbuilder / core_nodes / io / lib / mqtt.js
diff --git a/dgbuilder/core_nodes/io/lib/mqtt.js b/dgbuilder/core_nodes/io/lib/mqtt.js
deleted file mode 100644 (file)
index 141a888..0000000
+++ /dev/null
@@ -1,254 +0,0 @@
-/**
- * Copyright 2013 IBM Corp.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-var util = require("util");
-var mqtt = require("mqtt");
-var events = require("events");
-//var inspect = require("sys").inspect;
-
-//var Client = module.exports.Client = function(
-   
-var port = 1883;
-var host = "localhost";
-
-function MQTTClient(port,host) {
-   this.port = port||1883;
-   this.host = host||"localhost";
-   this.messageId = 1;
-   this.pendingSubscriptions = {};
-   this.inboundMessages = {};
-   this.lastOutbound = (new Date()).getTime();
-   this.lastInbound = (new Date()).getTime();
-   this.connected = false;
-   
-   this._nextMessageId = function() {
-      this.messageId += 1;
-      if (this.messageId > 0xFFFF) {
-         this.messageId = 1;
-      }
-      return this.messageId;
-   }
-   events.EventEmitter.call(this);
-}
-util.inherits(MQTTClient, events.EventEmitter);
-
-MQTTClient.prototype.connect = function(options) {
-   if (!this.connected) {
-       var self = this;
-       options = options||{};
-       self.options = options;
-       self.options.keepalive = options.keepalive||15;
-       self.options.clean = self.options.clean||true;
-       self.options.protocolId = 'MQIsdp';
-       self.options.protocolVersion = 3;
-       
-       self.client = mqtt.createConnection(this.port,this.host,function(err,client) {
-             if (err) {
-                self.connected = false;
-                clearInterval(self.watchdog);
-                self.connectionError = true;
-                //util.log('[mqtt] ['+self.uid+'] connection error 1 : '+inspect(err));
-                self.emit('connectionlost',err);
-                return;
-             }
-             client.on('close',function(e) {
-                   //util.log('[mqtt] ['+self.uid+'] on close');
-                   clearInterval(self.watchdog);
-                   if (!self.connectionError) {
-                       if (self.connected) {
-                          self.connected = false;
-                          self.emit('connectionlost',e);
-                       } else {
-                          self.emit('disconnect');
-                       }
-                   }
-             });
-             client.on('error',function(e) {
-                   //util.log('[mqtt] ['+self.uid+'] on error : '+inspect(e));
-                   clearInterval(self.watchdog);
-                   if (self.connected) {
-                      self.connected = false;
-                      self.emit('connectionlost',e);
-                   }
-             });
-             client.on('connack',function(packet) {
-                   if (packet.returnCode == 0) {
-                      self.watchdog = setInterval(function(self) {
-                            var now = (new Date()).getTime();
-                            
-                            //util.log('[mqtt] ['+self.uid+'] watchdog '+inspect({connected:self.connected,connectionError:self.connectionError,pingOutstanding:self.pingOutstanding,now:now,lastOutbound:self.lastOutbound,lastInbound:self.lastInbound}));
-    
-                            if (now - self.lastOutbound > self.options.keepalive*500 || now - self.lastInbound > self.options.keepalive*500) {
-                               if (self.pingOutstanding) {
-                                  //util.log('[mqtt] ['+self.uid+'] watchdog pingOustanding - disconnect');
-                                  try {
-                                     self.client.disconnect();
-                                  } catch (err) {
-                                  }
-                               } else {
-                                  //util.log('[mqtt] ['+self.uid+'] watchdog pinging');
-                                  self.lastOutbound = (new Date()).getTime();
-                                  self.lastInbound = (new Date()).getTime();
-                                  self.pingOutstanding = true;
-                                  self.client.pingreq();
-                               }
-                            }
-                            
-                      },self.options.keepalive*500,self);
-                      self.pingOutstanding = false;
-                      self.lastInbound = (new Date()).getTime()
-                      self.lastOutbound = (new Date()).getTime()
-                      self.connected = true;
-                      self.connectionError = false;
-                      self.emit('connect');
-                   } else {
-                      self.connected = false;
-                      self.emit('connectionlost');
-                   }
-             });
-             client.on('suback',function(packet) {
-                   self.lastInbound = (new Date()).getTime()
-                   var topic = self.pendingSubscriptions[packet.messageId];
-                   self.emit('subscribe',topic,packet.granted[0]);
-                   delete self.pendingSubscriptions[packet.messageId];
-             });
-             client.on('unsuback',function(packet) {
-                   self.lastInbound = (new Date()).getTime()
-                   var topic = self.pendingSubscriptions[packet.messageId];
-                   self.emit('unsubscribe',topic,packet.granted[0]);
-                   delete self.pendingSubscriptions[packet.messageId];
-             });
-             client.on('publish',function(packet) {
-                   self.lastInbound = (new Date()).getTime()
-                   if (packet.qos < 2) {
-                      var p = packet;
-                      self.emit('message',p.topic,p.payload,p.qos,p.retain);
-                   } else {
-                      self.inboundMessages[packet.messageId] = packet;
-                      this.lastOutbound = (new Date()).getTime()
-                      self.client.pubrec(packet);
-                   }
-                   if (packet.qos == 1) {
-                      this.lastOutbound = (new Date()).getTime()
-                      self.client.puback(packet);
-                   }
-             });
-             
-             client.on('pubrel',function(packet) {
-                   self.lastInbound = (new Date()).getTime()
-                   var p = self.inboundMessages[packet.messageId];
-                   if (p) {
-                       self.emit('message',p.topic,p.payload,p.qos,p.retain);
-                       delete self.inboundMessages[packet.messageId];
-                   }
-                   self.lastOutbound = (new Date()).getTime()
-                   self.client.pubcomp(packet);
-             });
-             
-             client.on('puback',function(packet) {
-                   self.lastInbound = (new Date()).getTime()
-                   // outbound qos-1 complete
-             });
-             
-             client.on('pubrec',function(packet) {
-                   self.lastInbound = (new Date()).getTime()
-                   self.lastOutbound = (new Date()).getTime()
-                   self.client.pubrel(packet);
-             });
-             client.on('pubcomp',function(packet) {
-                   self.lastInbound = (new Date()).getTime()
-                   // outbound qos-2 complete
-             });
-             client.on('pingresp',function(packet) {
-                   //util.log('[mqtt] ['+self.uid+'] received pingresp');
-                   self.lastInbound = (new Date()).getTime()
-                   self.pingOutstanding = false;
-             });
-             
-             this.lastOutbound = (new Date()).getTime()
-             this.connectionError = false;
-             client.connect(self.options);
-       });
-   }
-}
-
-MQTTClient.prototype.subscribe = function(topic,qos) {
-   var self = this;
-   if (self.connected) {
-      var options = {
-         subscriptions:[{topic:topic,qos:qos}],
-         messageId: self._nextMessageId()
-      };
-      this.pendingSubscriptions[options.messageId] = topic;
-      this.lastOutbound = (new Date()).getTime()
-      self.client.subscribe(options);
-   }
-}
-MQTTClient.prototype.unsubscribe = function(topic) {
-   var self = this;
-   if (self.connected) {
-      var options = {
-         topic:topic,
-         messageId: self._nextMessageId()
-      };
-      this.pendingSubscriptions[options.messageId] = topic;
-      this.lastOutbound = (new Date()).getTime()
-      self.client.unsubscribe(options);
-   }
-}
-
-MQTTClient.prototype.publish = function(topic,payload,qos,retain) {
-   var self = this;
-   if (self.connected) {
-   
-      if (!Buffer.isBuffer(payload)) {
-         if (typeof payload === "object") {
-            payload = JSON.stringify(payload);
-         } else if (typeof payload !== "string") {
-            payload = ""+payload;
-         }
-      }
-      var options = {
-         topic: topic,
-         payload: payload,
-         qos: qos||0,
-         retain:retain||false
-      };
-      if (options.qos != 0) {
-         options.messageId = self._nextMessageId();
-      }
-      this.lastOutbound = (new Date()).getTime()
-      self.client.publish(options);
-   }
-}
-
-MQTTClient.prototype.disconnect = function() {
-   var self = this;
-   if (this.connected) {
-       this.connected = false;
-       try {
-           this.client.disconnect();
-       } catch(err) {
-       }
-   }
-}
-MQTTClient.prototype.isConnected = function() {
-    return this.connected;
-}
-module.exports.createClient = function(port,host) {
-   var mqtt_client = new MQTTClient(port,host);
-   return mqtt_client;
-}
-