90a6862fbdad297ed76ac27c3836fbc7e81c5c8d
[aai/esr-gui.git] /
1 "use strict"
2
3 var inherits = require('util').inherits,
4   f = require('util').format,
5   EventEmitter = require('events').EventEmitter,
6   BSON = require('bson').native().BSON,
7   ReadPreference = require('./read_preference'),
8   BasicCursor = require('../cursor'),
9   Logger = require('../connection/logger'),
10   debugOptions = require('../connection/utils').debugOptions,
11   MongoError = require('../error'),
12   Server = require('./server'),
13   ReplSetState = require('./replset_state'),
14   assign = require('./shared').assign,
15   clone = require('./shared').clone,
16   createClientInfo = require('./shared').createClientInfo;
17
18 var MongoCR = require('../auth/mongocr')
19   , X509 = require('../auth/x509')
20   , Plain = require('../auth/plain')
21   , GSSAPI = require('../auth/gssapi')
22   , SSPI = require('../auth/sspi')
23   , ScramSHA1 = require('../auth/scram');
24
25 //
26 // States
27 var DISCONNECTED = 'disconnected';
28 var CONNECTING = 'connecting';
29 var CONNECTED = 'connected';
30 var DESTROYED = 'destroyed';
31
32 function stateTransition(self, newState) {
33   var legalTransitions = {
34     'disconnected': [CONNECTING, DESTROYED, DISCONNECTED],
35     'connecting': [CONNECTING, DESTROYED, CONNECTED, DISCONNECTED],
36     'connected': [CONNECTED, DISCONNECTED, DESTROYED],
37     'destroyed': [DESTROYED]
38   }
39
40   // Get current state
41   var legalStates = legalTransitions[self.state];
42   if(legalStates && legalStates.indexOf(newState) != -1) {
43     self.state = newState;
44   } else {
45     self.logger.error(f('Pool with id [%s] failed attempted illegal state transition from [%s] to [%s] only following state allowed [%s]'
46       , self.id, self.state, newState, legalStates));
47   }
48 }
49
50 //
51 // ReplSet instance id
52 var id = 1;
53 var handlers = ['connect', 'close', 'error', 'timeout', 'parseError'];
54
55 /**
56  * Creates a new Replset instance
57  * @class
58  * @param {array} seedlist A list of seeds for the replicaset
59  * @param {boolean} options.setName The Replicaset set name
60  * @param {boolean} [options.secondaryOnlyConnectionAllowed=false] Allow connection to a secondary only replicaset
61  * @param {number} [options.haInterval=10000] The High availability period for replicaset inquiry
62  * @param {boolean} [options.emitError=false] Server will emit errors events
63  * @param {Cursor} [options.cursorFactory=Cursor] The cursor factory class used for all query cursors
64  * @param {number} [options.size=5] Server connection pool size
65  * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
66  * @param {number} [options.keepAliveInitialDelay=0] Initial delay before TCP keep alive enabled
67  * @param {boolean} [options.noDelay=true] TCP Connection no delay
68  * @param {number} [options.connectionTimeout=10000] TCP Connection timeout setting
69  * @param {number} [options.socketTimeout=0] TCP Socket timeout setting
70  * @param {boolean} [options.singleBufferSerializtion=true] Serialize into single buffer, trade of peak memory for serialization speed
71  * @param {boolean} [options.ssl=false] Use SSL for connection
72  * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function.
73  * @param {Buffer} [options.ca] SSL Certificate store binary buffer
74  * @param {Buffer} [options.cert] SSL Certificate binary buffer
75  * @param {Buffer} [options.key] SSL Key file binary buffer
76  * @param {string} [options.passphrase] SSL Certificate pass phrase
77  * @param {string} [options.servername=null] String containing the server name requested via TLS SNI.
78  * @param {boolean} [options.rejectUnauthorized=true] Reject unauthorized server certificates
79  * @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits
80  * @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
81  * @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
82  * @param {number} [options.pingInterval=5000] Ping interval to check the response time to the different servers
83  * @param {number} [options.localThresholdMS=15] Cutoff latency point in MS for MongoS proxy selection
84  * @param {boolean} [options.domainsEnabled=false] Enable the wrapping of the callback in the current domain, disabled by default to avoid perf hit.
85  * @return {ReplSet} A cursor instance
86  * @fires ReplSet#connect
87  * @fires ReplSet#ha
88  * @fires ReplSet#joined
89  * @fires ReplSet#left
90  * @fires ReplSet#failed
91  * @fires ReplSet#fullsetup
92  * @fires ReplSet#all
93  * @fires ReplSet#error
94  * @fires ReplSet#serverHeartbeatStarted
95  * @fires ReplSet#serverHeartbeatSucceeded
96  * @fires ReplSet#serverHeartbeatFailed
97  * @fires ReplSet#topologyOpening
98  * @fires ReplSet#topologyClosed
99  * @fires ReplSet#topologyDescriptionChanged
100  */
101 var ReplSet = function(seedlist, options) {
102   var self = this;
103   options = options || {};
104
105   // Validate seedlist
106   if(!Array.isArray(seedlist)) throw new MongoError("seedlist must be an array");
107   // Validate list
108   if(seedlist.length == 0) throw new MongoError("seedlist must contain at least one entry");
109   // Validate entries
110   seedlist.forEach(function(e) {
111     if(typeof e.host != 'string' || typeof e.port != 'number')
112       throw new MongoError("seedlist entry must contain a host and port");
113   });
114
115   // Add event listener
116   EventEmitter.call(this);
117
118   // Get replSet Id
119   this.id = id++;
120
121   // Get the localThresholdMS
122   var localThresholdMS = options.localThresholdMS || 15;
123   // Backward compatibility
124   if(options.acceptableLatency) localThresholdMS = options.acceptableLatency;
125
126   // Create a logger
127   var logger = Logger('ReplSet', options);
128
129   // Internal state
130   this.s = {
131     options: assign({}, options),
132     // BSON instance
133     bson: options.bson || new BSON(),
134     // Factory overrides
135     Cursor: options.cursorFactory || BasicCursor,
136     // Logger instance
137     logger: logger,
138     // Seedlist
139     seedlist: seedlist,
140     // Replicaset state
141     replicaSetState: new ReplSetState({
142       id: this.id, setName: options.setName,
143       acceptableLatency: localThresholdMS,
144       heartbeatFrequencyMS: options.haInterval ? options.haInterval : 10000,
145       logger: logger
146     }),
147     // Current servers we are connecting to
148     connectingServers: [],
149     // Ha interval
150     haInterval: options.haInterval ? options.haInterval : 10000,
151     // Minimum heartbeat frequency used if we detect a server close
152     minHeartbeatFrequencyMS: 500,
153     // Disconnect handler
154     disconnectHandler: options.disconnectHandler,
155     // Server selection index
156     index: 0,
157     // Connect function options passed in
158     connectOptions: {},
159     // Are we running in debug mode
160     debug: typeof options.debug == 'boolean' ? options.debug : false,
161     // Client info
162     clientInfo: createClientInfo(options)
163   }
164
165   // Add handler for topology change
166   this.s.replicaSetState.on('topologyDescriptionChanged', function(r) { self.emit('topologyDescriptionChanged', r); });
167
168   // Log info warning if the socketTimeout < haInterval as it will cause
169   // a lot of recycled connections to happen.
170   if(this.s.logger.isWarn()
171     && this.s.options.socketTimeout != 0
172     && this.s.options.socketTimeout < this.s.haInterval) {
173       this.s.logger.warn(f('warning socketTimeout %s is less than haInterval %s. This might cause unnecessary server reconnections due to socket timeouts'
174         , this.s.options.socketTimeout, this.s.haInterval));
175   }
176
177   // All the authProviders
178   this.authProviders = options.authProviders || {
179       'mongocr': new MongoCR(this.s.bson), 'x509': new X509(this.s.bson)
180     , 'plain': new Plain(this.s.bson), 'gssapi': new GSSAPI(this.s.bson)
181     , 'sspi': new SSPI(this.s.bson), 'scram-sha-1': new ScramSHA1(this.s.bson)
182   }
183
184   // Add forwarding of events from state handler
185   var types = ['joined', 'left'];
186   types.forEach(function(x) {
187     self.s.replicaSetState.on(x, function(t, s) {
188       self.emit(x, t, s);
189     });
190   });
191
192   // Connect stat
193   this.initialConnectState = {
194     connect: false, fullsetup: false, all: false
195   }
196
197   // Disconnected state
198   this.state = DISCONNECTED;
199   this.haTimeoutId = null;
200   // Are we authenticating
201   this.authenticating = false;
202   // Last ismaster
203   this.ismaster = null;
204 }
205
206 inherits(ReplSet, EventEmitter);
207
208 Object.defineProperty(ReplSet.prototype, 'type', {
209   enumerable:true, get: function() { return 'replset'; }
210 });
211
212 function attemptReconnect(self) {
213   if(self.runningAttempReconnect) return;
214   // Set as running
215   self.runningAttempReconnect = true;
216   // Wait before execute
217   self.haTimeoutId = setTimeout(function() {
218     if(self.state == DESTROYED) return;
219
220     // Debug log
221     if(self.s.logger.isDebug()) {
222       self.s.logger.debug(f('attemptReconnect for replset with id %s', self.id));
223     }
224
225     // Get all known hosts
226     var keys = Object.keys(self.s.replicaSetState.set);
227     var servers = keys.map(function(x) {
228       return new Server(assign({}, self.s.options, {
229         host: x.split(':')[0], port: parseInt(x.split(':')[1], 10)
230       }, {
231         authProviders: self.authProviders, reconnect:false, monitoring: false, inTopology: true
232       }, {
233         clientInfo: clone(self.s.clientInfo)
234       }));
235     });
236
237     // Create the list of servers
238     self.s.connectingServers = servers.slice(0);
239
240     // Handle all events coming from servers
241     function _handleEvent(self, event) {
242       return function(err) {
243         // Destroy the instance
244         if(self.state == DESTROYED) {
245           return this.destroy();
246         }
247
248         // Debug log
249         if(self.s.logger.isDebug()) {
250           self.s.logger.debug(f('attemptReconnect for replset with id %s using server %s ended with event %s', self.id, this.name, event));
251         }
252
253         // Check if we are done
254         function done() {
255           // Done with the reconnection attempt
256           if(self.s.connectingServers.length == 0) {
257             if(self.state == DESTROYED) return;
258
259             // If we have a primary and a disconnect handler, execute
260             // buffered operations
261             if(self.s.replicaSetState.hasPrimaryAndSecondary() && self.s.disconnectHandler) {
262               self.s.disconnectHandler.execute();
263             } else if(self.s.replicaSetState.hasPrimary() && self.s.disconnectHandler) {
264               self.s.disconnectHandler.execute({ executePrimary:true });
265             } else if(self.s.replicaSetState.hasSecondary() && self.s.disconnectHandler) {
266               self.s.disconnectHandler.execute({ executeSecondary:true });
267             }
268
269             // Do we have a primary
270             if(self.s.replicaSetState.hasPrimary()) {
271               // Connect any missing servers
272               connectNewServers(self, self.s.replicaSetState.unknownServers, function(err, cb) {
273                 // Debug log
274                 if(self.s.logger.isDebug()) {
275                   self.s.logger.debug(f('attemptReconnect for replset with id successful resuming topologyMonitor', self.id));
276                 }
277
278                 // Reset the running
279                 self.runningAttempReconnect = false;
280                 // Go back to normal topology monitoring
281                 topologyMonitor(self);
282               });
283             } else {
284               if(self.listeners("close").length > 0) {
285                 self.emit('close', self);
286               }
287
288               // Reset the running
289               self.runningAttempReconnect = false;
290               // Attempt a new reconnect
291               attemptReconnect(self);
292             }
293           }
294         }
295
296         // Remove the server from our list
297         for(var i = 0; i < self.s.connectingServers.length; i++) {
298           if(self.s.connectingServers[i].equals(this)) {
299             self.s.connectingServers.splice(i, 1);
300           }
301         }
302
303         // Keep reference to server
304         var _self = this;
305
306         // Debug log
307         if(self.s.logger.isDebug()) {
308           self.s.logger.debug(f('attemptReconnect in replset with id %s for', self.id));
309         }
310
311         // Connect and not authenticating
312         if(event == 'connect' && !self.authenticating) {
313           if(self.state == DESTROYED) {
314             return _self.destroy();
315           }
316
317           // Update the replicaset state
318           if(self.s.replicaSetState.update(_self)) {
319             // Primary lastIsMaster store it
320             if(_self.lastIsMaster() && _self.lastIsMaster().ismaster) {
321               self.ismaster = _self.lastIsMaster();
322             }
323
324             // Remove the handlers
325             for(var i = 0; i < handlers.length; i++) {
326               _self.removeAllListeners(handlers[i]);
327             }
328
329             // Add stable state handlers
330             _self.on('error', handleEvent(self, 'error'));
331             _self.on('close', handleEvent(self, 'close'));
332             _self.on('timeout', handleEvent(self, 'timeout'));
333             _self.on('parseError', handleEvent(self, 'parseError'));
334           } else {
335             _self.destroy();
336           }
337         } else if(event == 'connect' && self.authenticating) {
338           this.destroy();
339         }
340
341         done();
342       }
343     }
344
345     // Index used to interleaf the server connects, avoiding
346     // runtime issues on io constrained vm's
347     var timeoutInterval = 0;
348
349     function connect(server, timeoutInterval) {
350       setTimeout(function() {
351         server.once('connect', _handleEvent(self, 'connect'));
352         server.once('close', _handleEvent(self, 'close'));
353         server.once('timeout', _handleEvent(self, 'timeout'));
354         server.once('error', _handleEvent(self, 'error'));
355         server.once('parseError', _handleEvent(self, 'parseError'));
356
357         // SDAM Monitoring events
358         server.on('serverOpening', function(e) { self.emit('serverOpening', e); });
359         server.on('serverDescriptionChanged', function(e) { self.emit('serverDescriptionChanged', e); });
360         server.on('serverClosed', function(e) { self.emit('serverClosed', e); });
361
362         server.connect(self.s.connectOptions);
363       }, timeoutInterval);
364     }
365
366     // Connect all servers
367     while(servers.length > 0) {
368       connect(servers.shift(), timeoutInterval++);
369     }
370   }, self.s.minHeartbeatFrequencyMS);
371 }
372
373 function connectNewServers(self, servers, callback) {
374   // Count lefts
375   var count = servers.length;
376
377   // Handle events
378   var _handleEvent = function(self, event) {
379     return function(err, r) {
380       var _self = this;
381       count = count - 1;
382
383       // Destroyed
384       if(self.state == DESTROYED) {
385         return this.destroy();
386       }
387
388       if(event == 'connect' && !self.authenticating) {
389         // Destroyed
390         if(self.state == DESTROYED) {
391           return _self.destroy();
392         }
393
394         var result = self.s.replicaSetState.update(_self);
395         // Update the state with the new server
396         if(result) {
397           // Primary lastIsMaster store it
398           if(_self.lastIsMaster() && _self.lastIsMaster().ismaster) {
399             self.ismaster = _self.lastIsMaster();
400           }
401
402           // Remove the handlers
403           for(var i = 0; i < handlers.length; i++) {
404             _self.removeAllListeners(handlers[i]);
405           }
406
407           // Add stable state handlers
408           _self.on('error', handleEvent(self, 'error'));
409           _self.on('close', handleEvent(self, 'close'));
410           _self.on('timeout', handleEvent(self, 'timeout'));
411           _self.on('parseError', handleEvent(self, 'parseError'));
412         } else {
413           _self.destroy();
414         }
415       } else if(event == 'connect' && self.authenticating) {
416         this.destroy();
417       }
418
419       // Are we done finish up callback
420       if(count == 0) { callback(); }
421     }
422   }
423
424   // No new servers
425   if(count == 0) return callback();
426
427   // Execute method
428   function execute(_server, i) {
429     setTimeout(function() {
430       // Destroyed
431       if(self.state == DESTROYED) {
432         return;
433       }
434
435       // Create a new server instance
436       var server = new Server(assign({}, self.s.options, {
437         host: _server.split(':')[0],
438         port: parseInt(_server.split(':')[1], 10)
439       }, {
440         authProviders: self.authProviders, reconnect:false, monitoring: false, inTopology: true
441       }, {
442         clientInfo: clone(self.s.clientInfo)
443       }));
444
445       // Add temp handlers
446       server.once('connect', _handleEvent(self, 'connect'));
447       server.once('close', _handleEvent(self, 'close'));
448       server.once('timeout', _handleEvent(self, 'timeout'));
449       server.once('error', _handleEvent(self, 'error'));
450       server.once('parseError', _handleEvent(self, 'parseError'));
451
452       // SDAM Monitoring events
453       server.on('serverOpening', function(e) { self.emit('serverOpening', e); });
454       server.on('serverDescriptionChanged', function(e) { self.emit('serverDescriptionChanged', e); });
455       server.on('serverClosed', function(e) { self.emit('serverClosed', e); });
456       server.connect(self.s.connectOptions);
457     }, i);
458   }
459
460   // Create new instances
461   for(var i = 0; i < servers.length; i++) {
462     execute(servers[i], i);
463   }
464 }
465
466 function topologyMonitor(self, options) {
467   options = options || {};
468
469   // Set momitoring timeout
470   self.haTimeoutId = setTimeout(function() {
471     if(self.state == DESTROYED) return;
472
473     // Is this a on connect topology discovery
474     // Schedule a proper topology monitoring to happen
475     // To ensure any discovered servers do not timeout
476     // while waiting for the initial discovery to happen.
477     if(options.haInterval) {
478       topologyMonitor(self);
479     }
480
481     // If we have a primary and a disconnect handler, execute
482     // buffered operations
483     if(self.s.replicaSetState.hasPrimaryAndSecondary() && self.s.disconnectHandler) {
484       self.s.disconnectHandler.execute();
485     } else if(self.s.replicaSetState.hasPrimary() && self.s.disconnectHandler) {
486       self.s.disconnectHandler.execute({ executePrimary:true });
487     } else if(self.s.replicaSetState.hasSecondary() && self.s.disconnectHandler) {
488       self.s.disconnectHandler.execute({ executeSecondary:true });
489     }
490
491     // Get the connectingServers
492     var connectingServers = self.s.replicaSetState.allServers();
493     // Debug log
494     if(self.s.logger.isDebug()) {
495       self.s.logger.debug(f('topologyMonitor in replset with id %s connected servers [%s]'
496         , self.id
497         , connectingServers.map(function(x) {
498           return x.name;
499         })));
500     }
501     // Get the count
502     var count = connectingServers.length;
503
504     // If we have no servers connected
505     if(count == 0 && !options.haInterval) {
506       if(self.listeners("close").length > 0) {
507         self.emit('close', self);
508       }
509
510       return attemptReconnect(self);
511     }
512
513     // If the count is zero schedule a new fast
514     function pingServer(_self, _server, cb) {
515       // Measure running time
516       var start = new Date().getTime();
517
518       // Emit the server heartbeat start
519       emitSDAMEvent(self, 'serverHeartbeatStarted', { connectionId: _server.name });
520       // Execute ismaster
521       _server.command('admin.$cmd', {ismaster:true}, {monitoring: true}, function(err, r) {
522         if(self.state == DESTROYED) {
523           _server.destroy();
524           return cb(err, r);
525         }
526
527         // Calculate latency
528         var latencyMS = new Date().getTime() - start;
529
530         // Set the last updatedTime
531         var hrTime = process.hrtime();
532         // Calculate the last update time
533         _server.lastUpdateTime = hrTime[0] * 1000 + Math.round(hrTime[1]/1000);
534
535         // We had an error, remove it from the state
536         if(err) {
537           // Emit the server heartbeat failure
538           emitSDAMEvent(self, 'serverHeartbeatFailed', { durationMS: latencyMS, failure: err, connectionId: _server.name });
539         } else {
540           // Update the server ismaster
541           _server.ismaster = r.result;
542
543           // Check if we have a lastWriteDate convert it to MS
544           // and store on the server instance for later use
545           if(_server.ismaster.lastWrite && _server.ismaster.lastWrite.lastWriteDate) {
546             _server.lastWriteDate = _server.ismaster.lastWrite.lastWriteDate.getTime();
547           }
548
549           // Do we have a brand new server
550           if(_server.lastIsMasterMS == -1) {
551             _server.lastIsMasterMS = latencyMS;
552           } else if(_server.lastIsMasterMS) {
553             // After the first measurement, average RTT MUST be computed using an
554             // exponentially-weighted moving average formula, with a weighting factor (alpha) of 0.2.
555             // If the prior average is denoted old_rtt, then the new average (new_rtt) is
556             // computed from a new RTT measurement (x) using the following formula:
557             // alpha = 0.2
558             // new_rtt = alpha * x + (1 - alpha) * old_rtt
559             _server.lastIsMasterMS = 0.2 * latencyMS + (1 - 0.2) * _server.lastIsMasterMS;
560           }
561
562           if(_self.s.replicaSetState.update(_server)) {
563             // Primary lastIsMaster store it
564             if(_server.lastIsMaster() && _server.lastIsMaster().ismaster) {
565               self.ismaster = _server.lastIsMaster();
566             }
567           };
568
569           // Server heart beat event
570           emitSDAMEvent(self, 'serverHeartbeatSucceeded', { durationMS: latencyMS, reply: r.result, connectionId: _server.name });
571         }
572
573         // Calculate the stalness for this server
574         self.s.replicaSetState.updateServerMaxStaleness(_server, self.s.haInterval);
575
576         // Callback
577         cb(err, r);
578       });
579     }
580
581     // Connect any missing servers
582     function connectMissingServers() {
583       if(self.state == DESTROYED) return;
584
585       // Attempt to connect to any unknown servers
586       connectNewServers(self, self.s.replicaSetState.unknownServers, function(err, cb) {
587         if(self.state == DESTROYED) return;
588
589         // Check if we have an options.haInterval (meaning it was triggered from connect)
590         if(options.haInterval) {
591           // Do we have a primary and secondary
592           if(self.state == CONNECTING
593             && self.s.replicaSetState.hasPrimaryAndSecondary()) {
594             // Transition to connected
595             stateTransition(self, CONNECTED);
596             // Update initial state
597             self.initialConnectState.connect = true;
598             self.initialConnectState.fullsetup = true;
599             self.initialConnectState.all = true;
600             // Emit fullsetup and all events
601             process.nextTick(function() {
602               self.emit('connect', self);
603               self.emit('fullsetup', self);
604               self.emit('all', self);
605             });
606           } else if(self.state == CONNECTING
607             && self.s.replicaSetState.hasPrimary()) {
608               // Transition to connected
609               stateTransition(self, CONNECTED);
610               // Update initial state
611               self.initialConnectState.connect = true;
612               // Emit connected sign
613               process.nextTick(function() {
614                 self.emit('connect', self);
615               });
616           } else if(self.state == CONNECTING
617             && self.s.replicaSetState.hasSecondary()
618             && self.s.options.secondaryOnlyConnectionAllowed) {
619               // Transition to connected
620               stateTransition(self, CONNECTED);
621               // Update initial state
622               self.initialConnectState.connect = true;
623               // Emit connected sign
624               process.nextTick(function() {
625                 self.emit('connect', self);
626               });
627           } else if(self.state == CONNECTING) {
628             self.emit('error', new MongoError('no primary found in replicaset'));
629             // Destroy the topology
630             return self.destroy();
631           } else if(self.state == CONNECTED
632             && self.s.replicaSetState.hasPrimaryAndSecondary()
633             && !self.initialConnectState.fullsetup) {
634               self.initialConnectState.fullsetup = true;
635             // Emit fullsetup and all events
636             process.nextTick(function() {
637               self.emit('fullsetup', self);
638               self.emit('all', self);
639             });
640           }
641         }
642
643         if(!options.haInterval) topologyMonitor(self);
644       });
645     }
646
647     // No connectingServers but unknown servers
648     if(connectingServers.length == 0
649       && self.s.replicaSetState.unknownServers.length > 0 && options.haInterval) {
650         return connectMissingServers();
651     } else if(connectingServers.length == 0 && options.haInterval) {
652       self.destroy();
653       return self.emit('error', new MongoError('no valid replicaset members found'));
654     }
655
656     // Ping all servers
657     for(var i = 0; i < connectingServers.length; i++) {
658       pingServer(self, connectingServers[i], function(err, r) {
659         count = count - 1;
660
661         if(count == 0) {
662           connectMissingServers();
663         }
664       });
665     }
666   }, options.haInterval || self.s.haInterval)
667 }
668
669 function handleEvent(self, event) {
670   return function(err) {
671     if(self.state == DESTROYED) return;
672     // Debug log
673     if(self.s.logger.isDebug()) {
674       self.s.logger.debug(f('handleEvent %s from server %s in replset with id %s', event, this.name, self.id));
675     }
676
677     self.s.replicaSetState.remove(this);
678   }
679 }
680
681 function handleInitialConnectEvent(self, event) {
682   return function(err) {
683     // Debug log
684     if(self.s.logger.isDebug()) {
685       self.s.logger.debug(f('handleInitialConnectEvent %s from server %s in replset with id %s', event, this.name, self.id));
686     }
687
688     // Destroy the instance
689     if(self.state == DESTROYED) {
690       return this.destroy();
691     }
692
693     // Check the type of server
694     if(event == 'connect') {
695       // Update the state
696       var result = self.s.replicaSetState.update(this);
697       if(result == true) {
698         // Primary lastIsMaster store it
699         if(this.lastIsMaster() && this.lastIsMaster().ismaster) {
700           self.ismaster = this.lastIsMaster();
701         }
702
703         // Debug log
704         if(self.s.logger.isDebug()) {
705           self.s.logger.debug(f('handleInitialConnectEvent %s from server %s in replset with id %s has state [%s]', event, this.name, self.id, JSON.stringify(self.s.replicaSetState.set)));
706         }
707
708         // Remove the handlers
709         for(var i = 0; i < handlers.length; i++) {
710           this.removeAllListeners(handlers[i]);
711         }
712
713         // Add stable state handlers
714         this.on('error', handleEvent(self, 'error'));
715         this.on('close', handleEvent(self, 'close'));
716         this.on('timeout', handleEvent(self, 'timeout'));
717         this.on('parseError', handleEvent(self, 'parseError'));
718       } else if(result instanceof MongoError) {
719         this.destroy();
720         self.destroy();
721         return self.emit('error', result);
722       } else {
723         this.destroy();
724       }
725     } else {
726       // Emit failure to connect
727       self.emit('failed', this);
728       // Remove from the state
729       self.s.replicaSetState.remove(this);
730     }
731
732     // Remove from the list from connectingServers
733     for(var i = 0; i < self.s.connectingServers.length; i++) {
734       if(self.s.connectingServers[i].equals(this)) {
735         self.s.connectingServers.splice(i, 1);
736       }
737     }
738
739     // Trigger topologyMonitor
740     if(self.s.connectingServers.length == 0) {
741       topologyMonitor(self, {haInterval: 1});
742     }
743   };
744 }
745
746 function connectServers(self, servers) {
747   // Update connectingServers
748   self.s.connectingServers = self.s.connectingServers.concat(servers);
749
750   // Index used to interleaf the server connects, avoiding
751   // runtime issues on io constrained vm's
752   var timeoutInterval = 0;
753
754   function connect(server, timeoutInterval) {
755     setTimeout(function() {
756       // Add the server to the state
757       if(self.s.replicaSetState.update(server)) {
758         // Primary lastIsMaster store it
759         if(server.lastIsMaster() && server.lastIsMaster().ismaster) {
760           self.ismaster = server.lastIsMaster();
761         }
762       }
763
764       // Add event handlers
765       server.once('close', handleInitialConnectEvent(self, 'close'));
766       server.once('timeout', handleInitialConnectEvent(self, 'timeout'));
767       server.once('parseError', handleInitialConnectEvent(self, 'parseError'));
768       server.once('error', handleInitialConnectEvent(self, 'error'));
769       server.once('connect', handleInitialConnectEvent(self, 'connect'));
770       // SDAM Monitoring events
771       server.on('serverOpening', function(e) { self.emit('serverOpening', e); });
772       server.on('serverDescriptionChanged', function(e) { self.emit('serverDescriptionChanged', e); });
773       server.on('serverClosed', function(e) { self.emit('serverClosed', e); });
774       // Start connection
775       server.connect(self.s.connectOptions);
776     }, timeoutInterval);
777   }
778
779   // Start all the servers
780   while(servers.length > 0) {
781     connect(servers.shift(), timeoutInterval++);
782   }
783 }
784
785 /**
786  * Emit event if it exists
787  * @method
788  */
789 function emitSDAMEvent(self, event, description) {
790   if(self.listeners(event).length > 0) {
791     self.emit(event, description);
792   }
793 }
794
795 /**
796  * Initiate server connect
797  * @method
798  * @param {array} [options.auth=null] Array of auth options to apply on connect
799  */
800 ReplSet.prototype.connect = function(options) {
801   var self = this;
802   // Add any connect level options to the internal state
803   this.s.connectOptions = options || {};
804   // Set connecting state
805   stateTransition(this, CONNECTING);
806   // Create server instances
807   var servers = this.s.seedlist.map(function(x) {
808     return new Server(assign({}, self.s.options, x, {
809       authProviders: self.authProviders, reconnect:false, monitoring:false, inTopology: true
810     }, {
811       clientInfo: clone(self.s.clientInfo)
812     }));
813   });
814
815   // Emit the topology opening event
816   emitSDAMEvent(this, 'topologyOpening', { topologyId: this.id });
817
818   // Start all server connections
819   connectServers(self, servers);
820 }
821
822 /**
823  * Destroy the server connection
824  * @method
825  */
826 ReplSet.prototype.destroy = function() {
827   // Transition state
828   stateTransition(this, DESTROYED);
829   // Clear out any monitoring process
830   if(this.haTimeoutId) clearTimeout(this.haTimeoutId);
831   // Destroy the replicaset
832   this.s.replicaSetState.destroy();
833
834   // Destroy all connecting servers
835   this.s.connectingServers.forEach(function(x) {
836     x.destroy();
837   });
838
839   // Emit toplogy closing event
840   emitSDAMEvent(this, 'topologyClosed', { topologyId: this.id });
841 }
842
843 /**
844  * Unref all connections belong to this server
845  * @method
846  */
847 ReplSet.prototype.unref = function() {
848   // Transition state
849   stateTransition(this, DISCONNECTED);
850
851   this.s.replicaSetState.allServers().forEach(function(x) {
852     x.unref();
853   });
854
855   clearTimeout(this.haTimeoutId);
856 }
857
858 /**
859  * Returns the last known ismaster document for this server
860  * @method
861  * @return {object}
862  */
863 ReplSet.prototype.lastIsMaster = function() {
864   return this.s.replicaSetState.primary
865     ? this.s.replicaSetState.primary.lastIsMaster() : this.ismaster;
866 }
867
868 /**
869  * All raw connections
870  * @method
871  * @return {Connection[]}
872  */
873 ReplSet.prototype.connections = function() {
874   var servers = this.s.replicaSetState.allServers();
875   var connections = [];
876   for(var i = 0; i < servers.length; i++) {
877     connections = connections.concat(servers[i].connections());
878   }
879
880   return connections;
881 }
882
883 /**
884  * Figure out if the server is connected
885  * @method
886  * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
887  * @return {boolean}
888  */
889 ReplSet.prototype.isConnected = function(options) {
890   options = options || {};
891
892   // If we are authenticating signal not connected
893   // To avoid interleaving of operations
894   if(this.authenticating) return false;
895
896   // If we specified a read preference check if we are connected to something
897   // than can satisfy this
898   if(options.readPreference
899     && options.readPreference.equals(ReadPreference.secondary)) {
900     return this.s.replicaSetState.hasSecondary();
901   }
902
903   if(options.readPreference
904     && options.readPreference.equals(ReadPreference.primary)) {
905     return this.s.replicaSetState.hasPrimary();
906   }
907
908   if(options.readPreference
909     && options.readPreference.equals(ReadPreference.primaryPreferred)) {
910     return this.s.replicaSetState.hasSecondary() || this.s.replicaSetState.hasPrimary();
911   }
912
913   if(options.readPreference
914     && options.readPreference.equals(ReadPreference.secondaryPreferred)) {
915     return this.s.replicaSetState.hasSecondary() || this.s.replicaSetState.hasPrimary();
916   }
917
918   if(this.s.secondaryOnlyConnectionAllowed
919     && this.s.replicaSetState.hasSecondary()) {
920       return true;
921   }
922
923   return this.s.replicaSetState.hasPrimary();
924 }
925
926 /**
927  * Figure out if the replicaset instance was destroyed by calling destroy
928  * @method
929  * @return {boolean}
930  */
931 ReplSet.prototype.isDestroyed = function() {
932   return this.state == DESTROYED;
933 }
934
935 /**
936  * Get server
937  * @method
938  * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
939  * @return {Server}
940  */
941 ReplSet.prototype.getServer = function(options) {
942   // Ensure we have no options
943   options = options || {};
944
945   // Pick the right server baspickServerd on readPreference
946   var server = this.s.replicaSetState.pickServer(options.readPreference);
947   if(this.s.debug) this.emit('pickedServer', options.readPreference, server);
948   return server;
949 }
950
951 /**
952  * Get all connected servers
953  * @method
954  * @return {Server[]}
955  */
956 ReplSet.prototype.getServers = function() {
957   return this.s.replicaSetState.allServers();
958 }
959
960 function basicReadPreferenceValidation(self, options) {
961   if(options.readPreference && !(options.readPreference instanceof ReadPreference)) {
962     throw new Error("readPreference must be an instance of ReadPreference");
963   }
964 }
965
966 //
967 // Execute write operation
968 var executeWriteOperation = function(self, op, ns, ops, options, callback) {
969   if(typeof options == 'function') callback = options, options = {}, options = options || {};
970   // Ensure we have no options
971   options = options || {};
972
973   // No server returned we had an error
974   if(self.s.replicaSetState.primary == null) {
975     return callback(new MongoError("no primary server found"));
976   }
977
978   // Execute the command
979   self.s.replicaSetState.primary[op](ns, ops, options, callback);
980 }
981
982 /**
983  * Insert one or more documents
984  * @method
985  * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
986  * @param {array} ops An array of documents to insert
987  * @param {boolean} [options.ordered=true] Execute in order or out of order
988  * @param {object} [options.writeConcern={}] Write concern for the operation
989  * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
990  * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
991  * @param {opResultCallback} callback A callback function
992  */
993 ReplSet.prototype.insert = function(ns, ops, options, callback) {
994   if(typeof options == 'function') callback = options, options = {}, options = options || {};
995   if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
996
997   // Not connected but we have a disconnecthandler
998   if(!this.s.replicaSetState.hasPrimary() && this.s.disconnectHandler != null) {
999     return this.s.disconnectHandler.add('insert', ns, ops, options, callback);
1000   }
1001
1002   // Execute write operation
1003   executeWriteOperation(this, 'insert', ns, ops, options, callback);
1004 }
1005
1006 /**
1007  * Perform one or more update operations
1008  * @method
1009  * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
1010  * @param {array} ops An array of updates
1011  * @param {boolean} [options.ordered=true] Execute in order or out of order
1012  * @param {object} [options.writeConcern={}] Write concern for the operation
1013  * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
1014  * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
1015  * @param {opResultCallback} callback A callback function
1016  */
1017 ReplSet.prototype.update = function(ns, ops, options, callback) {
1018   if(typeof options == 'function') callback = options, options = {}, options = options || {};
1019   if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
1020
1021   // Not connected but we have a disconnecthandler
1022   if(!this.s.replicaSetState.hasPrimary() && this.s.disconnectHandler != null) {
1023     return this.s.disconnectHandler.add('update', ns, ops, options, callback);
1024   }
1025
1026   // Execute write operation
1027   executeWriteOperation(this, 'update', ns, ops, options, callback);
1028 }
1029
1030 /**
1031  * Perform one or more remove operations
1032  * @method
1033  * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
1034  * @param {array} ops An array of removes
1035  * @param {boolean} [options.ordered=true] Execute in order or out of order
1036  * @param {object} [options.writeConcern={}] Write concern for the operation
1037  * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
1038  * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
1039  * @param {opResultCallback} callback A callback function
1040  */
1041 ReplSet.prototype.remove = function(ns, ops, options, callback) {
1042   if(typeof options == 'function') callback = options, options = {}, options = options || {};
1043   if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
1044
1045   // Not connected but we have a disconnecthandler
1046   if(!this.s.replicaSetState.hasPrimary() && this.s.disconnectHandler != null) {
1047     return this.s.disconnectHandler.add('remove', ns, ops, options, callback);
1048   }
1049
1050   // Execute write operation
1051   executeWriteOperation(this, 'remove', ns, ops, options, callback);
1052 }
1053
1054 /**
1055  * Execute a command
1056  * @method
1057  * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
1058  * @param {object} cmd The command hash
1059  * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
1060  * @param {Connection} [options.connection] Specify connection object to execute command against
1061  * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
1062  * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
1063  * @param {opResultCallback} callback A callback function
1064  */
1065 ReplSet.prototype.command = function(ns, cmd, options, callback) {
1066   if(typeof options == 'function') callback = options, options = {}, options = options || {};
1067   if(this.state == DESTROYED) return callback(new MongoError(f('topology was destroyed')));
1068   var self = this;
1069
1070   // Establish readPreference
1071   var readPreference = options.readPreference ? options.readPreference : ReadPreference.primary;
1072
1073   // If the readPreference is primary and we have no primary, store it
1074   if(readPreference.preference == 'primary' && !this.s.replicaSetState.hasPrimary() && this.s.disconnectHandler != null) {
1075     return this.s.disconnectHandler.add('command', ns, cmd, options, callback);
1076   } else if(readPreference.preference == 'secondary' && !this.s.replicaSetState.hasSecondary() && this.s.disconnectHandler != null) {
1077     return this.s.disconnectHandler.add('command', ns, cmd, options, callback);
1078   } else if(readPreference.preference != 'primary' && !this.s.replicaSetState.hasSecondary() && !this.s.replicaSetState.hasPrimary() && this.s.disconnectHandler != null) {
1079     return this.s.disconnectHandler.add('command', ns, cmd, options, callback);
1080   }
1081
1082   // Pick a server
1083   var server = this.s.replicaSetState.pickServer(readPreference);
1084   // We received an error, return it
1085   if(!(server instanceof Server)) return callback(server);
1086   // Emit debug event
1087   if(self.s.debug) self.emit('pickedServer', ReadPreference.primary, server);
1088
1089   // No server returned we had an error
1090   if(server == null) {
1091     return callback(new MongoError(f("no server found that matches the provided readPreference %s", readPreference)));
1092   }
1093
1094   // Execute the command
1095   server.command(ns, cmd, options, callback);
1096 }
1097
1098 /**
1099  * Authenticate using a specified mechanism
1100  * @method
1101  * @param {string} mechanism The Auth mechanism we are invoking
1102  * @param {string} db The db we are invoking the mechanism against
1103  * @param {...object} param Parameters for the specific mechanism
1104  * @param {authResultCallback} callback A callback function
1105  */
1106 ReplSet.prototype.auth = function(mechanism, db) {
1107   var allArgs = Array.prototype.slice.call(arguments, 0).slice(0);
1108   var self = this;
1109   var args = Array.prototype.slice.call(arguments, 2);
1110   var callback = args.pop();
1111
1112   // If we don't have the mechanism fail
1113   if(this.authProviders[mechanism] == null && mechanism != 'default') {
1114     return callback(new MongoError(f("auth provider %s does not exist", mechanism)));
1115   }
1116
1117   // Are we already authenticating, throw
1118   if(this.authenticating) {
1119     return callback(new MongoError('authentication or logout allready in process'));
1120   }
1121
1122   // Topology is not connected, save the call in the provided store to be
1123   // Executed at some point when the handler deems it's reconnected
1124   if(!self.s.replicaSetState.hasPrimary() && self.s.disconnectHandler != null) {
1125     return self.s.disconnectHandler.add('auth', db, allArgs, {}, callback);
1126   }
1127
1128   // Set to authenticating
1129   this.authenticating = true;
1130   // All errors
1131   var errors = [];
1132
1133   // Get all the servers
1134   var servers = this.s.replicaSetState.allServers();
1135   // No servers return
1136   if(servers.length == 0) {
1137     this.authenticating = false;
1138     callback(null, true);
1139   }
1140
1141   // Authenticate
1142   function auth(server) {
1143     // Arguments without a callback
1144     var argsWithoutCallback = [mechanism, db].concat(args.slice(0));
1145     // Create arguments
1146     var finalArguments = argsWithoutCallback.concat([function(err, r) {
1147       count = count - 1;
1148       // Save all the errors
1149       if(err) errors.push({name: server.name, err: err});
1150       // We are done
1151       if(count == 0) {
1152         // Auth is done
1153         self.authenticating = false;
1154
1155         // Return the auth error
1156         if(errors.length) return callback(MongoError.create({
1157           message: 'authentication fail', errors: errors
1158         }), false);
1159
1160         // Successfully authenticated session
1161         callback(null, self);
1162       }
1163     }]);
1164
1165     if(!server.lastIsMaster().arbiterOnly) {
1166       // Execute the auth only against non arbiter servers
1167       server.auth.apply(server, finalArguments);
1168     } else {
1169       // If we are authenticating against an arbiter just ignore it
1170       finalArguments.pop()(null);
1171     }
1172   }
1173
1174   // Get total count
1175   var count = servers.length;
1176   // Authenticate against all servers
1177   while(servers.length > 0) {
1178     auth(servers.shift());
1179   }
1180 }
1181
1182 /**
1183  * Logout from a database
1184  * @method
1185  * @param {string} db The db we are logging out from
1186  * @param {authResultCallback} callback A callback function
1187  */
1188 ReplSet.prototype.logout = function(dbName, callback) {
1189   var self = this;
1190   // Are we authenticating or logging out, throw
1191   if(this.authenticating) {
1192     throw new MongoError('authentication or logout allready in process');
1193   }
1194
1195   // Ensure no new members are processed while logging out
1196   this.authenticating = true;
1197
1198   // Remove from all auth providers (avoid any reaplication of the auth details)
1199   var providers = Object.keys(this.authProviders);
1200   for(var i = 0; i < providers.length; i++) {
1201     this.authProviders[providers[i]].logout(dbName);
1202   }
1203
1204   // Now logout all the servers
1205   var servers = this.s.replicaSetState.allServers();
1206   var count = servers.length;
1207   if(count == 0) return callback();
1208   var errors = [];
1209
1210   // Execute logout on all server instances
1211   for(var i = 0; i < servers.length; i++) {
1212     servers[i].logout(dbName, function(err) {
1213       count = count - 1;
1214       if(err) errors.push({name: server.name, err: err});
1215
1216       if(count == 0) {
1217         // Do not block new operations
1218         self.authenticating = false;
1219         // If we have one or more errors
1220         if(errors.length) return callback(MongoError.create({
1221           message: f('logout failed against db %s', dbName), errors: errors
1222         }), false);
1223
1224         // No errors
1225         callback();
1226       }
1227     });
1228   }
1229 }
1230
1231 /**
1232  * Perform one or more remove operations
1233  * @method
1234  * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
1235  * @param {{object}|{Long}} cmd Can be either a command returning a cursor or a cursorId
1236  * @param {object} [options.batchSize=0] Batchsize for the operation
1237  * @param {array} [options.documents=[]] Initial documents list for cursor
1238  * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
1239  * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
1240  * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
1241  * @param {opResultCallback} callback A callback function
1242  */
1243 ReplSet.prototype.cursor = function(ns, cmd, cursorOptions) {
1244   cursorOptions = cursorOptions || {};
1245   var FinalCursor = cursorOptions.cursorFactory || this.s.Cursor;
1246   return new FinalCursor(this.s.bson, ns, cmd, cursorOptions, this, this.s.options);
1247 }
1248
1249 /**
1250  * A replset connect event, used to verify that the connection is up and running
1251  *
1252  * @event ReplSet#connect
1253  * @type {ReplSet}
1254  */
1255
1256 /**
1257  * A replset reconnect event, used to verify that the topology reconnected
1258  *
1259  * @event ReplSet#reconnect
1260  * @type {ReplSet}
1261  */
1262
1263 /**
1264  * A replset fullsetup event, used to signal that all topology members have been contacted.
1265  *
1266  * @event ReplSet#fullsetup
1267  * @type {ReplSet}
1268  */
1269
1270 /**
1271  * A replset all event, used to signal that all topology members have been contacted.
1272  *
1273  * @event ReplSet#all
1274  * @type {ReplSet}
1275  */
1276
1277 /**
1278  * A replset failed event, used to signal that initial replset connection failed.
1279  *
1280  * @event ReplSet#failed
1281  * @type {ReplSet}
1282  */
1283
1284 /**
1285  * A server member left the replicaset
1286  *
1287  * @event ReplSet#left
1288  * @type {function}
1289  * @param {string} type The type of member that left (primary|secondary|arbiter)
1290  * @param {Server} server The server object that left
1291  */
1292
1293 /**
1294  * A server member joined the replicaset
1295  *
1296  * @event ReplSet#joined
1297  * @type {function}
1298  * @param {string} type The type of member that joined (primary|secondary|arbiter)
1299  * @param {Server} server The server object that joined
1300  */
1301
1302 /**
1303  * A server opening SDAM monitoring event
1304  *
1305  * @event ReplSet#serverOpening
1306  * @type {object}
1307  */
1308
1309 /**
1310  * A server closed SDAM monitoring event
1311  *
1312  * @event ReplSet#serverClosed
1313  * @type {object}
1314  */
1315
1316 /**
1317  * A server description SDAM change monitoring event
1318  *
1319  * @event ReplSet#serverDescriptionChanged
1320  * @type {object}
1321  */
1322
1323 /**
1324  * A topology open SDAM event
1325  *
1326  * @event ReplSet#topologyOpening
1327  * @type {object}
1328  */
1329
1330 /**
1331  * A topology closed SDAM event
1332  *
1333  * @event ReplSet#topologyClosed
1334  * @type {object}
1335  */
1336
1337 /**
1338  * A topology structure SDAM change event
1339  *
1340  * @event ReplSet#topologyDescriptionChanged
1341  * @type {object}
1342  */
1343
1344 /**
1345  * A topology serverHeartbeatStarted SDAM event
1346  *
1347  * @event ReplSet#serverHeartbeatStarted
1348  * @type {object}
1349  */
1350
1351 /**
1352  * A topology serverHeartbeatFailed SDAM event
1353  *
1354  * @event ReplSet#serverHeartbeatFailed
1355  * @type {object}
1356  */
1357
1358 /**
1359  * A topology serverHeartbeatSucceeded SDAM change event
1360  *
1361  * @event ReplSet#serverHeartbeatSucceeded
1362  * @type {object}
1363  */
1364
1365 module.exports = ReplSet;