3 var inherits = require('util').inherits,
4 f = require('util').format,
5 EventEmitter = require('events').EventEmitter,
6 BSON = require('bson').native().BSON,
7 ReadPreference = require('./read_preference'),
8 BasicCursor = require('../cursor'),
9 Logger = require('../connection/logger'),
10 debugOptions = require('../connection/utils').debugOptions,
11 MongoError = require('../error'),
12 Server = require('./server'),
13 ReplSetState = require('./replset_state'),
14 assign = require('./shared').assign,
15 clone = require('./shared').clone,
16 createClientInfo = require('./shared').createClientInfo;
19 * @fileOverview The **Mongos** class is a class that represents a Mongos Proxy topology and is
20 * used to construct connections.
23 * var Mongos = require('mongodb-core').Mongos
24 * , ReadPreference = require('mongodb-core').ReadPreference
25 * , assert = require('assert');
27 * var server = new Mongos([{host: 'localhost', port: 30000}]);
28 * // Wait for the connection event
29 * server.on('connect', function(server) {
37 var MongoCR = require('../auth/mongocr')
38 , X509 = require('../auth/x509')
39 , Plain = require('../auth/plain')
40 , GSSAPI = require('../auth/gssapi')
41 , SSPI = require('../auth/sspi')
42 , ScramSHA1 = require('../auth/scram');
46 var DISCONNECTED = 'disconnected';
47 var CONNECTING = 'connecting';
48 var CONNECTED = 'connected';
49 var DESTROYED = 'destroyed';
51 function stateTransition(self, newState) {
52 var legalTransitions = {
53 'disconnected': [CONNECTING, DESTROYED, DISCONNECTED],
54 'connecting': [CONNECTING, DESTROYED, CONNECTED, DISCONNECTED],
55 'connected': [CONNECTED, DISCONNECTED, DESTROYED],
56 'destroyed': [DESTROYED]
60 var legalStates = legalTransitions[self.state];
61 if(legalStates && legalStates.indexOf(newState) != -1) {
62 self.state = newState;
64 self.logger.error(f('Pool with id [%s] failed attempted illegal state transition from [%s] to [%s] only following state allowed [%s]'
65 , self.id, self.state, newState, legalStates));
70 // ReplSet instance id
72 var handlers = ['connect', 'close', 'error', 'timeout', 'parseError'];
75 * Creates a new Mongos instance
77 * @param {array} seedlist A list of seeds for the replicaset
78 * @param {number} [options.haInterval=5000] The High availability period for replicaset inquiry
79 * @param {Cursor} [options.cursorFactory=Cursor] The cursor factory class used for all query cursors
80 * @param {number} [options.size=5] Server connection pool size
81 * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
82 * @param {number} [options.keepAliveInitialDelay=0] Initial delay before TCP keep alive enabled
83 * @param {number} [options.localThresholdMS=15] Cutoff latency point in MS for MongoS proxy selection
84 * @param {boolean} [options.noDelay=true] TCP Connection no delay
85 * @param {number} [options.connectionTimeout=1000] TCP Connection timeout setting
86 * @param {number} [options.socketTimeout=0] TCP Socket timeout setting
87 * @param {boolean} [options.singleBufferSerializtion=true] Serialize into single buffer, trade of peak memory for serialization speed
88 * @param {boolean} [options.ssl=false] Use SSL for connection
89 * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function.
90 * @param {Buffer} [options.ca] SSL Certificate store binary buffer
91 * @param {Buffer} [options.cert] SSL Certificate binary buffer
92 * @param {Buffer} [options.key] SSL Key file binary buffer
93 * @param {string} [options.passphrase] SSL Certificate pass phrase
94 * @param {string} [options.servername=null] String containing the server name requested via TLS SNI.
95 * @param {boolean} [options.rejectUnauthorized=true] Reject unauthorized server certificates
96 * @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits
97 * @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
98 * @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
99 * @param {boolean} [options.domainsEnabled=false] Enable the wrapping of the callback in the current domain, disabled by default to avoid perf hit.
100 * @return {Mongos} A cursor instance
101 * @fires Mongos#connect
102 * @fires Mongos#reconnect
103 * @fires Mongos#joined
105 * @fires Mongos#failed
106 * @fires Mongos#fullsetup
108 * @fires Mongos#serverHeartbeatStarted
109 * @fires Mongos#serverHeartbeatSucceeded
110 * @fires Mongos#serverHeartbeatFailed
111 * @fires Mongos#topologyOpening
112 * @fires Mongos#topologyClosed
113 * @fires Mongos#topologyDescriptionChanged
115 var Mongos = function(seedlist, options) {
117 options = options || {};
124 options: assign({}, options),
126 bson: options.bson || new BSON(),
128 Cursor: options.cursorFactory || BasicCursor,
130 logger: Logger('Mongos', options),
134 haInterval: options.haInterval ? options.haInterval : 10000,
135 // Disconnect handler
136 disconnectHandler: options.disconnectHandler,
137 // Server selection index
139 // Connect function options passed in
141 // Are we running in debug mode
142 debug: typeof options.debug == 'boolean' ? options.debug : false,
144 localThresholdMS: options.localThresholdMS || 15,
146 clientInfo: createClientInfo(options)
149 // Set the client info
150 this.s.options.clientInfo = createClientInfo(options);
152 // Log info warning if the socketTimeout < haInterval as it will cause
153 // a lot of recycled connections to happen.
154 if(this.s.logger.isWarn()
155 && this.s.options.socketTimeout != 0
156 && this.s.options.socketTimeout < this.s.haInterval) {
157 this.s.logger.warn(f('warning socketTimeout %s is less than haInterval %s. This might cause unnecessary server reconnections due to socket timeouts'
158 , this.s.options.socketTimeout, this.s.haInterval));
161 // All the authProviders
162 this.authProviders = options.authProviders || {
163 'mongocr': new MongoCR(this.s.bson), 'x509': new X509(this.s.bson)
164 , 'plain': new Plain(this.s.bson), 'gssapi': new GSSAPI(this.s.bson)
165 , 'sspi': new SSPI(this.s.bson), 'scram-sha-1': new ScramSHA1(this.s.bson)
168 // Disconnected state
169 this.state = DISCONNECTED;
171 // Current proxies we are connecting to
172 this.connectingProxies = [];
173 // Currently connected proxies
174 this.connectedProxies = [];
175 // Disconnected proxies
176 this.disconnectedProxies = [];
177 // Are we authenticating
178 this.authenticating = false;
179 // Index of proxy to run operations against
181 // High availability timeout id
182 this.haTimeoutId = null;
184 this.ismaster = null;
186 // Add event listener
187 EventEmitter.call(this);
190 inherits(Mongos, EventEmitter);
192 Object.defineProperty(Mongos.prototype, 'type', {
193 enumerable:true, get: function() { return 'mongos'; }
197 * Emit event if it exists
200 function emitSDAMEvent(self, event, description) {
201 if(self.listeners(event).length > 0) {
202 self.emit(event, description);
207 * Initiate server connect
209 * @param {array} [options.auth=null] Array of auth options to apply on connect
211 Mongos.prototype.connect = function(options) {
213 // Add any connect level options to the internal state
214 this.s.connectOptions = options || {};
215 // Set connecting state
216 stateTransition(this, CONNECTING);
217 // Create server instances
218 var servers = this.s.seedlist.map(function(x) {
219 return new Server(assign({}, self.s.options, x, {
220 authProviders: self.authProviders, reconnect:false, monitoring:false, inTopology: true
222 clientInfo: clone(self.s.clientInfo)
226 // Emit the topology opening event
227 emitSDAMEvent(this, 'topologyOpening', { topologyId: this.id });
229 // Start all server connections
230 connectProxies(self, servers);
233 function handleEvent(self, event) {
234 return function(err) {
235 if(self.state == DESTROYED) return;
236 // Move to list of disconnectedProxies
237 moveServerFrom(self.connectedProxies, self.disconnectedProxies, this);
238 // Emit the left signal
239 self.emit('left', 'mongos', this);
243 function handleInitialConnectEvent(self, event) {
244 return function(err) {
245 // Destroy the instance
246 if(self.state == DESTROYED) {
247 // Move from connectingProxies
248 moveServerFrom(self.connectingProxies, self.disconnectedProxies, this);
249 return this.destroy();
252 // Check the type of server
253 if(event == 'connect') {
254 // Get last known ismaster
255 self.ismaster = this.lastIsMaster();
257 // Is this not a proxy, remove t
258 if(self.ismaster.msg == 'isdbgrid') {
259 // Add to the connectd list
260 for(var i = 0; i < self.connectedProxies.length; i++) {
261 if(self.connectedProxies[i].name == this.name) {
262 // Move from connectingProxies
263 moveServerFrom(self.connectingProxies, self.disconnectedProxies, this);
265 return self.emit('failed', this);
269 // Remove the handlers
270 for(var i = 0; i < handlers.length; i++) {
271 this.removeAllListeners(handlers[i]);
274 // Add stable state handlers
275 this.on('error', handleEvent(self, 'error'));
276 this.on('close', handleEvent(self, 'close'));
277 this.on('timeout', handleEvent(self, 'timeout'));
278 this.on('parseError', handleEvent(self, 'parseError'));
280 // Move from connecting proxies connected
281 moveServerFrom(self.connectingProxies, self.connectedProxies, this);
282 // Emit the joined event
283 self.emit('joined', 'mongos', this);
286 // Print warning if we did not find a mongos proxy
287 if(self.s.logger.isWarn()) {
288 var message = 'expected mongos proxy, but found replicaset member mongod for server %s';
289 // We have a standalone server
290 if(!self.ismaster.hosts) {
291 message = 'expected mongos proxy, but found standalone mongod for server %s';
294 self.s.logger.warn(f(message, this.name));
297 // This is not a mongos proxy, remove it completely
298 removeProxyFrom(self.connectingProxies, this);
299 // Emit the left event
300 self.emit('left', 'server', this);
302 self.emit('failed', this);
305 moveServerFrom(self.connectingProxies, self.disconnectedProxies, this);
306 // Emit the left event
307 self.emit('left', 'mongos', this);
309 self.emit('failed', this);
312 // Trigger topologyMonitor
313 if(self.connectingProxies.length == 0) {
314 // Emit connected if we are connected
315 if(self.connectedProxies.length > 0) {
316 // Set the state to connected
317 stateTransition(self, CONNECTED);
318 // Emit the connect event
319 self.emit('connect', self);
320 self.emit('fullsetup', self);
321 self.emit('all', self);
322 } else if(self.disconnectedProxies.length == 0) {
323 // Print warning if we did not find a mongos proxy
324 if(self.s.logger.isWarn()) {
325 self.s.logger.warn(f('no mongos proxies found in seed list, did you mean to connect to a replicaset'));
328 // Emit the error that no proxies were found
329 return self.emit('error', new MongoError('no mongos proxies found in seed list'));
333 topologyMonitor(self, {firstConnect:true});
338 function connectProxies(self, servers) {
339 // Update connectingProxies
340 self.connectingProxies = self.connectingProxies.concat(servers);
342 // Index used to interleaf the server connects, avoiding
343 // runtime issues on io constrained vm's
344 var timeoutInterval = 0;
346 function connect(server, timeoutInterval) {
347 setTimeout(function() {
348 // Add event handlers
349 server.once('close', handleInitialConnectEvent(self, 'close'));
350 server.once('timeout', handleInitialConnectEvent(self, 'timeout'));
351 server.once('parseError', handleInitialConnectEvent(self, 'parseError'));
352 server.once('error', handleInitialConnectEvent(self, 'error'));
353 server.once('connect', handleInitialConnectEvent(self, 'connect'));
354 // SDAM Monitoring events
355 server.on('serverOpening', function(e) { self.emit('serverOpening', e); });
356 server.on('serverDescriptionChanged', function(e) { self.emit('serverDescriptionChanged', e); });
357 server.on('serverClosed', function(e) { self.emit('serverClosed', e); });
359 server.connect(self.s.connectOptions);
362 // Start all the servers
363 while(servers.length > 0) {
364 connect(servers.shift(), timeoutInterval++);
368 function pickProxy(self) {
369 // Get the currently connected Proxies
370 var connectedProxies = self.connectedProxies.slice(0);
373 var lowerBoundLatency = Number.MAX_VALUE;
375 // Determine the lower bound for the Proxies
376 for(var i = 0; i < connectedProxies.length; i++) {
377 if(connectedProxies[i].lastIsMasterMS < lowerBoundLatency) {
378 lowerBoundLatency = connectedProxies[i].lastIsMasterMS;
382 // Filter out the possible servers
383 connectedProxies = connectedProxies.filter(function(server) {
384 if((server.lastIsMasterMS <= (lowerBoundLatency + self.s.localThresholdMS))
385 && server.isConnected()) {
390 // We have no connectedProxies pick first of the connected ones
391 if(connectedProxies.length == 0) {
392 return self.connectedProxies[0];
396 var proxy = connectedProxies[self.index % connectedProxies.length];
398 self.index = (self.index + 1) % connectedProxies.length;
403 function moveServerFrom(from, to, proxy) {
404 for(var i = 0; i < from.length; i++) {
405 if(from[i].name == proxy.name) {
410 for(var i = 0; i < to.length; i++) {
411 if(to[i].name == proxy.name) {
419 function removeProxyFrom(from, proxy) {
420 for(var i = 0; i < from.length; i++) {
421 if(from[i].name == proxy.name) {
427 function reconnectProxies(self, proxies, callback) {
429 var count = proxies.length;
432 var _handleEvent = function(self, event) {
433 return function(err, r) {
438 if(self.state == DESTROYED) {
439 moveServerFrom(self.connectingProxies, self.disconnectedProxies, _self);
440 return this.destroy();
443 if(event == 'connect' && !self.authenticating) {
445 if(self.state == DESTROYED) {
446 moveServerFrom(self.connectingProxies, self.disconnectedProxies, _self);
447 return _self.destroy();
450 // Remove the handlers
451 for(var i = 0; i < handlers.length; i++) {
452 _self.removeAllListeners(handlers[i]);
455 // Add stable state handlers
456 _self.on('error', handleEvent(self, 'error'));
457 _self.on('close', handleEvent(self, 'close'));
458 _self.on('timeout', handleEvent(self, 'timeout'));
459 _self.on('parseError', handleEvent(self, 'parseError'));
461 // Move to the connected servers
462 moveServerFrom(self.disconnectedProxies, self.connectedProxies, _self);
464 self.emit('joined', 'mongos', _self);
465 } else if(event == 'connect' && self.authenticating) {
466 // Move from connectingProxies
467 moveServerFrom(self.connectingProxies, self.disconnectedProxies, _self);
471 // Are we done finish up callback
484 function execute(_server, i) {
485 setTimeout(function() {
487 if(self.state == DESTROYED) {
491 // Create a new server instance
492 var server = new Server(assign({}, self.s.options, {
493 host: _server.name.split(':')[0],
494 port: parseInt(_server.name.split(':')[1], 10)
496 authProviders: self.authProviders, reconnect:false, monitoring: false, inTopology: true
498 clientInfo: clone(self.s.clientInfo)
502 server.once('connect', _handleEvent(self, 'connect'));
503 server.once('close', _handleEvent(self, 'close'));
504 server.once('timeout', _handleEvent(self, 'timeout'));
505 server.once('error', _handleEvent(self, 'error'));
506 server.once('parseError', _handleEvent(self, 'parseError'));
508 // SDAM Monitoring events
509 server.on('serverOpening', function(e) { self.emit('serverOpening', e); });
510 server.on('serverDescriptionChanged', function(e) { self.emit('serverDescriptionChanged', e); });
511 server.on('serverClosed', function(e) { self.emit('serverClosed', e); });
512 server.connect(self.s.connectOptions);
516 // Create new instances
517 for(var i = 0; i < proxies.length; i++) {
518 execute(proxies[i], i);
522 function topologyMonitor(self, options) {
523 options = options || {};
525 // Set momitoring timeout
526 self.haTimeoutId = setTimeout(function() {
527 if(self.state == DESTROYED) return;
528 // If we have a primary and a disconnect handler, execute
529 // buffered operations
530 if(self.isConnected() && self.s.disconnectHandler) {
531 self.s.disconnectHandler.execute();
534 // Get the connectingServers
535 var proxies = self.connectedProxies.slice(0);
537 var count = proxies.length;
539 // If the count is zero schedule a new fast
540 function pingServer(_self, _server, cb) {
541 // Measure running time
542 var start = new Date().getTime();
544 // Emit the server heartbeat start
545 emitSDAMEvent(self, 'serverHeartbeatStarted', { connectionId: _server.name });
548 _server.command('admin.$cmd', {ismaster:true}, {monitoring: true}, function(err, r) {
549 if(self.state == DESTROYED) {
550 // Move from connectingProxies
551 moveServerFrom(self.connectedProxies, self.disconnectedProxies, _server);
557 var latencyMS = new Date().getTime() - start;
559 // We had an error, remove it from the state
561 // Emit the server heartbeat failure
562 emitSDAMEvent(self, 'serverHeartbeatFailed', { durationMS: latencyMS, failure: err, connectionId: _server.name });
564 // Update the server ismaster
565 _server.ismaster = r.result;
566 _server.lastIsMasterMS = latencyMS;
568 // Server heart beat event
569 emitSDAMEvent(self, 'serverHeartbeatSucceeded', { durationMS: latencyMS, reply: r.result, connectionId: _server.name });
576 // No proxies initiate monitor again
577 if(proxies.length == 0) {
578 // Emit close event if any listeners registered
579 if(self.listeners("close").length > 0) {
580 self.emit('close', self);
583 // Attempt to connect to any unknown servers
584 return reconnectProxies(self, self.disconnectedProxies, function(err, cb) {
585 if(self.state == DESTROYED) return;
587 // Are we connected ? emit connect event
588 if(self.state == CONNECTING && options.firstConnect) {
589 self.emit('connect', self);
590 self.emit('fullsetup', self);
591 self.emit('all', self);
592 } else if(self.isConnected()) {
593 self.emit('reconnect', self);
594 } else if(!self.isConnected() && self.listeners("close").length > 0) {
595 self.emit('close', self);
598 // Perform topology monitor
599 topologyMonitor(self);
604 for(var i = 0; i < proxies.length; i++) {
605 pingServer(self, proxies[i], function(err, r) {
609 if(self.state == DESTROYED) return;
611 // Attempt to connect to any unknown servers
612 reconnectProxies(self, self.disconnectedProxies, function(err, cb) {
613 if(self.state == DESTROYED) return;
614 // Perform topology monitor
615 topologyMonitor(self);
620 }, self.s.haInterval);
624 * Returns the last known ismaster document for this server
628 Mongos.prototype.lastIsMaster = function() {
629 return this.ismaster;
633 * Unref all connections belong to this server
636 Mongos.prototype.unref = function(emitClose) {
638 stateTransition(this, DISCONNECTED);
640 var proxies = self.connectedProxies.concat(self.connectingProxies);
641 proxies.forEach(function(x) {
645 clearTimeout(this.haTimeoutId);
649 * Destroy the server connection
652 Mongos.prototype.destroy = function(emitClose) {
654 stateTransition(this, DESTROYED);
656 var proxies = this.connectedProxies.concat(this.connectingProxies);
657 // Clear out any monitoring process
658 if(this.haTimeoutId) clearTimeout(this.haTimeoutId);
660 // Destroy all connecting servers
661 proxies.forEach(function(x) {
665 // Emit toplogy closing event
666 emitSDAMEvent(this, 'topologyClosed', { topologyId: this.id });
670 * Figure out if the server is connected
674 Mongos.prototype.isConnected = function(options) {
675 return this.connectedProxies.length > 0;
679 * Figure out if the server instance was destroyed by calling destroy
683 Mongos.prototype.isDestroyed = function() {
684 return this.state == DESTROYED;
691 // Execute write operation
692 var executeWriteOperation = function(self, op, ns, ops, options, callback) {
693 if(typeof options == 'function') callback = options, options = {}, options = options || {};
694 // Ensure we have no options
695 options = options || {};
697 var server = pickProxy(self);
698 // No server found error out
699 if(!server) return callback(new MongoError('no mongos proxy available'));
700 // Execute the command
701 server[op](ns, ops, options, callback);
705 * Insert one or more documents
707 * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
708 * @param {array} ops An array of documents to insert
709 * @param {boolean} [options.ordered=true] Execute in order or out of order
710 * @param {object} [options.writeConcern={}] Write concern for the operation
711 * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
712 * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
713 * @param {opResultCallback} callback A callback function
715 Mongos.prototype.insert = function(ns, ops, options, callback) {
716 if(typeof options == 'function') callback = options, options = {}, options = options || {};
717 if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
719 // Not connected but we have a disconnecthandler
720 if(!this.isConnected() && this.s.disconnectHandler != null) {
721 return this.s.disconnectHandler.add('insert', ns, ops, options, callback);
724 // No mongos proxy available
725 if(!this.isConnected()) {
726 return callback(new MongoError('no mongos proxy available'));
729 // Execute write operation
730 executeWriteOperation(this, 'insert', ns, ops, options, callback);
734 * Perform one or more update operations
736 * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
737 * @param {array} ops An array of updates
738 * @param {boolean} [options.ordered=true] Execute in order or out of order
739 * @param {object} [options.writeConcern={}] Write concern for the operation
740 * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
741 * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
742 * @param {opResultCallback} callback A callback function
744 Mongos.prototype.update = function(ns, ops, options, callback) {
745 if(typeof options == 'function') callback = options, options = {}, options = options || {};
746 if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
748 // Not connected but we have a disconnecthandler
749 if(!this.isConnected() && this.s.disconnectHandler != null) {
750 return this.s.disconnectHandler.add('update', ns, ops, options, callback);
753 // No mongos proxy available
754 if(!this.isConnected()) {
755 return callback(new MongoError('no mongos proxy available'));
758 // Execute write operation
759 executeWriteOperation(this, 'update', ns, ops, options, callback);
763 * Perform one or more remove operations
765 * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
766 * @param {array} ops An array of removes
767 * @param {boolean} [options.ordered=true] Execute in order or out of order
768 * @param {object} [options.writeConcern={}] Write concern for the operation
769 * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
770 * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
771 * @param {opResultCallback} callback A callback function
773 Mongos.prototype.remove = function(ns, ops, options, callback) {
774 if(typeof options == 'function') callback = options, options = {}, options = options || {};
775 if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
777 // Not connected but we have a disconnecthandler
778 if(!this.isConnected() && this.s.disconnectHandler != null) {
779 return this.s.disconnectHandler.add('remove', ns, ops, options, callback);
782 // No mongos proxy available
783 if(!this.isConnected()) {
784 return callback(new MongoError('no mongos proxy available'));
787 // Execute write operation
788 executeWriteOperation(this, 'remove', ns, ops, options, callback);
794 * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
795 * @param {object} cmd The command hash
796 * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
797 * @param {Connection} [options.connection] Specify connection object to execute command against
798 * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
799 * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
800 * @param {opResultCallback} callback A callback function
802 Mongos.prototype.command = function(ns, cmd, options, callback) {
803 if(typeof options == 'function') callback = options, options = {}, options = options || {};
804 if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
807 // Establish readPreference
808 var readPreference = options.readPreference ? options.readPreference : ReadPreference.primary;
811 var server = pickProxy(self);
813 // Topology is not connected, save the call in the provided store to be
814 // Executed at some point when the handler deems it's reconnected
815 if((server == null || !server.isConnected()) && this.s.disconnectHandler != null) {
816 return this.s.disconnectHandler.add('command', ns, cmd, options, callback);
819 // No server returned we had an error
821 return callback(new MongoError('no mongos proxy available'));
824 // Execute the command
825 server.command(ns, cmd, options, callback);
829 * Perform one or more remove operations
831 * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
832 * @param {{object}|{Long}} cmd Can be either a command returning a cursor or a cursorId
833 * @param {object} [options.batchSize=0] Batchsize for the operation
834 * @param {array} [options.documents=[]] Initial documents list for cursor
835 * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
836 * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
837 * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
838 * @param {opResultCallback} callback A callback function
840 Mongos.prototype.cursor = function(ns, cmd, cursorOptions) {
841 cursorOptions = cursorOptions || {};
842 var FinalCursor = cursorOptions.cursorFactory || this.s.Cursor;
843 return new FinalCursor(this.s.bson, ns, cmd, cursorOptions, this, this.s.options);
847 * Authenticate using a specified mechanism
849 * @param {string} mechanism The Auth mechanism we are invoking
850 * @param {string} db The db we are invoking the mechanism against
851 * @param {...object} param Parameters for the specific mechanism
852 * @param {authResultCallback} callback A callback function
854 Mongos.prototype.auth = function(mechanism, db) {
855 var allArgs = Array.prototype.slice.call(arguments, 0).slice(0);
857 var args = Array.prototype.slice.call(arguments, 2);
858 var callback = args.pop();
860 // If we don't have the mechanism fail
861 if(this.authProviders[mechanism] == null && mechanism != 'default') {
862 return callback(new MongoError(f("auth provider %s does not exist", mechanism)));
865 // Are we already authenticating, throw
866 if(this.authenticating) {
867 return callback(new MongoError('authentication or logout allready in process'));
870 // Topology is not connected, save the call in the provided store to be
871 // Executed at some point when the handler deems it's reconnected
872 if(!self.isConnected() && self.s.disconnectHandler != null) {
873 return self.s.disconnectHandler.add('auth', db, allArgs, {}, callback);
876 // Set to authenticating
877 this.authenticating = true;
881 // Get all the servers
882 var servers = this.connectedProxies.slice(0);
884 if(servers.length == 0) {
885 this.authenticating = false;
886 callback(null, true);
890 function auth(server) {
891 // Arguments without a callback
892 var argsWithoutCallback = [mechanism, db].concat(args.slice(0));
894 var finalArguments = argsWithoutCallback.concat([function(err, r) {
896 // Save all the errors
897 if(err) errors.push({name: server.name, err: err});
901 self.authenticating = false;
903 // Return the auth error
904 if(errors.length) return callback(MongoError.create({
905 message: 'authentication fail', errors: errors
908 // Successfully authenticated session
909 callback(null, self);
913 // Execute the auth only against non arbiter servers
914 if(!server.lastIsMaster().arbiterOnly) {
915 server.auth.apply(server, finalArguments);
920 var count = servers.length;
921 // Authenticate against all servers
922 while(servers.length > 0) {
923 auth(servers.shift());
928 * Logout from a database
930 * @param {string} db The db we are logging out from
931 * @param {authResultCallback} callback A callback function
933 Mongos.prototype.logout = function(dbName, callback) {
935 // Are we authenticating or logging out, throw
936 if(this.authenticating) {
937 throw new MongoError('authentication or logout allready in process');
940 // Ensure no new members are processed while logging out
941 this.authenticating = true;
943 // Remove from all auth providers (avoid any reaplication of the auth details)
944 var providers = Object.keys(this.authProviders);
945 for(var i = 0; i < providers.length; i++) {
946 this.authProviders[providers[i]].logout(dbName);
949 // Now logout all the servers
950 var servers = this.connectedProxies.slice(0);
951 var count = servers.length;
952 if(count == 0) return callback();
955 // Execute logout on all server instances
956 for(var i = 0; i < servers.length; i++) {
957 servers[i].logout(dbName, function(err) {
959 if(err) errors.push({name: server.name, err: err});
962 // Do not block new operations
963 self.authenticating = false;
964 // If we have one or more errors
965 if(errors.length) return callback(MongoError.create({
966 message: f('logout failed against db %s', dbName), errors: errors
979 * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
982 Mongos.prototype.getServer = function() {
983 var server = pickProxy(this);
984 if(this.s.debug) this.emit('pickedServer', null, server);
989 * All raw connections
991 * @return {Connection[]}
993 Mongos.prototype.connections = function() {
994 var connections = [];
996 for(var i = 0; i < this.connectedProxies.length; i++) {
997 connections = connections.concat(this.connectedProxies[i].connections());
1004 * A mongos connect event, used to verify that the connection is up and running
1006 * @event Mongos#connect
1011 * A mongos reconnect event, used to verify that the mongos topology has reconnected
1013 * @event Mongos#reconnect
1018 * A mongos fullsetup event, used to signal that all topology members have been contacted.
1020 * @event Mongos#fullsetup
1025 * A mongos all event, used to signal that all topology members have been contacted.
1032 * A server member left the mongos list
1034 * @event Mongos#left
1036 * @param {string} type The type of member that left (mongos)
1037 * @param {Server} server The server object that left
1041 * A server member joined the mongos list
1043 * @event Mongos#joined
1045 * @param {string} type The type of member that left (mongos)
1046 * @param {Server} server The server object that joined
1050 * A server opening SDAM monitoring event
1052 * @event Mongos#serverOpening
1057 * A server closed SDAM monitoring event
1059 * @event Mongos#serverClosed
1064 * A server description SDAM change monitoring event
1066 * @event Mongos#serverDescriptionChanged
1071 * A topology open SDAM event
1073 * @event Mongos#topologyOpening
1078 * A topology closed SDAM event
1080 * @event Mongos#topologyClosed
1085 * A topology structure SDAM change event
1087 * @event Mongos#topologyDescriptionChanged
1092 * A topology serverHeartbeatStarted SDAM event
1094 * @event Mongos#serverHeartbeatStarted
1099 * A topology serverHeartbeatFailed SDAM event
1101 * @event Mongos#serverHeartbeatFailed
1106 * A topology serverHeartbeatSucceeded SDAM change event
1108 * @event Mongos#serverHeartbeatSucceeded
1112 module.exports = Mongos;