3 var common = require('./common')
4 , utils = require('../utils')
5 , toError = require('../utils').toError
6 , f = require('util').format
7 , handleCallback = require('../utils').handleCallback
8 , shallowClone = utils.shallowClone
9 , WriteError = common.WriteError
10 , BulkWriteResult = common.BulkWriteResult
11 , LegacyOp = common.LegacyOp
12 , ObjectID = require('mongodb-core').BSON.ObjectID
13 , Define = require('../metadata')
14 , BSON = require('mongodb-core').BSON
15 , Batch = common.Batch
16 , mergeBatchResults = common.mergeBatchResults;
18 var bson = new BSON.BSONPure();
/**
 * Create a FindOperatorsOrdered instance (INTERNAL TYPE, do not instantiate directly)
 * @class
 * @param {OrderedBulkOperation} self the bulk operation this find() belongs to
 * @return {FindOperatorsOrdered} a FindOperatorsOrdered instance.
 */
var FindOperatorsOrdered = function(self) {
  // Share (do not copy) the parent's state object: the modifier methods read the
  // pending selector from `s.currentOp` and queue their operation on the same state.
  this.s = self.s;
};
/**
 * Add an update that applies to every document matched by the pending find() selector
 *
 * @method
 * @param {object} updateDocument update operations
 * @throws {MongoError}
 * @return {OrderedBulkOperation} result of addToOperationsList, for chaining
 */
FindOperatorsOrdered.prototype.update = function(updateDocument) {
  // Honour a preceding .upsert(); anything that is not a boolean counts as "no upsert"
  var upsert = typeof this.s.currentOp.upsert == 'boolean' ? this.s.currentOp.upsert : false;

  // Establish the update command
  var document = {
    q: this.s.currentOp.selector,
    u: updateDocument,
    multi: true,
    upsert: upsert
  };

  // Clear out current Op
  this.s.currentOp = null;
  // Add the update document to the list
  return addToOperationsList(this, common.UPDATE, document);
};
/**
 * Add an update that applies to a single document matched by the pending find() selector
 *
 * @method
 * @param {object} updateDocument update operations
 * @throws {MongoError}
 * @return {OrderedBulkOperation} result of addToOperationsList, for chaining
 */
FindOperatorsOrdered.prototype.updateOne = function(updateDocument) {
  // Honour a preceding .upsert(); anything that is not a boolean counts as "no upsert"
  var upsert = typeof this.s.currentOp.upsert == 'boolean' ? this.s.currentOp.upsert : false;

  // Establish the update command; multi is off so at most one document is touched
  var document = {
    q: this.s.currentOp.selector,
    u: updateDocument,
    multi: false,
    upsert: upsert
  };

  // Clear out current Op
  this.s.currentOp = null;
  // Add the update document to the list
  return addToOperationsList(this, common.UPDATE, document);
};
/**
 * Add a replace one operation to the bulk operation
 *
 * @method
 * @param {object} updateDocument the new document to replace the existing one with
 * @throws {MongoError}
 * @return {OrderedBulkOperation} whatever updateOne returns, for chaining
 */
FindOperatorsOrdered.prototype.replaceOne = function(updateDocument) {
  // A replacement is queued exactly like a single-document update; hand the result
  // back so the call can be chained like the other modifiers.
  return this.updateOne(updateDocument);
};
/**
 * Upsert modifier for update bulk operation
 *
 * @method
 * @throws {MongoError}
 * @return {FindOperatorsOrdered}
 */
FindOperatorsOrdered.prototype.upsert = function() {
  // Flag the pending operation; update()/updateOne() read this flag when they build the command
  this.s.currentOp.upsert = true;
  return this;
};
/**
 * Add a remove one operation to the bulk operation
 *
 * @method
 * @throws {MongoError}
 * @return {OrderedBulkOperation} result of addToOperationsList, for chaining
 */
FindOperatorsOrdered.prototype.deleteOne = function() {
  // Establish the remove command; limit 1 removes at most one matching document
  var document = {
    q: this.s.currentOp.selector,
    limit: 1
  };

  // Clear out current Op
  this.s.currentOp = null;
  // Add the remove document to the list
  return addToOperationsList(this, common.REMOVE, document);
};

