44431c1dbfd4e90f24bb654eecaddb168281ad33
[aai/esr-gui.git] /
1 "use strict";
2
3 var inherits = require('util').inherits
4   , f = require('util').format
5   , formattedOrderClause = require('./utils').formattedOrderClause
6   , handleCallback = require('./utils').handleCallback
7   , ReadPreference = require('./read_preference')
8   , MongoError = require('mongodb-core').MongoError
9   , Readable = require('stream').Readable || require('readable-stream').Readable
10   , Define = require('./metadata')
11   , CoreCursor = require('mongodb-core').Cursor
12   , Map = require('mongodb-core').BSON.Map
13   , Query = require('mongodb-core').Query
14   , CoreReadPreference = require('mongodb-core').ReadPreference;
15
16 /**
17  * @fileOverview The **Cursor** class is an internal class that embodies a cursor on MongoDB
18  * allowing for iteration over the results returned from the underlying query. It supports
19  * one by one document iteration, conversion to an array or can be iterated as a Node 0.10.X
20  * or higher stream
21  *
22  * **CURSORS Cannot directly be instantiated**
23  * @example
24  * var MongoClient = require('mongodb').MongoClient,
25  *   test = require('assert');
26  * // Connection url
27  * var url = 'mongodb://localhost:27017/test';
28  * // Connect using MongoClient
29  * MongoClient.connect(url, function(err, db) {
30  *   // Create a collection we want to drop later
31  *   var col = db.collection('createIndexExample1');
32  *   // Insert a bunch of documents
33  *   col.insert([{a:1, b:1}
34  *     , {a:2, b:2}, {a:3, b:3}
35  *     , {a:4, b:4}], {w:1}, function(err, result) {
36  *     test.equal(null, err);
37  *
38  *     // Show that duplicate records got dropped
39  *     col.find({}).toArray(function(err, items) {
40  *       test.equal(null, err);
41  *       test.equal(4, items.length);
42  *       db.close();
43  *     });
44  *   });
45  * });
46  */
47
48 /**
49  * Namespace provided by the mongodb-core and node.js
50  * @external CoreCursor
51  * @external Readable
52  */
53
54 // Flags allowed for cursor
55 var flags = ['tailable', 'oplogReplay', 'noCursorTimeout', 'awaitData', 'exhaust', 'partial'];
56 var fields = ['numberOfRetries', 'tailableRetryInterval'];
57 var push = Array.prototype.push;
58
59 /**
60  * Creates a new Cursor instance (INTERNAL TYPE, do not instantiate directly)
61  * @class Cursor
62  * @extends external:CoreCursor
63  * @extends external:Readable
64  * @property {string} sortValue Cursor query sort setting.
65  * @property {boolean} timeout Is Cursor able to time out.
66  * @property {ReadPreference} readPreference Get cursor ReadPreference.
67  * @fires Cursor#data
68  * @fires Cursor#end
69  * @fires Cursor#close
70  * @fires Cursor#readable
71  * @return {Cursor} a Cursor instance.
72  * @example
73  * Cursor cursor options.
74  *
75  * collection.find({}).project({a:1})                             // Create a projection of field a
76  * collection.find({}).skip(1).limit(10)                          // Skip 1 and limit 10
77  * collection.find({}).batchSize(5)                               // Set batchSize on cursor to 5
78  * collection.find({}).filter({a:1})                              // Set query on the cursor
79  * collection.find({}).comment('add a comment')                   // Add a comment to the query, allowing to correlate queries
80  * collection.find({}).addCursorFlag('tailable', true)            // Set cursor as tailable
81  * collection.find({}).addCursorFlag('oplogReplay', true)         // Set cursor as oplogReplay
82  * collection.find({}).addCursorFlag('noCursorTimeout', true)     // Set cursor as noCursorTimeout
83  * collection.find({}).addCursorFlag('awaitData', true)           // Set cursor as awaitData
84  * collection.find({}).addCursorFlag('partial', true)             // Set cursor as partial
85  * collection.find({}).addQueryModifier('$orderby', {a:1})        // Set $orderby {a:1}
86  * collection.find({}).max(10)                                    // Set the cursor maxScan
87  * collection.find({}).maxScan(10)                                // Set the cursor maxScan
88  * collection.find({}).maxTimeMS(1000)                            // Set the cursor maxTimeMS
89  * collection.find({}).min(100)                                   // Set the cursor min
90  * collection.find({}).returnKey(10)                              // Set the cursor returnKey
91  * collection.find({}).setReadPreference(ReadPreference.PRIMARY)  // Set the cursor readPreference
92  * collection.find({}).showRecordId(true)                         // Set the cursor showRecordId
93  * collection.find({}).snapshot(true)                             // Set the cursor snapshot
94  * collection.find({}).sort([['a', 1]])                           // Sets the sort order of the cursor query
95  * collection.find({}).hint('a_1')                                // Set the cursor hint
96  *
97  * All options are chainable, so one can do the following.
98  *
99  * collection.find({}).maxTimeMS(1000).maxScan(100).skip(1).toArray(..)
100  */
101 var Cursor = function(bson, ns, cmd, options, topology, topologyOptions) {
102   CoreCursor.apply(this, Array.prototype.slice.call(arguments, 0));
103   var self = this;
104   var state = Cursor.INIT;
105   var streamOptions = {};
106
107   // Tailable cursor options
108   var numberOfRetries = options.numberOfRetries || 5;
109   var tailableRetryInterval = options.tailableRetryInterval || 500;
110   var currentNumberOfRetries = numberOfRetries;
111
112   // Get the promiseLibrary
113   var promiseLibrary = options.promiseLibrary;
114
115   // No promise library selected fall back
116   if(!promiseLibrary) {
117     promiseLibrary = typeof global.Promise == 'function' ?
118       global.Promise : require('es6-promise').Promise;
119   }
120
121   // Set up
122   Readable.call(this, {objectMode: true});
123
124   // Internal cursor state
125   this.s = {
126     // Tailable cursor options
127       numberOfRetries: numberOfRetries
128     , tailableRetryInterval: tailableRetryInterval
129     , currentNumberOfRetries: currentNumberOfRetries
130     // State
131     , state: state
132     // Stream options
133     , streamOptions: streamOptions
134     // BSON
135     , bson: bson
136     // Namespace
137     , ns: ns
138     // Command
139     , cmd: cmd
140     // Options
141     , options: options
142     // Topology
143     , topology: topology
144     // Topology options
145     , topologyOptions: topologyOptions
146     // Promise library
147     , promiseLibrary: promiseLibrary
148     // Current doc
149     , currentDoc: null
150   }
151
152   // Translate correctly
153   if(self.s.options.noCursorTimeout == true) {
154     self.addCursorFlag('noCursorTimeout', true);
155   }
156
157   // Set the sort value
158   this.sortValue = self.s.cmd.sort;
159 }
160
161 /**
162  * Cursor stream data event, fired for each document in the cursor.
163  *
164  * @event Cursor#data
165  * @type {object}
166  */
167
168 /**
169  * Cursor stream end event
170  *
171  * @event Cursor#end
172  * @type {null}
173  */
174
175 /**
176  * Cursor stream close event
177  *
178  * @event Cursor#close
179  * @type {null}
180  */
181
182 /**
183  * Cursor stream readable event
184  *
185  * @event Cursor#readable
186  * @type {null}
187  */
188
189 // Inherit from Readable
190 inherits(Cursor, Readable);
191
192 // Map core cursor _next method so we can apply mapping
193 CoreCursor.prototype._next = CoreCursor.prototype.next;
194
195 for(var name in CoreCursor.prototype) {
196   Cursor.prototype[name] = CoreCursor.prototype[name];
197 }
198
199 var define = Cursor.define = new Define('Cursor', Cursor, true);
200
201 /**
202  * Check if there is any document still available in the cursor
203  * @method
204  * @param {Cursor~resultCallback} [callback] The result callback.
205  * @throws {MongoError}
206  * @return {Promise} returns Promise if no callback passed
207  */
208 Cursor.prototype.hasNext = function(callback) {
209   var self = this;
210
211   // Execute using callback
212   if(typeof callback == 'function') {
213     if(self.s.currentDoc){
214       return callback(null, true);
215     } else {
216       return nextObject(self, function(err, doc) {
217         if(!doc) return callback(null, false);
218         self.s.currentDoc = doc;
219         callback(null, true);
220       });
221     }
222   }
223
224   // Return a Promise
225   return new this.s.promiseLibrary(function(resolve, reject) {
226     if(self.s.currentDoc){
227       resolve(true);
228     } else {
229       nextObject(self, function(err, doc) {
230         if(self.s.state == Cursor.CLOSED || self.isDead()) return resolve(false);
231         if(err) return reject(err);
232         if(!doc) return resolve(false);
233         self.s.currentDoc = doc;
234         resolve(true);
235       });
236     }
237   });
238 }
239
240 define.classMethod('hasNext', {callback: true, promise:true});
241
242 /**
243  * Get the next available document from the cursor, returns null if no more documents are available.
244  * @method
245  * @param {Cursor~resultCallback} [callback] The result callback.
246  * @throws {MongoError}
247  * @return {Promise} returns Promise if no callback passed
248  */
249 Cursor.prototype.next = function(callback) {
250   var self = this;
251
252   // Execute using callback
253   if(typeof callback == 'function') {
254     // Return the currentDoc if someone called hasNext first
255     if(self.s.currentDoc) {
256       var doc = self.s.currentDoc;
257       self.s.currentDoc = null;
258       return callback(null, doc);
259     }
260
261     // Return the next object
262     return nextObject(self, callback)
263   };
264
265   // Return a Promise
266   return new this.s.promiseLibrary(function(resolve, reject) {
267     // Return the currentDoc if someone called hasNext first
268     if(self.s.currentDoc) {
269       var doc = self.s.currentDoc;
270       self.s.currentDoc = null;
271       return resolve(doc);
272     }
273
274     nextObject(self, function(err, r) {
275       if(err) return reject(err);
276       resolve(r);
277     });
278   });
279 }
280
281 define.classMethod('next', {callback: true, promise:true});
282
283 /**
284  * Set the cursor query
285  * @method
286  * @param {object} filter The filter object used for the cursor.
287  * @return {Cursor}
288  */
289 Cursor.prototype.filter = function(filter) {
290   if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true});
291   this.s.cmd.query = filter;
292   return this;
293 }
294
295 define.classMethod('filter', {callback: false, promise:false, returns: [Cursor]});
296
297 /**
298  * Set the cursor maxScan
299  * @method
300  * @param {object} maxScan Constrains the query to only scan the specified number of documents when fulfilling the query
301  * @return {Cursor}
302  */
303 Cursor.prototype.maxScan = function(maxScan) {
304   if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true});
305   this.s.cmd.maxScan = maxScan;
306   return this;
307 }
308
309 define.classMethod('maxScan', {callback: false, promise:false, returns: [Cursor]});
310
311 /**
312  * Set the cursor hint
313  * @method
314  * @param {object} hint If specified, then the query system will only consider plans using the hinted index.
315  * @return {Cursor}
316  */
317 Cursor.prototype.hint = function(hint) {
318   if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true});
319   this.s.cmd.hint = hint;
320   return this;
321 }
322
323 define.classMethod('hint', {callback: false, promise:false, returns: [Cursor]});
324
325 /**
326  * Set the cursor min
327  * @method
328  * @param {object} min Specify a $min value to specify the inclusive lower bound for a specific index in order to constrain the results of find(). The $min specifies the lower bound for all keys of a specific index in order.
329  * @return {Cursor}
330  */
331 Cursor.prototype.min = function(min) {
332   if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true});
333   this.s.cmd.min = min;
334   return this;
335 }
336
337 define.classMethod('min', {callback: false, promise:false, returns: [Cursor]});
338
339 /**
340  * Set the cursor max
341  * @method
342  * @param {object} max Specify a $max value to specify the exclusive upper bound for a specific index in order to constrain the results of find(). The $max specifies the upper bound for all keys of a specific index in order.
343  * @return {Cursor}
344  */
345 Cursor.prototype.max = function(max) {
346   if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true});
347   this.s.cmd.max = max;
348   return this;
349 }
350
351 define.classMethod('max', {callback: false, promise:false, returns: [Cursor]});
352
353 /**
354  * Set the cursor returnKey
355  * @method
356  * @param {object} returnKey Only return the index field or fields for the results of the query. If $returnKey is set to true and the query does not use an index to perform the read operation, the returned documents will not contain any fields. Use one of the following forms:
357  * @return {Cursor}
358  */
359 Cursor.prototype.returnKey = function(value) {
360   if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true});
361   this.s.cmd.returnKey = value;
362   return this;
363 }
364
365 define.classMethod('returnKey', {callback: false, promise:false, returns: [Cursor]});
366
367 /**
368  * Set the cursor showRecordId
369  * @method
370  * @param {object} showRecordId The $showDiskLoc option has now been deprecated and replaced with the showRecordId field. $showDiskLoc will still be accepted for OP_QUERY stye find.
371  * @return {Cursor}
372  */
373 Cursor.prototype.showRecordId = function(value) {
374   if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true});
375   this.s.cmd.showDiskLoc = value;
376   return this;
377 }
378
379 define.classMethod('showRecordId', {callback: false, promise:false, returns: [Cursor]});
380
381 /**
382  * Set the cursor snapshot
383  * @method
384  * @param {object} snapshot The $snapshot operator prevents the cursor from returning a document more than once because an intervening write operation results in a move of the document.
385  * @return {Cursor}
386  */
387 Cursor.prototype.snapshot = function(value) {
388   if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true});
389   this.s.cmd.snapshot = value;
390   return this;
391 }
392
393 define.classMethod('snapshot', {callback: false, promise:false, returns: [Cursor]});
394
395 /**
396  * Set a node.js specific cursor option
397  * @method
398  * @param {string} field The cursor option to set ['numberOfRetries', 'tailableRetryInterval'].
399  * @param {object} value The field value.
400  * @throws {MongoError}
401  * @return {Cursor}
402  */
403 Cursor.prototype.setCursorOption = function(field, value) {
404   if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true});
405   if(fields.indexOf(field) == -1) throw MongoError.create({message: f("option %s not a supported option %s", field, fields), driver:true });
406   this.s[field] = value;
407   if(field == 'numberOfRetries')
408     this.s.currentNumberOfRetries = value;
409   return this;
410 }
411
412 define.classMethod('setCursorOption', {callback: false, promise:false, returns: [Cursor]});
413
414 /**
415  * Add a cursor flag to the cursor
416  * @method
417  * @param {string} flag The flag to set, must be one of following ['tailable', 'oplogReplay', 'noCursorTimeout', 'awaitData', 'partial'].
418  * @param {boolean} value The flag boolean value.
419  * @throws {MongoError}
420  * @return {Cursor}
421  */
422 Cursor.prototype.addCursorFlag = function(flag, value) {
423   if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true});
424   if(flags.indexOf(flag) == -1) throw MongoError.create({message: f("flag %s not a supported flag %s", flag, flags), driver:true });
425   if(typeof value != 'boolean') throw MongoError.create({message: f("flag %s must be a boolean value", flag), driver:true});
426   this.s.cmd[flag] = value;
427   return this;
428 }
429
430 define.classMethod('addCursorFlag', {callback: false, promise:false, returns: [Cursor]});
431
432 /**
433  * Add a query modifier to the cursor query
434  * @method
435  * @param {string} name The query modifier (must start with $, such as $orderby etc)
436  * @param {boolean} value The flag boolean value.
437  * @throws {MongoError}
438  * @return {Cursor}
439  */
440 Cursor.prototype.addQueryModifier = function(name, value) {
441   if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true});
442   if(name[0] != '$') throw MongoError.create({message: f("%s is not a valid query modifier"), driver:true});
443   // Strip of the $
444   var field = name.substr(1);
445   // Set on the command
446   this.s.cmd[field] = value;
447   // Deal with the special case for sort
448   if(field == 'orderby') this.s.cmd.sort = this.s.cmd[field];
449   return this;
450 }
451
452 define.classMethod('addQueryModifier', {callback: false, promise:false, returns: [Cursor]});
453
454 /**
455  * Add a comment to the cursor query allowing for tracking the comment in the log.
456  * @method
457  * @param {string} value The comment attached to this query.
458  * @throws {MongoError}
459  * @return {Cursor}
460  */
461 Cursor.prototype.comment = function(value) {
462   if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true});
463   this.s.cmd.comment = value;
464   return this;
465 }
466
467 define.classMethod('comment', {callback: false, promise:false, returns: [Cursor]});
468
469 /**
470  * Set a maxAwaitTimeMS on a tailing cursor query to allow to customize the timeout value for the option awaitData (Only supported on MongoDB 3.2 or higher, ignored otherwise)
471  * @method
472  * @param {number} value Number of milliseconds to wait before aborting the tailed query.
473  * @throws {MongoError}
474  * @return {Cursor}
475  */
476 Cursor.prototype.maxAwaitTimeMS = function(value) {
477   if(typeof value != 'number') throw MongoError.create({message: "maxAwaitTimeMS must be a number", driver:true});
478   if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true});
479   this.s.cmd.maxAwaitTimeMS = value;
480   return this;
481 }
482
483 define.classMethod('maxAwaitTimeMS', {callback: false, promise:false, returns: [Cursor]});
484
485 /**
486  * Set a maxTimeMS on the cursor query, allowing for hard timeout limits on queries (Only supported on MongoDB 2.6 or higher)
487  * @method
488  * @param {number} value Number of milliseconds to wait before aborting the query.
489  * @throws {MongoError}
490  * @return {Cursor}
491  */
492 Cursor.prototype.maxTimeMS = function(value) {
493   if(typeof value != 'number') throw MongoError.create({message: "maxTimeMS must be a number", driver:true});
494   if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true});
495   this.s.cmd.maxTimeMS = value;
496   return this;
497 }
498
499 define.classMethod('maxTimeMS', {callback: false, promise:false, returns: [Cursor]});
500
501 Cursor.prototype.maxTimeMs = Cursor.prototype.maxTimeMS;
502
503 define.classMethod('maxTimeMs', {callback: false, promise:false, returns: [Cursor]});
504
505 /**
506  * Sets a field projection for the query.
507  * @method
508  * @param {object} value The field projection object.
509  * @throws {MongoError}
510  * @return {Cursor}
511  */
512 Cursor.prototype.project = function(value) {
513   if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true});
514   this.s.cmd.fields = value;
515   return this;
516 }
517
518 define.classMethod('project', {callback: false, promise:false, returns: [Cursor]});
519
520 /**
521  * Sets the sort order of the cursor query.
522  * @method
523  * @param {(string|array|object)} keyOrList The key or keys set for the sort.
524  * @param {number} [direction] The direction of the sorting (1 or -1).
525  * @throws {MongoError}
526  * @return {Cursor}
527  */
528 Cursor.prototype.sort = function(keyOrList, direction) {
529   if(this.s.options.tailable) throw MongoError.create({message: "Tailable cursor doesn't support sorting", driver:true});
530   if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true});
531   var order = keyOrList;
532
533   // We have an array of arrays, we need to preserve the order of the sort
534   // so we will us a Map
535   if(Array.isArray(order) && Array.isArray(order[0])) {
536     order = new Map(order.map(function(x) {
537       var value = [x[0], null];
538       if(x[1] == 'asc') {
539         value[1] = 1;
540       } else if(x[1] == 'desc') {
541         value[1] = -1;
542       } else if(x[1] == 1 || x[1] == -1) {
543         value[1] = x[1];
544       } else {
545         throw new MongoError("Illegal sort clause, must be of the form [['field1', '(ascending|descending)'], ['field2', '(ascending|descending)']]");
546       }
547
548       return value;
549     }));
550   }
551
552   if(direction != null) {
553     order = [[keyOrList, direction]];
554   }
555
556   this.s.cmd.sort = order;
557   this.sortValue = order;
558   return this;
559 }
560
561 define.classMethod('sort', {callback: false, promise:false, returns: [Cursor]});
562
563 /**
564  * Set the batch size for the cursor.
565  * @method
566  * @param {number} value The batchSize for the cursor.
567  * @throws {MongoError}
568  * @return {Cursor}
569  */
570 Cursor.prototype.batchSize = function(value) {
571   if(this.s.options.tailable) throw MongoError.create({message: "Tailable cursor doesn't support batchSize", driver:true});
572   if(this.s.state == Cursor.CLOSED || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true});
573   if(typeof value != 'number') throw MongoError.create({message: "batchSize requires an integer", driver:true});
574   this.s.cmd.batchSize = value;
575   this.setCursorBatchSize(value);
576   return this;
577 }
578
579 define.classMethod('batchSize', {callback: false, promise:false, returns: [Cursor]});
580
581 /**
582  * Set the collation options for the cursor.
583  * @method
584  * @param {object} value The cursor collation options (MongoDB 3.4 or higher) settings for update operation (see 3.4 documentation for available fields).
585  * @throws {MongoError}
586  * @return {Cursor}
587  */
588 Cursor.prototype.collation = function(value) {
589   this.s.cmd.collation = value;
590   return this;
591 }
592
593 define.classMethod('collation', {callback: false, promise:false, returns: [Cursor]});
594
595 /**
596  * Set the limit for the cursor.
597  * @method
598  * @param {number} value The limit for the cursor query.
599  * @throws {MongoError}
600  * @return {Cursor}
601  */
602 Cursor.prototype.limit = function(value) {
603   if(this.s.options.tailable) throw MongoError.create({message: "Tailable cursor doesn't support limit", driver:true});
604   if(this.s.state == Cursor.OPEN || this.s.state == Cursor.CLOSED || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true});
605   if(typeof value != 'number') throw MongoError.create({message: "limit requires an integer", driver:true});
606   this.s.cmd.limit = value;
607   // this.cursorLimit = value;
608   this.setCursorLimit(value);
609   return this;
610 }
611
612 define.classMethod('limit', {callback: false, promise:false, returns: [Cursor]});
613
614 /**
615  * Set the skip for the cursor.
616  * @method
617  * @param {number} value The skip for the cursor query.
618  * @throws {MongoError}
619  * @return {Cursor}
620  */
621 Cursor.prototype.skip = function(value) {
622   if(this.s.options.tailable) throw MongoError.create({message: "Tailable cursor doesn't support skip", driver:true});
623   if(this.s.state == Cursor.OPEN || this.s.state == Cursor.CLOSED || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true});
624   if(typeof value != 'number') throw MongoError.create({message: "skip requires an integer", driver:true});
625   this.s.cmd.skip = value;
626   this.setCursorSkip(value);
627   return this;
628 }
629
630 define.classMethod('skip', {callback: false, promise:false, returns: [Cursor]});
631
632 /**
633  * The callback format for results
634  * @callback Cursor~resultCallback
635  * @param {MongoError} error An error instance representing the error during the execution.
636  * @param {(object|null|boolean)} result The result object if the command was executed successfully.
637  */
638
639 /**
640  * Clone the cursor
641  * @function external:CoreCursor#clone
642  * @return {Cursor}
643  */
644
645 /**
646  * Resets the cursor
647  * @function external:CoreCursor#rewind
648  * @return {null}
649  */
650
651 /**
652  * Get the next available document from the cursor, returns null if no more documents are available.
653  * @method
654  * @param {Cursor~resultCallback} [callback] The result callback.
655  * @throws {MongoError}
656  * @deprecated
657  * @return {Promise} returns Promise if no callback passed
658  */
659 Cursor.prototype.nextObject = Cursor.prototype.next;
660
661 var nextObject = function(self, callback) {
662   if(self.s.state == Cursor.CLOSED || self.isDead && self.isDead()) return handleCallback(callback, MongoError.create({message: "Cursor is closed", driver:true}));
663   if(self.s.state == Cursor.INIT && self.s.cmd.sort) {
664     try {
665       self.s.cmd.sort = formattedOrderClause(self.s.cmd.sort);
666     } catch(err) {
667       return handleCallback(callback, err);
668     }
669   }
670
671   // Get the next object
672   self._next(function(err, doc) {
673     self.s.state = Cursor.OPEN;
674     if(err) return handleCallback(callback, err);
675     handleCallback(callback, null, doc);
676   });
677 }
678
679 define.classMethod('nextObject', {callback: true, promise:true});
680
681 // Trampoline emptying the number of retrieved items
682 // without incurring a nextTick operation
683 var loop = function(self, callback) {
684   // No more items we are done
685   if(self.bufferedCount() == 0) return;
686   // Get the next document
687   self._next(callback);
688   // Loop
689   return loop;
690 }
691
692 Cursor.prototype.next = Cursor.prototype.nextObject;
693
694 define.classMethod('next', {callback: true, promise:true});
695
696 /**
697  * Iterates over all the documents for this cursor. As with **{cursor.toArray}**,
698  * not all of the elements will be iterated if this cursor had been previouly accessed.
699  * In that case, **{cursor.rewind}** can be used to reset the cursor. However, unlike
700  * **{cursor.toArray}**, the cursor will only hold a maximum of batch size elements
701  * at any given time if batch size is specified. Otherwise, the caller is responsible
702  * for making sure that the entire result can fit the memory.
703  * @method
704  * @deprecated
705  * @param {Cursor~resultCallback} callback The result callback.
706  * @throws {MongoError}
707  * @return {null}
708  */
709 Cursor.prototype.each = function(callback) {
710   // Rewind cursor state
711   this.rewind();
712   // Set current cursor to INIT
713   this.s.state = Cursor.INIT;
714   // Run the query
715   _each(this, callback);
716 };
717
718 define.classMethod('each', {callback: true, promise:false});
719
720 // Run the each loop
721 var _each = function(self, callback) {
722   if(!callback) throw MongoError.create({message: 'callback is mandatory', driver:true});
723   if(self.isNotified()) return;
724   if(self.s.state == Cursor.CLOSED || self.isDead()) {
725     return handleCallback(callback, MongoError.create({message: "Cursor is closed", driver:true}));
726   }
727
728   if(self.s.state == Cursor.INIT) self.s.state = Cursor.OPEN;
729
730   // Define function to avoid global scope escape
731   var fn = null;
732   // Trampoline all the entries
733   if(self.bufferedCount() > 0) {
734     while(fn = loop(self, callback)) fn(self, callback);
735     _each(self, callback);
736   } else {
737     self.next(function(err, item) {
738       if(err) return handleCallback(callback, err);
739       if(item == null) {
740         self.s.state = Cursor.CLOSED;
741         return handleCallback(callback, null, null);
742       }
743
744       if(handleCallback(callback, null, item) == false) return;
745       _each(self, callback);
746     })
747   }
748 }
749
750 /**
751  * The callback format for the forEach iterator method
752  * @callback Cursor~iteratorCallback
753  * @param {Object} doc An emitted document for the iterator
754  */
755
756 /**
757  * The callback error format for the forEach iterator method
758  * @callback Cursor~endCallback
759  * @param {MongoError} error An error instance representing the error during the execution.
760  */
761
762 /**
763  * Iterates over all the documents for this cursor using the iterator, callback pattern.
764  * @method
765  * @param {Cursor~iteratorCallback} iterator The iteration callback.
766  * @param {Cursor~endCallback} callback The end callback.
767  * @throws {MongoError}
768  * @return {null}
769  */
770 Cursor.prototype.forEach = function(iterator, callback) {
771   this.each(function(err, doc){
772     if(err) { callback(err); return false; }
773     if(doc != null) { iterator(doc); return true; }
774     if(doc == null && callback) {
775       var internalCallback = callback;
776       callback = null;
777       internalCallback(null);
778       return false;
779     }
780   });
781 }
782
783 define.classMethod('forEach', {callback: true, promise:false});
784
785 /**
786  * Set the ReadPreference for the cursor.
787  * @method
788  * @param {(string|ReadPreference)} readPreference The new read preference for the cursor.
789  * @throws {MongoError}
790  * @return {Cursor}
791  */
792 Cursor.prototype.setReadPreference = function(r) {
793   if(this.s.state != Cursor.INIT) throw MongoError.create({message: 'cannot change cursor readPreference after cursor has been accessed', driver:true});
794   if(r instanceof ReadPreference) {
795     this.s.options.readPreference = new CoreReadPreference(r.mode, r.tags, {maxStalenessMS: r.maxStalenessMS});
796   } else if(typeof r == 'string'){
797     this.s.options.readPreference = new CoreReadPreference(r);
798   } else if(r instanceof CoreReadPreference) {
799     this.s.options.readPreference = r;
800   }
801
802   return this;
803 }
804
805 define.classMethod('setReadPreference', {callback: false, promise:false, returns: [Cursor]});
806
807 /**
808  * The callback format for results
809  * @callback Cursor~toArrayResultCallback
810  * @param {MongoError} error An error instance representing the error during the execution.
811  * @param {object[]} documents All the documents the satisfy the cursor.
812  */
813
814 /**
815  * Returns an array of documents. The caller is responsible for making sure that there
816  * is enough memory to store the results. Note that the array only contain partial
817  * results when this cursor had been previouly accessed. In that case,
818  * cursor.rewind() can be used to reset the cursor.
819  * @method
820  * @param {Cursor~toArrayResultCallback} [callback] The result callback.
821  * @throws {MongoError}
822  * @return {Promise} returns Promise if no callback passed
823  */
824 Cursor.prototype.toArray = function(callback) {
825   var self = this;
826   if(self.s.options.tailable) throw MongoError.create({message: 'Tailable cursor cannot be converted to array', driver:true});
827
828   // Execute using callback
829   if(typeof callback == 'function') return toArray(self, callback);
830
831   // Return a Promise
832   return new this.s.promiseLibrary(function(resolve, reject) {
833     toArray(self, function(err, r) {
834       if(err) return reject(err);
835       resolve(r);
836     });
837   });
838 }
839
840 var toArray = function(self, callback) {
841   var items = [];
842
843   // Reset cursor
844   self.rewind();
845   self.s.state = Cursor.INIT;
846
847   // Fetch all the documents
848   var fetchDocs = function() {
849     self._next(function(err, doc) {
850       if(err) return handleCallback(callback, err);
851       if(doc == null) {
852         self.s.state = Cursor.CLOSED;
853         return handleCallback(callback, null, items);
854       }
855
856       // Add doc to items
857       items.push(doc)
858
859       // Get all buffered objects
860       if(self.bufferedCount() > 0) {
861         var docs = self.readBufferedDocuments(self.bufferedCount())
862
863         // Transform the doc if transform method added
864         if(self.s.transforms && typeof self.s.transforms.doc == 'function') {
865           docs = docs.map(self.s.transforms.doc);
866         }
867
868         push.apply(items, docs);
869       }
870
871       // Attempt a fetch
872       fetchDocs();
873     })
874   }
875
876   fetchDocs();
877 }
878
879 define.classMethod('toArray', {callback: true, promise:true});
880
881 /**
882  * The callback format for results
883  * @callback Cursor~countResultCallback
884  * @param {MongoError} error An error instance representing the error during the execution.
885  * @param {number} count The count of documents.
886  */
887
888 /**
889  * Get the count of documents for this cursor
890  * @method
891  * @param {boolean} [applySkipLimit=true] Should the count command apply limit and skip settings on the cursor or in the passed in options.
892  * @param {object} [options=null] Optional settings.
893  * @param {number} [options.skip=null] The number of documents to skip.
894  * @param {number} [options.limit=null] The maximum amounts to count before aborting.
895  * @param {number} [options.maxTimeMS=null] Number of miliseconds to wait before aborting the query.
896  * @param {string} [options.hint=null] An index name hint for the query.
897  * @param {(ReadPreference|string)} [options.readPreference=null] The preferred read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
898  * @param {Cursor~countResultCallback} [callback] The result callback.
899  * @return {Promise} returns Promise if no callback passed
900  */
901 Cursor.prototype.count = function(applySkipLimit, opts, callback) {
902   var self = this;
903   if(self.s.cmd.query == null) throw MongoError.create({message: "count can only be used with find command", driver:true});
904   if(typeof opts == 'function') callback = opts, opts = {};
905   opts = opts || {};
906
907   // Execute using callback
908   if(typeof callback == 'function') return count(self, applySkipLimit, opts, callback);
909
910   // Return a Promise
911   return new this.s.promiseLibrary(function(resolve, reject) {
912     count(self, applySkipLimit, opts, function(err, r) {
913       if(err) return reject(err);
914       resolve(r);
915     });
916   });
917 };
918
919 var count = function(self, applySkipLimit, opts, callback) {
920   if(typeof applySkipLimit == 'function') {
921     callback = applySkipLimit;
922     applySkipLimit = true;
923   }
924
925   if(applySkipLimit) {
926     if(typeof self.cursorSkip() == 'number') opts.skip = self.cursorSkip();
927     if(typeof self.cursorLimit() == 'number') opts.limit = self.cursorLimit();
928   }
929
930   // Command
931   var delimiter = self.s.ns.indexOf('.');
932
933   var command = {
934     'count': self.s.ns.substr(delimiter+1), 'query': self.s.cmd.query
935   }
936
937   if(typeof opts.maxTimeMS == 'number') {
938     command.maxTimeMS = opts.maxTimeMS;
939   } else if(self.s.cmd && typeof self.s.cmd.maxTimeMS == 'number') {
940     command.maxTimeMS = self.s.cmd.maxTimeMS;
941   }
942
943   // Merge in any options
944   if(opts.skip) command.skip = opts.skip;
945   if(opts.limit) command.limit = opts.limit;
946   if(self.s.options.hint) command.hint = self.s.options.hint;
947
948   // Execute the command
949   self.topology.command(f("%s.$cmd", self.s.ns.substr(0, delimiter))
950     , command, function(err, result) {
951       callback(err, result ? result.result.n : null)
952     });
953 }
954
955 define.classMethod('count', {callback: true, promise:true});
956
957 /**
958  * Close the cursor, sending a KillCursor command and emitting close.
959  * @method
960  * @param {Cursor~resultCallback} [callback] The result callback.
961  * @return {Promise} returns Promise if no callback passed
962  */
963 Cursor.prototype.close = function(callback) {
964   this.s.state = Cursor.CLOSED;
965   // Kill the cursor
966   this.kill();
967   // Emit the close event for the cursor
968   this.emit('close');
969   // Callback if provided
970   if(typeof callback == 'function') return handleCallback(callback, null, this);
971   // Return a Promise
972   return new this.s.promiseLibrary(function(resolve, reject) {
973     resolve();
974   });
975 }
976
977 define.classMethod('close', {callback: true, promise:true});
978
979 /**
980  * Map all documents using the provided function
981  * @method
982  * @param {function} [transform] The mapping transformation method.
983  * @return {null}
984  */
985 Cursor.prototype.map = function(transform) {
986   this.cursorState.transforms = { doc: transform };
987   return this;
988 }
989
990 define.classMethod('map', {callback: false, promise:false, returns: [Cursor]});
991
992 /**
993  * Is the cursor closed
994  * @method
995  * @return {boolean}
996  */
997 Cursor.prototype.isClosed = function() {
998   return this.isDead();
999 }
1000
1001 define.classMethod('isClosed', {callback: false, promise:false, returns: [Boolean]});
1002
1003 Cursor.prototype.destroy = function(err) {
1004   if(err) this.emit('error', err);
1005   this.pause();
1006   this.close();
1007 }
1008
1009 define.classMethod('destroy', {callback: false, promise:false});
1010
1011 /**
1012  * Return a modified Readable stream including a possible transform method.
1013  * @method
1014  * @param {object} [options=null] Optional settings.
1015  * @param {function} [options.transform=null] A transformation method applied to each document emitted by the stream.
1016  * @return {Cursor}
1017  */
1018 Cursor.prototype.stream = function(options) {
1019   this.s.streamOptions = options || {};
1020   return this;
1021 }
1022
1023 define.classMethod('stream', {callback: false, promise:false, returns: [Cursor]});
1024
1025 /**
1026  * Execute the explain for the cursor
1027  * @method
1028  * @param {Cursor~resultCallback} [callback] The result callback.
1029  * @return {Promise} returns Promise if no callback passed
1030  */
1031 Cursor.prototype.explain = function(callback) {
1032   var self = this;
1033   this.s.cmd.explain = true;
1034
1035   // Do we have a readConcern
1036   if(this.s.cmd.readConcern) {
1037     delete this.s.cmd['readConcern'];
1038   }
1039
1040   // Execute using callback
1041   if(typeof callback == 'function') return this._next(callback);
1042
1043   // Return a Promise
1044   return new this.s.promiseLibrary(function(resolve, reject) {
1045     self._next(function(err, r) {
1046       if(err) return reject(err);
1047       resolve(r);
1048     });
1049   });
1050 }
1051
1052 define.classMethod('explain', {callback: true, promise:true});
1053
1054 Cursor.prototype._read = function(n) {
1055   var self = this;
1056   if(self.s.state == Cursor.CLOSED || self.isDead()) {
1057     return self.push(null);
1058   }
1059
1060   // Get the next item
1061   self.nextObject(function(err, result) {
1062     if(err) {
1063       if(self.listeners('error') && self.listeners('error').length > 0) {
1064         self.emit('error', err);
1065       }
1066       if(!self.isDead()) self.close();
1067
1068       // Emit end event
1069       self.emit('end');
1070       return self.emit('finish');
1071     }
1072
1073     // If we provided a transformation method
1074     if(typeof self.s.streamOptions.transform == 'function' && result != null) {
1075       return self.push(self.s.streamOptions.transform(result));
1076     }
1077
1078     // If we provided a map function
1079     if(self.cursorState.transforms && typeof self.cursorState.transforms.doc == 'function' && result != null) {
1080       return self.push(self.cursorState.transforms.doc(result));
1081     }
1082
1083     // Return the result
1084     self.push(result);
1085   });
1086 }
1087
1088 Object.defineProperty(Cursor.prototype, 'readPreference', {
1089   enumerable:true,
1090   get: function() {
1091     if (!this || !this.s) {
1092       return null;
1093     }
1094
1095     return this.s.options.readPreference;
1096   }
1097 });
1098
1099 Object.defineProperty(Cursor.prototype, 'namespace', {
1100   enumerable: true,
1101   get: function() {
1102     if (!this || !this.s) {
1103       return null;
1104     }
1105
1106     // TODO: refactor this logic into core
1107     var ns = this.s.ns || '';
1108     var firstDot = ns.indexOf('.');
1109     if (firstDot < 0) {
1110       return {
1111         database: this.s.ns,
1112         collection: ''
1113       };
1114     }
1115     return {
1116       database: ns.substr(0, firstDot),
1117       collection: ns.substr(firstDot + 1)
1118     };
1119   }
1120 });
1121
1122 /**
1123  * The read() method pulls some data out of the internal buffer and returns it. If there is no data available, then it will return null.
1124  * @function external:Readable#read
1125  * @param {number} size Optional argument to specify how much data to read.
1126  * @return {(String | Buffer | null)}
1127  */
1128
1129 /**
1130  * Call this function to cause the stream to return strings of the specified encoding instead of Buffer objects.
1131  * @function external:Readable#setEncoding
1132  * @param {string} encoding The encoding to use.
1133  * @return {null}
1134  */
1135
1136 /**
1137  * This method will cause the readable stream to resume emitting data events.
1138  * @function external:Readable#resume
1139  * @return {null}
1140  */
1141
1142 /**
1143  * This method will cause a stream in flowing-mode to stop emitting data events. Any data that becomes available will remain in the internal buffer.
1144  * @function external:Readable#pause
1145  * @return {null}
1146  */
1147
1148 /**
1149  * This method pulls all the data out of a readable stream, and writes it to the supplied destination, automatically managing the flow so that the destination is not overwhelmed by a fast readable stream.
1150  * @function external:Readable#pipe
1151  * @param {Writable} destination The destination for writing data
1152  * @param {object} [options] Pipe options
1153  * @return {null}
1154  */
1155
1156 /**
1157  * This method will remove the hooks set up for a previous pipe() call.
1158  * @function external:Readable#unpipe
1159  * @param {Writable} [destination] The destination for writing data
1160  * @return {null}
1161  */
1162
1163 /**
1164  * This is useful in certain cases where a stream is being consumed by a parser, which needs to "un-consume" some data that it has optimistically pulled out of the source, so that the stream can be passed on to some other party.
1165  * @function external:Readable#unshift
1166  * @param {(Buffer|string)} chunk Chunk of data to unshift onto the read queue.
1167  * @return {null}
1168  */
1169
1170 /**
1171  * Versions of Node prior to v0.10 had streams that did not implement the entire Streams API as it is today. (See "Compatibility" below for more information.)
1172  * @function external:Readable#wrap
1173  * @param {Stream} stream An "old style" readable stream.
1174  * @return {null}
1175  */
1176
1177 Cursor.INIT = 0;
1178 Cursor.OPEN = 1;
1179 Cursor.CLOSED = 2;
1180 Cursor.GET_MORE = 3;
1181
1182 module.exports = Cursor;