7ba62204e5fd83c7e109fdf4416374497f06938d
[aai/esr-gui.git] /
1 "use strict"
2
3 var inherits = require('util').inherits,
4   f = require('util').format,
5   EventEmitter = require('events').EventEmitter,
6   BSON = require('bson').native().BSON,
7   ReadPreference = require('./read_preference'),
8   BasicCursor = require('../cursor'),
9   Logger = require('../connection/logger'),
10   debugOptions = require('../connection/utils').debugOptions,
11   MongoError = require('../error'),
12   Server = require('./server'),
13   ReplSetState = require('./replset_state'),
14   assign = require('./shared').assign,
15   clone = require('./shared').clone,
16   createClientInfo = require('./shared').createClientInfo;
17
18 /**
19  * @fileOverview The **Mongos** class is a class that represents a Mongos Proxy topology and is
20  * used to construct connections.
21  *
22  * @example
23  * var Mongos = require('mongodb-core').Mongos
24  *   , ReadPreference = require('mongodb-core').ReadPreference
25  *   , assert = require('assert');
26  *
27  * var server = new Mongos([{host: 'localhost', port: 30000}]);
28  * // Wait for the connection event
29  * server.on('connect', function(server) {
30  *   server.destroy();
31  * });
32  *
33  * // Start connecting
34  * server.connect();
35  */
36
37 var MongoCR = require('../auth/mongocr')
38   , X509 = require('../auth/x509')
39   , Plain = require('../auth/plain')
40   , GSSAPI = require('../auth/gssapi')
41   , SSPI = require('../auth/sspi')
42   , ScramSHA1 = require('../auth/scram');
43
44 //
45 // States
46 var DISCONNECTED = 'disconnected';
47 var CONNECTING = 'connecting';
48 var CONNECTED = 'connected';
49 var DESTROYED = 'destroyed';
50
51 function stateTransition(self, newState) {
52   var legalTransitions = {
53     'disconnected': [CONNECTING, DESTROYED, DISCONNECTED],
54     'connecting': [CONNECTING, DESTROYED, CONNECTED, DISCONNECTED],
55     'connected': [CONNECTED, DISCONNECTED, DESTROYED],
56     'destroyed': [DESTROYED]
57   }
58
59   // Get current state
60   var legalStates = legalTransitions[self.state];
61   if(legalStates && legalStates.indexOf(newState) != -1) {
62     self.state = newState;
63   } else {
64     self.logger.error(f('Pool with id [%s] failed attempted illegal state transition from [%s] to [%s] only following state allowed [%s]'
65       , self.id, self.state, newState, legalStates));
66   }
67 }
68
69 //
70 // ReplSet instance id
71 var id = 1;
72 var handlers = ['connect', 'close', 'error', 'timeout', 'parseError'];
73
74 /**
75  * Creates a new Mongos instance
76  * @class
77  * @param {array} seedlist A list of seeds for the replicaset
78  * @param {number} [options.haInterval=5000] The High availability period for replicaset inquiry
79  * @param {Cursor} [options.cursorFactory=Cursor] The cursor factory class used for all query cursors
80  * @param {number} [options.size=5] Server connection pool size
81  * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
82  * @param {number} [options.keepAliveInitialDelay=0] Initial delay before TCP keep alive enabled
83  * @param {number} [options.localThresholdMS=15] Cutoff latency point in MS for MongoS proxy selection
84  * @param {boolean} [options.noDelay=true] TCP Connection no delay
85  * @param {number} [options.connectionTimeout=1000] TCP Connection timeout setting
86  * @param {number} [options.socketTimeout=0] TCP Socket timeout setting
87  * @param {boolean} [options.singleBufferSerializtion=true] Serialize into single buffer, trade of peak memory for serialization speed
88  * @param {boolean} [options.ssl=false] Use SSL for connection
89  * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function.
90  * @param {Buffer} [options.ca] SSL Certificate store binary buffer
91  * @param {Buffer} [options.cert] SSL Certificate binary buffer
92  * @param {Buffer} [options.key] SSL Key file binary buffer
93  * @param {string} [options.passphrase] SSL Certificate pass phrase
94  * @param {string} [options.servername=null] String containing the server name requested via TLS SNI.
95  * @param {boolean} [options.rejectUnauthorized=true] Reject unauthorized server certificates
96  * @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits
97  * @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
98  * @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
99  * @param {boolean} [options.domainsEnabled=false] Enable the wrapping of the callback in the current domain, disabled by default to avoid perf hit.
100  * @return {Mongos} A cursor instance
101  * @fires Mongos#connect
102  * @fires Mongos#reconnect
103  * @fires Mongos#joined
104  * @fires Mongos#left
105  * @fires Mongos#failed
106  * @fires Mongos#fullsetup
107  * @fires Mongos#all
108  * @fires Mongos#serverHeartbeatStarted
109  * @fires Mongos#serverHeartbeatSucceeded
110  * @fires Mongos#serverHeartbeatFailed
111  * @fires Mongos#topologyOpening
112  * @fires Mongos#topologyClosed
113  * @fires Mongos#topologyDescriptionChanged
114  */
115 var Mongos = function(seedlist, options) {
116   var self = this;
117   options = options || {};
118
119   // Get replSet Id
120   this.id = id++;
121
122   // Internal state
123   this.s = {
124     options: assign({}, options),
125     // BSON instance
126     bson: options.bson || new BSON(),
127     // Factory overrides
128     Cursor: options.cursorFactory || BasicCursor,
129     // Logger instance
130     logger: Logger('Mongos', options),
131     // Seedlist
132     seedlist: seedlist,
133     // Ha interval
134     haInterval: options.haInterval ? options.haInterval : 10000,
135     // Disconnect handler
136     disconnectHandler: options.disconnectHandler,
137     // Server selection index
138     index: 0,
139     // Connect function options passed in
140     connectOptions: {},
141     // Are we running in debug mode
142     debug: typeof options.debug == 'boolean' ? options.debug : false,
143     // localThresholdMS
144     localThresholdMS: options.localThresholdMS || 15,
145     // Client info
146     clientInfo: createClientInfo(options)
147   }
148
149   // Set the client info
150   this.s.options.clientInfo = createClientInfo(options);
151
152   // Log info warning if the socketTimeout < haInterval as it will cause
153   // a lot of recycled connections to happen.
154   if(this.s.logger.isWarn()
155     && this.s.options.socketTimeout != 0
156     && this.s.options.socketTimeout < this.s.haInterval) {
157       this.s.logger.warn(f('warning socketTimeout %s is less than haInterval %s. This might cause unnecessary server reconnections due to socket timeouts'
158         , this.s.options.socketTimeout, this.s.haInterval));
159   }
160
161   // All the authProviders
162   this.authProviders = options.authProviders || {
163       'mongocr': new MongoCR(this.s.bson), 'x509': new X509(this.s.bson)
164     , 'plain': new Plain(this.s.bson), 'gssapi': new GSSAPI(this.s.bson)
165     , 'sspi': new SSPI(this.s.bson), 'scram-sha-1': new ScramSHA1(this.s.bson)
166   }
167
168   // Disconnected state
169   this.state = DISCONNECTED;
170
171   // Current proxies we are connecting to
172   this.connectingProxies = [];
173   // Currently connected proxies
174   this.connectedProxies = [];
175   // Disconnected proxies
176   this.disconnectedProxies = [];
177   // Are we authenticating
178   this.authenticating = false;
179   // Index of proxy to run operations against
180   this.index = 0;
181   // High availability timeout id
182   this.haTimeoutId = null;
183   // Last ismaster
184   this.ismaster = null;
185
186   // Add event listener
187   EventEmitter.call(this);
188 }
189
190 inherits(Mongos, EventEmitter);
191
192 Object.defineProperty(Mongos.prototype, 'type', {
193   enumerable:true, get: function() { return 'mongos'; }
194 });
195
196 /**
197  * Emit event if it exists
198  * @method
199  */
200 function emitSDAMEvent(self, event, description) {
201   if(self.listeners(event).length > 0) {
202     self.emit(event, description);
203   }
204 }
205
206 /**
207  * Initiate server connect
208  * @method
209  * @param {array} [options.auth=null] Array of auth options to apply on connect
210  */
211 Mongos.prototype.connect = function(options) {
212   var self = this;
213   // Add any connect level options to the internal state
214   this.s.connectOptions = options || {};
215   // Set connecting state
216   stateTransition(this, CONNECTING);
217   // Create server instances
218   var servers = this.s.seedlist.map(function(x) {
219     return new Server(assign({}, self.s.options, x, {
220       authProviders: self.authProviders, reconnect:false, monitoring:false, inTopology: true
221     }, {
222       clientInfo: clone(self.s.clientInfo)
223     }));
224   });
225
226   // Emit the topology opening event
227   emitSDAMEvent(this, 'topologyOpening', { topologyId: this.id });
228
229   // Start all server connections
230   connectProxies(self, servers);
231 }
232
233 function handleEvent(self, event) {
234   return function(err) {
235     if(self.state == DESTROYED) return;
236     // Move to list of disconnectedProxies
237     moveServerFrom(self.connectedProxies, self.disconnectedProxies, this);
238     // Emit the left signal
239     self.emit('left', 'mongos', this);
240   }
241 }
242
243 function handleInitialConnectEvent(self, event) {
244   return function(err) {
245     // Destroy the instance
246     if(self.state == DESTROYED) {
247       // Move from connectingProxies
248       moveServerFrom(self.connectingProxies, self.disconnectedProxies, this);
249       return this.destroy();
250     }
251
252     // Check the type of server
253     if(event == 'connect') {
254       // Get last known ismaster
255       self.ismaster = this.lastIsMaster();
256
257       // Is this not a proxy, remove t
258       if(self.ismaster.msg == 'isdbgrid') {
259         // Add to the connectd list
260         for(var i = 0; i < self.connectedProxies.length; i++) {
261           if(self.connectedProxies[i].name == this.name) {
262             // Move from connectingProxies
263             moveServerFrom(self.connectingProxies, self.disconnectedProxies, this);
264             this.destroy();
265             return self.emit('failed', this);
266           }
267         }
268
269         // Remove the handlers
270         for(var i = 0; i < handlers.length; i++) {
271           this.removeAllListeners(handlers[i]);
272         }
273
274         // Add stable state handlers
275         this.on('error', handleEvent(self, 'error'));
276         this.on('close', handleEvent(self, 'close'));
277         this.on('timeout', handleEvent(self, 'timeout'));
278         this.on('parseError', handleEvent(self, 'parseError'));
279
280         // Move from connecting proxies connected
281         moveServerFrom(self.connectingProxies, self.connectedProxies, this);
282         // Emit the joined event
283         self.emit('joined', 'mongos', this);
284       } else {
285
286         // Print warning if we did not find a mongos proxy
287         if(self.s.logger.isWarn()) {
288           var message = 'expected mongos proxy, but found replicaset member mongod for server %s';
289           // We have a standalone server
290           if(!self.ismaster.hosts) {
291             message = 'expected mongos proxy, but found standalone mongod for server %s';
292           }
293
294           self.s.logger.warn(f(message, this.name));
295         }
296
297         // This is not a mongos proxy, remove it completely
298         removeProxyFrom(self.connectingProxies, this);
299         // Emit the left event
300         self.emit('left', 'server', this);
301         // Emit failed event
302         self.emit('failed', this);
303       }
304     } else {
305       moveServerFrom(self.connectingProxies, self.disconnectedProxies, this);
306       // Emit the left event
307       self.emit('left', 'mongos', this);
308       // Emit failed event
309       self.emit('failed', this);
310     }
311
312     // Trigger topologyMonitor
313     if(self.connectingProxies.length == 0) {
314       // Emit connected if we are connected
315       if(self.connectedProxies.length > 0) {
316         // Set the state to connected
317         stateTransition(self, CONNECTED);
318         // Emit the connect event
319         self.emit('connect', self);
320         self.emit('fullsetup', self);
321         self.emit('all', self);
322       } else if(self.disconnectedProxies.length == 0) {
323         // Print warning if we did not find a mongos proxy
324         if(self.s.logger.isWarn()) {
325           self.s.logger.warn(f('no mongos proxies found in seed list, did you mean to connect to a replicaset'));
326         }
327
328         // Emit the error that no proxies were found
329         return self.emit('error', new MongoError('no mongos proxies found in seed list'));
330       }
331
332       // Topology monitor
333       topologyMonitor(self, {firstConnect:true});
334     }
335   };
336 }
337
338 function connectProxies(self, servers) {
339   // Update connectingProxies
340   self.connectingProxies = self.connectingProxies.concat(servers);
341
342   // Index used to interleaf the server connects, avoiding
343   // runtime issues on io constrained vm's
344   var timeoutInterval = 0;
345
346   function connect(server, timeoutInterval) {
347     setTimeout(function() {
348       // Add event handlers
349       server.once('close', handleInitialConnectEvent(self, 'close'));
350       server.once('timeout', handleInitialConnectEvent(self, 'timeout'));
351       server.once('parseError', handleInitialConnectEvent(self, 'parseError'));
352       server.once('error', handleInitialConnectEvent(self, 'error'));
353       server.once('connect', handleInitialConnectEvent(self, 'connect'));
354       // SDAM Monitoring events
355       server.on('serverOpening', function(e) { self.emit('serverOpening', e); });
356       server.on('serverDescriptionChanged', function(e) { self.emit('serverDescriptionChanged', e); });
357       server.on('serverClosed', function(e) { self.emit('serverClosed', e); });
358       // Start connection
359       server.connect(self.s.connectOptions);
360     }, timeoutInterval);
361   }
362   // Start all the servers
363   while(servers.length > 0) {
364     connect(servers.shift(), timeoutInterval++);
365   }
366 }
367
368 function pickProxy(self) {
369   // Get the currently connected Proxies
370   var connectedProxies = self.connectedProxies.slice(0);
371
372   // Set lower bound
373   var lowerBoundLatency = Number.MAX_VALUE;
374
375   // Determine the lower bound for the Proxies
376   for(var i = 0; i < connectedProxies.length; i++) {
377     if(connectedProxies[i].lastIsMasterMS < lowerBoundLatency) {
378       lowerBoundLatency = connectedProxies[i].lastIsMasterMS;
379     }
380   }
381
382   // Filter out the possible servers
383   connectedProxies = connectedProxies.filter(function(server) {
384     if((server.lastIsMasterMS <= (lowerBoundLatency + self.s.localThresholdMS))
385       && server.isConnected()) {
386       return true;
387     }
388   });
389
390   // We have no connectedProxies pick first of the connected ones
391   if(connectedProxies.length == 0) {
392     return self.connectedProxies[0];
393   }
394
395   // Get proxy
396   var proxy = connectedProxies[self.index % connectedProxies.length];
397   // Update the index
398   self.index = (self.index + 1) % connectedProxies.length;
399   // Return the proxy
400   return proxy;
401 }
402
403 function moveServerFrom(from, to, proxy) {
404   for(var i = 0; i < from.length; i++) {
405     if(from[i].name == proxy.name) {
406       from.splice(i, 1);
407     }
408   }
409
410   for(var i = 0; i < to.length; i++) {
411     if(to[i].name == proxy.name) {
412       to.splice(i, 1);
413     }
414   }
415
416   to.push(proxy);
417 }
418
419 function removeProxyFrom(from, proxy) {
420   for(var i = 0; i < from.length; i++) {
421     if(from[i].name == proxy.name) {
422       from.splice(i, 1);
423     }
424   }
425 }
426
427 function reconnectProxies(self, proxies, callback) {
428   // Count lefts
429   var count = proxies.length;
430
431   // Handle events
432   var _handleEvent = function(self, event) {
433     return function(err, r) {
434       var _self = this;
435       count = count - 1;
436
437       // Destroyed
438       if(self.state == DESTROYED) {
439         moveServerFrom(self.connectingProxies, self.disconnectedProxies, _self);
440         return this.destroy();
441       }
442
443       if(event == 'connect' && !self.authenticating) {
444         // Destroyed
445         if(self.state == DESTROYED) {
446           moveServerFrom(self.connectingProxies, self.disconnectedProxies, _self);
447           return _self.destroy();
448         }
449
450         // Remove the handlers
451         for(var i = 0; i < handlers.length; i++) {
452           _self.removeAllListeners(handlers[i]);
453         }
454
455         // Add stable state handlers
456         _self.on('error', handleEvent(self, 'error'));
457         _self.on('close', handleEvent(self, 'close'));
458         _self.on('timeout', handleEvent(self, 'timeout'));
459         _self.on('parseError', handleEvent(self, 'parseError'));
460
461         // Move to the connected servers
462         moveServerFrom(self.disconnectedProxies, self.connectedProxies, _self);
463         // Emit joined event
464         self.emit('joined', 'mongos', _self);
465       } else if(event == 'connect' && self.authenticating) {
466         // Move from connectingProxies
467         moveServerFrom(self.connectingProxies, self.disconnectedProxies, _self);
468         this.destroy();
469       }
470
471       // Are we done finish up callback
472       if(count == 0) {
473         callback();
474       }
475     }
476   }
477
478   // No new servers
479   if(count == 0) {
480     return callback();
481   }
482
483   // Execute method
484   function execute(_server, i) {
485     setTimeout(function() {
486       // Destroyed
487       if(self.state == DESTROYED) {
488         return;
489       }
490
491       // Create a new server instance
492       var server = new Server(assign({}, self.s.options, {
493         host: _server.name.split(':')[0],
494         port: parseInt(_server.name.split(':')[1], 10)
495       }, {
496         authProviders: self.authProviders, reconnect:false, monitoring: false, inTopology: true
497       }, {
498         clientInfo: clone(self.s.clientInfo)
499       }));
500
501       // Add temp handlers
502       server.once('connect', _handleEvent(self, 'connect'));
503       server.once('close', _handleEvent(self, 'close'));
504       server.once('timeout', _handleEvent(self, 'timeout'));
505       server.once('error', _handleEvent(self, 'error'));
506       server.once('parseError', _handleEvent(self, 'parseError'));
507
508       // SDAM Monitoring events
509       server.on('serverOpening', function(e) { self.emit('serverOpening', e); });
510       server.on('serverDescriptionChanged', function(e) { self.emit('serverDescriptionChanged', e); });
511       server.on('serverClosed', function(e) { self.emit('serverClosed', e); });
512       server.connect(self.s.connectOptions);
513     }, i);
514   }
515
516   // Create new instances
517   for(var i = 0; i < proxies.length; i++) {
518     execute(proxies[i], i);
519   }
520 }
521
522 function topologyMonitor(self, options) {
523   options = options || {};
524
525   // Set momitoring timeout
526   self.haTimeoutId = setTimeout(function() {
527     if(self.state == DESTROYED) return;
528     // If we have a primary and a disconnect handler, execute
529     // buffered operations
530     if(self.isConnected() && self.s.disconnectHandler) {
531       self.s.disconnectHandler.execute();
532     }
533
534     // Get the connectingServers
535     var proxies = self.connectedProxies.slice(0);
536     // Get the count
537     var count = proxies.length;
538
539     // If the count is zero schedule a new fast
540     function pingServer(_self, _server, cb) {
541       // Measure running time
542       var start = new Date().getTime();
543
544       // Emit the server heartbeat start
545       emitSDAMEvent(self, 'serverHeartbeatStarted', { connectionId: _server.name });
546
547       // Execute ismaster
548       _server.command('admin.$cmd', {ismaster:true}, {monitoring: true}, function(err, r) {
549         if(self.state == DESTROYED) {
550           // Move from connectingProxies
551           moveServerFrom(self.connectedProxies, self.disconnectedProxies, _server);
552           _server.destroy();
553           return cb(err, r);
554         }
555
556         // Calculate latency
557         var latencyMS = new Date().getTime() - start;
558
559         // We had an error, remove it from the state
560         if(err) {
561           // Emit the server heartbeat failure
562           emitSDAMEvent(self, 'serverHeartbeatFailed', { durationMS: latencyMS, failure: err, connectionId: _server.name });
563         } else {
564           // Update the server ismaster
565           _server.ismaster = r.result;
566           _server.lastIsMasterMS = latencyMS;
567
568           // Server heart beat event
569           emitSDAMEvent(self, 'serverHeartbeatSucceeded', { durationMS: latencyMS, reply: r.result, connectionId: _server.name });
570         }
571
572         cb(err, r);
573       });
574     }
575
576     // No proxies initiate monitor again
577     if(proxies.length == 0) {
578       // Emit close event if any listeners registered
579       if(self.listeners("close").length > 0) {
580         self.emit('close', self);
581       }
582
583       // Attempt to connect to any unknown servers
584       return reconnectProxies(self, self.disconnectedProxies, function(err, cb) {
585         if(self.state == DESTROYED) return;
586
587         // Are we connected ? emit connect event
588         if(self.state == CONNECTING && options.firstConnect) {
589           self.emit('connect', self);
590           self.emit('fullsetup', self);
591           self.emit('all', self);
592         } else if(self.isConnected()) {
593           self.emit('reconnect', self);
594         } else if(!self.isConnected() && self.listeners("close").length > 0) {
595           self.emit('close', self);
596         }
597
598         // Perform topology monitor
599         topologyMonitor(self);
600       });
601     }
602
603     // Ping all servers
604     for(var i = 0; i < proxies.length; i++) {
605       pingServer(self, proxies[i], function(err, r) {
606         count = count - 1;
607
608         if(count == 0) {
609           if(self.state == DESTROYED) return;
610
611           // Attempt to connect to any unknown servers
612           reconnectProxies(self, self.disconnectedProxies, function(err, cb) {
613             if(self.state == DESTROYED) return;
614             // Perform topology monitor
615             topologyMonitor(self);
616           });
617         }
618       });
619     }
620   }, self.s.haInterval);
621 }
622
623 /**
624  * Returns the last known ismaster document for this server
625  * @method
626  * @return {object}
627  */
628 Mongos.prototype.lastIsMaster = function() {
629   return this.ismaster;
630 }
631
632 /**
633  * Unref all connections belong to this server
634  * @method
635  */
636 Mongos.prototype.unref = function(emitClose) {
637   // Transition state
638   stateTransition(this, DISCONNECTED);
639   // Get all proxies
640   var proxies = self.connectedProxies.concat(self.connectingProxies);
641   proxies.forEach(function(x) {
642     x.unref();
643   });
644
645   clearTimeout(this.haTimeoutId);
646 }
647
648 /**
649  * Destroy the server connection
650  * @method
651  */
652 Mongos.prototype.destroy = function(emitClose) {
653   // Transition state
654   stateTransition(this, DESTROYED);
655   // Get all proxies
656   var proxies = this.connectedProxies.concat(this.connectingProxies);
657   // Clear out any monitoring process
658   if(this.haTimeoutId) clearTimeout(this.haTimeoutId);
659
660   // Destroy all connecting servers
661   proxies.forEach(function(x) {
662     x.destroy();
663   });
664
665   // Emit toplogy closing event
666   emitSDAMEvent(this, 'topologyClosed', { topologyId: this.id });
667 }
668
669 /**
670  * Figure out if the server is connected
671  * @method
672  * @return {boolean}
673  */
674 Mongos.prototype.isConnected = function(options) {
675   return this.connectedProxies.length > 0;
676 }
677
678 /**
679  * Figure out if the server instance was destroyed by calling destroy
680  * @method
681  * @return {boolean}
682  */
683 Mongos.prototype.isDestroyed = function() {
684   return this.state == DESTROYED;
685 }
686
687 //
688 // Operations
689 //
690
691 // Execute write operation
692 var executeWriteOperation = function(self, op, ns, ops, options, callback) {
693   if(typeof options == 'function') callback = options, options = {}, options = options || {};
694   // Ensure we have no options
695   options = options || {};
696   // Pick a server
697   var server = pickProxy(self);
698   // No server found error out
699   if(!server) return callback(new MongoError('no mongos proxy available'));
700   // Execute the command
701   server[op](ns, ops, options, callback);
702 }
703
704 /**
705  * Insert one or more documents
706  * @method
707  * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
708  * @param {array} ops An array of documents to insert
709  * @param {boolean} [options.ordered=true] Execute in order or out of order
710  * @param {object} [options.writeConcern={}] Write concern for the operation
711  * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
712  * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
713  * @param {opResultCallback} callback A callback function
714  */
715 Mongos.prototype.insert = function(ns, ops, options, callback) {
716   if(typeof options == 'function') callback = options, options = {}, options = options || {};
717   if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
718
719   // Not connected but we have a disconnecthandler
720   if(!this.isConnected() && this.s.disconnectHandler != null) {
721     return this.s.disconnectHandler.add('insert', ns, ops, options, callback);
722   }
723
724   // No mongos proxy available
725   if(!this.isConnected()) {
726     return callback(new MongoError('no mongos proxy available'));
727   }
728
729   // Execute write operation
730   executeWriteOperation(this, 'insert', ns, ops, options, callback);
731 }
732
733 /**
734  * Perform one or more update operations
735  * @method
736  * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
737  * @param {array} ops An array of updates
738  * @param {boolean} [options.ordered=true] Execute in order or out of order
739  * @param {object} [options.writeConcern={}] Write concern for the operation
740  * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
741  * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
742  * @param {opResultCallback} callback A callback function
743  */
744 Mongos.prototype.update = function(ns, ops, options, callback) {
745   if(typeof options == 'function') callback = options, options = {}, options = options || {};
746   if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
747
748   // Not connected but we have a disconnecthandler
749   if(!this.isConnected() && this.s.disconnectHandler != null) {
750     return this.s.disconnectHandler.add('update', ns, ops, options, callback);
751   }
752
753   // No mongos proxy available
754   if(!this.isConnected()) {
755     return callback(new MongoError('no mongos proxy available'));
756   }
757
758   // Execute write operation
759   executeWriteOperation(this, 'update', ns, ops, options, callback);
760 }
761
762 /**
763  * Perform one or more remove operations
764  * @method
765  * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
766  * @param {array} ops An array of removes
767  * @param {boolean} [options.ordered=true] Execute in order or out of order
768  * @param {object} [options.writeConcern={}] Write concern for the operation
769  * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
770  * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
771  * @param {opResultCallback} callback A callback function
772  */
773 Mongos.prototype.remove = function(ns, ops, options, callback) {
774   if(typeof options == 'function') callback = options, options = {}, options = options || {};
775   if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
776
777   // Not connected but we have a disconnecthandler
778   if(!this.isConnected() && this.s.disconnectHandler != null) {
779     return this.s.disconnectHandler.add('remove', ns, ops, options, callback);
780   }
781
782   // No mongos proxy available
783   if(!this.isConnected()) {
784     return callback(new MongoError('no mongos proxy available'));
785   }
786
787   // Execute write operation
788   executeWriteOperation(this, 'remove', ns, ops, options, callback);
789 }
790
791 /**
792  * Execute a command
793  * @method
794  * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
795  * @param {object} cmd The command hash
796  * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
797  * @param {Connection} [options.connection] Specify connection object to execute command against
798  * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
799  * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
800  * @param {opResultCallback} callback A callback function
801  */
802 Mongos.prototype.command = function(ns, cmd, options, callback) {
803   if(typeof options == 'function') callback = options, options = {}, options = options || {};
804   if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
805   var self = this;
806
807   // Establish readPreference
808   var readPreference = options.readPreference ? options.readPreference : ReadPreference.primary;
809
810   // Pick a proxy
811   var server = pickProxy(self);
812
813   // Topology is not connected, save the call in the provided store to be
814   // Executed at some point when the handler deems it's reconnected
815   if((server == null || !server.isConnected()) && this.s.disconnectHandler != null) {
816     return this.s.disconnectHandler.add('command', ns, cmd, options, callback);
817   }
818
819   // No server returned we had an error
820   if(server == null) {
821     return callback(new MongoError('no mongos proxy available'));
822   }
823
824   // Execute the command
825   server.command(ns, cmd, options, callback);
826 }
827
828 /**
829  * Perform one or more remove operations
830  * @method
831  * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
832  * @param {{object}|{Long}} cmd Can be either a command returning a cursor or a cursorId
833  * @param {object} [options.batchSize=0] Batchsize for the operation
834  * @param {array} [options.documents=[]] Initial documents list for cursor
835  * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
836  * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
837  * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
838  * @param {opResultCallback} callback A callback function
839  */
840 Mongos.prototype.cursor = function(ns, cmd, cursorOptions) {
841   cursorOptions = cursorOptions || {};
842   var FinalCursor = cursorOptions.cursorFactory || this.s.Cursor;
843   return new FinalCursor(this.s.bson, ns, cmd, cursorOptions, this, this.s.options);
844 }
845
846 /**
847  * Authenticate using a specified mechanism
848  * @method
849  * @param {string} mechanism The Auth mechanism we are invoking
850  * @param {string} db The db we are invoking the mechanism against
851  * @param {...object} param Parameters for the specific mechanism
852  * @param {authResultCallback} callback A callback function
853  */
854 Mongos.prototype.auth = function(mechanism, db) {
855   var allArgs = Array.prototype.slice.call(arguments, 0).slice(0);
856   var self = this;
857   var args = Array.prototype.slice.call(arguments, 2);
858   var callback = args.pop();
859
860   // If we don't have the mechanism fail
861   if(this.authProviders[mechanism] == null && mechanism != 'default') {
862     return callback(new MongoError(f("auth provider %s does not exist", mechanism)));
863   }
864
865   // Are we already authenticating, throw
866   if(this.authenticating) {
867     return callback(new MongoError('authentication or logout allready in process'));
868   }
869
870   // Topology is not connected, save the call in the provided store to be
871   // Executed at some point when the handler deems it's reconnected
872   if(!self.isConnected() && self.s.disconnectHandler != null) {
873     return self.s.disconnectHandler.add('auth', db, allArgs, {}, callback);
874   }
875
876   // Set to authenticating
877   this.authenticating = true;
878   // All errors
879   var errors = [];
880
881   // Get all the servers
882   var servers = this.connectedProxies.slice(0);
883   // No servers return
884   if(servers.length == 0) {
885     this.authenticating = false;
886     callback(null, true);
887   }
888
889   // Authenticate
890   function auth(server) {
891     // Arguments without a callback
892     var argsWithoutCallback = [mechanism, db].concat(args.slice(0));
893     // Create arguments
894     var finalArguments = argsWithoutCallback.concat([function(err, r) {
895       count = count - 1;
896       // Save all the errors
897       if(err) errors.push({name: server.name, err: err});
898       // We are done
899       if(count == 0) {
900         // Auth is done
901         self.authenticating = false;
902
903         // Return the auth error
904         if(errors.length) return callback(MongoError.create({
905           message: 'authentication fail', errors: errors
906         }), false);
907
908         // Successfully authenticated session
909         callback(null, self);
910       }
911     }]);
912
913     // Execute the auth only against non arbiter servers
914     if(!server.lastIsMaster().arbiterOnly) {
915       server.auth.apply(server, finalArguments);
916     }
917   }
918
919   // Get total count
920   var count = servers.length;
921   // Authenticate against all servers
922   while(servers.length > 0) {
923     auth(servers.shift());
924   }
925 }
926
927 /**
928  * Logout from a database
929  * @method
930  * @param {string} db The db we are logging out from
931  * @param {authResultCallback} callback A callback function
932  */
933 Mongos.prototype.logout = function(dbName, callback) {
934   var self = this;
935   // Are we authenticating or logging out, throw
936   if(this.authenticating) {
937     throw new MongoError('authentication or logout allready in process');
938   }
939
940   // Ensure no new members are processed while logging out
941   this.authenticating = true;
942
943   // Remove from all auth providers (avoid any reaplication of the auth details)
944   var providers = Object.keys(this.authProviders);
945   for(var i = 0; i < providers.length; i++) {
946     this.authProviders[providers[i]].logout(dbName);
947   }
948
949   // Now logout all the servers
950   var servers = this.connectedProxies.slice(0);
951   var count = servers.length;
952   if(count == 0) return callback();
953   var errors = [];
954
955   // Execute logout on all server instances
956   for(var i = 0; i < servers.length; i++) {
957     servers[i].logout(dbName, function(err) {
958       count = count - 1;
959       if(err) errors.push({name: server.name, err: err});
960
961       if(count == 0) {
962         // Do not block new operations
963         self.authenticating = false;
964         // If we have one or more errors
965         if(errors.length) return callback(MongoError.create({
966           message: f('logout failed against db %s', dbName), errors: errors
967         }), false);
968
969         // No errors
970         callback();
971       }
972     });
973   }
974 }
975
976 /**
977  * Get server
978  * @method
979  * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
980  * @return {Server}
981  */
982 Mongos.prototype.getServer = function() {
983   var server = pickProxy(this);
984   if(this.s.debug) this.emit('pickedServer', null, server);
985   return server;
986 }
987
988 /**
989  * All raw connections
990  * @method
991  * @return {Connection[]}
992  */
993 Mongos.prototype.connections = function() {
994   var connections = [];
995
996   for(var i = 0; i < this.connectedProxies.length; i++) {
997     connections = connections.concat(this.connectedProxies[i].connections());
998   }
999
1000   return connections;
1001 }
1002
1003 /**
1004  * A mongos connect event, used to verify that the connection is up and running
1005  *
1006  * @event Mongos#connect
1007  * @type {Mongos}
1008  */
1009
1010 /**
1011  * A mongos reconnect event, used to verify that the mongos topology has reconnected
1012  *
1013  * @event Mongos#reconnect
1014  * @type {Mongos}
1015  */
1016
1017 /**
1018  * A mongos fullsetup event, used to signal that all topology members have been contacted.
1019  *
1020  * @event Mongos#fullsetup
1021  * @type {Mongos}
1022  */
1023
1024 /**
1025  * A mongos all event, used to signal that all topology members have been contacted.
1026  *
1027  * @event Mongos#all
1028  * @type {Mongos}
1029  */
1030
1031 /**
1032  * A server member left the mongos list
1033  *
1034  * @event Mongos#left
1035  * @type {Mongos}
1036  * @param {string} type The type of member that left (mongos)
1037  * @param {Server} server The server object that left
1038  */
1039
1040 /**
1041  * A server member joined the mongos list
1042  *
1043  * @event Mongos#joined
1044  * @type {Mongos}
1045  * @param {string} type The type of member that left (mongos)
1046  * @param {Server} server The server object that joined
1047  */
1048
1049 /**
1050  * A server opening SDAM monitoring event
1051  *
1052  * @event Mongos#serverOpening
1053  * @type {object}
1054  */
1055
1056 /**
1057  * A server closed SDAM monitoring event
1058  *
1059  * @event Mongos#serverClosed
1060  * @type {object}
1061  */
1062
1063 /**
1064  * A server description SDAM change monitoring event
1065  *
1066  * @event Mongos#serverDescriptionChanged
1067  * @type {object}
1068  */
1069
1070 /**
1071  * A topology open SDAM event
1072  *
1073  * @event Mongos#topologyOpening
1074  * @type {object}
1075  */
1076
1077 /**
1078  * A topology closed SDAM event
1079  *
1080  * @event Mongos#topologyClosed
1081  * @type {object}
1082  */
1083
1084 /**
1085  * A topology structure SDAM change event
1086  *
1087  * @event Mongos#topologyDescriptionChanged
1088  * @type {object}
1089  */
1090
1091 /**
1092  * A topology serverHeartbeatStarted SDAM event
1093  *
1094  * @event Mongos#serverHeartbeatStarted
1095  * @type {object}
1096  */
1097
1098 /**
1099  * A topology serverHeartbeatFailed SDAM event
1100  *
1101  * @event Mongos#serverHeartbeatFailed
1102  * @type {object}
1103  */
1104
1105 /**
1106  * A topology serverHeartbeatSucceeded SDAM change event
1107  *
1108  * @event Mongos#serverHeartbeatSucceeded
1109  * @type {object}
1110  */
1111
1112 module.exports = Mongos;