// Backward compatibility
FindOperatorsOrdered.prototype.removeOne = FindOperatorsOrdered.prototype.deleteOne;
/**
 * Add a remove operation to the bulk operation
 *
 * @method
 * @throws {MongoError}
 * @return {OrderedBulkOperation} result of addToOperationsList, for chaining
 */
FindOperatorsOrdered.prototype.delete = function() {
  // Establish the remove command; limit 0 removes every matching document
  var document = {
    q: this.s.currentOp.selector,
    limit: 0
  };

  // Clear out current Op
  this.s.currentOp = null;
  // Add the remove document to the list
  return addToOperationsList(this, common.REMOVE, document);
};

// Backward compatibility
FindOperatorsOrdered.prototype.remove = FindOperatorsOrdered.prototype.delete;
/**
 * Add one operation to the internal list of batches.
 *
 * The operation joins the current batch unless doing so would reach the operation-count
 * limit (`s.maxWriteBatchSize`), reach the byte limit (`s.maxBatchSizeBytes`) or mix
 * operation types; in those cases the current batch is queued on `s.batches` and a new
 * batch is started for this operation.
 *
 * @ignore
 * @param {object} _self object exposing the shared bulk state as `_self.s`
 * @param {number} docType one of common.INSERT, common.UPDATE or common.REMOVE
 * @param {object} document the document/command to queue
 * @throws {MongoError} if `document` is an array, or is too large on its own
 * @return {object} `_self`, so callers can chain
 */
var addToOperationsList = function(_self, docType, document) {
  var state = _self.s;

  // Validate before touching any state so a rejected call leaves the bulk unchanged
  if(Array.isArray(document)) {
    throw toError("operation passed in cannot be an Array");
  }

  // Get the bsonSize
  var bsonSize = bson.calculateObjectSize(document, false);

  // Throw error if the doc is bigger than the max BSON size
  if(bsonSize >= state.maxBatchSizeBytes) {
    throw toError("document is larger than the maximum size " + state.maxBatchSizeBytes);
  }

  // Create a new batch object if we don't have a current one
  if(state.currentBatch == null) state.currentBatch = new Batch(docType, state.currentIndex);

  // Check if we need to create a new batch. The byte check must add the size of the
  // incoming document (not the running total to itself).
  var reachesCountLimit = (state.currentBatchSize + 1) >= state.maxWriteBatchSize;
  var reachesByteLimit = (state.currentBatchSizeBytes + bsonSize) >= state.maxBatchSizeBytes;
  var changesType = state.currentBatch.batchType != docType;

  if(reachesCountLimit || reachesByteLimit || changesType) {
    // Save the batch to the execution stack; an empty batch is never queued
    if(state.currentBatch.operations.length > 0) {
      state.batches.push(state.currentBatch);
    }

    // Create a new batch
    state.currentBatch = new Batch(docType, state.currentIndex);

    // Reset the current size trackers
    state.currentBatchSize = 0;
    state.currentBatchSizeBytes = 0;
  }

  if(docType == common.INSERT) {
    state.bulkResult.insertedIds.push({index: state.currentIndex, _id: document._id});
  }

  // Queue the operation and account for it exactly once
  state.currentBatch.originalIndexes.push(state.currentIndex);
  state.currentBatch.operations.push(document);
  state.currentBatchSize = state.currentBatchSize + 1;
  state.currentBatchSizeBytes = state.currentBatchSizeBytes + bsonSize;
  state.currentIndex = state.currentIndex + 1;

  // Return self
  return _self;
};
/**
 * Create a new OrderedBulkOperation instance (INTERNAL TYPE, do not instantiate directly)
 * @class
 * @param {object} topology topology handle; `isMasterDoc` and `bson` are read from it
 * @param {object} collection collection the bulk targets; `collectionName` is read from it
 * @param {object} [options] optional settings (`promiseLibrary`, `bypassDocumentValidation`, write concern)
 * @property {number} length Get the number of operations in the bulk.
 * @return {OrderedBulkOperation} a OrderedBulkOperation instance.
 */
