3 var inherits = require('util').inherits
4 , EventEmitter = require('events').EventEmitter
7 , crypto = require('crypto')
8 , f = require('util').format
9 , debugOptions = require('./utils').debugOptions
10 , Response = require('./commands').Response
11 , MongoError = require('../error')
12 , Logger = require('./logger');
// Names of the options that are passed through debugOptions() when the
// Connection constructor logs a newly created connection at debug level.
// NOTE: 'singleBufferSerializtion' is misspelled, but it is the option key the
// constructor actually reads, so the spelling must stay as it is.
15 var debugFields = ['host', 'port', 'size', 'keepAlive', 'keepAliveInitialDelay', 'noDelay'
16 , 'connectionTimeout', 'socketTimeout', 'singleBufferSerializtion', 'ssl', 'ca', 'cert'
17 , 'rejectUnauthorized', 'promoteLongs', 'promoteValues', 'promoteBuffers', 'checkServerIdentity'];
// When true, connect() registers the connection through addConnection() and the
// error/timeout/close handlers and destroy() remove it through deleteConnection().
18 var connectionAccounting = false;
22 * Creates a new Connection instance
24 * @param {string} options.host The server host
25 * @param {number} options.port The server port
26 * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
27 * @param {number} [options.keepAliveInitialDelay=0] Initial delay before TCP keep alive enabled
28 * @param {boolean} [options.noDelay=true] TCP Connection no delay
29 * @param {number} [options.connectionTimeout=0] TCP Connection timeout setting
30 * @param {number} [options.socketTimeout=0] TCP Socket timeout setting
31 * @param {boolean} [options.singleBufferSerializtion=true] Serialize into single buffer, trade off peak memory for serialization speed
32 * @param {boolean} [options.ssl=false] Use SSL for connection
33 * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identity during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function.
34 * @param {Buffer} [options.ca] SSL Certificate store binary buffer
35 * @param {Buffer} [options.cert] SSL Certificate binary buffer
36 * @param {Buffer} [options.key] SSL Key file binary buffer
37 * @param {string} [options.passphrase] SSL Certificate pass phrase
38 * @param {boolean} [options.rejectUnauthorized=true] Reject unauthorized server certificates
39 * @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits
40 * @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
41 * @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
42 * @fires Connection#connect
43 * @fires Connection#close
44 * @fires Connection#error
45 * @fires Connection#timeout
46 * @fires Connection#parseError
47 * @return {Connection} A connection instance
// Constructor for a single socket connection to a server.
// `messageHandler` is called as messageHandler(response, connection) by the data
// handler for every complete message; `options` carries the connection settings
// and must contain a `bson` parser. No socket is opened here: this.connection
// stays null until connect() is called.
49 var Connection = function(messageHandler, options) {
// Initialise the EventEmitter state (Connection inherits from EventEmitter, see below)
51 EventEmitter.call(this);
52 // Set empty if no options passed
53 this.options = options || {};
54 // Identification information
// NOTE(review): no assignment to this.id is visible here, although this.id is read
// below and throughout the file - the line appears to be missing from this view.
// Logger scoped to this class
57 this.logger = Logger('Connection', options);
58 // No bson parser passed in
// NOTE(review): the raw `options` argument (not this.options) is dereferenced here,
// so calling the constructor without options throws a TypeError instead of this Error.
59 if(!options.bson) throw new Error("must pass in valid bson parser");
// Keep the bson parser; it is handed to every Response created by the data handler
61 this.bson = options.bson;
62 // Grouping tag used for debugging purposes
63 this.tag = options.tag;
// Callback that receives each complete message
65 this.messageHandler = messageHandler;
67 // Max BSON message size
// Defaults to 1024 * 1024 * 16 * 4 bytes (64 MiB); the data handler reports a
// parseError for declared message sizes above this limit
68 this.maxBsonMessageSize = options.maxBsonMessageSize || (1024 * 1024 * 16 * 4);
// Log the debugFields subset of the options when debug logging is enabled
70 if(this.logger.isDebug()) this.logger.debug(f('creating connection %s with options [%s]', this.id, JSON.stringify(debugOptions(debugFields, options))));
// Socket settings with their defaults; the boolean ones only accept real booleans,
// any other type falls back to the default
73 this.port = options.port || 27017;
74 this.host = options.host || 'localhost';
75 this.keepAlive = typeof options.keepAlive == 'boolean' ? options.keepAlive : true;
76 this.keepAliveInitialDelay = options.keepAliveInitialDelay || 0;
77 this.noDelay = typeof options.noDelay == 'boolean' ? options.noDelay : true;
78 this.connectionTimeout = options.connectionTimeout || 0;
79 this.socketTimeout = options.socketTimeout || 0;
81 // If connection was destroyed
82 this.destroyed = false;
84 // Check if we have a domain socket
// A host containing '/' is treated as a socket path; connect() then passes only
// the host to net.createConnection
85 this.domainSocket = this.host.indexOf('\/') != -1;
87 // Serialize commands using function
88 this.singleBufferSerializtion = typeof options.singleBufferSerializtion == 'boolean' ? options.singleBufferSerializtion : true;
// Name of the serialization method selected by the flag above
89 this.serializationFunction = this.singleBufferSerializtion ? 'toBinUnified' : 'toBin';
// SSL settings; connect() copies the non-null ones into the TLS options
92 this.ca = options.ca || null;
93 this.cert = options.cert || null;
94 this.key = options.key || null;
95 this.passphrase = options.passphrase || null;
96 this.ssl = typeof options.ssl == 'boolean' ? options.ssl : false;
97 this.rejectUnauthorized = typeof options.rejectUnauthorized == 'boolean' ? options.rejectUnauthorized : true;
// Accepts a boolean or a custom function; anything else means "check" (true).
// `||` binds tighter than `?:`, so this reads (boolean || function) ? value : true
98 this.checkServerIdentity = typeof options.checkServerIdentity == 'boolean'
99 || typeof options.checkServerIdentity == 'function' ? options.checkServerIdentity : true;
101 // If ssl not enabled
102 if(!this.ssl) this.rejectUnauthorized = false;
// Options passed to every Response built by the data handler
// NOTE(review): the closing brace of this object literal is not visible in this view.
105 this.responseOptions = {
106 promoteLongs: typeof options.promoteLongs == 'boolean' ? options.promoteLongs : true,
107 promoteValues: typeof options.promoteValues == 'boolean' ? options.promoteValues : true,
108 promoteBuffers: typeof options.promoteBuffers == 'boolean' ? options.promoteBuffers: false
112 this.flushing = false;
// The socket is created lazily by connect()
116 this.connection = null;
117 this.writeStream = null;
119 // Create hash method
120 var hash = crypto.createHash('sha1');
121 hash.update(f('%s:%s', this.host, this.port));
123 // Create a hash name
// Hex SHA-1 digest of "host:port"
124 this.hashedName = hash.digest('hex');
126 // All operations in flight on the connection
// NOTE(review): the statement this comment describes and the end of the constructor
// are not visible in this view.
// Make Connection an EventEmitter subclass
130 inherits(Connection, EventEmitter);
/**
 * Set the idle timeout of the underlying socket.
 * Does nothing when no socket has been created yet.
 * @method
 * @param {number} value Timeout forwarded unchanged to the socket's setTimeout
 */
Connection.prototype.setSocketTimeout = function(value) {
  if(this.connection) {
    this.connection.setTimeout(value);
  }
};
/**
 * Restore the socket's idle timeout to the configured socketTimeout.
 * Does nothing when no socket has been created yet.
 * @method
 */
Connection.prototype.resetSocketTimeout = function() {
  if(this.connection) {
    this.connection.setTimeout(this.socketTimeout);
  }
};
// Turns on connection accounting: while the flag is set, connect() registers
// connections and the socket handlers / destroy() unregister them.
// NOTE(review): the end of this function is not visible in this view; confirm
// whether further statements follow the flag assignment.
144 Connection.enableConnectionAccounting = function() {
145 connectionAccounting = true;
/**
 * Turn off connection accounting. Connections created afterwards are no
 * longer registered, and handlers stop removing entries.
 */
Connection.disableConnectionAccounting = function() {
  connectionAccounting = false;
};
// Static accessor related to connection accounting.
// NOTE(review): the function body is not visible in this view; presumably it
// returns the table maintained by addConnection()/deleteConnection() - confirm.
153 Connection.connections = function() {
/**
 * Remove a connection from the accounting table.
 * @param {*} id Key under which the connection was registered
 */
function deleteConnection(id) {
  delete connections[id];
}
/**
 * Register a connection in the accounting table, replacing any entry
 * already stored under the same id.
 * @param {*} id Key to register the connection under
 * @param {Connection} connection The connection to register
 */
function addConnection(id, connection) {
  connections[id] = connection;
}
166 // Connection handlers
/**
 * Build the socket 'error' listener for a connection.
 * The listener removes the connection from the accounting table (when
 * accounting is enabled), logs the error at debug level and re-emits it on the
 * connection wrapped by MongoError.create.
 * @param {Connection} self The connection the listener belongs to
 * @return {function} Listener taking the socket error
 */
var errorHandler = function(self) {
  return function(err) {
    if(connectionAccounting) deleteConnection(self.id);
    // Debug information
    if(self.logger.isDebug()) self.logger.debug(f('connection %s for [%s:%s] errored out with [%s]', self.id, self.host, self.port, JSON.stringify(err)));
    // Only emit when somebody listens: an 'error' event without listeners
    // would be thrown by the EventEmitter
    if(self.listeners('error').length > 0) self.emit("error", MongoError.create(err), self);
  };
};
// Builds the socket 'timeout' listener for a connection (attached with once() in connect()).
// NOTE(review): the inner function header, the emit call that the MongoError below
// is an argument of, and the closing braces are not visible in this view.
177 var timeoutHandler = function(self) {
// Drop the connection from the accounting table when accounting is enabled
179 if(connectionAccounting) deleteConnection(self.id);
// Debug information
181 if(self.logger.isDebug()) self.logger.debug(f('connection %s for [%s:%s] timed out', self.id, self.host, self.port));
182 // Emit timeout error
// Error payload describing which connection timed out
184 , MongoError.create(f("connection %s to %s:%s timed out", self.id, self.host, self.port))
// Builds the socket 'close' listener for a connection (attached with once() in connect()).
// NOTE(review): the emit call that the MongoError below is an argument of (and any
// condition on hadError guarding it) is not visible in this view.
189 var closeHandler = function(self) {
// hadError is the argument passed by the socket's 'close' event
190 return function(hadError) {
// Drop the connection from the accounting table when accounting is enabled
191 if(connectionAccounting) deleteConnection(self.id);
// Debug information
193 if(self.logger.isDebug()) self.logger.debug(f('connection %s with for [%s:%s] closed', self.id, self.host, self.port));
// Error payload describing which connection closed
198 , MongoError.create(f("connection %s to %s:%s closed", self.id, self.host, self.port))
// Builds the socket 'data' listener for a connection (attached in connect()).
// It reassembles messages, each prefixed by a 4-byte little-endian size, from
// arbitrary socket chunks using four pieces of per-connection state:
//   self.buffer        - buffer being filled for a message that spans chunks
//   self.sizeOfMessage - declared size of that message (0 when none is pending)
//   self.bytesRead     - number of bytes already copied into self.buffer
//   self.stubBuffer    - leftover bytes too short to contain the size prefix
// Every complete message is wrapped in a Response and handed to
// self.messageHandler; invalid sizes are reported through the "parseError" event.
// NOTE(review): several structural lines (else branches, try/catch, closing braces
// and a few assignments) are not visible in this view, so the exact branch layout
// cannot be confirmed from here.
204 var dataHandler = function(self) {
205 return function(data) {
206 // Parse until we are done with the data
207 while(data.length > 0) {
208 // If we still have bytes to read on the current message
209 if(self.bytesRead > 0 && self.sizeOfMessage > 0) {
210 // Calculate the amount of remaining bytes
211 var remainingBytesToRead = self.sizeOfMessage - self.bytesRead;
212 // Check if the current chunk contains the rest of the message
213 if(remainingBytesToRead > data.length) {
214 // Copy the new data into the existing buffer (should have been allocated when we know the message size)
215 data.copy(self.buffer, self.bytesRead);
216 // Adjust the number of bytes read so it points to the correct index in the buffer
217 self.bytesRead = self.bytesRead + data.length;
219 // Reset state of buffer
// The whole chunk has been consumed, so the while loop ends
220 data = new Buffer(0);
222 // Copy the missing part of the data into our current buffer
223 data.copy(self.buffer, self.bytesRead, 0, remainingBytesToRead);
224 // Slice the overflow into a new buffer that we will then re-parse
225 data = data.slice(remainingBytesToRead);
227 // Emit current complete message
// Keep a reference to the finished message before the parser state is cleared
229 var emitBuffer = self.buffer;
230 // Reset state of buffer
232 self.sizeOfMessage = 0;
234 self.stubBuffer = null;
236 self.messageHandler(new Response(self.bson, emitBuffer, self.responseOptions), self);
// NOTE(review): `err` below is presumably bound by a catch clause that is not
// visible in this view - confirm.
238 var errorObject = {err:"socketHandler", trace:err, bin:self.buffer, parseState:{
239 sizeOfMessage:self.sizeOfMessage,
240 bytesRead:self.bytesRead,
241 stubBuffer:self.stubBuffer}};
242 // We got a parse Error fire it off then keep going
243 self.emit("parseError", errorObject, self);
247 // Stub buffer is kept in case we don't get enough bytes to determine the
248 // size of the message (< 4 bytes)
249 if(self.stubBuffer != null && self.stubBuffer.length > 0) {
250 // If we have enough bytes to determine the message size let's do it
251 if(self.stubBuffer.length + data.length > 4) {
// Concatenate the stub and the new chunk into one buffer
253 var newData = new Buffer(self.stubBuffer.length + data.length);
254 self.stubBuffer.copy(newData, 0);
255 data.copy(newData, self.stubBuffer.length);
256 // Reassign for parsing
259 // Reset state of buffer
261 self.sizeOfMessage = 0;
263 self.stubBuffer = null;
267 // Add the bytes to the stub buffer
268 var newStubBuffer = new Buffer(self.stubBuffer.length + data.length);
269 // Copy existing stub buffer
270 self.stubBuffer.copy(newStubBuffer, 0);
271 // Copy missing part of the data
272 data.copy(newStubBuffer, self.stubBuffer.length);
// Chunk fully consumed
274 data = new Buffer(0);
// More than 4 bytes are required before the size prefix is interpreted
277 if(data.length > 4) {
278 // Retrieve the message size
279 // var sizeOfMessage = data.readUInt32LE(0);
// Hand-rolled little-endian read; because of `<< 24` the result is a signed
// 32-bit integer, which is why a negative size can occur and is checked below
280 var sizeOfMessage = data[0] | data[1] << 8 | data[2] << 16 | data[3] << 24;
281 // If we have a negative sizeOfMessage emit error and return
282 if(sizeOfMessage < 0 || sizeOfMessage > self.maxBsonMessageSize) {
283 errorObject = {err:"socketHandler", trace:'', bin:self.buffer, parseState:{
284 sizeOfMessage: sizeOfMessage,
285 bytesRead: self.bytesRead,
286 stubBuffer: self.stubBuffer}};
287 // We got a parse Error fire it off then keep going
288 self.emit("parseError", errorObject, self);
292 // Ensure that the size of message is larger than 0 and less than the max allowed
// Case 1: the message is longer than this chunk - allocate the full buffer and
// store the part received so far
293 if(sizeOfMessage > 4 && sizeOfMessage < self.maxBsonMessageSize && sizeOfMessage > data.length) {
294 self.buffer = new Buffer(sizeOfMessage);
295 // Copy all the data into the buffer
296 data.copy(self.buffer, 0);
298 self.bytesRead = data.length;
299 // Update sizeOfMessage
300 self.sizeOfMessage = sizeOfMessage;
301 // Ensure stub buffer is null
302 self.stubBuffer = null;
304 data = new Buffer(0);
// Case 2: the chunk is exactly one message
306 } else if(sizeOfMessage > 4 && sizeOfMessage < self.maxBsonMessageSize && sizeOfMessage == data.length) {
309 // Reset state of buffer
311 self.sizeOfMessage = 0;
313 self.stubBuffer = null;
315 data = new Buffer(0);
317 self.messageHandler(new Response(self.bson, emitBuffer, self.responseOptions), self);
319 self.emit("parseError", err, self);
// Case 3: the declared size is too small or too large - report it and drop the chunk
321 } else if(sizeOfMessage <= 4 || sizeOfMessage > self.maxBsonMessageSize) {
322 errorObject = {err:"socketHandler", trace:null, bin:data, parseState:{
323 sizeOfMessage:sizeOfMessage,
327 // We got a parse Error fire it off then keep going
328 self.emit("parseError", errorObject, self);
330 // Clear out the state of the parser
332 self.sizeOfMessage = 0;
334 self.stubBuffer = null;
336 data = new Buffer(0);
// Remaining case: the chunk holds at least one full message followed by more data;
// cut the first message off and loop again over the rest
338 emitBuffer = data.slice(0, sizeOfMessage);
339 // Reset state of buffer
341 self.sizeOfMessage = 0;
343 self.stubBuffer = null;
344 // Copy rest of message
345 data = data.slice(sizeOfMessage);
347 self.messageHandler(new Response(self.bson, emitBuffer, self.responseOptions), self);
350 // Create a buffer that contains the space for the non-complete message
351 self.stubBuffer = new Buffer(data.length)
352 // Copy the data to the stub buffer
353 data.copy(self.stubBuffer, 0);
355 data = new Buffer(0);
363 // List of socket level valid ssl options
// merge() copies an option into the TLS options only when its name is in this list.
// NOTE(review): the end of this array literal is not visible in this view, so the
// list shown here may be incomplete.
364 var legalSslSocketOptions = ['pfx', 'key', 'passphrase', 'cert', 'ca', 'ciphers'
365 , 'NPNProtocols', 'ALPNProtocols', 'servername'
366 , 'secureProtocol', 'secureContext', 'session'
/**
 * Copy the allowed ssl socket options from options2 into options1.
 * Only names listed in legalSslSocketOptions are copied, and only when their
 * value is neither null nor undefined. options1 is modified in place.
 * @param {object} options1 Target object receiving the options
 * @param {object} options2 Source object; a missing source copies nothing
 */
function merge(options1, options2) {
  // Merge in any allowed ssl options
  for(var name in options2) {
    if(options2[name] != null && legalSslSocketOptions.indexOf(name) != -1) {
      options1[name] = options2[name];
    }
  }
}
/**
 * Connect
 * Opens the socket (plain TCP, domain socket or TLS on top of the TCP socket),
 * applies the keep-alive / no-delay / timeout settings and wires the socket
 * events to the connection handlers. 'connect' is emitted on this connection
 * once the socket (or the TLS handshake) is established.
 * @method
 * @param {object} [_options] Per-call overrides
 * @param {boolean} [_options.promoteLongs] Override for responseOptions.promoteLongs
 * @param {boolean} [_options.promoteValues] Override for responseOptions.promoteValues
 * @param {boolean} [_options.promoteBuffers] Override for responseOptions.promoteBuffers
 */
Connection.prototype.connect = function(_options) {
  var self = this;
  _options = _options || {};
  // Set the connections
  if(connectionAccounting) addConnection(this.id, this);

  // Apply each promotion override on its own, and only when it is a real
  // boolean, so that passing one flag never replaces the others with undefined
  if(typeof _options.promoteLongs == 'boolean') {
    self.responseOptions.promoteLongs = _options.promoteLongs;
  }
  if(typeof _options.promoteValues == 'boolean') {
    self.responseOptions.promoteValues = _options.promoteValues;
  }
  if(typeof _options.promoteBuffers == 'boolean') {
    self.responseOptions.promoteBuffers = _options.promoteBuffers;
  }

  // Create new connection instance
  self.connection = self.domainSocket
    ? net.createConnection(self.host)
    : net.createConnection(self.port, self.host);

  // Set the options for the connection
  self.connection.setKeepAlive(self.keepAlive, self.keepAliveInitialDelay);
  self.connection.setTimeout(self.connectionTimeout);
  self.connection.setNoDelay(self.noDelay);

  // If we have ssl enabled
  if(self.ssl) {
    // The TLS session is layered on top of the socket created above
    var sslOptions = {
        socket: self.connection
      , rejectUnauthorized: self.rejectUnauthorized
    };

    // Merge in the allowed ssl options, per-call options win over constructor options
    merge(sslOptions, this.options);
    merge(sslOptions, _options);

    // Set options for ssl
    if(self.ca) sslOptions.ca = self.ca;
    if(self.cert) sslOptions.cert = self.cert;
    if(self.key) sslOptions.key = self.key;
    if(self.passphrase) sslOptions.passphrase = self.passphrase;

    // Override checkServerIdentity behavior
    if(self.checkServerIdentity === false) {
      // Skip the identity check by returning undefined as per node documents
      // https://nodejs.org/api/tls.html#tls_tls_connect_options_callback
      sslOptions.checkServerIdentity = function() {
        return undefined;
      };
    } else if(typeof self.checkServerIdentity == 'function') {
      sslOptions.checkServerIdentity = self.checkServerIdentity;
    }

    // Set default sni servername to be the same as host
    if(sslOptions.servername == null) {
      sslOptions.servername = self.host;
    }

    // Attempt SSL connection
    self.connection = tls.connect(self.port, self.host, sslOptions, function() {
      // Error on auth or skip
      if(self.connection.authorizationError && self.rejectUnauthorized) {
        return self.emit("error", self.connection.authorizationError, self, {ssl:true});
      }

      // Set socket timeout instead of connection timeout
      self.connection.setTimeout(self.socketTimeout);
      // We are done emit connect
      self.emit('connect', self);
    });
    // The connection timeout applies until the handshake has completed
    self.connection.setTimeout(self.connectionTimeout);
  } else {
    self.connection.on('connect', function() {
      // Set socket timeout instead of connection timeout
      self.connection.setTimeout(self.socketTimeout);
      // Emit connect event
      self.emit('connect', self);
    });
  }

  // Add handlers for events
  self.connection.once('error', errorHandler(self));
  self.connection.once('timeout', timeoutHandler(self));
  self.connection.once('close', closeHandler(self));
  self.connection.on('data', dataHandler(self));
};
/**
 * Unref this connection
 * When the socket already exists it is unref'ed immediately; otherwise the
 * unref is deferred until this connection emits 'connect'.
 * @method
 */
Connection.prototype.unref = function() {
  if (this.connection) {
    this.connection.unref();
    return;
  }

  // No socket yet: capture the connection so the listener can reach the
  // socket that connect() will create
  var self = this;
  this.once('connect', function() {
    self.connection.unref();
  });
};
/**
 * Destroy connection
 * Removes the connection from the accounting table (when accounting is
 * enabled), ends and destroys the socket if one exists, and marks the
 * connection as destroyed.
 * @method
 */
Connection.prototype.destroy = function() {
  // Set the connections
  if(connectionAccounting) deleteConnection(this.id);
  if(this.connection) {
    this.connection.end();
    this.connection.destroy();
  }

  this.destroyed = true;
};
/**
 * Write to connection
 * @method
 * @param {Buffer|Buffer[]} buffer A single buffer, or an array of buffers that are written in order
 * @return {boolean|undefined} The result of the socket write for a single buffer, nothing for an array
 */
Connection.prototype.write = function(buffer) {
  var i;
  // Debug Log
  if(this.logger.isDebug()) {
    if(!Array.isArray(buffer)) {
      this.logger.debug(f('writing buffer [%s] to %s:%s', buffer.toString('hex'), this.host, this.port));
    } else {
      for(i = 0; i < buffer.length; i++)
        this.logger.debug(f('writing buffer [%s] to %s:%s', buffer[i].toString('hex'), this.host, this.port));
    }
  }

  // Write out the command
  if(!Array.isArray(buffer)) return this.connection.write(buffer, 'binary');
  // Iterate over all buffers and write them in order to the socket
  for(i = 0; i < buffer.length; i++) this.connection.write(buffer[i], 'binary');
};
520 * Return id of connection as a string
// String conversion of the connection.
// NOTE(review): the function body is not visible in this view; presumably it
// returns this.id converted to a string - confirm.
524 Connection.prototype.toString = function() {
/**
 * Return json object of connection
 * @method
 * @return {object} Plain object with the id, host and port of the connection
 */
Connection.prototype.toJSON = function() {
  return {id: this.id, host: this.host, port: this.port};
};
/**
 * Is the connection connected
 * A connection that was destroyed, or whose socket has not been created yet
 * (connect() not called), is reported as not connected.
 * @method
 * @return {boolean}
 */
Connection.prototype.isConnected = function() {
  // this.connection is null until connect() creates the socket
  if(this.destroyed || !this.connection) return false;
  return !this.connection.destroyed && this.connection.writable;
};
548 * A server connect event, used to verify that the connection is up and running
550 * @event Connection#connect
555 * The server connection closed, all pool connections closed
557 * @event Connection#close
562 * The server connection caused an error, all pool connections closed
564 * @event Connection#error
569 * The server connection timed out, all pool connections closed
571 * @event Connection#timeout
576 * The driver experienced an invalid message, all pool connections closed
578 * @event Connection#parseError
582 module.exports = Connection;