2 * Copyright 2013 IBM Corp.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 var util = require("util");
17 var mqtt = require("mqtt");
18 var events = require("events");
19 //var inspect = require("sys").inspect;
21 //var Client = module.exports.Client = function(
24 var host = "localhost";
/**
 * Event-emitting MQTT client wrapper.
 *
 * Holds the connection target, the bookkeeping tables used while talking to
 * the broker, and the timestamps of the most recent outbound/inbound traffic.
 *
 * @constructor
 * @param {number} [port=1883] broker port; any falsy value selects 1883
 * @param {string} [host="localhost"] broker host; any falsy value selects "localhost"
 */
function MQTTClient(port,host) {
    // Set up the EventEmitter state before anything else touches `this`.
    events.EventEmitter.call(this);

    this.port = port||1883;
    this.host = host||"localhost";
    // Last message identifier handed out; 0 means "none issued yet".
    this.messageId = 0;
    // messageId -> topic for subscribe/unsubscribe requests awaiting an ack.
    this.pendingSubscriptions = {};
    // messageId -> inbound packet stored until it is released.
    this.inboundMessages = {};
    this.lastOutbound = (new Date()).getTime();
    this.lastInbound = (new Date()).getTime();
    this.connected = false;

    /**
     * Returns the next message identifier, cycling through 1..0xFFFF.
     * The identifier is a 16-bit value and 0 is never returned.
     *
     * @returns {number} identifier in the range 1..65535
     */
    this._nextMessageId = function() {
        this.messageId += 1;
        if (this.messageId > 0xFFFF) {
            this.messageId = 1;
        }
        return this.messageId;
    };
}
util.inherits(MQTTClient, events.EventEmitter);
/**
 * Connects to the broker at this.host:this.port and installs the handlers
 * that drive the protocol exchange. Does nothing when the client is already
 * marked as connected.
 *
 * Events emitted on this client as a result:
 *   'connect'                               - CONNACK received with return code 0
 *   'connectionlost' (err)                  - connection failure, refused CONNACK,
 *                                             error while connected, or close
 *                                             while connected
 *   'disconnect'                            - close while not connected
 *   'subscribe' (topic, granted)            - SUBACK received
 *   'unsubscribe' (topic, granted)          - UNSUBACK received
 *   'message' (topic, payload, qos, retain) - inbound publish delivered
 *
 * @param {Object} [options] options handed to client.connect(). The object is
 *     stored on this.options and modified in place: keepalive defaults to 15,
 *     clean defaults to true (an explicit false is honoured), and
 *     protocolId / protocolVersion are always set to 'MQIsdp' / 3.
 */
MQTTClient.prototype.connect = function(options) {
    if (this.connected) {
        return;
    }
    var self = this;

    // Timestamp helpers. They always write to the MQTTClient instance; inside
    // the library callbacks below `this` is not the MQTTClient.
    function markInbound() {
        self.lastInbound = Date.now();
    }
    function markOutbound() {
        self.lastOutbound = Date.now();
    }

    options = options||{};
    self.options = options;
    self.options.keepalive = options.keepalive||15;
    // `x||true` is always true, which made `clean: false` impossible to
    // request. Only fall back to true when the caller gave no value.
    self.options.clean = (options.clean === undefined || options.clean === null) ? true : options.clean;
    self.options.protocolId = 'MQIsdp';
    self.options.protocolVersion = 3;

    self.client = mqtt.createConnection(self.port,self.host,function(err,client) {
        if (err) {
            self.connected = false;
            clearInterval(self.watchdog);
            self.connectionError = true;
            self.emit('connectionlost',err);
            return;
        }

        client.on('close',function(e) {
            clearInterval(self.watchdog);
            // A connection error has already been reported via
            // 'connectionlost'; do not report the close that follows it.
            if (!self.connectionError) {
                if (self.connected) {
                    self.connected = false;
                    self.emit('connectionlost',e);
                } else {
                    self.emit('disconnect');
                }
            }
        });

        client.on('error',function(e) {
            clearInterval(self.watchdog);
            if (self.connected) {
                self.connected = false;
                self.emit('connectionlost',e);
            }
        });

        client.on('connack',function(packet) {
            if (packet.returnCode === 0) {
                // Keep-alive watchdog: runs every keepalive*500 ms. When no
                // traffic has been seen for longer than that in either
                // direction it sends a ping; if the previous ping is still
                // unanswered it drops the connection instead.
                var idleLimit = self.options.keepalive*500;
                self.watchdog = setInterval(function() {
                    var now = Date.now();
                    if (now - self.lastOutbound > idleLimit || now - self.lastInbound > idleLimit) {
                        if (self.pingOutstanding) {
                            try {
                                self.client.disconnect();
                            } catch (disconnectErr) {
                                // Best effort: this runs on a timer, so a throw
                                // here would surface as an uncaught exception.
                            }
                        } else {
                            markOutbound();
                            markInbound();
                            self.pingOutstanding = true;
                            self.client.pingreq();
                        }
                    }
                },idleLimit);
                self.pingOutstanding = false;
                markInbound();
                markOutbound();
                self.connected = true;
                self.connectionError = false;
                self.emit('connect');
            } else {
                self.connected = false;
                self.emit('connectionlost');
            }
        });

        client.on('suback',function(packet) {
            markInbound();
            var topic = self.pendingSubscriptions[packet.messageId];
            self.emit('subscribe',topic,packet.granted[0]);
            delete self.pendingSubscriptions[packet.messageId];
        });

        client.on('unsuback',function(packet) {
            markInbound();
            var topic = self.pendingSubscriptions[packet.messageId];
            // Guard the lookup: reading [0] of a missing `granted` field would
            // throw and leave the pending entry behind.
            var granted = packet.granted ? packet.granted[0] : undefined;
            self.emit('unsubscribe',topic,granted);
            delete self.pendingSubscriptions[packet.messageId];
        });

        client.on('publish',function(packet) {
            markInbound();
            if (packet.qos < 2) {
                self.emit('message',packet.topic,packet.payload,packet.qos,packet.retain);
            } else {
                // QoS 2: hold the message until the matching PUBREL arrives.
                self.inboundMessages[packet.messageId] = packet;
                markOutbound();
                self.client.pubrec(packet);
            }
            if (packet.qos === 1) {
                markOutbound();
                self.client.puback(packet);
            }
        });

        client.on('pubrel',function(packet) {
            markInbound();
            var stored = self.inboundMessages[packet.messageId];
            if (stored) {
                self.emit('message',stored.topic,stored.payload,stored.qos,stored.retain);
                delete self.inboundMessages[packet.messageId];
            }
            markOutbound();
            self.client.pubcomp(packet);
        });

        client.on('puback',function(packet) {
            // outbound qos-1 complete
            markInbound();
        });

        client.on('pubrec',function(packet) {
            markInbound();
            markOutbound();
            self.client.pubrel(packet);
        });

        client.on('pubcomp',function(packet) {
            // outbound qos-2 complete
            markInbound();
        });

        client.on('pingresp',function(packet) {
            markInbound();
            self.pingOutstanding = false;
        });

        // Reset per-attempt state on the MQTTClient itself before sending
        // the connect request.
        markOutbound();
        self.connectionError = false;
        client.connect(self.options);
    });
};
/**
 * Sends a subscribe request for a single topic. Does nothing when the client
 * is not connected.
 *
 * The topic is recorded in pendingSubscriptions under the request's message
 * id so it can be looked up when the acknowledgement arrives.
 *
 * @param {string} topic topic (filter) to subscribe to
 * @param {number} [qos=0] requested quality of service; a missing value is
 *     sent as 0 rather than undefined
 */
