3 var inherits = require('util').inherits,
4 f = require('util').format,
5 EventEmitter = require('events').EventEmitter,
6 BSON = require('bson').native().BSON,
7 ReadPreference = require('./read_preference'),
8 BasicCursor = require('../cursor'),
9 Logger = require('../connection/logger'),
10 debugOptions = require('../connection/utils').debugOptions,
11 MongoError = require('../error'),
12 Server = require('./server'),
13 ReplSetState = require('./replset_state'),
14 assign = require('./shared').assign,
15 clone = require('./shared').clone,
16 createClientInfo = require('./shared').createClientInfo;
18 var MongoCR = require('../auth/mongocr')
19 , X509 = require('../auth/x509')
20 , Plain = require('../auth/plain')
21 , GSSAPI = require('../auth/gssapi')
22 , SSPI = require('../auth/sspi')
23 , ScramSHA1 = require('../auth/scram');
// Lifecycle states a topology can be in
var DISCONNECTED = 'disconnected';
var CONNECTING = 'connecting';
var CONNECTED = 'connected';
var DESTROYED = 'destroyed';

/**
 * Move a topology to `newState` when that transition is legal from its
 * current `self.state`; otherwise leave the state untouched and log an error.
 *
 * @param {object} self Topology instance; `self.state` is read and written, `self.id` is used in the log message
 * @param {string} newState One of DISCONNECTED, CONNECTING, CONNECTED or DESTROYED
 */
function stateTransition(self, newState) {
  var legalTransitions = {
    'disconnected': [CONNECTING, DESTROYED, DISCONNECTED],
    'connecting': [CONNECTING, DESTROYED, CONNECTED, DISCONNECTED],
    'connected': [CONNECTED, DISCONNECTED, DESTROYED],
    'destroyed': [DESTROYED]
  };

  // Get the states reachable from the current state
  var legalStates = legalTransitions[self.state];
  if(legalStates && legalStates.indexOf(newState) != -1) {
    self.state = newState;
    return;
  }

  // The rest of this file reaches the logger through self.s.logger; fall back
  // to self.logger so an instance exposing it directly keeps working. A
  // missing logger must never turn an illegal transition into a TypeError.
  var logger = (self.s && self.s.logger) || self.logger;
  if(logger) {
    logger.error(f('ReplSet with id [%s] failed attempted illegal state transition from [%s] to [%s] only following state allowed [%s]'
      , self.id, self.state, newState, (legalStates || []).join(', ')));
  }
}
51 // ReplSet instance id
53 var handlers = ['connect', 'close', 'error', 'timeout', 'parseError'];
56 * Creates a new Replset instance
58 * @param {array} seedlist A list of seeds for the replicaset
 * @param {string} options.setName The Replicaset set name
60 * @param {boolean} [options.secondaryOnlyConnectionAllowed=false] Allow connection to a secondary only replicaset
61 * @param {number} [options.haInterval=10000] The High availability period for replicaset inquiry
62 * @param {boolean} [options.emitError=false] Server will emit errors events
63 * @param {Cursor} [options.cursorFactory=Cursor] The cursor factory class used for all query cursors
64 * @param {number} [options.size=5] Server connection pool size
65 * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
66 * @param {number} [options.keepAliveInitialDelay=0] Initial delay before TCP keep alive enabled
67 * @param {boolean} [options.noDelay=true] TCP Connection no delay
68 * @param {number} [options.connectionTimeout=10000] TCP Connection timeout setting
69 * @param {number} [options.socketTimeout=0] TCP Socket timeout setting
 * @param {boolean} [options.singleBufferSerializtion=true] Serialize into a single buffer, trading peak memory for serialization speed
71 * @param {boolean} [options.ssl=false] Use SSL for connection
 * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check the server identity during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function.
73 * @param {Buffer} [options.ca] SSL Certificate store binary buffer
74 * @param {Buffer} [options.cert] SSL Certificate binary buffer
75 * @param {Buffer} [options.key] SSL Key file binary buffer
76 * @param {string} [options.passphrase] SSL Certificate pass phrase
77 * @param {string} [options.servername=null] String containing the server name requested via TLS SNI.
78 * @param {boolean} [options.rejectUnauthorized=true] Reject unauthorized server certificates
79 * @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits
80 * @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
81 * @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
82 * @param {number} [options.pingInterval=5000] Ping interval to check the response time to the different servers
83 * @param {number} [options.localThresholdMS=15] Cutoff latency point in MS for MongoS proxy selection
84 * @param {boolean} [options.domainsEnabled=false] Enable the wrapping of the callback in the current domain, disabled by default to avoid perf hit.
 * @return {ReplSet} A ReplSet instance
