[CCSDK-28] populated the seed code for dgbuilder
[ccsdk/distribution.git] / dgbuilder / core_nodes / io / 10-mqtt.js
1 /**
2  * Copyright 2013,2014 IBM Corp.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  **/
16
17 module.exports = function(RED) {
18     "use strict";
19     var connectionPool = require("./lib/mqttConnectionPool");
20
21     function MQTTBrokerNode(n) {
22         RED.nodes.createNode(this,n);
23         this.broker = n.broker;
24         this.port = n.port;
25         this.clientid = n.clientid;
26         if (this.credentials) {
27             this.username = this.credentials.user;
28             this.password = this.credentials.password;
29         }
30     }
31     RED.nodes.registerType("mqtt-broker",MQTTBrokerNode,{
32         credentials: {
33             user: {type:"text"},
34             password: {type: "password"}
35         }
36     });
37
38     function MQTTInNode(n) {
39         RED.nodes.createNode(this,n);
40         this.topic = n.topic;
41         this.broker = n.broker;
42         this.brokerConfig = RED.nodes.getNode(this.broker);
43         if (this.brokerConfig) {
44             this.status({fill:"red",shape:"ring",text:"disconnected"});
45             this.client = connectionPool.get(this.brokerConfig.broker,this.brokerConfig.port,this.brokerConfig.clientid,this.brokerConfig.username,this.brokerConfig.password);
46             var node = this;
47             this.client.subscribe(this.topic,2,function(topic,payload,qos,retain) {
48                     var msg = {topic:topic,payload:payload,qos:qos,retain:retain};
49                     if ((node.brokerConfig.broker == "localhost")||(node.brokerConfig.broker == "127.0.0.1")) {
50                         msg._topic = topic;
51                     }
52                     node.send(msg);
53             });
54             this.client.on("connectionlost",function() {
55                 node.status({fill:"red",shape:"ring",text:"disconnected"});
56             });
57             this.client.on("connect",function() {
58                 node.status({fill:"green",shape:"dot",text:"connected"});
59             });
60             this.client.connect();
61         } else {
62             this.error("missing broker configuration");
63         }
64         this.on('close', function() {
65             if (this.client) {
66                 this.client.disconnect();
67             }
68         });
69     }
70     RED.nodes.registerType("mqtt in",MQTTInNode);
71
72     function MQTTOutNode(n) {
73         RED.nodes.createNode(this,n);
74         this.topic = n.topic;
75         this.qos = n.qos || null;
76         this.retain = n.retain;
77         this.broker = n.broker;
78         this.brokerConfig = RED.nodes.getNode(this.broker);
79
80         if (this.brokerConfig) {
81             this.status({fill:"red",shape:"ring",text:"disconnected"},true);
82             this.client = connectionPool.get(this.brokerConfig.broker,this.brokerConfig.port,this.brokerConfig.clientid,this.brokerConfig.username,this.brokerConfig.password);
83             var node = this;
84             this.on("input",function(msg) {
85                 if (msg.qos) {
86                     msg.qos = parseInt(msg.qos);
87                     if ((msg.qos !== 0) && (msg.qos !== 1) && (msg.qos !== 2)) {
88                         msg.qos = null;
89                     }
90                 }
91                 msg.qos = Number(node.qos || msg.qos || 0);
92                 msg.retain = node.retain || msg.retain || false;
93                 msg.retain = ((msg.retain === true) || (msg.retain === "true")) || false;
94                 if (node.topic) {
95                     msg.topic = node.topic;
96                 }
97                 if ((msg.hasOwnProperty("topic")) && (typeof msg.topic === "string") && (msg.topic !== "")) { // topic must exist
98                     this.client.publish(msg);  // send the message
99                 }
100                 else { node.warn("Invalid topic specified"); }
101             });
102             this.client.on("connectionlost",function() {
103                 node.status({fill:"red",shape:"ring",text:"disconnected"});
104             });
105             this.client.on("connect",function() {
106                 node.status({fill:"green",shape:"dot",text:"connected"});
107             });
108             this.client.connect();
109         } else {
110             this.error("missing broker configuration");
111         }
112         this.on('close', function() {
113             if (this.client) {
114                 this.client.disconnect();
115             }
116         });
117     }
118     RED.nodes.registerType("mqtt out",MQTTOutNode);
119 }