3 var inherits = require('util').inherits
4 , EventEmitter = require('events').EventEmitter
7 , crypto = require('crypto')
8 , f = require('util').format
9 , debugOptions = require('./utils').debugOptions
10 , Response = require('./commands').Response
11 , MongoError = require('../error')
12 , Logger = require('./logger');
// Option names handed to debugOptions() together with the options object when the
// constructor writes its "creating connection" debug log entry.
15 var debugFields = ['host', 'port', 'size', 'keepAlive', 'keepAliveInitialDelay', 'noDelay'
16 , 'connectionTimeout', 'socketTimeout', 'singleBufferSerializtion', 'ssl', 'ca', 'cert'
17 , 'rejectUnauthorized', 'promoteLongs', 'promoteValues', 'promoteBuffers', 'checkServerIdentity'];
// While true, connect() registers the instance in `connections` under its id, and the
// error/timeout/close handlers and destroy() remove that entry again.
18 var connectionAccounting = false;
22 * Creates a new Connection instance
24 * @param {string} options.host The server host
25 * @param {number} options.port The server port
26 * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
27 * @param {number} [options.keepAliveInitialDelay=0] Initial delay before TCP keep alive enabled
28 * @param {boolean} [options.noDelay=true] TCP Connection no delay
29 * @param {number} [options.connectionTimeout=0] TCP Connection timeout setting
30 * @param {number} [options.socketTimeout=0] TCP Socket timeout setting
31 * @param {boolean} [options.singleBufferSerializtion=true] Serialize into a single buffer, trading higher peak memory for serialization speed
32 * @param {boolean} [options.ssl=false] Use SSL for connection
33 * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identity during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function.
34 * @param {Buffer} [options.ca] SSL Certificate store binary buffer
35 * @param {Buffer} [options.cert] SSL Certificate binary buffer
36 * @param {Buffer} [options.key] SSL Key file binary buffer
37 * @param {string} [options.passphrase] SSL Certificate pass phrase
38 * @param {boolean} [options.rejectUnauthorized=true] Reject unauthorized server certificates
39 * @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits
40 * @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
41 * @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
42 * @fires Connection#connect
43 * @fires Connection#close
44 * @fires Connection#error
45 * @fires Connection#timeout
46 * @fires Connection#parseError
47 * @return {Connection} A connection instance
// Constructor: stores the message handler and all connection settings on the instance.
// No socket is opened here; this.connection stays null until connect() is called.
// NOTE(review): the embedded listing numbers skip lines inside this function, so some
// statements are not visible (this.id is read below but never visibly assigned) —
// confirm against the complete file before changing anything.
49 var Connection = function(messageHandler, options) {
51 EventEmitter.call(this);
52 // Set empty if no options passed
53 this.options = options || {};
54 // Identification information
57 this.logger = Logger('Connection', options);
58 // No bson parser passed in
// NOTE(review): only this.options received the {} fallback; the bare `options` argument is
// dereferenced here, so a missing options argument fails with a TypeError, not this Error.
59 if(!options.bson) throw new Error("must pass in valid bson parser");
61 this.bson = options.bson;
62 // Grouping tag used for debugging purposes
63 this.tag = options.tag;
// Called by the data handler with (Response, connection) for every complete message
65 this.messageHandler = messageHandler;
67 // Max BSON message size
// Falls back to 1024 * 1024 * 16 * 4 bytes (64 MiB) when not supplied (or supplied as 0)
68 this.maxBsonMessageSize = options.maxBsonMessageSize || (1024 * 1024 * 16 * 4);
70 if(this.logger.isDebug()) this.logger.debug(f('creating connection %s with options [%s]', this.id, JSON.stringify(debugOptions(debugFields, options))));
// Socket settings; `||` fallbacks mean falsy values (0, '') are replaced by the default
73 this.port = options.port || 27017;
74 this.host = options.host || 'localhost';
75 this.keepAlive = typeof options.keepAlive == 'boolean' ? options.keepAlive : true;
76 this.keepAliveInitialDelay = options.keepAliveInitialDelay || 0;
77 this.noDelay = typeof options.noDelay == 'boolean' ? options.noDelay : true;
78 this.connectionTimeout = options.connectionTimeout || 0;
79 this.socketTimeout = options.socketTimeout || 0;
81 // If connection was destroyed
82 this.destroyed = false;
84 // Check if we have a domain socket
// Any host containing a '/' is treated as a socket path; connect() then omits the port
85 this.domainSocket = this.host.indexOf('\/') != -1;
87 // Serialize commands using function
// Name of the command method to serialize with: 'toBinUnified' (single buffer) or 'toBin'
88 this.singleBufferSerializtion = typeof options.singleBufferSerializtion == 'boolean' ? options.singleBufferSerializtion : true;
89 this.serializationFunction = this.singleBufferSerializtion ? 'toBinUnified' : 'toBin';
// SSL material and flags, later copied into the TLS options by connect()
92 this.ca = options.ca || null;
93 this.cert = options.cert || null;
94 this.key = options.key || null;
95 this.passphrase = options.passphrase || null;
96 this.ssl = typeof options.ssl == 'boolean' ? options.ssl : false;
97 this.rejectUnauthorized = typeof options.rejectUnauthorized == 'boolean' ? options.rejectUnauthorized : true;
// `||` binds tighter than `?:`, so this reads: (boolean or function) ? the given value : true
98 this.checkServerIdentity = typeof options.checkServerIdentity == 'boolean'
99 || typeof options.checkServerIdentity == 'function' ? options.checkServerIdentity : true;
101 // If ssl not enabled
102 if(!this.ssl) this.rejectUnauthorized = false;
// Options passed to every Response built by the data handler
105 this.responseOptions = {
106 promoteLongs: typeof options.promoteLongs == 'boolean' ? options.promoteLongs : true,
107 promoteValues: typeof options.promoteValues == 'boolean' ? options.promoteValues : true,
108 promoteBuffers: typeof options.promoteBuffers == 'boolean' ? options.promoteBuffers: false
112 this.flushing = false;
116 this.connection = null;
117 this.writeStream = null;
119 // Create hash method
120 var hash = crypto.createHash('sha1');
121 hash.update(f('%s:%s', this.host, this.port));
123 // Create a hash name
// Hex SHA-1 of "host:port"
124 this.hashedName = hash.digest('hex');
126 // All operations in flight on the connection
// Connection instances are EventEmitters
130 inherits(Connection, EventEmitter);
/**
 * Set the timeout on the underlying socket. Does nothing when no socket
 * has been created yet (connect() has not been called).
 * @method
 * @param {number} value Timeout forwarded to the socket's setTimeout()
 */
Connection.prototype.setSocketTimeout = function(value) {
  if(this.connection) {
    this.connection.setTimeout(value);
  }
}
/**
 * Restore the socket timeout to the configured socketTimeout. Does nothing
 * when no socket has been created yet.
 * @method
 * @param {*} [value] Accepted for signature compatibility; it is ignored
 */
Connection.prototype.resetSocketTimeout = function(value) {
  if(this.connection) {
    this.connection.setTimeout(this.socketTimeout);
  }
}
// Static switches for connection accounting (see the connectionAccounting flag).
// NOTE(review): the listing numbers jump 145 -> 149, one line more than a closing brace
// and a blank line would explain, so enableConnectionAccounting may do more than set
// the flag — confirm against the complete file.
144 Connection.enableConnectionAccounting = function() {
145 connectionAccounting = true;
// Turns accounting off again; entries already stored in `connections` are not touched here
149 Connection.disableConnectionAccounting = function() {
150 connectionAccounting = false;
// NOTE(review): the body is not visible in this listing; presumably it exposes the
// `connections` registry — confirm.
153 Connection.connections = function() {
158 // Connection handlers
// Builds the socket 'error' listener for connection `self`
159 var errorHandler = function(self) {
160 return function(err) {
// Drop this connection from the accounting registry
161 if(connectionAccounting) delete connections[self.id];
163 if(self.logger.isDebug()) self.logger.debug(f('connection %s for [%s:%s] errored out with [%s]', self.id, self.host, self.port, JSON.stringify(err)));
// Re-emit as a MongoError, but only when an 'error' listener is registered
// (presumably because an EventEmitter throws on an unhandled 'error' event — confirm)
165 if(self.listeners('error').length > 0) self.emit("error", MongoError.create(err), self);
// Builds the socket 'timeout' listener for connection `self`
169 var timeoutHandler = function(self) {
170 return function(err) {
// Drop this connection from the accounting registry
171 if(connectionAccounting) delete connections[self.id];
173 if(self.logger.isDebug()) self.logger.debug(f('connection %s for [%s:%s] timed out', self.id, self.host, self.port));
174 // Emit timeout error
// NOTE(review): the line that opens this call is not visible in the listing; the argument
// below is a MongoError describing the timeout — confirm the emitted event name.
176 , MongoError.create(f("connection %s to %s:%s timed out", self.id, self.host, self.port))
// Builds the socket 'close' listener for connection `self`
181 var closeHandler = function(self) {
182 return function(hadError) {
// Drop this connection from the accounting registry
183 if(connectionAccounting) delete connections[self.id];
185 if(self.logger.isDebug()) self.logger.debug(f('connection %s with for [%s:%s] closed', self.id, self.host, self.port));
// NOTE(review): the lines that open this call (and any use of hadError) are not visible in
// the listing; the argument below is a MongoError describing the close — confirm.
190 , MongoError.create(f("connection %s to %s:%s closed", self.id, self.host, self.port))
// Builds the socket 'data' listener for connection `self`. The listener reassembles
// length-prefixed messages from arbitrarily split chunks and hands every complete message
// to self.messageHandler wrapped in a Response. Parser state lives on `self`:
//   buffer        - message currently being assembled
//   sizeOfMessage - declared size of that message
//   bytesRead     - bytes already copied into buffer
//   stubBuffer    - leftover bytes too short to contain the size prefix
// NOTE(review): the listing numbers skip many lines in this function (else branches and
// what look like try/catch wrappers, since `err` is referenced without a visible
// declaration) — confirm against the complete file before changing any logic.
// NOTE(review): `new Buffer(n)` is deprecated in current Node.js in favour of
// Buffer.alloc / Buffer.allocUnsafe.
196 var dataHandler = function(self) {
197 return function(data) {
198 // Parse until we are done with the data
199 while(data.length > 0) {
200 // If we still have bytes to read on the current message
201 if(self.bytesRead > 0 && self.sizeOfMessage > 0) {
202 // Calculate the amount of remaining bytes
203 var remainingBytesToRead = self.sizeOfMessage - self.bytesRead;
204 // Check if the current chunk contains the rest of the message
205 if(remainingBytesToRead > data.length) {
206 // Copy the new data into the existing buffer (should have been allocated when we know the message size)
207 data.copy(self.buffer, self.bytesRead);
208 // Adjust the number of bytes read so it points to the correct index in the buffer
209 self.bytesRead = self.bytesRead + data.length;
211 // Reset state of buffer
// The chunk is fully consumed; an empty buffer ends the while loop
212 data = new Buffer(0);
214 // Copy the missing part of the data into our current buffer
215 data.copy(self.buffer, self.bytesRead, 0, remainingBytesToRead);
216 // Slice the overflow into a new buffer that we will then re-parse
217 data = data.slice(remainingBytesToRead);
219 // Emit current complete message
// Keep a reference to the finished message before the parser state is cleared
221 var emitBuffer = self.buffer;
222 // Reset state of buffer
224 self.sizeOfMessage = 0;
226 self.stubBuffer = null;
228 self.messageHandler(new Response(self.bson, emitBuffer, self.responseOptions), self);
// Failure report: carries the current buffer and a snapshot of the parser state
230 var errorObject = {err:"socketHandler", trace:err, bin:self.buffer, parseState:{
231 sizeOfMessage:self.sizeOfMessage,
232 bytesRead:self.bytesRead,
233 stubBuffer:self.stubBuffer}};
234 // We got a parse Error fire it off then keep going
235 self.emit("parseError", errorObject, self);
239 // Stub buffer is kept in case we don't get enough bytes to determine the
240 // size of the message (< 4 bytes)
241 if(self.stubBuffer != null && self.stubBuffer.length > 0) {
242 // If we have enough bytes to determine the message size let's do it
243 if(self.stubBuffer.length + data.length > 4) {
// Concatenate the stub and the new chunk into one buffer
245 var newData = new Buffer(self.stubBuffer.length + data.length);
246 self.stubBuffer.copy(newData, 0);
247 data.copy(newData, self.stubBuffer.length);
248 // Reassign for parsing
251 // Reset state of buffer
253 self.sizeOfMessage = 0;
255 self.stubBuffer = null;
259 // Add the bytes to the stub buffer
260 var newStubBuffer = new Buffer(self.stubBuffer.length + data.length);
261 // Copy existing stub buffer
262 self.stubBuffer.copy(newStubBuffer, 0);
263 // Copy missing part of the data
264 data.copy(newStubBuffer, self.stubBuffer.length);
266 data = new Buffer(0);
// More than 4 bytes available: the size prefix can be read
269 if(data.length > 4) {
270 // Retrieve the message size
271 // var sizeOfMessage = data.readUInt32LE(0);
// First four bytes, little-endian; the bitwise operators produce a signed 32-bit value,
// so a set high bit shows up as a negative size (checked just below)
272 var sizeOfMessage = data[0] | data[1] << 8 | data[2] << 16 | data[3] << 24;
273 // If we have a negative sizeOfMessage emit error and return
274 if(sizeOfMessage < 0 || sizeOfMessage > self.maxBsonMessageSize) {
275 var errorObject = {err:"socketHandler", trace:'', bin:self.buffer, parseState:{
276 sizeOfMessage: sizeOfMessage,
277 bytesRead: self.bytesRead,
278 stubBuffer: self.stubBuffer}};
279 // We got a parse Error fire it off then keep going
280 self.emit("parseError", errorObject, self);
284 // Ensure that the size of message is larger than 0 and less than the max allowed
// Case 1: the message is longer than this chunk; start assembling it in self.buffer
285 if(sizeOfMessage > 4 && sizeOfMessage < self.maxBsonMessageSize && sizeOfMessage > data.length) {
286 self.buffer = new Buffer(sizeOfMessage);
287 // Copy all the data into the buffer
288 data.copy(self.buffer, 0);
290 self.bytesRead = data.length;
291 // Update sizeOfMessage
292 self.sizeOfMessage = sizeOfMessage;
293 // Ensure stub buffer is null
294 self.stubBuffer = null;
296 data = new Buffer(0);
// Case 2: the chunk is exactly one complete message
298 } else if(sizeOfMessage > 4 && sizeOfMessage < self.maxBsonMessageSize && sizeOfMessage == data.length) {
300 var emitBuffer = data;
301 // Reset state of buffer
303 self.sizeOfMessage = 0;
305 self.stubBuffer = null;
307 data = new Buffer(0);
309 self.messageHandler(new Response(self.bson, emitBuffer, self.responseOptions), self);
311 self.emit("parseError", err, self);
// Case 3: the declared size is invalid; report it and discard the chunk
// NOTE(review): cases 1 and 2 test `< maxBsonMessageSize` while this tests `>`, so a size
// exactly equal to the maximum reaches the last branch below — confirm that is intended.
313 } else if(sizeOfMessage <= 4 || sizeOfMessage > self.maxBsonMessageSize) {
314 var errorObject = {err:"socketHandler", trace:null, bin:data, parseState:{
315 sizeOfMessage:sizeOfMessage,
319 // We got a parse Error fire it off then keep going
320 self.emit("parseError", errorObject, self);
322 // Clear out the state of the parser
324 self.sizeOfMessage = 0;
326 self.stubBuffer = null;
328 data = new Buffer(0);
// Remaining case: take the first sizeOfMessage bytes as one message, loop again on the rest
330 var emitBuffer = data.slice(0, sizeOfMessage);
331 // Reset state of buffer
333 self.sizeOfMessage = 0;
335 self.stubBuffer = null;
336 // Copy rest of message
337 data = data.slice(sizeOfMessage);
339 self.messageHandler(new Response(self.bson, emitBuffer, self.responseOptions), self);
342 // Create a buffer that contains the space for the non-complete message
343 self.stubBuffer = new Buffer(data.length)
344 // Copy the data to the stub buffer
345 data.copy(self.stubBuffer, 0);
347 data = new Buffer(0);
355 // List of socket level valid ssl options
// Whitelist consulted by merge(): only option names found here are copied into the TLS options.
// NOTE(review): the listing ends before the closing bracket of this array, so further
// entries may exist — confirm against the complete file.
356 var legalSslSocketOptions = ['pfx', 'key', 'passphrase', 'cert', 'ca', 'ciphers'
357 , 'NPNProtocols', 'ALPNProtocols', 'servername'
358 , 'secureProtocol', 'secureContext', 'session'
/**
 * Copy the SSL socket options from options2 onto options1.
 * Only names listed in legalSslSocketOptions are copied, and null or
 * undefined values are skipped. options1 is modified in place.
 * @param {object} options1 Target object, receives the copied options
 * @param {object} [options2] Source object to read options from
 */
function merge(options1, options2) {
  // Merge in any allowed ssl options
  for(var name in options2) {
    var value = options2[name];
    if(value == null) continue;
    if(legalSslSocketOptions.indexOf(name) == -1) continue;
    options1[name] = value;
  }
}
/**
 * Connect
 * Opens the socket (a domain socket when the host is a path, TCP otherwise),
 * optionally upgrades it to TLS, and attaches the error, timeout, close and
 * data handlers. Emits 'connect' once the socket (or TLS session) is ready.
 * @method
 * @param {object} [_options] Per-call overrides; whitelisted SSL socket options are merged in as well
 * @param {boolean} [_options.promoteLongs] Override for responseOptions.promoteLongs
 * @param {boolean} [_options.promoteValues] Override for responseOptions.promoteValues
 * @param {boolean} [_options.promoteBuffers] Override for responseOptions.promoteBuffers
 */
Connection.prototype.connect = function(_options) {
  var self = this;
  _options = _options || {};
  // Set the connections
  if(connectionAccounting) connections[this.id] = this;

  // Apply each promote* override on its own, and only when it is a boolean.
  // Passing just one of them must not overwrite the other two with undefined.
  ['promoteLongs', 'promoteValues', 'promoteBuffers'].forEach(function(name) {
    if(typeof _options[name] == 'boolean') {
      self.responseOptions[name] = _options[name];
    }
  });

  // Create new connection instance
  self.connection = self.domainSocket
    ? net.createConnection(self.host)
    : net.createConnection(self.port, self.host);

  // Set the options for the connection
  self.connection.setKeepAlive(self.keepAlive, self.keepAliveInitialDelay);
  self.connection.setTimeout(self.connectionTimeout);
  self.connection.setNoDelay(self.noDelay);

  // If we have ssl enabled
  if(self.ssl) {
    var sslOptions = {
        socket: self.connection
      , rejectUnauthorized: self.rejectUnauthorized
    }

    // Merge in whitelisted ssl options, per-call options last so they win
    merge(sslOptions, this.options);
    merge(sslOptions, _options);

    // Set options for ssl
    if(self.ca) sslOptions.ca = self.ca;
    if(self.cert) sslOptions.cert = self.cert;
    if(self.key) sslOptions.key = self.key;
    if(self.passphrase) sslOptions.passphrase = self.passphrase;

    // Override checkServerIdentity behavior
    if(self.checkServerIdentity === false) {
      // Skip the identity check by returning undefined as per node documents
      // https://nodejs.org/api/tls.html#tls_tls_connect_options_callback
      sslOptions.checkServerIdentity = function(servername, cert) {
        return undefined;
      }
    } else if(typeof self.checkServerIdentity == 'function') {
      sslOptions.checkServerIdentity = self.checkServerIdentity;
    }

    // Attempt SSL connection
    self.connection = tls.connect(self.port, self.host, sslOptions, function() {
      // Error on auth or skip
      if(self.connection.authorizationError && self.rejectUnauthorized) {
        return self.emit("error", self.connection.authorizationError, self, {ssl:true});
      }

      // Set socket timeout instead of connection timeout
      self.connection.setTimeout(self.socketTimeout);
      // We are done emit connect
      self.emit('connect', self);
    });
    self.connection.setTimeout(self.connectionTimeout);
  } else {
    self.connection.on('connect', function() {
      // Set socket timeout instead of connection timeout
      self.connection.setTimeout(self.socketTimeout);
      // Emit connect event
      self.emit('connect', self);
    });
  }

  // Add handlers for events
  self.connection.once('error', errorHandler(self));
  self.connection.once('timeout', timeoutHandler(self));
  self.connection.once('close', closeHandler(self));
  self.connection.on('data', dataHandler(self));
}
/**
 * Unref this connection
 * Calls unref() on the underlying socket. When no socket exists yet, the call
 * is deferred until this connection emits 'connect'.
 * @method
 */
Connection.prototype.unref = function() {
  if(this.connection) {
    this.connection.unref();
    return;
  }

  // No socket yet, unref it as soon as we are connected
  var self = this;
  this.once('connect', function() {
    self.connection.unref();
  });
}
/**
 * Destroy connection
 * Removes the connection from the accounting registry (when accounting is
 * enabled), ends and destroys the socket if there is one, and marks the
 * instance as destroyed.
 * @method
 */
Connection.prototype.destroy = function() {
  // Set the connections
  if(connectionAccounting) delete connections[this.id];

  if(this.connection) {
    this.connection.end();
    this.connection.destroy();
  }

  this.destroyed = true;
}
/**
 * Write to connection
 * Accepts a single buffer or an array of buffers; array entries are written
 * to the socket in order. Each buffer is hex-dumped to the debug log when
 * debug logging is enabled.
 * @method
 * @param {Buffer|Buffer[]} buffer Serialized command (toBinUnified output) or list of buffers (toBin output)
 * @return {boolean|undefined} Result of the socket write for a single buffer, undefined for an array
 */
Connection.prototype.write = function(buffer) {
  var i;
  var isList = Array.isArray(buffer);

  // Debug Log
  if(this.logger.isDebug()) {
    if(!isList) {
      this.logger.debug(f('writing buffer [%s] to %s:%s', buffer.toString('hex'), this.host, this.port));
    } else {
      for(i = 0; i < buffer.length; i++) {
        this.logger.debug(f('writing buffer [%s] to %s:%s', buffer[i].toString('hex'), this.host, this.port));
      }
    }
  }

  // Write out the command
  if(!isList) return this.connection.write(buffer, 'binary');
  // Iterate over all buffers and write them in order to the socket
  for(i = 0; i < buffer.length; i++) {
    this.connection.write(buffer[i], 'binary');
  }
}
506 * Return id of connection as a string
// NOTE(review): the function body is not visible in this listing; per the description
// above it presumably returns this.id converted to a string — confirm.
510 Connection.prototype.toString = function() {
/**
 * Return json object of connection
 * @method
 * @return {object} Plain object with the connection's id, host and port
 */
Connection.prototype.toJSON = function() {
  return {id: this.id, host: this.host, port: this.port};
}
/**
 * Is the connection connected
 * Returns false for a destroyed connection and for one whose socket has not
 * been created yet (connect() not called); previously the latter threw a
 * TypeError because this.connection is null until connect().
 * @method
 * @return {boolean} true when the socket exists, is not destroyed and is writable
 */
Connection.prototype.isConnected = function() {
  if(this.destroyed || !this.connection) return false;
  return !this.connection.destroyed && !!this.connection.writable;
}
534 * A server connect event, used to verify that the connection is up and running
536 * @event Connection#connect
541 * The server connection closed, all pool connections closed
543 * @event Connection#close
548 * The server connection caused an error, all pool connections closed
550 * @event Connection#error
555 * The server connection timed out, all pool connections closed
557 * @event Connection#timeout
562 * The driver experienced an invalid message, all pool connections closed
564 * @event Connection#parseError
568 module.exports = Connection;