3 var inherits = require('util').inherits,
4 f = require('util').format,
5 EventEmitter = require('events').EventEmitter,
6 BasicCursor = require('../cursor'),
7 Logger = require('../connection/logger'),
8 retrieveBSON = require('../connection/utils').retrieveBSON,
9 MongoError = require('../error'),
10 Server = require('./server'),
11 assign = require('./shared').assign,
12 clone = require('./shared').clone,
13 createClientInfo = require('./shared').createClientInfo;
15 var BSON = retrieveBSON();
18 * @fileOverview The **Mongos** class is a class that represents a Mongos Proxy topology and is
19 * used to construct connections.
22 * var Mongos = require('mongodb-core').Mongos
23 * , ReadPreference = require('mongodb-core').ReadPreference
24 * , assert = require('assert');
26 * var server = new Mongos([{host: 'localhost', port: 30000}]);
27 * // Wait for the connection event
28 * server.on('connect', function(server) {
36 var MongoCR = require('../auth/mongocr')
37 , X509 = require('../auth/x509')
38 , Plain = require('../auth/plain')
39 , GSSAPI = require('../auth/gssapi')
40 , SSPI = require('../auth/sspi')
41 , ScramSHA1 = require('../auth/scram');
45 var DISCONNECTED = 'disconnected';
46 var CONNECTING = 'connecting';
47 var CONNECTED = 'connected';
48 var DESTROYED = 'destroyed';
/**
 * Move the topology to `newState` when that is a legal transition from its
 * current `self.state`. An illegal transition leaves the state untouched and
 * is reported through the topology logger.
 *
 * @param {Mongos} self Topology instance whose `state` is updated
 * @param {string} newState One of DISCONNECTED, CONNECTING, CONNECTED or DESTROYED
 */
function stateTransition(self, newState) {
  // For every current state, the states it is allowed to move to
  var legalTransitions = {
    'disconnected': [CONNECTING, DESTROYED, DISCONNECTED],
    'connecting': [CONNECTING, DESTROYED, CONNECTED, DISCONNECTED],
    'connected': [CONNECTED, DISCONNECTED, DESTROYED],
    'destroyed': [DESTROYED]
  };

  // Get current state
  var legalStates = legalTransitions[self.state];
  if(legalStates && legalStates.indexOf(newState) != -1) {
    self.state = newState;
  } else {
    // The logger is stored on the internal state object (self.s.logger);
    // there is no self.logger, so using it here would throw instead of logging.
    self.s.logger.error(f('Pool with id [%s] failed attempted illegal state transition from [%s] to [%s] only following state allowed [%s]'
      , self.id, self.state, newState, legalStates));
  }
}
69 // Mongos instance id
71 var handlers = ['connect', 'close', 'error', 'timeout', 'parseError'];
74 * Creates a new Mongos instance
76 * @param {array} seedlist A list of mongos proxy seeds (objects with host and port) to connect to
77 * @param {number} [options.haInterval=10000] The high availability interval in milliseconds between mongos proxy monitoring passes
78 * @param {Cursor} [options.cursorFactory=Cursor] The cursor factory class used for all query cursors
79 * @param {number} [options.size=5] Server connection pool size
80 * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
81 * @param {number} [options.keepAliveInitialDelay=0] Initial delay before TCP keep alive enabled
82 * @param {number} [options.localThresholdMS=15] Cutoff latency point in MS for MongoS proxy selection
83 * @param {boolean} [options.noDelay=true] TCP Connection no delay
84 * @param {number} [options.connectionTimeout=1000] TCP Connection timeout setting
85 * @param {number} [options.socketTimeout=0] TCP Socket timeout setting
86 * @param {boolean} [options.singleBufferSerializtion=true] Serialize into single buffer, trade of peak memory for serialization speed
87 * @param {boolean} [options.ssl=false] Use SSL for connection
88 * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identity during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function.
89 * @param {Buffer} [options.ca] SSL Certificate store binary buffer
90 * @param {Buffer} [options.cert] SSL Certificate binary buffer
91 * @param {Buffer} [options.key] SSL Key file binary buffer
92 * @param {string} [options.passphrase] SSL Certificate pass phrase
93 * @param {string} [options.servername=null] String containing the server name requested via TLS SNI.
94 * @param {boolean} [options.rejectUnauthorized=true] Reject unauthorized server certificates
95 * @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits
96 * @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
97 * @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
98 * @param {boolean} [options.domainsEnabled=false] Enable the wrapping of the callback in the current domain, disabled by default to avoid perf hit.
99 * @return {Mongos} A Mongos topology instance
100 * @fires Mongos#connect
101 * @fires Mongos#reconnect
102 * @fires Mongos#joined
104 * @fires Mongos#failed
105 * @fires Mongos#fullsetup
107 * @fires Mongos#serverHeartbeatStarted
108 * @fires Mongos#serverHeartbeatSucceeded
109 * @fires Mongos#serverHeartbeatFailed
110 * @fires Mongos#topologyOpening
111 * @fires Mongos#topologyClosed
112 * @fires Mongos#topologyDescriptionChanged
113 * @property {string} type the topology type.
114 * @property {string} parserType the parser type used (c++ or js).
116 var Mongos = function(seedlist, options) {
117 options = options || {};
124 options: assign({}, options),
126 bson: options.bson || new BSON([BSON.Binary, BSON.Code, BSON.DBRef, BSON.Decimal128,
127 BSON.Double, BSON.Int32, BSON.Long, BSON.Map, BSON.MaxKey, BSON.MinKey,
128 BSON.ObjectId, BSON.BSONRegExp, BSON.Symbol, BSON.Timestamp]),
130 Cursor: options.cursorFactory || BasicCursor,
132 logger: Logger('Mongos', options),
136 haInterval: options.haInterval ? options.haInterval : 10000,
137 // Disconnect handler
138 disconnectHandler: options.disconnectHandler,
139 // Server selection index
141 // Connect function options passed in
143 // Are we running in debug mode
144 debug: typeof options.debug == 'boolean' ? options.debug : false,
146 localThresholdMS: options.localThresholdMS || 15,
148 clientInfo: createClientInfo(options)
151 // Set the client info
152 this.s.options.clientInfo = createClientInfo(options);
154 // Log info warning if the socketTimeout < haInterval as it will cause
155 // a lot of recycled connections to happen.
156 if(this.s.logger.isWarn()
157 && this.s.options.socketTimeout != 0
158 && this.s.options.socketTimeout < this.s.haInterval) {
159 this.s.logger.warn(f('warning socketTimeout %s is less than haInterval %s. This might cause unnecessary server reconnections due to socket timeouts'
160 , this.s.options.socketTimeout, this.s.haInterval));
163 // All the authProviders
164 this.authProviders = options.authProviders || {
165 'mongocr': new MongoCR(this.s.bson), 'x509': new X509(this.s.bson)
166 , 'plain': new Plain(this.s.bson), 'gssapi': new GSSAPI(this.s.bson)
167 , 'sspi': new SSPI(this.s.bson), 'scram-sha-1': new ScramSHA1(this.s.bson)
170 // Disconnected state
171 this.state = DISCONNECTED;
173 // Current proxies we are connecting to
174 this.connectingProxies = [];
175 // Currently connected proxies
176 this.connectedProxies = [];
177 // Disconnected proxies
178 this.disconnectedProxies = [];
179 // Are we authenticating
180 this.authenticating = false;
181 // Index of proxy to run operations against
183 // High availability timeout id
184 this.haTimeoutId = null;
186 this.ismaster = null;
188 // Add event listener
189 EventEmitter.call(this);
192 inherits(Mongos, EventEmitter);
194 Object.defineProperty(Mongos.prototype, 'type', {
195 enumerable:true, get: function() { return 'mongos'; }
198 Object.defineProperty(Mongos.prototype, 'parserType', {
199 enumerable:true, get: function() {
200 return BSON.native ? "c++" : "js";
/**
 * Emit an SDAM monitoring event, but only when at least one listener is
 * registered for it.
 *
 * @param {Mongos} self Emitter the event is published on
 * @param {string} event Name of the event
 * @param {object} description Payload handed to the listeners
 */
function emitSDAMEvent(self, event, description) {
  if(self.listeners(event).length > 0) {
    self.emit(event, description);
  }
}
215 * Initiate server connect
217 * @param {array} [options.auth=null] Array of auth options to apply on connect
219 Mongos.prototype.connect = function(options) {
221 // Add any connect level options to the internal state
222 this.s.connectOptions = options || {};
223 // Set connecting state
224 stateTransition(this, CONNECTING);
225 // Create server instances
226 var servers = this.s.seedlist.map(function(x) {
227 return new Server(assign({}, self.s.options, x, {
228 authProviders: self.authProviders, reconnect:false, monitoring:false, inTopology: true
230 clientInfo: clone(self.s.clientInfo)
234 // Emit the topology opening event
235 emitSDAMEvent(this, 'topologyOpening', { topologyId: this.id });
237 // Start all server connections
238 connectProxies(self, servers);
/**
 * Build the listener attached to a connected proxy for its failure events.
 * When the listener fires, the proxy (the listener's `this`) is moved from the
 * connected list to the disconnected list and a 'left' event is emitted.
 * Callers also pass the event name as a second argument; it is not used.
 *
 * @param {Mongos} self Topology owning the proxy lists
 * @return {function} Listener to register on a server instance
 */
function handleEvent(self) {
  return function() {
    // A destroyed topology no longer tracks its proxies
    if(self.state == DESTROYED) return;
    // Move to list of disconnectedProxies
    moveServerFrom(self.connectedProxies, self.disconnectedProxies, this);
    // Emit the left signal
    self.emit('left', 'mongos', this);
  };
}
251 function handleInitialConnectEvent(self, event) {
253 // Destroy the instance
254 if(self.state == DESTROYED) {
255 // Move from connectingProxies
256 moveServerFrom(self.connectingProxies, self.disconnectedProxies, this);
257 return this.destroy();
260 // Check the type of server
261 if(event == 'connect') {
262 // Get last known ismaster
263 self.ismaster = this.lastIsMaster();
265 // Only servers that identify themselves as a mongos proxy (isdbgrid) are kept
266 if(self.ismaster.msg == 'isdbgrid') {
267 // Reject the server if a proxy with the same name is already connected
268 for(var i = 0; i < self.connectedProxies.length; i++) {
269 if(self.connectedProxies[i].name == this.name) {
270 // Move from connectingProxies
271 moveServerFrom(self.connectingProxies, self.disconnectedProxies, this);
273 return self.emit('failed', this);
277 // Remove the handlers
278 for(i = 0; i < handlers.length; i++) {
279 this.removeAllListeners(handlers[i]);
282 // Add stable state handlers
283 this.on('error', handleEvent(self, 'error'));
284 this.on('close', handleEvent(self, 'close'));
285 this.on('timeout', handleEvent(self, 'timeout'));
286 this.on('parseError', handleEvent(self, 'parseError'));
288 // Move from connecting proxies connected
289 moveServerFrom(self.connectingProxies, self.connectedProxies, this);
290 // Emit the joined event
291 self.emit('joined', 'mongos', this);
294 // Print warning if we did not find a mongos proxy
295 if(self.s.logger.isWarn()) {
296 var message = 'expected mongos proxy, but found replicaset member mongod for server %s';
297 // We have a standalone server
298 if(!self.ismaster.hosts) {
299 message = 'expected mongos proxy, but found standalone mongod for server %s';
302 self.s.logger.warn(f(message, this.name));
305 // This is not a mongos proxy, remove it completely
306 removeProxyFrom(self.connectingProxies, this);
307 // Emit the left event
308 self.emit('left', 'server', this);
310 self.emit('failed', this);
313 moveServerFrom(self.connectingProxies, self.disconnectedProxies, this);
314 // Emit the left event
315 self.emit('left', 'mongos', this);
317 self.emit('failed', this);
320 // Trigger topologyMonitor
321 if(self.connectingProxies.length == 0) {
322 // Emit connected if we are connected
323 if(self.connectedProxies.length > 0) {
324 // Set the state to connected
325 stateTransition(self, CONNECTED);
326 // Emit the connect event
327 self.emit('connect', self);
328 self.emit('fullsetup', self);
329 self.emit('all', self);
330 } else if(self.disconnectedProxies.length == 0) {
331 // Print warning if we did not find a mongos proxy
332 if(self.s.logger.isWarn()) {
333 self.s.logger.warn(f('no mongos proxies found in seed list, did you mean to connect to a replicaset'));
336 // Emit the error that no proxies were found
337 return self.emit('error', new MongoError('no mongos proxies found in seed list'));
341 topologyMonitor(self, {firstConnect:true});
346 function connectProxies(self, servers) {
347 // Update connectingProxies
348 self.connectingProxies = self.connectingProxies.concat(servers);
350 // Index used to interleave the server connects, avoiding
351 // runtime issues on io constrained vm's
352 var timeoutInterval = 0;
354 function connect(server, timeoutInterval) {
355 setTimeout(function() {
356 // Add event handlers
357 server.once('close', handleInitialConnectEvent(self, 'close'));
358 server.once('timeout', handleInitialConnectEvent(self, 'timeout'));
359 server.once('parseError', handleInitialConnectEvent(self, 'parseError'));
360 server.once('error', handleInitialConnectEvent(self, 'error'));
361 server.once('connect', handleInitialConnectEvent(self, 'connect'));
362 // SDAM Monitoring events
363 server.on('serverOpening', function(e) { self.emit('serverOpening', e); });
364 server.on('serverDescriptionChanged', function(e) { self.emit('serverDescriptionChanged', e); });
365 server.on('serverClosed', function(e) { self.emit('serverClosed', e); });
367 server.connect(self.s.connectOptions);
370 // Start all the servers
371 while(servers.length > 0) {
372 connect(servers.shift(), timeoutInterval++);
/**
 * Select the proxy the next operation should run against.
 *
 * Candidates are the connected proxies that report `isConnected()` and whose
 * `lastIsMasterMS` is within `self.s.localThresholdMS` of the lowest latency
 * seen. Candidates are handed out in rotation using `self.index`. When no
 * proxy qualifies, the first entry of `self.connectedProxies` is returned
 * (undefined when that list is empty).
 *
 * @param {Mongos} self Topology holding `connectedProxies`, `index` and `s.localThresholdMS`
 * @return {Server} The selected proxy, or undefined when none is known
 */
function pickProxy(self) {
  // Work on a copy so the topology's own list is never modified here
  var connectedProxies = self.connectedProxies.slice(0);

  // Set lower bound
  var lowerBoundLatency = Number.MAX_VALUE;

  // Determine the lower bound for the Proxies
  for(var i = 0; i < connectedProxies.length; i++) {
    if(connectedProxies[i].lastIsMasterMS < lowerBoundLatency) {
      lowerBoundLatency = connectedProxies[i].lastIsMasterMS;
    }
  }

  // Keep only proxies inside the latency window that are still connected
  connectedProxies = connectedProxies.filter(function(server) {
    return (server.lastIsMasterMS <= (lowerBoundLatency + self.s.localThresholdMS))
      && server.isConnected();
  });

  // We have no eligible proxies, pick the first of the connected ones
  if(connectedProxies.length == 0) {
    return self.connectedProxies[0];
  }

  // Get proxy
  var proxy = connectedProxies[self.index % connectedProxies.length];
  // Advance the rotation for the next call
  self.index = (self.index + 1) % connectedProxies.length;
  return proxy;
}
/**
 * Move `proxy` from one proxy list to another, matching entries by `name`.
 * Every entry with the same name is removed from both lists before the proxy
 * is appended to `to`, so `to` ends up holding it exactly once.
 *
 * @param {Server[]} from List the proxy is removed from (modified in place)
 * @param {Server[]} to List the proxy is appended to (modified in place)
 * @param {Server} proxy Proxy being moved
 */
function moveServerFrom(from, to, proxy) {
  // Walk backwards: splicing while walking forwards skips the element that
  // slides into the removed slot, which would leave adjacent duplicates behind.
  for(var i = from.length - 1; i >= 0; i--) {
    if(from[i].name == proxy.name) {
      from.splice(i, 1);
    }
  }

  for(i = to.length - 1; i >= 0; i--) {
    if(to[i].name == proxy.name) {
      to.splice(i, 1);
    }
  }

  to.push(proxy);
}
/**
 * Remove every entry whose `name` matches `proxy.name` from a proxy list.
 *
 * @param {Server[]} from List to remove from (modified in place)
 * @param {Server} proxy Proxy whose name identifies the entries to drop
 */
function removeProxyFrom(from, proxy) {
  // Walk backwards so that removing an entry never skips its neighbour
  for(var i = from.length - 1; i >= 0; i--) {
    if(from[i].name == proxy.name) {
      from.splice(i, 1);
    }
  }
}
435 function reconnectProxies(self, proxies, callback) {
437 var count = proxies.length;
440 var _handleEvent = function(self, event) {
446 if(self.state == DESTROYED) {
447 moveServerFrom(self.connectingProxies, self.disconnectedProxies, _self);
448 return this.destroy();
451 if(event == 'connect' && !self.authenticating) {
453 if(self.state == DESTROYED) {
454 moveServerFrom(self.connectingProxies, self.disconnectedProxies, _self);
455 return _self.destroy();
458 // Remove the handlers
459 for(var i = 0; i < handlers.length; i++) {
460 _self.removeAllListeners(handlers[i]);
463 // Add stable state handlers
464 _self.on('error', handleEvent(self, 'error'));
465 _self.on('close', handleEvent(self, 'close'));
466 _self.on('timeout', handleEvent(self, 'timeout'));
467 _self.on('parseError', handleEvent(self, 'parseError'));
469 // Move to the connected servers
470 moveServerFrom(self.disconnectedProxies, self.connectedProxies, _self);
472 self.emit('joined', 'mongos', _self);
473 } else if(event == 'connect' && self.authenticating) {
474 // Move from connectingProxies
475 moveServerFrom(self.connectingProxies, self.disconnectedProxies, _self);
479 // Are we done finish up callback
492 function execute(_server, i) {
493 setTimeout(function() {
495 if(self.state == DESTROYED) {
499 // Create a new server instance
500 var server = new Server(assign({}, self.s.options, {
501 host: _server.name.split(':')[0],
502 port: parseInt(_server.name.split(':')[1], 10)
504 authProviders: self.authProviders, reconnect:false, monitoring: false, inTopology: true
506 clientInfo: clone(self.s.clientInfo)
510 server.once('connect', _handleEvent(self, 'connect'));
511 server.once('close', _handleEvent(self, 'close'));
512 server.once('timeout', _handleEvent(self, 'timeout'));
513 server.once('error', _handleEvent(self, 'error'));
514 server.once('parseError', _handleEvent(self, 'parseError'));
516 // SDAM Monitoring events
517 server.on('serverOpening', function(e) { self.emit('serverOpening', e); });
518 server.on('serverDescriptionChanged', function(e) { self.emit('serverDescriptionChanged', e); });
519 server.on('serverClosed', function(e) { self.emit('serverClosed', e); });
520 server.connect(self.s.connectOptions);
524 // Create new instances
525 for(var i = 0; i < proxies.length; i++) {
526 execute(proxies[i], i);
530 function topologyMonitor(self, options) {
531 options = options || {};
533 // Set monitoring timeout
534 self.haTimeoutId = setTimeout(function() {
535 if(self.state == DESTROYED) return;
536 // If we have a primary and a disconnect handler, execute
537 // buffered operations
538 if(self.isConnected() && self.s.disconnectHandler) {
539 self.s.disconnectHandler.execute();
542 // Get the connectingServers
543 var proxies = self.connectedProxies.slice(0);
545 var count = proxies.length;
547 // If the count is zero schedule a new fast
548 function pingServer(_self, _server, cb) {
549 // Measure running time
550 var start = new Date().getTime();
552 // Emit the server heartbeat start
553 emitSDAMEvent(self, 'serverHeartbeatStarted', { connectionId: _server.name });
556 _server.command('admin.$cmd', {
560 socketTimeout: self.s.options.connectionTimeout || 2000,
561 }, function(err, r) {
562 if(self.state == DESTROYED) {
563 // Move from connectingProxies
564 moveServerFrom(self.connectedProxies, self.disconnectedProxies, _server);
570 var latencyMS = new Date().getTime() - start;
572 // We had an error, remove it from the state
574 // Emit the server heartbeat failure
575 emitSDAMEvent(self, 'serverHeartbeatFailed', { durationMS: latencyMS, failure: err, connectionId: _server.name });
576 // Move from connected proxies to disconnected proxies
577 moveServerFrom(self.connectedProxies, self.disconnectedProxies, _server);
579 // Update the server ismaster
580 _server.ismaster = r.result;
581 _server.lastIsMasterMS = latencyMS;
583 // Server heart beat event
584 emitSDAMEvent(self, 'serverHeartbeatSucceeded', { durationMS: latencyMS, reply: r.result, connectionId: _server.name });
591 // No proxies initiate monitor again
592 if(proxies.length == 0) {
593 // Emit close event if any listeners registered
594 if(self.listeners("close").length > 0 && self.state == CONNECTING) {
595 self.emit('error', new MongoError('no mongos proxy available'));
597 self.emit('close', self);
600 // Attempt to connect to any unknown servers
601 return reconnectProxies(self, self.disconnectedProxies, function() {
602 if(self.state == DESTROYED) return;
604 // Are we connected ? emit connect event
605 if(self.state == CONNECTING && options.firstConnect) {
606 self.emit('connect', self);
607 self.emit('fullsetup', self);
608 self.emit('all', self);
609 } else if(self.isConnected()) {
610 self.emit('reconnect', self);
611 } else if(!self.isConnected() && self.listeners("close").length > 0) {
612 self.emit('close', self);
615 // Perform topology monitor
616 topologyMonitor(self);
621 for(var i = 0; i < proxies.length; i++) {
622 pingServer(self, proxies[i], function() {
626 if(self.state == DESTROYED) return;
628 // Attempt to connect to any unknown servers
629 reconnectProxies(self, self.disconnectedProxies, function() {
630 if(self.state == DESTROYED) return;
631 // Perform topology monitor
632 topologyMonitor(self);
637 }, self.s.haInterval);
/**
 * Returns the last ismaster document stored on this topology
 * (null until one has been recorded).
 * @method
 * @return {object}
 */
Mongos.prototype.lastIsMaster = function() {
  return this.ismaster;
};
650 * Unref all connections belonging to this topology
653 Mongos.prototype.unref = function() {
655 stateTransition(this, DISCONNECTED);
657 var proxies = this.connectedProxies.concat(this.connectingProxies);
658 proxies.forEach(function(x) {
662 clearTimeout(this.haTimeoutId);
666 * Destroy the server connection
667 * @param {boolean} [options.force=false] Force destroy the pool
670 Mongos.prototype.destroy = function(options) {
672 stateTransition(this, DESTROYED);
674 var proxies = this.connectedProxies.concat(this.connectingProxies);
675 // Clear out any monitoring process
676 if(this.haTimeoutId) clearTimeout(this.haTimeoutId);
678 // Destroy all connecting servers
679 proxies.forEach(function(x) {
683 // Emit topology closed event
684 emitSDAMEvent(this, 'topologyClosed', { topologyId: this.id });
/**
 * Figure out if the topology is connected, i.e. whether at least one
 * proxy is currently in the connected list.
 * @method
 * @return {boolean}
 */
Mongos.prototype.isConnected = function() {
  return this.connectedProxies.length > 0;
};
/**
 * Figure out if the topology instance was destroyed by calling destroy
 * @method
 * @return {boolean}
 */
Mongos.prototype.isDestroyed = function() {
  return this.state === DESTROYED;
};
/**
 * Run a write operation against a proxy chosen by pickProxy.
 *
 * @param {Mongos} self Topology to pick the proxy from
 * @param {string} op Name of the server method to invoke ('insert', 'update' or 'remove')
 * @param {string} ns Fully qualified namespace
 * @param {array} ops Operations to execute
 * @param {object} [options] Options forwarded to the server; may be omitted
 * @param {function} callback Called with an error when no proxy is available, otherwise handed to the server
 */
var executeWriteOperation = function(self, op, ns, ops, options, callback) {
  // Support the (self, op, ns, ops, callback) call shape
  if(typeof options === 'function') {
    callback = options;
    options = {};
  }

  // Ensure we always forward an options object
  options = options || {};
  // Pick a server
  var server = pickProxy(self);
  // No server found error out
  if(!server) return callback(new MongoError('no mongos proxy available'));
  // Execute the command
  server[op](ns, ops, options, callback);
};
/**
 * Insert one or more documents
 * @method
 * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
 * @param {array} ops An array of documents to insert
 * @param {boolean} [options.ordered=true] Execute in order or out of order
 * @param {object} [options.writeConcern={}] Write concern for the operation
 * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
 * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
 * @param {opResultCallback} callback A callback function
 */
Mongos.prototype.insert = function(ns, ops, options, callback) {
  // Support the (ns, ops, callback) call shape
  if(typeof options === 'function') {
    callback = options;
    options = {};
  }

  if(this.state === DESTROYED) return callback(new MongoError(f('topology was destroyed')));

  // Not connected but we have a disconnecthandler: queue the operation
  if(!this.isConnected() && this.s.disconnectHandler != null) {
    return this.s.disconnectHandler.add('insert', ns, ops, options, callback);
  }

  // No mongos proxy available
  if(!this.isConnected()) {
    return callback(new MongoError('no mongos proxy available'));
  }

  // Execute write operation
  executeWriteOperation(this, 'insert', ns, ops, options, callback);
};
/**
 * Perform one or more update operations
 * @method
 * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
 * @param {array} ops An array of updates
 * @param {boolean} [options.ordered=true] Execute in order or out of order
 * @param {object} [options.writeConcern={}] Write concern for the operation
 * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
 * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
 * @param {opResultCallback} callback A callback function
 */
Mongos.prototype.update = function(ns, ops, options, callback) {
  // Support the (ns, ops, callback) call shape
  if(typeof options === 'function') {
    callback = options;
    options = {};
  }

  if(this.state === DESTROYED) return callback(new MongoError(f('topology was destroyed')));

  // Not connected but we have a disconnecthandler: queue the operation
  if(!this.isConnected() && this.s.disconnectHandler != null) {
    return this.s.disconnectHandler.add('update', ns, ops, options, callback);
  }

  // No mongos proxy available
  if(!this.isConnected()) {
    return callback(new MongoError('no mongos proxy available'));
  }

  // Execute write operation
  executeWriteOperation(this, 'update', ns, ops, options, callback);
};
/**
 * Perform one or more remove operations
 * @method
 * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
 * @param {array} ops An array of removes
 * @param {boolean} [options.ordered=true] Execute in order or out of order
 * @param {object} [options.writeConcern={}] Write concern for the operation
 * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
 * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
 * @param {opResultCallback} callback A callback function
 */
Mongos.prototype.remove = function(ns, ops, options, callback) {
  // Support the (ns, ops, callback) call shape
  if(typeof options === 'function') {
    callback = options;
    options = {};
  }

  if(this.state === DESTROYED) return callback(new MongoError(f('topology was destroyed')));

  // Not connected but we have a disconnecthandler: queue the operation
  if(!this.isConnected() && this.s.disconnectHandler != null) {
    return this.s.disconnectHandler.add('remove', ns, ops, options, callback);
  }

  // No mongos proxy available
  if(!this.isConnected()) {
    return callback(new MongoError('no mongos proxy available'));
  }

  // Execute write operation
  executeWriteOperation(this, 'remove', ns, ops, options, callback);
};
/**
 * Execute a command against a proxy chosen by pickProxy
 * @method
 * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
 * @param {object} cmd The command hash
 * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
 * @param {Connection} [options.connection] Specify connection object to execute command against
 * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
 * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
 * @param {opResultCallback} callback A callback function
 */
Mongos.prototype.command = function(ns, cmd, options, callback) {
  // Support the (ns, cmd, callback) call shape
  if(typeof options === 'function') {
    callback = options;
    options = {};
  }

  if(this.state === DESTROYED) return callback(new MongoError(f('topology was destroyed')));

  // Pick a proxy
  var server = pickProxy(this);

  // Topology is not connected, save the call in the provided store to be
  // executed at some point when the handler deems it's reconnected
  if((server == null || !server.isConnected()) && this.s.disconnectHandler != null) {
    return this.s.disconnectHandler.add('command', ns, cmd, options, callback);
  }

  // No server returned we had an error
  if(server == null) {
    return callback(new MongoError('no mongos proxy available'));
  }

  // Execute the command
  server.command(ns, cmd, options, callback);
};
/**
 * Get a new cursor
 * @method
 * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
 * @param {{object}|{Long}} cmd Can be either a command returning a cursor or a cursorId
 * @param {object} [cursorOptions] Options handed to the cursor constructor
 * @param {function} [cursorOptions.cursorFactory] Cursor class to instantiate instead of the topology default
 * @param {object} [cursorOptions.batchSize=0] Batchsize for the operation
 * @param {array} [cursorOptions.documents=[]] Initial documents list for cursor
 * @param {ReadPreference} [cursorOptions.readPreference] Specify read preference if command supports it
 * @param {Boolean} [cursorOptions.serializeFunctions=false] Specify if functions on an object should be serialized.
 * @param {Boolean} [cursorOptions.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
 * @return {Cursor} A new cursor instance bound to this topology
 */
Mongos.prototype.cursor = function(ns, cmd, cursorOptions) {
  cursorOptions = cursorOptions || {};
  // A per-call factory overrides the factory configured on the topology
  var FinalCursor = cursorOptions.cursorFactory || this.s.Cursor;
  return new FinalCursor(this.s.bson, ns, cmd, cursorOptions, this, this.s.options);
};
862 * Authenticate using a specified mechanism
864 * @param {string} mechanism The Auth mechanism we are invoking
865 * @param {string} db The db we are invoking the mechanism against
866 * @param {...object} param Parameters for the specific mechanism
867 * @param {authResultCallback} callback A callback function
869 Mongos.prototype.auth = function(mechanism, db) {
870 var allArgs = Array.prototype.slice.call(arguments, 0).slice(0);
872 var args = Array.prototype.slice.call(arguments, 2);
873 var callback = args.pop();
875 // If we don't have the mechanism fail
876 if(this.authProviders[mechanism] == null && mechanism != 'default') {
877 return callback(new MongoError(f("auth provider %s does not exist", mechanism)));
880 // Are we already authenticating, throw
881 if(this.authenticating) {
882 return callback(new MongoError('authentication or logout allready in process'));
885 // Topology is not connected, save the call in the provided store to be
886 // Executed at some point when the handler deems it's reconnected
887 if(!self.isConnected() && self.s.disconnectHandler != null) {
888 return self.s.disconnectHandler.add('auth', db, allArgs, {}, callback);
891 // Set to authenticating
892 this.authenticating = true;
896 // Get all the servers
897 var servers = this.connectedProxies.slice(0);
899 if(servers.length == 0) {
900 this.authenticating = false;
901 callback(null, true);
905 function auth(server) {
906 // Arguments without a callback
907 var argsWithoutCallback = [mechanism, db].concat(args.slice(0));
909 var finalArguments = argsWithoutCallback.concat([function(err) {
911 // Save all the errors
912 if(err) errors.push({name: server.name, err: err});
916 self.authenticating = false;
918 // Return the auth error
919 if(errors.length) return callback(MongoError.create({
920 message: 'authentication fail', errors: errors
923 // Successfully authenticated session
924 callback(null, self);
928 // Execute the auth only against non arbiter servers
929 if(!server.lastIsMaster().arbiterOnly) {
930 server.auth.apply(server, finalArguments);
935 var count = servers.length;
936 // Authenticate against all servers
937 while(servers.length > 0) {
938 auth(servers.shift());
/**
 * Logout from a database
 *
 * Clears the credentials for `dbName` from every auth provider, then logs out
 * on every connected proxy. The callback fires once all proxies have answered.
 * Throws a MongoError when an authentication or logout is already running.
 * @method
 * @param {string} db The db we are logging out from
 * @param {authResultCallback} callback A callback function
 */
Mongos.prototype.logout = function(dbName, callback) {
  var self = this;

  // Are we authenticating or logging out, throw
  if(this.authenticating) {
    throw new MongoError('authentication or logout allready in process');
  }

  // Ensure no new members are processed while logging out
  this.authenticating = true;

  // Remove from all auth providers (avoid any reapplication of the auth details)
  var providers = Object.keys(this.authProviders);
  for(var i = 0; i < providers.length; i++) {
    this.authProviders[providers[i]].logout(dbName);
  }

  // Now logout all the servers
  var servers = this.connectedProxies.slice(0);
  var count = servers.length;
  if(count == 0) {
    // Nothing to wait for: release the flag, otherwise every later
    // auth/logout call would be rejected as "already in process".
    this.authenticating = false;
    return callback();
  }

  // Errors collected per proxy
  var errors = [];

  function logoutServer(_server, cb) {
    _server.logout(dbName, function(err) {
      if(err) errors.push({name: _server.name, err: err});
      cb();
    });
  }

  // Execute logout on all server instances
  for(i = 0; i < servers.length; i++) {
    logoutServer(servers[i], function() {
      count = count - 1;
      if(count != 0) return;

      // Do not block new operations
      self.authenticating = false;
      // If we have one or more errors
      if(errors.length) return callback(MongoError.create({
        message: f('logout failed against db %s', dbName), errors: errors
      }));

      // No errors
      callback();
    });
  }
};
/**
 * Get a proxy to run operations against, as chosen by pickProxy.
 * When the topology runs in debug mode a 'pickedServer' event is emitted
 * with the selection. Arguments passed to this method are not used.
 * @method
 * @return {Server} The selected proxy, or undefined when none is known
 */
Mongos.prototype.getServer = function() {
  var server = pickProxy(this);
  if(this.s.debug) this.emit('pickedServer', null, server);
  return server;
};
/**
 * All raw connections, gathered from every connected proxy
 * @method
 * @return {Connection[]}
 */
Mongos.prototype.connections = function() {
  var connections = [];

  for(var i = 0; i < this.connectedProxies.length; i++) {
    connections = connections.concat(this.connectedProxies[i].connections());
  }

  return connections;
};
1025 * A mongos connect event, used to verify that the connection is up and running
1027 * @event Mongos#connect
1032 * A mongos reconnect event, used to verify that the mongos topology has reconnected
1034 * @event Mongos#reconnect
1039 * A mongos fullsetup event, used to signal that all topology members have been contacted.
1041 * @event Mongos#fullsetup
1046 * A mongos all event, used to signal that all topology members have been contacted.
1053 * A server member left the mongos list
1055 * @event Mongos#left
1057 * @param {string} type The type of member that left (mongos)
1058 * @param {Server} server The server object that left
1062 * A server member joined the mongos list
1064 * @event Mongos#joined
1066 * @param {string} type The type of member that joined (mongos)
1067 * @param {Server} server The server object that joined
1071 * A server opening SDAM monitoring event
1073 * @event Mongos#serverOpening
1078 * A server closed SDAM monitoring event
1080 * @event Mongos#serverClosed
1085 * A server description SDAM change monitoring event
1087 * @event Mongos#serverDescriptionChanged
1092 * A topology open SDAM event
1094 * @event Mongos#topologyOpening
1099 * A topology closed SDAM event
1101 * @event Mongos#topologyClosed
1106 * A topology structure SDAM change event
1108 * @event Mongos#topologyDescriptionChanged
1113 * A topology serverHeartbeatStarted SDAM event
1115 * @event Mongos#serverHeartbeatStarted
1120 * A topology serverHeartbeatFailed SDAM event
1122 * @event Mongos#serverHeartbeatFailed
1127 * A topology serverHeartbeatSucceeded SDAM change event
1129 * @event Mongos#serverHeartbeatSucceeded
1133 module.exports = Mongos;