3 var inherits = require('util').inherits,
4 f = require('util').format,
5 EventEmitter = require('events').EventEmitter,
6 ReadPreference = require('./read_preference'),
7 BasicCursor = require('../cursor'),
8 retrieveBSON = require('../connection/utils').retrieveBSON,
9 Logger = require('../connection/logger'),
10 MongoError = require('../error'),
11 Server = require('./server'),
12 ReplSetState = require('./replset_state'),
13 assign = require('./shared').assign,
14 clone = require('./shared').clone,
15 createClientInfo = require('./shared').createClientInfo;
17 var MongoCR = require('../auth/mongocr')
18 , X509 = require('../auth/x509')
19 , Plain = require('../auth/plain')
20 , GSSAPI = require('../auth/gssapi')
21 , SSPI = require('../auth/sspi')
22 , ScramSHA1 = require('../auth/scram');
24 var BSON = retrieveBSON();
28 var DISCONNECTED = 'disconnected';
29 var CONNECTING = 'connecting';
30 var CONNECTED = 'connected';
31 var DESTROYED = 'destroyed';
33 function stateTransition(self, newState) {
34 var legalTransitions = {
35 'disconnected': [CONNECTING, DESTROYED, DISCONNECTED],
36 'connecting': [CONNECTING, DESTROYED, CONNECTED, DISCONNECTED],
37 'connected': [CONNECTED, DISCONNECTED, DESTROYED],
38 'destroyed': [DESTROYED]
42 var legalStates = legalTransitions[self.state];
43 if(legalStates && legalStates.indexOf(newState) != -1) {
44 self.state = newState;
46 self.logger.error(f('Pool with id [%s] failed attempted illegal state transition from [%s] to [%s] only following state allowed [%s]'
47 , self.id, self.state, newState, legalStates));
52 // ReplSet instance id
54 var handlers = ['connect', 'close', 'error', 'timeout', 'parseError'];
57 * Creates a new Replset instance
59 * @param {array} seedlist A list of seeds for the replicaset
60 * @param {boolean} options.setName The Replicaset set name
61 * @param {boolean} [options.secondaryOnlyConnectionAllowed=false] Allow connection to a secondary only replicaset
62 * @param {number} [options.haInterval=10000] The High availability period for replicaset inquiry
63 * @param {boolean} [options.emitError=false] Server will emit errors events
64 * @param {Cursor} [options.cursorFactory=Cursor] The cursor factory class used for all query cursors
65 * @param {number} [options.size=5] Server connection pool size
66 * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
67 * @param {number} [options.keepAliveInitialDelay=0] Initial delay before TCP keep alive enabled
68 * @param {boolean} [options.noDelay=true] TCP Connection no delay
69 * @param {number} [options.connectionTimeout=10000] TCP Connection timeout setting
70 * @param {number} [options.socketTimeout=0] TCP Socket timeout setting
71 * @param {boolean} [options.singleBufferSerializtion=true] Serialize into single buffer, trade of peak memory for serialization speed
72 * @param {boolean} [options.ssl=false] Use SSL for connection
73 * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function.
74 * @param {Buffer} [options.ca] SSL Certificate store binary buffer
75 * @param {Buffer} [options.cert] SSL Certificate binary buffer
76 * @param {Buffer} [options.key] SSL Key file binary buffer
77 * @param {string} [options.passphrase] SSL Certificate pass phrase
78 * @param {string} [options.servername=null] String containing the server name requested via TLS SNI.
79 * @param {boolean} [options.rejectUnauthorized=true] Reject unauthorized server certificates
80 * @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits
81 * @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
82 * @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
83 * @param {number} [options.pingInterval=5000] Ping interval to check the response time to the different servers
84 * @param {number} [options.localThresholdMS=15] Cutoff latency point in MS for MongoS proxy selection
85 * @param {boolean} [options.domainsEnabled=false] Enable the wrapping of the callback in the current domain, disabled by default to avoid perf hit.
86 * @return {ReplSet} A cursor instance
87 * @fires ReplSet#connect
89 * @fires ReplSet#joined
91 * @fires ReplSet#failed
92 * @fires ReplSet#fullsetup
94 * @fires ReplSet#error
95 * @fires ReplSet#serverHeartbeatStarted
96 * @fires ReplSet#serverHeartbeatSucceeded
97 * @fires ReplSet#serverHeartbeatFailed
98 * @fires ReplSet#topologyOpening
99 * @fires ReplSet#topologyClosed
100 * @fires ReplSet#topologyDescriptionChanged
101 * @property {string} type the topology type.
102 * @property {string} parserType the parser type used (c++ or js).
104 var ReplSet = function(seedlist, options) {
106 options = options || {};
109 if(!Array.isArray(seedlist)) throw new MongoError("seedlist must be an array");
111 if(seedlist.length == 0) throw new MongoError("seedlist must contain at least one entry");
113 seedlist.forEach(function(e) {
114 if(typeof e.host != 'string' || typeof e.port != 'number')
115 throw new MongoError("seedlist entry must contain a host and port");
118 // Add event listener
119 EventEmitter.call(this);
124 // Get the localThresholdMS
125 var localThresholdMS = options.localThresholdMS || 15;
126 // Backward compatibility
127 if(options.acceptableLatency) localThresholdMS = options.acceptableLatency;
130 var logger = Logger('ReplSet', options);
134 options: assign({}, options),
136 bson: options.bson || new BSON([BSON.Binary, BSON.Code, BSON.DBRef, BSON.Decimal128,
137 BSON.Double, BSON.Int32, BSON.Long, BSON.Map, BSON.MaxKey, BSON.MinKey,
138 BSON.ObjectId, BSON.BSONRegExp, BSON.Symbol, BSON.Timestamp]),
140 Cursor: options.cursorFactory || BasicCursor,
146 replicaSetState: new ReplSetState({
147 id: this.id, setName: options.setName,
148 acceptableLatency: localThresholdMS,
149 heartbeatFrequencyMS: options.haInterval ? options.haInterval : 10000,
152 // Current servers we are connecting to
153 connectingServers: [],
155 haInterval: options.haInterval ? options.haInterval : 10000,
156 // Minimum heartbeat frequency used if we detect a server close
157 minHeartbeatFrequencyMS: 500,
158 // Disconnect handler
159 disconnectHandler: options.disconnectHandler,
160 // Server selection index
162 // Connect function options passed in
164 // Are we running in debug mode
165 debug: typeof options.debug == 'boolean' ? options.debug : false,
167 clientInfo: createClientInfo(options)
170 // Add handler for topology change
171 this.s.replicaSetState.on('topologyDescriptionChanged', function(r) { self.emit('topologyDescriptionChanged', r); });
173 // Log info warning if the socketTimeout < haInterval as it will cause
174 // a lot of recycled connections to happen.
175 if(this.s.logger.isWarn()
176 && this.s.options.socketTimeout != 0
177 && this.s.options.socketTimeout < this.s.haInterval) {
178 this.s.logger.warn(f('warning socketTimeout %s is less than haInterval %s. This might cause unnecessary server reconnections due to socket timeouts'
179 , this.s.options.socketTimeout, this.s.haInterval));
182 // All the authProviders
183 this.authProviders = options.authProviders || {
184 'mongocr': new MongoCR(this.s.bson), 'x509': new X509(this.s.bson)
185 , 'plain': new Plain(this.s.bson), 'gssapi': new GSSAPI(this.s.bson)
186 , 'sspi': new SSPI(this.s.bson), 'scram-sha-1': new ScramSHA1(this.s.bson)
189 // Add forwarding of events from state handler
190 var types = ['joined', 'left'];
191 types.forEach(function(x) {
192 self.s.replicaSetState.on(x, function(t, s) {
193 if(self.state === CONNECTED && x === 'joined' && t == 'primary') {
194 self.emit('reconnect', self);
202 this.initialConnectState = {
203 connect: false, fullsetup: false, all: false
206 // Disconnected state
207 this.state = DISCONNECTED;
208 this.haTimeoutId = null;
209 // Are we authenticating
210 this.authenticating = false;
212 this.ismaster = null;
215 inherits(ReplSet, EventEmitter);
217 Object.defineProperty(ReplSet.prototype, 'type', {
218 enumerable:true, get: function() { return 'replset'; }
221 Object.defineProperty(ReplSet.prototype, 'parserType', {
222 enumerable:true, get: function() {
223 return BSON.native ? "c++" : "js";
227 function attemptReconnect(self) {
228 if(self.runningAttempReconnect) return;
230 self.runningAttempReconnect = true;
231 // Wait before execute
232 self.haTimeoutId = setTimeout(function() {
233 if(self.state == DESTROYED) return;
236 if(self.s.logger.isDebug()) {
237 self.s.logger.debug(f('attemptReconnect for replset with id %s', self.id));
240 // Get all known hosts
241 var keys = Object.keys(self.s.replicaSetState.set);
242 var servers = keys.map(function(x) {
243 return new Server(assign({}, self.s.options, {
244 host: x.split(':')[0], port: parseInt(x.split(':')[1], 10)
246 authProviders: self.authProviders, reconnect:false, monitoring: false, inTopology: true
248 clientInfo: clone(self.s.clientInfo)
252 // Create the list of servers
253 self.s.connectingServers = servers.slice(0);
255 // Handle all events coming from servers
256 function _handleEvent(self, event) {
258 // Destroy the instance
259 if(self.state == DESTROYED) {
260 return this.destroy();
264 if(self.s.logger.isDebug()) {
265 self.s.logger.debug(f('attemptReconnect for replset with id %s using server %s ended with event %s', self.id, this.name, event));
268 // Check if we are done
270 // Done with the reconnection attempt
271 if(self.s.connectingServers.length == 0) {
272 if(self.state == DESTROYED) return;
274 // If we have a primary and a disconnect handler, execute
275 // buffered operations
276 if(self.s.replicaSetState.hasPrimaryAndSecondary() && self.s.disconnectHandler) {
277 self.s.disconnectHandler.execute();
278 } else if(self.s.replicaSetState.hasPrimary() && self.s.disconnectHandler) {
279 self.s.disconnectHandler.execute({ executePrimary:true });
280 } else if(self.s.replicaSetState.hasSecondary() && self.s.disconnectHandler) {
281 self.s.disconnectHandler.execute({ executeSecondary:true });
284 // Do we have a primary
285 if(self.s.replicaSetState.hasPrimary()) {
286 // Emit reconnect as new primary was discovered
287 self.emit('reconnect', self);
289 // Connect any missing servers
290 connectNewServers(self, self.s.replicaSetState.unknownServers, function() {
292 if(self.s.logger.isDebug()) {
293 self.s.logger.debug(f('attemptReconnect for replset with id successful resuming topologyMonitor', self.id));
297 self.runningAttempReconnect = false;
299 // Go back to normal topology monitoring
300 // Schedule a topology monitoring sweep
301 setTimeout(function() {
302 topologyMonitor(self);
303 }, self.s.haInterval);
306 if(self.listeners("close").length > 0) {
307 self.emit('close', self);
311 self.runningAttempReconnect = false;
312 // Attempt a new reconnect
313 attemptReconnect(self);
318 // Remove the server from our list
319 for(var i = 0; i < self.s.connectingServers.length; i++) {
320 if(self.s.connectingServers[i].equals(this)) {
321 self.s.connectingServers.splice(i, 1);
325 // Keep reference to server
329 if(self.s.logger.isDebug()) {
330 self.s.logger.debug(f('attemptReconnect in replset with id %s for', self.id));
333 // Connect and not authenticating
334 if(event == 'connect' && !self.authenticating) {
335 if(self.state == DESTROYED) {
336 return _self.destroy();
339 // Update the replicaset state
340 if(self.s.replicaSetState.update(_self)) {
341 // Primary lastIsMaster store it
342 if(_self.lastIsMaster() && _self.lastIsMaster().ismaster) {
343 self.ismaster = _self.lastIsMaster();
346 // Remove the handlers
347 for(i = 0; i < handlers.length; i++) {
348 _self.removeAllListeners(handlers[i]);
351 // Add stable state handlers
352 _self.on('error', handleEvent(self, 'error'));
353 _self.on('close', handleEvent(self, 'close'));
354 _self.on('timeout', handleEvent(self, 'timeout'));
355 _self.on('parseError', handleEvent(self, 'parseError'));
359 } else if(event == 'connect' && self.authenticating) {
367 // Index used to interleaf the server connects, avoiding
368 // runtime issues on io constrained vm's
369 var timeoutInterval = 0;
371 function connect(server, timeoutInterval) {
372 setTimeout(function() {
373 server.once('connect', _handleEvent(self, 'connect'));
374 server.once('close', _handleEvent(self, 'close'));
375 server.once('timeout', _handleEvent(self, 'timeout'));
376 server.once('error', _handleEvent(self, 'error'));
377 server.once('parseError', _handleEvent(self, 'parseError'));
379 // SDAM Monitoring events
380 server.on('serverOpening', function(e) { self.emit('serverOpening', e); });
381 server.on('serverDescriptionChanged', function(e) { self.emit('serverDescriptionChanged', e); });
382 server.on('serverClosed', function(e) { self.emit('serverClosed', e); });
384 server.connect(self.s.connectOptions);
388 // Connect all servers
389 while(servers.length > 0) {
390 connect(servers.shift(), timeoutInterval++);
392 }, self.s.minHeartbeatFrequencyMS);
395 function connectNewServers(self, servers, callback) {
397 var count = servers.length;
400 var _handleEvent = function(self, event) {
406 if(self.state == DESTROYED) {
407 return this.destroy();
410 if(event == 'connect' && !self.authenticating) {
412 if(self.state == DESTROYED) {
413 return _self.destroy();
416 var result = self.s.replicaSetState.update(_self);
417 // Update the state with the new server
419 // Primary lastIsMaster store it
420 if(_self.lastIsMaster() && _self.lastIsMaster().ismaster) {
421 self.ismaster = _self.lastIsMaster();
424 // Remove the handlers
425 for(var i = 0; i < handlers.length; i++) {
426 _self.removeAllListeners(handlers[i]);
429 // Add stable state handlers
430 _self.on('error', handleEvent(self, 'error'));
431 _self.on('close', handleEvent(self, 'close'));
432 _self.on('timeout', handleEvent(self, 'timeout'));
433 _self.on('parseError', handleEvent(self, 'parseError'));
437 } else if(event == 'connect' && self.authenticating) {
441 // Are we done finish up callback
442 if(count == 0) { callback(); }
447 if(count == 0) return callback();
450 function execute(_server, i) {
451 setTimeout(function() {
453 if(self.state == DESTROYED) {
457 // Create a new server instance
458 var server = new Server(assign({}, self.s.options, {
459 host: _server.split(':')[0],
460 port: parseInt(_server.split(':')[1], 10)
462 authProviders: self.authProviders, reconnect:false, monitoring: false, inTopology: true
464 clientInfo: clone(self.s.clientInfo)
468 server.once('connect', _handleEvent(self, 'connect'));
469 server.once('close', _handleEvent(self, 'close'));
470 server.once('timeout', _handleEvent(self, 'timeout'));
471 server.once('error', _handleEvent(self, 'error'));
472 server.once('parseError', _handleEvent(self, 'parseError'));
474 // SDAM Monitoring events
475 server.on('serverOpening', function(e) { self.emit('serverOpening', e); });
476 server.on('serverDescriptionChanged', function(e) { self.emit('serverDescriptionChanged', e); });
477 server.on('serverClosed', function(e) { self.emit('serverClosed', e); });
478 server.connect(self.s.connectOptions);
482 // Create new instances
483 for(var i = 0; i < servers.length; i++) {
484 execute(servers[i], i);
488 function topologyMonitor(self, options) {
489 options = options || {};
491 // Set momitoring timeout
492 self.haTimeoutId = setTimeout(function() {
493 if(self.state == DESTROYED) return;
495 // Is this a on connect topology discovery
496 // Schedule a proper topology monitoring to happen
497 // To ensure any discovered servers do not timeout
498 // while waiting for the initial discovery to happen.
499 if(options.haInterval) {
500 topologyMonitor(self);
503 // If we have a primary and a disconnect handler, execute
504 // buffered operations
505 if(self.s.replicaSetState.hasPrimaryAndSecondary() && self.s.disconnectHandler) {
506 self.s.disconnectHandler.execute();
507 } else if(self.s.replicaSetState.hasPrimary() && self.s.disconnectHandler) {
508 self.s.disconnectHandler.execute({ executePrimary:true });
509 } else if(self.s.replicaSetState.hasSecondary() && self.s.disconnectHandler) {
510 self.s.disconnectHandler.execute({ executeSecondary:true });
513 // Get the connectingServers
514 var connectingServers = self.s.replicaSetState.allServers();
516 if(self.s.logger.isDebug()) {
517 self.s.logger.debug(f('topologyMonitor in replset with id %s connected servers [%s]'
519 , connectingServers.map(function(x) {
524 var count = connectingServers.length;
526 // If we have no servers connected
527 if(count == 0 && !options.haInterval) {
528 if(self.listeners("close").length > 0) {
529 self.emit('close', self);
532 return attemptReconnect(self);
535 // If the count is zero schedule a new fast
536 function pingServer(_self, _server, cb) {
537 // Measure running time
538 var start = new Date().getTime();
540 // Emit the server heartbeat start
541 emitSDAMEvent(self, 'serverHeartbeatStarted', { connectionId: _server.name });
543 // Set the socketTimeout for a monitoring message to a low number
544 // Ensuring ismaster calls are timed out quickly
545 _server.command('admin.$cmd', {
549 socketTimeout: self.s.options.connectionTimeout || 2000,
550 }, function(err, r) {
551 if(self.state == DESTROYED) {
557 var latencyMS = new Date().getTime() - start;
559 // Set the last updatedTime
560 var hrTime = process.hrtime();
561 // Calculate the last update time
562 _server.lastUpdateTime = hrTime[0] * 1000 + Math.round(hrTime[1]/1000);
564 // We had an error, remove it from the state
566 // Emit the server heartbeat failure
567 emitSDAMEvent(self, 'serverHeartbeatFailed', { durationMS: latencyMS, failure: err, connectionId: _server.name });
568 // Remove server from the state
569 _self.s.replicaSetState.remove(_server);
571 // Update the server ismaster
572 _server.ismaster = r.result;
574 // Check if we have a lastWriteDate convert it to MS
575 // and store on the server instance for later use
576 if(_server.ismaster.lastWrite && _server.ismaster.lastWrite.lastWriteDate) {
577 _server.lastWriteDate = _server.ismaster.lastWrite.lastWriteDate.getTime();
580 // Do we have a brand new server
581 if(_server.lastIsMasterMS == -1) {
582 _server.lastIsMasterMS = latencyMS;
583 } else if(_server.lastIsMasterMS) {
584 // After the first measurement, average RTT MUST be computed using an
585 // exponentially-weighted moving average formula, with a weighting factor (alpha) of 0.2.
586 // If the prior average is denoted old_rtt, then the new average (new_rtt) is
587 // computed from a new RTT measurement (x) using the following formula:
589 // new_rtt = alpha * x + (1 - alpha) * old_rtt
590 _server.lastIsMasterMS = 0.2 * latencyMS + (1 - 0.2) * _server.lastIsMasterMS;
593 if(_self.s.replicaSetState.update(_server)) {
594 // Primary lastIsMaster store it
595 if(_server.lastIsMaster() && _server.lastIsMaster().ismaster) {
596 self.ismaster = _server.lastIsMaster();
600 // Server heart beat event
601 emitSDAMEvent(self, 'serverHeartbeatSucceeded', { durationMS: latencyMS, reply: r.result, connectionId: _server.name });
604 // Calculate the stalness for this server
605 self.s.replicaSetState.updateServerMaxStaleness(_server, self.s.haInterval);
612 // Connect any missing servers
613 function connectMissingServers() {
614 if(self.state == DESTROYED) return;
616 // Attempt to connect to any unknown servers
617 connectNewServers(self, self.s.replicaSetState.unknownServers, function() {
618 if(self.state == DESTROYED) return;
620 // Check if we have an options.haInterval (meaning it was triggered from connect)
621 if(options.haInterval) {
622 // Do we have a primary and secondary
623 if(self.state == CONNECTING
624 && self.s.replicaSetState.hasPrimaryAndSecondary()) {
625 // Transition to connected
626 stateTransition(self, CONNECTED);
627 // Update initial state
628 self.initialConnectState.connect = true;
629 self.initialConnectState.fullsetup = true;
630 self.initialConnectState.all = true;
631 // Emit fullsetup and all events
632 process.nextTick(function() {
633 self.emit('connect', self);
634 self.emit('fullsetup', self);
635 self.emit('all', self);
637 } else if(self.state == CONNECTING
638 && self.s.replicaSetState.hasPrimary()) {
639 // Transition to connected
640 stateTransition(self, CONNECTED);
641 // Update initial state
642 self.initialConnectState.connect = true;
643 // Emit connected sign
644 process.nextTick(function() {
645 self.emit('connect', self);
647 } else if(self.state == CONNECTING
648 && self.s.replicaSetState.hasSecondary()
649 && self.s.options.secondaryOnlyConnectionAllowed) {
650 // Transition to connected
651 stateTransition(self, CONNECTED);
652 // Update initial state
653 self.initialConnectState.connect = true;
654 // Emit connected sign
655 process.nextTick(function() {
656 self.emit('connect', self);
658 } else if(self.state == CONNECTING) {
659 self.emit('error', new MongoError('no primary found in replicaset'));
660 // Destroy the topology
661 return self.destroy();
662 } else if(self.state == CONNECTED
663 && self.s.replicaSetState.hasPrimaryAndSecondary()
664 && !self.initialConnectState.fullsetup) {
665 self.initialConnectState.fullsetup = true;
666 // Emit fullsetup and all events
667 process.nextTick(function() {
668 self.emit('fullsetup', self);
669 self.emit('all', self);
674 if(!options.haInterval) topologyMonitor(self);
678 // No connectingServers but unknown servers
679 if(connectingServers.length == 0
680 && self.s.replicaSetState.unknownServers.length > 0 && options.haInterval) {
681 return connectMissingServers();
682 } else if(connectingServers.length == 0 && options.haInterval) {
684 return self.emit('error', new MongoError('no valid replicaset members found'));
688 for(var i = 0; i < connectingServers.length; i++) {
689 pingServer(self, connectingServers[i], function() {
693 connectMissingServers();
697 }, options.haInterval || self.s.haInterval)
700 function handleEvent(self, event) {
702 if(self.state == DESTROYED) return;
704 if(self.s.logger.isDebug()) {
705 self.s.logger.debug(f('handleEvent %s from server %s in replset with id %s', event, this.name, self.id));
708 self.s.replicaSetState.remove(this);
712 function handleInitialConnectEvent(self, event) {
715 if(self.s.logger.isDebug()) {
716 self.s.logger.debug(f('handleInitialConnectEvent %s from server %s in replset with id %s', event, this.name, self.id));
719 // Destroy the instance
720 if(self.state == DESTROYED) {
721 return this.destroy();
724 // Check the type of server
725 if(event == 'connect') {
727 var result = self.s.replicaSetState.update(this);
729 // Primary lastIsMaster store it
730 if(this.lastIsMaster() && this.lastIsMaster().ismaster) {
731 self.ismaster = this.lastIsMaster();
735 if(self.s.logger.isDebug()) {
736 self.s.logger.debug(f('handleInitialConnectEvent %s from server %s in replset with id %s has state [%s]', event, this.name, self.id, JSON.stringify(self.s.replicaSetState.set)));
739 // Remove the handlers
740 for(var i = 0; i < handlers.length; i++) {
741 this.removeAllListeners(handlers[i]);
744 // Add stable state handlers
745 this.on('error', handleEvent(self, 'error'));
746 this.on('close', handleEvent(self, 'close'));
747 this.on('timeout', handleEvent(self, 'timeout'));
748 this.on('parseError', handleEvent(self, 'parseError'));
749 } else if(result instanceof MongoError) {
752 return self.emit('error', result);
757 // Emit failure to connect
758 self.emit('failed', this);
759 // Remove from the state
760 self.s.replicaSetState.remove(this);
763 // Remove from the list from connectingServers
764 for(i = 0; i < self.s.connectingServers.length; i++) {
765 if(self.s.connectingServers[i].equals(this)) {
766 self.s.connectingServers.splice(i, 1);
770 // Trigger topologyMonitor
771 if(self.s.connectingServers.length == 0) {
772 topologyMonitor(self, {haInterval: 1});
777 function connectServers(self, servers) {
778 // Update connectingServers
779 self.s.connectingServers = self.s.connectingServers.concat(servers);
781 // Index used to interleaf the server connects, avoiding
782 // runtime issues on io constrained vm's
783 var timeoutInterval = 0;
785 function connect(server, timeoutInterval) {
786 setTimeout(function() {
787 // Add the server to the state
788 if(self.s.replicaSetState.update(server)) {
789 // Primary lastIsMaster store it
790 if(server.lastIsMaster() && server.lastIsMaster().ismaster) {
791 self.ismaster = server.lastIsMaster();
795 // Add event handlers
796 server.once('close', handleInitialConnectEvent(self, 'close'));
797 server.once('timeout', handleInitialConnectEvent(self, 'timeout'));
798 server.once('parseError', handleInitialConnectEvent(self, 'parseError'));
799 server.once('error', handleInitialConnectEvent(self, 'error'));
800 server.once('connect', handleInitialConnectEvent(self, 'connect'));
801 // SDAM Monitoring events
802 server.on('serverOpening', function(e) { self.emit('serverOpening', e); });
803 server.on('serverDescriptionChanged', function(e) { self.emit('serverDescriptionChanged', e); });
804 server.on('serverClosed', function(e) { self.emit('serverClosed', e); });
806 server.connect(self.s.connectOptions);
810 // Start all the servers
811 while(servers.length > 0) {
812 connect(servers.shift(), timeoutInterval++);
817 * Emit event if it exists
820 function emitSDAMEvent(self, event, description) {
821 if(self.listeners(event).length > 0) {
822 self.emit(event, description);
827 * Initiate server connect
829 * @param {array} [options.auth=null] Array of auth options to apply on connect
831 ReplSet.prototype.connect = function(options) {
833 // Add any connect level options to the internal state
834 this.s.connectOptions = options || {};
835 // Set connecting state
836 stateTransition(this, CONNECTING);
837 // Create server instances
838 var servers = this.s.seedlist.map(function(x) {
839 return new Server(assign({}, self.s.options, x, {
840 authProviders: self.authProviders, reconnect:false, monitoring:false, inTopology: true
842 clientInfo: clone(self.s.clientInfo)
846 // Error out as high availbility interval must be < than socketTimeout
847 if(this.s.options.socketTimeout > 0 && this.s.options.socketTimeout <= this.s.options.haInterval) {
848 return self.emit('error', new MongoError(f("haInterval [%s] MS must be set to less than socketTimeout [%s] MS"
849 , this.s.options.haInterval, this.s.options.socketTimeout)));
852 // Emit the topology opening event
853 emitSDAMEvent(this, 'topologyOpening', { topologyId: this.id });
854 // Start all server connections
855 connectServers(self, servers);
859 * Destroy the server connection
860 * @param {boolean} [options.force=false] Force destroy the pool
863 ReplSet.prototype.destroy = function(options) {
864 options = options || {};
866 stateTransition(this, DESTROYED);
867 // Clear out any monitoring process
868 if(this.haTimeoutId) clearTimeout(this.haTimeoutId);
869 // Destroy the replicaset
870 this.s.replicaSetState.destroy(options);
872 // Destroy all connecting servers
873 this.s.connectingServers.forEach(function(x) {
877 // Emit toplogy closing event
878 emitSDAMEvent(this, 'topologyClosed', { topologyId: this.id });
882 * Unref all connections belong to this server
885 ReplSet.prototype.unref = function() {
887 stateTransition(this, DISCONNECTED);
889 this.s.replicaSetState.allServers().forEach(function(x) {
893 clearTimeout(this.haTimeoutId);
897 * Returns the last known ismaster document for this server
901 ReplSet.prototype.lastIsMaster = function() {
902 return this.s.replicaSetState.primary
903 ? this.s.replicaSetState.primary.lastIsMaster() : this.ismaster;
907 * All raw connections
909 * @return {Connection[]}
911 ReplSet.prototype.connections = function() {
912 var servers = this.s.replicaSetState.allServers();
913 var connections = [];
914 for(var i = 0; i < servers.length; i++) {
915 connections = connections.concat(servers[i].connections());
922 * Figure out if the server is connected
924 * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
927 ReplSet.prototype.isConnected = function(options) {
928 options = options || {};
930 // If we are authenticating signal not connected
931 // To avoid interleaving of operations
932 if(this.authenticating) return false;
934 // If we specified a read preference check if we are connected to something
935 // than can satisfy this
936 if(options.readPreference
937 && options.readPreference.equals(ReadPreference.secondary)) {
938 return this.s.replicaSetState.hasSecondary();
941 if(options.readPreference
942 && options.readPreference.equals(ReadPreference.primary)) {
943 return this.s.replicaSetState.hasPrimary();
946 if(options.readPreference
947 && options.readPreference.equals(ReadPreference.primaryPreferred)) {
948 return this.s.replicaSetState.hasSecondary() || this.s.replicaSetState.hasPrimary();
951 if(options.readPreference
952 && options.readPreference.equals(ReadPreference.secondaryPreferred)) {
953 return this.s.replicaSetState.hasSecondary() || this.s.replicaSetState.hasPrimary();
956 if(this.s.secondaryOnlyConnectionAllowed
957 && this.s.replicaSetState.hasSecondary()) {
961 return this.s.replicaSetState.hasPrimary();
965 * Figure out if the replicaset instance was destroyed by calling destroy
969 ReplSet.prototype.isDestroyed = function() {
970 return this.state == DESTROYED;
976 * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
979 ReplSet.prototype.getServer = function(options) {
980 // Ensure we have no options
981 options = options || {};
983 // Pick the right server baspickServerd on readPreference
984 var server = this.s.replicaSetState.pickServer(options.readPreference);
985 if(this.s.debug) this.emit('pickedServer', options.readPreference, server);
990 * Get all connected servers
994 ReplSet.prototype.getServers = function() {
995 return this.s.replicaSetState.allServers();
999 // Execute write operation
1000 var executeWriteOperation = function(self, op, ns, ops, options, callback) {
1001 if(typeof options == 'function') callback = options, options = {}, options = options || {};
1002 // Ensure we have no options
1003 options = options || {};
1005 // No server returned we had an error
1006 if(self.s.replicaSetState.primary == null) {
1007 return callback(new MongoError("no primary server found"));
1010 // Execute the command
1011 self.s.replicaSetState.primary[op](ns, ops, options, callback);
1015 * Insert one or more documents
1017 * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
1018 * @param {array} ops An array of documents to insert
1019 * @param {boolean} [options.ordered=true] Execute in order or out of order
1020 * @param {object} [options.writeConcern={}] Write concern for the operation
1021 * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
1022 * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
1023 * @param {opResultCallback} callback A callback function
1025 ReplSet.prototype.insert = function(ns, ops, options, callback) {
1026 if(typeof options == 'function') callback = options, options = {}, options = options || {};
1027 if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
1029 // Not connected but we have a disconnecthandler
1030 if(!this.s.replicaSetState.hasPrimary() && this.s.disconnectHandler != null) {
1031 return this.s.disconnectHandler.add('insert', ns, ops, options, callback);
1034 // Execute write operation
1035 executeWriteOperation(this, 'insert', ns, ops, options, callback);
1039 * Perform one or more update operations
1041 * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
1042 * @param {array} ops An array of updates
1043 * @param {boolean} [options.ordered=true] Execute in order or out of order
1044 * @param {object} [options.writeConcern={}] Write concern for the operation
1045 * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
1046 * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
1047 * @param {opResultCallback} callback A callback function
1049 ReplSet.prototype.update = function(ns, ops, options, callback) {
1050 if(typeof options == 'function') callback = options, options = {}, options = options || {};
1051 if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
1053 // Not connected but we have a disconnecthandler
1054 if(!this.s.replicaSetState.hasPrimary() && this.s.disconnectHandler != null) {
1055 return this.s.disconnectHandler.add('update', ns, ops, options, callback);
1058 // Execute write operation
1059 executeWriteOperation(this, 'update', ns, ops, options, callback);
1063 * Perform one or more remove operations
1065 * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
1066 * @param {array} ops An array of removes
1067 * @param {boolean} [options.ordered=true] Execute in order or out of order
1068 * @param {object} [options.writeConcern={}] Write concern for the operation
1069 * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
1070 * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
1071 * @param {opResultCallback} callback A callback function
1073 ReplSet.prototype.remove = function(ns, ops, options, callback) {
1074 if(typeof options == 'function') callback = options, options = {}, options = options || {};
1075 if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
1077 // Not connected but we have a disconnecthandler
1078 if(!this.s.replicaSetState.hasPrimary() && this.s.disconnectHandler != null) {
1079 return this.s.disconnectHandler.add('remove', ns, ops, options, callback);
1082 // Execute write operation
1083 executeWriteOperation(this, 'remove', ns, ops, options, callback);
1089 * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
1090 * @param {object} cmd The command hash
1091 * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
1092 * @param {Connection} [options.connection] Specify connection object to execute command against
1093 * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
1094 * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
1095 * @param {opResultCallback} callback A callback function
1097 ReplSet.prototype.command = function(ns, cmd, options, callback) {
1098 if(typeof options == 'function') callback = options, options = {}, options = options || {};
1099 if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
1102 // Establish readPreference
1103 var readPreference = options.readPreference ? options.readPreference : ReadPreference.primary;
1105 // If the readPreference is primary and we have no primary, store it
1106 if(readPreference.preference == 'primary' && !this.s.replicaSetState.hasPrimary() && this.s.disconnectHandler != null) {
1107 return this.s.disconnectHandler.add('command', ns, cmd, options, callback);
1108 } else if(readPreference.preference == 'secondary' && !this.s.replicaSetState.hasSecondary() && this.s.disconnectHandler != null) {
1109 return this.s.disconnectHandler.add('command', ns, cmd, options, callback);
1110 } else if(readPreference.preference != 'primary' && !this.s.replicaSetState.hasSecondary() && !this.s.replicaSetState.hasPrimary() && this.s.disconnectHandler != null) {
1111 return this.s.disconnectHandler.add('command', ns, cmd, options, callback);
1115 var server = this.s.replicaSetState.pickServer(readPreference);
1116 // We received an error, return it
1117 if(!(server instanceof Server)) return callback(server);
1119 if(self.s.debug) self.emit('pickedServer', ReadPreference.primary, server);
1121 // No server returned we had an error
1122 if(server == null) {
1123 return callback(new MongoError(f("no server found that matches the provided readPreference %s", readPreference)));
1126 // Execute the command
1127 server.command(ns, cmd, options, callback);
1131 * Authenticate using a specified mechanism
1133 * @param {string} mechanism The Auth mechanism we are invoking
1134 * @param {string} db The db we are invoking the mechanism against
1135 * @param {...object} param Parameters for the specific mechanism
1136 * @param {authResultCallback} callback A callback function
1138 ReplSet.prototype.auth = function(mechanism, db) {
1139 var allArgs = Array.prototype.slice.call(arguments, 0).slice(0);
1141 var args = Array.prototype.slice.call(arguments, 2);
1142 var callback = args.pop();
1144 // If we don't have the mechanism fail
1145 if(this.authProviders[mechanism] == null && mechanism != 'default') {
1146 return callback(new MongoError(f("auth provider %s does not exist", mechanism)));
1149 // Are we already authenticating, throw
1150 if(this.authenticating) {
1151 return callback(new MongoError('authentication or logout allready in process'));
1154 // Topology is not connected, save the call in the provided store to be
1155 // Executed at some point when the handler deems it's reconnected
1156 if(!self.s.replicaSetState.hasPrimary() && self.s.disconnectHandler != null) {
1157 return self.s.disconnectHandler.add('auth', db, allArgs, {}, callback);
1160 // Set to authenticating
1161 this.authenticating = true;
1165 // Get all the servers
1166 var servers = this.s.replicaSetState.allServers();
1167 // No servers return
1168 if(servers.length == 0) {
1169 this.authenticating = false;
1170 callback(null, true);
1174 function auth(server) {
1175 // Arguments without a callback
1176 var argsWithoutCallback = [mechanism, db].concat(args.slice(0));
1178 var finalArguments = argsWithoutCallback.concat([function(err) {
1180 // Save all the errors
1181 if(err) errors.push({name: server.name, err: err});
1185 self.authenticating = false;
1187 // Return the auth error
1188 if(errors.length) return callback(MongoError.create({
1189 message: 'authentication fail', errors: errors
1192 // Successfully authenticated session
1193 callback(null, self);
1197 if(!server.lastIsMaster().arbiterOnly) {
1198 // Execute the auth only against non arbiter servers
1199 server.auth.apply(server, finalArguments);
1201 // If we are authenticating against an arbiter just ignore it
1202 finalArguments.pop()(null);
1207 var count = servers.length;
1208 // Authenticate against all servers
1209 while(servers.length > 0) {
1210 auth(servers.shift());
1215 * Logout from a database
1217 * @param {string} db The db we are logging out from
1218 * @param {authResultCallback} callback A callback function
1220 ReplSet.prototype.logout = function(dbName, callback) {
1222 // Are we authenticating or logging out, throw
1223 if(this.authenticating) {
1224 throw new MongoError('authentication or logout allready in process');
1227 // Ensure no new members are processed while logging out
1228 this.authenticating = true;
1230 // Remove from all auth providers (avoid any reaplication of the auth details)
1231 var providers = Object.keys(this.authProviders);
1232 for(var i = 0; i < providers.length; i++) {
1233 this.authProviders[providers[i]].logout(dbName);
1236 // Now logout all the servers
1237 var servers = this.s.replicaSetState.allServers();
1238 var count = servers.length;
1239 if(count == 0) return callback();
1242 function logoutServer(_server, cb) {
1243 _server.logout(dbName, function(err) {
1244 if(err) errors.push({name: _server.name, err: err});
1249 // Execute logout on all server instances
1250 for(i = 0; i < servers.length; i++) {
1251 logoutServer(servers[i], function() {
1255 // Do not block new operations
1256 self.authenticating = false;
1257 // If we have one or more errors
1258 if(errors.length) return callback(MongoError.create({
1259 message: f('logout failed against db %s', dbName), errors: errors
1270 * Perform one or more remove operations
1272 * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
1273 * @param {{object}|{Long}} cmd Can be either a command returning a cursor or a cursorId
1274 * @param {object} [options.batchSize=0] Batchsize for the operation
1275 * @param {array} [options.documents=[]] Initial documents list for cursor
1276 * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
1277 * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
1278 * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
1279 * @param {opResultCallback} callback A callback function
1281 ReplSet.prototype.cursor = function(ns, cmd, cursorOptions) {
1282 cursorOptions = cursorOptions || {};
1283 var FinalCursor = cursorOptions.cursorFactory || this.s.Cursor;
1284 return new FinalCursor(this.s.bson, ns, cmd, cursorOptions, this, this.s.options);
1288 * A replset connect event, used to verify that the connection is up and running
1290 * @event ReplSet#connect
1295 * A replset reconnect event, used to verify that the topology reconnected
1297 * @event ReplSet#reconnect
1302 * A replset fullsetup event, used to signal that all topology members have been contacted.
1304 * @event ReplSet#fullsetup
1309 * A replset all event, used to signal that all topology members have been contacted.
1311 * @event ReplSet#all
1316 * A replset failed event, used to signal that initial replset connection failed.
1318 * @event ReplSet#failed
1323 * A server member left the replicaset
1325 * @event ReplSet#left
1327 * @param {string} type The type of member that left (primary|secondary|arbiter)
1328 * @param {Server} server The server object that left
1332 * A server member joined the replicaset
1334 * @event ReplSet#joined
1336 * @param {string} type The type of member that joined (primary|secondary|arbiter)
1337 * @param {Server} server The server object that joined
1341 * A server opening SDAM monitoring event
1343 * @event ReplSet#serverOpening
1348 * A server closed SDAM monitoring event
1350 * @event ReplSet#serverClosed
1355 * A server description SDAM change monitoring event
1357 * @event ReplSet#serverDescriptionChanged
1362 * A topology open SDAM event
1364 * @event ReplSet#topologyOpening
1369 * A topology closed SDAM event
1371 * @event ReplSet#topologyClosed
1376 * A topology structure SDAM change event
1378 * @event ReplSet#topologyDescriptionChanged
1383 * A topology serverHeartbeatStarted SDAM event
1385 * @event ReplSet#serverHeartbeatStarted
1390 * A topology serverHeartbeatFailed SDAM event
1392 * @event ReplSet#serverHeartbeatFailed
1397 * A topology serverHeartbeatSucceeded SDAM change event
1399 * @event ReplSet#serverHeartbeatSucceeded
1403 module.exports = ReplSet;