b3c874df2e6e98578ad01913351fb994a0ebf430
[aai/esr-gui.git] /
1 "use strict";
2
3 var inherits = require('util').inherits,
4   EventEmitter = require('events').EventEmitter,
5   Connection = require('./connection'),
6   MongoError = require('../error'),
7   Logger = require('./logger'),
8   f = require('util').format,
9   Query = require('./commands').Query,
10   CommandResult = require('./command_result'),
11   assign = require('../topologies/shared').assign;
12
13 var MongoCR = require('../auth/mongocr')
14   , X509 = require('../auth/x509')
15   , Plain = require('../auth/plain')
16   , GSSAPI = require('../auth/gssapi')
17   , SSPI = require('../auth/sspi')
18   , ScramSHA1 = require('../auth/scram');
19
20 var DISCONNECTED = 'disconnected';
21 var CONNECTING = 'connecting';
22 var CONNECTED = 'connected';
23 var DESTROYING = 'destroying';
24 var DESTROYED = 'destroyed';
25
26 var _id = 0;
27
28 /**
29  * Creates a new Pool instance
30  * @class
31  * @param {string} options.host The server host
32  * @param {number} options.port The server port
33  * @param {number} [options.size=1] Max server connection pool size
34  * @param {boolean} [options.reconnect=true] Server will attempt to reconnect on loss of connection
35  * @param {number} [options.reconnectTries=30] Server attempt to reconnect #times
36  * @param {number} [options.reconnectInterval=1000] Server will wait # milliseconds between retries
37  * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
38  * @param {number} [options.keepAliveInitialDelay=0] Initial delay before TCP keep alive enabled
39  * @param {boolean} [options.noDelay=true] TCP Connection no delay
40  * @param {number} [options.connectionTimeout=0] TCP Connection timeout setting
41  * @param {number} [options.socketTimeout=0] TCP Socket timeout setting
42  * @param {number} [options.monitoringSocketTimeout=30000] TCP Socket timeout setting for replicaset monitoring socket
43  * @param {boolean} [options.ssl=false] Use SSL for connection
44  * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function.
45  * @param {Buffer} [options.ca] SSL Certificate store binary buffer
46  * @param {Buffer} [options.cert] SSL Certificate binary buffer
47  * @param {Buffer} [options.key] SSL Key file binary buffer
48  * @param {string} [options.passPhrase] SSL Certificate pass phrase
49  * @param {boolean} [options.rejectUnauthorized=false] Reject unauthorized server certificates
50  * @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits
51  * @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
52  * @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
53  * @param {boolean} [options.domainsEnabled=false] Enable the wrapping of the callback in the current domain, disabled by default to avoid perf hit.
54  * @fires Pool#connect
55  * @fires Pool#close
56  * @fires Pool#error
57  * @fires Pool#timeout
58  * @fires Pool#parseError
59  * @return {Pool} A cursor instance
60  */
61 var Pool = function(options) {
62   // Add event listener
63   EventEmitter.call(this);
64   // Add the options
65   this.options = assign({
66     // Host and port settings
67     host: 'localhost',
68     port: 27017,
69     // Pool default max size
70     size: 5,
71     // socket settings
72     connectionTimeout: 30000,
73     socketTimeout: 30000,
74     keepAlive: true,
75     keepAliveInitialDelay: 0,
76     noDelay: true,
77     // SSL Settings
78     ssl: false, checkServerIdentity: true,
79     ca: null, cert: null, key: null, passPhrase: null,
80     rejectUnauthorized: false,
81     promoteLongs: true,
82     promoteValues: true,
83     promoteBuffers: false,
84     // Reconnection options
85     reconnect: true,
86     reconnectInterval: 1000,
87     reconnectTries: 30,
88     // Enable domains
89     domainsEnabled: false
90   }, options);
91
92   // Identification information
93   this.id = _id++;
94   // Current reconnect retries
95   this.retriesLeft = this.options.reconnectTries;
96   this.reconnectId = null;
97   // No bson parser passed in
98   if(!options.bson || (options.bson
99     && (typeof options.bson.serialize != 'function'
100     || typeof options.bson.deserialize != 'function'))) {
101       throw new Error("must pass in valid bson parser");
102   }
103
104   // Logger instance
105   this.logger = Logger('Pool', options);
106   // Pool state
107   this.state = DISCONNECTED;
108   // Connections
109   this.availableConnections = [];
110   this.inUseConnections = [];
111   this.connectingConnections = [];
112   // Currently executing
113   this.executing = false;
114   // Operation work queue
115   this.queue = [];
116
117   // All the authProviders
118   this.authProviders = options.authProviders || {
119       'mongocr': new MongoCR(options.bson), 'x509': new X509(options.bson)
120     , 'plain': new Plain(options.bson), 'gssapi': new GSSAPI(options.bson)
121     , 'sspi': new SSPI(options.bson), 'scram-sha-1': new ScramSHA1(options.bson)
122   }
123
124   // Are we currently authenticating
125   this.authenticating = false;
126   this.loggingout = false;
127   this.nonAuthenticatedConnections = [];
128   this.authenticatingTimestamp = null;
129   // Number of consecutive timeouts caught
130   this.numberOfConsecutiveTimeouts = 0;
131   // Current pool Index
132   this.connectionIndex = 0;
133 }
134
135 inherits(Pool, EventEmitter);
136
137 Object.defineProperty(Pool.prototype, 'size', {
138   enumerable:true,
139   get: function() { return this.options.size; }
140 });
141
142 Object.defineProperty(Pool.prototype, 'connectionTimeout', {
143   enumerable:true,
144   get: function() { return this.options.connectionTimeout; }
145 });
146
147 Object.defineProperty(Pool.prototype, 'socketTimeout', {
148   enumerable:true,
149   get: function() { return this.options.socketTimeout; }
150 });
151
152 function stateTransition(self, newState) {
153   var legalTransitions = {
154     'disconnected': [CONNECTING, DESTROYING, DISCONNECTED],
155     'connecting': [CONNECTING, DESTROYING, CONNECTED, DISCONNECTED],
156     'connected': [CONNECTED, DISCONNECTED, DESTROYING],
157     'destroying': [DESTROYING, DESTROYED],
158     'destroyed': [DESTROYED]
159   }
160
161   // Get current state
162   var legalStates = legalTransitions[self.state];
163   if(legalStates && legalStates.indexOf(newState) != -1) {
164     self.state = newState;
165   } else {
166     self.logger.error(f('Pool with id [%s] failed attempted illegal state transition from [%s] to [%s] only following state allowed [%s]'
167       , self.id, self.state, newState, legalStates));
168   }
169 }
170
171 function authenticate(pool, auth, connection, cb) {
172   if(auth[0] === undefined) return cb(null);
173   // We need to authenticate the server
174   var mechanism = auth[0];
175   var db = auth[1];
176   // Validate if the mechanism exists
177   if(!pool.authProviders[mechanism]) {
178     throw new MongoError(f('authMechanism %s not supported', mechanism));
179   }
180
181   // Get the provider
182   var provider = pool.authProviders[mechanism];
183
184   // Authenticate using the provided mechanism
185   provider.auth.apply(provider, [write(pool), [connection], db].concat(auth.slice(2)).concat([cb]));
186 }
187
188 // The write function used by the authentication mechanism (bypasses external)
189 function write(self) {
190   return function(connection, command, callback) {
191     // Get the raw buffer
192     // Ensure we stop auth if pool was destroyed
193     if(self.state == DESTROYED || self.state == DESTROYING) {
194       return callback(new MongoError('pool destroyed'));
195     }
196
197     // Set the connection workItem callback
198     connection.workItems.push({
199       cb: callback, command: true, requestId: command.requestId
200     });
201
202     // Write the buffer out to the connection
203     connection.write(command.toBin());
204   };
205 }
206
207
208 function reauthenticate(pool, connection, cb) {
209   // Authenticate
210   function authenticateAgainstProvider(pool, connection, providers, cb) {
211     // Finished re-authenticating against providers
212     if(providers.length == 0) return cb();
213     // Get the provider name
214     var provider = pool.authProviders[providers.pop()];
215
216     // Auth provider
217     provider.reauthenticate(write(pool), [connection], function(err) {
218       // We got an error return immediately
219       if(err) return cb(err);
220       // Continue authenticating the connection
221       authenticateAgainstProvider(pool, connection, providers, cb);
222     });
223   }
224
225   // Start re-authenticating process
226   authenticateAgainstProvider(pool, connection, Object.keys(pool.authProviders), cb);
227 }
228
229 function connectionFailureHandler(self, event) {
230   return function(err) {
231     if (this._connectionFailHandled) return;
232     this._connectionFailHandled = true;
233     // Destroy the connection
234     this.destroy();
235
236     // Remove the connection
237     removeConnection(self, this);
238
239     // Flush all work Items on this connection
240     while(this.workItems.length > 0) {
241       var workItem = this.workItems.shift();
242       // if(workItem.cb) workItem.cb(err);
243       if(workItem.cb) workItem.cb(err);
244     }
245
246     // Did we catch a timeout, increment the numberOfConsecutiveTimeouts
247     if(event == 'timeout') {
248       self.numberOfConsecutiveTimeouts = self.numberOfConsecutiveTimeouts + 1;
249
250       // Have we timed out more than reconnectTries in a row ?
251       // Force close the pool as we are trying to connect to tcp sink hole
252       if(self.numberOfConsecutiveTimeouts > self.options.reconnectTries) {
253         self.numberOfConsecutiveTimeouts = 0;
254         // Destroy all connections and pool
255         self.destroy(true);
256         // Emit close event
257         return self.emit('close', self);
258       }
259     }
260
261     // No more socket available propegate the event
262     if(self.socketCount() == 0) {
263       if(self.state != DESTROYED && self.state != DESTROYING) {
264         stateTransition(self, DISCONNECTED);
265       }
266
267       // Do not emit error events, they are always close events
268       // do not trigger the low level error handler in node
269       event = event == 'error' ? 'close' : event;
270       self.emit(event, err);
271     }
272
273     // Start reconnection attempts
274     if(!self.reconnectId && self.options.reconnect) {
275       self.reconnectId = setTimeout(attemptReconnect(self), self.options.reconnectInterval);
276     }
277   };
278 }
279
280 function attemptReconnect(self) {
281   return function() {
282     self.emit('attemptReconnect', self);
283     if(self.state == DESTROYED || self.state == DESTROYING) return;
284
285     // We are connected do not try again
286     if(self.isConnected()) {
287       self.reconnectId = null;
288       return;
289     }
290
291     // If we have failure schedule a retry
292     function _connectionFailureHandler(self) {
293       return function() {
294         if (this._connectionFailHandled) return;
295         this._connectionFailHandled = true;
296         // Destroy the connection
297         this.destroy();
298         // Count down the number of reconnects
299         self.retriesLeft = self.retriesLeft - 1;
300         // How many retries are left
301         if(self.retriesLeft == 0) {
302           // Destroy the instance
303           self.destroy();
304           // Emit close event
305           self.emit('reconnectFailed'
306             , new MongoError(f('failed to reconnect after %s attempts with interval %s ms', self.options.reconnectTries, self.options.reconnectInterval)));
307         } else {
308           self.reconnectId = setTimeout(attemptReconnect(self), self.options.reconnectInterval);
309         }
310       }
311     }
312
313     // Got a connect handler
314     function _connectHandler(self) {
315       return function() {
316         // Assign
317         var connection = this;
318
319         // Pool destroyed stop the connection
320         if(self.state == DESTROYED || self.state == DESTROYING) {
321           return connection.destroy();
322         }
323
324         // Clear out all handlers
325         handlers.forEach(function(event) {
326           connection.removeAllListeners(event);
327         });
328
329         // Reset reconnect id
330         self.reconnectId = null;
331
332         // Apply pool connection handlers
333         connection.on('error', connectionFailureHandler(self, 'error'));
334         connection.on('close', connectionFailureHandler(self, 'close'));
335         connection.on('timeout', connectionFailureHandler(self, 'timeout'));
336         connection.on('parseError', connectionFailureHandler(self, 'parseError'));
337
338         // Apply any auth to the connection
339         reauthenticate(self, this, function() {
340           // Reset retries
341           self.retriesLeft = self.options.reconnectTries;
342           // Push to available connections
343           self.availableConnections.push(connection);
344           // Emit reconnect event
345           self.emit('reconnect', self);
346           // Trigger execute to start everything up again
347           _execute(self)();
348         });
349       }
350     }
351
352     // Create a connection
353     var connection = new Connection(messageHandler(self), self.options);
354     // Add handlers
355     connection.on('close', _connectionFailureHandler(self, 'close'));
356     connection.on('error', _connectionFailureHandler(self, 'error'));
357     connection.on('timeout', _connectionFailureHandler(self, 'timeout'));
358     connection.on('parseError', _connectionFailureHandler(self, 'parseError'));
359     // On connection
360     connection.on('connect', _connectHandler(self));
361     // Attempt connection
362     connection.connect();
363   }
364 }
365
366 function moveConnectionBetween(connection, from, to) {
367   var index = from.indexOf(connection);
368   // Move the connection from connecting to available
369   if(index != -1) {
370     from.splice(index, 1);
371     to.push(connection);
372   }
373 }
374
375 function messageHandler(self) {
376   return function(message, connection) {
377     // workItem to execute
378     var workItem = null;
379
380     // Locate the workItem
381     for(var i = 0; i < connection.workItems.length; i++) {
382       if(connection.workItems[i].requestId == message.responseTo) {
383         // Get the callback
384         workItem = connection.workItems[i];
385         // Remove from list of workItems
386         connection.workItems.splice(i, 1);
387       }
388     }
389
390
391     // Reset timeout counter
392     self.numberOfConsecutiveTimeouts = 0;
393
394     // Reset the connection timeout if we modified it for
395     // this operation
396     if(workItem.socketTimeout) {
397       connection.resetSocketTimeout();
398     }
399
400     // Log if debug enabled
401     if(self.logger.isDebug()) {
402       self.logger.debug(f('message [%s] received from %s:%s'
403         , message.raw.toString('hex'), self.options.host, self.options.port));
404     }
405
406     // Authenticate any straggler connections
407     function authenticateStragglers(self, connection, callback) {
408       // Get any non authenticated connections
409       var connections = self.nonAuthenticatedConnections.slice(0);
410       var nonAuthenticatedConnections = self.nonAuthenticatedConnections;
411       self.nonAuthenticatedConnections = [];
412
413       // Establish if the connection need to be authenticated
414       // Add to authentication list if
415       // 1. we were in an authentication process when the operation was executed
416       // 2. our current authentication timestamp is from the workItem one, meaning an auth has happened
417       if(connection.workItems.length == 1 && (connection.workItems[0].authenticating == true
418         || (typeof connection.workItems[0].authenticatingTimestamp == 'number'
419             && connection.workItems[0].authenticatingTimestamp != self.authenticatingTimestamp))) {
420         // Add connection to the list
421         connections.push(connection);
422       }
423
424       // No connections need to be re-authenticated
425       if(connections.length == 0) {
426         // Release the connection back to the pool
427         moveConnectionBetween(connection, self.inUseConnections, self.availableConnections);
428         // Finish
429         return callback();
430       }
431
432       // Apply re-authentication to all connections before releasing back to pool
433       var connectionCount = connections.length;
434       // Authenticate all connections
435       for(var i = 0; i < connectionCount; i++) {
436         reauthenticate(self, connections[i], function() {
437           connectionCount = connectionCount - 1;
438
439           if(connectionCount == 0) {
440             // Put non authenticated connections in available connections
441             self.availableConnections = self.availableConnections.concat(nonAuthenticatedConnections);
442             // Release the connection back to the pool
443             moveConnectionBetween(connection, self.inUseConnections, self.availableConnections);
444             // Return
445             callback();
446           }
447         });
448       }
449     }
450
451     function handleOperationCallback(self, cb, err, result) {
452       // No domain enabled
453       if(!self.options.domainsEnabled) {
454         return process.nextTick(function() {
455           return cb(err, result);
456         });
457       }
458
459       // Domain enabled just call the callback
460       cb(err, result);
461     }
462
463     authenticateStragglers(self, connection, function() {
464       // Keep executing, ensure current message handler does not stop execution
465       if(!self.executing) {
466         process.nextTick(function() {
467           _execute(self)();
468         });
469       }
470
471       // Time to dispatch the message if we have a callback
472       if(!workItem.immediateRelease) {
473         try {
474           // Parse the message according to the provided options
475           message.parse(workItem);
476         } catch(err) {
477           return handleOperationCallback(self, workItem.cb, MongoError.create(err));
478         }
479
480         // Establish if we have an error
481         if(workItem.command && message.documents[0] && (message.documents[0].ok == 0 || message.documents[0]['$err']
482         || message.documents[0]['errmsg'] || message.documents[0]['code'])) {
483           return handleOperationCallback(self, workItem.cb, MongoError.create(message.documents[0]));
484         }
485
486         // Add the connection details
487         message.hashedName = connection.hashedName;
488
489         // Return the documents
490         handleOperationCallback(self, workItem.cb, null, new CommandResult(workItem.fullResult ? message : message.documents[0], connection, message));
491       }
492     });
493   }
494 }
495
496 /**
497  * Return the total socket count in the pool.
498  * @method
499  * @return {Number} The number of socket available.
500  */
501 Pool.prototype.socketCount = function() {
502   return this.availableConnections.length
503     + this.inUseConnections.length;
504     // + this.connectingConnections.length;
505 }
506
507 /**
508  * Return all pool connections
509  * @method
510  * @return {Connectio[]} The pool connections
511  */
512 Pool.prototype.allConnections = function() {
513   return this.availableConnections
514     .concat(this.inUseConnections)
515     .concat(this.connectingConnections);
516 }
517
518 /**
519  * Get a pool connection (round-robin)
520  * @method
521  * @return {Connection}
522  */
523 Pool.prototype.get = function() {
524   return this.allConnections()[0];
525 }
526
527 /**
528  * Is the pool connected
529  * @method
530  * @return {boolean}
531  */
532 Pool.prototype.isConnected = function() {
533   // We are in a destroyed state
534   if(this.state == DESTROYED || this.state == DESTROYING) {
535     return false;
536   }
537
538   // Get connections
539   var connections = this.availableConnections
540     .concat(this.inUseConnections);
541
542   // Check if we have any connected connections
543   for(var i = 0; i < connections.length; i++) {
544     if(connections[i].isConnected()) return true;
545   }
546
547   // Might be authenticating, but we are still connected
548   if(connections.length == 0 && this.authenticating) {
549     return true
550   }
551
552   // Not connected
553   return false;
554 }
555
556 /**
557  * Was the pool destroyed
558  * @method
559  * @return {boolean}
560  */
561 Pool.prototype.isDestroyed = function() {
562   return this.state == DESTROYED || this.state == DESTROYING;
563 }
564
565 /**
566  * Is the pool in a disconnected state
567  * @method
568  * @return {boolean}
569  */
570 Pool.prototype.isDisconnected = function() {
571   return this.state == DISCONNECTED;
572 }
573
574 /**
575  * Connect pool
576  * @method
577  */
578 Pool.prototype.connect = function() {
579   if(this.state != DISCONNECTED) {
580     throw new MongoError('connection in unlawful state ' + this.state);
581   }
582
583   var self = this;
584   // Transition to connecting state
585   stateTransition(this, CONNECTING);
586   // Create an array of the arguments
587   var args = Array.prototype.slice.call(arguments, 0);
588   // Create a connection
589   var connection = new Connection(messageHandler(self), this.options);
590   // Add to list of connections
591   this.connectingConnections.push(connection);
592   // Add listeners to the connection
593   connection.once('connect', function(connection) {
594     if(self.state == DESTROYED || self.state == DESTROYING) return self.destroy();
595
596     // Apply any store credentials
597     reauthenticate(self, connection, function(err) {
598       if(self.state == DESTROYED || self.state == DESTROYING) return self.destroy();
599
600       // We have an error emit it
601       if(err) {
602         // Destroy the pool
603         self.destroy();
604         // Emit the error
605         return self.emit('error', err);
606       }
607
608       // Authenticate
609       authenticate(self, args, connection, function(err) {
610         if(self.state == DESTROYED || self.state == DESTROYING) return self.destroy();
611
612         // We have an error emit it
613         if(err) {
614           // Destroy the pool
615           self.destroy();
616           // Emit the error
617           return self.emit('error', err);
618         }
619         // Set connected mode
620         stateTransition(self, CONNECTED);
621
622         // Move the active connection
623         moveConnectionBetween(connection, self.connectingConnections, self.availableConnections);
624
625         // Emit the connect event
626         self.emit('connect', self);
627       });
628     });
629   });
630
631   // Add error handlers
632   connection.once('error', connectionFailureHandler(this, 'error'));
633   connection.once('close', connectionFailureHandler(this, 'close'));
634   connection.once('timeout', connectionFailureHandler(this, 'timeout'));
635   connection.once('parseError', connectionFailureHandler(this, 'parseError'));
636
637   try {
638     connection.connect();
639   } catch(err) {
640     // SSL or something threw on connect
641     self.emit('error', err);
642   }
643 }
644
645 /**
646  * Authenticate using a specified mechanism
647  * @method
648  * @param {string} mechanism The Auth mechanism we are invoking
649  * @param {string} db The db we are invoking the mechanism against
650  * @param {...object} param Parameters for the specific mechanism
651  * @param {authResultCallback} callback A callback function
652  */
653 Pool.prototype.auth = function(mechanism) {
654   var self = this;
655   var args = Array.prototype.slice.call(arguments, 0);
656   var callback = args.pop();
657
658   // If we don't have the mechanism fail
659   if(self.authProviders[mechanism] == null && mechanism != 'default') {
660     throw new MongoError(f("auth provider %s does not exist", mechanism));
661   }
662
663   // Signal that we are authenticating a new set of credentials
664   this.authenticating = true;
665   this.authenticatingTimestamp = new Date().getTime();
666
667   // Authenticate all live connections
668   function authenticateLiveConnections(self, args, cb) {
669     // Get the current viable connections
670     var connections = self.availableConnections;
671     // Allow nothing else to use the connections while we authenticate them
672     self.availableConnections = [];
673
674     var connectionsCount = connections.length;
675     var error = null;
676     // No connections available, return
677     if(connectionsCount == 0) return callback(null);
678     // Authenticate the connections
679     for(var i = 0; i < connections.length; i++) {
680       authenticate(self, args, connections[i], function(err) {
681         connectionsCount = connectionsCount - 1;
682
683         // Store the error
684         if(err) error = err;
685
686         // Processed all connections
687         if(connectionsCount == 0) {
688           // Auth finished
689           self.authenticating = false;
690           // Add the connections back to available connections
691           self.availableConnections = self.availableConnections.concat(connections);
692           // We had an error, return it
693           if(error) {
694             // Log the error
695             if(self.logger.isError()) {
696               self.logger.error(f('[%s] failed to authenticate against server %s:%s'
697                 , self.id, self.options.host, self.options.port));
698             }
699
700             return cb(error);
701           }
702           cb(null);
703         }
704       });
705     }
706   }
707
708   // Wait for a logout in process to happen
709   function waitForLogout(self, cb) {
710     if(!self.loggingout) return cb();
711     setTimeout(function() {
712       waitForLogout(self, cb);
713     }, 1)
714   }
715
716   // Wait for loggout to finish
717   waitForLogout(self, function() {
718     // Authenticate all live connections
719     authenticateLiveConnections(self, args, function(err) {
720       // Credentials correctly stored in auth provider if successful
721       // Any new connections will now reauthenticate correctly
722       self.authenticating = false;
723       // Return after authentication connections
724       callback(err);
725     });
726   });
727 }
728
729 /**
730  * Logout all users against a database
731  * @method
732  * @param {string} dbName The database name
733  * @param {authResultCallback} callback A callback function
734  */
735 Pool.prototype.logout = function(dbName, callback) {
736   var self = this;
737   if(typeof dbName != 'string') {
738     throw new MongoError('logout method requires a db name as first argument');
739   }
740
741   if(typeof callback != 'function') {
742     throw new MongoError('logout method requires a callback');
743   }
744
745   // Indicate logout in process
746   this.loggingout = true;
747
748   // Get all relevant connections
749   var connections = self.availableConnections.concat(self.inUseConnections);
750   var count = connections.length;
751   // Store any error
752   var error = null;
753
754   // Send logout command over all the connections
755   for(var i = 0; i < connections.length; i++) {
756     write(self)(connections[i], new Query(this.options.bson
757       , f('%s.$cmd', dbName)
758       , {logout:1}, {numberToSkip: 0, numberToReturn: 1}), function(err) {
759       count = count - 1;
760       if(err) error = err;
761
762       if(count == 0) {
763         self.loggingout = false;
764         callback(error);
765       }
766     });
767   }
768 }
769
770 /**
771  * Unref the pool
772  * @method
773  */
774 Pool.prototype.unref = function() {
775   // Get all the known connections
776   var connections = this.availableConnections
777     .concat(this.inUseConnections)
778     .concat(this.connectingConnections);
779   connections.forEach(function(c) {
780     c.unref();
781   });
782 }
783
784 // Events
785 var events = ['error', 'close', 'timeout', 'parseError', 'connect'];
786
787 // Destroy the connections
788 function destroy(self, connections) {
789   // Destroy all connections
790   connections.forEach(function(c) {
791     // Remove all listeners
792     for(var i = 0; i < events.length; i++) {
793       c.removeAllListeners(events[i]);
794     }
795     // Destroy connection
796     c.destroy();
797   });
798
799   // Zero out all connections
800   self.inUseConnections = [];
801   self.availableConnections = [];
802   self.nonAuthenticatedConnections = [];
803   self.connectingConnections = [];
804
805   // Set state to destroyed
806   stateTransition(self, DESTROYED);
807 }
808
809 /**
810  * Destroy pool
811  * @method
812  */
813 Pool.prototype.destroy = function(force) {
814   var self = this;
815   // Do not try again if the pool is already dead
816   if(this.state == DESTROYED || self.state == DESTROYING) return;
817   // Set state to destroyed
818   stateTransition(this, DESTROYING);
819
820   // Are we force closing
821   if(force) {
822     // Get all the known connections
823     var connections = self.availableConnections
824       .concat(self.inUseConnections)
825       .concat(self.nonAuthenticatedConnections)
826       .concat(self.connectingConnections);
827     return destroy(self, connections);
828   }
829
830   // Wait for the operations to drain before we close the pool
831   function checkStatus() {
832     if(self.queue.length == 0) {
833       // Get all the known connections
834       var connections = self.availableConnections
835         .concat(self.inUseConnections)
836         .concat(self.nonAuthenticatedConnections)
837         .concat(self.connectingConnections);
838
839       // Check if we have any in flight operations
840       for(var i = 0; i < connections.length; i++) {
841         // There is an operation still in flight, reschedule a
842         // check waiting for it to drain
843         if(connections[i].workItems.length > 0) {
844           return setTimeout(checkStatus, 1);
845         }
846       }
847
848       destroy(self, connections);
849     } else {
850       setTimeout(checkStatus, 1);
851     }
852   }
853
854   // Initiate drain of operations
855   checkStatus();
856 }
857
858 /**
859  * Write a message to MongoDB
860  * @method
861  * @return {Connection}
862  */
863 Pool.prototype.write = function(commands, options, cb) {
864   var self = this;
865   // Ensure we have a callback
866   if(typeof options == 'function') {
867     cb = options;
868   }
869
870   // Always have options
871   options = options || {};
872
873   // Pool was destroyed error out
874   if(this.state == DESTROYED || this.state == DESTROYING) {
875     // Callback with an error
876     if(cb) {
877       try {
878         cb(new MongoError('pool destroyed'));
879       } catch(err) {
880         process.nextTick(function() {
881           throw err;
882         });
883       }
884     }
885
886     return;
887   }
888
889   if(this.options.domainsEnabled
890     && process.domain && typeof cb === "function") {
891     // if we have a domain bind to it
892     var oldCb = cb;
893     cb = process.domain.bind(function() {
894       // v8 - argumentsToArray one-liner
895       var args = new Array(arguments.length); for(var i = 0; i < arguments.length; i++) { args[i] = arguments[i]; }
896       // bounce off event loop so domain switch takes place
897       process.nextTick(function() {
898         oldCb.apply(null, args);
899       });
900     });
901   }
902
903   // Do we have an operation
904   var operation = {
905     cb: cb, raw: false, promoteLongs: true, promoteValues: true, promoteBuffers: false, fullResult: false
906   };
907
908   var buffer = null
909
910   if(Array.isArray(commands)) {
911     buffer = [];
912
913     for(var i = 0; i < commands.length; i++) {
914       buffer.push(commands[i].toBin());
915     }
916
917     // Get the requestId
918     operation.requestId = commands[commands.length - 1].requestId;
919   } else {
920     operation.requestId = commands.requestId;
921     buffer = commands.toBin();
922   }
923
924   // Set the buffers
925   operation.buffer = buffer;
926
927   // Set the options for the parsing
928   operation.promoteLongs = typeof options.promoteLongs == 'boolean' ? options.promoteLongs : true;
929   operation.promoteValues = typeof options.promoteValues == 'boolean' ? options.promoteValues : true;
930   operation.promoteBuffers = typeof options.promoteBuffers == 'boolean' ? options.promoteBuffers : false;
931   operation.raw = typeof options.raw == 'boolean' ? options.raw : false;
932   operation.immediateRelease = typeof options.immediateRelease == 'boolean' ? options.immediateRelease : false;
933   operation.documentsReturnedIn = options.documentsReturnedIn;
934   operation.command = typeof options.command == 'boolean' ? options.command : false;
935   operation.fullResult = typeof options.fullResult == 'boolean' ? options.fullResult : false;
936   operation.noResponse = typeof options.noResponse == 'boolean' ? options.noResponse : false;
937   // operation.requestId = options.requestId;
938
939   // Optional per operation socketTimeout
940   operation.socketTimeout = options.socketTimeout;
941   operation.monitoring = options.monitoring;
942   // Custom socket Timeout
943   if(options.socketTimeout) {
944     operation.socketTimeout = options.socketTimeout;
945   }
946
947   // We need to have a callback function unless the message returns no response
948   if(!(typeof cb == 'function') && !options.noResponse) {
949     throw new MongoError('write method must provide a callback');
950   }
951
952   // If we have a monitoring operation schedule as the very first operation
953   // Otherwise add to back of queue
954   if(options.monitoring) {
955     this.queue.unshift(operation);
956   } else {
957     this.queue.push(operation);
958   }
959
960   // Attempt to execute the operation
961   if(!self.executing) {
962     process.nextTick(function() {
963       _execute(self)();
964     });
965   }
966 }
967
968 // Remove connection method
969 function remove(connection, connections) {
970   for(var i = 0; i < connections.length; i++) {
971     if(connections[i] === connection) {
972       connections.splice(i, 1);
973       return true;
974     }
975   }
976 }
977
978 function removeConnection(self, connection) {
979   if(remove(connection, self.availableConnections)) return;
980   if(remove(connection, self.inUseConnections)) return;
981   if(remove(connection, self.connectingConnections)) return;
982   if(remove(connection, self.nonAuthenticatedConnections)) return;
983 }
984
985 // All event handlers
986 var handlers = ["close", "message", "error", "timeout", "parseError", "connect"];
987
988 function _createConnection(self) {
989   var connection = new Connection(messageHandler(self), self.options);
990
991   // Push the connection
992   self.connectingConnections.push(connection);
993
994   // Handle any errors
995   var tempErrorHandler = function(_connection) {
996     return function() {
997       // Destroy the connection
998       _connection.destroy();
999       // Remove the connection from the connectingConnections list
1000       removeConnection(self, _connection);
1001       // Start reconnection attempts
1002       if(!self.reconnectId && self.options.reconnect) {
1003         self.reconnectId = setTimeout(attemptReconnect(self), self.options.reconnectInterval);
1004       }
1005     }
1006   }
1007
1008   // Handle successful connection
1009   var tempConnectHandler = function(_connection) {
1010     return function() {
1011       // Destroyed state return
1012       if(self.state == DESTROYED || self.state == DESTROYING) {
1013         // Remove the connection from the list
1014         removeConnection(self, _connection);
1015         return _connection.destroy();
1016       }
1017
1018       // Destroy all event emitters
1019       handlers.forEach(function(e) {
1020         _connection.removeAllListeners(e);
1021       });
1022
1023       // Add the final handlers
1024       _connection.once('close', connectionFailureHandler(self, 'close'));
1025       _connection.once('error', connectionFailureHandler(self, 'error'));
1026       _connection.once('timeout', connectionFailureHandler(self, 'timeout'));
1027       _connection.once('parseError', connectionFailureHandler(self, 'parseError'));
1028
1029       // Signal
1030       reauthenticate(self, _connection, function(err) {
1031         if(self.state == DESTROYED || self.state == DESTROYING) {
1032           return _connection.destroy();
1033         }
1034         // Remove the connection from the connectingConnections list
1035         removeConnection(self, _connection);
1036
1037         // Handle error
1038         if(err) {
1039           return _connection.destroy();
1040         }
1041
1042         // If we are authenticating at the moment
1043         // Do not automatially put in available connections
1044         // As we need to apply the credentials first
1045         if(self.authenticating) {
1046           self.nonAuthenticatedConnections.push(_connection);
1047         } else {
1048           // Push to available
1049           self.availableConnections.push(_connection);
1050           // Execute any work waiting
1051           _execute(self)();
1052         }
1053       });
1054     }
1055   }
1056
1057   // Add all handlers
1058   connection.once('close', tempErrorHandler(connection));
1059   connection.once('error', tempErrorHandler(connection));
1060   connection.once('timeout', tempErrorHandler(connection));
1061   connection.once('parseError', tempErrorHandler(connection));
1062   connection.once('connect', tempConnectHandler(connection));
1063
1064   // Start connection
1065   connection.connect();
1066 }
1067
1068 function flushMonitoringOperations(queue) {
1069   for(var i = 0; i < queue.length; i++) {
1070     if(queue[i].monitoring) {
1071       var workItem = queue[i];
1072       queue.splice(i, 1);
1073       workItem.cb(new MongoError({ message: 'no connection available for monitoring', driver:true }));
1074     }
1075   }
1076 }
1077
1078 function _execute(self) {
1079   return function() {
1080     if(self.state == DESTROYED) return;
1081     // Already executing, skip
1082     if(self.executing) return;
1083     // Set pool as executing
1084     self.executing = true;
1085
1086     // Wait for auth to clear before continuing
1087     function waitForAuth(cb) {
1088       if(!self.authenticating) return cb();
1089       // Wait for a milisecond and try again
1090       setTimeout(function() {
1091         waitForAuth(cb);
1092       }, 1);
1093     }
1094
1095     // Block on any auth in process
1096     waitForAuth(function() {
1097       // As long as we have available connections
1098       while(true) {
1099         // Total availble connections
1100         var totalConnections = self.availableConnections.length
1101           + self.connectingConnections.length
1102           + self.inUseConnections.length;
1103
1104         // No available connections available, flush any monitoring ops
1105         if(self.availableConnections.length == 0) {
1106           // Flush any monitoring operations
1107           flushMonitoringOperations(self.queue);
1108           break;
1109         }
1110
1111         // No queue break
1112         if(self.queue.length == 0) {
1113           break;
1114         }
1115
1116         // Get a connection
1117         var connection = self.availableConnections[self.connectionIndex++ % self.availableConnections.length];
1118         // Is the connection connected
1119         if(connection.isConnected()) {
1120           // Get the next work item
1121           var workItem = self.queue.shift();
1122
1123           // Get actual binary commands
1124           var buffer = workItem.buffer;
1125
1126           // Set current status of authentication process
1127           workItem.authenticating = self.authenticating;
1128           workItem.authenticatingTimestamp = self.authenticatingTimestamp;
1129
1130           // If we are monitoring take the connection of the availableConnections
1131           if (workItem.monitoring) {
1132             moveConnectionBetween(connection, self.availableConnections, self.inUseConnections);
1133           }
1134
1135           // Track the executing commands on the mongo server
1136           // as long as there is an expected response
1137           if (! workItem.noResponse) {
1138             connection.workItems.push(workItem);
1139           }
1140
1141           // We have a custom socketTimeout
1142           if(!workItem.immediateRelease && typeof workItem.socketTimeout == 'number') {
1143             connection.setSocketTimeout(workItem.socketTimeout);
1144           }
1145
1146           // Put operation on the wire
1147           if(Array.isArray(buffer)) {
1148             for(var i = 0; i < buffer.length; i++) {
1149               connection.write(buffer[i])
1150             }
1151           } else {
1152             connection.write(buffer);
1153           }
1154
1155           if(workItem.immediateRelease && self.authenticating) {
1156             self.nonAuthenticatedConnections.push(connection);
1157           }
1158
1159           // Have we not reached the max connection size yet
1160           if(totalConnections < self.options.size
1161             && self.queue.length > 0) {
1162             // Create a new connection
1163             _createConnection(self);
1164           }
1165         } else {
1166           // Remove the disconnected connection
1167           removeConnection(self, connection);
1168           // Flush any monitoring operations in the queue, failing fast
1169           flushMonitoringOperations(self.queue);
1170         }
1171       }
1172     });
1173
1174     self.executing = false;
1175   }
1176 }
1177
1178 // Make execution loop available for testing
1179 Pool._execute = _execute;
1180
1181 /**
1182  * A server connect event, used to verify that the connection is up and running
1183  *
1184  * @event Pool#connect
1185  * @type {Pool}
1186  */
1187
1188 /**
1189  * A server reconnect event, used to verify that pool reconnected.
1190  *
1191  * @event Pool#reconnect
1192  * @type {Pool}
1193  */
1194
1195 /**
1196  * The server connection closed, all pool connections closed
1197  *
1198  * @event Pool#close
1199  * @type {Pool}
1200  */
1201
1202 /**
1203  * The server connection caused an error, all pool connections closed
1204  *
1205  * @event Pool#error
1206  * @type {Pool}
1207  */
1208
1209 /**
1210  * The server connection timed out, all pool connections closed
1211  *
1212  * @event Pool#timeout
1213  * @type {Pool}
1214  */
1215
1216 /**
1217  * The driver experienced an invalid message, all pool connections closed
1218  *
1219  * @event Pool#parseError
1220  * @type {Pool}
1221  */
1222
1223 /**
1224  * The driver attempted to reconnect
1225  *
1226  * @event Pool#attemptReconnect
1227  * @type {Pool}
1228  */
1229
1230 /**
1231  * The driver exhausted all reconnect attempts
1232  *
1233  * @event Pool#reconnectFailed
1234  * @type {Pool}
1235  */
1236
1237 module.exports = Pool;