86 * @fires ReplSet#connect
88 * @fires ReplSet#joined
90 * @fires ReplSet#failed
91 * @fires ReplSet#fullsetup
93 * @fires ReplSet#error
94 * @fires ReplSet#serverHeartbeatStarted
95 * @fires ReplSet#serverHeartbeatSucceeded
96 * @fires ReplSet#serverHeartbeatFailed
97 * @fires ReplSet#topologyOpening
98 * @fires ReplSet#topologyClosed
99 * @fires ReplSet#topologyDescriptionChanged
101 var ReplSet = function(seedlist, options) {
103 options = options || {};
106 if(!Array.isArray(seedlist)) throw new MongoError("seedlist must be an array");
108 if(seedlist.length == 0) throw new MongoError("seedlist must contain at least one entry");
110 seedlist.forEach(function(e) {
111 if(typeof e.host != 'string' || typeof e.port != 'number')
112 throw new MongoError("seedlist entry must contain a host and port");
115 // Add event listener
116 EventEmitter.call(this);
121 // Get the localThresholdMS
122 var localThresholdMS = options.localThresholdMS || 15;
123 // Backward compatibility
124 if(options.acceptableLatency) localThresholdMS = options.acceptableLatency;
127 var logger = Logger('ReplSet', options);
131 options: assign({}, options),
133 bson: options.bson || new BSON(),
135 Cursor: options.cursorFactory || BasicCursor,
141 replicaSetState: new ReplSetState({
142 id: this.id, setName: options.setName,
143 acceptableLatency: localThresholdMS,
144 heartbeatFrequencyMS: options.haInterval ? options.haInterval : 10000,
147 // Current servers we are connecting to
148 connectingServers: [],
150 haInterval: options.haInterval ? options.haInterval : 10000,
151 // Minimum heartbeat frequency used if we detect a server close
152 minHeartbeatFrequencyMS: 500,
153 // Disconnect handler
154 disconnectHandler: options.disconnectHandler,
155 // Server selection index
157 // Connect function options passed in
159 // Are we running in debug mode
160 debug: typeof options.debug == 'boolean' ? options.debug : false,
162 clientInfo: createClientInfo(options)
165 // Add handler for topology change
166 this.s.replicaSetState.on('topologyDescriptionChanged', function(r) { self.emit('topologyDescriptionChanged', r); });
168 // Log info warning if the socketTimeout < haInterval as it will cause
169 // a lot of recycled connections to happen.
170 if(this.s.logger.isWarn()
171 && this.s.options.socketTimeout != 0
172 && this.s.options.socketTimeout < this.s.haInterval) {
173 this.s.logger.warn(f('warning socketTimeout %s is less than haInterval %s. This might cause unnecessary server reconnections due to socket timeouts'
174 , this.s.options.socketTimeout, this.s.haInterval));
177 // All the authProviders
178 this.authProviders = options.authProviders || {
179 'mongocr': new MongoCR(this.s.bson), 'x509': new X509(this.s.bson)
180 , 'plain': new Plain(this.s.bson), 'gssapi': new GSSAPI(this.s.bson)
181 , 'sspi': new SSPI(this.s.bson), 'scram-sha-1': new ScramSHA1(this.s.bson)
184 // Add forwarding of events from state handler
185 var types = ['joined', 'left'];
186 types.forEach(function(x) {
187 self.s.replicaSetState.on(x, function(t, s) {
193 this.initialConnectState = {
194 connect: false, fullsetup: false, all: false
197 // Disconnected state
198 this.state = DISCONNECTED;
199 this.haTimeoutId = null;
200 // Are we authenticating
201 this.authenticating = false;
203 this.ismaster = null;
206 inherits(ReplSet, EventEmitter);
// Read-only, enumerable `type` property; always reports 'replset' for this topology
Object.defineProperty(ReplSet.prototype, 'type', {
  enumerable:true, get: function() { return 'replset'; }
});
212 function attemptReconnect(self) {
213 if(self.runningAttempReconnect) return;
215 self.runningAttempReconnect = true;
216 // Wait before execute
217 self.haTimeoutId = setTimeout(function() {
218 if(self.state == DESTROYED) return;
221 if(self.s.logger.isDebug()) {
222 self.s.logger.debug(f('attemptReconnect for replset with id %s', self.id));
225 // Get all known hosts
226 var keys = Object.keys(self.s.replicaSetState.set);
227 var servers = keys.map(function(x) {
228 return new Server(assign({}, self.s.options, {
229 host: x.split(':')[0], port: parseInt(x.split(':')[1], 10)
231 authProviders: self.authProviders, reconnect:false, monitoring: false, inTopology: true
233 clientInfo: clone(self.s.clientInfo)
237 // Create the list of servers
238 self.s.connectingServers = servers.slice(0);
240 // Handle all events coming from servers
241 function _handleEvent(self, event) {
242 return function(err) {
243 // Destroy the instance
244 if(self.state == DESTROYED) {
245 return this.destroy();
249 if(self.s.logger.isDebug()) {
250 self.s.logger.debug(f('attemptReconnect for replset with id %s using server %s ended with event %s', self.id, this.name, event));
253 // Check if we are done
255 // Done with the reconnection attempt
256 if(self.s.connectingServers.length == 0) {
257 if(self.state == DESTROYED) return;
259 // If we have a primary and a disconnect handler, execute
260 // buffered operations
261 if(self.s.replicaSetState.hasPrimaryAndSecondary() && self.s.disconnectHandler) {
262 self.s.disconnectHandler.execute();
263 } else if(self.s.replicaSetState.hasPrimary() && self.s.disconnectHandler) {
264 self.s.disconnectHandler.execute({ executePrimary:true });
265 } else if(self.s.replicaSetState.hasSecondary() && self.s.disconnectHandler) {
266 self.s.disconnectHandler.execute({ executeSecondary:true });
269 // Do we have a primary
270 if(self.s.replicaSetState.hasPrimary()) {
271 // Connect any missing servers
272 connectNewServers(self, self.s.replicaSetState.unknownServers, function(err, cb) {
274 if(self.s.logger.isDebug()) {
275 self.s.logger.debug(f('attemptReconnect for replset with id successful resuming topologyMonitor', self.id));
279 self.runningAttempReconnect = false;
280 // Go back to normal topology monitoring
281 topologyMonitor(self);
284 if(self.listeners("close").length > 0) {
285 self.emit('close', self);
289 self.runningAttempReconnect = false;
290 // Attempt a new reconnect
291 attemptReconnect(self);
296 // Remove the server from our list
297 for(var i = 0; i < self.s.connectingServers.length; i++) {
298 if(self.s.connectingServers[i].equals(this)) {
299 self.s.connectingServers.splice(i, 1);
303 // Keep reference to server
307 if(self.s.logger.isDebug()) {
308 self.s.logger.debug(f('attemptReconnect in replset with id %s for', self.id));
311 // Connect and not authenticating
312 if(event == 'connect' && !self.authenticating) {
313 if(self.state == DESTROYED) {
314 return _self.destroy();
317 // Update the replicaset state
318 if(self.s.replicaSetState.update(_self)) {
319 // Primary lastIsMaster store it
320 if(_self.lastIsMaster() && _self.lastIsMaster().ismaster) {
321 self.ismaster = _self.lastIsMaster();
324 // Remove the handlers
325 for(var i = 0; i < handlers.length; i++) {
326 _self.removeAllListeners(handlers[i]);
329 // Add stable state handlers
330 _self.on('error', handleEvent(self, 'error'));
331 _self.on('close', handleEvent(self, 'close'));
332 _self.on('timeout', handleEvent(self, 'timeout'));
333 _self.on('parseError', handleEvent(self, 'parseError'));
337 } else if(event == 'connect' && self.authenticating) {
// Index used to interleave the server connects, avoiding
// runtime issues on io constrained vm's
347 var timeoutInterval = 0;
349 function connect(server, timeoutInterval) {
350 setTimeout(function() {
351 server.once('connect', _handleEvent(self, 'connect'));
352 server.once('close', _handleEvent(self, 'close'));
353 server.once('timeout', _handleEvent(self, 'timeout'));
354 server.once('error', _handleEvent(self, 'error'));
355 server.once('parseError', _handleEvent(self, 'parseError'));
357 // SDAM Monitoring events
358 server.on('serverOpening', function(e) { self.emit('serverOpening', e); });
359 server.on('serverDescriptionChanged', function(e) { self.emit('serverDescriptionChanged', e); });
360 server.on('serverClosed', function(e) { self.emit('serverClosed', e); });
362 server.connect(self.s.connectOptions);
366 // Connect all servers
367 while(servers.length > 0) {
368 connect(servers.shift(), timeoutInterval++);
370 }, self.s.minHeartbeatFrequencyMS);
373 function connectNewServers(self, servers, callback) {
375 var count = servers.length;
378 var _handleEvent = function(self, event) {
379 return function(err, r) {
384 if(self.state == DESTROYED) {
385 return this.destroy();
388 if(event == 'connect' && !self.authenticating) {
390 if(self.state == DESTROYED) {
391 return _self.destroy();
394 var result = self.s.replicaSetState.update(_self);
395 // Update the state with the new server
397 // Primary lastIsMaster store it
398 if(_self.lastIsMaster() && _self.lastIsMaster().ismaster) {
399 self.ismaster = _self.lastIsMaster();
402 // Remove the handlers
403 for(var i = 0; i < handlers.length; i++) {
404 _self.removeAllListeners(handlers[i]);
407 // Add stable state handlers
408 _self.on('error', handleEvent(self, 'error'));
409 _self.on('close', handleEvent(self, 'close'));
410 _self.on('timeout', handleEvent(self, 'timeout'));
411 _self.on('parseError', handleEvent(self, 'parseError'));
415 } else if(event == 'connect' && self.authenticating) {
419 // Are we done finish up callback
420 if(count == 0) { callback(); }
425 if(count == 0) return callback();
428 function execute(_server, i) {
429 setTimeout(function() {
431 if(self.state == DESTROYED) {
435 // Create a new server instance
436 var server = new Server(assign({}, self.s.options, {
437 host: _server.split(':')[0],
438 port: parseInt(_server.split(':')[1], 10)
440 authProviders: self.authProviders, reconnect:false, monitoring: false, inTopology: true
442 clientInfo: clone(self.s.clientInfo)
446 server.once('connect', _handleEvent(self, 'connect'));
447 server.once('close', _handleEvent(self, 'close'));
448 server.once('timeout', _handleEvent(self, 'timeout'));
449 server.once('error', _handleEvent(self, 'error'));
450 server.once('parseError', _handleEvent(self, 'parseError'));
452 // SDAM Monitoring events
453 server.on('serverOpening', function(e) { self.emit('serverOpening', e); });
454 server.on('serverDescriptionChanged', function(e) { self.emit('serverDescriptionChanged', e); });
455 server.on('serverClosed', function(e) { self.emit('serverClosed', e); });
456 server.connect(self.s.connectOptions);
460 // Create new instances
461 for(var i = 0; i < servers.length; i++) {
462 execute(servers[i], i);
466 function topologyMonitor(self, options) {
467 options = options || {};
// Set monitoring timeout
470 self.haTimeoutId = setTimeout(function() {
471 if(self.state == DESTROYED) return;
473 // Is this a on connect topology discovery
474 // Schedule a proper topology monitoring to happen
475 // To ensure any discovered servers do not timeout
476 // while waiting for the initial discovery to happen.
477 if(options.haInterval) {
478 topologyMonitor(self);
481 // If we have a primary and a disconnect handler, execute
482 // buffered operations
483 if(self.s.replicaSetState.hasPrimaryAndSecondary() && self.s.disconnectHandler) {
484 self.s.disconnectHandler.execute();
485 } else if(self.s.replicaSetState.hasPrimary() && self.s.disconnectHandler) {
486 self.s.disconnectHandler.execute({ executePrimary:true });
487 } else if(self.s.replicaSetState.hasSecondary() && self.s.disconnectHandler) {
488 self.s.disconnectHandler.execute({ executeSecondary:true });
491 // Get the connectingServers
492 var connectingServers = self.s.replicaSetState.allServers();
494 if(self.s.logger.isDebug()) {
495 self.s.logger.debug(f('topologyMonitor in replset with id %s connected servers [%s]'
497 , connectingServers.map(function(x) {
502 var count = connectingServers.length;
504 // If we have no servers connected
505 if(count == 0 && !options.haInterval) {
506 if(self.listeners("close").length > 0) {
507 self.emit('close', self);
510 return attemptReconnect(self);
513 // If the count is zero schedule a new fast
514 function pingServer(_self, _server, cb) {
515 // Measure running time
516 var start = new Date().getTime();
518 // Emit the server heartbeat start
519 emitSDAMEvent(self, 'serverHeartbeatStarted', { connectionId: _server.name });
521 _server.command('admin.$cmd', {ismaster:true}, {monitoring: true}, function(err, r) {
522 if(self.state == DESTROYED) {
528 var latencyMS = new Date().getTime() - start;
530 // Set the last updatedTime
531 var hrTime = process.hrtime();
532 // Calculate the last update time
533 _server.lastUpdateTime = hrTime[0] * 1000 + Math.round(hrTime[1]/1000);
535 // We had an error, remove it from the state
537 // Emit the server heartbeat failure
538 emitSDAMEvent(self, 'serverHeartbeatFailed', { durationMS: latencyMS, failure: err, connectionId: _server.name });
540 // Update the server ismaster
541 _server.ismaster = r.result;
543 // Check if we have a lastWriteDate convert it to MS
544 // and store on the server instance for later use
545 if(_server.ismaster.lastWrite && _server.ismaster.lastWrite.lastWriteDate) {
546 _server.lastWriteDate = _server.ismaster.lastWrite.lastWriteDate.getTime();
549 // Do we have a brand new server
550 if(_server.lastIsMasterMS == -1) {
551 _server.lastIsMasterMS = latencyMS;
552 } else if(_server.lastIsMasterMS) {
553 // After the first measurement, average RTT MUST be computed using an
554 // exponentially-weighted moving average formula, with a weighting factor (alpha) of 0.2.
555 // If the prior average is denoted old_rtt, then the new average (new_rtt) is
556 // computed from a new RTT measurement (x) using the following formula:
558 // new_rtt = alpha * x + (1 - alpha) * old_rtt
559 _server.lastIsMasterMS = 0.2 * latencyMS + (1 - 0.2) * _server.lastIsMasterMS;
562 if(_self.s.replicaSetState.update(_server)) {
563 // Primary lastIsMaster store it
564 if(_server.lastIsMaster() && _server.lastIsMaster().ismaster) {
565 self.ismaster = _server.lastIsMaster();
569 // Server heart beat event
570 emitSDAMEvent(self, 'serverHeartbeatSucceeded', { durationMS: latencyMS, reply: r.result, connectionId: _server.name });
// Calculate the staleness for this server
574 self.s.replicaSetState.updateServerMaxStaleness(_server, self.s.haInterval);
581 // Connect any missing servers
582 function connectMissingServers() {
583 if(self.state == DESTROYED) return;
585 // Attempt to connect to any unknown servers
586 connectNewServers(self, self.s.replicaSetState.unknownServers, function(err, cb) {
587 if(self.state == DESTROYED) return;
589 // Check if we have an options.haInterval (meaning it was triggered from connect)
590 if(options.haInterval) {
591 // Do we have a primary and secondary
592 if(self.state == CONNECTING
593 && self.s.replicaSetState.hasPrimaryAndSecondary()) {
594 // Transition to connected
595 stateTransition(self, CONNECTED);
596 // Update initial state
597 self.initialConnectState.connect = true;
598 self.initialConnectState.fullsetup = true;
599 self.initialConnectState.all = true;
600 // Emit fullsetup and all events
601 process.nextTick(function() {
602 self.emit('connect', self);
603 self.emit('fullsetup', self);
604 self.emit('all', self);
606 } else if(self.state == CONNECTING
607 && self.s.replicaSetState.hasPrimary()) {
608 // Transition to connected
609 stateTransition(self, CONNECTED);
610 // Update initial state
611 self.initialConnectState.connect = true;
612 // Emit connected sign
613 process.nextTick(function() {
614 self.emit('connect', self);
616 } else if(self.state == CONNECTING
617 && self.s.replicaSetState.hasSecondary()
618 && self.s.options.secondaryOnlyConnectionAllowed) {
619 // Transition to connected
620 stateTransition(self, CONNECTED);
621 // Update initial state
622 self.initialConnectState.connect = true;
623 // Emit connected sign
624 process.nextTick(function() {
625 self.emit('connect', self);
627 } else if(self.state == CONNECTING) {
628 self.emit('error', new MongoError('no primary found in replicaset'));
629 // Destroy the topology
630 return self.destroy();
631 } else if(self.state == CONNECTED
632 && self.s.replicaSetState.hasPrimaryAndSecondary()
633 && !self.initialConnectState.fullsetup) {
634 self.initialConnectState.fullsetup = true;
635 // Emit fullsetup and all events
636 process.nextTick(function() {
637 self.emit('fullsetup', self);
638 self.emit('all', self);
643 if(!options.haInterval) topologyMonitor(self);
647 // No connectingServers but unknown servers
648 if(connectingServers.length == 0
649 && self.s.replicaSetState.unknownServers.length > 0 && options.haInterval) {
650 return connectMissingServers();
651 } else if(connectingServers.length == 0 && options.haInterval) {
653 return self.emit('error', new MongoError('no valid replicaset members found'));
657 for(var i = 0; i < connectingServers.length; i++) {
658 pingServer(self, connectingServers[i], function(err, r) {
662 connectMissingServers();
666 }, options.haInterval || self.s.haInterval)
/**
 * Build the listener attached to a server's 'error', 'close', 'timeout' and
 * 'parseError' events once that server has been accepted into the set.
 * The returned function is invoked with the emitting server as `this` and
 * removes that server from the replica set state, unless the topology has
 * already been destroyed.
 *
 * @param {ReplSet} self The topology owning the server
 * @param {string} event Name of the event the listener is registered for (used for logging only)
 * @return {function} The event listener
 */
function handleEvent(self, event) {
  return function(err) {
    if(self.state == DESTROYED) return;

    // Debug log
    if(self.s.logger.isDebug()) {
      self.s.logger.debug(f('handleEvent %s from server %s in replset with id %s', event, this.name, self.id));
    }

    self.s.replicaSetState.remove(this);
  };
}
681 function handleInitialConnectEvent(self, event) {
682 return function(err) {
684 if(self.s.logger.isDebug()) {
685 self.s.logger.debug(f('handleInitialConnectEvent %s from server %s in replset with id %s', event, this.name, self.id));
688 // Destroy the instance
689 if(self.state == DESTROYED) {
690 return this.destroy();
693 // Check the type of server
694 if(event == 'connect') {
696 var result = self.s.replicaSetState.update(this);
698 // Primary lastIsMaster store it
699 if(this.lastIsMaster() && this.lastIsMaster().ismaster) {
700 self.ismaster = this.lastIsMaster();
704 if(self.s.logger.isDebug()) {
705 self.s.logger.debug(f('handleInitialConnectEvent %s from server %s in replset with id %s has state [%s]', event, this.name, self.id, JSON.stringify(self.s.replicaSetState.set)));
708 // Remove the handlers
709 for(var i = 0; i < handlers.length; i++) {
710 this.removeAllListeners(handlers[i]);
713 // Add stable state handlers
714 this.on('error', handleEvent(self, 'error'));
715 this.on('close', handleEvent(self, 'close'));
716 this.on('timeout', handleEvent(self, 'timeout'));
717 this.on('parseError', handleEvent(self, 'parseError'));
718 } else if(result instanceof MongoError) {
721 return self.emit('error', result);
726 // Emit failure to connect
727 self.emit('failed', this);
728 // Remove from the state
729 self.s.replicaSetState.remove(this);
732 // Remove from the list from connectingServers
733 for(var i = 0; i < self.s.connectingServers.length; i++) {
734 if(self.s.connectingServers[i].equals(this)) {
735 self.s.connectingServers.splice(i, 1);
739 // Trigger topologyMonitor
740 if(self.s.connectingServers.length == 0) {
741 topologyMonitor(self, {haInterval: 1});
746 function connectServers(self, servers) {
747 // Update connectingServers
748 self.s.connectingServers = self.s.connectingServers.concat(servers);
// Index used to interleave the server connects, avoiding
// runtime issues on io constrained vm's
752 var timeoutInterval = 0;
754 function connect(server, timeoutInterval) {
755 setTimeout(function() {
756 // Add the server to the state
757 if(self.s.replicaSetState.update(server)) {
758 // Primary lastIsMaster store it
759 if(server.lastIsMaster() && server.lastIsMaster().ismaster) {
760 self.ismaster = server.lastIsMaster();
764 // Add event handlers
765 server.once('close', handleInitialConnectEvent(self, 'close'));
766 server.once('timeout', handleInitialConnectEvent(self, 'timeout'));
767 server.once('parseError', handleInitialConnectEvent(self, 'parseError'));
768 server.once('error', handleInitialConnectEvent(self, 'error'));
769 server.once('connect', handleInitialConnectEvent(self, 'connect'));
770 // SDAM Monitoring events
771 server.on('serverOpening', function(e) { self.emit('serverOpening', e); });
772 server.on('serverDescriptionChanged', function(e) { self.emit('serverDescriptionChanged', e); });
773 server.on('serverClosed', function(e) { self.emit('serverClosed', e); });
775 server.connect(self.s.connectOptions);
779 // Start all the servers
780 while(servers.length > 0) {
781 connect(servers.shift(), timeoutInterval++);
/**
 * Emit an event on the topology only when someone is listening for it, so
 * the description object is not dispatched to nobody (and an event named
 * 'error' without listeners cannot throw).
 *
 * @param {EventEmitter} self The topology emitting the event
 * @param {string} event Event name
 * @param {object} description Payload passed to the listeners
 */
function emitSDAMEvent(self, event, description) {
  if(self.listeners(event).length > 0) {
    self.emit(event, description);
  }
}
/**
 * Initiate server connect
 *
 * Stores the connect options, moves the topology to the connecting state,
 * creates one Server per seedlist entry and starts connecting to them.
 * @method
 * @param {array} [options.auth=null] Array of auth options to apply on connect
 */
ReplSet.prototype.connect = function(options) {
  var self = this;
  // Add any connect level options to the internal state
  this.s.connectOptions = options || {};
  // Set connecting state
  stateTransition(this, CONNECTING);
  // Create server instances; each seed entry (x) overrides the shared options
  var servers = this.s.seedlist.map(function(x) {
    return new Server(assign({}, self.s.options, x, {
      authProviders: self.authProviders, reconnect:false, monitoring:false, inTopology: true
    }, {
      // Every server gets its own copy of the client info
      clientInfo: clone(self.s.clientInfo)
    }));
  });

  // Emit the topology opening event
  emitSDAMEvent(this, 'topologyOpening', { topologyId: this.id });

  // Start all server connections
  connectServers(self, servers);
};
/**
 * Destroy the server connection
 *
 * Moves the topology to the destroyed state, cancels the pending monitoring
 * timer, destroys the replica set state and every server that is still
 * connecting, then emits 'topologyClosed' if anyone listens for it.
 * @method
 */
ReplSet.prototype.destroy = function() {
  // Transition state
  stateTransition(this, DESTROYED);
  // Clear out any monitoring process
  if(this.haTimeoutId) clearTimeout(this.haTimeoutId);
  // Destroy the replicaset
  this.s.replicaSetState.destroy();

  // Destroy all connecting servers
  this.s.connectingServers.forEach(function(x) {
    x.destroy();
  });

  // Emit topology closing event
  emitSDAMEvent(this, 'topologyClosed', { topologyId: this.id });
};
/**
 * Unref all connections belong to this server
 *
 * Moves the topology to the disconnected state, unrefs every server known
 * to the replica set state and cancels the pending monitoring timer.
 * @method
 */
ReplSet.prototype.unref = function() {
  // Transition state
  stateTransition(this, DISCONNECTED);

  this.s.replicaSetState.allServers().forEach(function(x) {
    x.unref();
  });

  clearTimeout(this.haTimeoutId);
};
/**
 * Returns the last known ismaster document for this server
 *
 * Uses the current primary's document when a primary is known, otherwise the
 * document cached on the topology (`this.ismaster`, which may be null).
 * @method
 * @return {object}
 */
ReplSet.prototype.lastIsMaster = function() {
  var primary = this.s.replicaSetState.primary;
  return primary ? primary.lastIsMaster() : this.ismaster;
};
/**
 * All raw connections
 *
 * Collects the connections of every server known to the replica set state
 * into a single flat array.
 * @method
 * @return {Connection[]}
 */
ReplSet.prototype.connections = function() {
  var servers = this.s.replicaSetState.allServers();
  var connections = [];
  for(var i = 0; i < servers.length; i++) {
    connections = connections.concat(servers[i].connections());
  }

  return connections;
};
/**
 * Figure out if the server is connected
 *
 * With a read preference, reports whether a server able to satisfy it is
 * available; without one, reports whether a primary is available (or a
 * secondary, when secondary-only connections are allowed).
 * @method
 * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
 * @return {boolean}
 */
ReplSet.prototype.isConnected = function(options) {
  options = options || {};
  var state = this.s.replicaSetState;
  var readPreference = options.readPreference;

  // If we are authenticating signal not connected
  // To avoid interleaving of operations
  if(this.authenticating) return false;

  // If we specified a read preference check if we are connected to something
  // that can satisfy this
  if(readPreference && readPreference.equals(ReadPreference.secondary)) {
    return state.hasSecondary();
  }

  if(readPreference && readPreference.equals(ReadPreference.primary)) {
    return state.hasPrimary();
  }

  if(readPreference && readPreference.equals(ReadPreference.primaryPreferred)) {
    return state.hasSecondary() || state.hasPrimary();
  }

  if(readPreference && readPreference.equals(ReadPreference.secondaryPreferred)) {
    return state.hasSecondary() || state.hasPrimary();
  }

  // The flag is supplied through the constructor options, which are stored
  // under this.s.options; the direct this.s lookup is kept for compatibility.
  var secondaryOnlyAllowed = this.s.secondaryOnlyConnectionAllowed
    || (this.s.options && this.s.options.secondaryOnlyConnectionAllowed);
  if(secondaryOnlyAllowed && state.hasSecondary()) {
    return true;
  }

  return state.hasPrimary();
};
/**
 * Figure out if the replicaset instance was destroyed by calling destroy
 * @method
 * @return {boolean} true when the topology state is 'destroyed'
 */
ReplSet.prototype.isDestroyed = function() {
  return this.state == DESTROYED;
};
/**
 * Get server
 *
 * Asks the replica set state to pick a server for the given read preference.
 * In debug mode a 'pickedServer' event is emitted with the read preference
 * and the picked server.
 * @method
 * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
 * @return {Server} Whatever `pickServer` returned for the read preference
 */
ReplSet.prototype.getServer = function(options) {
  // Ensure we have options
  options = options || {};

  // Pick the right server based on readPreference
  var server = this.s.replicaSetState.pickServer(options.readPreference);
  if(this.s.debug) this.emit('pickedServer', options.readPreference, server);
  return server;
};
/**
 * Get all connected servers
 * @method
 * @return {Server[]} Every server currently held by the replica set state
 */
ReplSet.prototype.getServers = function() {
  return this.s.replicaSetState.allServers();
};
/**
 * Reject a read preference that is not a ReadPreference instance.
 * A missing (falsy) read preference is accepted.
 *
 * @param {ReplSet} self The topology (not used by the check)
 * @param {object} options Operation options; only `options.readPreference` is inspected
 * @throws {Error} When `options.readPreference` is set but is not a ReadPreference
 */
function basicReadPreferenceValidation(self, options) {
  if(options.readPreference && !(options.readPreference instanceof ReadPreference)) {
    throw new Error("readPreference must be an instance of ReadPreference");
  }
}
/**
 * Execute a write operation against the current primary.
 *
 * @param {ReplSet} self The topology; the primary is read from `self.s.replicaSetState.primary`
 * @param {string} op Name of the method to call on the primary (e.g. 'insert', 'update', 'remove')
 * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
 * @param {array} ops The documents/operations to pass through
 * @param {object} [options] Operation options; may be omitted, in which case the callback takes its place
 * @param {function} callback Called with a MongoError when no primary is known, otherwise handed to the primary
 */
var executeWriteOperation = function(self, op, ns, ops, options, callback) {
  // Support the (self, op, ns, ops, callback) call shape
  if(typeof options == 'function') {
    callback = options;
    options = {};
  }

  // Ensure we have options
  options = options || {};

  // No primary known, we had an error
  var primary = self.s.replicaSetState.primary;
  if(primary == null) {
    return callback(new MongoError("no primary server found"));
  }

  // Execute the command
  primary[op](ns, ops, options, callback);
};
/**
 * Insert one or more documents
 *
 * Fails through the callback when the topology was destroyed. When no
 * primary is available and a disconnect handler was configured, the
 * operation is handed to that handler instead of being executed.
 * @method
 * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
 * @param {array} ops An array of documents to insert
 * @param {boolean} [options.ordered=true] Execute in order or out of order
 * @param {object} [options.writeConcern={}] Write concern for the operation
 * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
 * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
 * @param {opResultCallback} callback A callback function
 */
ReplSet.prototype.insert = function(ns, ops, options, callback) {
  // Support the (ns, ops, callback) call shape
  if(typeof options == 'function') {
    callback = options;
    options = {};
  }
  options = options || {};

  if(this.state == DESTROYED) return callback(new MongoError('topology was destroyed'));

  // Not connected but we have a disconnecthandler
  if(!this.s.replicaSetState.hasPrimary() && this.s.disconnectHandler != null) {
    return this.s.disconnectHandler.add('insert', ns, ops, options, callback);
  }

  // Execute write operation
  executeWriteOperation(this, 'insert', ns, ops, options, callback);
};
/**
 * Perform one or more update operations
 *
 * Fails through the callback when the topology was destroyed. When no
 * primary is available and a disconnect handler was configured, the
 * operation is handed to that handler instead of being executed.
 * @method
 * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
 * @param {array} ops An array of updates
 * @param {boolean} [options.ordered=true] Execute in order or out of order
 * @param {object} [options.writeConcern={}] Write concern for the operation
 * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
 * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
 * @param {opResultCallback} callback A callback function
 */
ReplSet.prototype.update = function(ns, ops, options, callback) {
  // Support the (ns, ops, callback) call shape
  if(typeof options == 'function') {
    callback = options;
    options = {};
  }
  options = options || {};

  if(this.state == DESTROYED) return callback(new MongoError('topology was destroyed'));

  // Not connected but we have a disconnecthandler
  if(!this.s.replicaSetState.hasPrimary() && this.s.disconnectHandler != null) {
    return this.s.disconnectHandler.add('update', ns, ops, options, callback);
  }

  // Execute write operation
  executeWriteOperation(this, 'update', ns, ops, options, callback);
};
/**
 * Perform one or more remove operations
 * @method
 * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
 * @param {array} ops An array of removes
 * @param {object} [options] Optional settings; may be omitted, in which case the callback is the third argument
 * @param {boolean} [options.ordered=true] Execute in order or out of order
 * @param {object} [options.writeConcern={}] Write concern for the operation
 * @param {boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
 * @param {boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
 * @param {opResultCallback} callback A callback function
 */
ReplSet.prototype.remove = function(ns, ops, options, callback) {
  // Support the (ns, ops, callback) call form.
  if(typeof options == 'function') {
    callback = options;
    options = {};
  }

  // Never forward a null/undefined options object; the previous comma-chained
  // statement only applied this default when options was a function.
  options = options || {};

  if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));

  // No primary available but a disconnect handler is configured: hand the
  // operation to the handler instead of executing it now.
  if(!this.s.replicaSetState.hasPrimary() && this.s.disconnectHandler != null) {
    return this.s.disconnectHandler.add('remove', ns, ops, options, callback);
  }

  // Execute write operation
  executeWriteOperation(this, 'remove', ns, ops, options, callback);
};
/**
 * Execute a command against a server selected by read preference
 * @method
 * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
 * @param {object} cmd The command hash
 * @param {object} [options] Optional settings; may be omitted, in which case the callback is the third argument
 * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
 * @param {Connection} [options.connection] Specify connection object to execute command against
 * @param {boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
 * @param {boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
 * @param {opResultCallback} callback A callback function
 */
ReplSet.prototype.command = function(ns, cmd, options, callback) {
  // Support the (ns, cmd, callback) call form.
  if(typeof options == 'function') {
    callback = options;
    options = {};
  }

  // A null/undefined options object would make the readPreference lookup throw.
  options = options || {};

  if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
  var self = this;

  // Establish readPreference
  var readPreference = options.readPreference ? options.readPreference : ReadPreference.primary;

  // With a disconnect handler configured, hand the command to it when no
  // server of the kind the read preference needs is currently available.
  if(this.s.disconnectHandler != null) {
    var state = this.s.replicaSetState;
    var mode = readPreference.preference;
    var unavailable;

    if(mode == 'primary') {
      unavailable = !state.hasPrimary();
    } else if(mode == 'secondary') {
      unavailable = !state.hasSecondary();
    } else {
      // Any other mode can be served by either kind of member
      unavailable = !state.hasSecondary() && !state.hasPrimary();
    }

    if(unavailable) {
      return this.s.disconnectHandler.add('command', ns, cmd, options, callback);
    }
  }

  // Pick a server
  var server = this.s.replicaSetState.pickServer(readPreference);

  // No server returned. This must be tested before the instanceof check
  // below: null is never an instance of Server, so the old ordering ended in
  // callback(null) and reported success without a result.
  if(server == null) {
    return callback(new MongoError(f("no server found that matches the provided readPreference %s", readPreference)));
  }

  // We received an error, return it
  if(!(server instanceof Server)) return callback(server);

  // NOTE(review): the event always carries ReadPreference.primary rather than
  // the read preference actually used; kept as is, confirm whether intended.
  if(self.s.debug) self.emit('pickedServer', ReadPreference.primary, server);

  // Execute the command
  server.command(ns, cmd, options, callback);
};
/**
 * Authenticate using a specified mechanism
 * @method
 * @param {string} mechanism The Auth mechanism we are invoking
 * @param {string} db The db we are invoking the mechanism against
 * @param {...object} param Parameters for the specific mechanism
 * @param {authResultCallback} callback A callback function (always the last argument)
 */
ReplSet.prototype.auth = function(mechanism, db) {
  var self = this;
  // Full argument list (callback included), kept for the disconnect handler
  var allArgs = Array.prototype.slice.call(arguments, 0);
  // Mechanism specific parameters, with the trailing callback split off
  var args = Array.prototype.slice.call(arguments, 2);
  var callback = args.pop();

  // If we don't have the mechanism fail
  if(this.authProviders[mechanism] == null && mechanism != 'default') {
    return callback(new MongoError(f("auth provider %s does not exist", mechanism)));
  }

  // Already authenticating or logging out: report it through the callback
  if(this.authenticating) {
    return callback(new MongoError('authentication or logout allready in process'));
  }

  // Topology is not connected, save the call in the provided store to be
  // executed at some point when the handler deems it's reconnected
  if(!self.s.replicaSetState.hasPrimary() && self.s.disconnectHandler != null) {
    return self.s.disconnectHandler.add('auth', db, allArgs, {}, callback);
  }

  // Set to authenticating
  this.authenticating = true;
  // All errors
  var errors = [];

  // Get all the servers
  var servers = this.s.replicaSetState.allServers();

  // No servers: nothing to authenticate against, finish immediately
  if(servers.length == 0) {
    this.authenticating = false;
    callback(null, true);
    return;
  }

  // Number of servers that still have to report back
  var count = servers.length;

  // Authenticate against a single server
  function auth(server) {
    // Arguments without a callback
    var argsWithoutCallback = [mechanism, db].concat(args.slice(0));

    // Append the per-server completion callback
    var finalArguments = argsWithoutCallback.concat([function(err, r) {
      count = count - 1;
      // Save all the errors
      if(err) errors.push({name: server.name, err: err});

      // Every server has answered
      if(count == 0) {
        // Auth is done
        self.authenticating = false;

        // Return the auth error
        if(errors.length) return callback(MongoError.create({
          message: 'authentication fail', errors: errors
        }), false);

        // Successfully authenticated session
        callback(null, self);
      }
    }]);

    if(!server.lastIsMaster().arbiterOnly) {
      // Execute the auth only against non arbiter servers
      server.auth.apply(server, finalArguments);
    } else {
      // If we are authenticating against an arbiter just ignore it
      finalArguments.pop()(null);
    }
  }

  // Authenticate against all servers
  while(servers.length > 0) {
    auth(servers.shift());
  }
};
/**
 * Logout from a database
 * @method
 * @param {string} dbName The db we are logging out from
 * @param {authResultCallback} callback A callback function
 * @throws {MongoError} If an authentication or logout is already in progress
 */
ReplSet.prototype.logout = function(dbName, callback) {
  var self = this;

  // Are we authenticating or logging out, throw
  if(this.authenticating) {
    throw new MongoError('authentication or logout allready in process');
  }

  // Ensure no new members are processed while logging out
  this.authenticating = true;

  // Remove from all auth providers (avoid any reapplication of the auth details)
  var providers = Object.keys(this.authProviders);
  for(var i = 0; i < providers.length; i++) {
    this.authProviders[providers[i]].logout(dbName);
  }

  // Now logout all the servers
  var servers = this.s.replicaSetState.allServers();
  var count = servers.length;

  // Nothing to log out from: release the flag before returning, otherwise
  // every later auth/logout call is rejected as "already in process".
  if(count == 0) {
    this.authenticating = false;
    return callback();
  }

  var errors = [];

  // Execute logout on all server instances. forEach gives each callback its
  // own `server` binding, which the error report needs for the server name.
  servers.forEach(function(server) {
    server.logout(dbName, function(err) {
      count = count - 1;
      if(err) errors.push({name: server.name, err: err});

      // Every server has answered
      if(count == 0) {
        // Do not block new operations
        self.authenticating = false;
        // If we have one or more errors
        if(errors.length) return callback(MongoError.create({
          message: f('logout failed against db %s', dbName), errors: errors
        }), false);

        // No errors
        callback();
      }
    });
  });
};
/**
 * Create a cursor for the given namespace and command
 * @method
 * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
 * @param {object|Long} cmd Can be either a command returning a cursor or a cursorId
 * @param {object} [cursorOptions] Optional settings, handed to the cursor constructor unchanged
 * @param {object} [cursorOptions.batchSize=0] Batchsize for the operation
 * @param {array} [cursorOptions.documents=[]] Initial documents list for cursor
 * @param {ReadPreference} [cursorOptions.readPreference] Specify read preference if command supports it
 * @param {Boolean} [cursorOptions.serializeFunctions=false] Specify if functions on an object should be serialized.
 * @param {Boolean} [cursorOptions.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
 * @param {function} [cursorOptions.cursorFactory] Constructor used instead of the topology's configured cursor class
 * @return {object} A new cursor instance bound to this topology
 */
ReplSet.prototype.cursor = function(ns, cmd, cursorOptions) {
  var topology = this;
  var settings = topology.s;

  // Any falsy value counts as "no options supplied"
  var resolvedOptions = cursorOptions ? cursorOptions : {};

  // A per-call factory takes precedence over the topology's cursor class
  var CursorConstructor = resolvedOptions.cursorFactory || settings.Cursor;

  return new CursorConstructor(settings.bson, ns, cmd, resolvedOptions, topology, settings.options);
};
1250 * A replset connect event, used to verify that the connection is up and running
1252 * @event ReplSet#connect
1257 * A replset reconnect event, used to verify that the topology reconnected
1259 * @event ReplSet#reconnect
1264 * A replset fullsetup event, used to signal that all topology members have been contacted.
1266 * @event ReplSet#fullsetup
1271 * A replset all event, used to signal that all topology members have been contacted.
1273 * @event ReplSet#all
1278 * A replset failed event, used to signal that initial replset connection failed.
1280 * @event ReplSet#failed
1285 * A server member left the replicaset
1287 * @event ReplSet#left
1289 * @param {string} type The type of member that left (primary|secondary|arbiter)
1290 * @param {Server} server The server object that left
1294 * A server member joined the replicaset
1296 * @event ReplSet#joined
1298 * @param {string} type The type of member that joined (primary|secondary|arbiter)
1299 * @param {Server} server The server object that joined
1303 * A server opening SDAM monitoring event
1305 * @event ReplSet#serverOpening
1310 * A server closed SDAM monitoring event
1312 * @event ReplSet#serverClosed
1317 * A server description SDAM change monitoring event
1319 * @event ReplSet#serverDescriptionChanged
1324 * A topology open SDAM event
1326 * @event ReplSet#topologyOpening
1331 * A topology closed SDAM event
1333 * @event ReplSet#topologyClosed
1338 * A topology structure SDAM change event
1340 * @event ReplSet#topologyDescriptionChanged
1345 * A topology serverHeartbeatStarted SDAM event
1347 * @event ReplSet#serverHeartbeatStarted
1352 * A topology serverHeartbeatFailed SDAM event
1354 * @event ReplSet#serverHeartbeatFailed
1359 * A topology serverHeartbeatSucceeded SDAM event
1361 * @event ReplSet#serverHeartbeatSucceeded
1365 module.exports = ReplSet;