4e3ad83c9186427c66cae4fc1cbd5f1b9cd5e39c
[aai/esr-gui.git] /
1 "use strict";
2
3 var inherits = require('util').inherits,
4   EventEmitter = require('events').EventEmitter,
5   Connection = require('./connection'),
6   MongoError = require('../error'),
7   Logger = require('./logger'),
8   f = require('util').format,
9   Query = require('./commands').Query,
10   CommandResult = require('./command_result'),
11   assign = require('../topologies/shared').assign;
12
13 var MongoCR = require('../auth/mongocr')
14   , X509 = require('../auth/x509')
15   , Plain = require('../auth/plain')
16   , GSSAPI = require('../auth/gssapi')
17   , SSPI = require('../auth/sspi')
18   , ScramSHA1 = require('../auth/scram');
19
20 var DISCONNECTED = 'disconnected';
21 var CONNECTING = 'connecting';
22 var CONNECTED = 'connected';
23 var DESTROYING = 'destroying';
24 var DESTROYED = 'destroyed';
25
26 var _id = 0;
27
28 /**
29  * Creates a new Pool instance
30  * @class
31  * @param {string} options.host The server host
32  * @param {number} options.port The server port
33  * @param {number} [options.size=1] Max server connection pool size
34  * @param {boolean} [options.reconnect=true] Server will attempt to reconnect on loss of connection
35  * @param {number} [options.reconnectTries=30] Server attempt to reconnect #times
36  * @param {number} [options.reconnectInterval=1000] Server will wait # milliseconds between retries
37  * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
38  * @param {number} [options.keepAliveInitialDelay=0] Initial delay before TCP keep alive enabled
39  * @param {boolean} [options.noDelay=true] TCP Connection no delay
40  * @param {number} [options.connectionTimeout=0] TCP Connection timeout setting
41  * @param {number} [options.socketTimeout=0] TCP Socket timeout setting
42  * @param {number} [options.monitoringSocketTimeout=30000] TCP Socket timeout setting for replicaset monitoring socket
43  * @param {boolean} [options.ssl=false] Use SSL for connection
44  * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function.
45  * @param {Buffer} [options.ca] SSL Certificate store binary buffer
46  * @param {Buffer} [options.cert] SSL Certificate binary buffer
47  * @param {Buffer} [options.key] SSL Key file binary buffer
48  * @param {string} [options.passPhrase] SSL Certificate pass phrase
49  * @param {boolean} [options.rejectUnauthorized=false] Reject unauthorized server certificates
50  * @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits
51  * @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
52  * @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
53  * @param {boolean} [options.domainsEnabled=false] Enable the wrapping of the callback in the current domain, disabled by default to avoid perf hit.
54  * @fires Pool#connect
55  * @fires Pool#close
56  * @fires Pool#error
57  * @fires Pool#timeout
58  * @fires Pool#parseError
59  * @return {Pool} A cursor instance
60  */
61 var Pool = function(options) {
62   var self = this;
63   // Add event listener
64   EventEmitter.call(this);
65   // Add the options
66   this.options = assign({
67     // Host and port settings
68     host: 'localhost',
69     port: 27017,
70     // Pool default max size
71     size: 5,
72     // socket settings
73     connectionTimeout: 30000,
74     socketTimeout: 30000,
75     keepAlive: true,
76     keepAliveInitialDelay: 0,
77     noDelay: true,
78     // SSL Settings
79     ssl: false, checkServerIdentity: true,
80     ca: null, cert: null, key: null, passPhrase: null,
81     rejectUnauthorized: false,
82     promoteLongs: true,
83     promoteValues: true,
84     promoteBuffers: false,
85     // Reconnection options
86     reconnect: true,
87     reconnectInterval: 1000,
88     reconnectTries: 30,
89     // Enable domains
90     domainsEnabled: false
91   }, options);
92
93   // Identification information
94   this.id = _id++;
95   // Current reconnect retries
96   this.retriesLeft = this.options.reconnectTries;
97   this.reconnectId = null;
98   // No bson parser passed in
99   if(!options.bson || (options.bson
100     && (typeof options.bson.serialize != 'function'
101     || typeof options.bson.deserialize != 'function'))) {
102       throw new Error("must pass in valid bson parser");
103   }
104
105   // Logger instance
106   this.logger = Logger('Pool', options);
107   // Pool state
108   this.state = DISCONNECTED;
109   // Connections
110   this.availableConnections = [];
111   this.inUseConnections = [];
112   this.connectingConnections = [];
113   // Currently executing
114   this.executing = false;
115   // Operation work queue
116   this.queue = [];
117
118   // All the authProviders
119   this.authProviders = options.authProviders || {
120       'mongocr': new MongoCR(options.bson), 'x509': new X509(options.bson)
121     , 'plain': new Plain(options.bson), 'gssapi': new GSSAPI(options.bson)
122     , 'sspi': new SSPI(options.bson), 'scram-sha-1': new ScramSHA1(options.bson)
123   }
124
125   // Are we currently authenticating
126   this.authenticating = false;
127   this.loggingout = false;
128   this.nonAuthenticatedConnections = [];
129   this.authenticatingTimestamp = null;
130   // Number of consecutive timeouts caught
131   this.numberOfConsecutiveTimeouts = 0;
132   // Current pool Index
133   this.connectionIndex = 0;
134 }
135
136 inherits(Pool, EventEmitter);
137
138 Object.defineProperty(Pool.prototype, 'size', {
139   enumerable:true,
140   get: function() { return this.options.size; }
141 });
142
143 Object.defineProperty(Pool.prototype, 'connectionTimeout', {
144   enumerable:true,
145   get: function() { return this.options.connectionTimeout; }
146 });
147
148 Object.defineProperty(Pool.prototype, 'socketTimeout', {
149   enumerable:true,
150   get: function() { return this.options.socketTimeout; }
151 });
152
153 function stateTransition(self, newState) {
154   var legalTransitions = {
155     'disconnected': [CONNECTING, DESTROYING, DISCONNECTED],
156     'connecting': [CONNECTING, DESTROYING, CONNECTED, DISCONNECTED],
157     'connected': [CONNECTED, DISCONNECTED, DESTROYING],
158     'destroying': [DESTROYING, DESTROYED],
159     'destroyed': [DESTROYED]
160   }
161
162   // Get current state
163   var legalStates = legalTransitions[self.state];
164   if(legalStates && legalStates.indexOf(newState) != -1) {
165     self.state = newState;
166   } else {
167     self.logger.error(f('Pool with id [%s] failed attempted illegal state transition from [%s] to [%s] only following state allowed [%s]'
168       , self.id, self.state, newState, legalStates));
169   }
170 }
171
172 function authenticate(pool, auth, connection, cb) {
173   if(auth[0] === undefined) return cb(null);
174   // We need to authenticate the server
175   var mechanism = auth[0];
176   var db = auth[1];
177   // Validate if the mechanism exists
178   if(!pool.authProviders[mechanism]) {
179     throw new MongoError(f('authMechanism %s not supported', mechanism));
180   }
181
182   // Get the provider
183   var provider = pool.authProviders[mechanism];
184
185   // Authenticate using the provided mechanism
186   provider.auth.apply(provider, [write(pool), [connection], db].concat(auth.slice(2)).concat([cb]));
187 }
188
189 // The write function used by the authentication mechanism (bypasses external)
190 function write(self) {
191   return function(connection, command, callback) {
192     // Get the raw buffer
193     // Ensure we stop auth if pool was destroyed
194     if(self.state == DESTROYED || self.state == DESTROYING) {
195       return callback(new MongoError('pool destroyed'));
196     }
197
198     // Set the connection workItem callback
199     connection.workItems.push({
200       cb: callback, command: true, requestId: command.requestId
201     });
202
203     // Write the buffer out to the connection
204     connection.write(command.toBin());
205   };
206 }
207
208
209 function reauthenticate(pool, connection, cb) {
210   // Authenticate
211   function authenticateAgainstProvider(pool, connection, providers, cb) {
212     // Finished re-authenticating against providers
213     if(providers.length == 0) return cb();
214     // Get the provider name
215     var provider = pool.authProviders[providers.pop()];
216
217     // Auth provider
218     provider.reauthenticate(write(pool), [connection], function(err, r) {
219       // We got an error return immediately
220       if(err) return cb(err);
221       // Continue authenticating the connection
222       authenticateAgainstProvider(pool, connection, providers, cb);
223     });
224   }
225
226   // Start re-authenticating process
227   authenticateAgainstProvider(pool, connection, Object.keys(pool.authProviders), cb);
228 }
229
230 function connectionFailureHandler(self, event) {
231   return function(err) {
232     if (this._connectionFailHandled) return;
233     this._connectionFailHandled = true;
234     // Destroy the connection
235     this.destroy();
236
237     // Remove the connection
238     removeConnection(self, this);
239
240     // Flush all work Items on this connection
241     while(this.workItems.length > 0) {
242       var workItem = this.workItems.shift();
243       // if(workItem.cb) workItem.cb(err);
244       if(workItem.cb) workItem.cb(err);
245     }
246
247     // Did we catch a timeout, increment the numberOfConsecutiveTimeouts
248     if(event == 'timeout') {
249       self.numberOfConsecutiveTimeouts = self.numberOfConsecutiveTimeouts + 1;
250
251       // Have we timed out more than reconnectTries in a row ?
252       // Force close the pool as we are trying to connect to tcp sink hole
253       if(self.numberOfConsecutiveTimeouts > self.options.reconnectTries) {
254         self.numberOfConsecutiveTimeouts = 0;
255         // Destroy all connections and pool
256         self.destroy(true);
257         // Emit close event
258         return self.emit('close', self);
259       }
260     }
261
262     // No more socket available propegate the event
263     if(self.socketCount() == 0) {
264       if(self.state != DESTROYED && self.state != DESTROYING) {
265         stateTransition(self, DISCONNECTED);
266       }
267
268       // Do not emit error events, they are always close events
269       // do not trigger the low level error handler in node
270       event = event == 'error' ? 'close' : event;
271       self.emit(event, err);
272     }
273
274     // Start reconnection attempts
275     if(!self.reconnectId && self.options.reconnect) {
276       self.reconnectId = setTimeout(attemptReconnect(self), self.options.reconnectInterval);
277     }
278   };
279 }
280
281 function attemptReconnect(self) {
282   return function() {
283     self.emit('attemptReconnect', self);
284     if(self.state == DESTROYED || self.state == DESTROYING) return;
285
286     // We are connected do not try again
287     if(self.isConnected()) {
288       self.reconnectId = null;
289       return;
290     }
291
292     // If we have failure schedule a retry
293     function _connectionFailureHandler(self, event) {
294       return function() {
295         if (this._connectionFailHandled) return;
296         this._connectionFailHandled = true;
297         // Destroy the connection
298         this.destroy();
299         // Count down the number of reconnects
300         self.retriesLeft = self.retriesLeft - 1;
301         // How many retries are left
302         if(self.retriesLeft == 0) {
303           // Destroy the instance
304           self.destroy();
305           // Emit close event
306           self.emit('reconnectFailed'
307             , new MongoError(f('failed to reconnect after %s attempts with interval %s ms', self.options.reconnectTries, self.options.reconnectInterval)));
308         } else {
309           self.reconnectId = setTimeout(attemptReconnect(self), self.options.reconnectInterval);
310         }
311       }
312     }
313
314     // Got a connect handler
315     function _connectHandler(self) {
316       return function() {
317         // Assign
318         var connection = this;
319
320         // Pool destroyed stop the connection
321         if(self.state == DESTROYED || self.state == DESTROYING) {
322           return connection.destroy();
323         }
324
325         // Clear out all handlers
326         handlers.forEach(function(event) {
327           connection.removeAllListeners(event);
328         });
329
330         // Reset reconnect id
331         self.reconnectId = null;
332
333         // Apply pool connection handlers
334         connection.on('error', connectionFailureHandler(self, 'error'));
335         connection.on('close', connectionFailureHandler(self, 'close'));
336         connection.on('timeout', connectionFailureHandler(self, 'timeout'));
337         connection.on('parseError', connectionFailureHandler(self, 'parseError'));
338
339         // Apply any auth to the connection
340         reauthenticate(self, this, function(err) {
341           // Reset retries
342           self.retriesLeft = self.options.reconnectTries;
343           // Push to available connections
344           self.availableConnections.push(connection);
345           // Emit reconnect event
346           self.emit('reconnect', self);
347           // Trigger execute to start everything up again
348           _execute(self)();
349         });
350       }
351     }
352
353     // Create a connection
354     var connection = new Connection(messageHandler(self), self.options);
355     // Add handlers
356     connection.on('close', _connectionFailureHandler(self, 'close'));
357     connection.on('error', _connectionFailureHandler(self, 'error'));
358     connection.on('timeout', _connectionFailureHandler(self, 'timeout'));
359     connection.on('parseError', _connectionFailureHandler(self, 'parseError'));
360     // On connection
361     connection.on('connect', _connectHandler(self));
362     // Attempt connection
363     connection.connect();
364   }
365 }
366
367 function moveConnectionBetween(connection, from, to) {
368   var index = from.indexOf(connection);
369   // Move the connection from connecting to available
370   if(index != -1) {
371     from.splice(index, 1);
372     to.push(connection);
373   }
374 }
375
376 function messageHandler(self) {
377   return function(message, connection) {
378     // workItem to execute
379     var workItem = null;
380
381     // Locate the workItem
382     for(var i = 0; i < connection.workItems.length; i++) {
383       if(connection.workItems[i].requestId == message.responseTo) {
384         // Get the callback
385         var workItem = connection.workItems[i];
386         // Remove from list of workItems
387         connection.workItems.splice(i, 1);
388       }
389     }
390
391     // Reset timeout counter
392     self.numberOfConsecutiveTimeouts = 0;
393
394     // Reset the connection timeout if we modified it for
395     // this operation
396     if(workItem.socketTimeout) {
397       connection.resetSocketTimeout();
398     }
399
400     // Log if debug enabled
401     if(self.logger.isDebug()) {
402       self.logger.debug(f('message [%s] received from %s:%s'
403         , message.raw.toString('hex'), self.options.host, self.options.port));
404     }
405
406     // Authenticate any straggler connections
407     function authenticateStragglers(self, connection, callback) {
408       // Get any non authenticated connections
409       var connections = self.nonAuthenticatedConnections.slice(0);
410       var nonAuthenticatedConnections = self.nonAuthenticatedConnections;
411       self.nonAuthenticatedConnections = [];
412
413       // Establish if the connection need to be authenticated
414       // Add to authentication list if
415       // 1. we were in an authentication process when the operation was executed
416       // 2. our current authentication timestamp is from the workItem one, meaning an auth has happened
417       if(connection.workItems.length == 1 && (connection.workItems[0].authenticating == true
418         || (typeof connection.workItems[0].authenticatingTimestamp == 'number'
419             && connection.workItems[0].authenticatingTimestamp != self.authenticatingTimestamp))) {
420         // Add connection to the list
421         connections.push(connection);
422       }
423
424       // No connections need to be re-authenticated
425       if(connections.length == 0) {
426         // Release the connection back to the pool
427         moveConnectionBetween(connection, self.inUseConnections, self.availableConnections);
428         // Finish
429         return callback();
430       }
431
432       // Apply re-authentication to all connections before releasing back to pool
433       var connectionCount = connections.length;
434       // Authenticate all connections
435       for(var i = 0; i < connectionCount; i++) {
436         reauthenticate(self, connections[i], function(err) {
437           connectionCount = connectionCount - 1;
438
439           if(connectionCount == 0) {
440             // Put non authenticated connections in available connections
441             self.availableConnections = self.availableConnections.concat(nonAuthenticatedConnections);
442             // Release the connection back to the pool
443             moveConnectionBetween(connection, self.inUseConnections, self.availableConnections);
444             // Return
445             callback();
446           }
447         });
448       }
449     }
450
451     function handleOperationCallback(self, cb, err, result) {
452       // No domain enabled
453       if(!self.options.domainsEnabled) {
454         return process.nextTick(function() {
455           return cb(err, result);
456         });
457       }
458
459       // Domain enabled just call the callback
460       cb(err, result);
461     }
462
463     authenticateStragglers(self, connection, function(err) {
464       // Keep executing, ensure current message handler does not stop execution
465       if(!self.executing) {
466         process.nextTick(function() {
467           _execute(self)();
468         });
469       }
470
471       // Time to dispatch the message if we have a callback
472       if(!workItem.immediateRelease) {
473         try {
474           // Parse the message according to the provided options
475           message.parse(workItem);
476         } catch(err) {
477           return handleOperationCallback(self, workItem.cb, MongoError.create(err));
478         }
479
480         // Establish if we have an error
481         if(workItem.command && message.documents[0] && (message.documents[0].ok == 0 || message.documents[0]['$err']
482         || message.documents[0]['errmsg'] || message.documents[0]['code'])) {
483           return handleOperationCallback(self, workItem.cb, MongoError.create(message.documents[0]));
484         }
485
486         // Add the connection details
487         message.hashedName = connection.hashedName;
488
489         // Return the documents
490         handleOperationCallback(self, workItem.cb, null, new CommandResult(workItem.fullResult ? message : message.documents[0], connection, message));
491       }
492     });
493   }
494 }
495
496 /**
497  * Return the total socket count in the pool.
498  * @method
499  * @return {Number} The number of socket available.
500  */
501 Pool.prototype.socketCount = function() {
502   return this.availableConnections.length
503     + this.inUseConnections.length
504     + this.connectingConnections.length;
505 }
506
507 /**
508  * Return all pool connections
509  * @method
510  * @return {Connectio[]} The pool connections
511  */
512 Pool.prototype.allConnections = function() {
513   return this.availableConnections
514     .concat(this.inUseConnections)
515     .concat(this.connectingConnections);
516 }
517
518 /**
519  * Get a pool connection (round-robin)
520  * @method
521  * @return {Connection}
522  */
523 Pool.prototype.get = function() {
524   return this.allConnections()[0];
525 }
526
527 /**
528  * Is the pool connected
529  * @method
530  * @return {boolean}
531  */
532 Pool.prototype.isConnected = function() {
533   // We are in a destroyed state
534   if(this.state == DESTROYED || this.state == DESTROYING) {
535     return false;
536   }
537
538   // Get connections
539   var connections = this.availableConnections
540     .concat(this.inUseConnections);
541
542   for(var i = 0; i < connections.length; i++) {
543     if(connections[i].isConnected()) return true;
544   }
545
546   // Might be authenticating, but we are still connected
547   if(connections.length == 0 && this.authenticating) {
548     return true
549   }
550
551   // Not connected
552   return false;
553 }
554
555 /**
556  * Was the pool destroyed
557  * @method
558  * @return {boolean}
559  */
560 Pool.prototype.isDestroyed = function() {
561   return this.state == DESTROYED || this.state == DESTROYING;
562 }
563
564 /**
565  * Is the pool in a disconnected state
566  * @method
567  * @return {boolean}
568  */
569 Pool.prototype.isDisconnected = function() {
570   return this.state == DISCONNECTED;
571 }
572
573 /**
574  * Connect pool
575  * @method
576  */
577 Pool.prototype.connect = function(auth) {
578   if(this.state != DISCONNECTED) {
579     throw new MongoError('connection in unlawful state ' + this.state);
580   }
581
582   var self = this;
583   // Transition to connecting state
584   stateTransition(this, CONNECTING);
585   // Create an array of the arguments
586   var args = Array.prototype.slice.call(arguments, 0);
587   // Create a connection
588   var connection = new Connection(messageHandler(self), this.options);
589   // Add to list of connections
590   this.connectingConnections.push(connection);
591   // Add listeners to the connection
592   connection.once('connect', function(connection) {
593     if(self.state == DESTROYED || self.state == DESTROYING) return self.destroy();
594
595     // Apply any store credentials
596     reauthenticate(self, connection, function(err) {
597       if(self.state == DESTROYED || self.state == DESTROYING) return self.destroy();
598
599       // We have an error emit it
600       if(err) {
601         // Destroy the pool
602         self.destroy();
603         // Emit the error
604         return self.emit('error', err);
605       }
606
607       // Authenticate
608       authenticate(self, args, connection, function(err) {
609         if(self.state == DESTROYED || self.state == DESTROYING) return self.destroy();
610
611         // We have an error emit it
612         if(err) {
613           // Destroy the pool
614           self.destroy();
615           // Emit the error
616           return self.emit('error', err);
617         }
618         // Set connected mode
619         stateTransition(self, CONNECTED);
620
621         // Move the active connection
622         moveConnectionBetween(connection, self.connectingConnections, self.availableConnections);
623
624         // Emit the connect event
625         self.emit('connect', self);
626       });
627     });
628   });
629
630   // Add error handlers
631   connection.once('error', connectionFailureHandler(this, 'error'));
632   connection.once('close', connectionFailureHandler(this, 'close'));
633   connection.once('timeout', connectionFailureHandler(this, 'timeout'));
634   connection.once('parseError', connectionFailureHandler(this, 'parseError'));
635
636   try {
637     connection.connect();
638   } catch(err) {
639     // SSL or something threw on connect
640     self.emit('error', err);
641   }
642 }
643
644 /**
645  * Authenticate using a specified mechanism
646  * @method
647  * @param {string} mechanism The Auth mechanism we are invoking
648  * @param {string} db The db we are invoking the mechanism against
649  * @param {...object} param Parameters for the specific mechanism
650  * @param {authResultCallback} callback A callback function
651  */
652 Pool.prototype.auth = function(mechanism, db) {
653   var self = this;
654   var args = Array.prototype.slice.call(arguments, 0);
655   var callback = args.pop();
656
657   // If we don't have the mechanism fail
658   if(self.authProviders[mechanism] == null && mechanism != 'default') {
659     throw new MongoError(f("auth provider %s does not exist", mechanism));
660   }
661
662   // Signal that we are authenticating a new set of credentials
663   this.authenticating = true;
664   this.authenticatingTimestamp = new Date().getTime();
665
666   // Authenticate all live connections
667   function authenticateLiveConnections(self, args, cb) {
668     // Get the current viable connections
669     var connections = self.availableConnections;
670     // Allow nothing else to use the connections while we authenticate them
671     self.availableConnections = [];
672
673     var connectionsCount = connections.length;
674     var error = null;
675     // No connections available, return
676     if(connectionsCount == 0) return callback(null);
677     // Authenticate the connections
678     for(var i = 0; i < connections.length; i++) {
679       authenticate(self, args, connections[i], function(err) {
680         connectionsCount = connectionsCount - 1;
681
682         // Store the error
683         if(err) error = err;
684
685         // Processed all connections
686         if(connectionsCount == 0) {
687           // Auth finished
688           self.authenticating = false;
689           // Add the connections back to available connections
690           self.availableConnections = self.availableConnections.concat(connections);
691           // We had an error, return it
692           if(error) {
693             // Log the error
694             if(self.logger.isError()) {
695               self.logger.error(f('[%s] failed to authenticate against server %s:%s'
696                 , self.id, self.options.host, self.options.port));
697             }
698
699             return cb(error);
700           }
701           cb(null);
702         }
703       });
704     }
705   }
706
707   // Wait for a logout in process to happen
708   function waitForLogout(self, cb) {
709     if(!self.loggingout) return cb();
710     setTimeout(function() {
711       waitForLogout(self, cb);
712     }, 1)
713   }
714
715   // Wait for loggout to finish
716   waitForLogout(self, function() {
717     // Authenticate all live connections
718     authenticateLiveConnections(self, args, function(err) {
719       // Credentials correctly stored in auth provider if successful
720       // Any new connections will now reauthenticate correctly
721       self.authenticating = false;
722       // Return after authentication connections
723       callback(err);
724     });
725   });
726 }
727
728 /**
729  * Logout all users against a database
730  * @method
731  * @param {string} dbName The database name
732  * @param {authResultCallback} callback A callback function
733  */
734 Pool.prototype.logout = function(dbName, callback) {
735   var self = this;
736   if(typeof dbName != 'string') {
737     throw new MongoError('logout method requires a db name as first argument');
738   }
739
740   if(typeof callback != 'function') {
741     throw new MongoError('logout method requires a callback');
742   }
743
744   // Indicate logout in process
745   this.loggingout = true;
746
747   // Get all relevant connections
748   var connections = self.availableConnections.concat(self.inUseConnections);
749   var count = connections.length;
750   // Store any error
751   var error = null;
752
753   // Send logout command over all the connections
754   for(var i = 0; i < connections.length; i++) {
755     write(self)(connections[i], new Query(this.options.bson
756       , f('%s.$cmd', dbName)
757       , {logout:1}, {numberToSkip: 0, numberToReturn: 1}), function(err, r) {
758       count = count - 1;
759       if(err) error = err;
760
761       if(count == 0) {
762         self.loggingout = false;
763         callback(error);
764       };
765     });
766   }
767 }
768
769 /**
770  * Unref the pool
771  * @method
772  */
773 Pool.prototype.unref = function() {
774   // Get all the known connections
775   var connections = this.availableConnections
776     .concat(this.inUseConnections)
777     .concat(this.connectingConnections);
778   connections.forEach(function(c) {
779     c.unref();
780   });
781 }
782
783 // Events
784 var events = ['error', 'close', 'timeout', 'parseError', 'connect'];
785
786 // Destroy the connections
787 function destroy(self, connections) {
788   // Destroy all connections
789   connections.forEach(function(c) {
790     // Remove all listeners
791     for(var i = 0; i < events.length; i++) {
792       c.removeAllListeners(events[i]);
793     }
794     // Destroy connection
795     c.destroy();
796   });
797
798   // Zero out all connections
799   self.inUseConnections = [];
800   self.availableConnections = [];
801   self.nonAuthenticatedConnections = [];
802   self.connectingConnections = [];
803
804   // Set state to destroyed
805   stateTransition(self, DESTROYED);
806 }
807
808 /**
809  * Destroy pool
810  * @method
811  */
812 Pool.prototype.destroy = function(force) {
813   var self = this;
814   // Do not try again if the pool is already dead
815   if(this.state == DESTROYED || self.state == DESTROYING) return;
816   // Set state to destroyed
817   stateTransition(this, DESTROYING);
818
819   // Are we force closing
820   if(force) {
821     // Get all the known connections
822     var connections = self.availableConnections
823       .concat(self.inUseConnections)
824       .concat(self.nonAuthenticatedConnections)
825       .concat(self.connectingConnections);
826     return destroy(self, connections);
827   }
828
829   // Wait for the operations to drain before we close the pool
830   function checkStatus() {
831     if(self.queue.length == 0) {
832       // Get all the known connections
833       var connections = self.availableConnections
834         .concat(self.inUseConnections)
835         .concat(self.nonAuthenticatedConnections)
836         .concat(self.connectingConnections);
837
838       // Check if we have any in flight operations
839       for(var i = 0; i < connections.length; i++) {
840         // There is an operation still in flight, reschedule a
841         // check waiting for it to drain
842         if(connections[i].workItems.length > 0) {
843           return setTimeout(checkStatus, 1);
844         }
845       }
846
847       destroy(self, connections);
848     } else {
849       setTimeout(checkStatus, 1);
850     }
851   }
852
853   // Initiate drain of operations
854   checkStatus();
855 }
856
857 /**
858  * Write a message to MongoDB
859  * @method
860  * @return {Connection}
861  */
862 Pool.prototype.write = function(commands, options, cb) {
863   var self = this;
864   // Ensure we have a callback
865   if(typeof options == 'function') {
866     cb = options;
867   }
868
869   // Always have options
870   options = options || {};
871
872   // Pool was destroyed error out
873   if(this.state == DESTROYED || this.state == DESTROYING) {
874     // Callback with an error
875     if(cb) {
876       try {
877         cb(new MongoError('pool destroyed'));
878       } catch(err) {
879         process.nextTick(function() {
880           throw err;
881         });
882       }
883     }
884
885     return;
886   }
887
888   if(this.options.domainsEnabled
889     && process.domain && typeof cb === "function") {
890     // if we have a domain bind to it
891     var oldCb = cb;
892     cb = process.domain.bind(function() {
893       // v8 - argumentsToArray one-liner
894       var args = new Array(arguments.length); for(var i = 0; i < arguments.length; i++) { args[i] = arguments[i]; }
895       // bounce off event loop so domain switch takes place
896       process.nextTick(function() {
897         oldCb.apply(null, args);
898       });
899     });
900   }
901
902   // Do we have an operation
903   var operation = {
904     cb: cb, raw: false, promoteLongs: true, promoteValues: true, promoteBuffers: false, fullResult: false
905   };
906
907   var buffer = null
908
909   if(Array.isArray(commands)) {
910     buffer = [];
911
912     for(var i = 0; i < commands.length; i++) {
913       buffer.push(commands[i].toBin());
914     }
915
916     // Get the requestId
917     operation.requestId = commands[commands.length - 1].requestId;
918   } else {
919     operation.requestId = commands.requestId;
920     buffer = commands.toBin();
921   }
922
923   // Set the buffers
924   operation.buffer = buffer;
925
926   // Set the options for the parsing
927   operation.promoteLongs = typeof options.promoteLongs == 'boolean' ? options.promoteLongs : true;
928   operation.promoteValues = typeof options.promoteValues == 'boolean' ? options.promoteValues : true;
929   operation.promoteBuffers = typeof options.promoteBuffers == 'boolean' ? options.promoteBuffers : false;
930   operation.raw = typeof options.raw == 'boolean' ? options.raw : false;
931   operation.immediateRelease = typeof options.immediateRelease == 'boolean' ? options.immediateRelease : false;
932   operation.documentsReturnedIn = options.documentsReturnedIn;
933   operation.command = typeof options.command == 'boolean' ? options.command : false;
934   operation.fullResult = typeof options.fullResult == 'boolean' ? options.fullResult : false;
935   // operation.requestId = options.requestId;
936
937   // Optional per operation socketTimeout
938   operation.socketTimeout = options.socketTimeout;
939   operation.monitoring = options.monitoring;
940
941   // We need to have a callback function unless the message returns no response
942   if(!(typeof cb == 'function') && !options.noResponse) {
943     throw new MongoError('write method must provide a callback');
944   }
945
946   // If we have a monitoring operation schedule as the very first operation
947   // Otherwise add to back of queue
948   if(options.monitoring) {
949     this.queue.unshift(operation);
950   } else {
951     this.queue.push(operation);
952   }
953
954   // Attempt to execute the operation
955   if(!self.executing) {
956     process.nextTick(function() {
957       _execute(self)();
958     });
959   }
960 }
961
962 // Remove connection method
963 function remove(connection, connections) {
964   for(var i = 0; i < connections.length; i++) {
965     if(connections[i] === connection) {
966       connections.splice(i, 1);
967       return true;
968     }
969   }
970 }
971
972 function removeConnection(self, connection) {
973   if(remove(connection, self.availableConnections)) return;
974   if(remove(connection, self.inUseConnections)) return;
975   if(remove(connection, self.connectingConnections)) return;
976   if(remove(connection, self.nonAuthenticatedConnections)) return;
977 }
978
979 // All event handlers
980 var handlers = ["close", "message", "error", "timeout", "parseError", "connect"];
981
982 function _createConnection(self) {
983   var connection = new Connection(messageHandler(self), self.options);
984
985   // Push the connection
986   self.connectingConnections.push(connection);
987
988   // Handle any errors
989   var tempErrorHandler = function(_connection) {
990     return function(err) {
991       // Destroy the connection
992       _connection.destroy();
993       // Remove the connection from the connectingConnections list
994       removeConnection(self, _connection);
995       // Start reconnection attempts
996       if(!self.reconnectId && self.options.reconnect) {
997         self.reconnectId = setTimeout(attemptReconnect(self), self.options.reconnectInterval);
998       }
999     }
1000   }
1001
1002   // Handle successful connection
1003   var tempConnectHandler = function(_connection) {
1004     return function() {
1005       // Destroyed state return
1006       if(self.state == DESTROYED || self.state == DESTROYING) {
1007         // Remove the connection from the list
1008         removeConnection(self, _connection);
1009         return _connection.destroy();
1010       }
1011
1012       // Destroy all event emitters
1013       handlers.forEach(function(e) {
1014         _connection.removeAllListeners(e);
1015       });
1016
1017       // Add the final handlers
1018       _connection.once('close', connectionFailureHandler(self, 'close'));
1019       _connection.once('error', connectionFailureHandler(self, 'error'));
1020       _connection.once('timeout', connectionFailureHandler(self, 'timeout'));
1021       _connection.once('parseError', connectionFailureHandler(self, 'parseError'));
1022
1023       // Signal
1024       reauthenticate(self, _connection, function(err) {
1025         if(self.state == DESTROYED || self.state == DESTROYING) {
1026           return _connection.destroy();
1027         }
1028         // Remove the connection from the connectingConnections list
1029         removeConnection(self, _connection);
1030
1031         // Handle error
1032         if(err) {
1033           return _connection.destroy();
1034         }
1035
1036         // If we are authenticating at the moment
1037         // Do not automatially put in available connections
1038         // As we need to apply the credentials first
1039         if(self.authenticating) {
1040           self.nonAuthenticatedConnections.push(_connection);
1041         } else {
1042           // Push to available
1043           self.availableConnections.push(_connection);
1044           // Execute any work waiting
1045           _execute(self)();
1046         }
1047       });
1048     }
1049   }
1050
1051   // Add all handlers
1052   connection.once('close', tempErrorHandler(connection));
1053   connection.once('error', tempErrorHandler(connection));
1054   connection.once('timeout', tempErrorHandler(connection));
1055   connection.once('parseError', tempErrorHandler(connection));
1056   connection.once('connect', tempConnectHandler(connection));
1057
1058   // Start connection
1059   connection.connect();
1060 }
1061
1062 function flushMonitoringOperations(queue) {
1063   for(var i = 0; i < queue.length; i++) {
1064     if(queue[i].monitoring) {
1065       var workItem = queue[i];
1066       queue.splice(i, 1);
1067       workItem.cb(new MongoError({ message: 'no connection available for monitoring', driver:true }));
1068     }
1069   }
1070 }
1071
1072 function _execute(self) {
1073   return function() {
1074     if(self.state == DESTROYED) return;
1075     // Already executing, skip
1076     if(self.executing) return;
1077     // Set pool as executing
1078     self.executing = true;
1079
1080     // Wait for auth to clear before continuing
1081     function waitForAuth(cb) {
1082       if(!self.authenticating) return cb();
1083       // Wait for a milisecond and try again
1084       setTimeout(function() {
1085         waitForAuth(cb);
1086       }, 1);
1087     }
1088
1089     // Block on any auth in process
1090     waitForAuth(function() {
1091       // As long as we have available connections
1092       while(true) {
1093         // Total availble connections
1094         var totalConnections = self.availableConnections.length
1095           + self.connectingConnections.length
1096           + self.inUseConnections.length;
1097
1098         // No available connections available, flush any monitoring ops
1099         if(self.availableConnections.length == 0) {
1100           // Flush any monitoring operations
1101           flushMonitoringOperations(self.queue);
1102           break;
1103         }
1104
1105         // No queue break
1106         if(self.queue.length == 0) {
1107           break;
1108         }
1109
1110         // Get a connection
1111         // var connection = self.availableConnections.pop();
1112         var connection = self.availableConnections[self.connectionIndex++ % self.availableConnections.length];
1113         // Is the connection connected
1114         if(connection.isConnected()) {
1115           // Get the next work item
1116           var workItem = self.queue.shift();
1117
1118           // Get actual binary commands
1119           var buffer = workItem.buffer;
1120
1121           // Set current status of authentication process
1122           workItem.authenticating = self.authenticating;
1123           workItem.authenticatingTimestamp = self.authenticatingTimestamp;
1124
1125           // Add current associated callback to the connection
1126           // connection.workItem = workItem
1127           connection.workItems.push(workItem);
1128
1129           // We have a custom socketTimeout
1130           if(!workItem.immediateRelease && typeof workItem.socketTimeout == 'number') {
1131             connection.setSocketTimeout(workItem.socketTimeout);
1132           }
1133
1134           // Put operation on the wire
1135           if(Array.isArray(buffer)) {
1136             for(var i = 0; i < buffer.length; i++) {
1137               connection.write(buffer[i])
1138             }
1139           } else {
1140             connection.write(buffer);
1141           }
1142
1143           if(workItem.immediateRelease && self.authenticating) {
1144             self.nonAuthenticatedConnections.push(connection);
1145           }
1146
1147           // Have we not reached the max connection size yet
1148           if(totalConnections < self.options.size
1149             && self.queue.length > 0) {
1150             // Create a new connection
1151             _createConnection(self);
1152           }
1153         } else {
1154           flushMonitoringOperations(self.queue);
1155         }
1156       }
1157     });
1158
1159     self.executing = false;
1160   }
1161 }
1162
1163 var connectionId = 0
1164 /**
1165  * A server connect event, used to verify that the connection is up and running
1166  *
1167  * @event Pool#connect
1168  * @type {Pool}
1169  */
1170
1171 /**
1172  * A server reconnect event, used to verify that pool reconnected.
1173  *
1174  * @event Pool#reconnect
1175  * @type {Pool}
1176  */
1177
1178 /**
1179  * The server connection closed, all pool connections closed
1180  *
1181  * @event Pool#close
1182  * @type {Pool}
1183  */
1184
1185 /**
1186  * The server connection caused an error, all pool connections closed
1187  *
1188  * @event Pool#error
1189  * @type {Pool}
1190  */
1191
1192 /**
1193  * The server connection timed out, all pool connections closed
1194  *
1195  * @event Pool#timeout
1196  * @type {Pool}
1197  */
1198
1199 /**
1200  * The driver experienced an invalid message, all pool connections closed
1201  *
1202  * @event Pool#parseError
1203  * @type {Pool}
1204  */
1205
1206 /**
1207  * The driver attempted to reconnect
1208  *
1209  * @event Pool#attemptReconnect
1210  * @type {Pool}
1211  */
1212
1213 /**
1214  * The driver exhausted all reconnect attempts
1215  *
1216  * @event Pool#reconnectFailed
1217  * @type {Pool}
1218  */
1219
1220 module.exports = Pool;