[CCSDK-28] populated the seed code for dgbuilder
[ccsdk/distribution.git] / dgbuilder / core_nodes / io / 31-tcpin.js
1 /**
2  * Copyright 2013,2014 IBM Corp.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  **/
16
17 module.exports = function(RED) {
18     "use strict";
19     var reconnectTime = RED.settings.socketReconnectTime||10000;
20     var socketTimeout = RED.settings.socketTimeout||null;
21     var net = require('net');
22
23     var connectionPool = {};
24
25     function TcpIn(n) {
26         RED.nodes.createNode(this,n);
27         this.host = n.host;
28         this.port = n.port * 1;
29         this.topic = n.topic;
30         this.stream = (!n.datamode||n.datamode=='stream'); /* stream,single*/
31         this.datatype = n.datatype||'buffer'; /* buffer,utf8,base64 */
32         this.newline = (n.newline||"").replace("\\n","\n").replace("\\r","\r");
33         this.base64 = n.base64;
34         this.server = (typeof n.server == 'boolean')?n.server:(n.server == "server");
35         this.closing = false;
36         var node = this;
37         var count = 0;
38
39         if (!node.server) {
40             var buffer = null;
41             var client;
42             var reconnectTimeout;
43             var end = false;
44             var setupTcpClient = function() {
45                 node.log("connecting to "+node.host+":"+node.port);
46                 node.status({fill:"grey",shape:"dot",text:"connecting"});
47                 var id = (1+Math.random()*4294967295).toString(16);
48                 client = net.connect(node.port, node.host, function() {
49                     buffer = (node.datatype == 'buffer')? new Buffer(0):"";
50                     node.log("connected to "+node.host+":"+node.port);
51                     node.status({fill:"green",shape:"dot",text:"connected"});
52                 });
53                 connectionPool[id] = client;
54
55                 client.on('data', function (data) {
56                     if (node.datatype != 'buffer') {
57                         data = data.toString(node.datatype);
58                     }
59                     if (node.stream) {
60                         if ((node.datatype) === "utf8" && node.newline != "") {
61                             buffer = buffer+data;
62                             var parts = buffer.split(node.newline);
63                             for (var i = 0;i<parts.length-1;i+=1) {
64                                 var msg = {topic:node.topic, payload:parts[i]};
65                                 msg._session = {type:"tcp",id:id};
66                                 node.send(msg);
67                             }
68                             buffer = parts[parts.length-1];
69                         } else {
70                             var msg = {topic:node.topic, payload:data};
71                             msg._session = {type:"tcp",id:id};
72                             node.send(msg);
73                         }
74                     } else {
75                         if ((typeof data) === "string") {
76                             buffer = buffer+data;
77                         } else {
78                             buffer = Buffer.concat([buffer,data],buffer.length+data.length);
79                         }
80                     }
81                 });
82                 client.on('end', function() {
83                     if (!node.stream || (node.datatype == "utf8" && node.newline != "" && buffer.length > 0)) {
84                         var msg = {topic:node.topic,payload:buffer};
85                         msg._session = {type:"tcp",id:id};
86                         if (buffer.length !== 0) {
87                             end = true; // only ask for fast re-connect if we actually got something
88                             node.send(msg);
89                         }
90                         buffer = null;
91                     }
92                 });
93                 client.on('close', function() {
94                     delete connectionPool[id];
95                     node.status({fill:"red",shape:"ring",text:"disconnected"});
96                     if (!node.closing) {
97                         if (end) { // if we were asked to close then try to reconnect once very quick.
98                             end = false;
99                             reconnectTimeout = setTimeout(setupTcpClient, 20);
100                         }
101                         else {
102                             node.log("connection lost to "+node.host+":"+node.port);
103                             reconnectTimeout = setTimeout(setupTcpClient, reconnectTime);
104                         }
105                     }
106                 });
107                 client.on('error', function(err) {
108                     node.log(err);
109                 });
110             }
111             setupTcpClient();
112
113             this.on('close', function() {
114                 this.closing = true;
115                 client.end();
116                 clearTimeout(reconnectTimeout);
117             });
118         } else {
119             var server = net.createServer(function (socket) {
120                 if (socketTimeout !== null) { socket.setTimeout(socketTimeout); }
121                 var id = (1+Math.random()*4294967295).toString(16);
122                 connectionPool[id] = socket;
123                 node.status({text:++count+" connections"});
124
125                 var buffer = (node.datatype == 'buffer')? new Buffer(0):"";
126                 socket.on('data', function (data) {
127                     if (node.datatype != 'buffer') {
128                         data = data.toString(node.datatype);
129                     }
130                     if (node.stream) {
131                         if ((typeof data) === "string" && node.newline != "") {
132                             buffer = buffer+data;
133                             var parts = buffer.split(node.newline);
134                             for (var i = 0;i<parts.length-1;i+=1) {
135                                 var msg = {topic:node.topic, payload:parts[i],ip:socket.remoteAddress,port:socket.remotePort};
136                                 msg._session = {type:"tcp",id:id};
137                                 node.send(msg);
138                             }
139                             buffer = parts[parts.length-1];
140                         } else {
141                             var msg = {topic:node.topic, payload:data};
142                             msg._session = {type:"tcp",id:id};
143                             node.send(msg);
144                         }
145                     } else {
146                         if ((typeof data) === "string") {
147                             buffer = buffer+data;
148                         } else {
149                             buffer = Buffer.concat([buffer,data],buffer.length+data.length);
150                         }
151                     }
152                 });
153                 socket.on('end', function() {
154                     if (!node.stream || (node.datatype === "utf8" && node.newline !== "")) {
155                         if (buffer.length > 0) {
156                             var msg = {topic:node.topic,payload:buffer};
157                             msg._session = {type:"tcp",id:id};
158                             node.send(msg);
159                         }
160                         buffer = null;
161                     }
162                 });
163                 socket.on('timeout', function() {
164                     node.log('timeout closed socket port '+node.port);
165                     socket.end();
166                 });
167                 socket.on('close', function() {
168                     delete connectionPool[id];
169                     node.status({text:--count+" connections"});
170                 });
171                 socket.on('error',function(err) {
172                     node.log(err);
173                 });
174             });
175             server.on('error', function(err) {
176                 if (err) {
177                     node.error('unable to listen on port '+node.port+' : '+err);
178                 }
179             });
180             server.listen(node.port, function(err) {
181                 if (err) {
182                     node.error('unable to listen on port '+node.port+' : '+err);
183                 } else {
184                     node.log('listening on port '+node.port);
185
186                     node.on('close', function() {
187                         node.closing = true;
188                         server.close();
189                         node.log('stopped listening on port '+node.port);
190                     });
191                 }
192             });
193         }
194
195     }
196     RED.nodes.registerType("tcp in",TcpIn);
197
198     function TcpOut(n) {
199         RED.nodes.createNode(this,n);
200         this.host = n.host;
201         this.port = n.port * 1;
202         this.base64 = n.base64;
203         this.doend = n.end || false;
204         this.beserver = n.beserver;
205         this.name = n.name;
206         this.closing = false;
207         var node = this;
208
209         if (!node.beserver||node.beserver=="client") {
210             var reconnectTimeout;
211             var client = null;
212             var connected = false;
213             var end = false;
214
215             var setupTcpClient = function() {
216                 node.log("connecting to "+node.host+":"+node.port);
217                 node.status({fill:"grey",shape:"dot",text:"connecting"});
218                 client = net.connect(node.port, node.host, function() {
219                     connected = true;
220                     node.log("connected to "+node.host+":"+node.port);
221                     node.status({fill:"green",shape:"dot",text:"connected"});
222                 });
223                 client.on('error', function (err) {
224                     node.log('error : '+err);
225                 });
226                 client.on('end', function (err) {
227                 });
228                 client.on('close', function() {
229                     node.status({fill:"red",shape:"ring",text:"disconnected"});
230                     connected = false;
231                     client.destroy();
232                     if (!node.closing) {
233                         if (end) {
234                             end = false;
235                             reconnectTimeout = setTimeout(setupTcpClient,20);
236                         }
237                         else {
238                             node.log("connection lost to "+node.host+":"+node.port);
239                             reconnectTimeout = setTimeout(setupTcpClient,reconnectTime);
240                         }
241                     }
242                 });
243             }
244             setupTcpClient();
245
246             node.on("input", function(msg) {
247                 if (connected && msg.payload != null) {
248                     if (Buffer.isBuffer(msg.payload)) {
249                         client.write(msg.payload);
250                     } else if (typeof msg.payload === "string" && node.base64) {
251                         client.write(new Buffer(msg.payload,'base64'));
252                     } else {
253                         client.write(new Buffer(""+msg.payload));
254                     }
255                     if (node.doend === true) {
256                         end = true;
257                         client.end();
258                     }
259                 }
260             });
261
262             node.on("close", function() {
263                 this.closing = true;
264                 client.end();
265                 clearTimeout(reconnectTimeout);
266             });
267
268         } else if (node.beserver == "reply") {
269             node.on("input",function(msg) {
270                 if (msg._session && msg._session.type == "tcp") {
271                     var client = connectionPool[msg._session.id];
272                     if (client) {
273                         if (Buffer.isBuffer(msg.payload)) {
274                             client.write(msg.payload);
275                         } else if (typeof msg.payload === "string" && node.base64) {
276                             client.write(new Buffer(msg.payload,'base64'));
277                         } else {
278                             client.write(new Buffer(""+msg.payload));
279                         }
280                     }
281                 }
282             });
283         } else {
284             var connectedSockets = [];
285             node.status({text:"0 connections"});
286             var server = net.createServer(function (socket) {
287                 if (socketTimeout !== null) { socket.setTimeout(socketTimeout); }
288                 var remoteDetails = socket.remoteAddress+":"+socket.remotePort;
289                 node.log("connection from "+remoteDetails);
290                 connectedSockets.push(socket);
291                 node.status({text:connectedSockets.length+" connections"});
292                 socket.on('timeout', function() {
293                     node.log('timeout closed socket port '+node.port);
294                     socket.end();
295                 });
296                 socket.on('close',function() {
297                     node.log("connection closed from "+remoteDetails);
298                     connectedSockets.splice(connectedSockets.indexOf(socket),1);
299                     node.status({text:connectedSockets.length+" connections"});
300                 });
301                 socket.on('error',function() {
302                     node.log("socket error from "+remoteDetails);
303                     connectedSockets.splice(connectedSockets.indexOf(socket),1);
304                     node.status({text:connectedSockets.length+" connections"});
305                 });
306             });
307
308             node.on("input", function(msg) {
309                 if (msg.payload != null) {
310                     var buffer;
311                     if (Buffer.isBuffer(msg.payload)) {
312                         buffer = msg.payload;
313                     } else if (typeof msg.payload === "string" && node.base64) {
314                         buffer = new Buffer(msg.payload,'base64');
315                     } else {
316                         buffer = new Buffer(""+msg.payload);
317                     }
318                     for (var i = 0; i<connectedSockets.length;i+=1) {
319                         if (node.doend === true) { connectedSockets[i].end(buffer); }
320                         else { connectedSockets[i].write(buffer); }
321                     }
322                 }
323             });
324
325             server.on('error', function(err) {
326                 if (err) {
327                     node.error('unable to listen on port '+node.port+' : '+err);
328                 }
329             });
330
331             server.listen(node.port, function(err) {
332                 if (err) {
333                     node.error('unable to listen on port '+node.port+' : '+err);
334                 } else {
335                     node.log('listening on port '+node.port);
336                     node.on('close', function() {
337                         server.close();
338                         node.log('stopped listening on port '+node.port);
339                     });
340                 }
341             });
342         }
343     }
344     RED.nodes.registerType("tcp out",TcpOut);
345
346     function TcpGet(n) {
347         RED.nodes.createNode(this,n);
348         this.server = n.server;
349         this.port = Number(n.port);
350         this.out = n.out;
351         this.splitc = n.splitc;
352
353         if (this.out != "char") { this.splitc = Number(this.splitc); }
354         else { this.splitc.replace("\\n","\n").replace("\\r","\r").replace("\\t","\t").replace("\\e","\e").replace("\\f","\f").replace("\\0","\0"); }
355
356         var buf;
357         if (this.out == "count") { buf = new Buffer(this.splitc); }
358         else { buf = new Buffer(32768); } // set it to 32k... hopefully big enough for most.... but only hopefully
359
360         this.connected = false;
361         var node = this;
362         var client;
363
364         this.on("input", function(msg) {
365             var i = 0;
366             if ((!Buffer.isBuffer(msg.payload)) && (typeof msg.payload !== "string")) {
367                 msg.payload = msg.payload.toString();
368             }
369             if (!node.connected) {
370                 client = net.Socket();
371                 client.setTimeout(socketTimeout);
372                 node.status({});
373                 client.connect(node.port, node.server, function() {
374                     //node.log('client connected');
375                     node.status({fill:"green",shape:"dot",text:"connected"});
376                     node.connected = true;
377                     client.write(msg.payload);
378                 });
379
380                 client.on('data', function(data) {
381                     //node.log("data:"+ data.length+":"+ data);
382                     if (node.splitc === 0) {
383                         node.send({"payload": data});
384                     }
385                     else if (node.out === "sit") { // if we are staying connected just send the buffer
386                         node.send({"payload": data});
387                     }
388                     else {
389                         for (var j = 0; j < data.length; j++ ) {
390                             if (node.out === "time")  {
391                                 // do the timer thing
392                                 if (node.tout) {
393                                     i += 1;
394                                     buf[i] = data[j];
395                                 }
396                                 else {
397                                     node.tout = setTimeout(function () {
398                                         node.tout = null;
399                                         var m = new Buffer(i+1);
400                                         buf.copy(m,0,0,i+1);
401                                         node.send({"payload": m});
402                                         client.end();
403                                         m = null;
404                                     }, node.splitc);
405                                     i = 0;
406                                     buf[0] = data[j];
407                                 }
408                             }
409                             // count bytes into a buffer...
410                             else if (node.out == "count") {
411                                 buf[i] = data[j];
412                                 i += 1;
413                                 if ( i >= node.serialConfig.count) {
414                                     node.send({"payload": buf});
415                                     client.end();
416                                     i = 0;
417                                 }
418                             }
419                             // look for a char
420                             else {
421                                 buf[i] = data[j];
422                                 i += 1;
423                                 if (data[j] == node.splitc) {
424                                     var m = new Buffer(i);
425                                     buf.copy(m,0,0,i);
426                                     node.send({"payload": m});
427                                     client.end();
428                                     m = null;
429                                     i = 0;
430                                 }
431                             }
432                         }
433                     }
434                 });
435
436                 client.on('end', function() {
437                     //node.log('client disconnected');
438                     node.connected = false;
439                     node.status({});
440                     client = null;
441                 });
442
443                 client.on('error', function() {
444                     node.log('connect failed');
445                     node.status({fill:"red",shape:"ring",text:"error"});
446                     if (client) { client.end(); }
447                 });
448
449                 client.on('timeout',function() {
450                     node.log('connect timeout');
451                     if (client) {
452                         client.end();
453                         setTimeout(function() {
454                             client.connect(node.port, node.server, function() {
455                                 //node.log('client connected');
456                                 node.connected = true;
457                                 client.write(msg.payload);
458                             });
459                         },reconnectTime);
460                     }
461                 });
462             }
463             else { client.write(msg.payload); }
464         });
465
466         this.on("close", function() {
467             if (client) { buf = null; client.end(); }
468         });
469
470     }
471     RED.nodes.registerType("tcp request",TcpGet);
472 }