97791bfffb2845f2f39f53fcd5a019dda8d5c4fb
[aai/esr-gui.git] /
1 "use strict";
2
3 var EventEmitter = require('events').EventEmitter
4   , inherits = require('util').inherits
5   , f = require('util').format
6   , Server = require('./server')
7   , Mongos = require('./mongos')
8   , Cursor = require('./cursor')
9   , AggregationCursor = require('./aggregation_cursor')
10   , CommandCursor = require('./command_cursor')
11   , ReadPreference = require('./read_preference')
12   , MongoCR = require('mongodb-core').MongoCR
13   , MongoError = require('mongodb-core').MongoError
14   , ServerCapabilities = require('./topology_base').ServerCapabilities
15   , Store = require('./topology_base').Store
16   , Define = require('./metadata')
17   , CServer = require('mongodb-core').Server
18   , CReplSet = require('mongodb-core').ReplSet
19   , CoreReadPreference = require('mongodb-core').ReadPreference
20   , shallowClone = require('./utils').shallowClone
21   , MAX_JS_INT = require('./utils').MAX_JS_INT
22   , translateOptions = require('./utils').translateOptions
23   , filterOptions = require('./utils').filterOptions
24   , mergeOptions = require('./utils').mergeOptions
25   , os = require('os');
26 /**
27  * @fileOverview The **ReplSet** class is a class that represents a Replicaset topology and is
28  * used to construct connections.
29  *
30  * **ReplSet Should not be used, use MongoClient.connect**
31  * @example
32  * var Db = require('mongodb').Db,
33  *   ReplSet = require('mongodb').ReplSet,
34  *   Server = require('mongodb').Server,
35  *   test = require('assert');
36  * // Connect using ReplSet
37  * var server = new Server('localhost', 27017);
38  * var db = new Db('test', new ReplSet([server]));
39  * db.open(function(err, db) {
40  *   // Get an additional db
41  *   db.close();
42  * });
43  */
44
45 // Allowed parameters
46 var legalOptionNames = ['ha', 'haInterval', 'replicaSet', 'rs_name', 'secondaryAcceptableLatencyMS'
47   , 'connectWithNoPrimary', 'poolSize', 'ssl', 'checkServerIdentity', 'sslValidate'
48   , 'sslCA', 'sslCert', 'sslKey', 'sslPass', 'socketOptions', 'bufferMaxEntries'
49   , 'store', 'auto_reconnect', 'autoReconnect', 'emitError'
50   , 'keepAlive', 'noDelay', 'connectTimeoutMS', 'socketTimeoutMS', 'strategy', 'debug'
51   , 'loggerLevel', 'logger', 'reconnectTries', 'appname', 'domainsEnabled'
52   , 'servername', 'promoteLongs', 'promoteValues', 'promoteBuffers'];
53
54 // Get package.json variable
55 var driverVersion = require(__dirname + '/../package.json').version;
56 var nodejsversion = f('Node.js %s, %s', process.version, os.endianness());
57 var type = os.type();
58 var name = process.platform;
59 var architecture = process.arch;
60 var release = os.release();
61
62 /**
63  * Creates a new ReplSet instance
64  * @class
65  * @deprecated
66  * @param {Server[]} servers A seedlist of servers participating in the replicaset.
67  * @param {object} [options=null] Optional settings.
68  * @param {booelan} [options.ha=true] Turn on high availability monitoring.
69  * @param {number} [options.haInterval=10000] Time between each replicaset status check.
70  * @param {string} [options.replicaSet] The name of the replicaset to connect to.
71  * @param {number} [options.secondaryAcceptableLatencyMS=15] Sets the range of servers to pick when using NEAREST (lowest ping ms + the latency fence, ex: range of 1 to (1 + 15) ms)
72  * @param {boolean} [options.connectWithNoPrimary=false] Sets if the driver should connect even if no primary is available
73  * @param {number} [options.poolSize=5] Number of connections in the connection pool for each server instance, set to 5 as default for legacy reasons.
74  * @param {boolean} [options.ssl=false] Use ssl connection (needs to have a mongod server with ssl support)
75  * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function.
76  * @param {object} [options.sslValidate=true] Validate mongod server certificate against ca (needs to have a mongod server with ssl support, 2.4 or higher)
77  * @param {array} [options.sslCA=null] Array of valid certificates either as Buffers or Strings (needs to have a mongod server with ssl support, 2.4 or higher)
78  * @param {(Buffer|string)} [options.sslCert=null] String or buffer containing the certificate we wish to present (needs to have a mongod server with ssl support, 2.4 or higher)
79  * @param {(Buffer|string)} [options.sslKey=null] String or buffer containing the certificate private key we wish to present (needs to have a mongod server with ssl support, 2.4 or higher)
80  * @param {(Buffer|string)} [options.sslPass=null] String or buffer containing the certificate password (needs to have a mongod server with ssl support, 2.4 or higher)
81  * @param {string} [options.servername=null] String containing the server name requested via TLS SNI.
82  * @param {object} [options.socketOptions=null] Socket options
83  * @param {boolean} [options.socketOptions.noDelay=true] TCP Socket NoDelay option.
84  * @param {number} [options.socketOptions.keepAlive=0] TCP KeepAlive on the socket with a X ms delay before start.
85  * @param {number} [options.socketOptions.connectTimeoutMS=10000] TCP Connection timeout setting
86  * @param {number} [options.socketOptions.socketTimeoutMS=0] TCP Socket timeout setting
87  * @param {boolean} [options.domainsEnabled=false] Enable the wrapping of the callback in the current domain, disabled by default to avoid perf hit.
88  * @fires ReplSet#connect
89  * @fires ReplSet#ha
90  * @fires ReplSet#joined
91  * @fires ReplSet#left
92  * @fires ReplSet#fullsetup
93  * @fires ReplSet#open
94  * @fires ReplSet#close
95  * @fires ReplSet#error
96  * @fires ReplSet#timeout
97  * @fires ReplSet#parseError
98  * @return {ReplSet} a ReplSet instance.
99  */
100 var ReplSet = function(servers, options) {
101   if(!(this instanceof ReplSet)) return new ReplSet(servers, options);
102   options = options || {};
103   var self = this;
104   // Set up event emitter
105   EventEmitter.call(this);
106
107   // Filter the options
108   options = filterOptions(options, legalOptionNames);
109
110   // Ensure all the instances are Server
111   for(var i = 0; i < servers.length; i++) {
112     if(!(servers[i] instanceof Server)) {
113       throw MongoError.create({message: "all seed list instances must be of the Server type", driver:true});
114     }
115   }
116
117   // Stored options
118   var storeOptions = {
119       force: false
120     , bufferMaxEntries: typeof options.bufferMaxEntries == 'number' ? options.bufferMaxEntries : MAX_JS_INT
121   }
122
123   // Shared global store
124   var store = options.store || new Store(self, storeOptions);
125
126   // Build seed list
127   var seedlist = servers.map(function(x) {
128     return {host: x.host, port: x.port}
129   });
130
131   // Clone options
132   var clonedOptions = mergeOptions({}, {
133     disconnectHandler: store,
134     cursorFactory: Cursor,
135     reconnect: false,
136     emitError: typeof options.emitError == 'boolean' ? options.emitError : true,
137     size: typeof options.poolSize == 'number' ? options.poolSize : 5
138   });
139
140   // Translate any SSL options and other connectivity options
141   clonedOptions = translateOptions(clonedOptions, options);
142
143   // Socket options
144   var socketOptions = options.socketOptions && Object.keys(options.socketOptions).length > 0
145     ? options.socketOptions : options;
146
147   // Translate all the options to the mongodb-core ones
148   clonedOptions = translateOptions(clonedOptions, socketOptions);
149   if(typeof clonedOptions.keepAlive == 'number') {
150     clonedOptions.keepAliveInitialDelay = clonedOptions.keepAlive;
151     clonedOptions.keepAlive = clonedOptions.keepAlive > 0;
152   }
153
154   // Client info
155   this.clientInfo = {
156     driver: {
157       name: "nodejs",
158       version: driverVersion
159     },
160     os: {
161       type: type,
162       name: name,
163       architecture: architecture,
164       version: release
165     },
166     platform: nodejsversion
167   }
168
169   // Build default client information
170   clonedOptions.clientInfo = this.clientInfo;
171   // Do we have an application specific string
172   if(options.appname) {
173     clonedOptions.clientInfo.application = { name: options.appname };
174   }
175
176   // Create the ReplSet
177   var replset = new CReplSet(seedlist, clonedOptions);
178   // Server capabilities
179   var sCapabilities = null;
180
181   // Listen to reconnect event
182   replset.on('reconnect', function() {
183     self.emit('reconnect');
184     store.execute();
185   });
186
187   // Internal state
188   this.s = {
189     // Replicaset
190     replset: replset
191     // Server capabilities
192     , sCapabilities: null
193     // Debug tag
194     , tag: options.tag
195     // Store options
196     , storeOptions: storeOptions
197     // Cloned options
198     , clonedOptions: clonedOptions
199     // Store
200     , store: store
201     // Options
202     , options: options
203   }
204
205   // Debug
206   if(clonedOptions.debug) {
207     // Last ismaster
208     Object.defineProperty(this, 'replset', {
209       enumerable:true, get: function() { return replset; }
210     });
211   }
212 }
213
214 /**
215  * @ignore
216  */
217 inherits(ReplSet, EventEmitter);
218
219 // Last ismaster
220 Object.defineProperty(ReplSet.prototype, 'isMasterDoc', {
221   enumerable:true, get: function() { return this.s.replset.lastIsMaster(); }
222 });
223
224 // BSON property
225 Object.defineProperty(ReplSet.prototype, 'bson', {
226   enumerable: true, get: function() {
227     return this.s.replset.s.bson;
228   }
229 });
230
231 Object.defineProperty(ReplSet.prototype, 'haInterval', {
232   enumerable:true, get: function() { return this.s.replset.s.haInterval; }
233 });
234
235 var define = ReplSet.define = new Define('ReplSet', ReplSet, false);
236
237 // Ensure the right read Preference object
238 var translateReadPreference = function(options) {
239   if(typeof options.readPreference == 'string') {
240     options.readPreference = new CoreReadPreference(options.readPreference);
241   } else if(options.readPreference instanceof ReadPreference) {
242     options.readPreference = new CoreReadPreference(options.readPreference.mode
243       , options.readPreference.tags, {maxStalenessMS: options.readPreference.maxStalenessMS});
244   }
245
246   return options;
247 }
248
249 // Connect method
250 ReplSet.prototype.connect = function(db, _options, callback) {
251   var self = this;
252   if('function' === typeof _options) callback = _options, _options = {};
253   if(_options == null) _options = {};
254   if(!('function' === typeof callback)) callback = null;
255   self.s.options = _options;
256
257   // Update bufferMaxEntries
258   self.s.storeOptions.bufferMaxEntries = db.bufferMaxEntries;
259
260   // Actual handler
261   var errorHandler = function(event) {
262     return function(err) {
263       if(event != 'error') {
264         self.emit(event, err);
265       }
266     }
267   }
268
269   // Connect handler
270   var connectHandler = function() {
271     // Clear out all the current handlers left over
272     ["timeout", "error", "close", 'serverOpening', 'serverDescriptionChanged', 'serverHeartbeatStarted',
273       'serverHeartbeatSucceeded', 'serverHeartbeatFailed', 'serverClosed', 'topologyOpening',
274       'topologyClosed', 'topologyDescriptionChanged'].forEach(function(e) {
275       self.s.replset.removeAllListeners(e);
276     });
277
278     // Set up listeners
279     self.s.replset.once('timeout', errorHandler('timeout'));
280     self.s.replset.once('error', errorHandler('error'));
281     self.s.replset.once('close', errorHandler('close'));
282
283     // relay the event
284     var relay = function(event) {
285       return function(t, server) {
286         self.emit(event, t, server);
287       }
288     }
289
290     // Replset events relay
291     var replsetRelay = function(event) {
292       return function(t, server) {
293         self.emit(event, t, server.lastIsMaster(), server);
294       }
295     }
296
297     // Relay ha
298     var relayHa = function(t, state) {
299       self.emit('ha', t, state);
300
301       if(t == 'start') {
302         self.emit('ha_connect', t, state);
303       } else if(t == 'end') {
304         self.emit('ha_ismaster', t, state);
305       }
306     }
307
308     // Set up serverConfig listeners
309     self.s.replset.on('joined', replsetRelay('joined'));
310     self.s.replset.on('left', relay('left'));
311     self.s.replset.on('ping', relay('ping'));
312     self.s.replset.on('ha', relayHa);
313
314     // Set up SDAM listeners
315     self.s.replset.on('serverDescriptionChanged', relay('serverDescriptionChanged'));
316     self.s.replset.on('serverHeartbeatStarted', relay('serverHeartbeatStarted'));
317     self.s.replset.on('serverHeartbeatSucceeded', relay('serverHeartbeatSucceeded'));
318     self.s.replset.on('serverHeartbeatFailed', relay('serverHeartbeatFailed'));
319     self.s.replset.on('serverOpening', relay('serverOpening'));
320     self.s.replset.on('serverClosed', relay('serverClosed'));
321     self.s.replset.on('topologyOpening', relay('topologyOpening'));
322     self.s.replset.on('topologyClosed', relay('topologyClosed'));
323     self.s.replset.on('topologyDescriptionChanged', relay('topologyDescriptionChanged'));
324
325     self.s.replset.on('fullsetup', function(topology) {
326       self.emit('fullsetup', null, self);
327     });
328
329     self.s.replset.on('all', function(topology) {
330       self.emit('all', null, self);
331     });
332
333     // Emit open event
334     self.emit('open', null, self);
335
336     // Return correctly
337     try {
338       callback(null, self);
339     } catch(err) {
340       process.nextTick(function() { throw err; })
341     }
342   }
343
344   // Error handler
345   var connectErrorHandler = function(event) {
346     return function(err) {
347       ['timeout', 'error', 'close'].forEach(function(e) {
348         self.s.replset.removeListener(e, connectErrorHandler);
349       });
350
351       self.s.replset.removeListener('connect', connectErrorHandler);
352       // Destroy the replset
353       self.s.replset.destroy();
354
355       // Try to callback
356       try {
357         callback(err);
358       } catch(err) {
359         if(!self.s.replset.isConnected())
360           process.nextTick(function() { throw err; })
361       }
362     }
363   }
364
365   // Set up listeners
366   self.s.replset.once('timeout', connectErrorHandler('timeout'));
367   self.s.replset.once('error', connectErrorHandler('error'));
368   self.s.replset.once('close', connectErrorHandler('close'));
369   self.s.replset.once('connect', connectHandler);
370
371   // Start connection
372   self.s.replset.connect(_options);
373 }
374
375 // Server capabilities
376 ReplSet.prototype.capabilities = function() {
377   if(this.s.sCapabilities) return this.s.sCapabilities;
378   if(this.s.replset.lastIsMaster() == null) return null;
379   this.s.sCapabilities = new ServerCapabilities(this.s.replset.lastIsMaster());
380   return this.s.sCapabilities;
381 }
382
383 define.classMethod('capabilities', {callback: false, promise:false, returns: [ServerCapabilities]});
384
385 // Command
386 ReplSet.prototype.command = function(ns, cmd, options, callback) {
387   options = translateReadPreference(options);
388   this.s.replset.command(ns, cmd, options, callback);
389 }
390
391 define.classMethod('command', {callback: true, promise:false});
392
393 // Insert
394 ReplSet.prototype.insert = function(ns, ops, options, callback) {
395   this.s.replset.insert(ns, ops, options, callback);
396 }
397
398 define.classMethod('insert', {callback: true, promise:false});
399
400 // Update
401 ReplSet.prototype.update = function(ns, ops, options, callback) {
402   this.s.replset.update(ns, ops, options, callback);
403 }
404
405 define.classMethod('update', {callback: true, promise:false});
406
407 // Remove
408 ReplSet.prototype.remove = function(ns, ops, options, callback) {
409   this.s.replset.remove(ns, ops, options, callback);
410 }
411
412 define.classMethod('remove', {callback: true, promise:false});
413
414 // Destroyed
415 ReplSet.prototype.isDestroyed = function() {
416   return this.s.replset.isDestroyed();
417 }
418
419 // IsConnected
420 ReplSet.prototype.isConnected = function() {
421   return this.s.replset.isConnected();
422 }
423
424 define.classMethod('isConnected', {callback: false, promise:false, returns: [Boolean]});
425
426 // Insert
427 ReplSet.prototype.cursor = function(ns, cmd, options) {
428   options = translateReadPreference(options);
429   options.disconnectHandler = this.s.store;
430   return this.s.replset.cursor(ns, cmd, options);
431 }
432
433 define.classMethod('cursor', {callback: false, promise:false, returns: [Cursor, AggregationCursor, CommandCursor]});
434
435 ReplSet.prototype.lastIsMaster = function() {
436   return this.s.replset.lastIsMaster();
437 }
438
439 ReplSet.prototype.close = function(forceClosed) {
440   var self = this;
441   this.s.replset.destroy();
442   // We need to wash out all stored processes
443   if(forceClosed == true) {
444     this.s.storeOptions.force = forceClosed;
445     this.s.store.flush();
446   }
447
448   var events = ['timeout', 'error', 'close', 'joined', 'left'];
449   events.forEach(function(e) {
450     self.removeAllListeners(e);
451   });
452 }
453
454 define.classMethod('close', {callback: false, promise:false});
455
456 ReplSet.prototype.auth = function() {
457   var args = Array.prototype.slice.call(arguments, 0);
458   this.s.replset.auth.apply(this.s.replset, args);
459 }
460
461 define.classMethod('auth', {callback: true, promise:false});
462
463 ReplSet.prototype.logout = function() {
464   var args = Array.prototype.slice.call(arguments, 0);
465   this.s.replset.logout.apply(this.s.replset, args);
466 }
467
468 define.classMethod('logout', {callback: true, promise:false});
469
470 /**
471  * All raw connections
472  * @method
473  * @return {array}
474  */
475 ReplSet.prototype.connections = function() {
476   return this.s.replset.connections();
477 }
478
479 define.classMethod('connections', {callback: false, promise:false, returns:[Array]});
480
481 /**
482  * A replset connect event, used to verify that the connection is up and running
483  *
484  * @event ReplSet#connect
485  * @type {ReplSet}
486  */
487
488 /**
489  * The replset high availability event
490  *
491  * @event ReplSet#ha
492  * @type {function}
493  * @param {string} type The stage in the high availability event (start|end)
494  * @param {boolean} data.norepeat This is a repeating high availability process or a single execution only
495  * @param {number} data.id The id for this high availability request
496  * @param {object} data.state An object containing the information about the current replicaset
497  */
498
499 /**
500  * A server member left the replicaset
501  *
502  * @event ReplSet#left
503  * @type {function}
504  * @param {string} type The type of member that left (primary|secondary|arbiter)
505  * @param {Server} server The server object that left
506  */
507
508 /**
509  * A server member joined the replicaset
510  *
511  * @event ReplSet#joined
512  * @type {function}
513  * @param {string} type The type of member that joined (primary|secondary|arbiter)
514  * @param {Server} server The server object that joined
515  */
516
517 /**
518  * ReplSet open event, emitted when replicaset can start processing commands.
519  *
520  * @event ReplSet#open
521  * @type {Replset}
522  */
523
524 /**
525  * ReplSet fullsetup event, emitted when all servers in the topology have been connected to.
526  *
527  * @event ReplSet#fullsetup
528  * @type {Replset}
529  */
530
531 /**
532  * ReplSet close event
533  *
534  * @event ReplSet#close
535  * @type {object}
536  */
537
538 /**
539  * ReplSet error event, emitted if there is an error listener.
540  *
541  * @event ReplSet#error
542  * @type {MongoError}
543  */
544
545 /**
546  * ReplSet timeout event
547  *
548  * @event ReplSet#timeout
549  * @type {object}
550  */
551
552 /**
553  * ReplSet parseError event
554  *
555  * @event ReplSet#parseError
556  * @type {object}
557  */
558
559 module.exports = ReplSet;