ab425522f641789df75720b9132961722fab9131
[aai/esr-gui.git] /
1 "use strict";
2
3 var inherits = require('util').inherits
4   , EventEmitter = require('events').EventEmitter
5   , net = require('net')
6   , tls = require('tls')
7   , crypto = require('crypto')
8   , f = require('util').format
9   , debugOptions = require('./utils').debugOptions
10   , Response = require('./commands').Response
11   , MongoError = require('../error')
12   , Logger = require('./logger');
13
14 var _id = 0;
15 var debugFields = ['host', 'port', 'size', 'keepAlive', 'keepAliveInitialDelay', 'noDelay'
16   , 'connectionTimeout', 'socketTimeout', 'singleBufferSerializtion', 'ssl', 'ca', 'cert'
17   , 'rejectUnauthorized', 'promoteLongs', 'promoteValues', 'promoteBuffers', 'checkServerIdentity'];
18 var connectionAccounting = false;
19 var connections = {};
20
21 /**
22  * Creates a new Connection instance
23  * @class
24  * @param {string} options.host The server host
25  * @param {number} options.port The server port
26  * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
27  * @param {number} [options.keepAliveInitialDelay=0] Initial delay before TCP keep alive enabled
28  * @param {boolean} [options.noDelay=true] TCP Connection no delay
29  * @param {number} [options.connectionTimeout=0] TCP Connection timeout setting
30  * @param {number} [options.socketTimeout=0] TCP Socket timeout setting
31  * @param {boolean} [options.singleBufferSerializtion=true] Serialize into single buffer, trade of peak memory for serialization speed
32  * @param {boolean} [options.ssl=false] Use SSL for connection
33  * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function.
34  * @param {Buffer} [options.ca] SSL Certificate store binary buffer
35  * @param {Buffer} [options.cert] SSL Certificate binary buffer
36  * @param {Buffer} [options.key] SSL Key file binary buffer
37  * @param {string} [options.passphrase] SSL Certificate pass phrase
38  * @param {boolean} [options.rejectUnauthorized=true] Reject unauthorized server certificates
39  * @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits
40  * @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
41  * @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
42  * @fires Connection#connect
43  * @fires Connection#close
44  * @fires Connection#error
45  * @fires Connection#timeout
46  * @fires Connection#parseError
47  * @return {Connection} A cursor instance
48  */
49 var Connection = function(messageHandler, options) {
50   // Add event listener
51   EventEmitter.call(this);
52   // Set empty if no options passed
53   this.options = options || {};
54   // Identification information
55   this.id = _id++;
56   // Logger instance
57   this.logger = Logger('Connection', options);
58   // No bson parser passed in
59   if(!options.bson) throw new Error("must pass in valid bson parser");
60   // Get bson parser
61   this.bson = options.bson;
62   // Grouping tag used for debugging purposes
63   this.tag = options.tag;
64   // Message handler
65   this.messageHandler = messageHandler;
66
67   // Max BSON message size
68   this.maxBsonMessageSize = options.maxBsonMessageSize || (1024 * 1024 * 16 * 4);
69   // Debug information
70   if(this.logger.isDebug()) this.logger.debug(f('creating connection %s with options [%s]', this.id, JSON.stringify(debugOptions(debugFields, options))));
71
72   // Default options
73   this.port = options.port || 27017;
74   this.host = options.host || 'localhost';
75   this.keepAlive = typeof options.keepAlive == 'boolean' ? options.keepAlive : true;
76   this.keepAliveInitialDelay = options.keepAliveInitialDelay || 0;
77   this.noDelay = typeof options.noDelay == 'boolean' ? options.noDelay : true;
78   this.connectionTimeout = options.connectionTimeout || 0;
79   this.socketTimeout = options.socketTimeout || 0;
80
81   // If connection was destroyed
82   this.destroyed = false;
83
84   // Check if we have a domain socket
85   this.domainSocket = this.host.indexOf('\/') != -1;
86
87   // Serialize commands using function
88   this.singleBufferSerializtion = typeof options.singleBufferSerializtion == 'boolean' ? options.singleBufferSerializtion : true;
89   this.serializationFunction = this.singleBufferSerializtion ? 'toBinUnified' : 'toBin';
90
91   // SSL options
92   this.ca = options.ca || null;
93   this.cert = options.cert || null;
94   this.key = options.key || null;
95   this.passphrase = options.passphrase || null;
96   this.ssl = typeof options.ssl == 'boolean' ? options.ssl : false;
97   this.rejectUnauthorized = typeof options.rejectUnauthorized == 'boolean' ? options.rejectUnauthorized : true;
98   this.checkServerIdentity = typeof options.checkServerIdentity == 'boolean'
99     || typeof options.checkServerIdentity == 'function' ? options.checkServerIdentity : true;
100
101   // If ssl not enabled
102   if(!this.ssl) this.rejectUnauthorized = false;
103
104   // Response options
105   this.responseOptions = {
106     promoteLongs: typeof options.promoteLongs == 'boolean' ?  options.promoteLongs : true,
107     promoteValues: typeof options.promoteValues == 'boolean' ? options.promoteValues : true,
108     promoteBuffers: typeof options.promoteBuffers == 'boolean' ? options.promoteBuffers: false
109   }
110
111   // Flushing
112   this.flushing = false;
113   this.queue = [];
114
115   // Internal state
116   this.connection = null;
117   this.writeStream = null;
118
119   // Create hash method
120   var hash = crypto.createHash('sha1');
121   hash.update(f('%s:%s', this.host, this.port));
122
123   // Create a hash name
124   this.hashedName = hash.digest('hex');
125
126   // All operations in flight on the connection
127   this.workItems = [];
128 }
129
130 inherits(Connection, EventEmitter);
131
132 Connection.prototype.setSocketTimeout = function(value) {
133   if(this.connection) {
134     this.connection.setTimeout(value);
135   }
136 }
137
138 Connection.prototype.resetSocketTimeout = function() {
139   if(this.connection) {
140     this.connection.setTimeout(this.socketTimeout);
141   }
142 }
143
144 Connection.enableConnectionAccounting = function() {
145   connectionAccounting = true;
146   connections = {};
147 }
148
149 Connection.disableConnectionAccounting = function() {
150   connectionAccounting = false;
151 }
152
153 Connection.connections = function() {
154   return connections;
155 }
156
157 function deleteConnection(id) {
158   delete connections[id];
159 }
160
161 function addConnection(id, connection) {
162   connections[id] = connection;
163 }
164
165 //
166 // Connection handlers
167 var errorHandler = function(self) {
168   return function(err) {
169     if(connectionAccounting) deleteConnection(self.id);
170     // Debug information
171     if(self.logger.isDebug()) self.logger.debug(f('connection %s for [%s:%s] errored out with [%s]', self.id, self.host, self.port, JSON.stringify(err)));
172     // Emit the error
173     if(self.listeners('error').length > 0) self.emit("error", MongoError.create(err), self);
174   }
175 }
176
177 var timeoutHandler = function(self) {
178   return function() {
179     if(connectionAccounting) deleteConnection(self.id);
180     // Debug information
181     if(self.logger.isDebug()) self.logger.debug(f('connection %s for [%s:%s] timed out', self.id, self.host, self.port));
182     // Emit timeout error
183     self.emit("timeout"
184       , MongoError.create(f("connection %s to %s:%s timed out", self.id, self.host, self.port))
185       , self);
186   }
187 }
188
189 var closeHandler = function(self) {
190   return function(hadError) {
191     if(connectionAccounting) deleteConnection(self.id);
192     // Debug information
193     if(self.logger.isDebug()) self.logger.debug(f('connection %s with for [%s:%s] closed', self.id, self.host, self.port));
194
195     // Emit close event
196     if(!hadError) {
197       self.emit("close"
198         , MongoError.create(f("connection %s to %s:%s closed", self.id, self.host, self.port))
199         , self);
200     }
201   }
202 }
203
204 var dataHandler = function(self) {
205   return function(data) {
206     // Parse until we are done with the data
207     while(data.length > 0) {
208       // If we still have bytes to read on the current message
209       if(self.bytesRead > 0 && self.sizeOfMessage > 0) {
210         // Calculate the amount of remaining bytes
211         var remainingBytesToRead = self.sizeOfMessage - self.bytesRead;
212         // Check if the current chunk contains the rest of the message
213         if(remainingBytesToRead > data.length) {
214           // Copy the new data into the exiting buffer (should have been allocated when we know the message size)
215           data.copy(self.buffer, self.bytesRead);
216           // Adjust the number of bytes read so it point to the correct index in the buffer
217           self.bytesRead = self.bytesRead + data.length;
218
219           // Reset state of buffer
220           data = new Buffer(0);
221         } else {
222           // Copy the missing part of the data into our current buffer
223           data.copy(self.buffer, self.bytesRead, 0, remainingBytesToRead);
224           // Slice the overflow into a new buffer that we will then re-parse
225           data = data.slice(remainingBytesToRead);
226
227           // Emit current complete message
228           try {
229             var emitBuffer = self.buffer;
230             // Reset state of buffer
231             self.buffer = null;
232             self.sizeOfMessage = 0;
233             self.bytesRead = 0;
234             self.stubBuffer = null;
235             // Emit the buffer
236             self.messageHandler(new Response(self.bson, emitBuffer, self.responseOptions), self);
237           } catch(err) {
238             var errorObject = {err:"socketHandler", trace:err, bin:self.buffer, parseState:{
239               sizeOfMessage:self.sizeOfMessage,
240               bytesRead:self.bytesRead,
241               stubBuffer:self.stubBuffer}};
242             // We got a parse Error fire it off then keep going
243             self.emit("parseError", errorObject, self);
244           }
245         }
246       } else {
247         // Stub buffer is kept in case we don't get enough bytes to determine the
248         // size of the message (< 4 bytes)
249         if(self.stubBuffer != null && self.stubBuffer.length > 0) {
250           // If we have enough bytes to determine the message size let's do it
251           if(self.stubBuffer.length + data.length > 4) {
252             // Prepad the data
253             var newData = new Buffer(self.stubBuffer.length + data.length);
254             self.stubBuffer.copy(newData, 0);
255             data.copy(newData, self.stubBuffer.length);
256             // Reassign for parsing
257             data = newData;
258
259             // Reset state of buffer
260             self.buffer = null;
261             self.sizeOfMessage = 0;
262             self.bytesRead = 0;
263             self.stubBuffer = null;
264
265           } else {
266
267             // Add the the bytes to the stub buffer
268             var newStubBuffer = new Buffer(self.stubBuffer.length + data.length);
269             // Copy existing stub buffer
270             self.stubBuffer.copy(newStubBuffer, 0);
271             // Copy missing part of the data
272             data.copy(newStubBuffer, self.stubBuffer.length);
273             // Exit parsing loop
274             data = new Buffer(0);
275           }
276         } else {
277           if(data.length > 4) {
278             // Retrieve the message size
279             // var sizeOfMessage = data.readUInt32LE(0);
280             var sizeOfMessage = data[0] | data[1] << 8 | data[2] << 16 | data[3] << 24;
281             // If we have a negative sizeOfMessage emit error and return
282             if(sizeOfMessage < 0 || sizeOfMessage > self.maxBsonMessageSize) {
283               errorObject = {err:"socketHandler", trace:'', bin:self.buffer, parseState:{
284                 sizeOfMessage: sizeOfMessage,
285                 bytesRead: self.bytesRead,
286                 stubBuffer: self.stubBuffer}};
287               // We got a parse Error fire it off then keep going
288               self.emit("parseError", errorObject, self);
289               return;
290             }
291
292             // Ensure that the size of message is larger than 0 and less than the max allowed
293             if(sizeOfMessage > 4 && sizeOfMessage < self.maxBsonMessageSize && sizeOfMessage > data.length) {
294               self.buffer = new Buffer(sizeOfMessage);
295               // Copy all the data into the buffer
296               data.copy(self.buffer, 0);
297               // Update bytes read
298               self.bytesRead = data.length;
299               // Update sizeOfMessage
300               self.sizeOfMessage = sizeOfMessage;
301               // Ensure stub buffer is null
302               self.stubBuffer = null;
303               // Exit parsing loop
304               data = new Buffer(0);
305
306             } else if(sizeOfMessage > 4 && sizeOfMessage < self.maxBsonMessageSize && sizeOfMessage == data.length) {
307               try {
308                 emitBuffer = data;
309                 // Reset state of buffer
310                 self.buffer = null;
311                 self.sizeOfMessage = 0;
312                 self.bytesRead = 0;
313                 self.stubBuffer = null;
314                 // Exit parsing loop
315                 data = new Buffer(0);
316                 // Emit the message
317                 self.messageHandler(new Response(self.bson, emitBuffer, self.responseOptions), self);
318               } catch (err) {
319                 self.emit("parseError", err, self);
320               }
321             } else if(sizeOfMessage <= 4 || sizeOfMessage > self.maxBsonMessageSize) {
322               errorObject = {err:"socketHandler", trace:null, bin:data, parseState:{
323                 sizeOfMessage:sizeOfMessage,
324                 bytesRead:0,
325                 buffer:null,
326                 stubBuffer:null}};
327               // We got a parse Error fire it off then keep going
328               self.emit("parseError", errorObject, self);
329
330               // Clear out the state of the parser
331               self.buffer = null;
332               self.sizeOfMessage = 0;
333               self.bytesRead = 0;
334               self.stubBuffer = null;
335               // Exit parsing loop
336               data = new Buffer(0);
337             } else {
338               emitBuffer = data.slice(0, sizeOfMessage);
339               // Reset state of buffer
340               self.buffer = null;
341               self.sizeOfMessage = 0;
342               self.bytesRead = 0;
343               self.stubBuffer = null;
344               // Copy rest of message
345               data = data.slice(sizeOfMessage);
346               // Emit the message
347               self.messageHandler(new Response(self.bson, emitBuffer, self.responseOptions), self);
348             }
349           } else {
350             // Create a buffer that contains the space for the non-complete message
351             self.stubBuffer = new Buffer(data.length)
352             // Copy the data to the stub buffer
353             data.copy(self.stubBuffer, 0);
354             // Exit parsing loop
355             data = new Buffer(0);
356           }
357         }
358       }
359     }
360   }
361 }
362
363 // List of socket level valid ssl options
364 var legalSslSocketOptions = ['pfx', 'key', 'passphrase', 'cert', 'ca', 'ciphers'
365   , 'NPNProtocols', 'ALPNProtocols', 'servername'
366   , 'secureProtocol', 'secureContext', 'session'
367   , 'minDHSize'];
368
369 function merge(options1, options2) {
370   // Merge in any allowed ssl options
371   for(var name in options2) {
372     if(options2[name] != null && legalSslSocketOptions.indexOf(name) != -1) {
373       options1[name] = options2[name];
374     }
375   }
376 }
377
378 /**
379  * Connect
380  * @method
381  */
382 Connection.prototype.connect = function(_options) {
383   var self = this;
384   _options = _options || {};
385   // Set the connections
386   if(connectionAccounting) addConnection(this.id, this);
387   // Check if we are overriding the promoteLongs
388   if(typeof _options.promoteLongs == 'boolean') {
389     self.responseOptions.promoteLongs = _options.promoteLongs;
390     self.responseOptions.promoteValues = _options.promoteValues;
391     self.responseOptions.promoteBuffers = _options.promoteBuffers;
392   }
393
394   // Create new connection instance
395   self.connection = self.domainSocket
396     ? net.createConnection(self.host)
397     : net.createConnection(self.port, self.host);
398
399   // Set the options for the connection
400   self.connection.setKeepAlive(self.keepAlive, self.keepAliveInitialDelay);
401   self.connection.setTimeout(self.connectionTimeout);
402   self.connection.setNoDelay(self.noDelay);
403
404   // If we have ssl enabled
405   if(self.ssl) {
406     var sslOptions = {
407         socket: self.connection
408       , rejectUnauthorized: self.rejectUnauthorized
409     }
410
411     // Merge in options
412     merge(sslOptions, this.options);
413     merge(sslOptions, _options);
414
415     // Set options for ssl
416     if(self.ca) sslOptions.ca = self.ca;
417     if(self.cert) sslOptions.cert = self.cert;
418     if(self.key) sslOptions.key = self.key;
419     if(self.passphrase) sslOptions.passphrase = self.passphrase;
420
421     // Override checkServerIdentity behavior
422     if(self.checkServerIdentity == false) {
423       // Skip the identiy check by retuning undefined as per node documents
424       // https://nodejs.org/api/tls.html#tls_tls_connect_options_callback
425       sslOptions.checkServerIdentity = function() {
426         return undefined;
427       }
428     } else if(typeof self.checkServerIdentity == 'function') {
429       sslOptions.checkServerIdentity = self.checkServerIdentity;
430     }
431
432     // Set default sni servername to be the same as host
433     if(sslOptions.servername == null) {
434       sslOptions.servername = self.host;
435     }
436
437     // Attempt SSL connection
438     self.connection = tls.connect(self.port, self.host, sslOptions, function() {
439       // Error on auth or skip
440       if(self.connection.authorizationError && self.rejectUnauthorized) {
441         return self.emit("error", self.connection.authorizationError, self, {ssl:true});
442       }
443
444       // Set socket timeout instead of connection timeout
445       self.connection.setTimeout(self.socketTimeout);
446       // We are done emit connect
447       self.emit('connect', self);
448     });
449     self.connection.setTimeout(self.connectionTimeout);
450   } else {
451     self.connection.on('connect', function() {
452       // Set socket timeout instead of connection timeout
453       self.connection.setTimeout(self.socketTimeout);
454       // Emit connect event
455       self.emit('connect', self);
456     });
457   }
458
459   // Add handlers for events
460   self.connection.once('error', errorHandler(self));
461   self.connection.once('timeout', timeoutHandler(self));
462   self.connection.once('close', closeHandler(self));
463   self.connection.on('data', dataHandler(self));
464 }
465
466 /**
467  * Unref this connection
468  * @method
469  * @return {boolean}
470  */
471 Connection.prototype.unref = function() {
472   if (this.connection) this.connection.unref();
473   else {
474     var self = this;
475     this.once('connect', function() {
476       self.connection.unref();
477     });
478   }
479 }
480
481 /**
482  * Destroy connection
483  * @method
484  */
485 Connection.prototype.destroy = function() {
486   // Set the connections
487   if(connectionAccounting) deleteConnection(this.id);
488   if(this.connection) {
489     this.connection.end();
490     this.connection.destroy();
491   }
492
493   this.destroyed = true;
494 }
495
496 /**
497  * Write to connection
498  * @method
499  * @param {Command} command Command to write out need to implement toBin and toBinUnified
500  */
501 Connection.prototype.write = function(buffer) {
502   var i;
503   // Debug Log
504   if(this.logger.isDebug()) {
505     if(!Array.isArray(buffer)) {
506       this.logger.debug(f('writing buffer [%s] to %s:%s', buffer.toString('hex'), this.host, this.port));
507     } else {
508       for(i = 0; i < buffer.length; i++)
509         this.logger.debug(f('writing buffer [%s] to %s:%s', buffer[i].toString('hex'), this.host, this.port));
510     }
511   }
512
513   // Write out the command
514   if(!Array.isArray(buffer)) return this.connection.write(buffer, 'binary');
515   // Iterate over all buffers and write them in order to the socket
516   for(i = 0; i < buffer.length; i++) this.connection.write(buffer[i], 'binary');
517 }
518
519 /**
520  * Return id of connection as a string
521  * @method
522  * @return {string}
523  */
524 Connection.prototype.toString = function() {
525   return "" + this.id;
526 }
527
528 /**
529  * Return json object of connection
530  * @method
531  * @return {object}
532  */
533 Connection.prototype.toJSON = function() {
534   return {id: this.id, host: this.host, port: this.port};
535 }
536
537 /**
538  * Is the connection connected
539  * @method
540  * @return {boolean}
541  */
542 Connection.prototype.isConnected = function() {
543   if(this.destroyed) return false;
544   return !this.connection.destroyed && this.connection.writable;
545 }
546
547 /**
548  * A server connect event, used to verify that the connection is up and running
549  *
550  * @event Connection#connect
551  * @type {Connection}
552  */
553
554 /**
555  * The server connection closed, all pool connections closed
556  *
557  * @event Connection#close
558  * @type {Connection}
559  */
560
561 /**
562  * The server connection caused an error, all pool connections closed
563  *
564  * @event Connection#error
565  * @type {Connection}
566  */
567
568 /**
569  * The server connection timed out, all pool connections closed
570  *
571  * @event Connection#timeout
572  * @type {Connection}
573  */
574
575 /**
576  * The driver experienced an invalid message, all pool connections closed
577  *
578  * @event Connection#parseError
579  * @type {Connection}
580  */
581
582 module.exports = Connection;