9d170d9000361d3c633511191edd45a661a44af3
[aai/esr-gui.git] /
1 "use strict";
2
3 var Logger = require('./connection/logger')
4   , retrieveBSON = require('./connection/utils').retrieveBSON
5   , MongoError = require('./error')
6   , f = require('util').format;
7
8 var BSON = retrieveBSON(),
9   Long = BSON.Long;
10
11 /**
12  * This is a cursor results callback
13  *
14  * @callback resultCallback
15  * @param {error} error An error object. Set to null if no error present
16  * @param {object} document
17  */
18
19 /**
20  * @fileOverview The **Cursor** class is an internal class that embodies a cursor on MongoDB
21  * allowing for iteration over the results returned from the underlying query.
22  *
23  * **CURSORS Cannot directly be instantiated**
24  * @example
25  * var Server = require('mongodb-core').Server
26  *   , ReadPreference = require('mongodb-core').ReadPreference
27  *   , assert = require('assert');
28  *
29  * var server = new Server({host: 'localhost', port: 27017});
30  * // Wait for the connection event
31  * server.on('connect', function(server) {
32  *   assert.equal(null, err);
33  *
34  *   // Execute the write
35  *   var cursor = _server.cursor('integration_tests.inserts_example4', {
36  *       find: 'integration_tests.example4'
37  *     , query: {a:1}
38  *   }, {
39  *     readPreference: new ReadPreference('secondary');
40  *   });
41  *
42  *   // Get the first document
43  *   cursor.next(function(err, doc) {
44  *     assert.equal(null, err);
45  *     server.destroy();
46  *   });
47  * });
48  *
49  * // Start connecting
50  * server.connect();
51  */
52
53 /**
54  * Creates a new Cursor, not to be used directly
55  * @class
56  * @param {object} bson An instance of the BSON parser
57  * @param {string} ns The MongoDB fully qualified namespace (ex: db1.collection1)
58  * @param {{object}|Long} cmd The selector (can be a command or a cursorId)
59  * @param {object} [options=null] Optional settings.
60  * @param {object} [options.batchSize=1000] Batchsize for the operation
61  * @param {array} [options.documents=[]] Initial documents list for cursor
62  * @param {object} [options.transforms=null] Transform methods for the cursor results
63  * @param {function} [options.transforms.query] Transform the value returned from the initial query
64  * @param {function} [options.transforms.doc] Transform each document returned from Cursor.prototype.next
65  * @param {object} topology The server topology instance.
66  * @param {object} topologyOptions The server topology options.
67  * @return {Cursor} A cursor instance
68  * @property {number} cursorBatchSize The current cursorBatchSize for the cursor
69  * @property {number} cursorLimit The current cursorLimit for the cursor
70  * @property {number} cursorSkip The current cursorSkip for the cursor
71  */
72 var Cursor = function(bson, ns, cmd, options, topology, topologyOptions) {
73   options = options || {};
74
75   // Cursor pool
76   this.pool = null;
77   // Cursor server
78   this.server = null;
79
80   // Do we have a not connected handler
81   this.disconnectHandler = options.disconnectHandler;
82
83   // Set local values
84   this.bson = bson;
85   this.ns = ns;
86   this.cmd = cmd;
87   this.options = options;
88   this.topology = topology;
89
90   // All internal state
91   this.cursorState = {
92       cursorId: null
93     , cmd: cmd
94     , documents: options.documents || []
95     , cursorIndex: 0
96     , dead: false
97     , killed: false
98     , init: false
99     , notified: false
100     , limit: options.limit || cmd.limit || 0
101     , skip: options.skip || cmd.skip || 0
102     , batchSize: options.batchSize || cmd.batchSize || 1000
103     , currentLimit: 0
104     // Result field name if not a cursor (contains the array of results)
105     , transforms: options.transforms
106   }
107
108   // Add promoteLong to cursor state
109   if(typeof topologyOptions.promoteLongs == 'boolean') {
110     this.cursorState.promoteLongs = topologyOptions.promoteLongs;
111   } else if(typeof options.promoteLongs == 'boolean') {
112     this.cursorState.promoteLongs = options.promoteLongs;
113   }
114
115   // Add promoteValues to cursor state
116   if(typeof topologyOptions.promoteValues == 'boolean') {
117     this.cursorState.promoteValues = topologyOptions.promoteValues;
118   } else if(typeof options.promoteValues == 'boolean') {
119     this.cursorState.promoteValues = options.promoteValues;
120   }
121
122   // Add promoteBuffers to cursor state
123   if(typeof topologyOptions.promoteBuffers == 'boolean') {
124     this.cursorState.promoteBuffers = topologyOptions.promoteBuffers;
125   } else if(typeof options.promoteBuffers == 'boolean') {
126     this.cursorState.promoteBuffers = options.promoteBuffers;
127   }
128
129   // Logger
130   this.logger = Logger('Cursor', topologyOptions);
131
132   //
133   // Did we pass in a cursor id
134   if(typeof cmd == 'number') {
135     this.cursorState.cursorId = Long.fromNumber(cmd);
136     this.cursorState.lastCursorId = this.cursorState.cursorId;
137   } else if(cmd instanceof Long) {
138     this.cursorState.cursorId = cmd;
139     this.cursorState.lastCursorId = cmd;
140   }
141 }
142
143 Cursor.prototype.setCursorBatchSize = function(value) {
144   this.cursorState.batchSize = value;
145 }
146
147 Cursor.prototype.cursorBatchSize = function() {
148   return this.cursorState.batchSize;
149 }
150
151 Cursor.prototype.setCursorLimit = function(value) {
152   this.cursorState.limit = value;
153 }
154
155 Cursor.prototype.cursorLimit = function() {
156   return this.cursorState.limit;
157 }
158
159 Cursor.prototype.setCursorSkip = function(value) {
160   this.cursorState.skip = value;
161 }
162
163 Cursor.prototype.cursorSkip = function() {
164   return this.cursorState.skip;
165 }
166
167 //
168 // Handle callback (including any exceptions thrown)
169 var handleCallback = function(callback, err, result) {
170   try {
171     callback(err, result);
172   } catch(err) {
173     process.nextTick(function() {
174       throw err;
175     });
176   }
177 }
178
179 // Internal methods
180 Cursor.prototype._find = function(callback) {
181   var self = this;
182
183   if(self.logger.isDebug()) {
184     self.logger.debug(f('issue initial query [%s] with flags [%s]'
185       , JSON.stringify(self.cmd)
186       , JSON.stringify(self.query)));
187   }
188
189   var queryCallback = function(err, r) {
190     if(err) return callback(err);
191
192     // Get the raw message
193     var result = r.message;
194
195     // Query failure bit set
196     if(result.queryFailure) {
197       return callback(MongoError.create(result.documents[0]), null);
198     }
199
200     // Check if we have a command cursor
201     if(Array.isArray(result.documents) && result.documents.length == 1
202       && (!self.cmd.find || (self.cmd.find && self.cmd.virtual == false))
203       && (result.documents[0].cursor != 'string'
204         || result.documents[0]['$err']
205         || result.documents[0]['errmsg']
206         || Array.isArray(result.documents[0].result))
207       ) {
208
209       // We have a an error document return the error
210       if(result.documents[0]['$err']
211         || result.documents[0]['errmsg']) {
212         return callback(MongoError.create(result.documents[0]), null);
213       }
214
215       // We have a cursor document
216       if(result.documents[0].cursor != null
217         && typeof result.documents[0].cursor != 'string') {
218           var id = result.documents[0].cursor.id;
219           // If we have a namespace change set the new namespace for getmores
220           if(result.documents[0].cursor.ns) {
221             self.ns = result.documents[0].cursor.ns;
222           }
223           // Promote id to long if needed
224           self.cursorState.cursorId = typeof id == 'number' ? Long.fromNumber(id) : id;
225           self.cursorState.lastCursorId = self.cursorState.cursorId;
226           // If we have a firstBatch set it
227           if(Array.isArray(result.documents[0].cursor.firstBatch)) {
228             self.cursorState.documents = result.documents[0].cursor.firstBatch;//.reverse();
229           }
230
231           // Return after processing command cursor
232           return callback(null, null);
233       }
234
235       if(Array.isArray(result.documents[0].result)) {
236         self.cursorState.documents = result.documents[0].result;
237         self.cursorState.cursorId = Long.ZERO;
238         return callback(null, null);
239       }
240     }
241
242     // Otherwise fall back to regular find path
243     self.cursorState.cursorId = result.cursorId;
244     self.cursorState.documents = result.documents;
245     self.cursorState.lastCursorId = result.cursorId;
246
247     // Transform the results with passed in transformation method if provided
248     if(self.cursorState.transforms && typeof self.cursorState.transforms.query == 'function') {
249       self.cursorState.documents = self.cursorState.transforms.query(result);
250     }
251
252     // Return callback
253     callback(null, null);
254   }
255
256   // Options passed to the pool
257   var queryOptions = {};
258
259   // If we have a raw query decorate the function
260   if(self.options.raw || self.cmd.raw) {
261     // queryCallback.raw = self.options.raw || self.cmd.raw;
262     queryOptions.raw = self.options.raw || self.cmd.raw;
263   }
264
265   // Do we have documentsReturnedIn set on the query
266   if(typeof self.query.documentsReturnedIn == 'string') {
267     // queryCallback.documentsReturnedIn = self.query.documentsReturnedIn;
268     queryOptions.documentsReturnedIn = self.query.documentsReturnedIn;
269   }
270
271   // Add promote Long value if defined
272   if(typeof self.cursorState.promoteLongs == 'boolean') {
273     queryOptions.promoteLongs = self.cursorState.promoteLongs;
274   }
275
276   // Add promote values if defined
277   if(typeof self.cursorState.promoteValues == 'boolean') {
278     queryOptions.promoteValues = self.cursorState.promoteValues;
279   }
280
281   // Add promote values if defined
282   if(typeof self.cursorState.promoteBuffers == 'boolean') {
283     queryOptions.promoteBuffers = self.cursorState.promoteBuffers;
284   }
285
286   // Write the initial command out
287   self.server.s.pool.write(self.query, queryOptions, queryCallback);
288 }
289
290 Cursor.prototype._getmore = function(callback) {
291   if(this.logger.isDebug()) this.logger.debug(f('schedule getMore call for query [%s]', JSON.stringify(this.query)))
292   // Determine if it's a raw query
293   var raw = this.options.raw || this.cmd.raw;
294
295   // Set the current batchSize
296   var batchSize = this.cursorState.batchSize;
297   if(this.cursorState.limit > 0
298     && ((this.cursorState.currentLimit + batchSize) > this.cursorState.limit)) {
299     batchSize = this.cursorState.limit - this.cursorState.currentLimit;
300   }
301
302   // Default pool
303   var pool = this.server.s.pool;
304
305   // We have a wire protocol handler
306   this.server.wireProtocolHandler.getMore(this.bson, this.ns, this.cursorState, batchSize, raw, pool, this.options, callback);
307 }
308
309 Cursor.prototype._killcursor = function(callback) {
310   // Set cursor to dead
311   this.cursorState.dead = true;
312   this.cursorState.killed = true;
313   // Remove documents
314   this.cursorState.documents = [];
315
316   // If no cursor id just return
317   if(this.cursorState.cursorId == null || this.cursorState.cursorId.isZero() || this.cursorState.init == false) {
318     if(callback) callback(null, null);
319     return;
320   }
321
322   // Default pool
323   var pool = this.server.s.pool;
324   // Execute command
325   this.server.wireProtocolHandler.killCursor(this.bson, this.ns, this.cursorState.cursorId, pool, callback);
326 }
327
328 /**
329  * Clone the cursor
330  * @method
331  * @return {Cursor}
332  */
333 Cursor.prototype.clone = function() {
334   return this.topology.cursor(this.ns, this.cmd, this.options);
335 }
336
337 /**
338  * Checks if the cursor is dead
339  * @method
340  * @return {boolean} A boolean signifying if the cursor is dead or not
341  */
342 Cursor.prototype.isDead = function() {
343   return this.cursorState.dead == true;
344 }
345
346 /**
347  * Checks if the cursor was killed by the application
348  * @method
349  * @return {boolean} A boolean signifying if the cursor was killed by the application
350  */
351 Cursor.prototype.isKilled = function() {
352   return this.cursorState.killed == true;
353 }
354
355 /**
356  * Checks if the cursor notified it's caller about it's death
357  * @method
358  * @return {boolean} A boolean signifying if the cursor notified the callback
359  */
360 Cursor.prototype.isNotified = function() {
361   return this.cursorState.notified == true;
362 }
363
364 /**
365  * Returns current buffered documents length
366  * @method
367  * @return {number} The number of items in the buffered documents
368  */
369 Cursor.prototype.bufferedCount = function() {
370   return this.cursorState.documents.length - this.cursorState.cursorIndex;
371 }
372
373 /**
374  * Returns current buffered documents
375  * @method
376  * @return {Array} An array of buffered documents
377  */
378 Cursor.prototype.readBufferedDocuments = function(number) {
379   var unreadDocumentsLength = this.cursorState.documents.length - this.cursorState.cursorIndex;
380   var length = number < unreadDocumentsLength ? number : unreadDocumentsLength;
381   var elements = this.cursorState.documents.slice(this.cursorState.cursorIndex, this.cursorState.cursorIndex + length);
382
383   // Transform the doc with passed in transformation method if provided
384   if(this.cursorState.transforms && typeof this.cursorState.transforms.doc == 'function') {
385     // Transform all the elements
386     for(var i = 0; i < elements.length; i++) {
387       elements[i] = this.cursorState.transforms.doc(elements[i]);
388     }
389   }
390
391   // Ensure we do not return any more documents than the limit imposed
392   // Just return the number of elements up to the limit
393   if(this.cursorState.limit > 0 && (this.cursorState.currentLimit + elements.length) > this.cursorState.limit) {
394     elements = elements.slice(0, (this.cursorState.limit - this.cursorState.currentLimit));
395     this.kill();
396   }
397
398   // Adjust current limit
399   this.cursorState.currentLimit = this.cursorState.currentLimit + elements.length;
400   this.cursorState.cursorIndex = this.cursorState.cursorIndex + elements.length;
401
402   // Return elements
403   return elements;
404 }
405
406 /**
407  * Kill the cursor
408  * @method
409  * @param {resultCallback} callback A callback function
410  */
411 Cursor.prototype.kill = function(callback) {
412   this._killcursor(callback);
413 }
414
415 /**
416  * Resets the cursor
417  * @method
418  * @return {null}
419  */
420 Cursor.prototype.rewind = function() {
421   if(this.cursorState.init) {
422     if(!this.cursorState.dead) {
423       this.kill();
424     }
425
426     this.cursorState.currentLimit = 0;
427     this.cursorState.init = false;
428     this.cursorState.dead = false;
429     this.cursorState.killed = false;
430     this.cursorState.notified = false;
431     this.cursorState.documents = [];
432     this.cursorState.cursorId = null;
433     this.cursorState.cursorIndex = 0;
434   }
435 }
436
437 /**
438  * Validate if the pool is dead and return error
439  */
440 var isConnectionDead = function(self, callback) {
441   if(self.pool
442     && self.pool.isDestroyed()) {
443     self.cursorState.notified = true;
444     self.cursorState.killed = true;
445     self.cursorState.documents = [];
446     self.cursorState.cursorIndex = 0;
447     callback(MongoError.create(f('connection to host %s:%s was destroyed', self.pool.host, self.pool.port)))
448     return true;
449   }
450
451   return false;
452 }
453
454 /**
455  * Validate if the cursor is dead but was not explicitly killed by user
456  */
457 var isCursorDeadButNotkilled = function(self, callback) {
458   // Cursor is dead but not marked killed, return null
459   if(self.cursorState.dead && !self.cursorState.killed) {
460     self.cursorState.notified = true;
461     self.cursorState.killed = true;
462     self.cursorState.documents = [];
463     self.cursorState.cursorIndex = 0;
464     handleCallback(callback, null, null);
465     return true;
466   }
467
468   return false;
469 }
470
471 /**
472  * Validate if the cursor is dead and was killed by user
473  */
474 var isCursorDeadAndKilled = function(self, callback) {
475   if(self.cursorState.dead && self.cursorState.killed) {
476     handleCallback(callback, MongoError.create('cursor is dead'));
477     return true;
478   }
479
480   return false;
481 }
482
483 /**
484  * Validate if the cursor was killed by the user
485  */
486 var isCursorKilled = function(self, callback) {
487   if(self.cursorState.killed) {
488     self.cursorState.notified = true;
489     self.cursorState.documents = [];
490     self.cursorState.cursorIndex = 0;
491     handleCallback(callback, null, null);
492     return true;
493   }
494
495   return false;
496 }
497
498 /**
499  * Mark cursor as being dead and notified
500  */
501 var setCursorDeadAndNotified = function(self, callback) {
502   self.cursorState.dead = true;
503   self.cursorState.notified = true;
504   self.cursorState.documents = [];
505   self.cursorState.cursorIndex = 0;
506   handleCallback(callback, null, null);
507 }
508
509 /**
510  * Mark cursor as being notified
511  */
512 var setCursorNotified = function(self, callback) {
513   self.cursorState.notified = true;
514   self.cursorState.documents = [];
515   self.cursorState.cursorIndex = 0;
516   handleCallback(callback, null, null);
517 }
518
519 var nextFunction = function(self, callback) {
520   // We have notified about it
521   if(self.cursorState.notified) {
522     return callback(new Error('cursor is exhausted'));
523   }
524
525   // Cursor is killed return null
526   if(isCursorKilled(self, callback)) return;
527
528   // Cursor is dead but not marked killed, return null
529   if(isCursorDeadButNotkilled(self, callback)) return;
530
531   // We have a dead and killed cursor, attempting to call next should error
532   if(isCursorDeadAndKilled(self, callback)) return;
533
534   // We have just started the cursor
535   if(!self.cursorState.init) {
536     // Topology is not connected, save the call in the provided store to be
537     // Executed at some point when the handler deems it's reconnected
538     if(!self.topology.isConnected(self.options) && self.disconnectHandler != null) {
539       if (self.topology.isDestroyed()) {
540         // Topology was destroyed, so don't try to wait for it to reconnect
541         return callback(new MongoError('Topology was destroyed'));
542       }
543       return self.disconnectHandler.addObjectAndMethod('cursor', self, 'next', [callback], callback);
544     }
545
546     try {
547       self.server = self.topology.getServer(self.options);
548     } catch(err) {
549       // Handle the error and add object to next method call
550       if(self.disconnectHandler != null) {
551         return self.disconnectHandler.addObjectAndMethod('cursor', self, 'next', [callback], callback);
552       }
553
554       // Otherwise return the error
555       return callback(err);
556     }
557
558     // Set as init
559     self.cursorState.init = true;
560
561     // Server does not support server
562     if(self.cmd
563       && self.cmd.collation
564       && self.server.ismaster.maxWireVersion < 5) {
565       return callback(new MongoError(f('server %s does not support collation', self.server.name)));
566     }
567
568     try {
569       self.query = self.server.wireProtocolHandler.command(self.bson, self.ns, self.cmd, self.cursorState, self.topology, self.options);
570     } catch(err) {
571       return callback(err);
572     }
573   }
574
575   // If we don't have a cursorId execute the first query
576   if(self.cursorState.cursorId == null) {
577     // Check if pool is dead and return if not possible to
578     // execute the query against the db
579     if(isConnectionDead(self, callback)) return;
580
581     // Check if topology is destroyed
582     if(self.topology.isDestroyed()) return callback(new MongoError('connection destroyed, not possible to instantiate cursor'));
583
584     // query, cmd, options, cursorState, callback
585     self._find(function(err) {
586       if(err) return handleCallback(callback, err, null);
587
588       if(self.cursorState.documents.length == 0
589         && self.cursorState.cursorId && self.cursorState.cursorId.isZero()
590         && !self.cmd.tailable && !self.cmd.awaitData) {
591         return setCursorNotified(self, callback);
592       }
593
594       nextFunction(self, callback);
595     });
596   } else if(self.cursorState.limit > 0 && self.cursorState.currentLimit >= self.cursorState.limit) {
597     // Ensure we kill the cursor on the server
598     self.kill();
599     // Set cursor in dead and notified state
600     return setCursorDeadAndNotified(self, callback);
601   } else if(self.cursorState.cursorIndex == self.cursorState.documents.length
602       && !Long.ZERO.equals(self.cursorState.cursorId)) {
603       // Ensure an empty cursor state
604       self.cursorState.documents = [];
605       self.cursorState.cursorIndex = 0;
606
607       // Check if topology is destroyed
608       if(self.topology.isDestroyed()) return callback(new MongoError('connection destroyed, not possible to instantiate cursor'));
609
610       // Check if connection is dead and return if not possible to
611       // execute a getmore on this connection
612       if(isConnectionDead(self, callback)) return;
613
614       // Execute the next get more
615       self._getmore(function(err, doc, connection) {
616         if(err) return handleCallback(callback, err);
617
618         // Save the returned connection to ensure all getMore's fire over the same connection
619         self.connection = connection;
620
621         // Tailable cursor getMore result, notify owner about it
622         // No attempt is made here to retry, this is left to the user of the
623         // core module to handle to keep core simple
624         if(self.cursorState.documents.length == 0
625           && self.cmd.tailable && Long.ZERO.equals(self.cursorState.cursorId)) {
626           // No more documents in the tailed cursor
627           return handleCallback(callback, MongoError.create({
628               message: 'No more documents in tailed cursor'
629             , tailable: self.cmd.tailable
630             , awaitData: self.cmd.awaitData
631           }));
632         } else if(self.cursorState.documents.length == 0
633           && self.cmd.tailable && !Long.ZERO.equals(self.cursorState.cursorId)) {
634           return nextFunction(self, callback);
635         }
636
637         if(self.cursorState.limit > 0 && self.cursorState.currentLimit >= self.cursorState.limit) {
638           return setCursorDeadAndNotified(self, callback);
639         }
640
641         nextFunction(self, callback);
642       });
643   } else if(self.cursorState.documents.length == self.cursorState.cursorIndex
644     && self.cmd.tailable && Long.ZERO.equals(self.cursorState.cursorId)) {
645       return handleCallback(callback, MongoError.create({
646           message: 'No more documents in tailed cursor'
647         , tailable: self.cmd.tailable
648         , awaitData: self.cmd.awaitData
649       }));
650   } else if(self.cursorState.documents.length == self.cursorState.cursorIndex
651       && Long.ZERO.equals(self.cursorState.cursorId)) {
652       setCursorDeadAndNotified(self, callback);
653   } else {
654     if(self.cursorState.limit > 0 && self.cursorState.currentLimit >= self.cursorState.limit) {
655       // Ensure we kill the cursor on the server
656       self.kill();
657       // Set cursor in dead and notified state
658       return setCursorDeadAndNotified(self, callback);
659     }
660
661     // Increment the current cursor limit
662     self.cursorState.currentLimit += 1;
663
664     // Get the document
665     var doc = self.cursorState.documents[self.cursorState.cursorIndex++];
666
667     // Doc overflow
668     if(doc.$err) {
669       // Ensure we kill the cursor on the server
670       self.kill();
671       // Set cursor in dead and notified state
672       return setCursorDeadAndNotified(self, function() {
673         handleCallback(callback, new MongoError(doc.$err));
674       });
675     }
676
677     // Transform the doc with passed in transformation method if provided
678     if(self.cursorState.transforms && typeof self.cursorState.transforms.doc == 'function') {
679       doc = self.cursorState.transforms.doc(doc);
680     }
681
682     // Return the document
683     handleCallback(callback, null, doc);
684   }
685 }
686
687 /**
688  * Retrieve the next document from the cursor
689  * @method
690  * @param {resultCallback} callback A callback function
691  */
692 Cursor.prototype.next = function(callback) {
693   nextFunction(this, callback);
694 }
695
696 module.exports = Cursor;