3 var inherits = require('util').inherits,
4 EventEmitter = require('events').EventEmitter,
5 Connection = require('./connection'),
6 MongoError = require('../error'),
7 Logger = require('./logger'),
8 f = require('util').format,
9 Query = require('./commands').Query,
10 CommandResult = require('./command_result'),
11 assign = require('../topologies/shared').assign;
13 var MongoCR = require('../auth/mongocr')
14 , X509 = require('../auth/x509')
15 , Plain = require('../auth/plain')
16 , GSSAPI = require('../auth/gssapi')
17 , SSPI = require('../auth/sspi')
18 , ScramSHA1 = require('../auth/scram');
20 var DISCONNECTED = 'disconnected';
21 var CONNECTING = 'connecting';
22 var CONNECTED = 'connected';
23 var DESTROYING = 'destroying';
24 var DESTROYED = 'destroyed';
29 * Creates a new Pool instance
31 * @param {string} options.host The server host
32 * @param {number} options.port The server port
33 * @param {number} [options.size=1] Max server connection pool size
34 * @param {boolean} [options.reconnect=true] Server will attempt to reconnect on loss of connection
35 * @param {number} [options.reconnectTries=30] Server attempt to reconnect #times
36 * @param {number} [options.reconnectInterval=1000] Server will wait # milliseconds between retries
37 * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
38 * @param {number} [options.keepAliveInitialDelay=0] Initial delay before TCP keep alive enabled
39 * @param {boolean} [options.noDelay=true] TCP Connection no delay
40 * @param {number} [options.connectionTimeout=0] TCP Connection timeout setting
41 * @param {number} [options.socketTimeout=0] TCP Socket timeout setting
42 * @param {number} [options.monitoringSocketTimeout=30000] TCP Socket timeout setting for replicaset monitoring socket
43 * @param {boolean} [options.ssl=false] Use SSL for connection
44 * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function.
45 * @param {Buffer} [options.ca] SSL Certificate store binary buffer
46 * @param {Buffer} [options.cert] SSL Certificate binary buffer
47 * @param {Buffer} [options.key] SSL Key file binary buffer
48 * @param {string} [options.passPhrase] SSL Certificate pass phrase
49 * @param {boolean} [options.rejectUnauthorized=false] Reject unauthorized server certificates
50 * @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits
51 * @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
52 * @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
53 * @param {boolean} [options.domainsEnabled=false] Enable the wrapping of the callback in the current domain, disabled by default to avoid perf hit.
58 * @fires Pool#parseError
59 * @return {Pool} A cursor instance
61 var Pool = function(options) {
64 EventEmitter.call(this);
66 this.options = assign({
67 // Host and port settings
70 // Pool default max size
73 connectionTimeout: 30000,
76 keepAliveInitialDelay: 0,
79 ssl: false, checkServerIdentity: true,
80 ca: null, cert: null, key: null, passPhrase: null,
81 rejectUnauthorized: false,
84 promoteBuffers: false,
85 // Reconnection options
87 reconnectInterval: 1000,
93 // Identification information
95 // Current reconnect retries
96 this.retriesLeft = this.options.reconnectTries;
97 this.reconnectId = null;
98 // No bson parser passed in
99 if(!options.bson || (options.bson
100 && (typeof options.bson.serialize != 'function'
101 || typeof options.bson.deserialize != 'function'))) {
102 throw new Error("must pass in valid bson parser");
106 this.logger = Logger('Pool', options);
108 this.state = DISCONNECTED;
110 this.availableConnections = [];
111 this.inUseConnections = [];
112 this.connectingConnections = [];
113 // Currently executing
114 this.executing = false;
115 // Operation work queue
118 // All the authProviders
119 this.authProviders = options.authProviders || {
120 'mongocr': new MongoCR(options.bson), 'x509': new X509(options.bson)
121 , 'plain': new Plain(options.bson), 'gssapi': new GSSAPI(options.bson)
122 , 'sspi': new SSPI(options.bson), 'scram-sha-1': new ScramSHA1(options.bson)
125 // Are we currently authenticating
126 this.authenticating = false;
127 this.loggingout = false;
128 this.nonAuthenticatedConnections = [];
129 this.authenticatingTimestamp = null;
130 // Number of consecutive timeouts caught
131 this.numberOfConsecutiveTimeouts = 0;
132 // Current pool Index
133 this.connectionIndex = 0;
136 inherits(Pool, EventEmitter);
138 Object.defineProperty(Pool.prototype, 'size', {
140 get: function() { return this.options.size; }
143 Object.defineProperty(Pool.prototype, 'connectionTimeout', {
145 get: function() { return this.options.connectionTimeout; }
148 Object.defineProperty(Pool.prototype, 'socketTimeout', {
150 get: function() { return this.options.socketTimeout; }
153 function stateTransition(self, newState) {
154 var legalTransitions = {
155 'disconnected': [CONNECTING, DESTROYING, DISCONNECTED],
156 'connecting': [CONNECTING, DESTROYING, CONNECTED, DISCONNECTED],
157 'connected': [CONNECTED, DISCONNECTED, DESTROYING],
158 'destroying': [DESTROYING, DESTROYED],
159 'destroyed': [DESTROYED]
163 var legalStates = legalTransitions[self.state];
164 if(legalStates && legalStates.indexOf(newState) != -1) {
165 self.state = newState;
167 self.logger.error(f('Pool with id [%s] failed attempted illegal state transition from [%s] to [%s] only following state allowed [%s]'
168 , self.id, self.state, newState, legalStates));
172 function authenticate(pool, auth, connection, cb) {
173 if(auth[0] === undefined) return cb(null);
174 // We need to authenticate the server
175 var mechanism = auth[0];
177 // Validate if the mechanism exists
178 if(!pool.authProviders[mechanism]) {
179 throw new MongoError(f('authMechanism %s not supported', mechanism));
183 var provider = pool.authProviders[mechanism];
185 // Authenticate using the provided mechanism
186 provider.auth.apply(provider, [write(pool), [connection], db].concat(auth.slice(2)).concat([cb]));
189 // The write function used by the authentication mechanism (bypasses external)
190 function write(self) {
191 return function(connection, command, callback) {
192 // Get the raw buffer
193 // Ensure we stop auth if pool was destroyed
194 if(self.state == DESTROYED || self.state == DESTROYING) {
195 return callback(new MongoError('pool destroyed'));
198 // Set the connection workItem callback
199 connection.workItems.push({
200 cb: callback, command: true, requestId: command.requestId
203 // Write the buffer out to the connection
204 connection.write(command.toBin());
209 function reauthenticate(pool, connection, cb) {
211 function authenticateAgainstProvider(pool, connection, providers, cb) {
212 // Finished re-authenticating against providers
213 if(providers.length == 0) return cb();
214 // Get the provider name
215 var provider = pool.authProviders[providers.pop()];
218 provider.reauthenticate(write(pool), [connection], function(err, r) {
219 // We got an error return immediately
220 if(err) return cb(err);
221 // Continue authenticating the connection
222 authenticateAgainstProvider(pool, connection, providers, cb);
226 // Start re-authenticating process
227 authenticateAgainstProvider(pool, connection, Object.keys(pool.authProviders), cb);
230 function connectionFailureHandler(self, event) {
231 return function(err) {
232 if (this._connectionFailHandled) return;
233 this._connectionFailHandled = true;
234 // Destroy the connection
237 // Remove the connection
238 removeConnection(self, this);
240 // Flush all work Items on this connection
241 while(this.workItems.length > 0) {
242 var workItem = this.workItems.shift();
243 // if(workItem.cb) workItem.cb(err);
244 if(workItem.cb) workItem.cb(err);
247 // Did we catch a timeout, increment the numberOfConsecutiveTimeouts
248 if(event == 'timeout') {
249 self.numberOfConsecutiveTimeouts = self.numberOfConsecutiveTimeouts + 1;
251 // Have we timed out more than reconnectTries in a row ?
252 // Force close the pool as we are trying to connect to tcp sink hole
253 if(self.numberOfConsecutiveTimeouts > self.options.reconnectTries) {
254 self.numberOfConsecutiveTimeouts = 0;
255 // Destroy all connections and pool
258 return self.emit('close', self);
262 // No more socket available propegate the event
263 if(self.socketCount() == 0) {
264 if(self.state != DESTROYED && self.state != DESTROYING) {
265 stateTransition(self, DISCONNECTED);
268 // Do not emit error events, they are always close events
269 // do not trigger the low level error handler in node
270 event = event == 'error' ? 'close' : event;
271 self.emit(event, err);
274 // Start reconnection attempts
275 if(!self.reconnectId && self.options.reconnect) {
276 self.reconnectId = setTimeout(attemptReconnect(self), self.options.reconnectInterval);
281 function attemptReconnect(self) {
283 self.emit('attemptReconnect', self);
284 if(self.state == DESTROYED || self.state == DESTROYING) return;
286 // We are connected do not try again
287 if(self.isConnected()) {
288 self.reconnectId = null;
292 // If we have failure schedule a retry
293 function _connectionFailureHandler(self, event) {
295 if (this._connectionFailHandled) return;
296 this._connectionFailHandled = true;
297 // Destroy the connection
299 // Count down the number of reconnects
300 self.retriesLeft = self.retriesLeft - 1;
301 // How many retries are left
302 if(self.retriesLeft == 0) {
303 // Destroy the instance
306 self.emit('reconnectFailed'
307 , new MongoError(f('failed to reconnect after %s attempts with interval %s ms', self.options.reconnectTries, self.options.reconnectInterval)));
309 self.reconnectId = setTimeout(attemptReconnect(self), self.options.reconnectInterval);
314 // Got a connect handler
315 function _connectHandler(self) {
318 var connection = this;
320 // Pool destroyed stop the connection
321 if(self.state == DESTROYED || self.state == DESTROYING) {
322 return connection.destroy();
325 // Clear out all handlers
326 handlers.forEach(function(event) {
327 connection.removeAllListeners(event);
330 // Reset reconnect id
331 self.reconnectId = null;
333 // Apply pool connection handlers
334 connection.on('error', connectionFailureHandler(self, 'error'));
335 connection.on('close', connectionFailureHandler(self, 'close'));
336 connection.on('timeout', connectionFailureHandler(self, 'timeout'));
337 connection.on('parseError', connectionFailureHandler(self, 'parseError'));
339 // Apply any auth to the connection
340 reauthenticate(self, this, function(err) {
342 self.retriesLeft = self.options.reconnectTries;
343 // Push to available connections
344 self.availableConnections.push(connection);
345 // Emit reconnect event
346 self.emit('reconnect', self);
347 // Trigger execute to start everything up again
353 // Create a connection
354 var connection = new Connection(messageHandler(self), self.options);
356 connection.on('close', _connectionFailureHandler(self, 'close'));
357 connection.on('error', _connectionFailureHandler(self, 'error'));
358 connection.on('timeout', _connectionFailureHandler(self, 'timeout'));
359 connection.on('parseError', _connectionFailureHandler(self, 'parseError'));
361 connection.on('connect', _connectHandler(self));
362 // Attempt connection
363 connection.connect();
367 function moveConnectionBetween(connection, from, to) {
368 var index = from.indexOf(connection);
369 // Move the connection from connecting to available
371 from.splice(index, 1);
376 function messageHandler(self) {
377 return function(message, connection) {
378 // workItem to execute
381 // Locate the workItem
382 for(var i = 0; i < connection.workItems.length; i++) {
383 if(connection.workItems[i].requestId == message.responseTo) {
385 var workItem = connection.workItems[i];
386 // Remove from list of workItems
387 connection.workItems.splice(i, 1);
391 // Reset timeout counter
392 self.numberOfConsecutiveTimeouts = 0;
394 // Reset the connection timeout if we modified it for
396 if(workItem.socketTimeout) {
397 connection.resetSocketTimeout();
400 // Log if debug enabled
401 if(self.logger.isDebug()) {
402 self.logger.debug(f('message [%s] received from %s:%s'
403 , message.raw.toString('hex'), self.options.host, self.options.port));
406 // Authenticate any straggler connections
407 function authenticateStragglers(self, connection, callback) {
408 // Get any non authenticated connections
409 var connections = self.nonAuthenticatedConnections.slice(0);
410 var nonAuthenticatedConnections = self.nonAuthenticatedConnections;
411 self.nonAuthenticatedConnections = [];
413 // Establish if the connection need to be authenticated
414 // Add to authentication list if
415 // 1. we were in an authentication process when the operation was executed
416 // 2. our current authentication timestamp is from the workItem one, meaning an auth has happened
417 if(connection.workItems.length == 1 && (connection.workItems[0].authenticating == true
418 || (typeof connection.workItems[0].authenticatingTimestamp == 'number'
419 && connection.workItems[0].authenticatingTimestamp != self.authenticatingTimestamp))) {
420 // Add connection to the list
421 connections.push(connection);
424 // No connections need to be re-authenticated
425 if(connections.length == 0) {
426 // Release the connection back to the pool
427 moveConnectionBetween(connection, self.inUseConnections, self.availableConnections);
432 // Apply re-authentication to all connections before releasing back to pool
433 var connectionCount = connections.length;
434 // Authenticate all connections
435 for(var i = 0; i < connectionCount; i++) {
436 reauthenticate(self, connections[i], function(err) {
437 connectionCount = connectionCount - 1;
439 if(connectionCount == 0) {
440 // Put non authenticated connections in available connections
441 self.availableConnections = self.availableConnections.concat(nonAuthenticatedConnections);
442 // Release the connection back to the pool
443 moveConnectionBetween(connection, self.inUseConnections, self.availableConnections);
451 function handleOperationCallback(self, cb, err, result) {
453 if(!self.options.domainsEnabled) {
454 return process.nextTick(function() {
455 return cb(err, result);
459 // Domain enabled just call the callback
463 authenticateStragglers(self, connection, function(err) {
464 // Keep executing, ensure current message handler does not stop execution
465 if(!self.executing) {
466 process.nextTick(function() {
471 // Time to dispatch the message if we have a callback
472 if(!workItem.immediateRelease) {
474 // Parse the message according to the provided options
475 message.parse(workItem);
477 return handleOperationCallback(self, workItem.cb, MongoError.create(err));
480 // Establish if we have an error
481 if(workItem.command && message.documents[0] && (message.documents[0].ok == 0 || message.documents[0]['$err']
482 || message.documents[0]['errmsg'] || message.documents[0]['code'])) {
483 return handleOperationCallback(self, workItem.cb, MongoError.create(message.documents[0]));
486 // Add the connection details
487 message.hashedName = connection.hashedName;
489 // Return the documents
490 handleOperationCallback(self, workItem.cb, null, new CommandResult(workItem.fullResult ? message : message.documents[0], connection, message));
497 * Return the total socket count in the pool.
499 * @return {Number} The number of socket available.
501 Pool.prototype.socketCount = function() {
502 return this.availableConnections.length
503 + this.inUseConnections.length
504 + this.connectingConnections.length;
508 * Return all pool connections
510 * @return {Connectio[]} The pool connections
512 Pool.prototype.allConnections = function() {
513 return this.availableConnections
514 .concat(this.inUseConnections)
515 .concat(this.connectingConnections);
519 * Get a pool connection (round-robin)
521 * @return {Connection}
523 Pool.prototype.get = function() {
524 return this.allConnections()[0];
528 * Is the pool connected
532 Pool.prototype.isConnected = function() {
533 // We are in a destroyed state
534 if(this.state == DESTROYED || this.state == DESTROYING) {
539 var connections = this.availableConnections
540 .concat(this.inUseConnections);
542 for(var i = 0; i < connections.length; i++) {
543 if(connections[i].isConnected()) return true;
546 // Might be authenticating, but we are still connected
547 if(connections.length == 0 && this.authenticating) {
556 * Was the pool destroyed
560 Pool.prototype.isDestroyed = function() {
561 return this.state == DESTROYED || this.state == DESTROYING;
565 * Is the pool in a disconnected state
569 Pool.prototype.isDisconnected = function() {
570 return this.state == DISCONNECTED;
577 Pool.prototype.connect = function(auth) {
578 if(this.state != DISCONNECTED) {
579 throw new MongoError('connection in unlawful state ' + this.state);
583 // Transition to connecting state
584 stateTransition(this, CONNECTING);
585 // Create an array of the arguments
586 var args = Array.prototype.slice.call(arguments, 0);
587 // Create a connection
588 var connection = new Connection(messageHandler(self), this.options);
589 // Add to list of connections
590 this.connectingConnections.push(connection);
591 // Add listeners to the connection
592 connection.once('connect', function(connection) {
593 if(self.state == DESTROYED || self.state == DESTROYING) return self.destroy();
595 // Apply any store credentials
596 reauthenticate(self, connection, function(err) {
597 if(self.state == DESTROYED || self.state == DESTROYING) return self.destroy();
599 // We have an error emit it
604 return self.emit('error', err);
608 authenticate(self, args, connection, function(err) {
609 if(self.state == DESTROYED || self.state == DESTROYING) return self.destroy();
611 // We have an error emit it
616 return self.emit('error', err);
618 // Set connected mode
619 stateTransition(self, CONNECTED);
621 // Move the active connection
622 moveConnectionBetween(connection, self.connectingConnections, self.availableConnections);
624 // Emit the connect event
625 self.emit('connect', self);
630 // Add error handlers
631 connection.once('error', connectionFailureHandler(this, 'error'));
632 connection.once('close', connectionFailureHandler(this, 'close'));
633 connection.once('timeout', connectionFailureHandler(this, 'timeout'));
634 connection.once('parseError', connectionFailureHandler(this, 'parseError'));
637 connection.connect();
639 // SSL or something threw on connect
640 self.emit('error', err);
645 * Authenticate using a specified mechanism
647 * @param {string} mechanism The Auth mechanism we are invoking
648 * @param {string} db The db we are invoking the mechanism against
649 * @param {...object} param Parameters for the specific mechanism
650 * @param {authResultCallback} callback A callback function
652 Pool.prototype.auth = function(mechanism, db) {
654 var args = Array.prototype.slice.call(arguments, 0);
655 var callback = args.pop();
657 // If we don't have the mechanism fail
658 if(self.authProviders[mechanism] == null && mechanism != 'default') {
659 throw new MongoError(f("auth provider %s does not exist", mechanism));
662 // Signal that we are authenticating a new set of credentials
663 this.authenticating = true;
664 this.authenticatingTimestamp = new Date().getTime();
666 // Authenticate all live connections
667 function authenticateLiveConnections(self, args, cb) {
668 // Get the current viable connections
669 var connections = self.availableConnections;
670 // Allow nothing else to use the connections while we authenticate them
671 self.availableConnections = [];
673 var connectionsCount = connections.length;
675 // No connections available, return
676 if(connectionsCount == 0) return callback(null);
677 // Authenticate the connections
678 for(var i = 0; i < connections.length; i++) {
679 authenticate(self, args, connections[i], function(err) {
680 connectionsCount = connectionsCount - 1;
685 // Processed all connections
686 if(connectionsCount == 0) {
688 self.authenticating = false;
689 // Add the connections back to available connections
690 self.availableConnections = self.availableConnections.concat(connections);
691 // We had an error, return it
694 if(self.logger.isError()) {
695 self.logger.error(f('[%s] failed to authenticate against server %s:%s'
696 , self.id, self.options.host, self.options.port));
707 // Wait for a logout in process to happen
708 function waitForLogout(self, cb) {
709 if(!self.loggingout) return cb();
710 setTimeout(function() {
711 waitForLogout(self, cb);
715 // Wait for loggout to finish
716 waitForLogout(self, function() {
717 // Authenticate all live connections
718 authenticateLiveConnections(self, args, function(err) {
719 // Credentials correctly stored in auth provider if successful
720 // Any new connections will now reauthenticate correctly
721 self.authenticating = false;
722 // Return after authentication connections
729 * Logout all users against a database
731 * @param {string} dbName The database name
732 * @param {authResultCallback} callback A callback function
734 Pool.prototype.logout = function(dbName, callback) {
736 if(typeof dbName != 'string') {
737 throw new MongoError('logout method requires a db name as first argument');
740 if(typeof callback != 'function') {
741 throw new MongoError('logout method requires a callback');
744 // Indicate logout in process
745 this.loggingout = true;
747 // Get all relevant connections
748 var connections = self.availableConnections.concat(self.inUseConnections);
749 var count = connections.length;
753 // Send logout command over all the connections
754 for(var i = 0; i < connections.length; i++) {
755 write(self)(connections[i], new Query(this.options.bson
756 , f('%s.$cmd', dbName)
757 , {logout:1}, {numberToSkip: 0, numberToReturn: 1}), function(err, r) {
762 self.loggingout = false;
773 Pool.prototype.unref = function() {
774 // Get all the known connections
775 var connections = this.availableConnections
776 .concat(this.inUseConnections)
777 .concat(this.connectingConnections);
778 connections.forEach(function(c) {
784 var events = ['error', 'close', 'timeout', 'parseError', 'connect'];
786 // Destroy the connections
787 function destroy(self, connections) {
788 // Destroy all connections
789 connections.forEach(function(c) {
790 // Remove all listeners
791 for(var i = 0; i < events.length; i++) {
792 c.removeAllListeners(events[i]);
794 // Destroy connection
798 // Zero out all connections
799 self.inUseConnections = [];
800 self.availableConnections = [];
801 self.nonAuthenticatedConnections = [];
802 self.connectingConnections = [];
804 // Set state to destroyed
805 stateTransition(self, DESTROYED);
812 Pool.prototype.destroy = function(force) {
814 // Do not try again if the pool is already dead
815 if(this.state == DESTROYED || self.state == DESTROYING) return;
816 // Set state to destroyed
817 stateTransition(this, DESTROYING);
819 // Are we force closing
821 // Get all the known connections
822 var connections = self.availableConnections
823 .concat(self.inUseConnections)
824 .concat(self.nonAuthenticatedConnections)
825 .concat(self.connectingConnections);
826 return destroy(self, connections);
829 // Wait for the operations to drain before we close the pool
830 function checkStatus() {
831 if(self.queue.length == 0) {
832 // Get all the known connections
833 var connections = self.availableConnections
834 .concat(self.inUseConnections)
835 .concat(self.nonAuthenticatedConnections)
836 .concat(self.connectingConnections);
838 // Check if we have any in flight operations
839 for(var i = 0; i < connections.length; i++) {
840 // There is an operation still in flight, reschedule a
841 // check waiting for it to drain
842 if(connections[i].workItems.length > 0) {
843 return setTimeout(checkStatus, 1);
847 destroy(self, connections);
849 setTimeout(checkStatus, 1);
853 // Initiate drain of operations
858 * Write a message to MongoDB
860 * @return {Connection}
862 Pool.prototype.write = function(commands, options, cb) {
864 // Ensure we have a callback
865 if(typeof options == 'function') {
869 // Always have options
870 options = options || {};
872 // Pool was destroyed error out
873 if(this.state == DESTROYED || this.state == DESTROYING) {
874 // Callback with an error
877 cb(new MongoError('pool destroyed'));
879 process.nextTick(function() {
888 if(this.options.domainsEnabled
889 && process.domain && typeof cb === "function") {
890 // if we have a domain bind to it
892 cb = process.domain.bind(function() {
893 // v8 - argumentsToArray one-liner
894 var args = new Array(arguments.length); for(var i = 0; i < arguments.length; i++) { args[i] = arguments[i]; }
895 // bounce off event loop so domain switch takes place
896 process.nextTick(function() {
897 oldCb.apply(null, args);
902 // Do we have an operation
904 cb: cb, raw: false, promoteLongs: true, promoteValues: true, promoteBuffers: false, fullResult: false
909 if(Array.isArray(commands)) {
912 for(var i = 0; i < commands.length; i++) {
913 buffer.push(commands[i].toBin());
917 operation.requestId = commands[commands.length - 1].requestId;
919 operation.requestId = commands.requestId;
920 buffer = commands.toBin();
924 operation.buffer = buffer;
926 // Set the options for the parsing
927 operation.promoteLongs = typeof options.promoteLongs == 'boolean' ? options.promoteLongs : true;
928 operation.promoteValues = typeof options.promoteValues == 'boolean' ? options.promoteValues : true;
929 operation.promoteBuffers = typeof options.promoteBuffers == 'boolean' ? options.promoteBuffers : false;
930 operation.raw = typeof options.raw == 'boolean' ? options.raw : false;
931 operation.immediateRelease = typeof options.immediateRelease == 'boolean' ? options.immediateRelease : false;
932 operation.documentsReturnedIn = options.documentsReturnedIn;
933 operation.command = typeof options.command == 'boolean' ? options.command : false;
934 operation.fullResult = typeof options.fullResult == 'boolean' ? options.fullResult : false;
935 // operation.requestId = options.requestId;
937 // Optional per operation socketTimeout
938 operation.socketTimeout = options.socketTimeout;
939 operation.monitoring = options.monitoring;
941 // We need to have a callback function unless the message returns no response
942 if(!(typeof cb == 'function') && !options.noResponse) {
943 throw new MongoError('write method must provide a callback');
946 // If we have a monitoring operation schedule as the very first operation
947 // Otherwise add to back of queue
948 if(options.monitoring) {
949 this.queue.unshift(operation);
951 this.queue.push(operation);
954 // Attempt to execute the operation
955 if(!self.executing) {
956 process.nextTick(function() {
962 // Remove connection method
963 function remove(connection, connections) {
964 for(var i = 0; i < connections.length; i++) {
965 if(connections[i] === connection) {
966 connections.splice(i, 1);
972 function removeConnection(self, connection) {
973 if(remove(connection, self.availableConnections)) return;
974 if(remove(connection, self.inUseConnections)) return;
975 if(remove(connection, self.connectingConnections)) return;
976 if(remove(connection, self.nonAuthenticatedConnections)) return;
979 // All event handlers
980 var handlers = ["close", "message", "error", "timeout", "parseError", "connect"];
982 function _createConnection(self) {
983 var connection = new Connection(messageHandler(self), self.options);
985 // Push the connection
986 self.connectingConnections.push(connection);
989 var tempErrorHandler = function(_connection) {
990 return function(err) {
991 // Destroy the connection
992 _connection.destroy();
993 // Remove the connection from the connectingConnections list
994 removeConnection(self, _connection);
995 // Start reconnection attempts
996 if(!self.reconnectId && self.options.reconnect) {
997 self.reconnectId = setTimeout(attemptReconnect(self), self.options.reconnectInterval);
1002 // Handle successful connection
1003 var tempConnectHandler = function(_connection) {
1005 // Destroyed state return
1006 if(self.state == DESTROYED || self.state == DESTROYING) {
1007 // Remove the connection from the list
1008 removeConnection(self, _connection);
1009 return _connection.destroy();
1012 // Destroy all event emitters
1013 handlers.forEach(function(e) {
1014 _connection.removeAllListeners(e);
1017 // Add the final handlers
1018 _connection.once('close', connectionFailureHandler(self, 'close'));
1019 _connection.once('error', connectionFailureHandler(self, 'error'));
1020 _connection.once('timeout', connectionFailureHandler(self, 'timeout'));
1021 _connection.once('parseError', connectionFailureHandler(self, 'parseError'));
1024 reauthenticate(self, _connection, function(err) {
1025 if(self.state == DESTROYED || self.state == DESTROYING) {
1026 return _connection.destroy();
1028 // Remove the connection from the connectingConnections list
1029 removeConnection(self, _connection);
1033 return _connection.destroy();
1036 // If we are authenticating at the moment
1037 // Do not automatially put in available connections
1038 // As we need to apply the credentials first
1039 if(self.authenticating) {
1040 self.nonAuthenticatedConnections.push(_connection);
1042 // Push to available
1043 self.availableConnections.push(_connection);
1044 // Execute any work waiting
1052 connection.once('close', tempErrorHandler(connection));
1053 connection.once('error', tempErrorHandler(connection));
1054 connection.once('timeout', tempErrorHandler(connection));
1055 connection.once('parseError', tempErrorHandler(connection));
1056 connection.once('connect', tempConnectHandler(connection));
1059 connection.connect();
1062 function flushMonitoringOperations(queue) {
1063 for(var i = 0; i < queue.length; i++) {
1064 if(queue[i].monitoring) {
1065 var workItem = queue[i];
1067 workItem.cb(new MongoError({ message: 'no connection available for monitoring', driver:true }));
1072 function _execute(self) {
1074 if(self.state == DESTROYED) return;
1075 // Already executing, skip
1076 if(self.executing) return;
1077 // Set pool as executing
1078 self.executing = true;
1080 // Wait for auth to clear before continuing
1081 function waitForAuth(cb) {
1082 if(!self.authenticating) return cb();
1083 // Wait for a milisecond and try again
1084 setTimeout(function() {
1089 // Block on any auth in process
1090 waitForAuth(function() {
1091 // As long as we have available connections
1093 // Total availble connections
1094 var totalConnections = self.availableConnections.length
1095 + self.connectingConnections.length
1096 + self.inUseConnections.length;
1098 // No available connections available, flush any monitoring ops
1099 if(self.availableConnections.length == 0) {
1100 // Flush any monitoring operations
1101 flushMonitoringOperations(self.queue);
1106 if(self.queue.length == 0) {
1111 // var connection = self.availableConnections.pop();
1112 var connection = self.availableConnections[self.connectionIndex++ % self.availableConnections.length];
1113 // Is the connection connected
1114 if(connection.isConnected()) {
1115 // Get the next work item
1116 var workItem = self.queue.shift();
1118 // Get actual binary commands
1119 var buffer = workItem.buffer;
1121 // Set current status of authentication process
1122 workItem.authenticating = self.authenticating;
1123 workItem.authenticatingTimestamp = self.authenticatingTimestamp;
1125 // Add current associated callback to the connection
1126 // connection.workItem = workItem
1127 connection.workItems.push(workItem);
1129 // We have a custom socketTimeout
1130 if(!workItem.immediateRelease && typeof workItem.socketTimeout == 'number') {
1131 connection.setSocketTimeout(workItem.socketTimeout);
1134 // Put operation on the wire
1135 if(Array.isArray(buffer)) {
1136 for(var i = 0; i < buffer.length; i++) {
1137 connection.write(buffer[i])
1140 connection.write(buffer);
1143 if(workItem.immediateRelease && self.authenticating) {
1144 self.nonAuthenticatedConnections.push(connection);
1147 // Have we not reached the max connection size yet
1148 if(totalConnections < self.options.size
1149 && self.queue.length > 0) {
1150 // Create a new connection
1151 _createConnection(self);
1154 flushMonitoringOperations(self.queue);
1159 self.executing = false;
1163 var connectionId = 0
1165 * A server connect event, used to verify that the connection is up and running
1167 * @event Pool#connect
1172 * A server reconnect event, used to verify that pool reconnected.
1174 * @event Pool#reconnect
1179 * The server connection closed, all pool connections closed
1186 * The server connection caused an error, all pool connections closed
1193 * The server connection timed out, all pool connections closed
1195 * @event Pool#timeout
1200 * The driver experienced an invalid message, all pool connections closed
1202 * @event Pool#parseError
1207 * The driver attempted to reconnect
1209 * @event Pool#attemptReconnect
1214 * The driver exhausted all reconnect attempts
1216 * @event Pool#reconnectFailed
1220 module.exports = Pool;