0082da8ac07e9abf4f1574ade08fa38c2de0e673
[aai/esr-gui.git] /
1 "use strict"
2
3 var inherits = require('util').inherits,
4   f = require('util').format,
5   EventEmitter = require('events').EventEmitter,
6   ReadPreference = require('./read_preference'),
7   BasicCursor = require('../cursor'),
8   retrieveBSON = require('../connection/utils').retrieveBSON,
9   Logger = require('../connection/logger'),
10   MongoError = require('../error'),
11   Server = require('./server'),
12   ReplSetState = require('./replset_state'),
13   assign = require('./shared').assign,
14   clone = require('./shared').clone,
15   createClientInfo = require('./shared').createClientInfo;
16
17 var MongoCR = require('../auth/mongocr')
18   , X509 = require('../auth/x509')
19   , Plain = require('../auth/plain')
20   , GSSAPI = require('../auth/gssapi')
21   , SSPI = require('../auth/sspi')
22   , ScramSHA1 = require('../auth/scram');
23
24 var BSON = retrieveBSON();
25
26 //
27 // States
28 var DISCONNECTED = 'disconnected';
29 var CONNECTING = 'connecting';
30 var CONNECTED = 'connected';
31 var DESTROYED = 'destroyed';
32
33 function stateTransition(self, newState) {
34   var legalTransitions = {
35     'disconnected': [CONNECTING, DESTROYED, DISCONNECTED],
36     'connecting': [CONNECTING, DESTROYED, CONNECTED, DISCONNECTED],
37     'connected': [CONNECTED, DISCONNECTED, DESTROYED],
38     'destroyed': [DESTROYED]
39   }
40
41   // Get current state
42   var legalStates = legalTransitions[self.state];
43   if(legalStates && legalStates.indexOf(newState) != -1) {
44     self.state = newState;
45   } else {
46     self.logger.error(f('Pool with id [%s] failed attempted illegal state transition from [%s] to [%s] only following state allowed [%s]'
47       , self.id, self.state, newState, legalStates));
48   }
49 }
50
51 //
52 // ReplSet instance id
53 var id = 1;
54 var handlers = ['connect', 'close', 'error', 'timeout', 'parseError'];
55
56 /**
57  * Creates a new Replset instance
58  * @class
59  * @param {array} seedlist A list of seeds for the replicaset
60  * @param {boolean} options.setName The Replicaset set name
61  * @param {boolean} [options.secondaryOnlyConnectionAllowed=false] Allow connection to a secondary only replicaset
62  * @param {number} [options.haInterval=10000] The High availability period for replicaset inquiry
63  * @param {boolean} [options.emitError=false] Server will emit errors events
64  * @param {Cursor} [options.cursorFactory=Cursor] The cursor factory class used for all query cursors
65  * @param {number} [options.size=5] Server connection pool size
66  * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
67  * @param {number} [options.keepAliveInitialDelay=0] Initial delay before TCP keep alive enabled
68  * @param {boolean} [options.noDelay=true] TCP Connection no delay
69  * @param {number} [options.connectionTimeout=10000] TCP Connection timeout setting
70  * @param {number} [options.socketTimeout=0] TCP Socket timeout setting
71  * @param {boolean} [options.singleBufferSerializtion=true] Serialize into single buffer, trade of peak memory for serialization speed
72  * @param {boolean} [options.ssl=false] Use SSL for connection
73  * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function.
74  * @param {Buffer} [options.ca] SSL Certificate store binary buffer
75  * @param {Buffer} [options.cert] SSL Certificate binary buffer
76  * @param {Buffer} [options.key] SSL Key file binary buffer
77  * @param {string} [options.passphrase] SSL Certificate pass phrase
78  * @param {string} [options.servername=null] String containing the server name requested via TLS SNI.
79  * @param {boolean} [options.rejectUnauthorized=true] Reject unauthorized server certificates
80  * @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits
81  * @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
82  * @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
83  * @param {number} [options.pingInterval=5000] Ping interval to check the response time to the different servers
84  * @param {number} [options.localThresholdMS=15] Cutoff latency point in MS for MongoS proxy selection
85  * @param {boolean} [options.domainsEnabled=false] Enable the wrapping of the callback in the current domain, disabled by default to avoid perf hit.
86  * @return {ReplSet} A cursor instance
87  * @fires ReplSet#connect
88  * @fires ReplSet#ha
89  * @fires ReplSet#joined
90  * @fires ReplSet#left
91  * @fires ReplSet#failed
92  * @fires ReplSet#fullsetup
93  * @fires ReplSet#all
94  * @fires ReplSet#error
95  * @fires ReplSet#serverHeartbeatStarted
96  * @fires ReplSet#serverHeartbeatSucceeded
97  * @fires ReplSet#serverHeartbeatFailed
98  * @fires ReplSet#topologyOpening
99  * @fires ReplSet#topologyClosed
100  * @fires ReplSet#topologyDescriptionChanged
101  * @property {string} type the topology type.
102  * @property {string} parserType the parser type used (c++ or js).
103  */
104 var ReplSet = function(seedlist, options) {
105   var self = this;
106   options = options || {};
107
108   // Validate seedlist
109   if(!Array.isArray(seedlist)) throw new MongoError("seedlist must be an array");
110   // Validate list
111   if(seedlist.length == 0) throw new MongoError("seedlist must contain at least one entry");
112   // Validate entries
113   seedlist.forEach(function(e) {
114     if(typeof e.host != 'string' || typeof e.port != 'number')
115       throw new MongoError("seedlist entry must contain a host and port");
116   });
117
118   // Add event listener
119   EventEmitter.call(this);
120
121   // Get replSet Id
122   this.id = id++;
123
124   // Get the localThresholdMS
125   var localThresholdMS = options.localThresholdMS || 15;
126   // Backward compatibility
127   if(options.acceptableLatency) localThresholdMS = options.acceptableLatency;
128
129   // Create a logger
130   var logger = Logger('ReplSet', options);
131
132   // Internal state
133   this.s = {
134     options: assign({}, options),
135     // BSON instance
136     bson: options.bson || new BSON([BSON.Binary, BSON.Code, BSON.DBRef, BSON.Decimal128,
137       BSON.Double, BSON.Int32, BSON.Long, BSON.Map, BSON.MaxKey, BSON.MinKey,
138       BSON.ObjectId, BSON.BSONRegExp, BSON.Symbol, BSON.Timestamp]),
139     // Factory overrides
140     Cursor: options.cursorFactory || BasicCursor,
141     // Logger instance
142     logger: logger,
143     // Seedlist
144     seedlist: seedlist,
145     // Replicaset state
146     replicaSetState: new ReplSetState({
147       id: this.id, setName: options.setName,
148       acceptableLatency: localThresholdMS,
149       heartbeatFrequencyMS: options.haInterval ? options.haInterval : 10000,
150       logger: logger
151     }),
152     // Current servers we are connecting to
153     connectingServers: [],
154     // Ha interval
155     haInterval: options.haInterval ? options.haInterval : 10000,
156     // Minimum heartbeat frequency used if we detect a server close
157     minHeartbeatFrequencyMS: 500,
158     // Disconnect handler
159     disconnectHandler: options.disconnectHandler,
160     // Server selection index
161     index: 0,
162     // Connect function options passed in
163     connectOptions: {},
164     // Are we running in debug mode
165     debug: typeof options.debug == 'boolean' ? options.debug : false,
166     // Client info
167     clientInfo: createClientInfo(options)
168   }
169
170   // Add handler for topology change
171   this.s.replicaSetState.on('topologyDescriptionChanged', function(r) { self.emit('topologyDescriptionChanged', r); });
172
173   // Log info warning if the socketTimeout < haInterval as it will cause
174   // a lot of recycled connections to happen.
175   if(this.s.logger.isWarn()
176     && this.s.options.socketTimeout != 0
177     && this.s.options.socketTimeout < this.s.haInterval) {
178       this.s.logger.warn(f('warning socketTimeout %s is less than haInterval %s. This might cause unnecessary server reconnections due to socket timeouts'
179         , this.s.options.socketTimeout, this.s.haInterval));
180   }
181
182   // All the authProviders
183   this.authProviders = options.authProviders || {
184       'mongocr': new MongoCR(this.s.bson), 'x509': new X509(this.s.bson)
185     , 'plain': new Plain(this.s.bson), 'gssapi': new GSSAPI(this.s.bson)
186     , 'sspi': new SSPI(this.s.bson), 'scram-sha-1': new ScramSHA1(this.s.bson)
187   }
188
189   // Add forwarding of events from state handler
190   var types = ['joined', 'left'];
191   types.forEach(function(x) {
192     self.s.replicaSetState.on(x, function(t, s) {
193       if(self.state === CONNECTED && x === 'joined' && t == 'primary') {
194         self.emit('reconnect', self);
195       }
196
197       self.emit(x, t, s);
198     });
199   });
200
201   // Connect stat
202   this.initialConnectState = {
203     connect: false, fullsetup: false, all: false
204   }
205
206   // Disconnected state
207   this.state = DISCONNECTED;
208   this.haTimeoutId = null;
209   // Are we authenticating
210   this.authenticating = false;
211   // Last ismaster
212   this.ismaster = null;
213 }
214
215 inherits(ReplSet, EventEmitter);
216
217 Object.defineProperty(ReplSet.prototype, 'type', {
218   enumerable:true, get: function() { return 'replset'; }
219 });
220
221 Object.defineProperty(ReplSet.prototype, 'parserType', {
222   enumerable:true, get: function() {
223     return BSON.native ? "c++" : "js";
224   }
225 });
226
227 function attemptReconnect(self) {
228   if(self.runningAttempReconnect) return;
229   // Set as running
230   self.runningAttempReconnect = true;
231   // Wait before execute
232   self.haTimeoutId = setTimeout(function() {
233     if(self.state == DESTROYED) return;
234
235     // Debug log
236     if(self.s.logger.isDebug()) {
237       self.s.logger.debug(f('attemptReconnect for replset with id %s', self.id));
238     }
239
240     // Get all known hosts
241     var keys = Object.keys(self.s.replicaSetState.set);
242     var servers = keys.map(function(x) {
243       return new Server(assign({}, self.s.options, {
244         host: x.split(':')[0], port: parseInt(x.split(':')[1], 10)
245       }, {
246         authProviders: self.authProviders, reconnect:false, monitoring: false, inTopology: true
247       }, {
248         clientInfo: clone(self.s.clientInfo)
249       }));
250     });
251
252     // Create the list of servers
253     self.s.connectingServers = servers.slice(0);
254
255     // Handle all events coming from servers
256     function _handleEvent(self, event) {
257       return function() {
258         // Destroy the instance
259         if(self.state == DESTROYED) {
260           return this.destroy();
261         }
262
263         // Debug log
264         if(self.s.logger.isDebug()) {
265           self.s.logger.debug(f('attemptReconnect for replset with id %s using server %s ended with event %s', self.id, this.name, event));
266         }
267
268         // Check if we are done
269         function done() {
270           // Done with the reconnection attempt
271           if(self.s.connectingServers.length == 0) {
272             if(self.state == DESTROYED) return;
273
274             // If we have a primary and a disconnect handler, execute
275             // buffered operations
276             if(self.s.replicaSetState.hasPrimaryAndSecondary() && self.s.disconnectHandler) {
277               self.s.disconnectHandler.execute();
278             } else if(self.s.replicaSetState.hasPrimary() && self.s.disconnectHandler) {
279               self.s.disconnectHandler.execute({ executePrimary:true });
280             } else if(self.s.replicaSetState.hasSecondary() && self.s.disconnectHandler) {
281               self.s.disconnectHandler.execute({ executeSecondary:true });
282             }
283
284             // Do we have a primary
285             if(self.s.replicaSetState.hasPrimary()) {
286               // Emit reconnect as new primary was discovered
287               self.emit('reconnect', self);
288
289               // Connect any missing servers
290               connectNewServers(self, self.s.replicaSetState.unknownServers, function() {
291                 // Debug log
292                 if(self.s.logger.isDebug()) {
293                   self.s.logger.debug(f('attemptReconnect for replset with id successful resuming topologyMonitor', self.id));
294                 }
295
296                 // Reset the running
297                 self.runningAttempReconnect = false;
298
299                 // Go back to normal topology monitoring
300                 // Schedule a topology monitoring sweep
301                 setTimeout(function() {
302                   topologyMonitor(self);
303                 }, self.s.haInterval);
304               });
305             } else {
306               if(self.listeners("close").length > 0) {
307                 self.emit('close', self);
308               }
309
310               // Reset the running
311               self.runningAttempReconnect = false;
312               // Attempt a new reconnect
313               attemptReconnect(self);
314             }
315           }
316         }
317
318         // Remove the server from our list
319         for(var i = 0; i < self.s.connectingServers.length; i++) {
320           if(self.s.connectingServers[i].equals(this)) {
321             self.s.connectingServers.splice(i, 1);
322           }
323         }
324
325         // Keep reference to server
326         var _self = this;
327
328         // Debug log
329         if(self.s.logger.isDebug()) {
330           self.s.logger.debug(f('attemptReconnect in replset with id %s for', self.id));
331         }
332
333         // Connect and not authenticating
334         if(event == 'connect' && !self.authenticating) {
335           if(self.state == DESTROYED) {
336             return _self.destroy();
337           }
338
339           // Update the replicaset state
340           if(self.s.replicaSetState.update(_self)) {
341             // Primary lastIsMaster store it
342             if(_self.lastIsMaster() && _self.lastIsMaster().ismaster) {
343               self.ismaster = _self.lastIsMaster();
344             }
345
346             // Remove the handlers
347             for(i = 0; i < handlers.length; i++) {
348               _self.removeAllListeners(handlers[i]);
349             }
350
351             // Add stable state handlers
352             _self.on('error', handleEvent(self, 'error'));
353             _self.on('close', handleEvent(self, 'close'));
354             _self.on('timeout', handleEvent(self, 'timeout'));
355             _self.on('parseError', handleEvent(self, 'parseError'));
356           } else {
357             _self.destroy();
358           }
359         } else if(event == 'connect' && self.authenticating) {
360           this.destroy();
361         }
362
363         done();
364       }
365     }
366
367     // Index used to interleaf the server connects, avoiding
368     // runtime issues on io constrained vm's
369     var timeoutInterval = 0;
370
371     function connect(server, timeoutInterval) {
372       setTimeout(function() {
373         server.once('connect', _handleEvent(self, 'connect'));
374         server.once('close', _handleEvent(self, 'close'));
375         server.once('timeout', _handleEvent(self, 'timeout'));
376         server.once('error', _handleEvent(self, 'error'));
377         server.once('parseError', _handleEvent(self, 'parseError'));
378
379         // SDAM Monitoring events
380         server.on('serverOpening', function(e) { self.emit('serverOpening', e); });
381         server.on('serverDescriptionChanged', function(e) { self.emit('serverDescriptionChanged', e); });
382         server.on('serverClosed', function(e) { self.emit('serverClosed', e); });
383
384         server.connect(self.s.connectOptions);
385       }, timeoutInterval);
386     }
387
388     // Connect all servers
389     while(servers.length > 0) {
390       connect(servers.shift(), timeoutInterval++);
391     }
392   }, self.s.minHeartbeatFrequencyMS);
393 }
394
395 function connectNewServers(self, servers, callback) {
396   // Count lefts
397   var count = servers.length;
398
399   // Handle events
400   var _handleEvent = function(self, event) {
401     return function() {
402       var _self = this;
403       count = count - 1;
404
405       // Destroyed
406       if(self.state == DESTROYED) {
407         return this.destroy();
408       }
409
410       if(event == 'connect' && !self.authenticating) {
411         // Destroyed
412         if(self.state == DESTROYED) {
413           return _self.destroy();
414         }
415
416         var result = self.s.replicaSetState.update(_self);
417         // Update the state with the new server
418         if(result) {
419           // Primary lastIsMaster store it
420           if(_self.lastIsMaster() && _self.lastIsMaster().ismaster) {
421             self.ismaster = _self.lastIsMaster();
422           }
423
424           // Remove the handlers
425           for(var i = 0; i < handlers.length; i++) {
426             _self.removeAllListeners(handlers[i]);
427           }
428
429           // Add stable state handlers
430           _self.on('error', handleEvent(self, 'error'));
431           _self.on('close', handleEvent(self, 'close'));
432           _self.on('timeout', handleEvent(self, 'timeout'));
433           _self.on('parseError', handleEvent(self, 'parseError'));
434         } else {
435           _self.destroy();
436         }
437       } else if(event == 'connect' && self.authenticating) {
438         this.destroy();
439       }
440
441       // Are we done finish up callback
442       if(count == 0) { callback(); }
443     }
444   }
445
446   // No new servers
447   if(count == 0) return callback();
448
449   // Execute method
450   function execute(_server, i) {
451     setTimeout(function() {
452       // Destroyed
453       if(self.state == DESTROYED) {
454         return;
455       }
456
457       // Create a new server instance
458       var server = new Server(assign({}, self.s.options, {
459         host: _server.split(':')[0],
460         port: parseInt(_server.split(':')[1], 10)
461       }, {
462         authProviders: self.authProviders, reconnect:false, monitoring: false, inTopology: true
463       }, {
464         clientInfo: clone(self.s.clientInfo)
465       }));
466
467       // Add temp handlers
468       server.once('connect', _handleEvent(self, 'connect'));
469       server.once('close', _handleEvent(self, 'close'));
470       server.once('timeout', _handleEvent(self, 'timeout'));
471       server.once('error', _handleEvent(self, 'error'));
472       server.once('parseError', _handleEvent(self, 'parseError'));
473
474       // SDAM Monitoring events
475       server.on('serverOpening', function(e) { self.emit('serverOpening', e); });
476       server.on('serverDescriptionChanged', function(e) { self.emit('serverDescriptionChanged', e); });
477       server.on('serverClosed', function(e) { self.emit('serverClosed', e); });
478       server.connect(self.s.connectOptions);
479     }, i);
480   }
481
482   // Create new instances
483   for(var i = 0; i < servers.length; i++) {
484     execute(servers[i], i);
485   }
486 }
487
488 function topologyMonitor(self, options) {
489   options = options || {};
490
491   // Set momitoring timeout
492   self.haTimeoutId = setTimeout(function() {
493     if(self.state == DESTROYED) return;
494
495     // Is this a on connect topology discovery
496     // Schedule a proper topology monitoring to happen
497     // To ensure any discovered servers do not timeout
498     // while waiting for the initial discovery to happen.
499     if(options.haInterval) {
500       topologyMonitor(self);
501     }
502
503     // If we have a primary and a disconnect handler, execute
504     // buffered operations
505     if(self.s.replicaSetState.hasPrimaryAndSecondary() && self.s.disconnectHandler) {
506       self.s.disconnectHandler.execute();
507     } else if(self.s.replicaSetState.hasPrimary() && self.s.disconnectHandler) {
508       self.s.disconnectHandler.execute({ executePrimary:true });
509     } else if(self.s.replicaSetState.hasSecondary() && self.s.disconnectHandler) {
510       self.s.disconnectHandler.execute({ executeSecondary:true });
511     }
512
513     // Get the connectingServers
514     var connectingServers = self.s.replicaSetState.allServers();
515     // Debug log
516     if(self.s.logger.isDebug()) {
517       self.s.logger.debug(f('topologyMonitor in replset with id %s connected servers [%s]'
518         , self.id
519         , connectingServers.map(function(x) {
520           return x.name;
521         })));
522     }
523     // Get the count
524     var count = connectingServers.length;
525
526     // If we have no servers connected
527     if(count == 0 && !options.haInterval) {
528       if(self.listeners("close").length > 0) {
529         self.emit('close', self);
530       }
531
532       return attemptReconnect(self);
533     }
534
535     // If the count is zero schedule a new fast
536     function pingServer(_self, _server, cb) {
537       // Measure running time
538       var start = new Date().getTime();
539
540       // Emit the server heartbeat start
541       emitSDAMEvent(self, 'serverHeartbeatStarted', { connectionId: _server.name });
542       // Execute ismaster
543       // Set the socketTimeout for a monitoring message to a low number
544       // Ensuring ismaster calls are timed out quickly
545       _server.command('admin.$cmd', {
546         ismaster:true
547       }, {
548         monitoring: true,
549         socketTimeout: self.s.options.connectionTimeout || 2000,
550       }, function(err, r) {
551         if(self.state == DESTROYED) {
552           _server.destroy();
553           return cb(err, r);
554         }
555
556         // Calculate latency
557         var latencyMS = new Date().getTime() - start;
558
559         // Set the last updatedTime
560         var hrTime = process.hrtime();
561         // Calculate the last update time
562         _server.lastUpdateTime = hrTime[0] * 1000 + Math.round(hrTime[1]/1000);
563
564         // We had an error, remove it from the state
565         if(err) {
566           // Emit the server heartbeat failure
567           emitSDAMEvent(self, 'serverHeartbeatFailed', { durationMS: latencyMS, failure: err, connectionId: _server.name });
568           // Remove server from the state
569           _self.s.replicaSetState.remove(_server);
570         } else {
571           // Update the server ismaster
572           _server.ismaster = r.result;
573
574           // Check if we have a lastWriteDate convert it to MS
575           // and store on the server instance for later use
576           if(_server.ismaster.lastWrite && _server.ismaster.lastWrite.lastWriteDate) {
577             _server.lastWriteDate = _server.ismaster.lastWrite.lastWriteDate.getTime();
578           }
579
580           // Do we have a brand new server
581           if(_server.lastIsMasterMS == -1) {
582             _server.lastIsMasterMS = latencyMS;
583           } else if(_server.lastIsMasterMS) {
584             // After the first measurement, average RTT MUST be computed using an
585             // exponentially-weighted moving average formula, with a weighting factor (alpha) of 0.2.
586             // If the prior average is denoted old_rtt, then the new average (new_rtt) is
587             // computed from a new RTT measurement (x) using the following formula:
588             // alpha = 0.2
589             // new_rtt = alpha * x + (1 - alpha) * old_rtt
590             _server.lastIsMasterMS = 0.2 * latencyMS + (1 - 0.2) * _server.lastIsMasterMS;
591           }
592
593           if(_self.s.replicaSetState.update(_server)) {
594             // Primary lastIsMaster store it
595             if(_server.lastIsMaster() && _server.lastIsMaster().ismaster) {
596               self.ismaster = _server.lastIsMaster();
597             }
598           }
599
600           // Server heart beat event
601           emitSDAMEvent(self, 'serverHeartbeatSucceeded', { durationMS: latencyMS, reply: r.result, connectionId: _server.name });
602         }
603
604         // Calculate the stalness for this server
605         self.s.replicaSetState.updateServerMaxStaleness(_server, self.s.haInterval);
606
607         // Callback
608         cb(err, r);
609       });
610     }
611
612     // Connect any missing servers
613     function connectMissingServers() {
614       if(self.state == DESTROYED) return;
615
616       // Attempt to connect to any unknown servers
617       connectNewServers(self, self.s.replicaSetState.unknownServers, function() {
618         if(self.state == DESTROYED) return;
619
620         // Check if we have an options.haInterval (meaning it was triggered from connect)
621         if(options.haInterval) {
622           // Do we have a primary and secondary
623           if(self.state == CONNECTING
624             && self.s.replicaSetState.hasPrimaryAndSecondary()) {
625             // Transition to connected
626             stateTransition(self, CONNECTED);
627             // Update initial state
628             self.initialConnectState.connect = true;
629             self.initialConnectState.fullsetup = true;
630             self.initialConnectState.all = true;
631             // Emit fullsetup and all events
632             process.nextTick(function() {
633               self.emit('connect', self);
634               self.emit('fullsetup', self);
635               self.emit('all', self);
636             });
637           } else if(self.state == CONNECTING
638             && self.s.replicaSetState.hasPrimary()) {
639               // Transition to connected
640               stateTransition(self, CONNECTED);
641               // Update initial state
642               self.initialConnectState.connect = true;
643               // Emit connected sign
644               process.nextTick(function() {
645                 self.emit('connect', self);
646               });
647           } else if(self.state == CONNECTING
648             && self.s.replicaSetState.hasSecondary()
649             && self.s.options.secondaryOnlyConnectionAllowed) {
650               // Transition to connected
651               stateTransition(self, CONNECTED);
652               // Update initial state
653               self.initialConnectState.connect = true;
654               // Emit connected sign
655               process.nextTick(function() {
656                 self.emit('connect', self);
657               });
658           } else if(self.state == CONNECTING) {
659             self.emit('error', new MongoError('no primary found in replicaset'));
660             // Destroy the topology
661             return self.destroy();
662           } else if(self.state == CONNECTED
663             && self.s.replicaSetState.hasPrimaryAndSecondary()
664             && !self.initialConnectState.fullsetup) {
665               self.initialConnectState.fullsetup = true;
666             // Emit fullsetup and all events
667             process.nextTick(function() {
668               self.emit('fullsetup', self);
669               self.emit('all', self);
670             });
671           }
672         }
673
674         if(!options.haInterval) topologyMonitor(self);
675       });
676     }
677
678     // No connectingServers but unknown servers
679     if(connectingServers.length == 0
680       && self.s.replicaSetState.unknownServers.length > 0 && options.haInterval) {
681         return connectMissingServers();
682     } else if(connectingServers.length == 0 && options.haInterval) {
683       self.destroy();
684       return self.emit('error', new MongoError('no valid replicaset members found'));
685     }
686
687     // Ping all servers
688     for(var i = 0; i < connectingServers.length; i++) {
689       pingServer(self, connectingServers[i], function() {
690         count = count - 1;
691
692         if(count == 0) {
693           connectMissingServers();
694         }
695       });
696     }
697   }, options.haInterval || self.s.haInterval)
698 }
699
700 function handleEvent(self, event) {
701   return function() {
702     if(self.state == DESTROYED) return;
703     // Debug log
704     if(self.s.logger.isDebug()) {
705       self.s.logger.debug(f('handleEvent %s from server %s in replset with id %s', event, this.name, self.id));
706     }
707
708     self.s.replicaSetState.remove(this);
709   }
710 }
711
712 function handleInitialConnectEvent(self, event) {
713   return function() {
714     // Debug log
715     if(self.s.logger.isDebug()) {
716       self.s.logger.debug(f('handleInitialConnectEvent %s from server %s in replset with id %s', event, this.name, self.id));
717     }
718
719     // Destroy the instance
720     if(self.state == DESTROYED) {
721       return this.destroy();
722     }
723
724     // Check the type of server
725     if(event == 'connect') {
726       // Update the state
727       var result = self.s.replicaSetState.update(this);
728       if(result == true) {
729         // Primary lastIsMaster store it
730         if(this.lastIsMaster() && this.lastIsMaster().ismaster) {
731           self.ismaster = this.lastIsMaster();
732         }
733
734         // Debug log
735         if(self.s.logger.isDebug()) {
736           self.s.logger.debug(f('handleInitialConnectEvent %s from server %s in replset with id %s has state [%s]', event, this.name, self.id, JSON.stringify(self.s.replicaSetState.set)));
737         }
738
739         // Remove the handlers
740         for(var i = 0; i < handlers.length; i++) {
741           this.removeAllListeners(handlers[i]);
742         }
743
744         // Add stable state handlers
745         this.on('error', handleEvent(self, 'error'));
746         this.on('close', handleEvent(self, 'close'));
747         this.on('timeout', handleEvent(self, 'timeout'));
748         this.on('parseError', handleEvent(self, 'parseError'));
749       } else if(result instanceof MongoError) {
750         this.destroy();
751         self.destroy();
752         return self.emit('error', result);
753       } else {
754         this.destroy();
755       }
756     } else {
757       // Emit failure to connect
758       self.emit('failed', this);
759       // Remove from the state
760       self.s.replicaSetState.remove(this);
761     }
762
763     // Remove from the list from connectingServers
764     for(i = 0; i < self.s.connectingServers.length; i++) {
765       if(self.s.connectingServers[i].equals(this)) {
766         self.s.connectingServers.splice(i, 1);
767       }
768     }
769
770     // Trigger topologyMonitor
771     if(self.s.connectingServers.length == 0) {
772       topologyMonitor(self, {haInterval: 1});
773     }
774   };
775 }
776
777 function connectServers(self, servers) {
778   // Update connectingServers
779   self.s.connectingServers = self.s.connectingServers.concat(servers);
780
781   // Index used to interleaf the server connects, avoiding
782   // runtime issues on io constrained vm's
783   var timeoutInterval = 0;
784
785   function connect(server, timeoutInterval) {
786     setTimeout(function() {
787       // Add the server to the state
788       if(self.s.replicaSetState.update(server)) {
789         // Primary lastIsMaster store it
790         if(server.lastIsMaster() && server.lastIsMaster().ismaster) {
791           self.ismaster = server.lastIsMaster();
792         }
793       }
794
795       // Add event handlers
796       server.once('close', handleInitialConnectEvent(self, 'close'));
797       server.once('timeout', handleInitialConnectEvent(self, 'timeout'));
798       server.once('parseError', handleInitialConnectEvent(self, 'parseError'));
799       server.once('error', handleInitialConnectEvent(self, 'error'));
800       server.once('connect', handleInitialConnectEvent(self, 'connect'));
801       // SDAM Monitoring events
802       server.on('serverOpening', function(e) { self.emit('serverOpening', e); });
803       server.on('serverDescriptionChanged', function(e) { self.emit('serverDescriptionChanged', e); });
804       server.on('serverClosed', function(e) { self.emit('serverClosed', e); });
805       // Start connection
806       server.connect(self.s.connectOptions);
807     }, timeoutInterval);
808   }
809
810   // Start all the servers
811   while(servers.length > 0) {
812     connect(servers.shift(), timeoutInterval++);
813   }
814 }
815
816 /**
817  * Emit event if it exists
818  * @method
819  */
820 function emitSDAMEvent(self, event, description) {
821   if(self.listeners(event).length > 0) {
822     self.emit(event, description);
823   }
824 }
825
826 /**
827  * Initiate server connect
828  * @method
829  * @param {array} [options.auth=null] Array of auth options to apply on connect
830  */
831 ReplSet.prototype.connect = function(options) {
832   var self = this;
833   // Add any connect level options to the internal state
834   this.s.connectOptions = options || {};
835   // Set connecting state
836   stateTransition(this, CONNECTING);
837   // Create server instances
838   var servers = this.s.seedlist.map(function(x) {
839     return new Server(assign({}, self.s.options, x, {
840       authProviders: self.authProviders, reconnect:false, monitoring:false, inTopology: true
841     }, {
842       clientInfo: clone(self.s.clientInfo)
843     }));
844   });
845
846   // Error out as high availbility interval must be < than socketTimeout
847   if(this.s.options.socketTimeout > 0 && this.s.options.socketTimeout <= this.s.options.haInterval) {
848     return self.emit('error', new MongoError(f("haInterval [%s] MS must be set to less than socketTimeout [%s] MS"
849       , this.s.options.haInterval, this.s.options.socketTimeout)));
850   }
851
852   // Emit the topology opening event
853   emitSDAMEvent(this, 'topologyOpening', { topologyId: this.id });
854   // Start all server connections
855   connectServers(self, servers);
856 }
857
858 /**
859  * Destroy the server connection
860  * @param {boolean} [options.force=false] Force destroy the pool
861  * @method
862  */
863 ReplSet.prototype.destroy = function(options) {
864   options = options || {};
865   // Transition state
866   stateTransition(this, DESTROYED);
867   // Clear out any monitoring process
868   if(this.haTimeoutId) clearTimeout(this.haTimeoutId);
869   // Destroy the replicaset
870   this.s.replicaSetState.destroy(options);
871
872   // Destroy all connecting servers
873   this.s.connectingServers.forEach(function(x) {
874     x.destroy(options);
875   });
876
877   // Emit toplogy closing event
878   emitSDAMEvent(this, 'topologyClosed', { topologyId: this.id });
879 }
880
881 /**
882  * Unref all connections belong to this server
883  * @method
884  */
885 ReplSet.prototype.unref = function() {
886   // Transition state
887   stateTransition(this, DISCONNECTED);
888
889   this.s.replicaSetState.allServers().forEach(function(x) {
890     x.unref();
891   });
892
893   clearTimeout(this.haTimeoutId);
894 }
895
896 /**
897  * Returns the last known ismaster document for this server
898  * @method
899  * @return {object}
900  */
901 ReplSet.prototype.lastIsMaster = function() {
902   return this.s.replicaSetState.primary
903     ? this.s.replicaSetState.primary.lastIsMaster() : this.ismaster;
904 }
905
906 /**
907  * All raw connections
908  * @method
909  * @return {Connection[]}
910  */
911 ReplSet.prototype.connections = function() {
912   var servers = this.s.replicaSetState.allServers();
913   var connections = [];
914   for(var i = 0; i < servers.length; i++) {
915     connections = connections.concat(servers[i].connections());
916   }
917
918   return connections;
919 }
920
921 /**
922  * Figure out if the server is connected
923  * @method
924  * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
925  * @return {boolean}
926  */
927 ReplSet.prototype.isConnected = function(options) {
928   options = options || {};
929
930   // If we are authenticating signal not connected
931   // To avoid interleaving of operations
932   if(this.authenticating) return false;
933
934   // If we specified a read preference check if we are connected to something
935   // than can satisfy this
936   if(options.readPreference
937     && options.readPreference.equals(ReadPreference.secondary)) {
938     return this.s.replicaSetState.hasSecondary();
939   }
940
941   if(options.readPreference
942     && options.readPreference.equals(ReadPreference.primary)) {
943     return this.s.replicaSetState.hasPrimary();
944   }
945
946   if(options.readPreference
947     && options.readPreference.equals(ReadPreference.primaryPreferred)) {
948     return this.s.replicaSetState.hasSecondary() || this.s.replicaSetState.hasPrimary();
949   }
950
951   if(options.readPreference
952     && options.readPreference.equals(ReadPreference.secondaryPreferred)) {
953     return this.s.replicaSetState.hasSecondary() || this.s.replicaSetState.hasPrimary();
954   }
955
956   if(this.s.secondaryOnlyConnectionAllowed
957     && this.s.replicaSetState.hasSecondary()) {
958       return true;
959   }
960
961   return this.s.replicaSetState.hasPrimary();
962 }
963
964 /**
965  * Figure out if the replicaset instance was destroyed by calling destroy
966  * @method
967  * @return {boolean}
968  */
969 ReplSet.prototype.isDestroyed = function() {
970   return this.state == DESTROYED;
971 }
972
973 /**
974  * Get server
975  * @method
976  * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
977  * @return {Server}
978  */
979 ReplSet.prototype.getServer = function(options) {
980   // Ensure we have no options
981   options = options || {};
982
983   // Pick the right server baspickServerd on readPreference
984   var server = this.s.replicaSetState.pickServer(options.readPreference);
985   if(this.s.debug) this.emit('pickedServer', options.readPreference, server);
986   return server;
987 }
988
989 /**
990  * Get all connected servers
991  * @method
992  * @return {Server[]}
993  */
994 ReplSet.prototype.getServers = function() {
995   return this.s.replicaSetState.allServers();
996 }
997
998 //
999 // Execute write operation
1000 var executeWriteOperation = function(self, op, ns, ops, options, callback) {
1001   if(typeof options == 'function') callback = options, options = {}, options = options || {};
1002   // Ensure we have no options
1003   options = options || {};
1004
1005   // No server returned we had an error
1006   if(self.s.replicaSetState.primary == null) {
1007     return callback(new MongoError("no primary server found"));
1008   }
1009
1010   // Execute the command
1011   self.s.replicaSetState.primary[op](ns, ops, options, callback);
1012 }
1013
1014 /**
1015  * Insert one or more documents
1016  * @method
1017  * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
1018  * @param {array} ops An array of documents to insert
1019  * @param {boolean} [options.ordered=true] Execute in order or out of order
1020  * @param {object} [options.writeConcern={}] Write concern for the operation
1021  * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
1022  * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
1023  * @param {opResultCallback} callback A callback function
1024  */
1025 ReplSet.prototype.insert = function(ns, ops, options, callback) {
1026   if(typeof options == 'function') callback = options, options = {}, options = options || {};
1027   if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
1028
1029   // Not connected but we have a disconnecthandler
1030   if(!this.s.replicaSetState.hasPrimary() && this.s.disconnectHandler != null) {
1031     return this.s.disconnectHandler.add('insert', ns, ops, options, callback);
1032   }
1033
1034   // Execute write operation
1035   executeWriteOperation(this, 'insert', ns, ops, options, callback);
1036 }
1037
1038 /**
1039  * Perform one or more update operations
1040  * @method
1041  * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
1042  * @param {array} ops An array of updates
1043  * @param {boolean} [options.ordered=true] Execute in order or out of order
1044  * @param {object} [options.writeConcern={}] Write concern for the operation
1045  * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
1046  * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
1047  * @param {opResultCallback} callback A callback function
1048  */
1049 ReplSet.prototype.update = function(ns, ops, options, callback) {
1050   if(typeof options == 'function') callback = options, options = {}, options = options || {};
1051   if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
1052
1053   // Not connected but we have a disconnecthandler
1054   if(!this.s.replicaSetState.hasPrimary() && this.s.disconnectHandler != null) {
1055     return this.s.disconnectHandler.add('update', ns, ops, options, callback);
1056   }
1057
1058   // Execute write operation
1059   executeWriteOperation(this, 'update', ns, ops, options, callback);
1060 }
1061
1062 /**
1063  * Perform one or more remove operations
1064  * @method
1065  * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
1066  * @param {array} ops An array of removes
1067  * @param {boolean} [options.ordered=true] Execute in order or out of order
1068  * @param {object} [options.writeConcern={}] Write concern for the operation
1069  * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
1070  * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
1071  * @param {opResultCallback} callback A callback function
1072  */
1073 ReplSet.prototype.remove = function(ns, ops, options, callback) {
1074   if(typeof options == 'function') callback = options, options = {}, options = options || {};
1075   if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
1076
1077   // Not connected but we have a disconnecthandler
1078   if(!this.s.replicaSetState.hasPrimary() && this.s.disconnectHandler != null) {
1079     return this.s.disconnectHandler.add('remove', ns, ops, options, callback);
1080   }
1081
1082   // Execute write operation
1083   executeWriteOperation(this, 'remove', ns, ops, options, callback);
1084 }
1085
1086 /**
1087  * Execute a command
1088  * @method
1089  * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
1090  * @param {object} cmd The command hash
1091  * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
1092  * @param {Connection} [options.connection] Specify connection object to execute command against
1093  * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
1094  * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
1095  * @param {opResultCallback} callback A callback function
1096  */
1097 ReplSet.prototype.command = function(ns, cmd, options, callback) {
1098   if(typeof options == 'function') callback = options, options = {}, options = options || {};
1099   if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
1100   var self = this;
1101
1102   // Establish readPreference
1103   var readPreference = options.readPreference ? options.readPreference : ReadPreference.primary;
1104
1105   // If the readPreference is primary and we have no primary, store it
1106   if(readPreference.preference == 'primary' && !this.s.replicaSetState.hasPrimary() && this.s.disconnectHandler != null) {
1107     return this.s.disconnectHandler.add('command', ns, cmd, options, callback);
1108   } else if(readPreference.preference == 'secondary' && !this.s.replicaSetState.hasSecondary() && this.s.disconnectHandler != null) {
1109     return this.s.disconnectHandler.add('command', ns, cmd, options, callback);
1110   } else if(readPreference.preference != 'primary' && !this.s.replicaSetState.hasSecondary() && !this.s.replicaSetState.hasPrimary() && this.s.disconnectHandler != null) {
1111     return this.s.disconnectHandler.add('command', ns, cmd, options, callback);
1112   }
1113
1114   // Pick a server
1115   var server = this.s.replicaSetState.pickServer(readPreference);
1116   // We received an error, return it
1117   if(!(server instanceof Server)) return callback(server);
1118   // Emit debug event
1119   if(self.s.debug) self.emit('pickedServer', ReadPreference.primary, server);
1120
1121   // No server returned we had an error
1122   if(server == null) {
1123     return callback(new MongoError(f("no server found that matches the provided readPreference %s", readPreference)));
1124   }
1125
1126   // Execute the command
1127   server.command(ns, cmd, options, callback);
1128 }
1129
1130 /**
1131  * Authenticate using a specified mechanism
1132  * @method
1133  * @param {string} mechanism The Auth mechanism we are invoking
1134  * @param {string} db The db we are invoking the mechanism against
1135  * @param {...object} param Parameters for the specific mechanism
1136  * @param {authResultCallback} callback A callback function
1137  */
1138 ReplSet.prototype.auth = function(mechanism, db) {
1139   var allArgs = Array.prototype.slice.call(arguments, 0).slice(0);
1140   var self = this;
1141   var args = Array.prototype.slice.call(arguments, 2);
1142   var callback = args.pop();
1143
1144   // If we don't have the mechanism fail
1145   if(this.authProviders[mechanism] == null && mechanism != 'default') {
1146     return callback(new MongoError(f("auth provider %s does not exist", mechanism)));
1147   }
1148
1149   // Are we already authenticating, throw
1150   if(this.authenticating) {
1151     return callback(new MongoError('authentication or logout allready in process'));
1152   }
1153
1154   // Topology is not connected, save the call in the provided store to be
1155   // Executed at some point when the handler deems it's reconnected
1156   if(!self.s.replicaSetState.hasPrimary() && self.s.disconnectHandler != null) {
1157     return self.s.disconnectHandler.add('auth', db, allArgs, {}, callback);
1158   }
1159
1160   // Set to authenticating
1161   this.authenticating = true;
1162   // All errors
1163   var errors = [];
1164
1165   // Get all the servers
1166   var servers = this.s.replicaSetState.allServers();
1167   // No servers return
1168   if(servers.length == 0) {
1169     this.authenticating = false;
1170     callback(null, true);
1171   }
1172
1173   // Authenticate
1174   function auth(server) {
1175     // Arguments without a callback
1176     var argsWithoutCallback = [mechanism, db].concat(args.slice(0));
1177     // Create arguments
1178     var finalArguments = argsWithoutCallback.concat([function(err) {
1179       count = count - 1;
1180       // Save all the errors
1181       if(err) errors.push({name: server.name, err: err});
1182       // We are done
1183       if(count == 0) {
1184         // Auth is done
1185         self.authenticating = false;
1186
1187         // Return the auth error
1188         if(errors.length) return callback(MongoError.create({
1189           message: 'authentication fail', errors: errors
1190         }), false);
1191
1192         // Successfully authenticated session
1193         callback(null, self);
1194       }
1195     }]);
1196
1197     if(!server.lastIsMaster().arbiterOnly) {
1198       // Execute the auth only against non arbiter servers
1199       server.auth.apply(server, finalArguments);
1200     } else {
1201       // If we are authenticating against an arbiter just ignore it
1202       finalArguments.pop()(null);
1203     }
1204   }
1205
1206   // Get total count
1207   var count = servers.length;
1208   // Authenticate against all servers
1209   while(servers.length > 0) {
1210     auth(servers.shift());
1211   }
1212 }
1213
1214 /**
1215  * Logout from a database
1216  * @method
1217  * @param {string} db The db we are logging out from
1218  * @param {authResultCallback} callback A callback function
1219  */
1220 ReplSet.prototype.logout = function(dbName, callback) {
1221   var self = this;
1222   // Are we authenticating or logging out, throw
1223   if(this.authenticating) {
1224     throw new MongoError('authentication or logout allready in process');
1225   }
1226
1227   // Ensure no new members are processed while logging out
1228   this.authenticating = true;
1229
1230   // Remove from all auth providers (avoid any reaplication of the auth details)
1231   var providers = Object.keys(this.authProviders);
1232   for(var i = 0; i < providers.length; i++) {
1233     this.authProviders[providers[i]].logout(dbName);
1234   }
1235
1236   // Now logout all the servers
1237   var servers = this.s.replicaSetState.allServers();
1238   var count = servers.length;
1239   if(count == 0) return callback();
1240   var errors = [];
1241
1242   function logoutServer(_server, cb) {
1243     _server.logout(dbName, function(err) {
1244       if(err) errors.push({name: _server.name, err: err});
1245       cb();
1246     });
1247   }
1248
1249   // Execute logout on all server instances
1250   for(i = 0; i < servers.length; i++) {
1251     logoutServer(servers[i], function() {
1252       count = count - 1;
1253
1254       if(count == 0) {
1255         // Do not block new operations
1256         self.authenticating = false;
1257         // If we have one or more errors
1258         if(errors.length) return callback(MongoError.create({
1259           message: f('logout failed against db %s', dbName), errors: errors
1260         }), false);
1261
1262         // No errors
1263         callback();
1264       }
1265     })
1266   }
1267 }
1268
1269 /**
1270  * Perform one or more remove operations
1271  * @method
1272  * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
1273  * @param {{object}|{Long}} cmd Can be either a command returning a cursor or a cursorId
1274  * @param {object} [options.batchSize=0] Batchsize for the operation
1275  * @param {array} [options.documents=[]] Initial documents list for cursor
1276  * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
1277  * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
1278  * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
1279  * @param {opResultCallback} callback A callback function
1280  */
1281 ReplSet.prototype.cursor = function(ns, cmd, cursorOptions) {
1282   cursorOptions = cursorOptions || {};
1283   var FinalCursor = cursorOptions.cursorFactory || this.s.Cursor;
1284   return new FinalCursor(this.s.bson, ns, cmd, cursorOptions, this, this.s.options);
1285 }
1286
1287 /**
1288  * A replset connect event, used to verify that the connection is up and running
1289  *
1290  * @event ReplSet#connect
1291  * @type {ReplSet}
1292  */
1293
1294 /**
1295  * A replset reconnect event, used to verify that the topology reconnected
1296  *
1297  * @event ReplSet#reconnect
1298  * @type {ReplSet}
1299  */
1300
1301 /**
1302  * A replset fullsetup event, used to signal that all topology members have been contacted.
1303  *
1304  * @event ReplSet#fullsetup
1305  * @type {ReplSet}
1306  */
1307
1308 /**
1309  * A replset all event, used to signal that all topology members have been contacted.
1310  *
1311  * @event ReplSet#all
1312  * @type {ReplSet}
1313  */
1314
1315 /**
1316  * A replset failed event, used to signal that initial replset connection failed.
1317  *
1318  * @event ReplSet#failed
1319  * @type {ReplSet}
1320  */
1321
1322 /**
1323  * A server member left the replicaset
1324  *
1325  * @event ReplSet#left
1326  * @type {function}
1327  * @param {string} type The type of member that left (primary|secondary|arbiter)
1328  * @param {Server} server The server object that left
1329  */
1330
1331 /**
1332  * A server member joined the replicaset
1333  *
1334  * @event ReplSet#joined
1335  * @type {function}
1336  * @param {string} type The type of member that joined (primary|secondary|arbiter)
1337  * @param {Server} server The server object that joined
1338  */
1339
1340 /**
1341  * A server opening SDAM monitoring event
1342  *
1343  * @event ReplSet#serverOpening
1344  * @type {object}
1345  */
1346
1347 /**
1348  * A server closed SDAM monitoring event
1349  *
1350  * @event ReplSet#serverClosed
1351  * @type {object}
1352  */
1353
1354 /**
1355  * A server description SDAM change monitoring event
1356  *
1357  * @event ReplSet#serverDescriptionChanged
1358  * @type {object}
1359  */
1360
1361 /**
1362  * A topology open SDAM event
1363  *
1364  * @event ReplSet#topologyOpening
1365  * @type {object}
1366  */
1367
1368 /**
1369  * A topology closed SDAM event
1370  *
1371  * @event ReplSet#topologyClosed
1372  * @type {object}
1373  */
1374
1375 /**
1376  * A topology structure SDAM change event
1377  *
1378  * @event ReplSet#topologyDescriptionChanged
1379  * @type {object}
1380  */
1381
1382 /**
1383  * A topology serverHeartbeatStarted SDAM event
1384  *
1385  * @event ReplSet#serverHeartbeatStarted
1386  * @type {object}
1387  */
1388
1389 /**
1390  * A topology serverHeartbeatFailed SDAM event
1391  *
1392  * @event ReplSet#serverHeartbeatFailed
1393  * @type {object}
1394  */
1395
1396 /**
1397  * A topology serverHeartbeatSucceeded SDAM change event
1398  *
1399  * @event ReplSet#serverHeartbeatSucceeded
1400  * @type {object}
1401  */
1402
1403 module.exports = ReplSet;