X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=dgbuilder%2Fcore_nodes%2Fio%2Flib%2Fmqtt.js;fp=dgbuilder%2Fcore_nodes%2Fio%2Flib%2Fmqtt.js;h=141a88893b5acad5a591d84e8b50f000f582e764;hb=d1569975bb18f4359fac18aa98f55b69c248a3ad;hp=0000000000000000000000000000000000000000;hpb=a016ea661ff5767a3539734c4c07ef974a6e4614;p=ccsdk%2Fdistribution.git diff --git a/dgbuilder/core_nodes/io/lib/mqtt.js b/dgbuilder/core_nodes/io/lib/mqtt.js new file mode 100644 index 00000000..141a8889 --- /dev/null +++ b/dgbuilder/core_nodes/io/lib/mqtt.js @@ -0,0 +1,254 @@ +/** + * Copyright 2013 IBM Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +var util = require("util"); +var mqtt = require("mqtt"); +var events = require("events"); +//var inspect = require("sys").inspect; + +//var Client = module.exports.Client = function( + +var port = 1883; +var host = "localhost"; + +function MQTTClient(port,host) { + this.port = port||1883; + this.host = host||"localhost"; + this.messageId = 1; + this.pendingSubscriptions = {}; + this.inboundMessages = {}; + this.lastOutbound = (new Date()).getTime(); + this.lastInbound = (new Date()).getTime(); + this.connected = false; + + this._nextMessageId = function() { + this.messageId += 1; + if (this.messageId > 0xFFFF) { + this.messageId = 1; + } + return this.messageId; + } + events.EventEmitter.call(this); +} +util.inherits(MQTTClient, events.EventEmitter); + +MQTTClient.prototype.connect = function(options) { + if (!this.connected) { + var self = this; + options = options||{}; + self.options = options; + self.options.keepalive = options.keepalive||15; + self.options.clean = self.options.clean||true; + self.options.protocolId = 'MQIsdp'; + self.options.protocolVersion = 3; + + self.client = mqtt.createConnection(this.port,this.host,function(err,client) { + if (err) { + self.connected = false; + clearInterval(self.watchdog); + self.connectionError = true; + //util.log('[mqtt] ['+self.uid+'] connection error 1 : '+inspect(err)); + self.emit('connectionlost',err); + return; + } + client.on('close',function(e) { + //util.log('[mqtt] ['+self.uid+'] on close'); + clearInterval(self.watchdog); + if (!self.connectionError) { + if (self.connected) { + self.connected = false; + self.emit('connectionlost',e); + } else { + self.emit('disconnect'); + } + } + }); + client.on('error',function(e) { + //util.log('[mqtt] ['+self.uid+'] on error : '+inspect(e)); + clearInterval(self.watchdog); + if (self.connected) { + self.connected = false; + self.emit('connectionlost',e); + } + }); + client.on('connack',function(packet) { + if (packet.returnCode == 0) { + self.watchdog = setInterval(function(self) { + var now = (new Date()).getTime(); + + //util.log('[mqtt] ['+self.uid+'] watchdog '+inspect({connected:self.connected,connectionError:self.connectionError,pingOutstanding:self.pingOutstanding,now:now,lastOutbound:self.lastOutbound,lastInbound:self.lastInbound})); + + if (now - self.lastOutbound > self.options.keepalive*500 || now - self.lastInbound > self.options.keepalive*500) { + if (self.pingOutstanding) { + //util.log('[mqtt] ['+self.uid+'] watchdog pingOustanding - disconnect'); + try { + self.client.disconnect(); + } catch (err) { + } + } else { + //util.log('[mqtt] ['+self.uid+'] watchdog pinging'); + self.lastOutbound = (new Date()).getTime(); + self.lastInbound = (new Date()).getTime(); + self.pingOutstanding = true; + self.client.pingreq(); + } + } + + },self.options.keepalive*500,self); + self.pingOutstanding = false; + self.lastInbound = (new Date()).getTime() + self.lastOutbound = (new Date()).getTime() + self.connected = true; + self.connectionError = false; + self.emit('connect'); + } else { + self.connected = false; + self.emit('connectionlost'); + } + }); + client.on('suback',function(packet) { + self.lastInbound = (new Date()).getTime() + var topic = self.pendingSubscriptions[packet.messageId]; + self.emit('subscribe',topic,packet.granted[0]); + delete self.pendingSubscriptions[packet.messageId]; + }); + client.on('unsuback',function(packet) { + self.lastInbound = (new Date()).getTime() + var topic = self.pendingSubscriptions[packet.messageId]; + self.emit('unsubscribe',topic,packet.granted[0]); + delete self.pendingSubscriptions[packet.messageId]; + }); + client.on('publish',function(packet) { + self.lastInbound = (new Date()).getTime() + if (packet.qos < 2) { + var p = packet; + self.emit('message',p.topic,p.payload,p.qos,p.retain); + } else { + self.inboundMessages[packet.messageId] = packet; + this.lastOutbound = (new Date()).getTime() + self.client.pubrec(packet); + } + if (packet.qos == 1) { + this.lastOutbound = (new Date()).getTime() + self.client.puback(packet); + } + }); + + client.on('pubrel',function(packet) { + self.lastInbound = (new Date()).getTime() + var p = self.inboundMessages[packet.messageId]; + if (p) { + self.emit('message',p.topic,p.payload,p.qos,p.retain); + delete self.inboundMessages[packet.messageId]; + } + self.lastOutbound = (new Date()).getTime() + self.client.pubcomp(packet); + }); + + client.on('puback',function(packet) { + self.lastInbound = (new Date()).getTime() + // outbound qos-1 complete + }); + + client.on('pubrec',function(packet) { + self.lastInbound = (new Date()).getTime() + self.lastOutbound = (new Date()).getTime() + self.client.pubrel(packet); + }); + client.on('pubcomp',function(packet) { + self.lastInbound = (new Date()).getTime() + // outbound qos-2 complete + }); + client.on('pingresp',function(packet) { + //util.log('[mqtt] ['+self.uid+'] received pingresp'); + self.lastInbound = (new Date()).getTime() + self.pingOutstanding = false; + }); + + this.lastOutbound = (new Date()).getTime() + this.connectionError = false; + client.connect(self.options); + }); + } +} + +MQTTClient.prototype.subscribe = function(topic,qos) { + var self = this; + if (self.connected) { + var options = { + subscriptions:[{topic:topic,qos:qos}], + messageId: self._nextMessageId() + }; + this.pendingSubscriptions[options.messageId] = topic; + this.lastOutbound = (new Date()).getTime() + self.client.subscribe(options); + } +} +MQTTClient.prototype.unsubscribe = function(topic) { + var self = this; + if (self.connected) { + var options = { + topic:topic, + messageId: self._nextMessageId() + }; + this.pendingSubscriptions[options.messageId] = topic; + this.lastOutbound = (new Date()).getTime() + self.client.unsubscribe(options); + } +} + +MQTTClient.prototype.publish = function(topic,payload,qos,retain) { + var self = this; + if (self.connected) { + + if (!Buffer.isBuffer(payload)) { + if (typeof payload === "object") { + payload = JSON.stringify(payload); + } else if (typeof payload !== "string") { + payload = ""+payload; + } + } + var options = { + topic: topic, + payload: payload, + qos: qos||0, + retain:retain||false + }; + if (options.qos != 0) { + options.messageId = self._nextMessageId(); + } + this.lastOutbound = (new Date()).getTime() + self.client.publish(options); + } +} + +MQTTClient.prototype.disconnect = function() { + var self = this; + if (this.connected) { + this.connected = false; + try { + this.client.disconnect(); + } catch(err) { + } + } +} +MQTTClient.prototype.isConnected = function() { + return this.connected; +} +module.exports.createClient = function(port,host) { + var mqtt_client = new MQTTClient(port,host); + return mqtt_client; +} +