function OrderedBulkOperation(topology, collection, options) {
  options = options == null ? {} : options;
  // TODO Bring from driver information in isMaster

  // Handle to the bson serializer, used to calculate running sizes
  var bson = topology.bson;

  // Namespace for the operation
  var namespace = collection.collectionName;

  // Limits advertised by the server, with defaults when no isMaster document is known.
  // The byte default is 16 MiB (16 * 1024 * 1024).
  var maxBatchSizeBytes = topology.isMasterDoc && topology.isMasterDoc.maxBsonObjectSize
    ? topology.isMasterDoc.maxBsonObjectSize : (1024*1024*16);
  var maxWriteBatchSize = topology.isMasterDoc && topology.isMasterDoc.maxWriteBatchSize
    ? topology.isMasterDoc.maxWriteBatchSize : 1000;

  // Get the write concern
  var writeConcern = common.writeConcern(shallowClone(options), collection, options);

  // Get the promiseLibrary
  var promiseLibrary = options.promiseLibrary;

  // No promise library selected fall back
  if(!promiseLibrary) {
    promiseLibrary = typeof global.Promise == 'function' ?
      global.Promise : require('es6-promise').Promise;
  }

  // Final results
  // NOTE(review): the counter members below were dropped from the listing and are restored
  // from the shape the result-merging helpers are expected to update - confirm against ./common.
  var bulkResult = {
    ok: 1,
    writeErrors: [],
    writeConcernErrors: [],
    insertedIds: [],
    nInserted: 0,
    nUpserted: 0,
    nMatched: 0,
    nModified: 0,
    nRemoved: 0,
    upserted: []
  };

  // Internal state
  this.s = {
    // Final result
    bulkResult: bulkResult,
    // Current batch state
    currentBatch: null,
    currentIndex: 0,
    currentBatchSize: 0,
    currentBatchSizeBytes: 0,
    batches: [],
    // Write concern
    writeConcern: writeConcern,
    // Max batch size options
    maxBatchSizeBytes: maxBatchSizeBytes,
    maxWriteBatchSize: maxWriteBatchSize,
    // Namespace
    namespace: namespace,
    // BSON
    bson: bson,
    // Topology
    topology: topology,
    // Options
    options: options,
    // Current operation
    currentOp: null,
    // Executed
    executed: false,
    // Collection
    collection: collection,
    // Promise Library
    promiseLibrary: promiseLibrary,
    // Fundamental error
    err: null,
    // Bypass validation
    bypassDocumentValidation: typeof options.bypassDocumentValidation == 'boolean' ? options.bypassDocumentValidation : false
  };
}
299 var define = OrderedBulkOperation.define = new Define('OrderedBulkOperation', OrderedBulkOperation, false);
/**
 * Add a raw operation description ({insertOne: ...}, {updateOne: ...}, ...) to the bulk.
 *
 * The first own key of `op` selects the operation. Update and delete descriptions are
 * accepted both in the legacy shape (carrying `q`) and in the CRUD shape (carrying `filter`).
 * Legacy-shaped descriptions are queued as passed in, after `multi`/`limit` is set on them.
 *
 * @method
 * @param {object} op the operation description
 * @throws {MongoError} if `op` is not one of the supported operation types
 * @return {OrderedBulkOperation} the bulk, for chaining
 */
