2 * Copyright 2013 IBM Corp.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 var util = require("util");
17 var mqtt = require("./mqtt");
18 var settings = require(process.env.NODE_RED_HOME+"/red/red").settings;
// matchTopic(ts, t): compiles the subscription filter `ts` into a regular
// expression. Callers use the result as a boolean, passing the subscription
// topic as `ts` and the topic of an incoming message as `t`.
// NOTE(review): the statement that applies `re` to `t` and the closing brace
// are not part of this excerpt — confirm against the full file.
22 function matchTopic(ts,t) {
// The pattern is built in three steps and anchored with ^...$:
//   1. backslash-escape the characters [ ] ? ( ) \ $ ^ * . | so they match literally;
//   2. replace every "+" with "[^/]+" (one or more characters other than "/");
//   3. replace a trailing "/#" with "(\/.*)?" (optionally "/" plus anything).
// A "#" that is not preceded by "/" at the end of the filter is left as a
// literal character by this line.
26 var re = new RegExp("^"+ts.replace(/([\[\]\?\(\)\\\\$\^\*\.|])/g,"\\$1").replace(/\+/g,"[^/]+").replace(/\/#$/,"(\/.*)?")+"$");
// get(broker, port, clientid, username, password, will): returns the entry of
// `connections` for this combination of credentials, client id, broker and
// port, creating the entry first if none exists, and increments its
// `_instances` counter on every call.
// NOTE(review): `connections` and `queue` are declared outside this excerpt,
// and `will` is not referenced in any line visible here.
31 get: function(broker,port,clientid,username,password,will) {
// Key identifying the connection in `connections`.
// NOTE(review): the key embeds the username and password in plain text.
32 var id = "["+(username||"")+":"+(password||"")+"]["+(clientid||"")+"]@"+broker+":"+port;
33 if (!connections[id]) {
34 connections[id] = function() {
// Random hex string, used only to tag this connection's log messages.
35 var uid = (1+Math.random()*4294967295).toString(16);
36 var client = mqtt.createClient(port,broker);
// Presumably lifts the EventEmitter listener cap, since subscribe() below
// registers one 'message' listener per call — confirm client is an EventEmitter.
38 client.setMaxListeners(0);
39 var options = {keepalive:15};
// Falls back to a generated "mqtt_<random hex>" id when no clientid is given.
40 options.clientId = clientid || 'mqtt_' + (1+Math.random()*4294967295).toString(16);
41 options.username = username;
42 options.password = password;
// Every subscription requested so far; replayed in the 'connect' handler.
45 var subscriptions = [];
46 var connecting = false;
// publish(msg): sends msg.topic/payload/qos/retain straight away when the
// client is connected.
49 publish: function(msg) {
50 if (client.isConnected()) {
51 client.publish(msg.topic,msg.payload,msg.qos,msg.retain);
// NOTE(review): the branch enclosing this connect call is not visible here;
// presumably the not-connected path, which queues msg — confirm.
55 client.connect(options);
// subscribe(topic, qos, callback): records the subscription, installs a
// 'message' listener that invokes callback only for topics accepted by
// matchTopic, and subscribes on the broker immediately if already connected.
60 subscribe: function(topic,qos,callback) {
61 subscriptions.push({topic:topic,qos:qos,callback:callback});
62 client.on('message',function(mtopic,mpayload,mqos,mretain) {
63 if (matchTopic(topic,mtopic)) {
64 callback(mtopic,mpayload,mqos,mretain);
67 if (client.isConnected()) {
68 client.subscribe(topic,qos);
// Connect only when a client exists, is not connected, and no attempt is in
// progress. NOTE(review): the header of the enclosing method is not visible.
78 if (client && !client.isConnected() && !connecting) {
80 client.connect(options);
// disconnect(): once `_instances` is 0, removes this entry from `connections`.
// NOTE(review): the code that decrements `_instances` is not visible here.
83 disconnect: function() {
85 if (this._instances == 0) {
88 delete connections[id];
// On connect: log, re-issue every recorded subscription, then publish and
// drain everything held in `queue`.
92 client.on('connect',function() {
94 util.log('[mqtt] ['+uid+'] connected to broker tcp://'+broker+':'+port);
// NOTE(review): for...in over an array iterates keys, not values; `callback`
// below is not used in the lines visible here.
96 for (var s in subscriptions) {
97 var topic = subscriptions[s].topic;
98 var qos = subscriptions[s].qos;
99 var callback = subscriptions[s].callback;
100 client.subscribe(topic,qos);
// Leftover debug statement (disabled):
102 //console.log("connected - publishing",queue.length,"messages");
103 while(queue.length) {
104 var msg = queue.shift();
106 client.publish(msg.topic,msg.payload,msg.qos,msg.retain);
// On connection loss: log, then schedule a timer after
// settings.mqttReconnectTime milliseconds (5000 when that setting is unset
// or falsy). NOTE(review): the timer body is not visible; presumably it
// reconnects — confirm.
110 client.on('connectionlost', function(err) {
111 util.log('[mqtt] ['+uid+'] connection lost to broker tcp://'+broker+':'+port);
113 setTimeout(function() {
115 }, settings.mqttReconnectTime||5000);
// On disconnect: log the event.
117 client.on('disconnect', function() {
119 util.log('[mqtt] ['+uid+'] disconnected from broker tcp://'+broker+':'+port);
// Count this caller as a user of the connection, then return the connection.
125 connections[id]._instances += 1;
126 return connections[id];