261fd7757a43e92b286f16a55f0ebfb602af1edf
[aai/esr-gui.git] /
1 "use strict";
2
3 var inherits = require('util').inherits
4   , EventEmitter = require('events').EventEmitter
5   , net = require('net')
6   , tls = require('tls')
7   , crypto = require('crypto')
8   , f = require('util').format
9   , debugOptions = require('./utils').debugOptions
10   , Response = require('./commands').Response
11   , MongoError = require('../error')
12   , Logger = require('./logger');
13
14 var _id = 0;
15 var debugFields = ['host', 'port', 'size', 'keepAlive', 'keepAliveInitialDelay', 'noDelay'
16   , 'connectionTimeout', 'socketTimeout', 'singleBufferSerializtion', 'ssl', 'ca', 'cert'
17   , 'rejectUnauthorized', 'promoteLongs', 'promoteValues', 'promoteBuffers', 'checkServerIdentity'];
18 var connectionAccounting = false;
19 var connections = {};
20
21 /**
22  * Creates a new Connection instance
23  * @class
24  * @param {string} options.host The server host
25  * @param {number} options.port The server port
26  * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
27  * @param {number} [options.keepAliveInitialDelay=0] Initial delay before TCP keep alive enabled
28  * @param {boolean} [options.noDelay=true] TCP Connection no delay
29  * @param {number} [options.connectionTimeout=0] TCP Connection timeout setting
30  * @param {number} [options.socketTimeout=0] TCP Socket timeout setting
31  * @param {boolean} [options.singleBufferSerializtion=true] Serialize into single buffer, trade of peak memory for serialization speed
32  * @param {boolean} [options.ssl=false] Use SSL for connection
33  * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function.
34  * @param {Buffer} [options.ca] SSL Certificate store binary buffer
35  * @param {Buffer} [options.cert] SSL Certificate binary buffer
36  * @param {Buffer} [options.key] SSL Key file binary buffer
37  * @param {string} [options.passphrase] SSL Certificate pass phrase
38  * @param {boolean} [options.rejectUnauthorized=true] Reject unauthorized server certificates
39  * @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits
40  * @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
41  * @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
42  * @fires Connection#connect
43  * @fires Connection#close
44  * @fires Connection#error
45  * @fires Connection#timeout
46  * @fires Connection#parseError
47  * @return {Connection} A cursor instance
48  */
49 var Connection = function(messageHandler, options) {
50   // Add event listener
51   EventEmitter.call(this);
52   // Set empty if no options passed
53   this.options = options || {};
54   // Identification information
55   this.id = _id++;
56   // Logger instance
57   this.logger = Logger('Connection', options);
58   // No bson parser passed in
59   if(!options.bson) throw new Error("must pass in valid bson parser");
60   // Get bson parser
61   this.bson = options.bson;
62   // Grouping tag used for debugging purposes
63   this.tag = options.tag;
64   // Message handler
65   this.messageHandler = messageHandler;
66
67   // Max BSON message size
68   this.maxBsonMessageSize = options.maxBsonMessageSize || (1024 * 1024 * 16 * 4);
69   // Debug information
70   if(this.logger.isDebug()) this.logger.debug(f('creating connection %s with options [%s]', this.id, JSON.stringify(debugOptions(debugFields, options))));
71
72   // Default options
73   this.port = options.port || 27017;
74   this.host = options.host || 'localhost';
75   this.keepAlive = typeof options.keepAlive == 'boolean' ? options.keepAlive : true;
76   this.keepAliveInitialDelay = options.keepAliveInitialDelay || 0;
77   this.noDelay = typeof options.noDelay == 'boolean' ? options.noDelay : true;
78   this.connectionTimeout = options.connectionTimeout || 0;
79   this.socketTimeout = options.socketTimeout || 0;
80
81   // If connection was destroyed
82   this.destroyed = false;
83
84   // Check if we have a domain socket
85   this.domainSocket = this.host.indexOf('\/') != -1;
86
87   // Serialize commands using function
88   this.singleBufferSerializtion = typeof options.singleBufferSerializtion == 'boolean' ? options.singleBufferSerializtion : true;
89   this.serializationFunction = this.singleBufferSerializtion ? 'toBinUnified' : 'toBin';
90
91   // SSL options
92   this.ca = options.ca || null;
93   this.cert = options.cert || null;
94   this.key = options.key || null;
95   this.passphrase = options.passphrase || null;
96   this.ssl = typeof options.ssl == 'boolean' ? options.ssl : false;
97   this.rejectUnauthorized = typeof options.rejectUnauthorized == 'boolean' ? options.rejectUnauthorized : true;
98   this.checkServerIdentity = typeof options.checkServerIdentity == 'boolean'
99     || typeof options.checkServerIdentity == 'function' ? options.checkServerIdentity : true;
100
101   // If ssl not enabled
102   if(!this.ssl) this.rejectUnauthorized = false;
103
104   // Response options
105   this.responseOptions = {
106     promoteLongs: typeof options.promoteLongs == 'boolean' ?  options.promoteLongs : true,
107     promoteValues: typeof options.promoteValues == 'boolean' ? options.promoteValues : true,
108     promoteBuffers: typeof options.promoteBuffers == 'boolean' ? options.promoteBuffers: false
109   }
110
111   // Flushing
112   this.flushing = false;
113   this.queue = [];
114
115   // Internal state
116   this.connection = null;
117   this.writeStream = null;
118
119   // Create hash method
120   var hash = crypto.createHash('sha1');
121   hash.update(f('%s:%s', this.host, this.port));
122
123   // Create a hash name
124   this.hashedName = hash.digest('hex');
125
126   // All operations in flight on the connection
127   this.workItems = [];
128 }
129
130 inherits(Connection, EventEmitter);
131
132 Connection.prototype.setSocketTimeout = function(value) {
133   if(this.connection) {
134     this.connection.setTimeout(value);
135   }
136 }
137
138 Connection.prototype.resetSocketTimeout = function(value) {
139   if(this.connection) {
140     this.connection.setTimeout(this.socketTimeout);;
141   }
142 }
143
144 Connection.enableConnectionAccounting = function() {
145   connectionAccounting = true;
146   connections = {};
147 }
148
149 Connection.disableConnectionAccounting = function() {
150   connectionAccounting = false;
151 }
152
153 Connection.connections = function() {
154   return connections;
155 }
156
157 //
158 // Connection handlers
159 var errorHandler = function(self) {
160   return function(err) {
161     if(connectionAccounting) delete connections[self.id];
162     // Debug information
163     if(self.logger.isDebug()) self.logger.debug(f('connection %s for [%s:%s] errored out with [%s]', self.id, self.host, self.port, JSON.stringify(err)));
164     // Emit the error
165     if(self.listeners('error').length > 0) self.emit("error", MongoError.create(err), self);
166   }
167 }
168
169 var timeoutHandler = function(self) {
170   return function(err) {
171     if(connectionAccounting) delete connections[self.id];
172     // Debug information
173     if(self.logger.isDebug()) self.logger.debug(f('connection %s for [%s:%s] timed out', self.id, self.host, self.port));
174     // Emit timeout error
175     self.emit("timeout"
176       , MongoError.create(f("connection %s to %s:%s timed out", self.id, self.host, self.port))
177       , self);
178   }
179 }
180
181 var closeHandler = function(self) {
182   return function(hadError) {
183     if(connectionAccounting) delete connections[self.id];
184     // Debug information
185     if(self.logger.isDebug()) self.logger.debug(f('connection %s with for [%s:%s] closed', self.id, self.host, self.port));
186
187     // Emit close event
188     if(!hadError) {
189       self.emit("close"
190         , MongoError.create(f("connection %s to %s:%s closed", self.id, self.host, self.port))
191         , self);
192     }
193   }
194 }
195
196 var dataHandler = function(self) {
197   return function(data) {
198     // Parse until we are done with the data
199     while(data.length > 0) {
200       // If we still have bytes to read on the current message
201       if(self.bytesRead > 0 && self.sizeOfMessage > 0) {
202         // Calculate the amount of remaining bytes
203         var remainingBytesToRead = self.sizeOfMessage - self.bytesRead;
204         // Check if the current chunk contains the rest of the message
205         if(remainingBytesToRead > data.length) {
206           // Copy the new data into the exiting buffer (should have been allocated when we know the message size)
207           data.copy(self.buffer, self.bytesRead);
208           // Adjust the number of bytes read so it point to the correct index in the buffer
209           self.bytesRead = self.bytesRead + data.length;
210
211           // Reset state of buffer
212           data = new Buffer(0);
213         } else {
214           // Copy the missing part of the data into our current buffer
215           data.copy(self.buffer, self.bytesRead, 0, remainingBytesToRead);
216           // Slice the overflow into a new buffer that we will then re-parse
217           data = data.slice(remainingBytesToRead);
218
219           // Emit current complete message
220           try {
221             var emitBuffer = self.buffer;
222             // Reset state of buffer
223             self.buffer = null;
224             self.sizeOfMessage = 0;
225             self.bytesRead = 0;
226             self.stubBuffer = null;
227             // Emit the buffer
228             self.messageHandler(new Response(self.bson, emitBuffer, self.responseOptions), self);
229           } catch(err) {
230             var errorObject = {err:"socketHandler", trace:err, bin:self.buffer, parseState:{
231               sizeOfMessage:self.sizeOfMessage,
232               bytesRead:self.bytesRead,
233               stubBuffer:self.stubBuffer}};
234             // We got a parse Error fire it off then keep going
235             self.emit("parseError", errorObject, self);
236           }
237         }
238       } else {
239         // Stub buffer is kept in case we don't get enough bytes to determine the
240         // size of the message (< 4 bytes)
241         if(self.stubBuffer != null && self.stubBuffer.length > 0) {
242           // If we have enough bytes to determine the message size let's do it
243           if(self.stubBuffer.length + data.length > 4) {
244             // Prepad the data
245             var newData = new Buffer(self.stubBuffer.length + data.length);
246             self.stubBuffer.copy(newData, 0);
247             data.copy(newData, self.stubBuffer.length);
248             // Reassign for parsing
249             data = newData;
250
251             // Reset state of buffer
252             self.buffer = null;
253             self.sizeOfMessage = 0;
254             self.bytesRead = 0;
255             self.stubBuffer = null;
256
257           } else {
258
259             // Add the the bytes to the stub buffer
260             var newStubBuffer = new Buffer(self.stubBuffer.length + data.length);
261             // Copy existing stub buffer
262             self.stubBuffer.copy(newStubBuffer, 0);
263             // Copy missing part of the data
264             data.copy(newStubBuffer, self.stubBuffer.length);
265             // Exit parsing loop
266             data = new Buffer(0);
267           }
268         } else {
269           if(data.length > 4) {
270             // Retrieve the message size
271             // var sizeOfMessage = data.readUInt32LE(0);
272             var sizeOfMessage = data[0] | data[1] << 8 | data[2] << 16 | data[3] << 24;
273             // If we have a negative sizeOfMessage emit error and return
274             if(sizeOfMessage < 0 || sizeOfMessage > self.maxBsonMessageSize) {
275               var errorObject = {err:"socketHandler", trace:'', bin:self.buffer, parseState:{
276                 sizeOfMessage: sizeOfMessage,
277                 bytesRead: self.bytesRead,
278                 stubBuffer: self.stubBuffer}};
279               // We got a parse Error fire it off then keep going
280               self.emit("parseError", errorObject, self);
281               return;
282             }
283
284             // Ensure that the size of message is larger than 0 and less than the max allowed
285             if(sizeOfMessage > 4 && sizeOfMessage < self.maxBsonMessageSize && sizeOfMessage > data.length) {
286               self.buffer = new Buffer(sizeOfMessage);
287               // Copy all the data into the buffer
288               data.copy(self.buffer, 0);
289               // Update bytes read
290               self.bytesRead = data.length;
291               // Update sizeOfMessage
292               self.sizeOfMessage = sizeOfMessage;
293               // Ensure stub buffer is null
294               self.stubBuffer = null;
295               // Exit parsing loop
296               data = new Buffer(0);
297
298             } else if(sizeOfMessage > 4 && sizeOfMessage < self.maxBsonMessageSize && sizeOfMessage == data.length) {
299               try {
300                 var emitBuffer = data;
301                 // Reset state of buffer
302                 self.buffer = null;
303                 self.sizeOfMessage = 0;
304                 self.bytesRead = 0;
305                 self.stubBuffer = null;
306                 // Exit parsing loop
307                 data = new Buffer(0);
308                 // Emit the message
309                 self.messageHandler(new Response(self.bson, emitBuffer, self.responseOptions), self);
310               } catch (err) {
311                 self.emit("parseError", err, self);
312               }
313             } else if(sizeOfMessage <= 4 || sizeOfMessage > self.maxBsonMessageSize) {
314               var errorObject = {err:"socketHandler", trace:null, bin:data, parseState:{
315                 sizeOfMessage:sizeOfMessage,
316                 bytesRead:0,
317                 buffer:null,
318                 stubBuffer:null}};
319               // We got a parse Error fire it off then keep going
320               self.emit("parseError", errorObject, self);
321
322               // Clear out the state of the parser
323               self.buffer = null;
324               self.sizeOfMessage = 0;
325               self.bytesRead = 0;
326               self.stubBuffer = null;
327               // Exit parsing loop
328               data = new Buffer(0);
329             } else {
330               var emitBuffer = data.slice(0, sizeOfMessage);
331               // Reset state of buffer
332               self.buffer = null;
333               self.sizeOfMessage = 0;
334               self.bytesRead = 0;
335               self.stubBuffer = null;
336               // Copy rest of message
337               data = data.slice(sizeOfMessage);
338               // Emit the message
339               self.messageHandler(new Response(self.bson, emitBuffer, self.responseOptions), self);
340             }
341           } else {
342             // Create a buffer that contains the space for the non-complete message
343             self.stubBuffer = new Buffer(data.length)
344             // Copy the data to the stub buffer
345             data.copy(self.stubBuffer, 0);
346             // Exit parsing loop
347             data = new Buffer(0);
348           }
349         }
350       }
351     }
352   }
353 }
354
355 // List of socket level valid ssl options
356 var legalSslSocketOptions = ['pfx', 'key', 'passphrase', 'cert', 'ca', 'ciphers'
357   , 'NPNProtocols', 'ALPNProtocols', 'servername'
358   , 'secureProtocol', 'secureContext', 'session'
359   , 'minDHSize'];
360
361 function merge(options1, options2) {
362   // Merge in any allowed ssl options
363   for(var name in options2) {
364     if(options2[name] != null && legalSslSocketOptions.indexOf(name) != -1) {
365       options1[name] = options2[name];
366     }
367   }
368 }
369
370 /**
371  * Connect
372  * @method
373  */
374 Connection.prototype.connect = function(_options) {
375   var self = this;
376   _options = _options || {};
377   // Set the connections
378   if(connectionAccounting) connections[this.id] = this;
379   // Check if we are overriding the promoteLongs
380   if(typeof _options.promoteLongs == 'boolean') {
381     self.responseOptions.promoteLongs = _options.promoteLongs;
382     self.responseOptions.promoteValues = _options.promoteValues;
383     self.responseOptions.promoteBuffers = _options.promoteBuffers;
384   }
385
386   // Create new connection instance
387   self.connection = self.domainSocket
388     ? net.createConnection(self.host)
389     : net.createConnection(self.port, self.host);
390
391   // Set the options for the connection
392   self.connection.setKeepAlive(self.keepAlive, self.keepAliveInitialDelay);
393   self.connection.setTimeout(self.connectionTimeout);
394   self.connection.setNoDelay(self.noDelay);
395
396   // If we have ssl enabled
397   if(self.ssl) {
398     var sslOptions = {
399         socket: self.connection
400       , rejectUnauthorized: self.rejectUnauthorized
401     }
402
403     // Merge in options
404     merge(sslOptions, this.options);
405     merge(sslOptions, _options);
406
407     // Set options for ssl
408     if(self.ca) sslOptions.ca = self.ca;
409     if(self.cert) sslOptions.cert = self.cert;
410     if(self.key) sslOptions.key = self.key;
411     if(self.passphrase) sslOptions.passphrase = self.passphrase;
412
413     // Override checkServerIdentity behavior
414     if(self.checkServerIdentity == false) {
415       // Skip the identiy check by retuning undefined as per node documents
416       // https://nodejs.org/api/tls.html#tls_tls_connect_options_callback
417       sslOptions.checkServerIdentity = function(servername, cert) {
418         return undefined;
419       }
420     } else if(typeof self.checkServerIdentity == 'function') {
421       sslOptions.checkServerIdentity = self.checkServerIdentity;
422     }
423
424     // Attempt SSL connection
425     self.connection = tls.connect(self.port, self.host, sslOptions, function() {
426       // Error on auth or skip
427       if(self.connection.authorizationError && self.rejectUnauthorized) {
428         return self.emit("error", self.connection.authorizationError, self, {ssl:true});
429       }
430
431       // Set socket timeout instead of connection timeout
432       self.connection.setTimeout(self.socketTimeout);
433       // We are done emit connect
434       self.emit('connect', self);
435     });
436     self.connection.setTimeout(self.connectionTimeout);
437   } else {
438     self.connection.on('connect', function() {
439       // Set socket timeout instead of connection timeout
440       self.connection.setTimeout(self.socketTimeout);
441       // Emit connect event
442       self.emit('connect', self);
443     });
444   }
445
446   // Add handlers for events
447   self.connection.once('error', errorHandler(self));
448   self.connection.once('timeout', timeoutHandler(self));
449   self.connection.once('close', closeHandler(self));
450   self.connection.on('data', dataHandler(self));
451 }
452
453 /**
454  * Unref this connection
455  * @method
456  * @return {boolean}
457  */
458 Connection.prototype.unref = function() {
459   if (this.connection) this.connection.unref();
460   else {
461     var self = this;
462     this.once('connect', function() {
463       self.connection.unref();
464     });
465   }
466 }
467
468 /**
469  * Destroy connection
470  * @method
471  */
472 Connection.prototype.destroy = function() {
473   // Set the connections
474   if(connectionAccounting) delete connections[this.id];
475   if(this.connection) {
476     this.connection.end();
477     this.connection.destroy();
478   }
479
480   this.destroyed = true;
481 }
482
483 /**
484  * Write to connection
485  * @method
486  * @param {Command} command Command to write out need to implement toBin and toBinUnified
487  */
488 Connection.prototype.write = function(buffer) {
489   // Debug Log
490   if(this.logger.isDebug()) {
491     if(!Array.isArray(buffer)) {
492       this.logger.debug(f('writing buffer [%s] to %s:%s', buffer.toString('hex'), this.host, this.port));
493     } else {
494       for(var i = 0; i < buffer.length; i++)
495         this.logger.debug(f('writing buffer [%s] to %s:%s', buffer[i].toString('hex'), this.host, this.port));
496     }
497   }
498
499   // Write out the command
500   if(!Array.isArray(buffer)) return this.connection.write(buffer, 'binary');
501   // Iterate over all buffers and write them in order to the socket
502   for(var i = 0; i < buffer.length; i++) this.connection.write(buffer[i], 'binary');
503 }
504
505 /**
506  * Return id of connection as a string
507  * @method
508  * @return {string}
509  */
510 Connection.prototype.toString = function() {
511   return "" + this.id;
512 }
513
514 /**
515  * Return json object of connection
516  * @method
517  * @return {object}
518  */
519 Connection.prototype.toJSON = function() {
520   return {id: this.id, host: this.host, port: this.port};
521 }
522
523 /**
524  * Is the connection connected
525  * @method
526  * @return {boolean}
527  */
528 Connection.prototype.isConnected = function() {
529   if(this.destroyed) return false;
530   return !this.connection.destroyed && this.connection.writable;
531 }
532
533 /**
534  * A server connect event, used to verify that the connection is up and running
535  *
536  * @event Connection#connect
537  * @type {Connection}
538  */
539
540 /**
541  * The server connection closed, all pool connections closed
542  *
543  * @event Connection#close
544  * @type {Connection}
545  */
546
547 /**
548  * The server connection caused an error, all pool connections closed
549  *
550  * @event Connection#error
551  * @type {Connection}
552  */
553
554 /**
555  * The server connection timed out, all pool connections closed
556  *
557  * @event Connection#timeout
558  * @type {Connection}
559  */
560
561 /**
562  * The driver experienced an invalid message, all pool connections closed
563  *
564  * @event Connection#parseError
565  * @type {Connection}
566  */
567
568 module.exports = Connection;