MQTTClient.prototype.subscribe = function(topic,qos) {
    var self = this;
    if (!self.connected) {
        return;
    }
    var options = {
        subscriptions:[{topic:topic,qos:qos||0}],
        messageId: self._nextMessageId()
    };
    self.pendingSubscriptions[options.messageId] = topic;
    self.lastOutbound = Date.now();
    self.client.subscribe(options);
};
/**
 * Sends an unsubscribe request for a single topic. Does nothing when the
 * client is not connected.
 *
 * The topic is recorded in pendingSubscriptions under the request's message
 * id so it can be looked up when the acknowledgement arrives.
 *
 * @param {string} topic topic (filter) to unsubscribe from
 */
MQTTClient.prototype.unsubscribe = function(topic) {
    var self = this;
    if (!self.connected) {
        return;
    }
    var options = {
        // NOTE(review): the option key client.unsubscribe() reads for the
        // topic is not visible in this file. Both spellings are supplied;
        // confirm against the mqtt library version in use and drop the
        // unused one.
        topic: topic,
        unsubscriptions: [topic],
        messageId: self._nextMessageId()
    };
    self.pendingSubscriptions[options.messageId] = topic;
    self.lastOutbound = Date.now();
    self.client.unsubscribe(options);
};
/**
 * Publishes a message. Does nothing when the client is not connected.
 *
 * Payload handling: Buffers and strings are sent as given; any other value
 * whose typeof is "object" is sent as its JSON.stringify() text; everything
 * else is converted with string concatenation.
 *
 * @param {string} topic topic to publish to
 * @param {*} payload message body (see payload handling above)
 * @param {number} [qos=0] quality of service; a message id is allocated for
 *     any non-zero value
 * @param {boolean} [retain=false] retain flag
 */
MQTTClient.prototype.publish = function(topic,payload,qos,retain) {
    var self = this;
    if (!self.connected) {
        return;
    }

    var body = payload;
    if (!Buffer.isBuffer(body)) {
        if (typeof body === "object") {
            body = JSON.stringify(body);
        } else if (typeof body !== "string") {
            body = ""+body;
        }
    }

    var options = {
        topic: topic,
        payload: body,
        qos: qos||0,
        retain: retain||false
    };
    if (options.qos !== 0) {
        options.messageId = self._nextMessageId();
    }
    self.lastOutbound = Date.now();
    self.client.publish(options);
};
/**
 * Disconnects from the broker. Does nothing when the client is not marked
 * as connected.
 *
 * The connected flag is cleared before the underlying client is asked to
 * disconnect, so code that checks the flag afterwards sees the client as
 * already disconnected.
 */
MQTTClient.prototype.disconnect = function() {
    if (!this.connected) {
        return;
    }
    this.connected = false;
    // Stop the keep-alive timer now so it cannot act on a connection that is
    // being torn down. clearInterval ignores an undefined handle.
    clearInterval(this.watchdog);
    try {
        this.client.disconnect();
    } catch (err) {
        // Best effort: the client is already flagged as disconnected, so a
        // failure while closing is deliberately not propagated.
    }
};
/**
 * Reports the client's connected flag.
 *
 * @returns {boolean} the current value of this.connected
 */
MQTTClient.prototype.isConnected = function() {
    return this.connected;
};
250 module.exports.createClient = function(port,host) {
251 var mqtt_client = new MQTTClient(port,host);