3 var inherits = require('util').inherits,
4 EventEmitter = require('events').EventEmitter,
5 Connection = require('./connection'),
6 MongoError = require('../error'),
7 Logger = require('./logger'),
8 f = require('util').format,
9 Query = require('./commands').Query,
10 CommandResult = require('./command_result'),
11 assign = require('../topologies/shared').assign;
13 var MongoCR = require('../auth/mongocr')
14 , X509 = require('../auth/x509')
15 , Plain = require('../auth/plain')
16 , GSSAPI = require('../auth/gssapi')
17 , SSPI = require('../auth/sspi')
18 , ScramSHA1 = require('../auth/scram');
20 var DISCONNECTED = 'disconnected';
21 var CONNECTING = 'connecting';
22 var CONNECTED = 'connected';
23 var DESTROYING = 'destroying';
24 var DESTROYED = 'destroyed';
29 * Creates a new Pool instance
31 * @param {string} options.host The server host
32 * @param {number} options.port The server port
33 * @param {number} [options.size=1] Max server connection pool size
34 * @param {boolean} [options.reconnect=true] Server will attempt to reconnect on loss of connection
35 * @param {number} [options.reconnectTries=30] Server attempt to reconnect #times
36 * @param {number} [options.reconnectInterval=1000] Server will wait # milliseconds between retries
37 * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
38 * @param {number} [options.keepAliveInitialDelay=0] Initial delay before TCP keep alive enabled
39 * @param {boolean} [options.noDelay=true] TCP Connection no delay
40 * @param {number} [options.connectionTimeout=0] TCP Connection timeout setting
41 * @param {number} [options.socketTimeout=0] TCP Socket timeout setting
42 * @param {number} [options.monitoringSocketTimeout=30000] TCP Socket timeout setting for replicaset monitoring socket
43 * @param {boolean} [options.ssl=false] Use SSL for connection
44 * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function.
45 * @param {Buffer} [options.ca] SSL Certificate store binary buffer
46 * @param {Buffer} [options.cert] SSL Certificate binary buffer
47 * @param {Buffer} [options.key] SSL Key file binary buffer
48 * @param {string} [options.passPhrase] SSL Certificate pass phrase
49 * @param {boolean} [options.rejectUnauthorized=false] Reject unauthorized server certificates
50 * @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits
51 * @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
52 * @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
53 * @param {boolean} [options.domainsEnabled=false] Enable the wrapping of the callback in the current domain, disabled by default to avoid perf hit.
58 * @fires Pool#parseError
59 * @return {Pool} A cursor instance
61 var Pool = function(options) {
63 EventEmitter.call(this);
65 this.options = assign({
66 // Host and port settings
69 // Pool default max size
72 connectionTimeout: 30000,
75 keepAliveInitialDelay: 0,
78 ssl: false, checkServerIdentity: true,
79 ca: null, cert: null, key: null, passPhrase: null,
80 rejectUnauthorized: false,
83 promoteBuffers: false,
84 // Reconnection options
86 reconnectInterval: 1000,
92 // Identification information
94 // Current reconnect retries
95 this.retriesLeft = this.options.reconnectTries;
96 this.reconnectId = null;
97 // No bson parser passed in
98 if(!options.bson || (options.bson
99 && (typeof options.bson.serialize != 'function'
100 || typeof options.bson.deserialize != 'function'))) {
101 throw new Error("must pass in valid bson parser");
105 this.logger = Logger('Pool', options);
107 this.state = DISCONNECTED;
109 this.availableConnections = [];
110 this.inUseConnections = [];
111 this.connectingConnections = [];
112 // Currently executing
113 this.executing = false;
114 // Operation work queue
117 // All the authProviders
118 this.authProviders = options.authProviders || {
119 'mongocr': new MongoCR(options.bson), 'x509': new X509(options.bson)
120 , 'plain': new Plain(options.bson), 'gssapi': new GSSAPI(options.bson)
121 , 'sspi': new SSPI(options.bson), 'scram-sha-1': new ScramSHA1(options.bson)
124 // Are we currently authenticating
125 this.authenticating = false;
126 this.loggingout = false;
127 this.nonAuthenticatedConnections = [];
128 this.authenticatingTimestamp = null;
129 // Number of consecutive timeouts caught
130 this.numberOfConsecutiveTimeouts = 0;
131 // Current pool Index
132 this.connectionIndex = 0;
135 inherits(Pool, EventEmitter);
137 Object.defineProperty(Pool.prototype, 'size', {
139 get: function() { return this.options.size; }
142 Object.defineProperty(Pool.prototype, 'connectionTimeout', {
144 get: function() { return this.options.connectionTimeout; }
147 Object.defineProperty(Pool.prototype, 'socketTimeout', {
149 get: function() { return this.options.socketTimeout; }
152 function stateTransition(self, newState) {
153 var legalTransitions = {
154 'disconnected': [CONNECTING, DESTROYING, DISCONNECTED],
155 'connecting': [CONNECTING, DESTROYING, CONNECTED, DISCONNECTED],
156 'connected': [CONNECTED, DISCONNECTED, DESTROYING],
157 'destroying': [DESTROYING, DESTROYED],
158 'destroyed': [DESTROYED]
162 var legalStates = legalTransitions[self.state];
163 if(legalStates && legalStates.indexOf(newState) != -1) {
164 self.state = newState;
166 self.logger.error(f('Pool with id [%s] failed attempted illegal state transition from [%s] to [%s] only following state allowed [%s]'
167 , self.id, self.state, newState, legalStates));
171 function authenticate(pool, auth, connection, cb) {
172 if(auth[0] === undefined) return cb(null);
173 // We need to authenticate the server
174 var mechanism = auth[0];
176 // Validate if the mechanism exists
177 if(!pool.authProviders[mechanism]) {
178 throw new MongoError(f('authMechanism %s not supported', mechanism));
182 var provider = pool.authProviders[mechanism];
184 // Authenticate using the provided mechanism
185 provider.auth.apply(provider, [write(pool), [connection], db].concat(auth.slice(2)).concat([cb]));
188 // The write function used by the authentication mechanism (bypasses external)
189 function write(self) {
190 return function(connection, command, callback) {
191 // Get the raw buffer
192 // Ensure we stop auth if pool was destroyed
193 if(self.state == DESTROYED || self.state == DESTROYING) {
194 return callback(new MongoError('pool destroyed'));
197 // Set the connection workItem callback
198 connection.workItems.push({
199 cb: callback, command: true, requestId: command.requestId
202 // Write the buffer out to the connection
203 connection.write(command.toBin());
208 function reauthenticate(pool, connection, cb) {
210 function authenticateAgainstProvider(pool, connection, providers, cb) {
211 // Finished re-authenticating against providers
212 if(providers.length == 0) return cb();
213 // Get the provider name
214 var provider = pool.authProviders[providers.pop()];
217 provider.reauthenticate(write(pool), [connection], function(err) {
218 // We got an error return immediately
219 if(err) return cb(err);
220 // Continue authenticating the connection
221 authenticateAgainstProvider(pool, connection, providers, cb);
225 // Start re-authenticating process
226 authenticateAgainstProvider(pool, connection, Object.keys(pool.authProviders), cb);
229 function connectionFailureHandler(self, event) {
230 return function(err) {
231 if (this._connectionFailHandled) return;
232 this._connectionFailHandled = true;
233 // Destroy the connection
236 // Remove the connection
237 removeConnection(self, this);
239 // Flush all work Items on this connection
240 while(this.workItems.length > 0) {
241 var workItem = this.workItems.shift();
242 // if(workItem.cb) workItem.cb(err);
243 if(workItem.cb) workItem.cb(err);
246 // Did we catch a timeout, increment the numberOfConsecutiveTimeouts
247 if(event == 'timeout') {
248 self.numberOfConsecutiveTimeouts = self.numberOfConsecutiveTimeouts + 1;
250 // Have we timed out more than reconnectTries in a row ?
251 // Force close the pool as we are trying to connect to tcp sink hole
252 if(self.numberOfConsecutiveTimeouts > self.options.reconnectTries) {
253 self.numberOfConsecutiveTimeouts = 0;
254 // Destroy all connections and pool
257 return self.emit('close', self);
261 // No more socket available propegate the event
262 if(self.socketCount() == 0) {
263 if(self.state != DESTROYED && self.state != DESTROYING) {
264 stateTransition(self, DISCONNECTED);
267 // Do not emit error events, they are always close events
268 // do not trigger the low level error handler in node
269 event = event == 'error' ? 'close' : event;
270 self.emit(event, err);
273 // Start reconnection attempts
274 if(!self.reconnectId && self.options.reconnect) {
275 self.reconnectId = setTimeout(attemptReconnect(self), self.options.reconnectInterval);
280 function attemptReconnect(self) {
282 self.emit('attemptReconnect', self);
283 if(self.state == DESTROYED || self.state == DESTROYING) return;
285 // We are connected do not try again
286 if(self.isConnected()) {
287 self.reconnectId = null;
291 // If we have failure schedule a retry
292 function _connectionFailureHandler(self) {
294 if (this._connectionFailHandled) return;
295 this._connectionFailHandled = true;
296 // Destroy the connection
298 // Count down the number of reconnects
299 self.retriesLeft = self.retriesLeft - 1;
300 // How many retries are left
301 if(self.retriesLeft == 0) {
302 // Destroy the instance
305 self.emit('reconnectFailed'
306 , new MongoError(f('failed to reconnect after %s attempts with interval %s ms', self.options.reconnectTries, self.options.reconnectInterval)));
308 self.reconnectId = setTimeout(attemptReconnect(self), self.options.reconnectInterval);
313 // Got a connect handler
314 function _connectHandler(self) {
317 var connection = this;
319 // Pool destroyed stop the connection
320 if(self.state == DESTROYED || self.state == DESTROYING) {
321 return connection.destroy();
324 // Clear out all handlers
325 handlers.forEach(function(event) {
326 connection.removeAllListeners(event);
329 // Reset reconnect id
330 self.reconnectId = null;
332 // Apply pool connection handlers
333 connection.on('error', connectionFailureHandler(self, 'error'));
334 connection.on('close', connectionFailureHandler(self, 'close'));
335 connection.on('timeout', connectionFailureHandler(self, 'timeout'));
336 connection.on('parseError', connectionFailureHandler(self, 'parseError'));
338 // Apply any auth to the connection
339 reauthenticate(self, this, function() {
341 self.retriesLeft = self.options.reconnectTries;
342 // Push to available connections
343 self.availableConnections.push(connection);
344 // Emit reconnect event
345 self.emit('reconnect', self);
346 // Trigger execute to start everything up again
352 // Create a connection
353 var connection = new Connection(messageHandler(self), self.options);
355 connection.on('close', _connectionFailureHandler(self, 'close'));
356 connection.on('error', _connectionFailureHandler(self, 'error'));
357 connection.on('timeout', _connectionFailureHandler(self, 'timeout'));
358 connection.on('parseError', _connectionFailureHandler(self, 'parseError'));
360 connection.on('connect', _connectHandler(self));
361 // Attempt connection
362 connection.connect();
366 function moveConnectionBetween(connection, from, to) {
367 var index = from.indexOf(connection);
368 // Move the connection from connecting to available
370 from.splice(index, 1);
375 function messageHandler(self) {
376 return function(message, connection) {
377 // workItem to execute
380 // Locate the workItem
381 for(var i = 0; i < connection.workItems.length; i++) {
382 if(connection.workItems[i].requestId == message.responseTo) {
384 workItem = connection.workItems[i];
385 // Remove from list of workItems
386 connection.workItems.splice(i, 1);
391 // Reset timeout counter
392 self.numberOfConsecutiveTimeouts = 0;
394 // Reset the connection timeout if we modified it for
396 if(workItem.socketTimeout) {
397 connection.resetSocketTimeout();
400 // Log if debug enabled
401 if(self.logger.isDebug()) {
402 self.logger.debug(f('message [%s] received from %s:%s'
403 , message.raw.toString('hex'), self.options.host, self.options.port));
406 // Authenticate any straggler connections
407 function authenticateStragglers(self, connection, callback) {
408 // Get any non authenticated connections
409 var connections = self.nonAuthenticatedConnections.slice(0);
410 var nonAuthenticatedConnections = self.nonAuthenticatedConnections;
411 self.nonAuthenticatedConnections = [];
413 // Establish if the connection need to be authenticated
414 // Add to authentication list if
415 // 1. we were in an authentication process when the operation was executed
416 // 2. our current authentication timestamp is from the workItem one, meaning an auth has happened
417 if(connection.workItems.length == 1 && (connection.workItems[0].authenticating == true
418 || (typeof connection.workItems[0].authenticatingTimestamp == 'number'
419 && connection.workItems[0].authenticatingTimestamp != self.authenticatingTimestamp))) {
420 // Add connection to the list
421 connections.push(connection);
424 // No connections need to be re-authenticated
425 if(connections.length == 0) {
426 // Release the connection back to the pool
427 moveConnectionBetween(connection, self.inUseConnections, self.availableConnections);
432 // Apply re-authentication to all connections before releasing back to pool
433 var connectionCount = connections.length;
434 // Authenticate all connections
435 for(var i = 0; i < connectionCount; i++) {
436 reauthenticate(self, connections[i], function() {
437 connectionCount = connectionCount - 1;
439 if(connectionCount == 0) {
440 // Put non authenticated connections in available connections
441 self.availableConnections = self.availableConnections.concat(nonAuthenticatedConnections);
442 // Release the connection back to the pool
443 moveConnectionBetween(connection, self.inUseConnections, self.availableConnections);
451 function handleOperationCallback(self, cb, err, result) {
453 if(!self.options.domainsEnabled) {
454 return process.nextTick(function() {
455 return cb(err, result);
459 // Domain enabled just call the callback
463 authenticateStragglers(self, connection, function() {
464 // Keep executing, ensure current message handler does not stop execution
465 if(!self.executing) {
466 process.nextTick(function() {
471 // Time to dispatch the message if we have a callback
472 if(!workItem.immediateRelease) {
474 // Parse the message according to the provided options
475 message.parse(workItem);
477 return handleOperationCallback(self, workItem.cb, MongoError.create(err));
480 // Establish if we have an error
481 if(workItem.command && message.documents[0] && (message.documents[0].ok == 0 || message.documents[0]['$err']
482 || message.documents[0]['errmsg'] || message.documents[0]['code'])) {
483 return handleOperationCallback(self, workItem.cb, MongoError.create(message.documents[0]));
486 // Add the connection details
487 message.hashedName = connection.hashedName;
489 // Return the documents
490 handleOperationCallback(self, workItem.cb, null, new CommandResult(workItem.fullResult ? message : message.documents[0], connection, message));
497 * Return the total socket count in the pool.
499 * @return {Number} The number of socket available.
501 Pool.prototype.socketCount = function() {
502 return this.availableConnections.length
503 + this.inUseConnections.length;
504 // + this.connectingConnections.length;
508 * Return all pool connections
510 * @return {Connectio[]} The pool connections
512 Pool.prototype.allConnections = function() {
513 return this.availableConnections
514 .concat(this.inUseConnections)
515 .concat(this.connectingConnections);
519 * Get a pool connection (round-robin)
521 * @return {Connection}
523 Pool.prototype.get = function() {
524 return this.allConnections()[0];
528 * Is the pool connected
532 Pool.prototype.isConnected = function() {
533 // We are in a destroyed state
534 if(this.state == DESTROYED || this.state == DESTROYING) {
539 var connections = this.availableConnections
540 .concat(this.inUseConnections);
542 // Check if we have any connected connections
543 for(var i = 0; i < connections.length; i++) {
544 if(connections[i].isConnected()) return true;
547 // Might be authenticating, but we are still connected
548 if(connections.length == 0 && this.authenticating) {
557 * Was the pool destroyed
561 Pool.prototype.isDestroyed = function() {
562 return this.state == DESTROYED || this.state == DESTROYING;
566 * Is the pool in a disconnected state
570 Pool.prototype.isDisconnected = function() {
571 return this.state == DISCONNECTED;
578 Pool.prototype.connect = function() {
579 if(this.state != DISCONNECTED) {
580 throw new MongoError('connection in unlawful state ' + this.state);
584 // Transition to connecting state
585 stateTransition(this, CONNECTING);
586 // Create an array of the arguments
587 var args = Array.prototype.slice.call(arguments, 0);
588 // Create a connection
589 var connection = new Connection(messageHandler(self), this.options);
590 // Add to list of connections
591 this.connectingConnections.push(connection);
592 // Add listeners to the connection
593 connection.once('connect', function(connection) {
594 if(self.state == DESTROYED || self.state == DESTROYING) return self.destroy();
596 // Apply any store credentials
597 reauthenticate(self, connection, function(err) {
598 if(self.state == DESTROYED || self.state == DESTROYING) return self.destroy();
600 // We have an error emit it
605 return self.emit('error', err);
609 authenticate(self, args, connection, function(err) {
610 if(self.state == DESTROYED || self.state == DESTROYING) return self.destroy();
612 // We have an error emit it
617 return self.emit('error', err);
619 // Set connected mode
620 stateTransition(self, CONNECTED);
622 // Move the active connection
623 moveConnectionBetween(connection, self.connectingConnections, self.availableConnections);
625 // Emit the connect event
626 self.emit('connect', self);
631 // Add error handlers
632 connection.once('error', connectionFailureHandler(this, 'error'));
633 connection.once('close', connectionFailureHandler(this, 'close'));
634 connection.once('timeout', connectionFailureHandler(this, 'timeout'));
635 connection.once('parseError', connectionFailureHandler(this, 'parseError'));
638 connection.connect();
640 // SSL or something threw on connect
641 self.emit('error', err);
646 * Authenticate using a specified mechanism
648 * @param {string} mechanism The Auth mechanism we are invoking
649 * @param {string} db The db we are invoking the mechanism against
650 * @param {...object} param Parameters for the specific mechanism
651 * @param {authResultCallback} callback A callback function
653 Pool.prototype.auth = function(mechanism) {
655 var args = Array.prototype.slice.call(arguments, 0);
656 var callback = args.pop();
658 // If we don't have the mechanism fail
659 if(self.authProviders[mechanism] == null && mechanism != 'default') {
660 throw new MongoError(f("auth provider %s does not exist", mechanism));
663 // Signal that we are authenticating a new set of credentials
664 this.authenticating = true;
665 this.authenticatingTimestamp = new Date().getTime();
667 // Authenticate all live connections
668 function authenticateLiveConnections(self, args, cb) {
669 // Get the current viable connections
670 var connections = self.availableConnections;
671 // Allow nothing else to use the connections while we authenticate them
672 self.availableConnections = [];
674 var connectionsCount = connections.length;
676 // No connections available, return
677 if(connectionsCount == 0) return callback(null);
678 // Authenticate the connections
679 for(var i = 0; i < connections.length; i++) {
680 authenticate(self, args, connections[i], function(err) {
681 connectionsCount = connectionsCount - 1;
686 // Processed all connections
687 if(connectionsCount == 0) {
689 self.authenticating = false;
690 // Add the connections back to available connections
691 self.availableConnections = self.availableConnections.concat(connections);
692 // We had an error, return it
695 if(self.logger.isError()) {
696 self.logger.error(f('[%s] failed to authenticate against server %s:%s'
697 , self.id, self.options.host, self.options.port));
708 // Wait for a logout in process to happen
709 function waitForLogout(self, cb) {
710 if(!self.loggingout) return cb();
711 setTimeout(function() {
712 waitForLogout(self, cb);
716 // Wait for loggout to finish
717 waitForLogout(self, function() {
718 // Authenticate all live connections
719 authenticateLiveConnections(self, args, function(err) {
720 // Credentials correctly stored in auth provider if successful
721 // Any new connections will now reauthenticate correctly
722 self.authenticating = false;
723 // Return after authentication connections
730 * Logout all users against a database
732 * @param {string} dbName The database name
733 * @param {authResultCallback} callback A callback function
735 Pool.prototype.logout = function(dbName, callback) {
737 if(typeof dbName != 'string') {
738 throw new MongoError('logout method requires a db name as first argument');
741 if(typeof callback != 'function') {
742 throw new MongoError('logout method requires a callback');
745 // Indicate logout in process
746 this.loggingout = true;
748 // Get all relevant connections
749 var connections = self.availableConnections.concat(self.inUseConnections);
750 var count = connections.length;
754 // Send logout command over all the connections
755 for(var i = 0; i < connections.length; i++) {
756 write(self)(connections[i], new Query(this.options.bson
757 , f('%s.$cmd', dbName)
758 , {logout:1}, {numberToSkip: 0, numberToReturn: 1}), function(err) {
763 self.loggingout = false;
774 Pool.prototype.unref = function() {
775 // Get all the known connections
776 var connections = this.availableConnections
777 .concat(this.inUseConnections)
778 .concat(this.connectingConnections);
779 connections.forEach(function(c) {
785 var events = ['error', 'close', 'timeout', 'parseError', 'connect'];
787 // Destroy the connections
788 function destroy(self, connections) {
789 // Destroy all connections
790 connections.forEach(function(c) {
791 // Remove all listeners
792 for(var i = 0; i < events.length; i++) {
793 c.removeAllListeners(events[i]);
795 // Destroy connection
799 // Zero out all connections
800 self.inUseConnections = [];
801 self.availableConnections = [];
802 self.nonAuthenticatedConnections = [];
803 self.connectingConnections = [];
805 // Set state to destroyed
806 stateTransition(self, DESTROYED);
813 Pool.prototype.destroy = function(force) {
815 // Do not try again if the pool is already dead
816 if(this.state == DESTROYED || self.state == DESTROYING) return;
817 // Set state to destroyed
818 stateTransition(this, DESTROYING);
820 // Are we force closing
822 // Get all the known connections
823 var connections = self.availableConnections
824 .concat(self.inUseConnections)
825 .concat(self.nonAuthenticatedConnections)
826 .concat(self.connectingConnections);
827 return destroy(self, connections);
830 // Wait for the operations to drain before we close the pool
831 function checkStatus() {
832 if(self.queue.length == 0) {
833 // Get all the known connections
834 var connections = self.availableConnections
835 .concat(self.inUseConnections)
836 .concat(self.nonAuthenticatedConnections)
837 .concat(self.connectingConnections);
839 // Check if we have any in flight operations
840 for(var i = 0; i < connections.length; i++) {
841 // There is an operation still in flight, reschedule a
842 // check waiting for it to drain
843 if(connections[i].workItems.length > 0) {
844 return setTimeout(checkStatus, 1);
848 destroy(self, connections);
850 setTimeout(checkStatus, 1);
854 // Initiate drain of operations
859 * Write a message to MongoDB
861 * @return {Connection}
863 Pool.prototype.write = function(commands, options, cb) {
865 // Ensure we have a callback
866 if(typeof options == 'function') {
870 // Always have options
871 options = options || {};
873 // Pool was destroyed error out
874 if(this.state == DESTROYED || this.state == DESTROYING) {
875 // Callback with an error
878 cb(new MongoError('pool destroyed'));
880 process.nextTick(function() {
889 if(this.options.domainsEnabled
890 && process.domain && typeof cb === "function") {
891 // if we have a domain bind to it
893 cb = process.domain.bind(function() {
894 // v8 - argumentsToArray one-liner
895 var args = new Array(arguments.length); for(var i = 0; i < arguments.length; i++) { args[i] = arguments[i]; }
896 // bounce off event loop so domain switch takes place
897 process.nextTick(function() {
898 oldCb.apply(null, args);
903 // Do we have an operation
905 cb: cb, raw: false, promoteLongs: true, promoteValues: true, promoteBuffers: false, fullResult: false
910 if(Array.isArray(commands)) {
913 for(var i = 0; i < commands.length; i++) {
914 buffer.push(commands[i].toBin());
918 operation.requestId = commands[commands.length - 1].requestId;
920 operation.requestId = commands.requestId;
921 buffer = commands.toBin();
925 operation.buffer = buffer;
927 // Set the options for the parsing
928 operation.promoteLongs = typeof options.promoteLongs == 'boolean' ? options.promoteLongs : true;
929 operation.promoteValues = typeof options.promoteValues == 'boolean' ? options.promoteValues : true;
930 operation.promoteBuffers = typeof options.promoteBuffers == 'boolean' ? options.promoteBuffers : false;
931 operation.raw = typeof options.raw == 'boolean' ? options.raw : false;
932 operation.immediateRelease = typeof options.immediateRelease == 'boolean' ? options.immediateRelease : false;
933 operation.documentsReturnedIn = options.documentsReturnedIn;
934 operation.command = typeof options.command == 'boolean' ? options.command : false;
935 operation.fullResult = typeof options.fullResult == 'boolean' ? options.fullResult : false;
936 operation.noResponse = typeof options.noResponse == 'boolean' ? options.noResponse : false;
937 // operation.requestId = options.requestId;
939 // Optional per operation socketTimeout
940 operation.socketTimeout = options.socketTimeout;
941 operation.monitoring = options.monitoring;
942 // Custom socket Timeout
943 if(options.socketTimeout) {
944 operation.socketTimeout = options.socketTimeout;
947 // We need to have a callback function unless the message returns no response
948 if(!(typeof cb == 'function') && !options.noResponse) {
949 throw new MongoError('write method must provide a callback');
952 // If we have a monitoring operation schedule as the very first operation
953 // Otherwise add to back of queue
954 if(options.monitoring) {
955 this.queue.unshift(operation);
957 this.queue.push(operation);
960 // Attempt to execute the operation
961 if(!self.executing) {
962 process.nextTick(function() {
968 // Remove connection method
969 function remove(connection, connections) {
970 for(var i = 0; i < connections.length; i++) {
971 if(connections[i] === connection) {
972 connections.splice(i, 1);
978 function removeConnection(self, connection) {
979 if(remove(connection, self.availableConnections)) return;
980 if(remove(connection, self.inUseConnections)) return;
981 if(remove(connection, self.connectingConnections)) return;
982 if(remove(connection, self.nonAuthenticatedConnections)) return;
985 // All event handlers
986 var handlers = ["close", "message", "error", "timeout", "parseError", "connect"];
988 function _createConnection(self) {
989 var connection = new Connection(messageHandler(self), self.options);
991 // Push the connection
992 self.connectingConnections.push(connection);
995 var tempErrorHandler = function(_connection) {
997 // Destroy the connection
998 _connection.destroy();
999 // Remove the connection from the connectingConnections list
1000 removeConnection(self, _connection);
1001 // Start reconnection attempts
1002 if(!self.reconnectId && self.options.reconnect) {
1003 self.reconnectId = setTimeout(attemptReconnect(self), self.options.reconnectInterval);
1008 // Handle successful connection
1009 var tempConnectHandler = function(_connection) {
1011 // Destroyed state return
1012 if(self.state == DESTROYED || self.state == DESTROYING) {
1013 // Remove the connection from the list
1014 removeConnection(self, _connection);
1015 return _connection.destroy();
1018 // Destroy all event emitters
1019 handlers.forEach(function(e) {
1020 _connection.removeAllListeners(e);
1023 // Add the final handlers
1024 _connection.once('close', connectionFailureHandler(self, 'close'));
1025 _connection.once('error', connectionFailureHandler(self, 'error'));
1026 _connection.once('timeout', connectionFailureHandler(self, 'timeout'));
1027 _connection.once('parseError', connectionFailureHandler(self, 'parseError'));
1030 reauthenticate(self, _connection, function(err) {
1031 if(self.state == DESTROYED || self.state == DESTROYING) {
1032 return _connection.destroy();
1034 // Remove the connection from the connectingConnections list
1035 removeConnection(self, _connection);
1039 return _connection.destroy();
1042 // If we are authenticating at the moment
1043 // Do not automatially put in available connections
1044 // As we need to apply the credentials first
1045 if(self.authenticating) {
1046 self.nonAuthenticatedConnections.push(_connection);
1048 // Push to available
1049 self.availableConnections.push(_connection);
1050 // Execute any work waiting
1058 connection.once('close', tempErrorHandler(connection));
1059 connection.once('error', tempErrorHandler(connection));
1060 connection.once('timeout', tempErrorHandler(connection));
1061 connection.once('parseError', tempErrorHandler(connection));
1062 connection.once('connect', tempConnectHandler(connection));
1065 connection.connect();
1068 function flushMonitoringOperations(queue) {
1069 for(var i = 0; i < queue.length; i++) {
1070 if(queue[i].monitoring) {
1071 var workItem = queue[i];
1073 workItem.cb(new MongoError({ message: 'no connection available for monitoring', driver:true }));
1078 function _execute(self) {
1080 if(self.state == DESTROYED) return;
1081 // Already executing, skip
1082 if(self.executing) return;
1083 // Set pool as executing
1084 self.executing = true;
1086 // Wait for auth to clear before continuing
1087 function waitForAuth(cb) {
1088 if(!self.authenticating) return cb();
1089 // Wait for a milisecond and try again
1090 setTimeout(function() {
1095 // Block on any auth in process
1096 waitForAuth(function() {
1097 // As long as we have available connections
1099 // Total availble connections
1100 var totalConnections = self.availableConnections.length
1101 + self.connectingConnections.length
1102 + self.inUseConnections.length;
1104 // No available connections available, flush any monitoring ops
1105 if(self.availableConnections.length == 0) {
1106 // Flush any monitoring operations
1107 flushMonitoringOperations(self.queue);
1112 if(self.queue.length == 0) {
1117 var connection = self.availableConnections[self.connectionIndex++ % self.availableConnections.length];
1118 // Is the connection connected
1119 if(connection.isConnected()) {
1120 // Get the next work item
1121 var workItem = self.queue.shift();
1123 // Get actual binary commands
1124 var buffer = workItem.buffer;
1126 // Set current status of authentication process
1127 workItem.authenticating = self.authenticating;
1128 workItem.authenticatingTimestamp = self.authenticatingTimestamp;
1130 // If we are monitoring take the connection of the availableConnections
1131 if (workItem.monitoring) {
1132 moveConnectionBetween(connection, self.availableConnections, self.inUseConnections);
1135 // Track the executing commands on the mongo server
1136 // as long as there is an expected response
1137 if (! workItem.noResponse) {
1138 connection.workItems.push(workItem);
1141 // We have a custom socketTimeout
1142 if(!workItem.immediateRelease && typeof workItem.socketTimeout == 'number') {
1143 connection.setSocketTimeout(workItem.socketTimeout);
1146 // Put operation on the wire
1147 if(Array.isArray(buffer)) {
1148 for(var i = 0; i < buffer.length; i++) {
1149 connection.write(buffer[i])
1152 connection.write(buffer);
1155 if(workItem.immediateRelease && self.authenticating) {
1156 self.nonAuthenticatedConnections.push(connection);
1159 // Have we not reached the max connection size yet
1160 if(totalConnections < self.options.size
1161 && self.queue.length > 0) {
1162 // Create a new connection
1163 _createConnection(self);
1166 // Remove the disconnected connection
1167 removeConnection(self, connection);
1168 // Flush any monitoring operations in the queue, failing fast
1169 flushMonitoringOperations(self.queue);
1174 self.executing = false;
1178 // Make execution loop available for testing
1179 Pool._execute = _execute;
1182 * A server connect event, used to verify that the connection is up and running
1184 * @event Pool#connect
1189 * A server reconnect event, used to verify that pool reconnected.
1191 * @event Pool#reconnect
1196 * The server connection closed, all pool connections closed
1203 * The server connection caused an error, all pool connections closed
1210 * The server connection timed out, all pool connections closed
1212 * @event Pool#timeout
1217 * The driver experienced an invalid message, all pool connections closed
1219 * @event Pool#parseError
1224 * The driver attempted to reconnect
1226 * @event Pool#attemptReconnect
1231 * The driver exhausted all reconnect attempts
1233 * @event Pool#reconnectFailed
1237 module.exports = Pool;