3 var inherits = require('util').inherits
4 , f = require('util').format
5 , formattedOrderClause = require('./utils').formattedOrderClause
6 , handleCallback = require('./utils').handleCallback
7 , ReadPreference = require('./read_preference')
8 , MongoError = require('mongodb-core').MongoError
9 , Readable = require('stream').Readable || require('readable-stream').Readable
10 , Define = require('./metadata')
11 , CoreCursor = require('mongodb-core').Cursor
12 , Map = require('mongodb-core').BSON.Map
13 , Query = require('mongodb-core').Query
14 , CoreReadPreference = require('mongodb-core').ReadPreference;
17 * @fileOverview The **Cursor** class is an internal class that embodies a cursor on MongoDB
18 * allowing for iteration over the results returned from the underlying query. It supports
19 * one by one document iteration, conversion to an array or can be iterated as a Node 0.10.X
22 * **CURSORS Cannot directly be instantiated**
24 * var MongoClient = require('mongodb').MongoClient,
25 * test = require('assert');
27 * var url = 'mongodb://localhost:27017/test';
28 * // Connect using MongoClient
29 * MongoClient.connect(url, function(err, db) {
30 * // Create a collection we want to drop later
31 * var col = db.collection('createIndexExample1');
32 * // Insert a bunch of documents
33 * col.insert([{a:1, b:1}
34 * , {a:2, b:2}, {a:3, b:3}
35 * , {a:4, b:4}], {w:1}, function(err, result) {
36 * test.equal(null, err);
38 * // Show that duplicate records got dropped
39 * col.find({}).toArray(function(err, items) {
40 * test.equal(null, err);
41 * test.equal(4, items.length);
49 * Namespace provided by the mongodb-core and node.js
50 * @external CoreCursor
54 // Flags allowed for cursor
55 var flags = ['tailable', 'oplogReplay', 'noCursorTimeout', 'awaitData', 'exhaust', 'partial'];
56 var fields = ['numberOfRetries', 'tailableRetryInterval'];
57 var push = Array.prototype.push;
60 * Creates a new Cursor instance (INTERNAL TYPE, do not instantiate directly)
62 * @extends external:CoreCursor
63 * @extends external:Readable
64 * @property {string} sortValue Cursor query sort setting.
65 * @property {boolean} timeout Is Cursor able to time out.
66 * @property {ReadPreference} readPreference Get cursor ReadPreference.
70 * @fires Cursor#readable
71 * @return {Cursor} a Cursor instance.
73 * Cursor cursor options.
75 * collection.find({}).project({a:1}) // Create a projection of field a
76 * collection.find({}).skip(1).limit(10) // Skip 1 and limit 10
77 * collection.find({}).batchSize(5) // Set batchSize on cursor to 5
78 * collection.find({}).filter({a:1}) // Set query on the cursor
79 * collection.find({}).comment('add a comment') // Add a comment to the query, allowing to correlate queries
80 * collection.find({}).addCursorFlag('tailable', true) // Set cursor as tailable
81 * collection.find({}).addCursorFlag('oplogReplay', true) // Set cursor as oplogReplay
82 * collection.find({}).addCursorFlag('noCursorTimeout', true) // Set cursor as noCursorTimeout
83 * collection.find({}).addCursorFlag('awaitData', true) // Set cursor as awaitData
84 * collection.find({}).addCursorFlag('partial', true) // Set cursor as partial
85 * collection.find({}).addQueryModifier('$orderby', {a:1}) // Set $orderby {a:1}
86 * collection.find({}).max(10) // Set the cursor max
87 * collection.find({}).maxScan(10) // Set the cursor maxScan
88 * collection.find({}).maxTimeMS(1000) // Set the cursor maxTimeMS
89 * collection.find({}).min(100) // Set the cursor min
90 * collection.find({}).returnKey(10) // Set the cursor returnKey
91 * collection.find({}).setReadPreference(ReadPreference.PRIMARY) // Set the cursor readPreference
92 * collection.find({}).showRecordId(true) // Set the cursor showRecordId
93 * collection.find({}).snapshot(true) // Set the cursor snapshot
94 * collection.find({}).sort([['a', 1]]) // Sets the sort order of the cursor query
95 * collection.find({}).hint('a_1') // Set the cursor hint
97 * All options are chainable, so one can do the following.
99 * collection.find({}).maxTimeMS(1000).maxScan(100).skip(1).toArray(..)
101 var Cursor = function(bson, ns, cmd, options, topology, topologyOptions) {
102 CoreCursor.apply(this, Array.prototype.slice.call(arguments, 0));
104 var state = Cursor.INIT;
105 var streamOptions = {};
107 // Tailable cursor options
108 var numberOfRetries = options.numberOfRetries || 5;
109 var tailableRetryInterval = options.tailableRetryInterval || 500;
110 var currentNumberOfRetries = numberOfRetries;
112 // Get the promiseLibrary
113 var promiseLibrary = options.promiseLibrary;
115 // No promise library selected fall back
116 if(!promiseLibrary) {
117 promiseLibrary = typeof global.Promise == 'function' ?
118 global.Promise : require('es6-promise').Promise;
122 Readable.call(this, {objectMode: true});
124 // Internal cursor state
126 // Tailable cursor options
127 numberOfRetries: numberOfRetries
128 , tailableRetryInterval: tailableRetryInterval
129 , currentNumberOfRetries: currentNumberOfRetries
133 , streamOptions: streamOptions
145 , topologyOptions: topologyOptions
147 , promiseLibrary: promiseLibrary
152 // Translate correctly
153 if(self.s.options.noCursorTimeout == true) {
154 self.addCursorFlag('noCursorTimeout', true);
157 // Set the sort value
158 this.sortValue = self.s.cmd.sort;
162 * Cursor stream data event, fired for each document in the cursor.
169 * Cursor stream end event
176 * Cursor stream close event
178 * @event Cursor#close
183 * Cursor stream readable event
185 * @event Cursor#readable
189 // Inherit from Readable
190 inherits(Cursor, Readable);
192 // Map core cursor _next method so we can apply mapping
193 CoreCursor.prototype._next = CoreCursor.prototype.next;
195 for(var name in CoreCursor.prototype) {
196 Cursor.prototype[name] = CoreCursor.prototype[name];
199 var define = Cursor.define = new Define('Cursor', Cursor, true);
/**
 * Check if there is any document still available in the cursor
 *
 * A document fetched while answering this question is parked in
 * `s.currentDoc` so that the following `next()` call can hand it out.
 *
 * @method
 * @param {Cursor~resultCallback} [callback] The result callback.
 * @throws {MongoError}
 * @return {Promise} returns Promise if no callback passed
 */
Cursor.prototype.hasNext = function(callback) {
  var self = this;

  // Single probe shared by the callback and the promise style so both report
  // errors identically (the callback style used to drop `err` and answer false).
  var probe = function(done) {
    // A document is already parked from an earlier hasNext call
    if(self.s.currentDoc) return done(null, true);

    nextObject(self, function(err, doc) {
      // A closed or dead cursor simply has nothing more to offer
      if(self.s.state === Cursor.CLOSED || self.isDead()) return done(null, false);
      if(err) return done(err);
      if(!doc) return done(null, false);
      // Park the document for the next call to next()
      self.s.currentDoc = doc;
      done(null, true);
    });
  };

  // Execute using callback
  if(typeof callback == 'function') return probe(callback);

  // Return a Promise
  return new this.s.promiseLibrary(function(resolve, reject) {
    probe(function(err, result) {
      if(err) return reject(err);
      resolve(result);
    });
  });
};
240 define.classMethod('hasNext', {callback: true, promise:true});
243 * Get the next available document from the cursor, returns null if no more documents are available.
245 * @param {Cursor~resultCallback} [callback] The result callback.
246 * @throws {MongoError}
247 * @return {Promise} returns Promise if no callback passed
249 Cursor.prototype.next = function(callback) {
252 // Execute using callback
253 if(typeof callback == 'function') {
254 // Return the currentDoc if someone called hasNext first
255 if(self.s.currentDoc) {
256 var doc = self.s.currentDoc;
257 self.s.currentDoc = null;
258 return callback(null, doc);
261 // Return the next object
262 return nextObject(self, callback)
266 return new this.s.promiseLibrary(function(resolve, reject) {
267 // Return the currentDoc if someone called hasNext first
268 if(self.s.currentDoc) {
269 var doc = self.s.currentDoc;
270 self.s.currentDoc = null;
274 nextObject(self, function(err, r) {
275 if(err) return reject(err);
281 define.classMethod('next', {callback: true, promise:true});
284 * Set the cursor query
286 * @param {object} filter The filter object used for the cursor.
289 Cursor.prototype.filter = function(filter) {
290 if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true});
291 this.s.cmd.query = filter;
295 define.classMethod('filter', {callback: false, promise:false, returns: [Cursor]});
298 * Set the cursor maxScan
300 * @param {object} maxScan Constrains the query to only scan the specified number of documents when fulfilling the query
303 Cursor.prototype.maxScan = function(maxScan) {
304 if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true});
305 this.s.cmd.maxScan = maxScan;
309 define.classMethod('maxScan', {callback: false, promise:false, returns: [Cursor]});
312 * Set the cursor hint
314 * @param {object} hint If specified, then the query system will only consider plans using the hinted index.
317 Cursor.prototype.hint = function(hint) {
318 if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true});
319 this.s.cmd.hint = hint;
323 define.classMethod('hint', {callback: false, promise:false, returns: [Cursor]});
328 * @param {object} min Specify a $min value to specify the inclusive lower bound for a specific index in order to constrain the results of find(). The $min specifies the lower bound for all keys of a specific index in order.
331 Cursor.prototype.min = function(min) {
332 if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true});
333 this.s.cmd.min = min;
337 define.classMethod('min', {callback: false, promise:false, returns: [Cursor]});
342 * @param {object} max Specify a $max value to specify the exclusive upper bound for a specific index in order to constrain the results of find(). The $max specifies the upper bound for all keys of a specific index in order.
345 Cursor.prototype.max = function(max) {
346 if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true});
347 this.s.cmd.max = max;
351 define.classMethod('max', {callback: false, promise:false, returns: [Cursor]});
354 * Set the cursor returnKey
356 * @param {object} returnKey Only return the index field or fields for the results of the query. If $returnKey is set to true and the query does not use an index to perform the read operation, the returned documents will not contain any fields. Use one of the following forms:
359 Cursor.prototype.returnKey = function(value) {
360 if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true});
361 this.s.cmd.returnKey = value;
365 define.classMethod('returnKey', {callback: false, promise:false, returns: [Cursor]});
368 * Set the cursor showRecordId
370 * @param {object} showRecordId The $showDiskLoc option has now been deprecated and replaced with the showRecordId field. $showDiskLoc will still be accepted for OP_QUERY style find.
373 Cursor.prototype.showRecordId = function(value) {
374 if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true});
375 this.s.cmd.showDiskLoc = value;
379 define.classMethod('showRecordId', {callback: false, promise:false, returns: [Cursor]});
382 * Set the cursor snapshot
384 * @param {object} snapshot The $snapshot operator prevents the cursor from returning a document more than once because an intervening write operation results in a move of the document.
387 Cursor.prototype.snapshot = function(value) {
388 if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true});
389 this.s.cmd.snapshot = value;
393 define.classMethod('snapshot', {callback: false, promise:false, returns: [Cursor]});
396 * Set a node.js specific cursor option
398 * @param {string} field The cursor option to set ['numberOfRetries', 'tailableRetryInterval'].
399 * @param {object} value The field value.
400 * @throws {MongoError}
403 Cursor.prototype.setCursorOption = function(field, value) {
404 if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true});
405 if(fields.indexOf(field) == -1) throw MongoError.create({message: f("option %s not a supported option %s", field, fields), driver:true });
406 this.s[field] = value;
407 if(field == 'numberOfRetries')
408 this.s.currentNumberOfRetries = value;
412 define.classMethod('setCursorOption', {callback: false, promise:false, returns: [Cursor]});
415 * Add a cursor flag to the cursor
417 * @param {string} flag The flag to set, must be one of the following ['tailable', 'oplogReplay', 'noCursorTimeout', 'awaitData', 'exhaust', 'partial'].
418 * @param {boolean} value The flag boolean value.
419 * @throws {MongoError}
422 Cursor.prototype.addCursorFlag = function(flag, value) {
423 if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true});
424 if(flags.indexOf(flag) == -1) throw MongoError.create({message: f("flag %s not a supported flag %s", flag, flags), driver:true });
425 if(typeof value != 'boolean') throw MongoError.create({message: f("flag %s must be a boolean value", flag), driver:true});
426 this.s.cmd[flag] = value;
430 define.classMethod('addCursorFlag', {callback: false, promise:false, returns: [Cursor]});
/**
 * Add a query modifier to the cursor query
 *
 * The leading `$` is stripped and the remainder is used as the key on the
 * cursor command, e.g. `$comment` is stored as `cmd.comment`.
 *
 * @method
 * @param {string} name The query modifier (must start with $, such as $orderby etc)
 * @param {*} value The value to store for the modifier.
 * @throws {MongoError} if the cursor is already open/closed/dead, or if name is not a string starting with $
 * @return {Cursor}
 */
Cursor.prototype.addQueryModifier = function(name, value) {
  if(this.s.state === Cursor.CLOSED || this.s.state === Cursor.OPEN || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true});
  // Pass the offending name to the formatter (it used to be omitted, leaving a
  // literal "%s" in the message) and reject non-string names with the same
  // driver error instead of a raw TypeError from name[0].
  if(typeof name !== 'string' || name[0] !== '$') throw MongoError.create({message: f("%s is not a valid query modifier", name), driver:true});
  // Strip off the $
  var field = name.substr(1);
  // Set on the command
  this.s.cmd[field] = value;
  // Deal with the special case for sort: $orderby is mirrored onto cmd.sort
  if(field === 'orderby') this.s.cmd.sort = this.s.cmd[field];
  return this;
};
452 define.classMethod('addQueryModifier', {callback: false, promise:false, returns: [Cursor]});
455 * Add a comment to the cursor query allowing for tracking the comment in the log.
457 * @param {string} value The comment attached to this query.
458 * @throws {MongoError}
461 Cursor.prototype.comment = function(value) {
462 if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true});
463 this.s.cmd.comment = value;
467 define.classMethod('comment', {callback: false, promise:false, returns: [Cursor]});
470 * Set a maxAwaitTimeMS on a tailing cursor query to allow to customize the timeout value for the option awaitData (Only supported on MongoDB 3.2 or higher, ignored otherwise)
472 * @param {number} value Number of milliseconds to wait before aborting the tailed query.
473 * @throws {MongoError}
476 Cursor.prototype.maxAwaitTimeMS = function(value) {
477 if(typeof value != 'number') throw MongoError.create({message: "maxAwaitTimeMS must be a number", driver:true});
478 if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true});
479 this.s.cmd.maxAwaitTimeMS = value;
483 define.classMethod('maxAwaitTimeMS', {callback: false, promise:false, returns: [Cursor]});
486 * Set a maxTimeMS on the cursor query, allowing for hard timeout limits on queries (Only supported on MongoDB 2.6 or higher)
488 * @param {number} value Number of milliseconds to wait before aborting the query.
489 * @throws {MongoError}
492 Cursor.prototype.maxTimeMS = function(value) {
493 if(typeof value != 'number') throw MongoError.create({message: "maxTimeMS must be a number", driver:true});
494 if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true});
495 this.s.cmd.maxTimeMS = value;
499 define.classMethod('maxTimeMS', {callback: false, promise:false, returns: [Cursor]});
501 Cursor.prototype.maxTimeMs = Cursor.prototype.maxTimeMS;
503 define.classMethod('maxTimeMs', {callback: false, promise:false, returns: [Cursor]});
506 * Sets a field projection for the query.
508 * @param {object} value The field projection object.
509 * @throws {MongoError}
512 Cursor.prototype.project = function(value) {
513 if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true});
514 this.s.cmd.fields = value;
518 define.classMethod('project', {callback: false, promise:false, returns: [Cursor]});
521 * Sets the sort order of the cursor query.
523 * @param {(string|array|object)} keyOrList The key or keys set for the sort.
524 * @param {number} [direction] The direction of the sorting (1 or -1).
525 * @throws {MongoError}
528 Cursor.prototype.sort = function(keyOrList, direction) {
529 if(this.s.options.tailable) throw MongoError.create({message: "Tailable cursor doesn't support sorting", driver:true});
530 if(this.s.state == Cursor.CLOSED || this.s.state == Cursor.OPEN || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true});
531 var order = keyOrList;
533 // We have an array of arrays, we need to preserve the order of the sort
534 // so we will use a Map
535 if(Array.isArray(order) && Array.isArray(order[0])) {
536 order = new Map(order.map(function(x) {
537 var value = [x[0], null];
540 } else if(x[1] == 'desc') {
542 } else if(x[1] == 1 || x[1] == -1) {
545 throw new MongoError("Illegal sort clause, must be of the form [['field1', '(ascending|descending)'], ['field2', '(ascending|descending)']]");
552 if(direction != null) {
553 order = [[keyOrList, direction]];
556 this.s.cmd.sort = order;
557 this.sortValue = order;
561 define.classMethod('sort', {callback: false, promise:false, returns: [Cursor]});
564 * Set the batch size for the cursor.
566 * @param {number} value The batchSize for the cursor.
567 * @throws {MongoError}
570 Cursor.prototype.batchSize = function(value) {
571 if(this.s.options.tailable) throw MongoError.create({message: "Tailable cursor doesn't support batchSize", driver:true});
572 if(this.s.state == Cursor.CLOSED || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true});
573 if(typeof value != 'number') throw MongoError.create({message: "batchSize requires an integer", driver:true});
574 this.s.cmd.batchSize = value;
575 this.setCursorBatchSize(value);
579 define.classMethod('batchSize', {callback: false, promise:false, returns: [Cursor]});
582 * Set the collation options for the cursor.
584 * @param {object} value The cursor collation options (MongoDB 3.4 or higher) settings for update operation (see 3.4 documentation for available fields).
585 * @throws {MongoError}
588 Cursor.prototype.collation = function(value) {
589 this.s.cmd.collation = value;
593 define.classMethod('collation', {callback: false, promise:false, returns: [Cursor]});
596 * Set the limit for the cursor.
598 * @param {number} value The limit for the cursor query.
599 * @throws {MongoError}
602 Cursor.prototype.limit = function(value) {
603 if(this.s.options.tailable) throw MongoError.create({message: "Tailable cursor doesn't support limit", driver:true});
604 if(this.s.state == Cursor.OPEN || this.s.state == Cursor.CLOSED || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true});
605 if(typeof value != 'number') throw MongoError.create({message: "limit requires an integer", driver:true});
606 this.s.cmd.limit = value;
607 // this.cursorLimit = value;
608 this.setCursorLimit(value);
612 define.classMethod('limit', {callback: false, promise:false, returns: [Cursor]});
615 * Set the skip for the cursor.
617 * @param {number} value The skip for the cursor query.
618 * @throws {MongoError}
621 Cursor.prototype.skip = function(value) {
622 if(this.s.options.tailable) throw MongoError.create({message: "Tailable cursor doesn't support skip", driver:true});
623 if(this.s.state == Cursor.OPEN || this.s.state == Cursor.CLOSED || this.isDead()) throw MongoError.create({message: "Cursor is closed", driver:true});
624 if(typeof value != 'number') throw MongoError.create({message: "skip requires an integer", driver:true});
625 this.s.cmd.skip = value;
626 this.setCursorSkip(value);
630 define.classMethod('skip', {callback: false, promise:false, returns: [Cursor]});
633 * The callback format for results
634 * @callback Cursor~resultCallback
635 * @param {MongoError} error An error instance representing the error during the execution.
636 * @param {(object|null|boolean)} result The result object if the command was executed successfully.
641 * @function external:CoreCursor#clone
647 * @function external:CoreCursor#rewind
652 * Get the next available document from the cursor, returns null if no more documents are available.
654 * @param {Cursor~resultCallback} [callback] The result callback.
655 * @throws {MongoError}
657 * @return {Promise} returns Promise if no callback passed
659 Cursor.prototype.nextObject = Cursor.prototype.next;
661 var nextObject = function(self, callback) {
662 if(self.s.state == Cursor.CLOSED || self.isDead && self.isDead()) return handleCallback(callback, MongoError.create({message: "Cursor is closed", driver:true}));
663 if(self.s.state == Cursor.INIT && self.s.cmd.sort) {
665 self.s.cmd.sort = formattedOrderClause(self.s.cmd.sort);
667 return handleCallback(callback, err);
671 // Get the next object
672 self._next(function(err, doc) {
673 self.s.state = Cursor.OPEN;
674 if(err) return handleCallback(callback, err);
675 handleCallback(callback, null, doc);
679 define.classMethod('nextObject', {callback: true, promise:true});
681 // Trampoline emptying the number of retrieved items
682 // without incurring a nextTick operation
683 var loop = function(self, callback) {
684 // No more items we are done
685 if(self.bufferedCount() == 0) return;
686 // Get the next document
687 self._next(callback);
692 Cursor.prototype.next = Cursor.prototype.nextObject;
694 define.classMethod('next', {callback: true, promise:true});
697 * Iterates over all the documents for this cursor. As with **{cursor.toArray}**,
698 * not all of the elements will be iterated if this cursor had been previously accessed.
699 * In that case, **{cursor.rewind}** can be used to reset the cursor. However, unlike
700 * **{cursor.toArray}**, the cursor will only hold a maximum of batch size elements
701 * at any given time if batch size is specified. Otherwise, the caller is responsible
702 * for making sure that the entire result can fit the memory.
705 * @param {Cursor~resultCallback} callback The result callback.
706 * @throws {MongoError}
709 Cursor.prototype.each = function(callback) {
710 // Rewind cursor state
712 // Set current cursor to INIT
713 this.s.state = Cursor.INIT;
715 _each(this, callback);
718 define.classMethod('each', {callback: true, promise:false});
721 var _each = function(self, callback) {
722 if(!callback) throw MongoError.create({message: 'callback is mandatory', driver:true});
723 if(self.isNotified()) return;
724 if(self.s.state == Cursor.CLOSED || self.isDead()) {
725 return handleCallback(callback, MongoError.create({message: "Cursor is closed", driver:true}));
728 if(self.s.state == Cursor.INIT) self.s.state = Cursor.OPEN;
730 // Define function to avoid global scope escape
732 // Trampoline all the entries
733 if(self.bufferedCount() > 0) {
734 while(fn = loop(self, callback)) fn(self, callback);
735 _each(self, callback);
737 self.next(function(err, item) {
738 if(err) return handleCallback(callback, err);
740 self.s.state = Cursor.CLOSED;
741 return handleCallback(callback, null, null);
744 if(handleCallback(callback, null, item) == false) return;
745 _each(self, callback);
751 * The callback format for the forEach iterator method
752 * @callback Cursor~iteratorCallback
753 * @param {Object} doc An emitted document for the iterator
757 * The callback error format for the forEach iterator method
758 * @callback Cursor~endCallback
759 * @param {MongoError} error An error instance representing the error during the execution.
763 * Iterates over all the documents for this cursor using the iterator, callback pattern.
765 * @param {Cursor~iteratorCallback} iterator The iteration callback.
766 * @param {Cursor~endCallback} callback The end callback.
767 * @throws {MongoError}
770 Cursor.prototype.forEach = function(iterator, callback) {
771 this.each(function(err, doc){
772 if(err) { callback(err); return false; }
773 if(doc != null) { iterator(doc); return true; }
774 if(doc == null && callback) {
775 var internalCallback = callback;
777 internalCallback(null);
783 define.classMethod('forEach', {callback: true, promise:false});
786 * Set the ReadPreference for the cursor.
788 * @param {(string|ReadPreference)} readPreference The new read preference for the cursor.
789 * @throws {MongoError}
792 Cursor.prototype.setReadPreference = function(r) {
793 if(this.s.state != Cursor.INIT) throw MongoError.create({message: 'cannot change cursor readPreference after cursor has been accessed', driver:true});
794 if(r instanceof ReadPreference) {
795 this.s.options.readPreference = new CoreReadPreference(r.mode, r.tags, {maxStalenessMS: r.maxStalenessMS});
796 } else if(typeof r == 'string'){
797 this.s.options.readPreference = new CoreReadPreference(r);
798 } else if(r instanceof CoreReadPreference) {
799 this.s.options.readPreference = r;
805 define.classMethod('setReadPreference', {callback: false, promise:false, returns: [Cursor]});
808 * The callback format for results
809 * @callback Cursor~toArrayResultCallback
810 * @param {MongoError} error An error instance representing the error during the execution.
811 * @param {object[]} documents All the documents that satisfy the cursor.
815 * Returns an array of documents. The caller is responsible for making sure that there
816 * is enough memory to store the results. Note that the array only contains partial
817 * results when this cursor had been previously accessed. In that case,
818 * cursor.rewind() can be used to reset the cursor.
820 * @param {Cursor~toArrayResultCallback} [callback] The result callback.
821 * @throws {MongoError}
822 * @return {Promise} returns Promise if no callback passed
824 Cursor.prototype.toArray = function(callback) {
826 if(self.s.options.tailable) throw MongoError.create({message: 'Tailable cursor cannot be converted to array', driver:true});
828 // Execute using callback
829 if(typeof callback == 'function') return toArray(self, callback);
832 return new this.s.promiseLibrary(function(resolve, reject) {
833 toArray(self, function(err, r) {
834 if(err) return reject(err);
840 var toArray = function(self, callback) {
845 self.s.state = Cursor.INIT;
847 // Fetch all the documents
848 var fetchDocs = function() {
849 self._next(function(err, doc) {
850 if(err) return handleCallback(callback, err);
852 self.s.state = Cursor.CLOSED;
853 return handleCallback(callback, null, items);
859 // Get all buffered objects
860 if(self.bufferedCount() > 0) {
861 var docs = self.readBufferedDocuments(self.bufferedCount())
863 // Transform the doc if transform method added
864 if(self.s.transforms && typeof self.s.transforms.doc == 'function') {
865 docs = docs.map(self.s.transforms.doc);
868 push.apply(items, docs);
879 define.classMethod('toArray', {callback: true, promise:true});
882 * The callback format for results
883 * @callback Cursor~countResultCallback
884 * @param {MongoError} error An error instance representing the error during the execution.
885 * @param {number} count The count of documents.
889 * Get the count of documents for this cursor
891 * @param {boolean} [applySkipLimit=true] Should the count command apply limit and skip settings on the cursor or in the passed in options.
892 * @param {object} [options=null] Optional settings.
893 * @param {number} [options.skip=null] The number of documents to skip.
894 * @param {number} [options.limit=null] The maximum number of documents to count before aborting.
895 * @param {number} [options.maxTimeMS=null] Number of milliseconds to wait before aborting the query.
896 * @param {string} [options.hint=null] An index name hint for the query.
897 * @param {(ReadPreference|string)} [options.readPreference=null] The preferred read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
898 * @param {Cursor~countResultCallback} [callback] The result callback.
899 * @return {Promise} returns Promise if no callback passed
901 Cursor.prototype.count = function(applySkipLimit, opts, callback) {
903 if(self.s.cmd.query == null) throw MongoError.create({message: "count can only be used with find command", driver:true});
904 if(typeof opts == 'function') callback = opts, opts = {};
907 // Execute using callback
908 if(typeof callback == 'function') return count(self, applySkipLimit, opts, callback);
911 return new this.s.promiseLibrary(function(resolve, reject) {
912 count(self, applySkipLimit, opts, function(err, r) {
913 if(err) return reject(err);
919 var count = function(self, applySkipLimit, opts, callback) {
920 if(typeof applySkipLimit == 'function') {
921 callback = applySkipLimit;
922 applySkipLimit = true;
926 if(typeof self.cursorSkip() == 'number') opts.skip = self.cursorSkip();
927 if(typeof self.cursorLimit() == 'number') opts.limit = self.cursorLimit();
931 var delimiter = self.s.ns.indexOf('.');
934 'count': self.s.ns.substr(delimiter+1), 'query': self.s.cmd.query
937 if(typeof opts.maxTimeMS == 'number') {
938 command.maxTimeMS = opts.maxTimeMS;
939 } else if(self.s.cmd && typeof self.s.cmd.maxTimeMS == 'number') {
940 command.maxTimeMS = self.s.cmd.maxTimeMS;
943 // Merge in any options
944 if(opts.skip) command.skip = opts.skip;
945 if(opts.limit) command.limit = opts.limit;
946 if(self.s.options.hint) command.hint = self.s.options.hint;
948 // Execute the command
949 self.topology.command(f("%s.$cmd", self.s.ns.substr(0, delimiter))
950 , command, function(err, result) {
951 callback(err, result ? result.result.n : null)
955 define.classMethod('count', {callback: true, promise:true});
958 * Close the cursor, sending a KillCursor command and emitting close.
960 * @param {Cursor~resultCallback} [callback] The result callback.
961 * @return {Promise} returns Promise if no callback passed
963 Cursor.prototype.close = function(callback) {
964 this.s.state = Cursor.CLOSED;
967 // Emit the close event for the cursor
969 // Callback if provided
970 if(typeof callback == 'function') return handleCallback(callback, null, this);
972 return new this.s.promiseLibrary(function(resolve, reject) {
977 define.classMethod('close', {callback: true, promise:true});
980 * Map all documents using the provided function
982 * @param {function} [transform] The mapping transformation method.
985 Cursor.prototype.map = function(transform) {
986 this.cursorState.transforms = { doc: transform };
990 define.classMethod('map', {callback: false, promise:false, returns: [Cursor]});
993 * Is the cursor closed
997 Cursor.prototype.isClosed = function() {
998 return this.isDead();
1001 define.classMethod('isClosed', {callback: false, promise:false, returns: [Boolean]});
1003 Cursor.prototype.destroy = function(err) {
1004 if(err) this.emit('error', err);
1009 define.classMethod('destroy', {callback: false, promise:false});
1012 * Return a modified Readable stream including a possible transform method.
1014 * @param {object} [options=null] Optional settings.
1015 * @param {function} [options.transform=null] A transformation method applied to each document emitted by the stream.
1018 Cursor.prototype.stream = function(options) {
// Store the options, defaulting to an empty object, so that _read can safely look up streamOptions.transform for every document.
1019 this.s.streamOptions = options || {};
// Class metadata: synchronous method declared as returning a Cursor.
1023 define.classMethod('stream', {callback: false, promise:false, returns: [Cursor]});
1026 * Execute the explain for the cursor
1028 * @param {Cursor~resultCallback} [callback] The result callback.
1029 * @return {Promise} returns Promise if no callback passed
1031 Cursor.prototype.explain = function(callback) {
// Flag the stored command as an explain. This mutates the cursor's own command object (this.s.cmd) in place.
1033 this.s.cmd.explain = true;
1035 // Do we have a readConcern
1036 if(this.s.cmd.readConcern) {
// Remove readConcern from the command before it is run.
// NOTE(review): presumably the server rejects explain combined with readConcern - confirm.
1037 delete this.s.cmd['readConcern'];
1040 // Execute using callback
// Callback style: the explain output is whatever _next delivers to the callback.
1041 if(typeof callback == 'function') return this._next(callback);
// Promise style: wrap _next with the configured promise library and reject when _next reports an error.
1044 return new this.s.promiseLibrary(function(resolve, reject) {
1045 self._next(function(err, r) {
1046 if(err) return reject(err);
// Class metadata: explain is declared as usable with either a callback or a promise.
1052 define.classMethod('explain', {callback: true, promise:true});
// _read(n): feeds the stream by fetching one document with nextObject and handing it to push().
1054 Cursor.prototype._read = function(n) {
// A closed or dead cursor has nothing more to deliver, so null is pushed instead of fetching.
1056 if(self.s.state == Cursor.CLOSED || self.isDead()) {
1057 return self.push(null);
1060 // Get the next item
1061 self.nextObject(function(err, result) {
// Only emit 'error' when at least one listener is registered
// (in Node.js, emitting 'error' on an emitter with no 'error' listener throws).
1063 if(self.listeners('error') && self.listeners('error').length > 0) {
1064 self.emit('error', err);
// Close the cursor unless it is already dead.
1066 if(!self.isDead()) self.close();
1070 return self.emit('finish');
1073 // If we provided a transformation method
// The stream-level transform takes precedence: when it is a function and the document is non-null,
// the function returns here and the map() transform below is never applied to that document.
1074 if(typeof self.s.streamOptions.transform == 'function' && result != null) {
1075 return self.push(self.s.streamOptions.transform(result));
1078 // If we provided a map function
// Applies the function stored by map() (cursorState.transforms.doc) to non-null documents.
1079 if(self.cursorState.transforms && typeof self.cursorState.transforms.doc == 'function' && result != null) {
1080 return self.push(self.cursorState.transforms.doc(result));
1083 // Return the result
// Property defined on Cursor.prototype that exposes the read preference held in the cursor's options (this.s.options.readPreference).
1088 Object.defineProperty(Cursor.prototype, 'readPreference', {
// Guard for access without an instance or before the internal state object this.s exists.
1091 if (!this || !this.s) {
1095 return this.s.options.readPreference;
// Property defined on Cursor.prototype that splits the cursor's namespace string (this.s.ns) into database and collection parts.
1099 Object.defineProperty(Cursor.prototype, 'namespace', {
// Guard for access without an instance or before the internal state object this.s exists.
1102 if (!this || !this.s) {
1106 // TODO: refactor this logic into core
// Default to '' so that indexOf/substr below are safe when this.s.ns is unset.
1107 var ns = this.s.ns || '';
// Only the FIRST dot separates database from collection; any later dots remain part of the collection name.
1108 var firstDot = ns.indexOf('.');
// NOTE(review): this reads the raw this.s.ns rather than the defaulted ns above, so database can be
// undefined/null here instead of '' - confirm that is intended.
1111 database: this.s.ns,
1116 database: ns.substr(0, firstDot),
1117 collection: ns.substr(firstDot + 1)
1123 * The read() method pulls some data out of the internal buffer and returns it. If there is no data available, then it will return null.
1124 * @function external:Readable#read
1125 * @param {number} size Optional argument to specify how much data to read.
1126 * @return {(String | Buffer | null)}
1130 * Call this function to cause the stream to return strings of the specified encoding instead of Buffer objects.
1131 * @function external:Readable#setEncoding
1132 * @param {string} encoding The encoding to use.
1137 * This method will cause the readable stream to resume emitting data events.
1138 * @function external:Readable#resume
1143 * This method will cause a stream in flowing-mode to stop emitting data events. Any data that becomes available will remain in the internal buffer.
1144 * @function external:Readable#pause
1149 * This method pulls all the data out of a readable stream, and writes it to the supplied destination, automatically managing the flow so that the destination is not overwhelmed by a fast readable stream.
1150 * @function external:Readable#pipe
1151 * @param {Writable} destination The destination for writing data
1152 * @param {object} [options] Pipe options
1157 * This method will remove the hooks set up for a previous pipe() call.
1158 * @function external:Readable#unpipe
1159 * @param {Writable} [destination] The destination for writing data
1164 * This method is useful in certain cases where a stream is being consumed by a parser, which needs to "un-consume" some data that it has optimistically pulled out of the source, so that the stream can be passed on to some other party.
1165 * @function external:Readable#unshift
1166 * @param {(Buffer|string)} chunk Chunk of data to unshift onto the read queue.
1171 * Versions of Node prior to v0.10 had streams that did not implement the entire Streams API as it is today. (See the "Compatibility" section of the Node.js stream documentation for more information.)
1172 * @function external:Readable#wrap
1173 * @param {Stream} stream An "old style" readable stream.
// Numeric constant on Cursor; presumably one of the values this.s.state can take, alongside Cursor.CLOSED used by close() and _read() - confirm.
1180 Cursor.GET_MORE = 3;
// The Cursor constructor is the module's sole export.
1182 module.exports = Cursor;