OrderedBulkOperation.prototype.raw = function(op) {
  var key = Object.keys(op)[0];
  var operation, collation;

  // Set up the force server object id
  var forceServerObjectId = typeof this.s.options.forceServerObjectId == 'boolean'
    ? this.s.options.forceServerObjectId : this.s.collection.s.db.options.forceServerObjectId;

  // Update operations
  if((op.updateOne && op.updateOne.q)
    || (op.updateMany && op.updateMany.q)
    || (op.replaceOne && op.replaceOne.q)) {
    op[key].multi = op.updateOne || op.replaceOne ? false : true;
    return addToOperationsList(this, common.UPDATE, op[key]);
  }

  // Crud spec update format
  if(op.updateOne || op.updateMany || op.replaceOne) {
    operation = {
      q: op[key].filter,
      u: op[key].update || op[key].replacement,
      multi: op.updateOne || op.replaceOne ? false : true
    };
    operation.upsert = op[key].upsert ? true : false;
    // Accept the collation on the operation model itself, or next to it as before
    collation = op[key].collation || op.collation;
    if(collation) operation.collation = collation;
    return addToOperationsList(this, common.UPDATE, operation);
  }

  // Remove operations
  if(op.removeOne || op.removeMany || (op.deleteOne && op.deleteOne.q) || (op.deleteMany && op.deleteMany.q)) {
    // deleteOne must be limited to a single document just like removeOne
    op[key].limit = op.removeOne || op.deleteOne ? 1 : 0;
    return addToOperationsList(this, common.REMOVE, op[key]);
  }

  // Crud spec delete operations, less efficient
  if(op.deleteOne || op.deleteMany) {
    operation = {q: op[key].filter, limit: op.deleteOne ? 1 : 0};
    collation = op[key].collation || op.collation;
    if(collation) operation.collation = collation;
    return addToOperationsList(this, common.REMOVE, operation);
  }

  // Insert operations
  if(op.insertOne && op.insertOne.document == null) {
    if(forceServerObjectId !== true && op.insertOne._id == null) op.insertOne._id = new ObjectID();
    return addToOperationsList(this, common.INSERT, op.insertOne);
  } else if(op.insertOne && op.insertOne.document) {
    if(forceServerObjectId !== true && op.insertOne.document._id == null) op.insertOne.document._id = new ObjectID();
    return addToOperationsList(this, common.INSERT, op.insertOne.document);
  }

  if(op.insertMany) {
    for(var i = 0; i < op.insertMany.length; i++) {
      if(forceServerObjectId !== true && op.insertMany[i]._id == null) op.insertMany[i]._id = new ObjectID();
      addToOperationsList(this, common.INSERT, op.insertMany[i]);
    }

    // Hand back the bulk like every other branch does
    return this;
  }

  // No valid type of operation
  throw toError("bulkWrite only supports insertOne, insertMany, updateOne, updateMany, removeOne, removeMany, deleteOne, deleteMany");
};
/**
 * Add a single insert document to the bulk operation
 *
 * @method
 * @param {object} document the document to insert
 * @throws {MongoError}
 * @return {OrderedBulkOperation} result of addToOperationsList, for chaining
 */
OrderedBulkOperation.prototype.insert = function(document) {
  // Assign an _id on the client unless the db is configured to let the server do it
  var serverAssignsId = this.s.collection.s.db.options.forceServerObjectId === true;
  if(!serverAssignsId && document._id == null) {
    document._id = new ObjectID();
  }

  return addToOperationsList(this, common.INSERT, document);
};
/**
 * Initiate a find operation for an update/updateOne/remove/removeOne/replaceOne
 *
 * @method
 * @param {object} selector The selector for the bulk operation.
 * @throws {MongoError} if no selector is given
 * @return {FindOperatorsOrdered}
 */
OrderedBulkOperation.prototype.find = function(selector) {
  if(!selector) {
    throw toError("Bulk find operation must specify a selector");
  }

  // Save a current selector; the modifier called on the returned object consumes it
  this.s.currentOp = {
    selector: selector
  };

  return new FindOperatorsOrdered(this);
};
// Number of operations added to the bulk so far (the running operation index)
Object.defineProperty(OrderedBulkOperation.prototype, 'length', {
  enumerable: true,
  get: function() {
    return this.s.currentIndex;
  }
});
/**
 * Execute the next write command in the chain.
 *
 * Takes the first queued batch, sends it through the topology and, once its result has been
 * merged into `self.s.bulkResult`, calls itself for the following batch. The callback fires
 * when no batches are left, when merging reports a result, or on the first error.
 *
 * @ignore
 * @param {OrderedBulkOperation} self the bulk operation being executed
 * @param {function} callback invoked through handleCallback with (err, BulkWriteResult)
 */
