2 * Copyright 2013,2014 IBM Corp.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 module.exports = function(RED) {
19 var reconnectTime = RED.settings.socketReconnectTime||10000;
20 var socketTimeout = RED.settings.socketTimeout||null;
21 var net = require('net');
23 var connectionPool = {};
26 RED.nodes.createNode(this,n);
28 this.port = n.port * 1;
30 this.stream = (!n.datamode||n.datamode=='stream'); /* stream,single*/
31 this.datatype = n.datatype||'buffer'; /* buffer,utf8,base64 */
32 this.newline = (n.newline||"").replace("\\n","\n").replace("\\r","\r");
33 this.base64 = n.base64;
34 this.server = (typeof n.server == 'boolean')?n.server:(n.server == "server");
44 var setupTcpClient = function() {
45 node.log("connecting to "+node.host+":"+node.port);
46 node.status({fill:"grey",shape:"dot",text:"connecting"});
47 var id = (1+Math.random()*4294967295).toString(16);
48 client = net.connect(node.port, node.host, function() {
49 buffer = (node.datatype == 'buffer')? new Buffer(0):"";
50 node.log("connected to "+node.host+":"+node.port);
51 node.status({fill:"green",shape:"dot",text:"connected"});
53 connectionPool[id] = client;
55 client.on('data', function (data) {
56 if (node.datatype != 'buffer') {
57 data = data.toString(node.datatype);
60 if ((node.datatype) === "utf8" && node.newline != "") {
62 var parts = buffer.split(node.newline);
63 for (var i = 0;i<parts.length-1;i+=1) {
64 var msg = {topic:node.topic, payload:parts[i]};
65 msg._session = {type:"tcp",id:id};
68 buffer = parts[parts.length-1];
70 var msg = {topic:node.topic, payload:data};
71 msg._session = {type:"tcp",id:id};
75 if ((typeof data) === "string") {
78 buffer = Buffer.concat([buffer,data],buffer.length+data.length);
82 client.on('end', function() {
83 if (!node.stream || (node.datatype == "utf8" && node.newline != "" && buffer.length > 0)) {
84 var msg = {topic:node.topic,payload:buffer};
85 msg._session = {type:"tcp",id:id};
86 if (buffer.length !== 0) {
87 end = true; // only ask for fast re-connect if we actually got something
93 client.on('close', function() {
94 delete connectionPool[id];
95 node.status({fill:"red",shape:"ring",text:"disconnected"});
97 if (end) { // if we were asked to close then try to reconnect once very quick.
99 reconnectTimeout = setTimeout(setupTcpClient, 20);
102 node.log("connection lost to "+node.host+":"+node.port);
103 reconnectTimeout = setTimeout(setupTcpClient, reconnectTime);
107 client.on('error', function(err) {
113 this.on('close', function() {
116 clearTimeout(reconnectTimeout);
119 var server = net.createServer(function (socket) {
120 if (socketTimeout !== null) { socket.setTimeout(socketTimeout); }
121 var id = (1+Math.random()*4294967295).toString(16);
122 connectionPool[id] = socket;
123 node.status({text:++count+" connections"});
125 var buffer = (node.datatype == 'buffer')? new Buffer(0):"";
126 socket.on('data', function (data) {
127 if (node.datatype != 'buffer') {
128 data = data.toString(node.datatype);
131 if ((typeof data) === "string" && node.newline != "") {
132 buffer = buffer+data;
133 var parts = buffer.split(node.newline);
134 for (var i = 0;i<parts.length-1;i+=1) {
135 var msg = {topic:node.topic, payload:parts[i],ip:socket.remoteAddress,port:socket.remotePort};
136 msg._session = {type:"tcp",id:id};
139 buffer = parts[parts.length-1];
141 var msg = {topic:node.topic, payload:data};
142 msg._session = {type:"tcp",id:id};
146 if ((typeof data) === "string") {
147 buffer = buffer+data;
149 buffer = Buffer.concat([buffer,data],buffer.length+data.length);
153 socket.on('end', function() {
154 if (!node.stream || (node.datatype === "utf8" && node.newline !== "")) {
155 if (buffer.length > 0) {
156 var msg = {topic:node.topic,payload:buffer};
157 msg._session = {type:"tcp",id:id};
163 socket.on('timeout', function() {
164 node.log('timeout closed socket port '+node.port);
167 socket.on('close', function() {
168 delete connectionPool[id];
169 node.status({text:--count+" connections"});
171 socket.on('error',function(err) {
175 server.on('error', function(err) {
177 node.error('unable to listen on port '+node.port+' : '+err);
180 server.listen(node.port, function(err) {
182 node.error('unable to listen on port '+node.port+' : '+err);
184 node.log('listening on port '+node.port);
186 node.on('close', function() {
189 node.log('stopped listening on port '+node.port);
196 RED.nodes.registerType("tcp in",TcpIn);
199 RED.nodes.createNode(this,n);
201 this.port = n.port * 1;
202 this.base64 = n.base64;
203 this.doend = n.end || false;
204 this.beserver = n.beserver;
206 this.closing = false;
209 if (!node.beserver||node.beserver=="client") {
210 var reconnectTimeout;
212 var connected = false;
215 var setupTcpClient = function() {
216 node.log("connecting to "+node.host+":"+node.port);
217 node.status({fill:"grey",shape:"dot",text:"connecting"});
218 client = net.connect(node.port, node.host, function() {
220 node.log("connected to "+node.host+":"+node.port);
221 node.status({fill:"green",shape:"dot",text:"connected"});
223 client.on('error', function (err) {
224 node.log('error : '+err);
226 client.on('end', function (err) {
228 client.on('close', function() {
229 node.status({fill:"red",shape:"ring",text:"disconnected"});
235 reconnectTimeout = setTimeout(setupTcpClient,20);
238 node.log("connection lost to "+node.host+":"+node.port);
239 reconnectTimeout = setTimeout(setupTcpClient,reconnectTime);
246 node.on("input", function(msg) {
247 if (connected && msg.payload != null) {
248 if (Buffer.isBuffer(msg.payload)) {
249 client.write(msg.payload);
250 } else if (typeof msg.payload === "string" && node.base64) {
251 client.write(new Buffer(msg.payload,'base64'));
253 client.write(new Buffer(""+msg.payload));
255 if (node.doend === true) {
262 node.on("close", function() {
265 clearTimeout(reconnectTimeout);
268 } else if (node.beserver == "reply") {
269 node.on("input",function(msg) {
270 if (msg._session && msg._session.type == "tcp") {
271 var client = connectionPool[msg._session.id];
273 if (Buffer.isBuffer(msg.payload)) {
274 client.write(msg.payload);
275 } else if (typeof msg.payload === "string" && node.base64) {
276 client.write(new Buffer(msg.payload,'base64'));
278 client.write(new Buffer(""+msg.payload));
284 var connectedSockets = [];
285 node.status({text:"0 connections"});
286 var server = net.createServer(function (socket) {
287 if (socketTimeout !== null) { socket.setTimeout(socketTimeout); }
288 var remoteDetails = socket.remoteAddress+":"+socket.remotePort;
289 node.log("connection from "+remoteDetails);
290 connectedSockets.push(socket);
291 node.status({text:connectedSockets.length+" connections"});
292 socket.on('timeout', function() {
293 node.log('timeout closed socket port '+node.port);
296 socket.on('close',function() {
297 node.log("connection closed from "+remoteDetails);
298 connectedSockets.splice(connectedSockets.indexOf(socket),1);
299 node.status({text:connectedSockets.length+" connections"});
301 socket.on('error',function() {
302 node.log("socket error from "+remoteDetails);
303 connectedSockets.splice(connectedSockets.indexOf(socket),1);
304 node.status({text:connectedSockets.length+" connections"});
308 node.on("input", function(msg) {
309 if (msg.payload != null) {
311 if (Buffer.isBuffer(msg.payload)) {
312 buffer = msg.payload;
313 } else if (typeof msg.payload === "string" && node.base64) {
314 buffer = new Buffer(msg.payload,'base64');
316 buffer = new Buffer(""+msg.payload);
318 for (var i = 0; i<connectedSockets.length;i+=1) {
319 if (node.doend === true) { connectedSockets[i].end(buffer); }
320 else { connectedSockets[i].write(buffer); }
325 server.on('error', function(err) {
327 node.error('unable to listen on port '+node.port+' : '+err);
331 server.listen(node.port, function(err) {
333 node.error('unable to listen on port '+node.port+' : '+err);
335 node.log('listening on port '+node.port);
336 node.on('close', function() {
338 node.log('stopped listening on port '+node.port);
344 RED.nodes.registerType("tcp out",TcpOut);
347 RED.nodes.createNode(this,n);
348 this.server = n.server;
349 this.port = Number(n.port);
351 this.splitc = n.splitc;
353 if (this.out != "char") { this.splitc = Number(this.splitc); }
354 else { this.splitc.replace("\\n","\n").replace("\\r","\r").replace("\\t","\t").replace("\\e","\e").replace("\\f","\f").replace("\\0","\0"); }
357 if (this.out == "count") { buf = new Buffer(this.splitc); }
358 else { buf = new Buffer(32768); } // set it to 32k... hopefully big enough for most.... but only hopefully
360 this.connected = false;
364 this.on("input", function(msg) {
366 if ((!Buffer.isBuffer(msg.payload)) && (typeof msg.payload !== "string")) {
367 msg.payload = msg.payload.toString();
369 if (!node.connected) {
370 client = net.Socket();
371 client.setTimeout(socketTimeout);
373 client.connect(node.port, node.server, function() {
374 //node.log('client connected');
375 node.status({fill:"green",shape:"dot",text:"connected"});
376 node.connected = true;
377 client.write(msg.payload);
380 client.on('data', function(data) {
381 //node.log("data:"+ data.length+":"+ data);
382 if (node.splitc === 0) {
383 node.send({"payload": data});
385 else if (node.out === "sit") { // if we are staying connected just send the buffer
386 node.send({"payload": data});
389 for (var j = 0; j < data.length; j++ ) {
390 if (node.out === "time") {
391 // do the timer thing
397 node.tout = setTimeout(function () {
399 var m = new Buffer(i+1);
401 node.send({"payload": m});
409 // count bytes into a buffer...
410 else if (node.out == "count") {
413 if ( i >= node.serialConfig.count) {
414 node.send({"payload": buf});
423 if (data[j] == node.splitc) {
424 var m = new Buffer(i);
426 node.send({"payload": m});
436 client.on('end', function() {
437 //node.log('client disconnected');
438 node.connected = false;
443 client.on('error', function() {
444 node.log('connect failed');
445 node.status({fill:"red",shape:"ring",text:"error"});
446 if (client) { client.end(); }
449 client.on('timeout',function() {
450 node.log('connect timeout');
453 setTimeout(function() {
454 client.connect(node.port, node.server, function() {
455 //node.log('client connected');
456 node.connected = true;
457 client.write(msg.payload);
463 else { client.write(msg.payload); }
466 this.on("close", function() {
467 if (client) { buf = null; client.end(); }
471 RED.nodes.registerType("tcp request",TcpGet);