fac82b5c7832d19298f0e41a1876592f054cd6bd
[aai/esr-gui.git] /
1 "use strict"
2
3 var inherits = require('util').inherits,
4   require_optional = require('require_optional'),
5   f = require('util').format,
6   EventEmitter = require('events').EventEmitter,
7   ReadPreference = require('./read_preference'),
8   Logger = require('../connection/logger'),
9   debugOptions = require('../connection/utils').debugOptions,
10   retrieveBSON = require('../connection/utils').retrieveBSON,
11   Pool = require('../connection/pool'),
12   Query = require('../connection/commands').Query,
13   MongoError = require('../error'),
14   PreTwoSixWireProtocolSupport = require('../wireprotocol/2_4_support'),
15   TwoSixWireProtocolSupport = require('../wireprotocol/2_6_support'),
16   ThreeTwoWireProtocolSupport = require('../wireprotocol/3_2_support'),
17   BasicCursor = require('../cursor'),
18   sdam = require('./shared'),
19   assign = require('./shared').assign,
20   createClientInfo = require('./shared').createClientInfo;
21
22 // Used for filtering out fields for loggin
23 var debugFields = ['reconnect', 'reconnectTries', 'reconnectInterval', 'emitError', 'cursorFactory', 'host'
24   , 'port', 'size', 'keepAlive', 'keepAliveInitialDelay', 'noDelay', 'connectionTimeout', 'checkServerIdentity'
25   , 'socketTimeout', 'singleBufferSerializtion', 'ssl', 'ca', 'cert', 'key', 'rejectUnauthorized', 'promoteLongs', 'promoteValues'
26   , 'promoteBuffers', 'servername'];
27
28 // Server instance id
29 var id = 0;
30 var serverAccounting = false;
31 var servers = {};
32 var BSON = retrieveBSON();
33
34 /**
35  * Creates a new Server instance
36  * @class
37  * @param {boolean} [options.reconnect=true] Server will attempt to reconnect on loss of connection
38  * @param {number} [options.reconnectTries=30] Server attempt to reconnect #times
39  * @param {number} [options.reconnectInterval=1000] Server will wait # milliseconds between retries
40  * @param {number} [options.monitoring=true] Enable the server state monitoring (calling ismaster at monitoringInterval)
41  * @param {number} [options.monitoringInterval=5000] The interval of calling ismaster when monitoring is enabled.
42  * @param {Cursor} [options.cursorFactory=Cursor] The cursor factory class used for all query cursors
43  * @param {string} options.host The server host
44  * @param {number} options.port The server port
45  * @param {number} [options.size=5] Server connection pool size
46  * @param {boolean} [options.keepAlive=true] TCP Connection keep alive enabled
47  * @param {number} [options.keepAliveInitialDelay=0] Initial delay before TCP keep alive enabled
48  * @param {boolean} [options.noDelay=true] TCP Connection no delay
49  * @param {number} [options.connectionTimeout=0] TCP Connection timeout setting
50  * @param {number} [options.socketTimeout=0] TCP Socket timeout setting
51  * @param {boolean} [options.ssl=false] Use SSL for connection
52  * @param {boolean|function} [options.checkServerIdentity=true] Ensure we check server identify during SSL, set to false to disable checking. Only works for Node 0.12.x or higher. You can pass in a boolean or your own checkServerIdentity override function.
53  * @param {Buffer} [options.ca] SSL Certificate store binary buffer
54  * @param {Buffer} [options.cert] SSL Certificate binary buffer
55  * @param {Buffer} [options.key] SSL Key file binary buffer
56  * @param {string} [options.passphrase] SSL Certificate pass phrase
57  * @param {boolean} [options.rejectUnauthorized=true] Reject unauthorized server certificates
58  * @param {string} [options.servername=null] String containing the server name requested via TLS SNI.
59  * @param {boolean} [options.promoteLongs=true] Convert Long values from the db into Numbers if they fit into 53 bits
60  * @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
61  * @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
62  * @param {string} [options.appname=null] Application name, passed in on ismaster call and logged in mongod server logs. Maximum size 128 bytes.
63  * @param {boolean} [options.domainsEnabled=false] Enable the wrapping of the callback in the current domain, disabled by default to avoid perf hit.
64  * @return {Server} A cursor instance
65  * @fires Server#connect
66  * @fires Server#close
67  * @fires Server#error
68  * @fires Server#timeout
69  * @fires Server#parseError
70  * @fires Server#reconnect
71  * @fires Server#reconnectFailed
72  * @fires Server#serverHeartbeatStarted
73  * @fires Server#serverHeartbeatSucceeded
74  * @fires Server#serverHeartbeatFailed
75  * @fires Server#topologyOpening
76  * @fires Server#topologyClosed
77  * @fires Server#topologyDescriptionChanged
78  * @property {string} type the topology type.
79  * @property {string} parserType the parser type used (c++ or js).
80  */
81 var Server = function(options) {
82   options = options || {};
83
84   // Add event listener
85   EventEmitter.call(this);
86
87   // Server instance id
88   this.id = id++;
89
90   // Internal state
91   this.s = {
92     // Options
93     options: options,
94     // Logger
95     logger: Logger('Server', options),
96     // Factory overrides
97     Cursor: options.cursorFactory || BasicCursor,
98     // BSON instance
99     bson: options.bson || new BSON([BSON.Binary, BSON.Code, BSON.DBRef, BSON.Decimal128,
100       BSON.Double, BSON.Int32, BSON.Long, BSON.Map, BSON.MaxKey, BSON.MinKey,
101       BSON.ObjectId, BSON.BSONRegExp, BSON.Symbol, BSON.Timestamp]),
102     // Pool
103     pool: null,
104     // Disconnect handler
105     disconnectHandler: options.disconnectHandler,
106     // Monitor thread (keeps the connection alive)
107     monitoring: typeof options.monitoring == 'boolean' ? options.monitoring : true,
108     // Is the server in a topology
109     inTopology: typeof options.inTopology == 'boolean' ? options.inTopology : false,
110     // Monitoring timeout
111     monitoringInterval: typeof options.monitoringInterval == 'number'
112       ? options.monitoringInterval
113       : 5000,
114     // Topology id
115     topologyId: -1
116   }
117
118   // Curent ismaster
119   this.ismaster = null;
120   // Current ping time
121   this.lastIsMasterMS = -1;
122   // The monitoringProcessId
123   this.monitoringProcessId = null;
124   // Initial connection
125   this.initalConnect = true;
126   // Wire protocol handler, default to oldest known protocol handler
127   // this gets changed when the first ismaster is called.
128   this.wireProtocolHandler = new PreTwoSixWireProtocolSupport();
129   // Default type
130   this._type = 'server';
131   // Set the client info
132   this.clientInfo = createClientInfo(options);
133
134   // Max Stalleness values
135   // last time we updated the ismaster state
136   this.lastUpdateTime = 0;
137   // Last write time
138   this.lastWriteDate = 0;
139   // Stalleness
140   this.staleness = 0;
141 }
142
143 inherits(Server, EventEmitter);
144
145 Object.defineProperty(Server.prototype, 'type', {
146   enumerable:true, get: function() { return this._type; }
147 });
148
149 Object.defineProperty(Server.prototype, 'parserType', {
150   enumerable:true, get: function() {
151     return BSON.native ? "c++" : "js";
152   }
153 });
154
155 Server.enableServerAccounting = function() {
156   serverAccounting = true;
157   servers = {};
158 }
159
160 Server.disableServerAccounting = function() {
161   serverAccounting = false;
162 }
163
164 Server.servers = function() {
165   return servers;
166 }
167
168 Object.defineProperty(Server.prototype, 'name', {
169   enumerable:true,
170   get: function() { return this.s.options.host + ":" + this.s.options.port; }
171 });
172
173 function configureWireProtocolHandler(self, ismaster) {
174   // 3.2 wire protocol handler
175   if(ismaster.maxWireVersion >= 4) {
176     return new ThreeTwoWireProtocolSupport(new TwoSixWireProtocolSupport());
177   }
178
179   // 2.6 wire protocol handler
180   if(ismaster.maxWireVersion >= 2) {
181     return new TwoSixWireProtocolSupport();
182   }
183
184   // 2.4 or earlier wire protocol handler
185   return new PreTwoSixWireProtocolSupport();
186 }
187
188 function disconnectHandler(self, type, ns, cmd, options, callback) {
189   // Topology is not connected, save the call in the provided store to be
190   // Executed at some point when the handler deems it's reconnected
191   if(!self.s.pool.isConnected() && self.s.disconnectHandler != null && !options.monitoring) {
192     self.s.disconnectHandler.add(type, ns, cmd, options, callback);
193     return true;
194   }
195
196   // If we have no connection error
197   if(!self.s.pool.isConnected()) {
198     callback(MongoError.create(f("no connection available to server %s", self.name)));
199     return true;
200   }
201 }
202
203 function monitoringProcess(self) {
204   return function() {
205     // Pool was destroyed do not continue process
206     if(self.s.pool.isDestroyed()) return;
207     // Emit monitoring Process event
208     self.emit('monitoring', self);
209     // Perform ismaster call
210     // Query options
211     var queryOptions = { numberToSkip: 0, numberToReturn: -1, checkKeys: false, slaveOk: true };
212     // Create a query instance
213     var query = new Query(self.s.bson, 'admin.$cmd', {ismaster:true}, queryOptions);
214     // Get start time
215     var start = new Date().getTime();
216     // Execute the ismaster query
217     self.s.pool.write(query, {
218       socketTimeout: self.s.options.connectionTimeout || 2000,
219     }, function(err, result) {
220       // Set initial lastIsMasterMS
221       self.lastIsMasterMS = new Date().getTime() - start;
222       if(self.s.pool.isDestroyed()) return;
223       // Update the ismaster view if we have a result
224       if(result) {
225         self.ismaster = result.result;
226       }
227       // Re-schedule the monitoring process
228       self.monitoringProcessId = setTimeout(monitoringProcess(self), self.s.monitoringInterval);
229     });
230   }
231 }
232
233 var eventHandler = function(self, event) {
234   return function(err) {
235     // Log information of received information if in info mode
236     if(self.s.logger.isInfo()) {
237       var object = err instanceof MongoError ? JSON.stringify(err) : {}
238       self.s.logger.info(f('server %s fired event %s out with message %s'
239         , self.name, event, object));
240     }
241
242     // Handle connect event
243     if(event == 'connect') {
244       // Issue an ismaster command at connect
245       // Query options
246       var queryOptions = { numberToSkip: 0, numberToReturn: -1, checkKeys: false, slaveOk: true };
247       // Create a query instance
248       var query = new Query(self.s.bson, 'admin.$cmd', {ismaster:true, client: self.clientInfo}, queryOptions);
249       // Get start time
250       var start = new Date().getTime();
251       // Execute the ismaster query
252       self.s.pool.write(query, {
253         socketTimeout: self.s.options.connectionTimeout || 2000,
254       }, function(err, result) {
255         // Set initial lastIsMasterMS
256         self.lastIsMasterMS = new Date().getTime() - start;
257         if(err) {
258           self.destroy();
259           if(self.listeners('error').length > 0) self.emit('error', err);
260           return;
261         }
262
263         // Ensure no error emitted after initial connect when reconnecting
264         self.initalConnect = false;
265         // Save the ismaster
266         self.ismaster = result.result;
267
268         // It's a proxy change the type so
269         // the wireprotocol will send $readPreference
270         if(self.ismaster.msg == 'isdbgrid') {
271           self._type = 'mongos';
272         }
273         // Add the correct wire protocol handler
274         self.wireProtocolHandler = configureWireProtocolHandler(self, self.ismaster);
275         // Have we defined self monitoring
276         if(self.s.monitoring) {
277           self.monitoringProcessId = setTimeout(monitoringProcess(self), self.s.monitoringInterval);
278         }
279
280         // Emit server description changed if something listening
281         sdam.emitServerDescriptionChanged(self, {
282           address: self.name, arbiters: [], hosts: [], passives: [], type: !self.s.inTopology ? 'Standalone' : sdam.getTopologyType(self)
283         });
284
285         // Emit topology description changed if something listening
286         sdam.emitTopologyDescriptionChanged(self, {
287           topologyType: 'Single', servers: [{address: self.name, arbiters: [], hosts: [], passives: [], type: 'Standalone'}]
288         });
289
290         // Log the ismaster if available
291         if(self.s.logger.isInfo()) {
292           self.s.logger.info(f('server %s connected with ismaster [%s]', self.name, JSON.stringify(self.ismaster)));
293         }
294
295         // Emit connect
296         self.emit('connect', self);
297       });
298     } else if(event == 'error' || event == 'parseError'
299       || event == 'close' || event == 'timeout' || event == 'reconnect'
300       || event == 'attemptReconnect' || 'reconnectFailed') {
301       // Remove server instance from accounting
302       if(serverAccounting && ['close', 'timeout', 'error', 'parseError', 'reconnectFailed'].indexOf(event) != -1) {
303         // Emit toplogy opening event if not in topology
304         if(!self.s.inTopology) {
305           self.emit('topologyOpening', { topologyId: self.id });
306         }
307
308         delete servers[self.id];
309       }
310
311       // Reconnect failed return error
312       if(event == 'reconnectFailed') {
313         self.emit('reconnectFailed', err);
314         // Emit error if any listeners
315         if(self.listeners('error').length > 0) {
316           self.emit('error', err);
317         }
318         // Terminate
319         return;
320       }
321
322       // On first connect fail
323       if(self.s.pool.state == 'disconnected' && self.initalConnect && ['close', 'timeout', 'error', 'parseError'].indexOf(event) != -1) {
324         self.initalConnect = false;
325         return self.emit('error', new MongoError(f('failed to connect to server [%s] on first connect', self.name)));
326       }
327
328       // Reconnect event, emit the server
329       if(event == 'reconnect') {
330         return self.emit(event, self);
331       }
332
333       // Emit the event
334       self.emit(event, err);
335     }
336   }
337 }
338
339 /**
340  * Initiate server connect
341  * @method
342  * @param {array} [options.auth=null] Array of auth options to apply on connect
343  */
344 Server.prototype.connect = function(options) {
345   var self = this;
346   options = options || {};
347
348   // Set the connections
349   if(serverAccounting) servers[this.id] = this;
350
351   // Do not allow connect to be called on anything that's not disconnected
352   if(self.s.pool && !self.s.pool.isDisconnected() && !self.s.pool.isDestroyed()) {
353     throw MongoError.create(f('server instance in invalid state %s', self.s.state));
354   }
355
356   // Create a pool
357   self.s.pool = new Pool(assign(self.s.options, options, {bson: this.s.bson}));
358
359   // Set up listeners
360   self.s.pool.on('close', eventHandler(self, 'close'));
361   self.s.pool.on('error', eventHandler(self, 'error'));
362   self.s.pool.on('timeout', eventHandler(self, 'timeout'));
363   self.s.pool.on('parseError', eventHandler(self, 'parseError'));
364   self.s.pool.on('connect', eventHandler(self, 'connect'));
365   self.s.pool.on('reconnect', eventHandler(self, 'reconnect'));
366   self.s.pool.on('reconnectFailed', eventHandler(self, 'reconnectFailed'));
367
368   // Emit toplogy opening event if not in topology
369   if(!self.s.inTopology) {
370     this.emit('topologyOpening', { topologyId: self.id });
371   }
372
373   // Emit opening server event
374   self.emit('serverOpening', {
375     topologyId: self.s.topologyId != -1 ? self.s.topologyId : self.id,
376     address: self.name
377   });
378
379   // Connect with optional auth settings
380   if(options.auth) {
381     self.s.pool.connect.apply(self.s.pool, options.auth);
382   } else {
383     self.s.pool.connect();
384   }
385 }
386
387 /**
388  * Get the server description
389  * @method
390  * @return {object}
391 */
392 Server.prototype.getDescription = function() {
393   var ismaster = this.ismaster || {};
394   var description = {
395     type: sdam.getTopologyType(this),
396     address: this.name,
397   };
398
399   // Add fields if available
400   if(ismaster.hosts) description.hosts = ismaster.hosts;
401   if(ismaster.arbiters) description.arbiters = ismaster.arbiters;
402   if(ismaster.passives) description.passives = ismaster.passives;
403   if(ismaster.setName) description.setName = ismaster.setName;
404   return description;
405 }
406
407 /**
408  * Returns the last known ismaster document for this server
409  * @method
410  * @return {object}
411  */
412 Server.prototype.lastIsMaster = function() {
413   return this.ismaster;
414 }
415
416 /**
417  * Unref all connections belong to this server
418  * @method
419  */
420 Server.prototype.unref = function() {
421   this.s.pool.unref();
422 }
423
424 /**
425  * Figure out if the server is connected
426  * @method
427  * @return {boolean}
428  */
429 Server.prototype.isConnected = function() {
430   if(!this.s.pool) return false;
431   return this.s.pool.isConnected();
432 }
433
434 /**
435  * Figure out if the server instance was destroyed by calling destroy
436  * @method
437  * @return {boolean}
438  */
439 Server.prototype.isDestroyed = function() {
440   if(!this.s.pool) return false;
441   return this.s.pool.isDestroyed();
442 }
443
444 function basicWriteValidations(self) {
445   if(!self.s.pool) return MongoError.create('server instance is not connected');
446   if(self.s.pool.isDestroyed()) return MongoError.create('server instance pool was destroyed');
447 }
448
449 function basicReadValidations(self, options) {
450   basicWriteValidations(self, options);
451
452   if(options.readPreference && !(options.readPreference instanceof ReadPreference)) {
453     throw new Error("readPreference must be an instance of ReadPreference");
454   }
455 }
456
457 /**
458  * Execute a command
459  * @method
460  * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
461  * @param {object} cmd The command hash
462  * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
463  * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
464  * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
465  * @param {Boolean} [options.fullResult=false] Return the full envelope instead of just the result document.
466  * @param {opResultCallback} callback A callback function
467  */
468 Server.prototype.command = function(ns, cmd, options, callback) {
469   var self = this;
470   if(typeof options == 'function') callback = options, options = {}, options = options || {};
471   var result = basicReadValidations(self, options);
472   if(result) return callback(result);
473
474   // Debug log
475   if(self.s.logger.isDebug()) self.s.logger.debug(f('executing command [%s] against %s', JSON.stringify({
476     ns: ns, cmd: cmd, options: debugOptions(debugFields, options)
477   }), self.name));
478
479   // If we are not connected or have a disconnectHandler specified
480   if(disconnectHandler(self, 'command', ns, cmd, options, callback)) return;
481
482   // Check if we have collation support
483   if(this.ismaster && this.ismaster.maxWireVersion < 5 && cmd.collation) {
484     return callback(new MongoError(f('server %s does not support collation', this.name)));
485   }
486
487   // Query options
488   var queryOptions = {
489     numberToSkip: 0,
490     numberToReturn: -1,
491     checkKeys: typeof options.checkKeys == 'boolean' ? options.checkKeys: false,
492     serializeFunctions: typeof options.serializeFunctions == 'boolean' ? options.serializeFunctions : false,
493     ignoreUndefined: typeof options.ignoreUndefined == 'boolean' ? options.ignoreUndefined : false
494   };
495
496   // Create a query instance
497   var query = new Query(self.s.bson, ns, cmd, queryOptions);
498   // Set slave OK of the query
499   query.slaveOk = options.readPreference ? options.readPreference.slaveOk() : false;
500
501   // Write options
502   var writeOptions = {
503     raw: typeof options.raw == 'boolean' ? options.raw : false,
504     promoteLongs: typeof options.promoteLongs == 'boolean' ? options.promoteLongs : true,
505     promoteValues: typeof options.promoteValues == 'boolean' ? options.promoteValues : true,
506     promoteBuffers: typeof options.promoteBuffers == 'boolean' ? options.promoteBuffers : false,
507     command: true,
508     monitoring: typeof options.monitoring == 'boolean' ? options.monitoring : false,
509     fullResult: typeof options.fullResult == 'boolean' ? options.fullResult : false,
510     requestId: query.requestId,
511     socketTimeout: typeof options.socketTimeout == 'number' ? options.socketTimeout : null,
512   };
513
514   // Write the operation to the pool
515   self.s.pool.write(query, writeOptions, callback);
516 }
517
518 /**
519  * Insert one or more documents
520  * @method
521  * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
522  * @param {array} ops An array of documents to insert
523  * @param {boolean} [options.ordered=true] Execute in order or out of order
524  * @param {object} [options.writeConcern={}] Write concern for the operation
525  * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
526  * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
527  * @param {opResultCallback} callback A callback function
528  */
529 Server.prototype.insert = function(ns, ops, options, callback) {
530   var self = this;
531   if(typeof options == 'function') callback = options, options = {}, options = options || {};
532   var result = basicWriteValidations(self, options);
533   if(result) return callback(result);
534
535   // If we are not connected or have a disconnectHandler specified
536   if(disconnectHandler(self, 'insert', ns, ops, options, callback)) return;
537
538   // Setup the docs as an array
539   ops = Array.isArray(ops) ? ops : [ops];
540
541   // Execute write
542   return self.wireProtocolHandler.insert(self.s.pool, self.ismaster, ns, self.s.bson, ops, options, callback);
543 }
544
545 /**
546  * Perform one or more update operations
547  * @method
548  * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
549  * @param {array} ops An array of updates
550  * @param {boolean} [options.ordered=true] Execute in order or out of order
551  * @param {object} [options.writeConcern={}] Write concern for the operation
552  * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
553  * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
554  * @param {opResultCallback} callback A callback function
555  */
556 Server.prototype.update = function(ns, ops, options, callback) {
557   var self = this;
558   if(typeof options == 'function') callback = options, options = {}, options = options || {};
559   var result = basicWriteValidations(self, options);
560   if(result) return callback(result);
561
562   // If we are not connected or have a disconnectHandler specified
563   if(disconnectHandler(self, 'update', ns, ops, options, callback)) return;
564
565   // Check if we have collation support
566   if(this.ismaster && this.ismaster.maxWireVersion < 5 && options.collation) {
567     return callback(new MongoError(f('server %s does not support collation', this.name)));
568   }
569
570   // Setup the docs as an array
571   ops = Array.isArray(ops) ? ops : [ops];
572   // Execute write
573   return self.wireProtocolHandler.update(self.s.pool, self.ismaster, ns, self.s.bson, ops, options, callback);
574 }
575
576 /**
577  * Perform one or more remove operations
578  * @method
579  * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
580  * @param {array} ops An array of removes
581  * @param {boolean} [options.ordered=true] Execute in order or out of order
582  * @param {object} [options.writeConcern={}] Write concern for the operation
583  * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
584  * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
585  * @param {opResultCallback} callback A callback function
586  */
587 Server.prototype.remove = function(ns, ops, options, callback) {
588   var self = this;
589   if(typeof options == 'function') callback = options, options = {}, options = options || {};
590   var result = basicWriteValidations(self, options);
591   if(result) return callback(result);
592
593   // If we are not connected or have a disconnectHandler specified
594   if(disconnectHandler(self, 'remove', ns, ops, options, callback)) return;
595
596   // Check if we have collation support
597   if(this.ismaster && this.ismaster.maxWireVersion < 5 && options.collation) {
598     return callback(new MongoError(f('server %s does not support collation', this.name)));
599   }
600
601   // Setup the docs as an array
602   ops = Array.isArray(ops) ? ops : [ops];
603   // Execute write
604   return self.wireProtocolHandler.remove(self.s.pool, self.ismaster, ns, self.s.bson, ops, options, callback);
605 }
606
607 /**
608  * Get a new cursor
609  * @method
610  * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
611  * @param {{object}|{Long}} cmd Can be either a command returning a cursor or a cursorId
612  * @param {object} [options.batchSize=0] Batchsize for the operation
613  * @param {array} [options.documents=[]] Initial documents list for cursor
614  * @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
615  * @param {Boolean} [options.serializeFunctions=false] Specify if functions on an object should be serialized.
616  * @param {Boolean} [options.ignoreUndefined=false] Specify if the BSON serializer should ignore undefined fields.
617  * @param {opResultCallback} callback A callback function
618  */
619 Server.prototype.cursor = function(ns, cmd, cursorOptions) {
620   var s = this.s;
621   cursorOptions = cursorOptions || {};
622   // Set up final cursor type
623   var FinalCursor = cursorOptions.cursorFactory || s.Cursor;
624   // Return the cursor
625   return new FinalCursor(s.bson, ns, cmd, cursorOptions, this, s.options);
626 }
627
628 /**
629  * Logout from a database
630  * @method
631  * @param {string} db The db we are logging out from
632  * @param {authResultCallback} callback A callback function
633  */
634 Server.prototype.logout = function(dbName, callback) {
635   this.s.pool.logout(dbName, callback);
636 }
637
638 /**
639  * Authenticate using a specified mechanism
640  * @method
641  * @param {string} mechanism The Auth mechanism we are invoking
642  * @param {string} db The db we are invoking the mechanism against
643  * @param {...object} param Parameters for the specific mechanism
644  * @param {authResultCallback} callback A callback function
645  */
646 Server.prototype.auth = function(mechanism, db) {
647   var self = this;
648
649   // If we have the default mechanism we pick mechanism based on the wire
650   // protocol max version. If it's >= 3 then scram-sha1 otherwise mongodb-cr
651   if(mechanism == 'default' && self.ismaster && self.ismaster.maxWireVersion >= 3) {
652     mechanism = 'scram-sha-1';
653   } else if(mechanism == 'default') {
654     mechanism = 'mongocr';
655   }
656
657   // Slice all the arguments off
658   var args = Array.prototype.slice.call(arguments, 0);
659   // Set the mechanism
660   args[0] = mechanism;
661   // Get the callback
662   var callback = args[args.length - 1];
663
664   // If we are not connected or have a disconnectHandler specified
665   if(disconnectHandler(self, 'auth', db, args, {}, callback)) {
666     return;
667   }
668
669   // Do not authenticate if we are an arbiter
670   if(this.lastIsMaster() && this.lastIsMaster().arbiterOnly) {
671     return callback(null, true);
672   }
673
674   // Apply the arguments to the pool
675   self.s.pool.auth.apply(self.s.pool, args);
676 }
677
678 /**
679  * Compare two server instances
680  * @method
681  * @param {Server} server Server to compare equality against
682  * @return {boolean}
683  */
684 Server.prototype.equals = function(server) {
685   if(typeof server == 'string') return this.name == server;
686   if(server.name) return this.name == server.name;
687   return false;
688 }
689
690 /**
691  * All raw connections
692  * @method
693  * @return {Connection[]}
694  */
695 Server.prototype.connections = function() {
696   return this.s.pool.allConnections();
697 }
698
699 /**
700  * Get server
701  * @method
702  * @return {Server}
703  */
704 Server.prototype.getServer = function() {
705   return this;
706 }
707
708 /**
709  * Get connection
710  * @method
711  * @return {Connection}
712  */
713 Server.prototype.getConnection = function() {
714   return this.s.pool.get();
715 }
716
717 var listeners = ['close', 'error', 'timeout', 'parseError', 'connect'];
718
719 /**
720  * Destroy the server connection
721  * @method
722  * @param {boolean} [options.emitClose=false] Emit close event on destroy
723  * @param {boolean} [options.emitDestroy=false] Emit destroy event on destroy
724  * @param {boolean} [options.force=false] Force destroy the pool
725  */
726 Server.prototype.destroy = function(options) {
727   options = options || {};
728   var self = this;
729
730   // Set the connections
731   if(serverAccounting) delete servers[this.id];
732
733   // Destroy the monitoring process if any
734   if(this.monitoringProcessId) {
735     clearTimeout(this.monitoringProcessId);
736   }
737
738   // Emit close event
739   if(options.emitClose) {
740     self.emit('close', self);
741   }
742
743   // Emit destroy event
744   if(options.emitDestroy) {
745     self.emit('destroy', self);
746   }
747
748   // Remove all listeners
749   listeners.forEach(function(event) {
750     self.s.pool.removeAllListeners(event);
751   });
752
753   // Emit opening server event
754   if(self.listeners('serverClosed').length > 0) self.emit('serverClosed', {
755     topologyId: self.s.topologyId != -1 ? self.s.topologyId : self.id, address: self.name
756   });
757
758   // Emit toplogy opening event if not in topology
759   if(self.listeners('topologyClosed').length > 0 && !self.s.inTopology) {
760     self.emit('topologyClosed', { topologyId: self.id });
761   }
762
763   if(self.s.logger.isDebug()) {
764     self.s.logger.debug(f('destroy called on server %s', self.name));
765   }
766
767   // Destroy the pool
768   this.s.pool.destroy(options.force);
769 }
770
771 /**
772  * A server connect event, used to verify that the connection is up and running
773  *
774  * @event Server#connect
775  * @type {Server}
776  */
777
778 /**
779  * A server reconnect event, used to verify that the server topology has reconnected
780  *
781  * @event Server#reconnect
782  * @type {Server}
783  */
784
785 /**
786  * A server opening SDAM monitoring event
787  *
788  * @event Server#serverOpening
789  * @type {object}
790  */
791
792 /**
793  * A server closed SDAM monitoring event
794  *
795  * @event Server#serverClosed
796  * @type {object}
797  */
798
799 /**
800  * A server description SDAM change monitoring event
801  *
802  * @event Server#serverDescriptionChanged
803  * @type {object}
804  */
805
806 /**
807  * A topology open SDAM event
808  *
809  * @event Server#topologyOpening
810  * @type {object}
811  */
812
813 /**
814  * A topology closed SDAM event
815  *
816  * @event Server#topologyClosed
817  * @type {object}
818  */
819
820 /**
821  * A topology structure SDAM change event
822  *
823  * @event Server#topologyDescriptionChanged
824  * @type {object}
825  */
826
827 /**
828  * Server reconnect failed
829  *
830  * @event Server#reconnectFailed
831  * @type {Error}
832  */
833
834 /**
835  * Server connection pool closed
836  *
837  * @event Server#close
838  * @type {object}
839  */
840
841 /**
842  * Server connection pool caused an error
843  *
844  * @event Server#error
845  * @type {Error}
846  */
847
848 /**
849  * Server destroyed was called
850  *
851  * @event Server#destroy
852  * @type {Server}
853  */
854
855 module.exports = Server;