var executeCommands = function(self, callback) {
  // Nothing left to send: report what has been accumulated
  if(self.s.batches.length == 0) {
    return handleCallback(callback, null, new BulkWriteResult(self.s.bulkResult));
  }

  // Ordered execution of the command
  var batch = self.s.batches.shift();

  var resultHandler = function(err, result) {
    // Error is a driver related error not a bulk op error, terminate
    if(err && (err.driver || err.message)) {
      return handleCallback(callback, err);
    }

    // If we have an error
    if(err) err.ok = 0;
    // Merge the results together
    var mergeResult = mergeBatchResults(true, batch, self.s.bulkResult, err, result);
    if(mergeResult != null) {
      return handleCallback(callback, null, new BulkWriteResult(self.s.bulkResult));
    }

    // If we are ordered and have errors and they are
    // not all replication errors terminate the operation
    if(self.s.bulkResult.writeErrors.length > 0) {
      return handleCallback(callback, toError(self.s.bulkResult.writeErrors[0]), new BulkWriteResult(self.s.bulkResult));
    }

    // Execute the next command in line
    executeCommands(self, callback);
  };

  var finalOptions = {ordered: true};
  if(self.s.writeConcern != null) {
    finalOptions.writeConcern = self.s.writeConcern;
  }

  // Set an operationId if provided
  if(self.operationId) {
    resultHandler.operationId = self.operationId;
  }

  // Serialize functions
  if(self.s.options.serializeFunctions) {
    finalOptions.serializeFunctions = true;
  }

  // Ignore undefined values
  if(self.s.options.ignoreUndefined) {
    finalOptions.ignoreUndefined = true;
  }

  // Is the bypassDocumentValidation option specified
  if(self.s.bypassDocumentValidation == true) {
    finalOptions.bypassDocumentValidation = true;
  }

  try {
    if(batch.batchType == common.INSERT) {
      self.s.topology.insert(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
    } else if(batch.batchType == common.UPDATE) {
      self.s.topology.update(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
    } else if(batch.batchType == common.REMOVE) {
      self.s.topology.remove(self.s.collection.namespace, batch.operations, finalOptions, resultHandler);
    }
  } catch(err) {
    // Force top level error
    err.ok = 0;
    // Merge top level error and return
    handleCallback(callback, null, mergeBatchResults(false, batch, self.s.bulkResult, err, null));
  }
};
/**
 * The callback format for results
 * @callback OrderedBulkOperation~resultCallback
 * @param {MongoError} error An error instance representing the error during the execution.
 * @param {BulkWriteResult} result The bulk write result.
 */

/**
 * Execute the ordered bulk operation
 *
 * @method
 * @param {object} [_writeConcern=null] Optional write concern; when omitted the write concern the bulk was created with is kept.
 * @param {(number|string)} [_writeConcern.w=null] The write concern.
 * @param {number} [_writeConcern.wtimeout=null] The write concern timeout.
 * @param {boolean} [_writeConcern.j=false] Specify a journal write concern.
 * @param {boolean} [_writeConcern.fsync=false] Specify a file sync write concern.
 * @param {OrderedBulkOperation~resultCallback} [callback] The result callback
 * @throws {MongoError} if the bulk was already executed or contains no operations
 * @return {Promise} returns Promise if no callback passed
 */
OrderedBulkOperation.prototype.execute = function(_writeConcern, callback) {
  var self = this;

  if(this.s.executed) throw toError("batch cannot be re-executed");

  if(typeof _writeConcern == 'function') {
    callback = _writeConcern;
  } else if(_writeConcern != null) {
    // Only an explicitly supplied write concern replaces the configured one
    this.s.writeConcern = _writeConcern;
  }

  // If we have current batch
  if(this.s.currentBatch) this.s.batches.push(this.s.currentBatch);

  // If we have no operations in the bulk raise an error
  if(this.s.batches.length == 0) {
    throw toError("Invalid Operation, No operations in bulk");
  }

  // From here on the batches are consumed, so a second execute() must be refused
  this.s.executed = true;

  // Execute using callback
  if(typeof callback == 'function') {
    return executeCommands(this, callback);
  }

  // Return a Promise
  return new this.s.promiseLibrary(function(resolve, reject) {
    executeCommands(self, function(err, r) {
      if(err) return reject(err);
      resolve(r);
    });
  });
};
527 define.classMethod('execute', {callback: true, promise:false});
/**
 * Returns an ordered batch object
 * @ignore
 * @param {object} topology passed through to the OrderedBulkOperation constructor
 * @param {object} collection passed through to the OrderedBulkOperation constructor
 * @param {object} [options] passed through to the OrderedBulkOperation constructor
 * @return {OrderedBulkOperation} a new ordered bulk operation
 */
var initializeOrderedBulkOp = function(topology, collection, options) {
  return new OrderedBulkOperation(topology, collection, options);
};
537 initializeOrderedBulkOp.OrderedBulkOperation = OrderedBulkOperation;
538 module.exports = initializeOrderedBulkOp;
539 module.exports.Bulk = OrderedBulkOperation;