d0ffcff00bb9d47c6d4ce383744e10909b446b51
[aai/esr-gui.git] /
1 "use strict"
2
3 var inherits = require('util').inherits,
4   f = require('util').format,
5   EventEmitter = require('events').EventEmitter,
6   BasicCursor = require('../cursor'),
7   Logger = require('../connection/logger'),
8   retrieveBSON = require('../connection/utils').retrieveBSON,
9   MongoError = require('../error'),
10   Server = require('./server'),
11   assign = require('./shared').assign,
12   clone = require('./shared').clone,
13   createClientInfo = require('./shared').createClientInfo;
14
15 var BSON = retrieveBSON();
16
17 /**
18  * @fileOverview The **Mongos** class is a class that represents a Mongos Proxy topology and is
19  * used to construct connections.
20  *
21  * @example
22  * var Mongos = require('mongodb-core').Mongos
23  *   , ReadPreference = require('mongodb-core').ReadPreference
24  *   , assert = require('assert');
25  *
26  * var server = new Mongos([{host: 'localhost', port: 30000}]);
27  * // Wait for the connection event
28  * server.on('connect', function(server) {
29  *   server.destroy();
30  * });
31  *
32  * // Start connecting
33  * server.connect();
34  */
35
36 var MongoCR = require('../auth/mongocr')
37   , X509 = require('../auth/x509')
38   , Plain = require('../auth/plain')
39   , GSSAPI = require('../auth/gssapi')
40   , SSPI = require('../auth/sspi')
41   , ScramSHA1 = require('../auth/scram');
42
43 //
44 // States
45 var DISCONNECTED = 'disconnected';
46 var CONNECTING = 'connecting';
47 var CONNECTED = 'connected';
48 var DESTROYED = 'destroyed';
49
50 function stateTransition(self, newState) {
51   var legalTransitions = {
52     'disconnected': [CONNECTING, DESTROYED, DISCONNECTED],
53     'connecting': [CONNECTING, DESTROYED, CONNECTED, DISCONNECTED],
54     'connected': [CONNECTED, DISCONNECTED, DESTROYED],
55     'destroyed': [DESTROYED]
56   }
57
58   // Get current state
59   var legalStates = legalTransitions[self.state];
60   if(legalStates && legalStates.indexOf(newState) != -1) {
61     self.state = newState;
62   } else {
63     self.logger.error(f('Pool with id [%s] failed attempted illegal state transition from [%s] to [%s] only following state allowed [%s]'
64       , self.id, self.state, newState, legalStates));
65   }
66 }
67
68 //
69 // ReplSet instance id
70 var id = 1;
71 var handlers = ['connect', 'close', 'error', 'timeout', 'parseError'];
72
73 /**
74  * Creates a new Mongos instance
75  * @class
76  * @param {array} seedlist A list of seeds for the replicaset
77  * @param {number} [options.haInterval=5000] The High availability period for replicaset inquiry
78  * @param {Cursor} [options.cursorFactory=Cursor] The cursor factory class used for all query cursors
79  * @param {number} [options.size=5] Server connection pool size
80  * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
81  * @param {number} [options.keepAliveInitialDelay=0] Initial delay before TCP keep alive enabled
82  * @param {number} [options.localThresholdMS=15] Cutoff latency point in MS for MongoS proxy selection
83  * @param {boolean} [options.noDelay=true] TCP Connection no delay
84  * @param {number} [options.connectionTimeout=1000] TCP Connection timeout setting
85  * @param {number} [options.socketTimeout=0] TCP Socket timeout setting
86  * @param {boolean} [options.singleBufferSerializtion=true] Serialize into single buffer, trade of peak memory for serialization speed
87  * @param {boolean} [options.ssl=false] Use SSL for connection
88  * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function.
89  * @param {Buffer} [options.ca] SSL Certificate store binary buffer
90  * @param {Buffer} [options.cert] SSL Certificate binary buffer
91  * @param {Buffer} [options.key] SSL Key file binary buffer
92  * @param {string} [options.passphrase] SSL Certificate pass phrase
93  * @param {string} [options.servername=null] String containing the server name requested via TLS SNI.
94  * @param {boolean} [options.rejectUnauthorized=true] Reject unauthorized server certificates
95  * @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits
96  * @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
97  * @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
98  * @param {boolean} [options.domainsEnabled=false] Enable the wrapping of the callback in the current domain, disabled by default to avoid perf hit.
99  * @return {Mongos} A cursor instance
100  * @fires Mongos#connect
101  * @fires Mongos#reconnect
102  * @fires Mongos#joined
103  * @fires Mongos#left
104  * @fires Mongos#failed
105  * @fires Mongos#fullsetup
106  * @fires Mongos#all
107  * @fires Mongos#serverHeartbeatStarted
108  * @fires Mongos#serverHeartbeatSucceeded
109  * @fires Mongos#serverHeartbeatFailed
110  * @fires Mongos#topologyOpening
111  * @fires Mongos#topologyClosed
112  * @fires Mongos#topologyDescriptionChanged
113  * @property {string} type the topology type.
114  * @property {string} parserType the parser type used (c++ or js).
115  */
116 var Mongos = function(seedlist, options) {
117   options = options || {};
118
119   // Get replSet Id
120   this.id = id++;
121
122   // Internal state
123   this.s = {
124     options: assign({}, options),
125     // BSON instance
126     bson: options.bson || new BSON([BSON.Binary, BSON.Code, BSON.DBRef, BSON.Decimal128,
127       BSON.Double, BSON.Int32, BSON.Long, BSON.Map, BSON.MaxKey, BSON.MinKey,
128       BSON.ObjectId, BSON.BSONRegExp, BSON.Symbol, BSON.Timestamp]),
129     // Factory overrides
130     Cursor: options.cursorFactory || BasicCursor,
131     // Logger instance
132     logger: Logger('Mongos', options),
133     // Seedlist
134     seedlist: seedlist,
135     // Ha interval
136     haInterval: options.haInterval ? options.haInterval : 10000,
137     // Disconnect handler
138     disconnectHandler: options.disconnectHandler,
139     // Server selection index
140     index: 0,
141     // Connect function options passed in
142     connectOptions: {},
143     // Are we running in debug mode
144     debug: typeof options.debug == 'boolean' ? options.debug : false,
145     // localThresholdMS
146     localThresholdMS: options.localThresholdMS || 15,
147     // Client info
148     clientInfo: createClientInfo(options)
149   }
150
151   // Set the client info
152   this.s.options.clientInfo = createClientInfo(options);
153
154   // Log info warning if the socketTimeout < haInterval as it will cause
155   // a lot of recycled connections to happen.
156   if(this.s.logger.isWarn()
157     && this.s.options.socketTimeout != 0
158     && this.s.options.socketTimeout < this.s.haInterval) {
159       this.s.logger.warn(f('warning socketTimeout %s is less than haInterval %s. This might cause unnecessary server reconnections due to socket timeouts'
160         , this.s.options.socketTimeout, this.s.haInterval));
161   }
162
163   // All the authProviders
164   this.authProviders = options.authProviders || {
165       'mongocr': new MongoCR(this.s.bson), 'x509': new X509(this.s.bson)
166     , 'plain': new Plain(this.s.bson), 'gssapi': new GSSAPI(this.s.bson)
167     , 'sspi': new SSPI(this.s.bson), 'scram-sha-1': new ScramSHA1(this.s.bson)
168   }
169
170   // Disconnected state
171   this.state = DISCONNECTED;
172
173   // Current proxies we are connecting to
174   this.connectingProxies = [];
175   // Currently connected proxies
176   this.connectedProxies = [];
177   // Disconnected proxies
178   this.disconnectedProxies = [];
179   // Are we authenticating
180   this.authenticating = false;
181   // Index of proxy to run operations against
182   this.index = 0;
183   // High availability timeout id
184   this.haTimeoutId = null;
185   // Last ismaster
186   this.ismaster = null;
187
188   // Add event listener
189   EventEmitter.call(this);
190 }
191
192 inherits(Mongos, EventEmitter);
193
194 Object.defineProperty(Mongos.prototype, 'type', {
195   enumerable:true, get: function() { return 'mongos'; }
196 });
197
198 Object.defineProperty(Mongos.prototype, 'parserType', {
199   enumerable:true, get: function() {
200     return BSON.native ? "c++" : "js";
201   }
202 });
203
204 /**
205  * Emit event if it exists
206  * @method
207  */
208 function emitSDAMEvent(self, event, description) {
209   if(self.listeners(event).length > 0) {
210     self.emit(event, description);
211   }
212 }
213
214 /**
215  * Initiate server connect
216  * @method
217  * @param {array} [options.auth=null] Array of auth options to apply on connect
218  */
219 Mongos.prototype.connect = function(options) {
220   var self = this;
221   // Add any connect level options to the internal state
222   this.s.connectOptions = options || {};
223   // Set connecting state
224   stateTransition(this, CONNECTING);
225   // Create server instances
226   var servers = this.s.seedlist.map(function(x) {
227     return new Server(assign({}, self.s.options, x, {
228       authProviders: self.authProviders, reconnect:false, monitoring:false, inTopology: true
229     }, {
230       clientInfo: clone(self.s.clientInfo)
231     }));
232   });
233
234   // Emit the topology opening event
235   emitSDAMEvent(this, 'topologyOpening', { topologyId: this.id });
236
237   // Start all server connections
238   connectProxies(self, servers);
239 }
240
241 function handleEvent(self) {
242   return function() {
243     if(self.state == DESTROYED) return;
244     // Move to list of disconnectedProxies
245     moveServerFrom(self.connectedProxies, self.disconnectedProxies, this);
246     // Emit the left signal
247     self.emit('left', 'mongos', this);
248   }
249 }
250
251 function handleInitialConnectEvent(self, event) {
252   return function() {
253     // Destroy the instance
254     if(self.state == DESTROYED) {
255       // Move from connectingProxies
256       moveServerFrom(self.connectingProxies, self.disconnectedProxies, this);
257       return this.destroy();
258     }
259
260     // Check the type of server
261     if(event == 'connect') {
262       // Get last known ismaster
263       self.ismaster = this.lastIsMaster();
264
265       // Is this not a proxy, remove t
266       if(self.ismaster.msg == 'isdbgrid') {
267         // Add to the connectd list
268         for(var i = 0; i < self.connectedProxies.length; i++) {
269           if(self.connectedProxies[i].name == this.name) {
270             // Move from connectingProxies
271             moveServerFrom(self.connectingProxies, self.disconnectedProxies, this);
272             this.destroy();
273             return self.emit('failed', this);
274           }
275         }
276
277         // Remove the handlers
278         for(i = 0; i < handlers.length; i++) {
279           this.removeAllListeners(handlers[i]);
280         }
281
282         // Add stable state handlers
283         this.on('error', handleEvent(self, 'error'));
284         this.on('close', handleEvent(self, 'close'));
285         this.on('timeout', handleEvent(self, 'timeout'));
286         this.on('parseError', handleEvent(self, 'parseError'));
287
288         // Move from connecting proxies connected
289         moveServerFrom(self.connectingProxies, self.connectedProxies, this);
290         // Emit the joined event
291         self.emit('joined', 'mongos', this);
292       } else {
293
294         // Print warning if we did not find a mongos proxy
295         if(self.s.logger.isWarn()) {
296           var message = 'expected mongos proxy, but found replicaset member mongod for server %s';
297           // We have a standalone server
298           if(!self.ismaster.hosts) {
299             message = 'expected mongos proxy, but found standalone mongod for server %s';
300           }
301
302           self.s.logger.warn(f(message, this.name));
303         }
304
305         // This is not a mongos proxy, remove it completely
306         removeProxyFrom(self.connectingProxies, this);
307         // Emit the left event
308         self.emit('left', 'server', this);
309         // Emit failed event
310         self.emit('failed', this);
311       }
312     } else {
313       moveServerFrom(self.connectingProxies, self.disconnectedProxies, this);
314       // Emit the left event
315       self.emit('left', 'mongos', this);
316       // Emit failed event
317       self.emit('failed', this);
318     }
319
320     // Trigger topologyMonitor
321     if(self.connectingProxies.length == 0) {
322       // Emit connected if we are connected
323       if(self.connectedProxies.length > 0) {
324         // Set the state to connected
325         stateTransition(self, CONNECTED);
326         // Emit the connect event
327         self.emit('connect', self);
328         self.emit('fullsetup', self);
329         self.emit('all', self);
330       } else if(self.disconnectedProxies.length == 0) {
331         // Print warning if we did not find a mongos proxy
332         if(self.s.logger.isWarn()) {
333           self.s.logger.warn(f('no mongos proxies found in seed list, did you mean to connect to a replicaset'));
334         }
335
336         // Emit the error that no proxies were found
337         return self.emit('error', new MongoError('no mongos proxies found in seed list'));
338       }
339
340       // Topology monitor
341       topologyMonitor(self, {firstConnect:true});
342     }
343   };
344 }
345
346 function connectProxies(self, servers) {
347   // Update connectingProxies
348   self.connectingProxies = self.connectingProxies.concat(servers);
349
350   // Index used to interleaf the server connects, avoiding
351   // runtime issues on io constrained vm's
352   var timeoutInterval = 0;
353
354   function connect(server, timeoutInterval) {
355     setTimeout(function() {
356       // Add event handlers
357       server.once('close', handleInitialConnectEvent(self, 'close'));
358       server.once('timeout', handleInitialConnectEvent(self, 'timeout'));
359       server.once('parseError', handleInitialConnectEvent(self, 'parseError'));
360       server.once('error', handleInitialConnectEvent(self, 'error'));
361       server.once('connect', handleInitialConnectEvent(self, 'connect'));
362       // SDAM Monitoring events
363       server.on('serverOpening', function(e) { self.emit('serverOpening', e); });
364       server.on('serverDescriptionChanged', function(e) { self.emit('serverDescriptionChanged', e); });
365       server.on('serverClosed', function(e) { self.emit('serverClosed', e); });
366       // Start connection
367       server.connect(self.s.connectOptions);
368     }, timeoutInterval);
369   }
370   // Start all the servers
371   while(servers.length > 0) {
372     connect(servers.shift(), timeoutInterval++);
373   }
374 }
375
376 function pickProxy(self) {
377   // Get the currently connected Proxies
378   var connectedProxies = self.connectedProxies.slice(0);
379
380   // Set lower bound
381   var lowerBoundLatency = Number.MAX_VALUE;
382
383   // Determine the lower bound for the Proxies
384   for(var i = 0; i < connectedProxies.length; i++) {
385     if(connectedProxies[i].lastIsMasterMS < lowerBoundLatency) {
386       lowerBoundLatency = connectedProxies[i].lastIsMasterMS;
387     }
388   }
389
390   // Filter out the possible servers
391   connectedProxies = connectedProxies.filter(function(server) {
392     if((server.lastIsMasterMS <= (lowerBoundLatency + self.s.localThresholdMS))
393       && server.isConnected()) {
394       return true;
395     }
396   });
397
398   // We have no connectedProxies pick first of the connected ones
399   if(connectedProxies.length == 0) {
400     return self.connectedProxies[0];
401   }
402
403   // Get proxy
404   var proxy = connectedProxies[self.index % connectedProxies.length];
405   // Update the index
406   self.index = (self.index + 1) % connectedProxies.length;
407   // Return the proxy
408   return proxy;
409 }
410
411 function moveServerFrom(from, to, proxy) {
412   for(var i = 0; i < from.length; i++) {
413     if(from[i].name == proxy.name) {
414       from.splice(i, 1);
415     }
416   }
417
418   for(i = 0; i < to.length; i++) {
419     if(to[i].name == proxy.name) {
420       to.splice(i, 1);
421     }
422   }
423
424   to.push(proxy);
425 }
426
427 function removeProxyFrom(from, proxy) {
428   for(var i = 0; i < from.length; i++) {
429     if(from[i].name == proxy.name) {
430       from.splice(i, 1);
431     }
432   }
433 }
434
435 function reconnectProxies(self, proxies, callback) {
436   // Count lefts
437   var count = proxies.length;
438
439   // Handle events
440   var _handleEvent = function(self, event) {
441     return function() {
442       var _self = this;
443       count = count - 1;
444
445       // Destroyed
446       if(self.state == DESTROYED) {
447         moveServerFrom(self.connectingProxies, self.disconnectedProxies, _self);
448         return this.destroy();
449       }
450
451       if(event == 'connect' && !self.authenticating) {
452         // Destroyed
453         if(self.state == DESTROYED) {
454           moveServerFrom(self.connectingProxies, self.disconnectedProxies, _self);
455           return _self.destroy();
456         }
457
458         // Remove the handlers
459         for(var i = 0; i < handlers.length; i++) {
460           _self.removeAllListeners(handlers[i]);
461         }
462
463         // Add stable state handlers
464         _self.on('error', handleEvent(self, 'error'));
465         _self.on('close', handleEvent(self, 'close'));
466         _self.on('timeout', handleEvent(self, 'timeout'));
467         _self.on('parseError', handleEvent(self, 'parseError'));
468
469         // Move to the connected servers
470         moveServerFrom(self.disconnectedProxies, self.connectedProxies, _self);
471         // Emit joined event
472         self.emit('joined', 'mongos', _self);
473       } else if(event == 'connect' && self.authenticating) {
474         // Move from connectingProxies
475         moveServerFrom(self.connectingProxies, self.disconnectedProxies, _self);
476         this.destroy();
477       }
478
479       // Are we done finish up callback
480       if(count == 0) {
481         callback();
482       }
483     }
484   }
485
486   // No new servers
487   if(count == 0) {
488     return callback();
489   }
490
491   // Execute method
492   function execute(_server, i) {
493     setTimeout(function() {
494       // Destroyed
495       if(self.state == DESTROYED) {
496         return;
497       }
498
499       // Create a new server instance
500       var server = new Server(assign({}, self.s.options, {
501         host: _server.name.split(':')[0],
502         port: parseInt(_server.name.split(':')[1], 10)
503       }, {
504         authProviders: self.authProviders, reconnect:false, monitoring: false, inTopology: true
505       }, {
506         clientInfo: clone(self.s.clientInfo)
507       }));
508
509       // Add temp handlers
510       server.once('connect', _handleEvent(self, 'connect'));
511       server.once('close', _handleEvent(self, 'close'));
512       server.once('timeout', _handleEvent(self, 'timeout'));
513       server.once('error', _handleEvent(self, 'error'));
514       server.once('parseError', _handleEvent(self, 'parseError'));
515
516       // SDAM Monitoring events
517       server.on('serverOpening', function(e) { self.emit('serverOpening', e); });
518       server.on('serverDescriptionChanged', function(e) { self.emit('serverDescriptionChanged', e); });
519       server.on('serverClosed', function(e) { self.emit('serverClosed', e); });
520       server.connect(self.s.connectOptions);
521     }, i);
522   }
523
524   // Create new instances
525   for(var i = 0; i < proxies.length; i++) {
526     execute(proxies[i], i);
527   }
528 }
529
530 function topologyMonitor(self, options) {
531   options = options || {};
532
533   // Set momitoring timeout
534   self.haTimeoutId = setTimeout(function() {
535     if(self.state == DESTROYED) return;
536     // If we have a primary and a disconnect handler, execute
537     // buffered operations
538     if(self.isConnected() && self.s.disconnectHandler) {
539       self.s.disconnectHandler.execute();
540     }
541
542     // Get the connectingServers
543     var proxies = self.connectedProxies.slice(0);
544     // Get the count
545     var count = proxies.length;
546
547     // If the count is zero schedule a new fast
548     function pingServer(_self, _server, cb) {
549       // Measure running time
550       var start = new Date().getTime();
551
552       // Emit the server heartbeat start
553       emitSDAMEvent(self, 'serverHeartbeatStarted', { connectionId: _server.name });
554
555       // Execute ismaster
556       _server.command('admin.$cmd', {
557         ismaster:true
558       }, {
559         monitoring: true,
560         socketTimeout: self.s.options.connectionTimeout || 2000,
561       }, function(err, r) {
562         if(self.state == DESTROYED) {
563           // Move from connectingProxies
564           moveServerFrom(self.connectedProxies, self.disconnectedProxies, _server);
565           _server.destroy();
566           return cb(err, r);
567         }
568
569         // Calculate latency
570         var latencyMS = new Date().getTime() - start;
571
572         // We had an error, remove it from the state
573         if(err) {
574           // Emit the server heartbeat failure
575           emitSDAMEvent(self, 'serverHeartbeatFailed', { durationMS: latencyMS, failure: err, connectionId: _server.name });
576           // Move from connected proxies to disconnected proxies
577           moveServerFrom(self.connectedProxies, self.disconnectedProxies, _server);
578         } else {
579           // Update the server ismaster
580           _server.ismaster = r.result;
581           _server.lastIsMasterMS = latencyMS;
582
583           // Server heart beat event
584           emitSDAMEvent(self, 'serverHeartbeatSucceeded', { durationMS: latencyMS, reply: r.result, connectionId: _server.name });
585         }
586
587         cb(err, r);
588       });
589     }
590
591     // No proxies initiate monitor again
592     if(proxies.length == 0) {
593       // Emit close event if any listeners registered
594       if(self.listeners("close").length > 0 && self.state == CONNECTING) {
595         self.emit('error', new MongoError('no mongos proxy available'));
596       } else {
597         self.emit('close', self);
598       }
599
600       // Attempt to connect to any unknown servers
601       return reconnectProxies(self, self.disconnectedProxies, function() {
602         if(self.state == DESTROYED) return;
603
604         // Are we connected ? emit connect event
605         if(self.state == CONNECTING && options.firstConnect) {
606           self.emit('connect', self);
607           self.emit('fullsetup', self);
608           self.emit('all', self);
609         } else if(self.isConnected()) {
610           self.emit('reconnect', self);
611         } else if(!self.isConnected() && self.listeners("close").length > 0) {
612           self.emit('close', self);
613         }
614
615         // Perform topology monitor
616         topologyMonitor(self);
617       });
618     }
619
620     // Ping all servers
621     for(var i = 0; i < proxies.length; i++) {
622       pingServer(self, proxies[i], function() {
623         count = count - 1;
624
625         if(count == 0) {
626           if(self.state == DESTROYED) return;
627
628           // Attempt to connect to any unknown servers
629           reconnectProxies(self, self.disconnectedProxies, function() {
630             if(self.state == DESTROYED) return;
631             // Perform topology monitor
632             topologyMonitor(self);
633           });
634         }
635       });
636     }
637   }, self.s.haInterval);
638 }
639
640 /**
641  * Returns the last known ismaster document for this server
642  * @method
643  * @return {object}
644  */
645 Mongos.prototype.lastIsMaster = function() {
646   return this.ismaster;
647 }
648
649 /**
650  * Unref all connections belong to this server
651  * @method
652  */
653 Mongos.prototype.unref = function() {
654   // Transition state
655   stateTransition(this, DISCONNECTED);
656   // Get all proxies
657   var proxies = this.connectedProxies.concat(this.connectingProxies);
658   proxies.forEach(function(x) {
659     x.unref();
660   });
661
662   clearTimeout(this.haTimeoutId);
663 }
664
665 /**
666  * Destroy the server connection
667  * @param {boolean} [options.force=false] Force destroy the pool
668  * @method
669  */
670 Mongos.prototype.destroy = function(options) {
671   // Transition state
672   stateTransition(this, DESTROYED);
673   // Get all proxies
674   var proxies = this.connectedProxies.concat(this.connectingProxies);
675   // Clear out any monitoring process
676   if(this.haTimeoutId) clearTimeout(this.haTimeoutId);
677
678   // Destroy all connecting servers
679   proxies.forEach(function(x) {
680     x.destroy(options);
681   });
682
683   // Emit toplogy closing event
684   emitSDAMEvent(this, 'topologyClosed', { topologyId: this.id });
685 }
686
687 /**
688  * Figure out if the server is connected
689  * @method
690  * @return {boolean}
691  */
692 Mongos.prototype.isConnected = function() {
693   return this.connectedProxies.length > 0;
694 }
695
696 /**
697  * Figure out if the server instance was destroyed by calling destroy
698  * @method
699  * @return {boolean}
700  */
701 Mongos.prototype.isDestroyed = function() {
702   return this.state == DESTROYED;
703 }
704
705 //
706 // Operations
707 //
708
709 // Execute write operation
710 var executeWriteOperation = function(self, op, ns, ops, options, callback) {
711   if(typeof options == 'function') callback = options, options = {}, options = options || {};
712   // Ensure we have no options
713   options = options || {};
714   // Pick a server
715   var server = pickProxy(self);
716   // No server found error out
717   if(!server) return callback(new MongoError('no mongos proxy available'));
718   // Execute the command
719   server[op](ns, ops, options, callback);
720 }
721
722 /**
723  * Insert one or more documents
724  * @method
725  * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
726  * @param {array} ops An array of documents to insert
727  * @param {boolean} [options.ordered=true] Execute in order or out of order
728  * @param {object} [options.writeConcern={}] Write concern for the operation
729  * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
730  * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
731  * @param {opResultCallback} callback A callback function
732  */
733 Mongos.prototype.insert = function(ns, ops, options, callback) {
734   if(typeof options == 'function') callback = options, options = {}, options = options || {};
735   if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
736
737   // Not connected but we have a disconnecthandler
738   if(!this.isConnected() && this.s.disconnectHandler != null) {
739     return this.s.disconnectHandler.add('insert', ns, ops, options, callback);
740   }
741
742   // No mongos proxy available
743   if(!this.isConnected()) {
744     return callback(new MongoError('no mongos proxy available'));
745   }
746
747   // Execute write operation
748   executeWriteOperation(this, 'insert', ns, ops, options, callback);
749 }
750
751 /**
752  * Perform one or more update operations
753  * @method
754  * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
755  * @param {array} ops An array of updates
756  * @param {boolean} [options.ordered=true] Execute in order or out of order
757  * @param {object} [options.writeConcern={}] Write concern for the operation
758  * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
759  * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
760  * @param {opResultCallback} callback A callback function
761  */
762 Mongos.prototype.update = function(ns, ops, options, callback) {
763   if(typeof options == 'function') callback = options, options = {}, options = options || {};
764   if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
765
766   // Not connected but we have a disconnecthandler
767   if(!this.isConnected() && this.s.disconnectHandler != null) {
768     return this.s.disconnectHandler.add('update', ns, ops, options, callback);
769   }
770
771   // No mongos proxy available
772   if(!this.isConnected()) {
773     return callback(new MongoError('no mongos proxy available'));
774   }
775
776   // Execute write operation
777   executeWriteOperation(this, 'update', ns, ops, options, callback);
778 }
779
780 /**
781  * Perform one or more remove operations
782  * @method
783  * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
784  * @param {array} ops An array of removes
785  * @param {boolean} [options.ordered=true] Execute in order or out of order
786  * @param {object} [options.writeConcern={}] Write concern for the operation
787  * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
788  * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
789  * @param {opResultCallback} callback A callback function
790  */
791 Mongos.prototype.remove = function(ns, ops, options, callback) {
792   if(typeof options == 'function') callback = options, options = {}, options = options || {};
793   if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
794
795   // Not connected but we have a disconnecthandler
796   if(!this.isConnected() && this.s.disconnectHandler != null) {
797     return this.s.disconnectHandler.add('remove', ns, ops, options, callback);
798   }
799
800   // No mongos proxy available
801   if(!this.isConnected()) {
802     return callback(new MongoError('no mongos proxy available'));
803   }
804
805   // Execute write operation
806   executeWriteOperation(this, 'remove', ns, ops, options, callback);
807 }
808
809 /**
810  * Execute a command
811  * @method
812  * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
813  * @param {object} cmd The command hash
814  * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
815  * @param {Connection} [options.connection] Specify connection object to execute command against
816  * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
817  * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
818  * @param {opResultCallback} callback A callback function
819  */
820 Mongos.prototype.command = function(ns, cmd, options, callback) {
821   if(typeof options == 'function') callback = options, options = {}, options = options || {};
822   if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
823   var self = this;
824
825   // Pick a proxy
826   var server = pickProxy(self);
827
828   // Topology is not connected, save the call in the provided store to be
829   // Executed at some point when the handler deems it's reconnected
830   if((server == null || !server.isConnected()) && this.s.disconnectHandler != null) {
831     return this.s.disconnectHandler.add('command', ns, cmd, options, callback);
832   }
833
834   // No server returned we had an error
835   if(server == null) {
836     return callback(new MongoError('no mongos proxy available'));
837   }
838
839   // Execute the command
840   server.command(ns, cmd, options, callback);
841 }
842
843 /**
844  * Perform one or more remove operations
845  * @method
846  * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
847  * @param {{object}|{Long}} cmd Can be either a command returning a cursor or a cursorId
848  * @param {object} [options.batchSize=0] Batchsize for the operation
849  * @param {array} [options.documents=[]] Initial documents list for cursor
850  * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
851  * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
852  * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
853  * @param {opResultCallback} callback A callback function
854  */
855 Mongos.prototype.cursor = function(ns, cmd, cursorOptions) {
856   cursorOptions = cursorOptions || {};
857   var FinalCursor = cursorOptions.cursorFactory || this.s.Cursor;
858   return new FinalCursor(this.s.bson, ns, cmd, cursorOptions, this, this.s.options);
859 }
860
861 /**
862  * Authenticate using a specified mechanism
863  * @method
864  * @param {string} mechanism The Auth mechanism we are invoking
865  * @param {string} db The db we are invoking the mechanism against
866  * @param {...object} param Parameters for the specific mechanism
867  * @param {authResultCallback} callback A callback function
868  */
869 Mongos.prototype.auth = function(mechanism, db) {
870   var allArgs = Array.prototype.slice.call(arguments, 0).slice(0);
871   var self = this;
872   var args = Array.prototype.slice.call(arguments, 2);
873   var callback = args.pop();
874
875   // If we don't have the mechanism fail
876   if(this.authProviders[mechanism] == null && mechanism != 'default') {
877     return callback(new MongoError(f("auth provider %s does not exist", mechanism)));
878   }
879
880   // Are we already authenticating, throw
881   if(this.authenticating) {
882     return callback(new MongoError('authentication or logout allready in process'));
883   }
884
885   // Topology is not connected, save the call in the provided store to be
886   // Executed at some point when the handler deems it's reconnected
887   if(!self.isConnected() && self.s.disconnectHandler != null) {
888     return self.s.disconnectHandler.add('auth', db, allArgs, {}, callback);
889   }
890
891   // Set to authenticating
892   this.authenticating = true;
893   // All errors
894   var errors = [];
895
896   // Get all the servers
897   var servers = this.connectedProxies.slice(0);
898   // No servers return
899   if(servers.length == 0) {
900     this.authenticating = false;
901     callback(null, true);
902   }
903
904   // Authenticate
905   function auth(server) {
906     // Arguments without a callback
907     var argsWithoutCallback = [mechanism, db].concat(args.slice(0));
908     // Create arguments
909     var finalArguments = argsWithoutCallback.concat([function(err) {
910       count = count - 1;
911       // Save all the errors
912       if(err) errors.push({name: server.name, err: err});
913       // We are done
914       if(count == 0) {
915         // Auth is done
916         self.authenticating = false;
917
918         // Return the auth error
919         if(errors.length) return callback(MongoError.create({
920           message: 'authentication fail', errors: errors
921         }), false);
922
923         // Successfully authenticated session
924         callback(null, self);
925       }
926     }]);
927
928     // Execute the auth only against non arbiter servers
929     if(!server.lastIsMaster().arbiterOnly) {
930       server.auth.apply(server, finalArguments);
931     }
932   }
933
934   // Get total count
935   var count = servers.length;
936   // Authenticate against all servers
937   while(servers.length > 0) {
938     auth(servers.shift());
939   }
940 }
941
942 /**
943  * Logout from a database
944  * @method
945  * @param {string} db The db we are logging out from
946  * @param {authResultCallback} callback A callback function
947  */
948 Mongos.prototype.logout = function(dbName, callback) {
949   var self = this;
950   // Are we authenticating or logging out, throw
951   if(this.authenticating) {
952     throw new MongoError('authentication or logout allready in process');
953   }
954
955   // Ensure no new members are processed while logging out
956   this.authenticating = true;
957
958   // Remove from all auth providers (avoid any reaplication of the auth details)
959   var providers = Object.keys(this.authProviders);
960   for(var i = 0; i < providers.length; i++) {
961     this.authProviders[providers[i]].logout(dbName);
962   }
963
964   // Now logout all the servers
965   var servers = this.connectedProxies.slice(0);
966   var count = servers.length;
967   if(count == 0) return callback();
968   var errors = [];
969
970   function logoutServer(_server, cb) {
971     _server.logout(dbName, function(err) {
972       if(err) errors.push({name: _server.name, err: err});
973       cb();
974     });
975   }
976
977   // Execute logout on all server instances
978   for(i = 0; i < servers.length; i++) {
979     logoutServer(servers[i], function() {
980       count = count - 1;
981
982       if(count == 0) {
983         // Do not block new operations
984         self.authenticating = false;
985         // If we have one or more errors
986         if(errors.length) return callback(MongoError.create({
987           message: f('logout failed against db %s', dbName), errors: errors
988         }), false);
989
990         // No errors
991         callback();
992       }
993     })
994   }
995 }
996
997 /**
998  * Get server
999  * @method
1000  * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
1001  * @return {Server}
1002  */
1003 Mongos.prototype.getServer = function() {
1004   var server = pickProxy(this);
1005   if(this.s.debug) this.emit('pickedServer', null, server);
1006   return server;
1007 }
1008
1009 /**
1010  * All raw connections
1011  * @method
1012  * @return {Connection[]}
1013  */
1014 Mongos.prototype.connections = function() {
1015   var connections = [];
1016
1017   for(var i = 0; i < this.connectedProxies.length; i++) {
1018     connections = connections.concat(this.connectedProxies[i].connections());
1019   }
1020
1021   return connections;
1022 }
1023
1024 /**
1025  * A mongos connect event, used to verify that the connection is up and running
1026  *
1027  * @event Mongos#connect
1028  * @type {Mongos}
1029  */
1030
1031 /**
1032  * A mongos reconnect event, used to verify that the mongos topology has reconnected
1033  *
1034  * @event Mongos#reconnect
1035  * @type {Mongos}
1036  */
1037
1038 /**
1039  * A mongos fullsetup event, used to signal that all topology members have been contacted.
1040  *
1041  * @event Mongos#fullsetup
1042  * @type {Mongos}
1043  */
1044
1045 /**
1046  * A mongos all event, used to signal that all topology members have been contacted.
1047  *
1048  * @event Mongos#all
1049  * @type {Mongos}
1050  */
1051
1052 /**
1053  * A server member left the mongos list
1054  *
1055  * @event Mongos#left
1056  * @type {Mongos}
1057  * @param {string} type The type of member that left (mongos)
1058  * @param {Server} server The server object that left
1059  */
1060
1061 /**
1062  * A server member joined the mongos list
1063  *
1064  * @event Mongos#joined
1065  * @type {Mongos}
1066  * @param {string} type The type of member that left (mongos)
1067  * @param {Server} server The server object that joined
1068  */
1069
1070 /**
1071  * A server opening SDAM monitoring event
1072  *
1073  * @event Mongos#serverOpening
1074  * @type {object}
1075  */
1076
1077 /**
1078  * A server closed SDAM monitoring event
1079  *
1080  * @event Mongos#serverClosed
1081  * @type {object}
1082  */
1083
1084 /**
1085  * A server description SDAM change monitoring event
1086  *
1087  * @event Mongos#serverDescriptionChanged
1088  * @type {object}
1089  */
1090
1091 /**
1092  * A topology open SDAM event
1093  *
1094  * @event Mongos#topologyOpening
1095  * @type {object}
1096  */
1097
1098 /**
1099  * A topology closed SDAM event
1100  *
1101  * @event Mongos#topologyClosed
1102  * @type {object}
1103  */
1104
1105 /**
1106  * A topology structure SDAM change event
1107  *
1108  * @event Mongos#topologyDescriptionChanged
1109  * @type {object}
1110  */
1111
1112 /**
1113  * A topology serverHeartbeatStarted SDAM event
1114  *
1115  * @event Mongos#serverHeartbeatStarted
1116  * @type {object}
1117  */
1118
1119 /**
1120  * A topology serverHeartbeatFailed SDAM event
1121  *
1122  * @event Mongos#serverHeartbeatFailed
1123  * @type {object}
1124  */
1125
1126 /**
1127  * A topology serverHeartbeatSucceeded SDAM change event
1128  *
1129  * @event Mongos#serverHeartbeatSucceeded
1130  * @type {object}
1131  */
1132
1133 module.exports = Mongos;